diff --git a/jobs.go b/jobs.go index 3ee9b52..e21f9fa 100644 --- a/jobs.go +++ b/jobs.go @@ -71,6 +71,7 @@ type JobServer struct { q chan JobRequest clients map[uint]*JobSession queues map[string][]QueuedJob + waiters map[string][]uint jobs map[uint]*Job } @@ -82,6 +83,7 @@ func NewJobServer(port uint16) *JobServer { make(chan JobRequest), make(map[uint]*JobSession), make(map[string][]QueuedJob), + make(map[string][]uint), make(map[uint]*Job), } } @@ -108,12 +110,33 @@ func NewJobSession(sessionId uint, con net.Conn, backend chan JobRequest) *JobSe func (s *JobServer) ensureQueue(q string) { _, ok := s.queues[q] - if ok { return } - s.queues[q] = make([]QueuedJob, 0, 5) + if ! ok { + s.queues[q] = make([]QueuedJob, 0, 5) + } + _, ok = s.waiters[q] + if ! ok { + s.waiters[q] = make([]uint, 0, 5) + } } +func (s *JobServer) pushWaiter(q string, j *Job) bool { + + for len(s.waiters[q]) > 0 { + sid := s.waiters[q][0] + s.waiters[q] = s.waiters[q][1:] + ses, ok := s.clients[sid] + if ! ok { continue } + if ! ses.isOnline { continue } + j.Worker = sid + s.rspGet(ses, j) + return true + } + return false + +} func (s *JobServer) pushJob(j *Job, q string) { s.ensureQueue(q) + if s.pushWaiter(q, j) { return } qj := QueuedJob{j.Id, j.Pri} s.queues[q] = InsertJobSorted(s.queues[q], qj) } @@ -195,12 +218,23 @@ func (s *JobServer) rspOk(ses *JobSession) { ses.q <- d } +func (s *JobServer) waitFor(ses *JobSession, qs []string) { + for _, qname := range qs { + s.ensureQueue(qname) + s.waiters[qname] = append(s.waiters[qname], ses.sessionId) + } + +} + func (s *JobServer) hdlGet(r JobRequest, ses *JobSession) { for { q, ok := s.getBestQueue(r.Queues) if ! ok { - // todo : wait - s.rspGetNoJob(ses) + if r.Wait { + s.waitFor(ses, r.Queues) + } else { + s.rspGetNoJob(ses) + } return } j, err := s.popQueue(q) @@ -252,11 +286,10 @@ func (s *JobServer) hdlClose(ses *JobSession) { } func (s *JobServer) central(){ - for m := range s.q { ses, ok := s.clients[m.From] if ! ok { continue } - fmt.Printf("I %+v\n", m) + //fmt.Printf("I %+v\n", m) switch m.Request { case "put": s.hdlPut(m, ses) case "get": s.hdlGet(m, ses) @@ -301,8 +334,9 @@ func (s *JobServer) processClient(con net.Conn) { } func (s *JobSession) close() { - fmt.Printf("close %d\n", s.sessionId) + //fmt.Printf("close %d\n", s.sessionId) if s.isOnline { + s.isOnline = false m := JobRequest{ s.sessionId, "close", @@ -314,9 +348,7 @@ func (s *JobSession) close() { 0, } s.backend <- m - } - s.isOnline = false s.con.Close() } @@ -327,9 +359,6 @@ func ValidatePut(r JobRequest) error { if r.Queue == "" { return fmt.Errorf("missing queue field") } - if r.Pri == 0 { - return fmt.Errorf("missing prio field") - } return nil } @@ -357,9 +386,11 @@ func errorMessage(err error) []byte { func (s *JobSession) jobReceiver() { defer s.close() - r := bufio.NewReader(s.con) + r := bufio.NewReaderSize(s.con, 102400) msg, err := r.ReadString('\n') + for err == nil { + fmt.Printf("%s", msg) rq, perr := ParseRequest(msg) if perr == nil { perr = ValidateRequest(rq) diff --git a/tests/jobs.py b/tests/jobs.py index 32dc925..2e111b1 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -29,9 +29,12 @@ def put(s, q="queue1", pri=123): return r["id"] return None -def get(s, q=["queue1"]): - j = {"request":"get","queues":["queue1"]} +def get(s, q=["queue1"], wait=False): + j = {"request":"get","queues":["queue1"], "wait": wait} snd(s, j) + if wait: + print("Waiting...") + return None r = rsp(s) if "id" in r: return r["id"] @@ -65,7 +68,6 @@ def test(s, s2): bull3(s) put(s) jid = get(s) - print(jid) get(s) abort(s, jid) get(s) @@ -77,6 +79,19 @@ def test(s, s2): s.close() sleep(1) get(s2) + delete(s2, jid) - -test(sock(),sock()) \ No newline at end of file +def testw(s1, s2): + get(s1, wait=True) + put(s2) + rsp(s1) + +def testw2(s1, s2): + put(s2) + get(s2) + get(s1, wait=True) + sleep(1) + s2.close() + rsp(s1) + +testw2(sock(),sock()) \ No newline at end of file