Update commander.py, Dockerfile and twenty-five other files

This commit is contained in:
Richard Bronkhorst
2023-06-11 20:51:03 +02:00
parent 042b931133
commit 1c46d25081
18 changed files with 209 additions and 75 deletions

0
nullptr/__init__.py Normal file
View File

76
nullptr/api.py Normal file
View File

@@ -0,0 +1,76 @@
import requests
from nullptr.models.system import System
from nullptr.models.waypoint import Waypoint
from nullptr.models.marketplace import Marketplace
from .util import *
class ApiError(Exception):
def __init__(self, msg, code):
super().__init__(msg)
self.code = code
class Api:
def __init__(self, store, agent):
self.agent = agent
self.store = store
self.meta = None
self.root = 'https://api.spacetraders.io/v2/'
def token(self):
if self.agent.token is None:
raise ApiError('no token. Please register', 1337)
return self.agent.token
def request(self, method, path, data=None, need_token=True, params={}):
headers = {}
if need_token:
headers['Authorization'] = 'Bearer ' + self.token()
if method == 'get':
params['limit'] = 20
r = requests.request(method, self.root+path, json=data, headers=headers, params=params)
result = r.json()
self.last_result = result
if result is None:
raise ApiError('http call failed', r.status_code)
if 'meta' in result:
self.last_meta = result['meta']
if 'data' not in result:
error_code = sg(result, 'error.code', -1)
self.last_error = error_code
error_message = sg(result, 'error.message')
raise ApiError(error_message, error_code)
else:
self.last_error = 0
return result['data']
def register(self, faction):
callsign = self.agent.symbol
data = {
'symbol': callsign,
'faction': faction
}
result = self.request('post', 'register', data, need_token=False)
token = mg(result, 'token')
self.agent.token = token
def info(self):
data = self.request('get', 'my/agent')
self.agent.update(data)
return self.agent
def list_systems(self, page=1):
data = self.request('get', 'systems', params={'page': page})
#pprint(self.last_meta)
return self.store.update_list(System, data)
def list_waypoints(self, system):
data = self.request('get', f'systems/{system}/waypoints/')
# pprintz(self.last_meta)
return self.store.update_list(Waypoint, data)
def marketplace(self, waypoint):
system = waypoint.system()
symbol = f'{waypoint}-market'
data = self.request('get', f'systems/{system}/waypoints/{waypoint}/market')
return self.store.update(Marketplace, symbol, data)

80
nullptr/command_line.py Normal file
View File

@@ -0,0 +1,80 @@
import shlex
import inspect
import sys
import importlib
import logging
def func_supports_argcount(f, cnt):
argspec = inspect.getargspec(f)
posargs = 0 if argspec.args is None else len(argspec.args)
if argspec.args[0] == 'self':
posargs = posargs - 1
defargs = 0 if argspec.defaults is None else len(argspec.defaults)
varargs = 0 if argspec.varargs is None else len(argspec.varargs)
minargs = posargs - defargs
maxargs = posargs
if cnt < minargs:
return False
if cnt > maxargs:
return varargs > 0
return True
class CommandLine:
def __init__(self):
self.reloading = False
self.stopping = False
def stop(self):
self.stopping = True
def prompt(self):
return '> '
def handle_not_found(self, c, args):
print(f'command not found; {c}')
def handle_error(self, cmd, args, e):
logging.error(e, exc_info=str(type(e))=='ApiErrorp')
def handle_empty(self):
pass
def after_cmd(self):
pass
def do_quit(self):
print('byebye!')
self.stopping = True
def do_reload(self):
self.reloading = True
def handle_cmd(self, c):
if c == '':
return self.handle_empty()
args = shlex.split(c)
cmd = args.pop(0)
str_handler = f'do_{cmd}'
if not hasattr(self, str_handler):
try:
self.handle_not_found(cmd, args)
return
except Exception as e:
self.handle_error(cmd, args, e)
handler = getattr(self, str_handler)
if not func_supports_argcount(handler, len(args)):
expect_args = ', '.join(inspect.getargspec(handler).args[1:])
print('expected args: ' + expect_args)
return
try:
handler(*args)
except Exception as e:
self.handle_error(cmd, args, e)
self.after_cmd()
def run(self):
while not self.stopping and not self.reloading:
c = input(self.prompt())
self.handle_cmd(c)

98
nullptr/commander.py Normal file
View File

