speed basics working

This commit is contained in:
Richard 2023-09-29 08:24:18 +02:00
parent 6e90310d75
commit 9ca099ead4
2 changed files with 321 additions and 8 deletions

286
speed.go
View File

@ -3,13 +3,43 @@ package main
import (
"net"
"os"
"sort"
"fmt"
"time"
"encoding/binary"
"io"
"math"
"bytes"
)
var BE = binary.BigEndian
func readU8(r io.Reader) (uint8, error) {
var result uint8
err := binary.Read(r, BE, &result);
return result, err
}
func readU16(r io.Reader) (uint16, error) {
var result uint16
err := binary.Read(r, BE, &result);
return result, err
}
func readU32(r io.Reader) (uint32, error) {
var result uint32
err := binary.Read(r, BE, &result);
return result, err
}
func readString(r io.Reader) (string, error) {
l, err := readU8(r)
if err == nil && l == 0 {
err = fmt.Errorf("invalid string length 0")
}
if err != nil { return "", err}
buf := make([]byte, l)
_, err = io.ReadFull(r, buf)
return string(buf), err
}
type SpeedMessage interface {
serialize() []byte
}
@ -18,8 +48,12 @@ type ErrorMessage struct {
err string
}
func serializeString(s string) []byte {
return append([]byte{byte(len(s))}, []byte(s)...)
}
func (m ErrorMessage) serialize() []byte {
return []byte{0x10}
return append([]byte{0x10}, serializeString(m.err)...)
}
type HeartbeatMessage struct {}
@ -31,41 +65,137 @@ func (m HeartbeatMessage) serialize() []byte {
type SpeedServer struct {
port uint16
tickQ chan bool
delQ chan uint
plateQ chan Observation
dspQ chan uint
clients map[uint]*SpeedClient
numClients uint
cars map[string]*Car
pendingTickets map[uint16][]Ticket
}
func NewSpeedServer(port uint16) *SpeedServer {
s := &SpeedServer{
port,
make(chan bool),
make(chan uint),
make(chan Observation),
make(chan uint),
make(map[uint]*SpeedClient),
0,
make(map[string]*Car),
make(map[uint16][]Ticket),
}
return s
}
type SpeedClient struct {
s *SpeedServer
clientId uint
con net.Conn
q chan SpeedMessage
ctype uint8
lastHeartbeat int64
heartbeat int64
camRoad uint16
camMile uint16
camLimit uint16
dspRoads []uint16
}
func NewSpeedClient(clientId uint, con net.Conn) *SpeedClient {
func NewSpeedClient(clientId uint, con net.Conn, s *SpeedServer) *SpeedClient {
return &SpeedClient{
s,
clientId,
con,
make(chan SpeedMessage),
0,
0,
0,
0,
0,
0,
nil,
}
}
type Observation struct {
plate string
timestamp uint32
road uint16
mile uint16
limit uint16
}
func (one Observation) speed(two Observation) uint16 {
dist := math.Abs(float64(one.mile) - float64(two.mile))
dur := math.Abs((float64(one.timestamp) - float64(two.timestamp)) / 3600)
speed := (dist / dur) * 100
//fmt.Printf("dist %f dur %f speed %f\n", dist, dur, speed)
return uint16(speed)
}
type Ticket struct {
plate string
road uint16
mile1 uint16
timestamp1 uint32
mile2 uint16
timestamp2 uint32
speed uint16
}
func (t Ticket) serialize() []byte {
buf := new(bytes.Buffer)
buf.Write([]byte{0x21})
binary.Write(buf, BE, uint8(len(t.plate)))
buf.Write([]byte(t.plate))
binary.Write(buf, BE, t.road)
binary.Write(buf, BE, t.mile1)
binary.Write(buf, BE, t.timestamp1)
binary.Write(buf, BE, t.mile2)
binary.Write(buf, BE, t.timestamp2)
binary.Write(buf, BE, t.speed)
return buf.Bytes()
}
type Car struct {
plate string
obs []Observation
ticketDays map[uint32]bool
}
func (c *Car) hasTicket(one uint32, two uint32) bool {
for i := one ; i <= two; i += 86400 {
day := uint32(i / 86400)
_, ok := c.ticketDays[day]
if ok {
fmt.Printf("ticket on %d for %s\n", day, c.plate)
return true
}
fmt.Printf("no ticket on %d for %s\n", day, c.plate)
}
return false
}
func (c *Car) markTicket(one uint32, two uint32) {
for i := one ; i <= two; i += 86400 {
day := uint32(i / 86400)
fmt.Printf("setting ticket on %d for %s\n", day, c.plate)
c.ticketDays[day] = true
}
}
func (c *Car) addObservation(o Observation) int {
i := sort.Search(len(c.obs), func(i int) bool { return c.obs[i].timestamp >= o.timestamp })
c.obs = append(c.obs, o)
copy(c.obs[i+1:], c.obs[i:])
c.obs[i] = o
return i
}
func (s *SpeedServer) tick() {
cur := time.Now().UnixMilli()
m := HeartbeatMessage{}
@ -86,10 +216,92 @@ func (s *SpeedServer) ticker() {
}
}
func (s *SpeedServer) main() {
func (s *SpeedServer) closeClient(cid uint) {
c := s.clients[cid]
delete(s.clients, cid)
c.close()
}
func (s *SpeedServer) addPendingTicket(t Ticket) {
l := s.pendingTickets[t.road]
s.pendingTickets[t.road] = append(l, t)
}
func (s *SpeedServer) sendTicket(t Ticket) {
for _, c := range s.clients {
if c.handlesRoad(t.road) {
c.q <- t
return
}
}
fmt.Printf("No dispatch available, pending ticket")
s.addPendingTicket(t)
}
func (s *SpeedServer) flushDispatch(cid uint) {
c, ok := s.clients[cid]
if ! ok { return }
fmt.Printf("flushing roads %v\n", c.dspRoads)
for _, road := range c.dspRoads {
fmt.Printf("finding pending tickets for %d\n", road)
l, ok := s.pendingTickets[road]
if ! ok { continue }
fmt.Printf("ticket array %v\n", l)
for _, t := range l {
fmt.Printf("sending ticket")
c.q <- t
}
delete(s.pendingTickets, road)
}
}
func (s *SpeedServer) considerTicket(car *Car, pos int) {
one := car.obs[pos]
two := car.obs[pos + 1]
if one.road != two.road { return }
speed := one.speed(two)
if speed <= one.limit { return }
if car.hasTicket(one.timestamp, two.timestamp) { return }
car.markTicket(one.timestamp, two.timestamp)
fmt.Printf("Ticket %s, %d on road %d\n", car.plate, speed, one.road)
t := Ticket {
car.plate,
one.road,
one.mile,
one.timestamp,
two.mile,
two.timestamp,
speed,
}
s.sendTicket(t)
}
func (s *SpeedServer) plate(o Observation) {
car, ok := s.cars[o.plate]
if ! ok {
car = &Car{
o.plate,
nil,
make(map[uint32]bool),
}
s.cars[o.plate] = car
}
pos := car.addObservation(o)
if pos > 0 {
s.considerTicket(car, pos-1)
}
if pos < len(car.obs) - 1 {
s.considerTicket(car, pos)
}
}
func (s *SpeedServer)main() {
for {
select {
case _ = <- s.tickQ: s.tick()
case cid := <- s.delQ: s.closeClient(cid)
case cid := <- s.dspQ: s.flushDispatch(cid)
case o := <- s.plateQ: s.plate(o)
}
}
}
@ -121,7 +333,7 @@ func (s *SpeedServer) listen() {
}
func (s *SpeedServer) start(con net.Conn) {
c := NewSpeedClient(s.numClients, con)
c := NewSpeedClient(s.numClients, con, s)
s.clients[s.numClients] = c
s.numClients += 1
go c.sender()
@ -142,11 +354,13 @@ func (c *SpeedClient) receiver() {
fmt.Printf("client %d connected\n", c.clientId)
for {
var mType uint8
err = binary.Read(c.con, BE, &mType);
mType, err = readU8(c.con)
if err != nil { break }
switch mType {
case 0x20: err = c.hdlPlate()
case 0x40: err = c.hdlWantHeartbeat()
case 0x80: err = c.hdlIAmCamera()
case 0x81: err = c.hdlIAmDispatch()
default: err = fmt.Errorf("Unknown message type 0x%x", mType)
}
if err != nil { break }
@ -156,16 +370,72 @@ func (c *SpeedClient) receiver() {
} else {
fmt.Printf("Client %d closing\n", c.clientId)
}
c.s.delQ <- c.clientId
}
func (c *SpeedClient) close() {
close(c.q)
}
func (c *SpeedClient) handlesRoad(road uint16) bool {
if c.ctype != 2 { return false }
for i := range c.dspRoads {
if c.dspRoads[i] == road { return true}
}
return false
}
func (c *SpeedClient) hdlWantHeartbeat() error {
var wantedHb uint32
err := binary.Read(c.con, BE, &wantedHb)
wantedHb, err := readU32(c.con)
if err != nil { return err }
c.heartbeat = int64(wantedHb) * 100
fmt.Printf("heartbeat for %d set to %d\n", c.clientId, wantedHb)
return nil
}
func (c *SpeedClient) hdlIAmCamera() error {
c.ctype = 1
var err error
c.camRoad, err = readU16(c.con)
if err != nil { return err }
c.camMile, err = readU16(c.con)
if err != nil { return err }
c.camLimit, err = readU16(c.con)
c.camLimit = c.camLimit * 100
fmt.Printf("Client %d is camera %d/%d limit %d\n", c.clientId, c.camRoad, c.camMile, c.camLimit)
return err
}
func (c *SpeedClient) hdlIAmDispatch() error {
c.ctype = 2
numRoads, err := readU8(c.con)
if err != nil { return err }
for i := uint8(0); i < numRoads; i++ {
road, err := readU16(c.con)
if err != nil { return err }
c.dspRoads = append(c.dspRoads, road)
}
c.s.dspQ <- c.clientId
fmt.Printf("Client %d is dispatch for %v\n", c.clientId, c.dspRoads)
return nil
}
func (c *SpeedClient) hdlPlate() error {
if c.ctype != 1 {
return fmt.Errorf("not a camera")
}
plate, err := readString(c.con)
if err != nil { return err }
timestamp, err := readU32(c.con)
if err != nil { return err }
o := Observation{
plate,
timestamp,
c.camRoad,
c.camMile,
c.camLimit,
}
fmt.Printf("Cam %d saw %s on %d/%d at %d\n", c.clientId, plate, c.camRoad, c.camMile, timestamp)
c.s.plateQ <- o
return nil
}

43
tests/speed.py Normal file
View File

@ -0,0 +1,43 @@
import socket
from struct import pack, unpack
from time import time, sleep
def sock():
addr = ("localhost", 13370)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(addr)
return s
def heartbeat(s):
s.sendall(pack(">BI", 0x40, 2))
beat1 = s.recv(1024)
start = time()
beat2 = s.recv(1024)
dur = time() - start
print(f"heartbeat {dur:.2f}")
def camera(s):
s.sendall(pack(">BHHH", 0x80,1,0,80))
s.sendall(b"\x20\x05snack" +pack(">I", 100000))
def camera2(s):
s.sendall(pack(">BHHH", 0x80,1,85,80))
s.sendall(b"\x20\x05snack" +pack(">I", 103600))
def dispatch(s):
s.sendall(pack(">BBHH", 0x81,2,2,1))
return s
def readTicket(d):
l = unpack("BB", d.recv(2))[1]
plt = d.recv(l)
dt = unpack(">HHIHIH", d.recv(16))
print(f"Ticket {plt} speed {dt[5]}")
heartbeat(sock())
camera(sock())
camera2(sock())
sleep(1)
dsp = dispatch(sock())
readTicket(dsp)