0ptr/nullptr/missions/base.py

242 lines
6.5 KiB
Python
Raw Normal View History

from nullptr.store import Store
from nullptr.models.base import Base
2023-06-17 18:18:14 +00:00
from nullptr.models.waypoint import Waypoint
from nullptr.models.contract import Contract
from nullptr.models.system import System
2023-06-17 18:18:14 +00:00
from nullptr.models.survey import Survey
from nullptr.models.ship import Ship
from nullptr.analyzer import Analyzer
2023-06-17 18:18:14 +00:00
from time import time
2023-06-22 12:56:51 +00:00
from functools import partial
2023-06-17 18:18:14 +00:00
import logging
from nullptr.util import *
2023-06-17 18:18:14 +00:00
class MissionError(Exception):
pass
2023-06-17 18:18:14 +00:00
class MissionParam:
def __init__(self, cls, required=True, default=None):
self.cls = cls
self.required = required
self.default = default
def parse(self, val, store):
2023-06-17 18:18:14 +00:00
if self.cls == str:
return str(val)
elif self.cls == int:
return int(val)
elif issubclass(self.cls, Base):
2023-06-17 18:18:14 +00:00
data = store.get(self.cls, val)
if data is None:
raise ValueError('object not found')
return data.symbol
2023-06-17 18:18:14 +00:00
else:
raise ValueError('unknown param typr')
class Mission:
@classmethod
def params(cls):
return {
}
def __init__(self, ship, store, api):
2023-06-17 18:18:14 +00:00
self.ship = ship
self.store = store
2023-06-17 18:18:14 +00:00
self.api = api
self.next_step = 0
self.analyzer = Analyzer(self.store)
2023-06-17 18:18:14 +00:00
def sts(self, nm, v):
if issubclass(type(v), Base):
v = v.symbol
self.ship.set_mission_state(nm, v)
2023-06-17 18:18:14 +00:00
def rst(self, typ, nm):
symbol = self.st(nm)
return self.store.get(typ, symbol)
2023-06-17 18:18:14 +00:00
def st(self, nm):
if not nm in self.ship.mission_state:
return None
return self.ship.mission_state[nm]
def status(self, nw=None):
if nw is None:
return self.ship.mission_status
else:
self.ship.mission_status = nw
def start_state(self):
return 'done'
def error(self, msg):
self.status('error')
print(msg)
def init_state(self):
for name, param in self.params().items():
if param.required and param.default is None:
if not name in self.ship.mission_state:
return self.error(f'Param {name} not set')
self.status(self.start_state())
def steps(self):
return {
}
def step_done(self):
logging.info(f'mission finished for {self.ship}')
def is_waiting(self):
return self.next_step > time()
def is_finished(self):
return self.status() in ['done','error']
def is_ready(self):
return not self.is_waiting() and not self.is_finished()
def step(self):
steps = self.steps()
if self.status() == 'init':
self.init_state()
status = self.status()
if not status in steps:
logging.warning(f"Invalid mission status {status}")
self.status('error')
return
handler, next_step = steps[status]
try:
result = handler()
except Exception as e:
logging.error(e, exc_info=True)
2023-06-17 18:18:14 +00:00
self.status('error')
return
if type(next_step) == str:
self.status(next_step)
elif type(next_step) == dict:
if result not in next_step:
logging.warning(f'Invalid step result {result}')
self.status('error')
return
else:
self.status(next_step[result])
print(f'{self.ship} {status} -> {self.status()}')
class BaseMission(Mission):
def step_go_dest(self):
destination = self.rst(Waypoint, 'destination')
if self.ship.location() == destination:
return
self.api.navigate(self.ship, destination)
self.next_step = self.ship.arrival
def step_go_site(self):
site = self.rst(Waypoint,'site')
if self.ship.location() == site:
return
self.api.navigate(self.ship, site)
self.next_step = self.ship.arrival
def step_unload(self):
contract = self.rst(Contract, 'contract')
delivery = self.st('delivery')
if delivery == 'sell':
return self.step_sell(False)
typs = self.ship.deliverable_cargo(contract)
if len(typs) == 0:
return 'done'
self.api.deliver(self.ship, typs[0], contract)
if len(typs) == 1:
return 'done'
else:
return 'more'
2023-06-17 18:18:14 +00:00
def step_sell(self, except_resource=True):
target = self.st('resource')
market = self.store.get('Marketplace', self.ship.location_str)
sellables = market.sellable_items(self.ship.cargo.keys())
if target in sellables and except_resource:
sellables.remove(target)
if len(sellables) == 0:
return 'done'
self.api.sell(self.ship, sellables[0])
if len(sellables) == 1:
return 'done'
else:
return 'more'
def step_load(self):
cargo_space = self.ship.cargo_capacity - self.ship.cargo_units
resource = self.st('resource')
self.api.buy(self.ship, resource, cargo_space)
2023-06-22 12:56:51 +00:00
def step_travel(self):
traject = self.st('traject')
2023-06-22 12:56:51 +00:00
if traject is None or traject == []:
return 'done'
dest = self.store.get(Waypoint, traject[-1])
loc = self.ship.location()
if dest == loc:
self.sts('traject', None)
return 'done'
hop = traject.pop(0)
if len(hop.split('-')) == 3:
self.api.navigate(self.ship, hop)
self.next_step = self.ship.arrival
else:
self.api.jump(self.ship, hop)
self.next_step = self.ship.cooldown
2023-06-20 21:12:57 +00:00
if traject == []:
traject= None
self.sts('traject', traject)
return 'more'
2023-06-22 12:56:51 +00:00
def step_calculate_traject(self, dest):
if type(dest) == str:
dest = self.store.get(Waypoint, dest)
loc = self.ship.location()
loc_sys = self.store.get(System, loc.system())
loc_jg = self.analyzer.get_jumpgate(loc_sys)
dest_sys = self.store.get(System, dest.system())
dest_jg = self.analyzer.get_jumpgate(dest_sys)
path = self.analyzer.find_path(loc_sys, dest_sys)
result = []
if loc.symbol != loc_jg.symbol:
result.append(loc_jg.symbol)
result += [s.symbol for s in path[1:]]
if dest_jg.symbol != dest.symbol:
result.append(dest.symbol)
2023-06-22 12:56:51 +00:00
self.sts('traject', result)
print(result)
return result
def step_dock(self):
self.api.dock(self.ship)
def step_refuel(self):
if self.ship.fuel_current < 100:
2023-06-22 12:56:51 +00:00
try:
self.api.refuel(self.ship)
except Exception as e:
pass
def step_orbit(self):
self.api.orbit(self.ship)
2023-06-22 12:56:51 +00:00
def travel_steps(self, nm, destination, next_step):
destination = self.st(destination)
calc = partial(self.step_calculate_traject, destination)
return {
f'travel-{nm}': (self.step_orbit, f'calc-trav-{nm}'),
f'calc-trav-{nm}': (calc, f'go-{nm}'),
f'go-{nm}': (self.step_travel, {
'done': f'dock-{nm}',
'more': f'go-{nm}'
}),
f'dock-{nm}': (self.step_dock, f'refuel-{nm}'),
f'refuel-{nm}': (self.step_refuel, next_step)
}