from import Store from nullptr.models.base import Base from nullptr.models.waypoint import Waypoint from nullptr.models.contract import Contract from nullptr.models.system import System from nullptr.models.survey import Survey from nullptr.models.ship import Ship from nullptr.analyzer import * from time import time from functools import partial import logging from nullptr.util import * class MissionError(Exception): pass class MissionParam: def __init__(self, cls, required=True, default=None): self.cls = cls self.required = required self.default = default def parse(self, val, store): if self.cls == str: return str(val) elif self.cls == int: return int(val) elif self.cls == list: if type(val) == str: return [i.strip() for i in val.split(',')] return val elif issubclass(self.cls, Base): if type(val) == str: data = store.get(self.cls, val) else: data = val if data is None: raise ValueError('object not found') return data.symbol else: raise ValueError('unknown param typr') class Mission: @classmethod def params(cls): return { } def __init__(self, ship, context): self.ship = ship self.c = context = self.api = context.api self.wait_for = None self.next_step = 0 self.setup() def setup(self): pass def sts(self, nm, v): if issubclass(type(v), Base): v = v.symbol self.ship.set_mission_state(nm, v) def rst(self, typ, nm): symbol = if symbol is None: return None return, symbol) def st(self, nm): if not nm in self.ship.mission_state: return None return self.ship.mission_state[nm] def status(self, nw=None): if nw is None: return self.ship.mission_status else: steps = self.steps() if nw in ['init','done', 'error']: self.ship.mission_status = nw return elif nw not in steps: self.ship.log(f"Invalid mission status {nw}", 1) self.ship.mission_status = 'error' return wait_for = steps[nw][2] if len(steps[nw]) > 2 else None self.wait_for = wait_for self.ship.mission_status = nw def start_state(self): return 'done' def error(self, msg): self.status('error') print(msg) def init_state(self): for name, param in self.params().items(): if param.required and param.default is None: if not name in self.ship.mission_state: return self.error(f'Param {name} not set') self.status(self.start_state()) def steps(self): return { } def step_done(self): self.ship.log(f'mission {type(self).__name__} finished with balance {self.balance()}', 3) def get_prio(self): if self.next_step > time() or self.ship.cooldown > time() or self.ship.arrival > time(): return 0 if self.wait_for is not None: p = int(self.wait_for()) if p > 0: self.wait_for = None return p return 3 def is_finished(self): return self.status() in ['done','error'] def is_ready(self): if self.is_finished(): return 0 return self.get_prio() def step(self): steps = self.steps() if self.status() == 'init': self.init_state() status = self.status() if not status in steps: self.ship.log(f"Invalid mission status {status}", 1) self.status('error') return handler = steps[status][0] next_step = steps[status][1] try: result = handler() except Exception as e: self.ship.log(fmtex(e)) self.ship.log(self.api.last_result) self.status('error') return if type(next_step) == str: self.status(next_step) elif type(next_step) == dict: if result not in next_step: self.ship.log(f'Invalid step result {result}', 1) self.status('error') return else: if result is None: result='' self.status(next_step[result]) self.ship.log(f'{status} {result} -> {self.status()}', 8) class BaseMission(Mission): def balance(self, amt=0): if type(amt) == dict: amt = self.api.transaction_cost(amt) balance ='balance') if balance is None: balance = 0 balance += amt self.sts('balance', balance) return balance def step_pass(self): pass def step_go_dest(self): destination = self.rst(Waypoint, 'destination') if self.ship.location() == destination: return self.api.navigate(self.ship, destination) self.next_step = self.ship.arrival def step_go_site(self): site = self.rst(Waypoint,'site') if self.ship.location() == site: return self.api.navigate(self.ship, site) self.next_step = self.ship.arrival def step_market(self): loc = self.ship.location self.api.marketplace(loc) def step_unload(self): delivery ='delivery') if delivery == 'sell': return self.step_sell(False) contract = self.rst(Contract, 'contract') typs = self.ship.deliverable_cargo(contract) if len(typs) == 0: return 'done' self.api.deliver(self.ship, typs[0], contract) if len(typs) == 1: return 'done' else: return 'more' def step_sell(self, except_resource=True): market ='Marketplace', self.ship.location.symbol) sellables = market.sellable_items(self.ship.cargo.keys()) if len(sellables) == 0: return 'done' resource = sellables[0] volume = market.volume(resource) amt_cargo = self.ship.get_cargo(resource) amount = min(amt_cargo, volume) res = self.api.sell(self.ship, resource, amount) self.balance(res) if len(sellables) == 1 and amt_cargo == amount: return 'done' else: return 'more' def step_travel(self): traject ='traject') if traject is None or traject == []: return dest = traject[-1] loc = self.ship.location hop = traject.pop(0) if type(hop) == Waypoint: self.api.navigate(self.ship, hop) self.next_step = self.ship.arrival else: self.api.jump(self.ship, hop) self.next_step = self.ship.cooldown self.sts('traject', traject) def step_navigate_traject(self): traject ='traject') loc = self.ship.location if traject is None or traject == []: return 'done' dest =traject[-1] if dest == loc: return 'done' return 'more' def step_calculate_traject(self, dest): if type(dest) == str: dest =, dest) loc = self.ship.location loc_sys = loc.system loc_jg = get_jumpgate(self.c, loc_sys) loc_jg_wp =, loc_jg.symbol) dest_sys = dest.system dest_jg = get_jumpgate(self.c, dest_sys) if dest_sys == loc_sys: result = find_nav_path(self.c, loc, dest, self.ship.range()) self.sts('traject', result) return 'done' if len(result) == 0 else 'more' path = find_jump_path(self.c, loc_sys, dest_sys) result = [] if loc.symbol != loc_jg.symbol: result.append(loc_jg_wp) result += [s for s in path[1:]] if dest_jg.symbol != dest.symbol: result.append(dest) self.sts('traject', result) print(result) return 'more' def step_dock(self): if self.ship.status == 'DOCKED': return self.api.dock(self.ship) def step_refuel(self): if self.ship.fuel_capacity == 0: return #if self.ship.fuel_capacity - self.ship.fuel_current > 100: try: self.api.refuel(self.ship) except Exception as e: pass def step_orbit(self): if self.ship.status != 'DOCKED': return self.api.orbit(self.ship) def travel_steps(self, nm, destination, next_step): destination = calc = partial(self.step_calculate_traject, destination) steps = { f'travel-{nm}': (calc, { 'more': f'dock-{nm}', 'done': next_step }), f'dock-{nm}': (self.step_dock, f'refuel-{nm}'), f'refuel-{nm}': (self.step_refuel, f'orbit-{nm}'), f'orbit-{nm}': (self.step_orbit, f'go-{nm}'), f'go-{nm}': (self.step_travel, f'nav-{nm}'), f'nav-{nm}': (self.step_navigate_traject, { 'done': next_step, 'more': f'dock-{nm}' }) } if self.ship.fuel_capacity == 0: steps = { f'travel-{nm}': (calc, f'go-{nm}'), f'go-{nm}': (self.step_travel, f'nav-{nm}'), f'nav-{nm}': (self.step_navigate_traject, { 'done': next_step, 'more': f'go-{nm}' }), } return steps