0ptr/nullptr/missions/base.py
2023-07-16 20:50:30 +02:00

256 lines
6.8 KiB
Python

from nullptr.store import Store
from nullptr.models.base import Base
from nullptr.models.waypoint import Waypoint
from nullptr.models.contract import Contract
from nullptr.models.system import System
from nullptr.models.survey import Survey
from nullptr.models.ship import Ship
from nullptr.analyzer import Analyzer
from time import time
from functools import partial
import logging
from nullptr.util import *
class MissionError(Exception):
    """Mission-specific exception type; behaves exactly like ``Exception``."""
class MissionParam:
    """Describe one mission parameter and how to coerce a raw value for it.

    The constructor arguments are stored unchanged on the instance:

    * ``cls`` -- target type: ``str``, ``int``, ``list`` or a ``Base``
      subclass.
    * ``required`` / ``default`` -- kept as attributes; this class itself
      does not interpret them.
    """

    def __init__(self, cls, required=True, default=None):
        self.cls = cls
        self.required = required
        self.default = default

    def parse(self, val, store):
        """Coerce *val* into the representation used for this parameter.

        * ``str`` / ``int`` parameters go through the matching builtin.
        * ``list`` parameters split a comma-separated string into stripped
          items; any non-string value is returned unchanged.
        * ``Base`` subclasses: a string is resolved with
          ``store.get(self.cls, val)``, any other value is taken to be the
          object itself. The object's ``symbol`` is returned.

        Raises:
            ValueError: if the store lookup yields ``None``, or ``self.cls``
                is not one of the supported types.
        """
        if self.cls is str:
            return str(val)
        if self.cls is int:
            return int(val)
        if self.cls is list:
            # isinstance() so that str subclasses are split as well.
            if isinstance(val, str):
                return [item.strip() for item in val.split(',')]
            return val
        # Guard with isinstance(..., type): issubclass() raises TypeError for
        # non-class arguments, which would hide the ValueError below.
        if isinstance(self.cls, type) and issubclass(self.cls, Base):
            data = store.get(self.cls, val) if isinstance(val, str) else val
            if data is None:
                raise ValueError('object not found')
            return data.symbol
        raise ValueError('unknown param type')
