diff --git a/jobs.go b/jobs.go new file mode 100644 index 0000000..3ee9b52 --- /dev/null +++ b/jobs.go @@ -0,0 +1,382 @@ +package main + +import ( +"net" +"os" +"fmt" +"bufio" +"encoding/json" +"sort" +) + +func InsertJobSorted(s []QueuedJob, e QueuedJob) []QueuedJob { + + i := sort.Search(len(s), func(i int) bool { return s[i].Pri > e.Pri }) + s = append(s, e) + copy(s[i+1:], s[i:]) + s[i] = e + return s +} + +type ErrorResponse struct{ + Status string `json:"status"` + Error string `json:"error"` +} + +type PutResponse struct { + Status string `json:"status"` + Id uint `json:"id"` +} + +type GetResponse struct { + Status string `json:"status"` + Id uint `json:"id"` + Job interface{} `json:"job"` + Pri uint `json:"pri"` + Queue string `json:"queue"` +} + +type GetErrorResponse struct { + Status string `json:"status"` +} + +type JobRequest struct { + From uint + Request string + Queue string + Pri uint + Job interface{} + Queues []string + Wait bool + Id uint +} + +type Job struct { + Id uint + Details interface{} + Pri uint + Worker uint + Queue string +} + +type QueuedJob struct { + Id uint + Pri uint +} + +type JobServer struct { + port uint16 + numSessions uint + numJobs uint + q chan JobRequest + clients map[uint]*JobSession + queues map[string][]QueuedJob + jobs map[uint]*Job +} + +func NewJobServer(port uint16) *JobServer { + return &JobServer{ + port, + 0, + 0, + make(chan JobRequest), + make(map[uint]*JobSession), + make(map[string][]QueuedJob), + make(map[uint]*Job), + } +} + +type JobSession struct{ + sessionId uint + q chan []byte + backend chan JobRequest + jobs map[uint]bool + con net.Conn + isOnline bool +} + +func NewJobSession(sessionId uint, con net.Conn, backend chan JobRequest) *JobSession { + return &JobSession{ + sessionId, + make(chan []byte), + backend, + make(map[uint]bool), + con, + true, + } +} + +func (s *JobServer) ensureQueue(q string) { + _, ok := s.queues[q] + if ok { return } + s.queues[q] = make([]QueuedJob, 0, 5) +} + +func (s *JobServer) pushJob(j *Job, q string) { + s.ensureQueue(q) + qj := QueuedJob{j.Id, j.Pri} + s.queues[q] = InsertJobSorted(s.queues[q], qj) +} + +func (s *JobServer) rspPut(ses *JobSession, id uint) { + m := PutResponse{"ok", id} + d, _ := json.Marshal(m) + d = append(d, '\n') + ses.q <- d +} + +func (s *JobServer) hdlPut(r JobRequest, ses *JobSession) { + s.numJobs += 1 + j := &Job{s.numJobs, r.Job, r.Pri, 0, r.Queue} + s.jobs[j.Id] = j + s.pushJob(j, r.Queue) + s.rspPut(ses, j.Id) +} + +func (s *JobServer) getBestQueue(qs []string) (string, bool) { + var bestPri uint + var bestQ string + found := false + for qi := range qs { + qname := qs[qi] + q, ok := s.queues[qname] + if ! ok { continue } + if len(q) == 0 { continue } + j := q[len(q) -1] + if found == false || j.Pri >= bestPri { + bestPri = j.Pri + bestQ = qname + } + found = true + } + return bestQ, found +} + +func (s *JobServer) popQueue(qname string) (*Job, error) { + q, ok := s.queues[qname] + if ! ok { return nil, fmt.Errorf( "queue %s not found", qname)} + if len(q)== 0 { return nil, fmt.Errorf("queue %s empty", qname)} + it := q[len(q)-1] + s.queues[qname] = q[:len(q)-1] + j, ok := s.jobs[it.Id] + if ok { + return j, nil + } else { + return nil, fmt.Errorf("job not found") + } +} + +func (s *JobServer) rspError(ses *JobSession, err string) { + m := ErrorResponse{"error", err} + d, _ := json.Marshal(m) + d = append(d, '\n') + ses.q <- d +} + +func (s *JobServer) rspGet(ses *JobSession, j *Job) { + m := GetResponse{"ok", j.Id, j.Details, j.Pri, j.Queue} + d, _ := json.Marshal(m) + ses.jobs[j.Id] = true + d = append(d, '\n') + ses.q <- d +} + +func (s *JobServer) rspGetNoJob(ses *JobSession) { + m := GetErrorResponse{"no-job"} + d, _ := json.Marshal(m) + d = append(d, '\n') + ses.q <- d +} + +func (s *JobServer) rspOk(ses *JobSession) { + m := GetErrorResponse{"ok"} + d, _ := json.Marshal(m) + d = append(d, '\n') + ses.q <- d +} + +func (s *JobServer) hdlGet(r JobRequest, ses *JobSession) { + for { + q, ok := s.getBestQueue(r.Queues) + if ! ok { + // todo : wait + s.rspGetNoJob(ses) + return + } + j, err := s.popQueue(q) + if err != nil { + continue + } + s.rspGet(ses, j) + j.Worker = ses.sessionId + return + } + +} + +func (s *JobServer) hdlAbort(r JobRequest, ses *JobSession) { + j, ok := s.jobs[r.Id] + if ! ok { + s.rspGetNoJob(ses) + return + } + if j.Worker != ses.sessionId { + s.rspError(ses, "not your job") + return + } + if _, ok := ses.jobs[r.Id]; ok { + delete(ses.jobs, r.Id) + } + j.Worker = 0 + s.pushJob(j, j.Queue) + s.rspOk(ses) +} + +func (s *JobServer) hdlDelete(r JobRequest, ses *JobSession) { + _, ok := s.jobs[r.Id] + if ! ok { + s.rspGetNoJob(ses) + return + } + delete(s.jobs, r.Id) + s.rspOk(ses) +} + +func (s *JobServer) hdlClose(ses *JobSession) { + for jid, _ := range ses.jobs { + j, ok := s.jobs[jid] + if ! ok { continue } + j.Worker = 0 + s.pushJob(j, j.Queue) + } +} + +func (s *JobServer) central(){ + + for m := range s.q { + ses, ok := s.clients[m.From] + if ! ok { continue } + fmt.Printf("I %+v\n", m) + switch m.Request { + case "put": s.hdlPut(m, ses) + case "get": s.hdlGet(m, ses) + case "abort": s.hdlAbort(m, ses) + case "delete": s.hdlDelete(m, ses) + case "close": s.hdlClose(ses) + default: s.rspError(ses, "unknown request type") + } + } +} + + +func (s *JobServer) Run() { + go s.central() + addr := fmt.Sprintf("0.0.0.0:%d", s.port) + server, err := net.Listen("tcp", addr) + if err != nil { + fmt.Println("Error listening:", err.Error()) + os.Exit(1) + } + defer server.Close() + fmt.Println("JobServer waiting for client...") + for { + connection, err := server.Accept() + if err != nil { + fmt.Println("Error accepting: ", err.Error()) + os.Exit(1) + } + fmt.Println("client connected") + s.processClient(connection) + } + close(s.q) +} + +func (s *JobServer) processClient(con net.Conn) { + sessionId := s.numSessions + s.numSessions += 1 + session := NewJobSession( sessionId, con, s.q) + s.clients[sessionId] = session + go session.jobReceiver() + go session.jobSender() +} + +func (s *JobSession) close() { + fmt.Printf("close %d\n", s.sessionId) + if s.isOnline { + m := JobRequest{ + s.sessionId, + "close", + "", + 0, + nil, + []string{}, + false, + 0, + } + s.backend <- m + + } + s.isOnline = false + s.con.Close() +} + +func ValidatePut(r JobRequest) error { + if r.Job == nil { + return fmt.Errorf("missing job field") + } + if r.Queue == "" { + return fmt.Errorf("missing queue field") + } + if r.Pri == 0 { + return fmt.Errorf("missing prio field") + } + return nil +} + +func ValidateRequest(r JobRequest) error { + switch r.Request { + case "put": return ValidatePut(r) + + } + return nil +} +func ParseRequest(msg string) (JobRequest, error) { + var r JobRequest + err := json.Unmarshal([]byte(msg), &r) + if err != nil { return r, err} + //fmt.Printf("%+v\n", r) + return r, nil +} + +func errorMessage(err error) []byte { + m := ErrorResponse{"error", err.Error()} + s, _ := json.Marshal(m) + s = append(s, '\n') + return s +} + +func (s *JobSession) jobReceiver() { + defer s.close() + r := bufio.NewReader(s.con) + msg, err := r.ReadString('\n') + for err == nil { + rq, perr := ParseRequest(msg) + if perr == nil { + perr = ValidateRequest(rq) + } + if perr == nil { + rq.From = s.sessionId + s.backend <- rq + } else { + s.q <- errorMessage(perr) + } + msg, err = r.ReadString('\n') + } +} + +func (s *JobSession) jobSender() { + for m := range s.q { + s.con.Write(m) + } +} + diff --git a/main.go b/main.go index 4de2d32..12297f2 100644 --- a/main.go +++ b/main.go @@ -12,7 +12,7 @@ type Server interface { func main() { var challenge int - flag.IntVar(&challenge, "challenge",8, "Challenge number") + flag.IntVar(&challenge, "challenge",9, "Challenge number") flag.Parse() var port uint16 @@ -37,6 +37,8 @@ func main() { server = NewReverseServer(port); case 8: server = NewSecureServer(port); + case 9: + server = NewJobServer(port); default: fmt.Printf("Unknown challenge\n") os.Exit(1) diff --git a/tests/jobs.py b/tests/jobs.py new file mode 100644 index 0000000..32dc925 --- /dev/null +++ b/tests/jobs.py @@ -0,0 +1,82 @@ +import socket +from struct import pack, unpack +from time import time, sleep +from binascii import unhexlify +import json + +def sock(): + addr = ("localhost", 13370) + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(addr) + return s + +def snd(s, m): + d = json.dumps(m) + print("O", d) + s.sendall(d.encode() +b'\n') + +def rsp(s): + d = s.recv(1024).decode().strip() + print("I", d) + j = json.loads(d) + return j + +def put(s, q="queue1", pri=123): + j = {"request":"put","queue":"queue1","job":{"title": "blah"},"pri":123} + snd(s, j) + r = rsp(s) + if "id" in r: + return r["id"] + return None + +def get(s, q=["queue1"]): + j = {"request":"get","queues":["queue1"]} + snd(s, j) + r = rsp(s) + if "id" in r: + return r["id"] + return None + +def abort(s, i): + j = {"request":"abort","id": i} + snd(s, j) + rsp(s) + +def delete(s, i): + j = {"request":"delete","id": i} + snd(s, j) + rsp(s) + +def bull1(s): + snd(s, "snack") + rsp(s) + +def bull2(s): + snd(s, {}) + rsp(s) + +def bull3(s): + snd(s,{"request":"snack"}) + rsp(s) + +def test(s, s2): + bull1(s) + bull2(s) + bull3(s) + put(s) + jid = get(s) + print(jid) + get(s) + abort(s, jid) + get(s) + delete(s, jid) + abort(s, jid) + jid = put(s) + get(s) + get(s2) + s.close() + sleep(1) + get(s2) + + +test(sock(),sock()) \ No newline at end of file