This commit is contained in:
Richard Bronkhorst 2023-06-10 19:39:32 +02:00
parent eeb063f307
commit cea10ae07c
11 changed files with 229 additions and 16 deletions

56
api.py Normal file
View File

@ -0,0 +1,56 @@
import requests
from util import *
class ApiError(Exception):
def __init__(self, msg, code):
super().__init__(msg)
self.code = code
class Api:
def __init__(self, store, agent):
self.agent = agent
self.store = store
self.root = 'https://api.spacetraders.io/v2/'
def token(self):
if self.agent.token is None:
raise ApiError('no token. Please register', 1337)
return self.agent.token
def request(self, method, path, data=None, need_token=True):
headers = {}
params = {}
if need_token:
headers['Authorization'] = 'Bearer ' + self.token()
if method == 'get':
params['limit'] = 20
r = requests.request(method, self.root+path, json=data, headers=headers, params=params)
result = r.json()
self.last_result = result
if result is None:
raise ApiError('http call failed', r.status_code)
elif 'data' not in result:
error_code = sg(result, 'error.code', -1)
self.last_error = error_code
error_message = sg(result, 'error.message')
raise ApiError(error_message, error_code)
else:
self.last_error = 0
return result['data']
def register(self, faction):
callsign = self.agent.symbol
data = {
'symbol': callsign,
'faction': faction
}
result = self.request('post', 'register', data, need_token=False)
token = mg(result, 'token')
self.agent.token = token
def info(self):
data = self.request('get', 'my/agent')
self.agent.update(data)
return self.agent

View File

@ -2,6 +2,7 @@ import shlex
import inspect import inspect
import sys import sys
import importlib import importlib
import logging
def func_supports_argcount(f, cnt): def func_supports_argcount(f, cnt):
argspec = inspect.getargspec(f) argspec = inspect.getargspec(f)
@ -34,11 +35,14 @@ class CommandLine:
print(f'command not found; {c}') print(f'command not found; {c}')
def handle_error(self, cmd, args, e): def handle_error(self, cmd, args, e):
print(e) logging.error(e, exc_info=str(type(e))=='ApiError')
def handle_empty(self): def handle_empty(self):
pass pass
def after_cmd(self):
pass
def do_quit(self): def do_quit(self):
print('byebye!') print('byebye!')
self.stopping = True self.stopping = True
@ -67,6 +71,7 @@ class CommandLine:
handler(*args) handler(*args)
except Exception as e: except Exception as e:
self.handle_error(cmd, args, e) self.handle_error(cmd, args, e)
self.after_cmd()
def run(self): def run(self):
while not self.stopping and not self.reloading: while not self.stopping and not self.reloading:

View File

@ -1,23 +1,51 @@
from store import Store from store import Store
from command_line import CommandLine from command_line import CommandLine
import argparse import argparse
from models.agent import Agent
from api import Api
from util import *
class Commander(CommandLine): class Commander(CommandLine):
def __init__(self, store_dir): def __init__(self, store_dir='data', agent=None):
self.store_dir = store_dir self.store_dir = store_dir
self.store = Store(store_dir) self.store = Store(store_dir)
self.agent = self.select_agent(agent)
self.api = Api(self.store, self.agent)
self.store.flush()
super().__init__() super().__init__()
def select_agent(self, agent_str):
if agent_str is not None:
return self.store.get(Agent, agent_str)
else:
agents = self.store.all('', Agent)
agent = next(agents, None)
if agent is None:
symbol = input('agent name: ')
agent = self.store.get(Agent, symbol)
return agent
def after_cmd(self):
self.store.flush()
def do_foo(self): def do_foo(self):
self.store.foo() self.store.foo()
self.store.flush() self.store.flush()
def do_info(self):
pprint(self.api.info(), 100)
def do_register(self, faction):
self.api.register(faction.upper())
def main(args): def main(args):
c = Commander(args.store_dir) c = Commander(args.store_dir, args.agent)
c.run() c.run()
if __name__ == '__main__': if __name__ == '__main__':
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('-s', '--store-dir', default='data') parser.add_argument('-s', '--store-dir', default='data')
parser.add_argument('-a', '--agent', default=None)
args = parser.parse_args() args = parser.parse_args()
main(args) main(args)

21
models/agent.py Normal file
View File

@ -0,0 +1,21 @@
from .base import Base
class Agent(Base):
token: str = None
credits: int = 0
def update(self, d):
self.seta(d, 'credits')
def path(self):
return f'{self.symbol}.{self.ext()}'
@classmethod
def ext(self):
return 'agt'
def f(self, detail=1):
r = super().f(detail)
if detail >2:
r += f' c:{self.credits}'
return r

View File