class Mission:
    """State machine that advances one ship through a mission.

    The current state name lives in ``ship.mission_status`` and named
    mission values live in ``ship.mission_state``. Subclasses describe the
    machine by overriding ``params()``, ``start_state()`` and ``steps()``.
    """

    @classmethod
    def params(cls):
        """Return a mapping of parameter name to parameter spec (empty here)."""
        return {}

    def __init__(self, ship, store, api):
        self.ship = ship
        self.store = store
        self.api = api
        # Timestamp compared against time() in is_waiting(); 0 = not waiting.
        self.next_step = 0
        self.analyzer = Analyzer(self.store)

    def sts(self, nm, v):
        """Store mission value *nm*; ``Base`` objects are stored by symbol."""
        if isinstance(v, Base):
            v = v.symbol
        self.ship.set_mission_state(nm, v)

    def rst(self, typ, nm):
        """Read mission value *nm* and resolve it via ``store.get(typ, ...)``."""
        symbol = self.st(nm)
        return self.store.get(typ, symbol)

    def st(self, nm):
        """Return mission value *nm*, or ``None`` when it is not set."""
        if nm not in self.ship.mission_state:
            return None
        return self.ship.mission_state[nm]

    def status(self, nw=None):
        """Return the ship's mission status, or set it when *nw* is given."""
        if nw is None:
            return self.ship.mission_status
        self.ship.mission_status = nw

    def start_state(self):
        """Return the status entered after a successful ``init_state()``."""
        return 'done'

    def error(self, msg):
        """Switch the mission to ``'error'`` and print *msg*."""
        self.status('error')
        print(msg)

    def init_state(self):
        """Validate parameters, then move to ``start_state()``.

        Every parameter that is required and has no default must already be
        present in ``ship.mission_state``; the first missing one is reported
        through ``error()`` and the start state is not entered.
        """
        for name, param in self.params().items():
            if param.required and param.default is None:
                if name not in self.ship.mission_state:
                    return self.error(f'Param {name} not set')
        self.status(self.start_state())

    def steps(self):
        """Return the step table: ``status -> (handler, transition)``.

        ``transition`` is either the next status name, or a dict mapping the
        handler's return value to the next status name.
        """
        return {}

    def step_done(self):
        """Log that the mission has finished."""
        logging.info(f'mission finished for {self.ship}')

    def is_waiting(self):
        """Return True while ``next_step`` lies in the future."""
        return self.next_step > time()

    def is_finished(self):
        """Return True when the status is ``'done'`` or ``'error'``."""
        return self.status() in ['done', 'error']

    def is_ready(self):
        """Return True when the mission is neither waiting nor finished."""
        return not self.is_waiting() and not self.is_finished()

    def step(self):
        """Run the handler for the current status and apply its transition.

        An unknown status, a handler exception, or a handler result that is
        missing from a dict transition all put the mission in ``'error'``.
        """
        steps = self.steps()
        if self.status() == 'init':
            self.init_state()
            if self.status() == 'error':
                # init_state() already reported the problem; do not follow up
                # with a misleading "Invalid mission status" warning.
                return
        status = self.status()
        if status not in steps:
            logging.warning(f"Invalid mission status {status}")
            self.status('error')
            return
        handler, transition = steps[status]
        try:
            result = handler()
        except Exception as e:
            logging.error(e, exc_info=True)
            self.status('error')
            return
        # isinstance() so that str/dict subclasses (e.g. OrderedDict) are
        # honoured instead of silently leaving the status unchanged.
        if isinstance(transition, str):
            self.status(transition)
        elif isinstance(transition, dict):
            if result not in transition:
                logging.warning(f'Invalid step result {result}')
                self.status('error')
                return
            self.status(transition[result])
        print(f'{self.ship} {status} -> {self.status()}')
