0ptr/nullptr/commander.py
2023-06-14 21:20:47 +02:00

97 lines
2.9 KiB
Python

from nullptr.command_line import CommandLine
from nullptr.store import Store
from nullptr.analyzer import Analyzer
import argparse
from nullptr.models.agent import Agent
from nullptr.models.system import System
from nullptr.models.waypoint import Waypoint
from nullptr.models.marketplace import Marketplace
from nullptr.models.jumpgate import Jumpgate
from nullptr.api import Api
from .util import *
from time import sleep, time
from threading import Thread
from nullptr.atlas_builder import AtlasBuilder
class Commander(CommandLine):
def __init__(self, store_dir='data', agent=None):
self.store_dir = store_dir
self.store = Store(store_dir)
self.store.load()
self.agent = self.select_agent(agent)
self.api = Api(self.store, self.agent)
self.atlas_builder = AtlasBuilder(self.store, self.api)
self.analyzer = Analyzer(self.store)
self.stop_auto= False
super().__init__()
def ask_obj(self, typ, prompt):
obj = None
while obj is None:
symbol = input(prompt)
obj = self.store.get(typ, symbol.upper())
if obj is None:
print('not found')
return obj
def select_agent(self, agent_str):
if agent_str is not None:
return self.store.get(Agent, agent_str)
else:
agents = self.store.all(Agent)
agent = next(agents, None)
if agent is None:
symbol = input('agent name: ')
agent = self.store.get(Agent, symbol)
return agent
def after_cmd(self):
self.store.flush()
def do_info(self):
pprint(self.api.info(), 100)
def do_register(self, faction):
self.api.register(faction.upper())
def do_universe(self, page=1):
self.atlas_builder.run(page)
def do_systems(self, page=1):
r = self.api.list_systems(int(page))
pprint(self.api.last_meta)
def do_waypoints(self, system_str):
system = self.store.get(System, system_str.upper())
r = self.api.list_waypoints(system)
pprint(r)
def do_marketplace(self, waypoint_str):
waypoint = self.store.get(Waypoint, waypoint_str.upper())
r = self.api.marketplace(waypoint)
def do_jumps(self, waypoint_str):
waypoint = self.store.get(Waypoint, waypoint_str.upper())
r = self.api.jumps(waypoint)
pprint(r)
def do_query(self):
location = self.ask_obj(System, 'Where are you? ')
resource = input('what resource?').upper()
sellbuy = self.ask_multichoice(['sell','buy'], 'do you want to sell or buy?')
print('Found markets:')
for m in self.analyzer.find_markets(resource, sellbuy):
system = self.store.get(System, m.system())
p = self.analyzer.find_path(location, system)
if p is None: continue
print(m, f'{len(p)-1} hops')
def do_path(self):
orig = self.ask_obj(System, 'from: ')
dest = self.ask_obj(System, 'to: ')
# orig = self.store.get(System, 'X1-KS52')
# dest = self.store.get(System, 'X1-DA90')
path = self.analyzer.find_path(orig, dest)
pprint(path)