Compare commits

..

10 Commits

Author SHA1 Message Date
Richard
4eaf1f34cd pest done 2023-11-30 21:26:30 +01:00
Richard
00fa90c6db pest first tests passing 2023-11-30 06:13:57 +01:00
Richard
fc6585ee30 pest: client interaction complete 2023-11-28 21:07:48 +01:00
Richard
f0b7401ed7 pest initial setup 2023-11-26 21:19:00 +01:00
Richard
b8f2d5d842 vcs done 2023-11-26 12:36:03 +01:00
Richard
b09ba5740a vcs first implementation
Put working get and list todo
2023-11-25 22:05:25 +01:00
Richard
d3ff302e7a vcs spec and setup 2023-11-23 22:38:07 +01:00
Richard
51817c9f39 improve logging 2023-11-23 20:36:10 +01:00
Richard
a659d46629 jobs complete 2023-11-23 20:32:17 +01:00
Richard
41d14104d8 jobs initial without wait
All features except waiting
2023-11-23 19:37:01 +01:00
9 changed files with 1666 additions and 29 deletions

61
binutil.go Normal file
View File

@ -0,0 +1,61 @@
package main
import (
"encoding/binary"
"io"
"fmt"
)
var BE = binary.BigEndian
func readU8(r io.Reader) (uint8, error) {
var result uint8
err := binary.Read(r, BE, &result);
return result, err
}
func readU16(r io.Reader) (uint16, error) {
var result uint16
err := binary.Read(r, BE, &result);
return result, err
}
func readU32(r io.Reader) (uint32, error) {
var result uint32
err := binary.Read(r, BE, &result);
return result, err
}
func readString(r io.Reader) (string, error) {
l, err := readU8(r)
if err == nil && l == 0 {
err = fmt.Errorf("invalid string length 0")
}
if err != nil { return "", err}
buf := make([]byte, l)
_, err = io.ReadFull(r, buf)
return string(buf), err
}
func readLString(r io.Reader) (string, error) {
l, err := readU32(r)
if err == nil && l == 0 {
err = fmt.Errorf("invalid string length 0")
}
if err != nil { return "", err}
buf := make([]byte, l)
_, err = io.ReadFull(r, buf)
return string(buf), err
}
func writeLString(w io.Writer, s string) error {
writeU32(w,uint32(len(s)))
_, err := w.Write([]byte(s))
return err
}
func writeU8(w io.Writer, i uint8) error {
return binary.Write(w, BE, i);
}
func writeU32(w io.Writer, i uint32) error {
return binary.Write(w, BE, i);
}

394
jobs.go Normal file
View File