@ -1,13 +1,28 @@
from copy import deepcopy from copy import deepcopy
from dataclasses import dataclass
@dataclass
class Base: class Base:
symbol: str symbol: str
dirty: bool
def __init__(self, symbol, store): def __init__(self, symbol, store):
self.symbol = symbol self.symbol = symbol
self.store = store self.store = store
self.dirty = True self.dirty = True
def seta(self, d, name):
if name in d:
setattr(self, name, d[name])
def __setattr__(self, name, value):
if name != 'dirty':
self.dirty = True
super().__setattr__(name, value)
def update(self, d):
pass
def dict(self): def dict(self):
r = deepcopy(self.__dict__) r = deepcopy(self.__dict__)
del r['store'] del r['store']
@ -17,6 +32,7 @@ class Base:
def path(self): def path(self):
raise NotImplementedError('path') raise NotImplementedError('path')
@classmethod
def ext(self): def ext(self):
raise NotImplementedError('extension') raise NotImplementedError('extension')
@ -24,4 +40,10 @@ class Base:
return self.__class__.__name__ return self.__class__.__name__
def __str__(self): def __str__(self):
return f'{self.symbol}.{self.ext()}' return self.f()
def f(self, detail=1):
r = self.symbol
if detail > 1:
r += '.' + self.ext()
return r

View File

@ -1,5 +1,6 @@
from .base import Base from .base import Base
class Sector(Base): class Sector(Base):
@classmethod
def ext(self): def ext(self):
return 'sct' return 'sct'

View File

@ -1,8 +0,0 @@
from .base import Base
class Setting(Base):
name: str
value: str
def ext(self):
return 'set'

View File

@ -3,6 +3,7 @@ from .base import Base
class System(Base): class System(Base):
@classmethod
def ext(self): def ext(self):
return 'stm' return 'stm'

View File

@ -2,5 +2,6 @@ from .base import Base
class Waypoint(Base): class Waypoint(Base):
@classmethod
def ext(self): def ext(self):
return 'way' return 'way'

View File

@ -2,7 +2,7 @@ from models.base import Base
from models.waypoint import Waypoint from models.waypoint import Waypoint
from models.sector import Sector from models.sector import Sector
from models.system import System from models.system import System
from models.setting import Setting from models.agent import Agent
from os.path import isfile, dirname, isdir from os.path import isfile, dirname, isdir
import os import os
import json import json
@ -22,6 +22,7 @@ class Store:
with open(path) as f: with open(path) as f:
data = json.load(f) data = json.load(f)
data['store'] = self data['store'] = self
data['dirty'] = False
obj.__dict__ = data obj.__dict__ = data
def store(self, obj): def store(self, obj):
@ -40,9 +41,27 @@ class Store:
self.data[symbol] = obj self.data[symbol] = obj
return obj return obj
def all(self, path, typ):
if hasattr(path, 'path'):
path = path.path()
path = os.path.join(self.data_dir, path)
if not isdir(path):
return
ext = '.' + typ.ext()
for f in os.listdir(path):
fil = os.path.join(path, f)
if not isfile(fil):
continue
if not fil.endswith(ext):
continue
symbol = f[:-len(ext)]
yield self.get(typ, symbol)
def flush(self): def flush(self):
for obj in self.data.values(): for obj in self.data.values():
self.store(obj) if obj.dirty:
self.store(obj)
def foo(self): def foo(self):
s = self.get(System, 'dez-hq14') s = self.get(System, 'dez-hq14')

67
util.py Normal file
View File

@ -0,0 +1,67 @@
from datetime import datetime
def must_get(d, k):
if type(k) == str:
k = k.split('.')
part = k.pop(0)
if type(d) != dict or part not in d:
raise ValueError(part + ' not found')
val = d[part]
if len(k) == 0:
return val
else:
return must_get(val, k)
mg = must_get
def should_get(d, k, default=None):
try:
return must_get(d, k)
except ValueError:
return default
sg = should_get
def find_where_eq(l, k, v):
for i in l:
if k in i and i[k] == v:
return i
return None
def all_subclasses(cls):
return set(cls.__subclasses__()).union(
[s for c in cls.__subclasses__() for s in all_subclasses(c)])
def pprint(d, detail=2):
print(pretty(d, detail=detail))
def pretty(d, ident=0, detail=2):
if type(d) in [int, str, float, bool]:
return str(d)
if hasattr(d, 'f'):
return d.f(detail)
r = ''
idt = ' ' * ident
if type(d) == list:
r += 'lst'
for i in d:
r += '\n' + idt + pretty(i, ident + 1, detail)
elif type(d) == dict:
r += 'map'
for k,v in d.items():
r += '\n' + idt + k + ': ' + pretty(v, ident + 1, detail)
return r
def trim(s, l):
s = s[:l]
s += ' ' * (l-len(s))
return s
# >>> parse_timestamp('2023-06-02T20:34:48.293Z')
# 1685738088
def parse_timestamp(ts):
return int(datetime.strptime(ts, '%Y-%m-%dT%H:%M:%S.%f%z').timestamp())
def render_timestamp(ts):
return datetime.utcfromtimestamp(ts).isoformat()