@@ -0,0 +1,98 @@
from nullptr.command_line import CommandLine
from nullptr.store import Store
import argparse
from nullptr.models.agent import Agent
from nullptr.models.system import System
from nullptr.models.waypoint import Waypoint
from nullptr.api import Api
from .util import *
from time import sleep
from threading import Thread
class Commander(CommandLine):
def __init__(self, store_dir='data', agent=None):
self.store_dir = store_dir
self.store = Store(store_dir)
self.agent = self.select_agent(agent)
self.api = Api(self.store, self.agent)
self.store.flush()
self.stop_auto= False
super().__init__()
def select_agent(self, agent_str):
if agent_str is not None:
return self.store.get(Agent, agent_str)
else:
agents = self.store.all('', Agent)
agent = next(agents, None)
if agent is None:
symbol = input('agent name: ')
agent = self.store.get(Agent, symbol)
return agent
def after_cmd(self):
self.store.flush()
def do_info(self):
pprint(self.api.info(), 100)
def do_register(self, faction):
self.api.register(faction.upper())
def wait_for_stop(self):
input()
self.stop_auto = True
print('stopping...')
def do_universe(self):
print('universe mode. hit enter to stop')
t = Thread(target=self.wait_for_stop)
t.daemon = True
t.start()
self.all_systems()
print('manual mode')
def all_specials(self, waypoints):
for w in waypoints:
if 'MARKETPLACE' in w.traits:
self.api.marketplace(w)
sleep(0.5)
def all_waypoints(self, systems):
for s in systems:
if self.stop_auto:
break
r = self.api.list_waypoints(s)
print(f'system {s}: {len(r)} waypoints')
sleep(0.5)
def all_systems(self):
self.stop_auto = False
data = self.api.list_systems(1)
pages = total_pages(self.api.last_meta)
print(f'page {1}: {len(data)} results')
self.all_waypoints(data)
print(f'{pages} more pages of systems')
for p in range(2, pages):
if self.stop_auto:
break
data = self.api.list_systems(p)
print(f'page {p}: {len(data)} systems')
self.all_waypoints(data)
sleep(0.5)
self.store.flush()
def do_systems(self, page=1):
r = self.api.list_systems(int(page))
pprint(self.api.last_meta)
def do_waypoints(self, system_str):
system = self.store.get(System, system_str.upper())
r = self.api.list_waypoints(system)
pprint(r)
def do_marketplace(self, waypoint_str):
waypoint = self.store.get(Waypoint, waypoint_str.upper())
r = self.api.marketplace(waypoint)

View File

21
nullptr/models/agent.py Normal file
View File

@@ -0,0 +1,21 @@
from .base import Base
class Agent(Base):
token: str = None
credits: int = 0
def update(self, d):
self.seta('credits', d)
def path(self):
return f'{self.symbol}.{self.ext()}'
@classmethod
def ext(self):
return 'agt'
def f(self, detail=1):
r = super().f(detail)
if detail >2:
r += f' c:{self.credits}'
return r

58
nullptr/models/base.py Normal file
View File

@@ -0,0 +1,58 @@
from copy import deepcopy
from dataclasses import dataclass
from nullptr.util import sg
@dataclass
class Base:
symbol: str
dirty: bool
def __init__(self, symbol, store):
self.symbol = symbol
self.store = store
self.dirty = True
def seta(self, attr, d, name=None):
if name is None:
name = attr
val = sg(d, name)
if val is not None:
setattr(self, attr, val)
def setlst(self, attr, d, name, member):
val = sg(d, name)
if val is not None:
lst = [sg(x, member) for x in val]
setattr(self, attr, lst)
def __setattr__(self, name, value):
if name != 'dirty':
self.dirty = True
super().__setattr__(name, value)
def update(self, d):
pass
def dict(self):
r = deepcopy(self.__dict__)
del r['store']
del r['dirty']
return r
def path(self):
raise NotImplementedError('path')
@classmethod
def ext(self):
raise NotImplementedError('extension')
def type(self):
return self.__class__.__name__
def __str__(self):
return self.f()
def f(self, detail=1):
r = self.symbol
if detail > 1:
r += '.' + self.ext()
return r

View File

@@ -0,0 +1,21 @@
from .base import Base
from typing import List
class Marketplace(Base):
imports:List[str] = []
exports:List[str] = []
exchange:List[str] = []
def update(self, d):
self.setlst('imports', d, 'imports', 'symbol')
self.setlst('exports', d, 'exports', 'symbol')
self.setlst('exchange', d, 'exchange', 'symbol')
@classmethod
def ext(self):
return 'mkt'
def path(self):
sector, system, symbol, _ = self.symbol.split('-')
return f'atlas/{sector}/{system[0:1]}/{system}/{symbol}.{self.ext()}'

6
nullptr/models/sector.py Normal file
View File

@@ -0,0 +1,6 @@
from .base import Base
class Sector(Base):
@classmethod
def ext(self):
return 'sct'

21
nullptr/models/system.py Normal file
View File