class BaseMission(Mission):
    """Reusable step handlers: navigation, trading, refuelling, multi-hop travel."""

    def _go_to(self, state_key):
        """Navigate to the waypoint stored under *state_key* unless already there."""
        target = self.rst(Waypoint, state_key)
        # ``ship.location`` is read as an attribute, matching step_sell,
        # step_travel and step_calculate_traject (it was called here before).
        if self.ship.location == target:
            return
        self.api.navigate(self.ship, target)
        self.next_step = self.ship.arrival

    def step_go_dest(self):
        """Navigate to the ``'destination'`` waypoint; wait until arrival."""
        self._go_to('destination')

    def step_go_site(self):
        """Navigate to the ``'site'`` waypoint; wait until arrival."""
        self._go_to('site')

    def step_unload(self):
        """Deliver one deliverable cargo type, or sell when delivery is 'sell'.

        Returns ``'more'`` while further cargo types remain after this call,
        otherwise ``'done'``.
        """
        contract = self.rst(Contract, 'contract')
        if self.st('delivery') == 'sell':
            return self.step_sell(False)
        typs = self.ship.deliverable_cargo(contract)
        if not typs:
            return 'done'
        self.api.deliver(self.ship, typs[0], contract)
        return 'done' if len(typs) == 1 else 'more'

    def step_sell(self, except_resource=True):
        """Sell one sellable cargo item at the market of the current location.

        With ``except_resource`` true, the mission's ``'resource'`` is kept.
        Returns ``'more'`` while further items remain, otherwise ``'done'``.
        """
        target = self.st('resource')
        # NOTE(review): the type is passed as the string 'Marketplace' while
        # other lookups in this file pass a class -- confirm the store
        # accepts type names.
        market = self.store.get('Marketplace', self.ship.location.symbol)
        sellables = market.sellable_items(self.ship.cargo.keys())
        if except_resource and target in sellables:
            sellables.remove(target)
        if not sellables:
            return 'done'
        self.api.sell(self.ship, sellables[0])
        return 'done' if len(sellables) == 1 else 'more'

    def step_load(self):
        """Buy the mission's ``'resource'`` for all remaining cargo space."""
        cargo_space = self.ship.cargo_capacity - self.ship.cargo_units
        resource = self.st('resource')
        self.api.buy(self.ship, resource, cargo_space)

    def step_travel(self):
        """Perform the next hop of the stored ``'traject'``.

        ``Waypoint`` hops are navigated to (waiting for ``ship.arrival``);
        any other hop is jumped to (waiting for ``ship.cooldown``).
        Returns ``'done'`` when there is no traject left or the ship is at
        its last entry, otherwise ``'more'``.
        """
        traject = self.st('traject')
        if not traject:
            return 'done'
        if traject[-1] == self.ship.location:
            self.sts('traject', None)
            return 'done'
        hop = traject.pop(0)
        if isinstance(hop, Waypoint):
            self.api.navigate(self.ship, hop)
            self.next_step = self.ship.arrival
        else:
            self.api.jump(self.ship, hop)
            self.next_step = self.ship.cooldown
        # An exhausted traject is stored as None rather than [].
        self.sts('traject', traject or None)
        return 'more'

    def step_calculate_traject(self, dest):
        """Compute the hop list from the ship's location to *dest* and store it.

        *dest* may be a waypoint object or a symbol string (resolved through
        the store). Within one system the traject is just ``[dest]``.
        Otherwise it is: the local jump gate waypoint (unless the ship is
        already on it), the entries of ``analyzer.find_path`` after the
        first, and finally *dest* unless it is the destination jump gate.

        Returns the list that was stored under ``'traject'``.
        """
        if isinstance(dest, str):
            dest = self.store.get(Waypoint, dest)
        loc = self.ship.location
        loc_sys = loc.system
        dest_sys = dest.system
        # Handle the in-system case first so that it does not depend on the
        # jump gate lookups below succeeding.
        if dest_sys == loc_sys:
            result = [dest]
            self.sts('traject', result)
            return result
        loc_jg = self.analyzer.get_jumpgate(loc_sys)
        loc_jg_wp = self.store.get(Waypoint, loc_jg.symbol)
        dest_jg = self.analyzer.get_jumpgate(dest_sys)
        path = self.analyzer.find_path(loc_sys, dest_sys)
        result = []
        if loc.symbol != loc_jg.symbol:
            result.append(loc_jg_wp)
        result.extend(path[1:])
        if dest_jg.symbol != dest.symbol:
            result.append(dest)
        self.sts('traject', result)
        print(result)
        return result

    def step_dock(self):
        """Dock the ship."""
        self.api.dock(self.ship)

    def step_refuel(self):
        """Refuel when the tank is below half; failures are logged, not raised."""
        if self.ship.fuel_capacity == 0:
            return
        if self.ship.fuel_current / self.ship.fuel_capacity < 0.5:
            try:
                self.api.refuel(self.ship)
            except Exception:
                # Refuelling stays best-effort, but leave a trace of failures.
                logging.warning('refuel failed for %s', self.ship, exc_info=True)

    def step_orbit(self):
        """Put the ship in orbit."""
        self.api.orbit(self.ship)

    def travel_steps(self, nm, destination, next_step):
        """Build the step-table entries for travelling to a stored destination.

        *destination* is the mission-state key of the target; its value is
        read when this method is called. The sequence is
        ``travel-nm`` (orbit) -> ``calc-trav-nm`` -> ``go-nm`` (repeated
        while the travel step returns ``'more'``) -> ``dock-nm`` ->
        ``refuel-nm`` -> *next_step*.
        """
        destination = self.st(destination)
        calc = partial(self.step_calculate_traject, destination)
        return {
            f'travel-{nm}': (self.step_orbit, f'calc-trav-{nm}'),
            f'calc-trav-{nm}': (calc, f'go-{nm}'),
            f'go-{nm}': (self.step_travel, {
                'done': f'dock-{nm}',
                'more': f'go-{nm}'
            }),
            f'dock-{nm}': (self.step_dock, f'refuel-{nm}'),
            f'refuel-{nm}': (self.step_refuel, next_step)
        }