@ -0,0 +1,394 @@
package main
import (
"net"
"os"
"fmt"
"bufio"
"encoding/json"
"sort"
)
func InsertJobSorted(s []QueuedJob, e QueuedJob) []QueuedJob {
i := sort.Search(len(s), func(i int) bool { return s[i].Pri > e.Pri })
s = append(s, e)
copy(s[i+1:], s[i:])
s[i] = e
return s
}
type ErrorResponse struct{
Status string `json:"status"`
Error string `json:"error"`
}
type PutResponse struct {
Status string `json:"status"`
Id uint `json:"id"`
}
type GetResponse struct {
Status string `json:"status"`
Id uint `json:"id"`
Job interface{} `json:"job"`
Pri uint `json:"pri"`
Queue string `json:"queue"`
}
type GetErrorResponse struct {
Status string `json:"status"`
}
type JobRequest struct {
From uint
Request string
Queue string
Pri uint
Job interface{}
Queues []string
Wait bool
Id uint
}
type Job struct {
Id uint
Details interface{}
Pri uint
Worker uint
Queue string
}
type QueuedJob struct {
Id uint
Pri uint
}
type JobServer struct {
port uint16
numSessions uint
numJobs uint
q chan JobRequest
clients map[uint]*JobSession
queues map[string][]QueuedJob
waiters map[string][]uint
jobs map[uint]*Job
}
func NewJobServer(port uint16) *JobServer {
return &JobServer{
port,
0,
0,
make(chan JobRequest),
make(map[uint]*JobSession),
make(map[string][]QueuedJob),
make(map[string][]uint),
make(map[uint]*Job),
}
}
type JobSession struct{
sessionId uint
q chan []byte
backend chan JobRequest
jobs map[uint]bool
con net.Conn
isOnline bool
}
func NewJobSession(sessionId uint, con net.Conn, backend chan JobRequest) *JobSession {
return &JobSession{
sessionId,
make(chan []byte),
backend,
make(map[uint]bool),
con,
true,
}
}
func (s *JobServer) ensureQueue(q string) {
_, ok := s.queues[q]
if ! ok {
s.queues[q] = make([]QueuedJob, 0, 5)
}
_, ok = s.waiters[q]
if ! ok {
s.waiters[q] = make([]uint, 0, 5)
}
}
func (s *JobServer) pushWaiter(q string, j *Job) bool {
for len(s.waiters[q]) > 0 {
sid := s.waiters[q][0]
s.waiters[q] = s.waiters[q][1:]
ses, ok := s.clients[sid]
if ! ok { continue }
if ! ses.isOnline { continue }
j.Worker = sid
s.rspGet(ses, j)
return true
}
return false
}
func (s *JobServer) pushJob(j *Job, q string) {
s.ensureQueue(q)
if s.pushWaiter(q, j) { return }
qj := QueuedJob{j.Id, j.Pri}
s.queues[q] = InsertJobSorted(s.queues[q], qj)
}
func (s *JobServer) rspPut(ses *JobSession, id uint) {
m := PutResponse{"ok", id}
d, _ := json.Marshal(m)
d = append(d, '\n')
ses.q <- d
}
func (s *JobServer) hdlPut(r JobRequest, ses *JobSession) {
s.numJobs += 1
j := &Job{s.numJobs, r.Job, r.Pri, 0, r.Queue}
s.jobs[j.Id] = j
s.pushJob(j, r.Queue)
s.rspPut(ses, j.Id)
}
func (s *JobServer) getBestQueue(qs []string) (string, bool) {
var bestPri uint
var bestQ string
found := false
for qi := range qs {
qname := qs[qi]
q, ok := s.queues[qname]
if ! ok { continue }
if len(q) == 0 { continue }
j := q[len(q) -1]
if found == false || j.Pri >= bestPri {
bestPri = j.Pri
bestQ = qname
}
found = true
}
return bestQ, found
}
func (s *JobServer) popQueue(qname string) (*Job, error) {
q, ok := s.queues[qname]
if ! ok { return nil, fmt.Errorf( "queue %s not found", qname)}
if len(q)== 0 { return nil, fmt.Errorf("queue %s empty", qname)}
it := q[len(q)-1]
s.queues[qname] = q[:len(q)-1]
j, ok := s.jobs[it.Id]
if ok {
return j, nil
} else {
return nil, fmt.Errorf("job not found")
}
}
func (s *JobServer) rspError(ses *JobSession, err string) {
m := ErrorResponse{"error", err}
d, _ := json.Marshal(m)
d = append(d, '\n')
ses.q <- d
}
func (s *JobServer) rspGet(ses *JobSession, j *Job) {
m := GetResponse{"ok", j.Id, j.Details, j.Pri, j.Queue}
d, _ := json.Marshal(m)
ses.jobs[j.Id] = true
d = append(d, '\n')
ses.q <- d
}
func (s *JobServer) rspGetNoJob(ses *JobSession) {
m := GetErrorResponse{"no-job"}
d, _ := json.Marshal(m)
d = append(d, '\n')
ses.q <- d
}
func (s *JobServer) rspOk(ses *JobSession) {
m := GetErrorResponse{"ok"}
d, _ := json.Marshal(m)
d = append(d, '\n')
ses.q <- d
}
func (s *JobServer) waitFor(ses *JobSession, qs []string) {
for _, qname := range qs {
s.ensureQueue(qname)
s.waiters[qname] = append(s.waiters[qname], ses.sessionId)
}
}
func (s *JobServer) hdlGet(r JobRequest, ses *JobSession) {
for {
q, ok := s.getBestQueue(r.Queues)
if ! ok {
if r.Wait {
s.waitFor(ses, r.Queues)
} else {
s.rspGetNoJob(ses)
}
return
}
j, err := s.popQueue(q)
if err != nil {
continue
}
s.rspGet(ses, j)
j.Worker = ses.sessionId
return
}
}
func (s *JobServer) hdlAbort(r JobRequest, ses *JobSession) {
j, ok := s.jobs[r.Id]
if ! ok {
s.rspGetNoJob(ses)
return
}
if j.Worker != ses.sessionId {
s.rspError(ses, "not your job")
return
}
if _, ok := ses.jobs[r.Id]; ok {
delete(ses.jobs, r.Id)
}
j.Worker = 0
s.pushJob(j, j.Queue)
s.rspOk(ses)
}
func (s *JobServer) hdlDelete(r JobRequest, ses *JobSession) {
_, ok := s.jobs[r.Id]
if ! ok {
s.rspGetNoJob(ses)
return
}
delete(s.jobs, r.Id)
s.rspOk(ses)
}
func (s *JobServer) hdlClose(ses *JobSession) {
for jid, _ := range ses.jobs {
j, ok := s.jobs[jid]
if ! ok { continue }
j.Worker = 0
s.pushJob(j, j.Queue)
}
}
func (s *JobServer) central(){
for m := range s.q {
ses, ok := s.clients[m.From]
if ! ok { continue }
fmt.Printf("I %+v\n", m)
switch m.Request {
case "put": s.hdlPut(m, ses)
case "get": s.hdlGet(m, ses)
case "abort": s.hdlAbort(m, ses)
case "delete": s.hdlDelete(m, ses)
case "close": s.hdlClose(ses)
default: s.rspError(ses, "unknown request type")
}
}
}
func (s *JobServer) Run() {
go s.central()
addr := fmt.Sprintf("0.0.0.0:%d", s.port)
server, err := net.Listen("tcp", addr)
if err != nil {
fmt.Println("Error listening:", err.Error())
os.Exit(1)
}
defer server.Close()
fmt.Println("JobServer waiting for client...")
for {
connection, err := server.Accept()
if err != nil {
fmt.Println("Error accepting: ", err.Error())
os.Exit(1)
}
fmt.Println("client connected")
s.processClient(connection)
}
close(s.q)
}
func (s *JobServer) processClient(con net.Conn) {
sessionId := s.numSessions
s.numSessions += 1
session := NewJobSession( sessionId, con, s.q)
s.clients[sessionId] = session
go session.jobReceiver()
go session.jobSender()
}
func (s *JobSession) close() {
//fmt.Printf("close %d\n", s.sessionId)
if s.isOnline {
s.isOnline = false
m := JobRequest{
s.sessionId,
"close",
"",
0,
nil,
[]string{},
false,
0,
}
s.backend <- m
}
s.con.Close()
}
func ParseRequest(msg string) (JobRequest, error) {
var r JobRequest
err := json.Unmarshal([]byte(msg), &r)
if err != nil { return r, err}
//fmt.Printf("%+v\n", r)
return r, nil
}
func errorMessage(err error) []byte {
m := ErrorResponse{"error", err.Error()}
s, _ := json.Marshal(m)
s = append(s, '\n')
return s
}
func (s *JobSession) jobReceiver() {
defer s.close()
r := bufio.NewReaderSize(s.con, 102400)
msg, err := r.ReadString('\n')
for err == nil {
rq, perr := ParseRequest(msg)
if perr == nil {
rq.From = s.sessionId
s.backend <- rq
} else {
s.q <- errorMessage(perr)
}
msg, err = r.ReadString('\n')
}
}
func (s *JobSession) jobSender() {
for m := range s.q {
fmt.Printf("O %s", m)
s.con.Write(m)
}
}

