jobs complete

This commit is contained in:
Richard 2023-11-23 20:32:17 +01:00
parent 41d14104d8
commit a659d46629
2 changed files with 64 additions and 18 deletions

57
jobs.go
View File

@ -71,6 +71,7 @@ type JobServer struct {
q chan JobRequest q chan JobRequest
clients map[uint]*JobSession clients map[uint]*JobSession
queues map[string][]QueuedJob queues map[string][]QueuedJob
waiters map[string][]uint
jobs map[uint]*Job jobs map[uint]*Job
} }
@ -82,6 +83,7 @@ func NewJobServer(port uint16) *JobServer {
make(chan JobRequest), make(chan JobRequest),
make(map[uint]*JobSession), make(map[uint]*JobSession),
make(map[string][]QueuedJob), make(map[string][]QueuedJob),
make(map[string][]uint),
make(map[uint]*Job), make(map[uint]*Job),
} }
} }
@ -108,12 +110,33 @@ func NewJobSession(sessionId uint, con net.Conn, backend chan JobRequest) *JobSe
func (s *JobServer) ensureQueue(q string) { func (s *JobServer) ensureQueue(q string) {
_, ok := s.queues[q] _, ok := s.queues[q]
if ok { return } if ! ok {
s.queues[q] = make([]QueuedJob, 0, 5) s.queues[q] = make([]QueuedJob, 0, 5)
}
_, ok = s.waiters[q]
if ! ok {
s.waiters[q] = make([]uint, 0, 5)
}
} }
func (s *JobServer) pushWaiter(q string, j *Job) bool {
for len(s.waiters[q]) > 0 {
sid := s.waiters[q][0]
s.waiters[q] = s.waiters[q][1:]
ses, ok := s.clients[sid]
if ! ok { continue }
if ! ses.isOnline { continue }
j.Worker = sid
s.rspGet(ses, j)
return true
}
return false
}
func (s *JobServer) pushJob(j *Job, q string) { func (s *JobServer) pushJob(j *Job, q string) {
s.ensureQueue(q) s.ensureQueue(q)
if s.pushWaiter(q, j) { return }
qj := QueuedJob{j.Id, j.Pri} qj := QueuedJob{j.Id, j.Pri}
s.queues[q] = InsertJobSorted(s.queues[q], qj) s.queues[q] = InsertJobSorted(s.queues[q], qj)
} }
@ -195,12 +218,23 @@ func (s *JobServer) rspOk(ses *JobSession) {
ses.q <- d ses.q <- d
} }
func (s *JobServer) waitFor(ses *JobSession, qs []string) {
for _, qname := range qs {
s.ensureQueue(qname)
s.waiters[qname] = append(s.waiters[qname], ses.sessionId)
}
}
func (s *JobServer) hdlGet(r JobRequest, ses *JobSession) { func (s *JobServer) hdlGet(r JobRequest, ses *JobSession) {
for { for {
q, ok := s.getBestQueue(r.Queues) q, ok := s.getBestQueue(r.Queues)
if ! ok { if ! ok {
// todo : wait if r.Wait {
s.rspGetNoJob(ses) s.waitFor(ses, r.Queues)
} else {
s.rspGetNoJob(ses)
}
return return
} }
j, err := s.popQueue(q) j, err := s.popQueue(q)
@ -252,11 +286,10 @@ func (s *JobServer) hdlClose(ses *JobSession) {
} }
func (s *JobServer) central(){ func (s *JobServer) central(){
for m := range s.q { for m := range s.q {
ses, ok := s.clients[m.From] ses, ok := s.clients[m.From]
if ! ok { continue } if ! ok { continue }
fmt.Printf("I %+v\n", m) //fmt.Printf("I %+v\n", m)
switch m.Request { switch m.Request {
case "put": s.hdlPut(m, ses) case "put": s.hdlPut(m, ses)
case "get": s.hdlGet(m, ses) case "get": s.hdlGet(m, ses)
@ -301,8 +334,9 @@ func (s *JobServer) processClient(con net.Conn) {
} }
func (s *JobSession) close() { func (s *JobSession) close() {
fmt.Printf("close %d\n", s.sessionId) //fmt.Printf("close %d\n", s.sessionId)
if s.isOnline { if s.isOnline {
s.isOnline = false
m := JobRequest{ m := JobRequest{
s.sessionId, s.sessionId,
"close", "close",
@ -314,9 +348,7 @@ func (s *JobSession) close() {
0, 0,
} }
s.backend <- m s.backend <- m
} }
s.isOnline = false
s.con.Close() s.con.Close()
} }
@ -327,9 +359,6 @@ func ValidatePut(r JobRequest) error {
if r.Queue == "" { if r.Queue == "" {
return fmt.Errorf("missing queue field") return fmt.Errorf("missing queue field")
} }
if r.Pri == 0 {
return fmt.Errorf("missing prio field")
}
return nil return nil
} }
@ -357,9 +386,11 @@ func errorMessage(err error) []byte {
func (s *JobSession) jobReceiver() { func (s *JobSession) jobReceiver() {
defer s.close() defer s.close()
r := bufio.NewReader(s.con) r := bufio.NewReaderSize(s.con, 102400)
msg, err := r.ReadString('\n') msg, err := r.ReadString('\n')
for err == nil { for err == nil {
fmt.Printf("%s", msg)
rq, perr := ParseRequest(msg) rq, perr := ParseRequest(msg)
if perr == nil { if perr == nil {
perr = ValidateRequest(rq) perr = ValidateRequest(rq)

View File

@ -29,9 +29,12 @@ def put(s, q="queue1", pri=123):
return r["id"] return r["id"]
return None return None
def get(s, q=["queue1"]): def get(s, q=["queue1"], wait=False):
j = {"request":"get","queues":["queue1"]} j = {"request":"get","queues":["queue1"], "wait": wait}
snd(s, j) snd(s, j)
if wait:
print("Waiting...")
return None
r = rsp(s) r = rsp(s)
if "id" in r: if "id" in r:
return r["id"] return r["id"]
@ -65,7 +68,6 @@ def test(s, s2):
bull3(s) bull3(s)
put(s) put(s)
jid = get(s) jid = get(s)
print(jid)
get(s) get(s)
abort(s, jid) abort(s, jid)
get(s) get(s)
@ -77,6 +79,19 @@ def test(s, s2):
s.close() s.close()
sleep(1) sleep(1)
get(s2) get(s2)
delete(s2, jid)
def testw(s1, s2):
get(s1, wait=True)
put(s2)
rsp(s1)
test(sock(),sock()) def testw2(s1, s2):
put(s2)
get(s2)
get(s1, wait=True)
sleep(1)
s2.close()
rsp(s1)
testw2(sock(),sock())