@@ -0,0 +1,21 @@
from .base import Base
class System(Base):
x:int = 0
y:int = 0
type:str = 'unknown'
def update(self, d):
self.seta('x', d)
self.seta('y', d)
self.seta('type', d)
@classmethod
def ext(self):
return 'stm'
def path(self):
sector, symbol = self.symbol.split('-')
return f'atlas/{sector}/{symbol[0:1]}/{symbol}.{self.ext()}'

View File

@@ -0,0 +1,30 @@
from .base import Base
from nullptr.util import *
from typing import List
class Waypoint(Base):
x:int = 0
y:int = 0
type:str = 'unknown'
traits:List[str]=[]
faction:str = ''
def update(self, d):
self.seta('x', d)
self.seta('y', d)
self.seta('type', d)
self.seta('faction', d, 'faction.symbol')
if 'traits' in d:
self.traits = [mg(t, 'symbol') for t in d['traits'] ]
@classmethod
def ext(self):
return 'way'
def path(self):
sector, system, symbol = self.symbol.split('-')
return f'atlas/{sector}/{system[0:1]}/{system}/{symbol}.{self.ext()}'
def system(self):
p = self.symbol.split('-')
return f'{p[0]}-{p[1]}'

73
nullptr/store.py Normal file
View File

@@ -0,0 +1,73 @@
from nullptr.models.base import Base
from nullptr.models.waypoint import Waypoint
from nullptr.models.sector import Sector
from nullptr.models.system import System
from nullptr.models.agent import Agent
from os.path import isfile, dirname, isdir
import os
import json
from .util import *
class Store:
def __init__(self, data_dir):
self.data_dir = data_dir
self.data = {}
def path(self, obj):
return os.path.join(self.data_dir, obj.path())
def load(self, obj):
path = self.path(obj)
if not isfile(path):
return obj
with open(path) as f:
data = json.load(f)
data['store'] = self
data['dirty'] = False
obj.__dict__ = data
def store(self, obj):
path = self.path(obj)
path_dir = dirname(path)
data = obj.dict()
if not isdir(path_dir):
os.makedirs(path_dir, exist_ok=True)
with open(path, 'w') as f:
json.dump(data, f, indent=2)
obj.dirty = False
def get(self, typ, symbol):
obj = typ(symbol, self)
self.load(obj)
self.data[symbol] = obj
return obj
def update(self, typ, symbol, data):
obj = self.get(typ, symbol)
obj.update(data)
return obj
def update_list(self, typ, lst):
return [self.update(typ, mg(d, 'symbol'), d) for d in lst]
def all(self, path, typ):
if hasattr(path, 'path'):
path = path.path()
path = os.path.join(self.data_dir, path)
if not isdir(path):
return
ext = '.' + typ.ext()
for f in os.listdir(path):
fil = os.path.join(path, f)
if not isfile(fil):
continue
if not fil.endswith(ext):
continue
symbol = f[:-len(ext)]
yield self.get(typ, symbol)
def flush(self):
for obj in self.data.values():
if obj.dirty:
self.store(obj)

74
nullptr/util.py Normal file
View File

@@ -0,0 +1,74 @@
from datetime import datetime
from math import ceil
def must_get(d, k):
if type(k) == str:
k = k.split('.')
part = k.pop(0)
if type(d) != dict or part not in d:
raise ValueError(part + ' not found')
val = d[part]
if len(k) == 0:
return val
else:
return must_get(val, k)
mg = must_get
def should_get(d, k, default=None):
try:
return must_get(d, k)
except ValueError:
return default
sg = should_get
def find_where_eq(l, k, v):
for i in l:
if k in i and i[k] == v:
return i
return None
def all_subclasses(cls):
return set(cls.__subclasses__()).union(
[s for c in cls.__subclasses__() for s in all_subclasses(c)])
def pprint(d, detail=2):
print(pretty(d, detail=detail))
def pretty(d, ident=0, detail=2):
if type(d) in [int, str, float, bool]:
return str(d)
if hasattr(d, 'f'):
return d.f(detail)
r = ''
idt = ' ' * ident
if type(d) == list:
r += 'lst'
for i in d:
r += '\n' + idt + pretty(i, ident + 1, detail)
elif type(d) == dict:
r += 'map'
for k,v in d.items():
r += '\n' + idt + k + ': ' + pretty(v, ident + 1, detail)
return r
def trim(s, l):
s = s[:l]
s += ' ' * (l-len(s))
return s
def total_pages(meta):
total = mg(meta, 'total')
limit = mg(meta, 'limit')
pages = ceil(total / limit)
return pages
# >>> parse_timestamp('2023-06-02T20:34:48.293Z')
# 1685738088
def parse_timestamp(ts):
return int(datetime.strptime(ts, '%Y-%m-%dT%H:%M:%S.%f%z').timestamp())
def render_timestamp(ts):
return datetime.utcfromtimestamp(ts).isoformat()