View File

@ -12,7 +12,7 @@ type Server interface {
func main() { func main() {
var challenge int var challenge int
flag.IntVar(&challenge, "challenge",8, "Challenge number") flag.IntVar(&challenge, "challenge",11, "Challenge number")
flag.Parse() flag.Parse()
var port uint16 var port uint16
@ -37,6 +37,12 @@ func main() {
server = NewReverseServer(port); server = NewReverseServer(port);
case 8: case 8:
server = NewSecureServer(port); server = NewSecureServer(port);
case 9:
server = NewJobServer(port);
case 10:
server = NewVcsServer(port);
case 11:
server = NewPestServer(port);
default: default:
fmt.Printf("Unknown challenge\n") fmt.Printf("Unknown challenge\n")
os.Exit(1) os.Exit(1)

607
pest.go Normal file
View File

@ -0,0 +1,607 @@
package main
import (
"net"
"os"
"fmt"
"bufio"
"bytes"
"time"
)
const PACKET_HELLO = 0x50
const PACKET_ERROR = 0x51
const PACKET_SITE_VISIT = 0x58
const PACKET_DIAL_AUTHORITY = 0x53
const PACKET_TARGET_POPULATIONS = 0x54
const PACKET_CREATE_POLICY = 0x55
const PACKET_DELETE_POLICY = 0x56
const PACKET_DELETE_POLICY_OK = 0x52
const PACKET_CREATE_POLICY_OK = 0x57
type PacketReader struct {
count uint
sum byte
ptype byte
plen uint
base *bufio.Reader
started bool
}
func newPacketReader(r *bufio.Reader) *PacketReader {
return &PacketReader{
0,
0,
0,
0,
r,
false,
}
}
func (r *PacketReader) Read(b []byte) (int, error) {
bytesLeft := r.plen - r.count
if len(b) > int(bytesLeft) {
return 0, fmt.Errorf("not enough bytes left in package")
}
if ! r.started {
return 0, fmt.Errorf("read called but not started")
}
fmt.Printf("rd %d cnt %d left %d\n", len(b), r.count, bytesLeft)
n, err := r.base.Read(b)
if err != nil { return 0, err }
for i := 0; i < n; i++ {
r.sum += b[i]
}
r.count += uint(n)
return n, err
}
func (r *PacketReader) readError() error {
plen, err := readU32(r) //plen
r.plen = uint(plen)
if err != nil { return err }
d, err := readLString(r)
_, err = readU8(r) // checksum
r.started = false
if err != nil { return err }
return fmt.Errorf("Server error: %s", d)
}
func (r *PacketReader) start(ptype byte) error {
if r.started {
return fmt.Errorf("already started")
}
var err error
r.plen = 5
r.sum = 0
r.count = 0
r.started = true
r.ptype, _ = readU8(r)
if r.ptype == PACKET_ERROR {
err = r.readError()
return err
}
if r.ptype != ptype {
return fmt.Errorf("expected packet type %x but found %x", ptype, r.ptype)
}
plen, _ := readU32(r)
r.plen = uint(plen)
return nil
}
func (r *PacketReader) finish() error {
if ! r.started {
return fmt.Errorf("not started")
}
_, err := readU8(r) //checksum
r.started = false
if err != nil { return err }
if r.count != r.plen {
return fmt.Errorf("packet len mismatch exp %d read %d", r.plen, r.count)
}
if r.sum != 0 {
return fmt.Errorf("checksum error: %d", r.sum)
}
return nil
}
type PacketWriter struct {
count uint
sum byte
ptype byte
buf *bytes.Buffer
base *bufio.Writer
started bool
}
func newPacketWriter(r *bufio.Writer) *PacketWriter {
return &PacketWriter{
0,
0,
0,
nil,
r,
false,
}
}
func (w *PacketWriter) start(ptype byte) error {
if w.started {
return fmt.Errorf("already started")
}
w.buf = bytes.NewBuffer([]byte{})
w.count = 0
w.sum = ptype
w.started = true
w.ptype = ptype
return nil
}
func (w *PacketWriter) finish() error {
if ! w.started {
return fmt.Errorf("not started")
}
w.started = false
plen := uint32(w.count + 6)
c := byte(plen) + byte(plen >> 8) + byte(plen >> 16) + byte(plen >> 24) + w.sum
c = byte(- c)
err := writeU8(w.base, w.ptype)
if err != nil { return err }
err = writeU32(w.base, plen)
if err != nil { return err }
_,err = w.base.Write(w.buf.Bytes())
if err != nil { return err }
err = writeU8(w.base, c)
if err != nil { return err }
//fmt.Printf("Sending packet %x length %d, data %d\n", w.ptype, plen, w.buf.Len())
err = w.base.Flush()
return err
}
func (w *PacketWriter) Write(b []byte) (int, error) {
if ! w.started {
return 0, fmt.Errorf("write called but not started")
}
n, err := w.buf.Write(b)
w.count += uint(n)
for i := 0; i < n; i++ {
w.sum += b[i]
}
return n, err
}
type HelloPacket struct {
protocol string
version uint
}
func readHelloPacket(r *PacketReader) (HelloPacket, error) {
var p HelloPacket
var err error
err = r.start(PACKET_HELLO)
if err != nil { return p, err }
p.protocol, err = readLString(r)
if err != nil { return p, err }
version, err := readU32(r)
if err != nil { return p, err }
p.version = uint(version)
err = r.finish()
return p, err
}
func writeHelloPacket(w *PacketWriter, p HelloPacket) error {
err := w.start(PACKET_HELLO)
if err != nil { return err }
err = writeLString(w, p.protocol)
if err != nil { return err }
err = writeU32(w, uint32(p.version))
if err != nil { return err }
err = w.finish()
return err
}
func readHello(r *PacketReader) error {
hello, err := readHelloPacket(r)
if err != nil { return err }
fmt.Printf("%+v\n", hello)
if hello.protocol != "pestcontrol" { return fmt.Errorf("unknown protocol")}
if hello.version != 1 {
return fmt.Errorf("unknown version")
}
return nil
}
func sendHello(w *PacketWriter) error {
p := HelloPacket{"pestcontrol", 1}
err := writeHelloPacket(w, p)
return err
}
type Tally struct {
site uint
species string
count uint
}
type SiteVisitPacket struct {
siteId uint
populations []Tally
}
func validateSiteVisit(v SiteVisitPacket) error {
s := make(map[string]uint)
for _, t := range v.populations {
t2, ok := s[t.species]
if ok && t.count != t2 {
return fmt.Errorf("conflicting counts for %s", t.species)
}
s[t.species] = t.count
}
return nil
}
func readSiteVisitPacket(r *PacketReader) (SiteVisitPacket, error) {
var p SiteVisitPacket
var err error
err = r.start(PACKET_SITE_VISIT)
if err != nil { return p, err }
siteId, err := readU32(r)
if err != nil { return p, err }
p.siteId = uint(siteId)
talCount, err := readU32(r)
if err != nil { return p, err }
for i := 0 ; i < int(talCount) ; i++ {
var t Tally
var count uint32
t.site = uint(siteId)
t.species, err = readLString(r)
if err != nil { return p, err }
count, err = readU32(r)
if err != nil { return p, err }
t.count = uint(count)
p.populations = append(p.populations, t)
}
err = r.finish()
return p, err
}
type DialAuthorityPacket struct {
siteId uint
}
func writeDialAuthorityPacket(w *PacketWriter, p DialAuthorityPacket) error {
err := w.start(PACKET_DIAL_AUTHORITY)
if err != nil { return err }
err = writeU32(w, uint32(p.siteId))
if err != nil { return err }
err = w.finish()
return err
}
type MaxPop struct {
species string
max uint
min uint
}
type TargetPopulationsPacket struct {
siteId uint
populations []MaxPop
}
func readTargetPopulationsPacket(r *PacketReader) (TargetPopulationsPacket, error) {
var p TargetPopulationsPacket
var err error
err = r.start(PACKET_TARGET_POPULATIONS)
if err != nil { return p, err }
siteId, err := readU32(r)
if err != nil { return p, err }
p.siteId = uint(siteId)
talCount, err := readU32(r)
if err != nil { return p, err }
for i := 0 ; i < int(talCount) ; i++ {
var t MaxPop
var max uint32
var min uint32
t.species, err = readLString(r)
if err != nil { return p, err }
max, err = readU32(r)
if err != nil { return p, err }
t.max = uint(max)
min, err = readU32(r)
if err != nil { return p, err }
t.min = uint(min)
p.populations = append(p.populations, t)
}
err = r.finish()
return p, err
}
type CreatePolicyPacket struct {
species string
policy byte
}
func writeCreatePolicyPacket(w *PacketWriter, p CreatePolicyPacket) error {
err := w.start(PACKET_CREATE_POLICY)
if err != nil { return err }
err = writeLString(w, p.species)
if err != nil { return err }
err = writeU8(w, p.policy)
if err != nil { return err }
err = w.finish()
return err
}
type DeletePolicyPacket struct {
policyId uint
}
func writeDeletePolicyPacket(w *PacketWriter, p DeletePolicyPacket) error {
err := w.start(PACKET_DELETE_POLICY)
if err != nil { return err }
err = writeU32(w, uint32(p.policyId))
if err != nil { return err }
err = w.finish()
return err
}
type CreatePolicyOkPacket struct {
policyId uint
}
func readCreatePolicyOkPacket(r *PacketReader) (CreatePolicyOkPacket, error) {
var err error
var p CreatePolicyOkPacket
err = r.start(PACKET_CREATE_POLICY_OK)
if err != nil { return p, err }
pid, err := readU32(r)
if err != nil { return p, err }
p.policyId = uint(pid)
err = r.finish()
return p, err
}
func readDeletePolicyOkPacket(r *PacketReader) error {
var err error
err = r.start(PACKET_DELETE_POLICY_OK)
if err != nil { return err }
err = r.finish()
return err
}
func writeError(w *PacketWriter, e error) error {
fmt.Printf("sending error: %s\n", e)
err := w.start(PACKET_ERROR)
if err != nil { return err }
err = writeLString(w, e.Error())
if err != nil { return err }
err = w.finish()
return err
}
type Policy struct {
policyId uint
policy byte
}
type Authority struct {
siteId uint
q chan SiteVisitPacket
r *PacketReader
w *PacketWriter
pops map[string]MaxPop
policies map[string]Policy
}
func newAuthority(siteId uint) (*Authority) {
a := &Authority{
siteId,
make(chan SiteVisitPacket, 10),
nil,
nil,
make(map[string]MaxPop),
make(map[string]Policy),
}
go a.run()
return a
}
func (a *Authority) connect() error {
addr := "pestcontrol.protohackers.com:20547"
con, err := net.Dial("tcp", addr)
if err != nil { return err }
a.r = newPacketReader(bufio.NewReader(con))
a.w = newPacketWriter(bufio.NewWriter(con))
sendHello(a.w)
err = readHello(a.r)
if err != nil { return err}
p := DialAuthorityPacket{a.siteId}
err = writeDialAuthorityPacket(a.w, p)
if err != nil { return err}
mp, err := readTargetPopulationsPacket(a.r)
if err != nil { return err}
for _, p := range mp.populations {
a.pops[p.species] = p
}
return nil
}
func getMeasuredCount(v SiteVisitPacket, species string) uint {
for _, p := range v.populations {
if p.species == species {
return p.count
}
}
return 0
}
func (a *Authority) deletePolicy(policyId uint) error {
p := DeletePolicyPacket{policyId}
err := writeDeletePolicyPacket(a.w, p)
if err != nil { return err }
err = readDeletePolicyOkPacket(a.r)
fmt.Printf("delpol %d\n", policyId)
return err
}
func (a *Authority) createPolicy(species string, policy byte) (uint, error) {
p := CreatePolicyPacket{species, policy}
err := writeCreatePolicyPacket(a.w, p)
if err != nil { return 0, err }
okp, err := readCreatePolicyOkPacket(a.r)
fmt.Printf("pol %s set to %d id %d\n", species, policy, okp.policyId)
return okp.policyId, err
}
func (a *Authority) setPolicy(species string, policy byte) error {
var err error
pol, ok := a.policies[species]
// if the policy matches, done!
if pol.policy == policy {
return nil
}
if ok {
err = a.deletePolicy(pol.policyId)
}
if err != nil { return err }
pol.policy = policy
pol.policyId, err = a.createPolicy(species, policy)
a.policies[species] = pol
return err
}
func(a *Authority) hdlSiteVisit(v SiteVisitPacket) error {
for _, p := range a.pops {
cnt := getMeasuredCount(v, p.species)
policy := byte(0)
if cnt > p.max { policy = 0x90 }
if cnt < p.min { policy = 0xa0 }
err := a.setPolicy(p.species, policy)
if err != nil { return err }
}
return nil
}
func (a *Authority) run() {
fmt.Printf("connecting to site %d\n", a.siteId)
err := a.connect()
if err != nil {
fmt.Printf("ERROR authority connect fail %s\n", err)
return // todo retry later
}
for v := range a.q {
err = a.hdlSiteVisit(v)
if err != nil {
fmt.Printf("ERROR %s\n", err)
}
}
}
type PestServer struct {
port uint16
q chan SiteVisitPacket
authorities map[uint]*Authority
}
func NewPestServer(port uint16) *PestServer {
return &PestServer{
port,
make(chan SiteVisitPacket, 10),
make(map[uint]*Authority),
}
}
type PestSession struct{
con net.Conn
backend chan SiteVisitPacket
r *PacketReader
w *PacketWriter
}
func NewPestSession(con net.Conn, backend chan SiteVisitPacket) *PestSession {
return &PestSession{
con,
backend,
newPacketReader(bufio.NewReader(con)),
newPacketWriter(bufio.NewWriter(con)),
}
}
func (s *PestServer) central() {
for t := range s.q {
a, ok := s.authorities[t.siteId]
if ! ok {
a = newAuthority(t.siteId)
s.authorities[t.siteId] = a
}
a.q <- t
}
}
func (s *PestServer) Run() {
go s.central()
addr := fmt.Sprintf("0.0.0.0:%d", s.port)
server, err := net.Listen("tcp", addr)
if err != nil {
fmt.Println("Error listening:", err.Error())
os.Exit(1)
}
defer server.Close()
fmt.Println("PestServer waiting for client...")
for {
connection, err := server.Accept()
if err != nil {
fmt.Println("Error accepting: ", err.Error())
os.Exit(1)
}
fmt.Println("client connected")
s.processClient(connection)
}
}
func (s *PestServer) processClient(con net.Conn) {
session := NewPestSession(con, s.q)
go session.pestHandler()
}
func (s *PestSession) pestHandler() error {
err := readHello(s.r)
fmt.Println("got hello, sending back")
sendHello(s.w)
if err != nil {
writeError(s.w, err)
return err
}
var v SiteVisitPacket
for err == nil {
s.con.SetReadDeadline(time.Now().Add(30*time.Second))
v, err = readSiteVisitPacket(s.r)
if err != nil { break }
fmt.Printf("I %+v\n", v)
err := validateSiteVisit(v)
if err != nil {
writeError(s.w, err)
} else {
s.backend <- v
}
}
writeError(s.w, err)
fmt.Printf("ERROR: %s\n", err)
return err
}
func (s *PestSession) run() {
err := s.pestHandler()
fmt.Printf("%e\n", err)
if err != nil {
writeError(s.w, err)
}
}

138
spec/vcs.md Normal file
View File

@ -0,0 +1,138 @@
# Discovery
```
$ nc vcs.protohackers.com 30307
READY
help
OK usage: HELP|GET|PUT|LIST
READY
list /
OK 0
READY
put /snack 4
bla
OK r1
READY
list /
OK 1
snack r1
READY
get /snack
OK 4
bla
READY
put /snack 3
al
OK r2
READY
READY
list /
OK 1
snack r2
READY
put /snik/snak
ERR usage: PUT file length newline data
READY
put /snik/snak 3
ji
OK r1
READY
list /
OK 2
snack r2
snik/ DIR
READY
get
ERR usage: GET file [revision]
READY
get /snack r1
OK 4
bla
READY
```
# overview
this system is a simple filesystem that maintains file versions. All paths are absolute, there is no current working directory.
This is a tcp protocol that strictly prescribes who talks when. it has two modes:
* in command mode, server prints READY, then reads until it finds a newline
* in data mode, server reads a prespecified amount of bytes
* on connection, the server goes into command mode
* after a newline, if the command is put and it is a valid command, the server goes into data mode. after the specified amount of bytes has been read, the server prints a response (see below) and returns to command mode
* for all other commands, the server prints a response and returns to command mode
* responses start with OK or ERR and can be multiple lines
* if a non existing command is given, the response is 'ERR illegal method: [cmd]'
The filesystem must be global, shared by all users
Filenames are case sensitive
A node is listed as a file if it has 1 or more revisions. otherwise, it is listed as a dir. it can be both at the same time
# commands
commands and arguments are separated by spaces.
## help
OK response describing all commands.
'OK usage: HELP|GET|PUT|LIST'
if arguments are given they are ignored
## LIST
argument: dir
if more or less arguments are provided, response is 'ERR usage: LIST dir'
dir must start with a / and can contain more than one / (subdirs). trailing / is optional. Two consecutive slashed are not allowed.
if dir is an illegal dir name (does not start with a /), response is 'ERR illegal dir name'
If dir does not exist or is empty, the response is 'OK 0'
the response followed by the number of lines in the list
each line starts with the name of the item
if the item is a directory, the name is postfixed with a / and followed by the word DIR
If the item is a file, the name is followed by the revision, prefixed by 'r'
## PUT
arguments: file length
if more or less arguments are provided, the response is 'ERR usage: PUT file length newline data'
filenames have the same rules as dirnames but can not end with a /
if an illegal filename is given, the response is 'ERR illegal file name'
if length is not a number or a negative number, it is interpreted as 0
if no errors are found, the server goes into data mode to read the specified amount of bytes (can be 0)
the bytes are then stored in the file. if the file is new, it is given revision 1, otherwise the revision is incremented.
the response is 'OK r1' with r1 being the latest revision of the file.
## GET
arguments: file [revision]
If the wrong number of args is given, the response is 'ERR usage: GET file [revision]'
the file argument follows the same rules as PUT
if the file is not found, the response is 'ERR no such file'
if the file is a directory, the response is the same as when the file is not found
the revision is a number, optionally prefixed by a "r"
if the revision is postfixed by non-numeric chars, these are ignored
if the revision is not a number, it is not found
if the revision is not found, the response is 'ERR no such revision'
if no revision is given, the latest one is used.
The response is 'OK 5' where 5 is the number of bytes, followed by that number of bytes.

View File

@ -12,34 +12,6 @@ import (
"bytes" "bytes"
) )
var BE = binary.BigEndian
func readU8(r io.Reader) (uint8, error) {
var result uint8
err := binary.Read(r, BE, &result);
return result, err
}
func readU16(r io.Reader) (uint16, error) {
var result uint16
err := binary.Read(r, BE, &result);
return result, err
}
func readU32(r io.Reader) (uint32, error) {
var result uint32
err := binary.Read(r, BE, &result);
return result, err
}
func readString(r io.Reader) (string, error) {
l, err := readU8(r)
if err == nil && l == 0 {
err = fmt.Errorf("invalid string length 0")
}
if err != nil { return "", err}
buf := make([]byte, l)
_, err = io.ReadFull(r, buf)
return string(buf), err
}
type SpeedMessage interface { type SpeedMessage interface {
serialize() []byte serialize() []byte
} }

97
tests/jobs.py Normal file
View File

@ -0,0 +1,97 @@
import socket
from struct import pack, unpack
from time import time, sleep
from binascii import unhexlify
import json
def sock():
addr = ("localhost", 13370)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(addr)
return s
def snd(s, m):
d = json.dumps(m)
print("O", d)
s.sendall(d.encode() +b'\n')
def rsp(s):
d = s.recv(1024).decode().strip()
print("I", d)
j = json.loads(d)
return j
def put(s, q="queue1", pri=123):
j = {"request":"put","queue":"queue1","job":{"title": "blah"},"pri":123}
snd(s, j)
r = rsp(s)
if "id" in r:
return r["id"]
return None
def get(s, q=["queue1"], wait=False):
j = {"request":"get","queues":["queue1"], "wait": wait}
snd(s, j)
if wait:
print("Waiting...")
return None
r = rsp(s)
if "id" in r:
return r["id"]
return None
def abort(s, i):
j = {"request":"abort","id": i}
snd(s, j)
rsp(s)
def delete(s, i):
j = {"request":"delete","id": i}
snd(s, j)
rsp(s)
def bull1(s):
snd(s, "snack")
rsp(s)
def bull2(s):
snd(s, {})
rsp(s)
def bull3(s):
snd(s,{"request":"snack"})
rsp(s)
def test(s, s2):
bull1(s)
bull2(s)
bull3(s)
put(s)
jid = get(s)
get(s)
abort(s, jid)
get(s)
delete(s, jid)
abort(s, jid)
jid = put(s)
get(s)
get(s2)
s.close()
sleep(1)
get(s2)
delete(s2, jid)
def testw(s1, s2):
get(s1, wait=True)
put(s2)
rsp(s1)
def testw2(s1, s2):
put(s2)
get(s2)
get(s1, wait=True)
sleep(1)
s2.close()
rsp(s1)
testw2(sock(),sock())

46
tests/pest.py Normal file
View File

@ -0,0 +1,46 @@
import socket
from struct import pack, unpack
from time import time, sleep
def sock():
addr = ("localhost", 13370)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(addr)
return s
def recv(s):
ptype, plen = unpack(">BI", s.recv(5))
data = s.recv(plen-6)
cs = s.recv(1)
print(f"I {ptype:X} {data}")
def pstr(s):
if type(s) == str:
s = s.encode()
return pack(">I", len(s)) + s
def snd(s, typ, dat):
p = pack(">BI", typ, len(dat)+6) + dat
csum = (-sum(p)) % 256
c = bytes([ csum ])
s.sendall(p + c)
def hello(s):
h = pstr("pestcontrol")
h += pack(">I", 1)
snd(s, 0x50, h)
def sivi(s):
h = pack(">II", 1337, 3)
h += pstr("green starred rat")
h += pack(">I", 765)
h += pstr("red footed elephant")
h += pack(">I", 6029)
h += pstr("black tailed unicorn")
h += pack(">I", 1234)
snd(s, 0x58, h)
s = sock()
hello(s)
recv(s)
sivi(s)

316
vcs.go Normal file
View File

@ -0,0 +1,316 @@
package main
import (
"net"
"os"
"fmt"
"bufio"
"strings"
"strconv"
"io"
"sort"
"regexp"
)
type VcsNode struct {
children map[string]*VcsNode
revisions []string
}
func (n *VcsNode) getRevision(rev string) (string, error) {
var err error
r := len(n.revisions)
if rev != "" {
if rev[0] == 'r' { rev = rev[1:]}
r, err = strconv.Atoi(rev)
if err != nil { return "", fmt.Errorf("no such revision")}
}
if r <1 || r > len(n.revisions) {
return "", fmt.Errorf("no such revision")
}
return n.revisions[r-1], nil
}
func (n *VcsNode) list() []string {
keys := make([]string, len(n.children))
i := 0
for k := range n.children {
keys[i] = k
i++
}
sort.Strings(keys)
return keys
}
type PutRequest struct {
file string
content string
callback chan PutResult
}
type PutResult struct {
rev uint
err error
}
type VcsStorage struct {
q chan *PutRequest
root *VcsNode
}
type VcsServer struct {
port uint16
storage VcsStorage
}
func NewPutRequest(file string, content string) *PutRequest {
return &PutRequest{
file,
content,
make(chan PutResult),
}
}
func NewVcsNode() *VcsNode {
return &VcsNode {
make(map[string]*VcsNode),
nil,
}
}
func NewVcsServer(port uint16) *VcsServer {
return &VcsServer{
port,
VcsStorage{
make(chan *PutRequest),
NewVcsNode(),
},
}
}
type VcsSession struct{
storage *VcsStorage
con net.Conn
}
func NewVcsSession(storage *VcsStorage, con net.Conn) *VcsSession {
return &VcsSession{
storage,
con,
}
}
func (n *VcsNode) get(path []string, create bool) (*VcsNode, error) {
if len(path) == 0 { return n, nil }
part := path[0]
nx, ok := n.children[part]
if ! ok {
if create {
nx = NewVcsNode()
n.children[part] = nx
}else{
return nil, fmt.Errorf("no such file")
}
}
return nx.get(path[1:], create)
}
func isPrintable(p string) bool {
for _, c:= range []rune(p) {
if c > 126 {
return false
}
}
return true
}
func parsePath(strPath string, isDir bool) ([]string, error) {
p := strings.Split(strPath, "/")
itype := "file"
match, _ := regexp.MatchString(`[^a-zA-Z0-9.\-_/]`, strPath)
if isDir {
itype = "dir"
}
printable := isPrintable(strPath)
if p[0] != "" || match || !printable {
return []string{}, fmt.Errorf("invalid %s name", itype)
}
if p[len(p)-1] == "" {
if !isDir {
return []string{}, fmt.Errorf("invalid file name")
}else{
p = p[:len(p)-1]
}
}
p = p[1:]
return p, nil
}
func (s *VcsStorage) hdlPut(m *PutRequest) (uint, error) {
path, err := parsePath(m.file, false )
if err != nil {
return 0, err
}
f, err := s.root.get(path, true)
if err != nil {
return 0, err
}
if len(f.revisions) < 1 || f.revisions[len(f.revisions)-1] != m.content {
f.revisions = append(f.revisions, m.content)
}
return uint(len(f.revisions)), nil
}
func (s *VcsStorage) central(){
for m := range s.q {
rev, err := s.hdlPut(m)
m.callback <- PutResult{rev, err}
}
}
func (s *VcsServer) Run() {
go s.storage.central()
addr := fmt.Sprintf("0.0.0.0:%d", s.port)
server, err := net.Listen("tcp", addr)
if err != nil {
fmt.Println("Error listening:", err.Error())
os.Exit(1)
}
defer server.Close()
fmt.Println("VcsServer waiting for client...")
for {
connection, err := server.Accept()
if err != nil {
fmt.Println("Error accepting: ", err.Error())
os.Exit(1)
}
fmt.Println("client connected")
s.processClient(connection)
}
}
func (s *VcsServer) processClient(con net.Conn) {
session := NewVcsSession(&s.storage, con)
go session.vcsHandler()
}
func (s *VcsSession) hdlHelp() error {
s.write("OK usage: HELP|GET|PUT|LIST\n")
return nil
}
func (s *VcsSession) hdlPut(args []string, r *bufio.Reader) error {
if len(args) != 2 { return fmt.Errorf("usage: PUT file length newline data")}
file := args[0]
clen, err := strconv.Atoi(args[1])
if err != nil { clen = 0 }
buf := make([]byte, clen)
_, err = io.ReadFull(r, buf)
if err!= nil { return err }
content := string(buf)
if ! isPrintable(content) {
return fmt.Errorf("text files only")
}
rq := NewPutRequest(file, content)
s.storage.q <- rq
result := <- rq.callback
err = result.err
if err == nil {
s.write(fmt.Sprintf("OK r%d\n", result.rev))
}
return err
}
func (s *VcsSession) hdlList(args []string) error {
if len(args) != 1 { return fmt.Errorf("usage: LIST dir")}
dir := args[0]
path, err := parsePath(dir, true )
if err != nil {
return err
}
n, err := s.storage.root.get(path, false)
if err != nil {
s.write("OK 0\n")
return nil
}
s.write(fmt.Sprintf("OK %d\n", len(n.children)))
for _, nm := range n.list() {
child := n.children[nm]
meta := "DIR"
if len(child.revisions) > 0{
meta = fmt.Sprintf("r%d", len(child.revisions))
}else{
nm += "/"
}
s.write(fmt.Sprintf("%s %s\n", nm, meta))
}
return nil
}
func (s *VcsSession) hdlGet(args []string) error {
if len(args) < 1 || len(args) > 2{ return fmt.Errorf("usage: GET file [revision]")}
file := args[0]
rev := ""
if len(args) > 1 {
rev = args[1]
}
path, err := parsePath(file, false )
if err != nil {
return err
}
n, err := s.storage.root.get(path, false)
if err != nil { return err }
data, err := n.getRevision(rev)
if err != nil { return err}
s.write(fmt.Sprintf("OK %d\n", len(data)))
s.write(data)
return nil
}
func (s *VcsSession) process(cmd string, r *bufio.Reader) error {
var err error
parts := strings.Split(cmd, " ")
if len(parts) == 0 { return fmt.Errorf("illegal method:")}
method := strings.ToLower(parts[0])
args := parts[1:]
switch method {
case "help": err = s.hdlHelp()
case "put": err = s.hdlPut(args, r)
case "list": err = s.hdlList(args)
case "get": err = s.hdlGet(args)
default: err = fmt.Errorf("illegal method: %s", method)
}
return err
}
func (s *VcsSession) write(m string){
fmt.Printf("O %s", m)
s.con.Write([]byte(m))
}
func (s *VcsSession) commandMode(r *bufio.Reader) error {
s.write("READY\n")
msg, err := r.ReadString('\n')
if err != nil { return err }
fmt.Printf("I %q\n",msg)
msg = strings.TrimSpace(msg)
perr := s.process(msg, r)
if perr != nil {
s.write(fmt.Sprintf("ERR %s\n", perr.Error()))
}
return nil
}
func (s *VcsSession) vcsHandler() {
r := bufio.NewReaderSize(s.con, 102400)
var err error
for err == nil {
err = s.commandMode(r)
}
}