0ptr/nullptr/missions/base.py
Richard Bronkhorst 11031599cf Update base.py
2023-06-26 05:48:19 +02:00

249 lines
6.7 KiB
Python

from nullptr.store import Store
from nullptr.models.base import Base
from nullptr.models.waypoint import Waypoint
from nullptr.models.contract import Contract
from nullptr.models.system import System
from nullptr.models.survey import Survey
from nullptr.models.ship import Ship
from nullptr.analyzer import Analyzer
from time import time
from functools import partial
import logging
from nullptr.util import *
class MissionError(Exception):
pass
class MissionParam:
def __init__(self, cls, required=True, default=None):
self.cls = cls
self.required = required
self.default = default
def parse(self, val, store):
if self.cls == str:
return str(val)
elif self.cls == int:
return int(val)
elif self.cls == list:
return [i.strip() for i in val.split(',')]
elif issubclass(self.cls, Base):
data = store.get(self.cls, val)
if data is None:
raise ValueError('object not found')
return data.symbol
else:
raise ValueError('unknown param typr')
class Mission:
@classmethod
def params(cls):
return {
}
def __init__(self, ship, store, api):
self.ship = ship
self.store = store
self.api = api
self.next_step = 0
self.analyzer = Analyzer(self.store)
def sts(self, nm, v):
if issubclass(type(v), Base):
v = v.symbol
self.ship.set_mission_state(nm, v)
def rst(self, typ, nm):
symbol = self.st(nm)
return self.store.get(typ, symbol)
def st(self, nm):
if not nm in self.ship.mission_state:
return None
return self.ship.mission_state[nm]
def status(self, nw=None):
if nw is None:
return self.ship.mission_status
else:
self.ship.mission_status = nw
def start_state(self):
return 'done'
def error(self, msg):
self.status('error')
print(msg)
def init_state(self):
for name, param in self.params().items():
if param.required and param.default is None:
if not name in self.ship.mission_state:
return self.error(f'Param {name} not set')
self.status(self.start_state())
def steps(self):
return {
}
def step_done(self):
logging.info(f'mission finished for {self.ship}')
def is_waiting(self):
return self.next_step > time()
def is_finished(self):
return self.status() in ['done','error']
def is_ready(self):
return not self.is_waiting() and not self.is_finished()
def step(self):
steps = self.steps()
if self.status() == 'init':
self.init_state()
status = self.status()
if not status in steps:
logging.warning(f"Invalid mission status {status}")
self.status('error')
return
handler, next_step = steps[status]
try:
result = handler()
except Exception as e:
logging.error(e, exc_info=True)
self.status('error')
return
if type(next_step) == str:
self.status(next_step)
elif type(next_step) == dict:
if result not in next_step:
logging.warning(f'Invalid step result {result}')
self.status('error')
return
else:
self.status(next_step[result])
print(f'{self.ship} {status} -> {self.status()}')
class BaseMission(Mission):
def step_go_dest(self):
destination = self.rst(Waypoint, 'destination')
if self.ship.location() == destination:
return
self.api.navigate(self.ship, destination)
self.next_step = self.ship.arrival
def step_go_site(self):
site = self.rst(Waypoint,'site')
if self.ship.location() == site:
return
self.api.navigate(self.ship, site)
self.next_step = self.ship.arrival
def step_unload(self):
contract = self.rst(Contract, 'contract')
delivery = self.st('delivery')
if delivery == 'sell':
return self.step_sell(False)
typs = self.ship.deliverable_cargo(contract)
if len(typs) == 0:
return 'done'
self.api.deliver(self.ship, typs[0], contract)
if len(typs) == 1:
return 'done'
else:
return 'more'
def step_sell(self, except_resource=True):
target = self.st('resource')
market = self.store.get('Marketplace', self.ship.location_str)
sellables = market.sellable_items(self.ship.cargo.keys())
if target in sellables and except_resource:
sellables.remove(target)
if len(sellables) == 0:
return 'done'
self.api.sell(self.ship, sellables[0])
if len(sellables) == 1:
return 'done'
else:
return 'more'
def step_load(self):
cargo_space = self.ship.cargo_capacity - self.ship.cargo_units
resource = self.st('resource')
self.api.buy(self.ship, resource, cargo_space)
def step_travel(self):
traject = self.st('traject')
if traject is None or traject == []:
return 'done'
dest = self.store.get(Waypoint, traject[-1])
loc = self.ship.location()
print(dest, loc)
if dest == loc:
self.sts('traject', None)
return 'done'
hop = traject.pop(0)
if len(hop.split('-')) == 3:
self.api.navigate(self.ship, hop)
self.next_step = self.ship.arrival
else:
self.api.jump(self.ship, hop)
self.next_step = self.ship.cooldown
if traject == []:
traject= None
self.sts('traject', traject)
return 'more'
def step_calculate_traject(self, dest):
if type(dest) == str:
dest = self.store.get(Waypoint, dest)
loc = self.ship.location()
loc_sys = self.store.get(System, loc.system())
loc_jg = self.analyzer.get_jumpgate(loc_sys)
dest_sys = self.store.get(System, dest.system())
dest_jg = self.analyzer.get_jumpgate(dest_sys)
if dest_sys == loc_sys:
result = [dest.symbol]
self.sts('traject', result)
return
path = self.analyzer.find_path(loc_sys, dest_sys)
result = []
if loc.symbol != loc_jg.symbol:
result.append(loc_jg.symbol)
result += [s.symbol for s in path[1:]]
if dest_jg.symbol != dest.symbol:
result.append(dest.symbol)
self.sts('traject', result)
print(result)
return result
def step_dock(self):
self.api.dock(self.ship)
def step_refuel(self):
if self.ship.fuel_current < 100:
try:
self.api.refuel(self.ship)
except Exception as e:
pass
def step_orbit(self):
self.api.orbit(self.ship)
def travel_steps(self, nm, destination, next_step):
destination = self.st(destination)
calc = partial(self.step_calculate_traject, destination)
return {
f'travel-{nm}': (self.step_orbit, f'calc-trav-{nm}'),
f'calc-trav-{nm}': (calc, f'go-{nm}'),
f'go-{nm}': (self.step_travel, {
'done': f'dock-{nm}',
'more': f'go-{nm}'
}),
f'dock-{nm}': (self.step_dock, f'refuel-{nm}'),
f'refuel-{nm}': (self.step_refuel, next_step)
}