0ptr/nullptr/missions/base.py
2024-02-11 18:24:16 +01:00

339 lines
8.8 KiB
Python

from nullptr.store import Store
from nullptr.models.base import Base
from nullptr.models.waypoint import Waypoint
from nullptr.models.contract import Contract
from nullptr.models.system import System
from nullptr.models.survey import Survey
from nullptr.models.ship import Ship
from nullptr.analyzer import *
from time import time
from functools import partial
import logging
from nullptr.util import *
class MissionError(Exception):
    """Exception type for mission-related errors."""
class MissionParam:
    """Describe one mission parameter and convert raw values to its type.

    ``cls`` is the target type. ``required`` and ``default`` are stored as
    given; ``parse`` does not consult them.
    """

    def __init__(self, cls, required=True, default=None):
        self.cls = cls
        self.required = required
        self.default = default

    def parse(self, val, store):
        """Convert *val* to the representation used for this parameter.

        * ``str`` / ``int``: *val* is passed through that constructor (so
          ``int`` may raise ``ValueError`` for non-numeric input).
        * ``list``: a string is split on commas and every item is stripped;
          any other value is returned unchanged.
        * a subclass of ``Base``: a string is resolved with
          ``store.get(self.cls, val)``, any other value is taken to be the
          object itself; the object's ``symbol`` is returned.

        Raises:
            ValueError: if the resolved object is ``None`` or ``self.cls`` is
                not one of the supported types.
        """
        if self.cls is str:
            return str(val)
        if self.cls is int:
            return int(val)
        if self.cls is list:
            if isinstance(val, str):
                return [item.strip() for item in val.split(',')]
            return val
        # Check that cls is a class first: issubclass() raises TypeError on a
        # non-class, which would hide the intended ValueError below.
        if isinstance(self.cls, type) and issubclass(self.cls, Base):
            data = store.get(self.cls, val) if isinstance(val, str) else val
            if data is None:
                raise ValueError('object not found')
            return data.symbol
        raise ValueError('unknown param type')
class Mission:
    """State machine that drives one ship through named mission steps.

    The current step name lives in ``ship.mission_status`` and per-mission
    data in ``ship.mission_state``. Subclasses describe their steps by
    overriding ``steps()``; each entry maps a status name to a tuple
    ``(handler, next_step[, wait_for])`` where

    * ``handler`` is called without arguments,
    * ``next_step`` is either a status name or a dict mapping the handler's
      return value to a status name (``None`` is looked up as ``''``),
    * ``wait_for`` (optional) is a callable polled by ``get_prio``.

    The statuses 'init', 'done' and 'error' are always accepted.
    """

    @classmethod
    def params(cls):
        """Return the parameter definitions by name; none for the base class."""
        return {}

    def __init__(self, ship, context):
        self.ship = ship
        self.c = context
        self.store = context.store
        self.api = context.api
        # Optional callable taken from the current step; see get_prio().
        self.wait_for = None
        # Timestamp before which get_prio() reports the mission as not ready.
        self.next_step = 0
        self.setup()

    def setup(self):
        """Hook called at the end of ``__init__``; does nothing by default."""
        pass

    def sts(self, nm, v):
        """Store *v* under *nm* in the ship's mission state.

        A ``Base`` instance is replaced by its ``symbol`` before storing.
        """
        if isinstance(v, Base):
            v = v.symbol
        self.ship.set_mission_state(nm, v)

    def rst(self, typ, nm):
        """Resolve the symbol stored under *nm* to an object of type *typ*.

        Returns ``None`` when nothing is stored under *nm*.
        """
        symbol = self.st(nm)
        if symbol is None:
            return None
        return self.store.get(typ, symbol)

    def st(self, nm):
        """Return the mission state value stored under *nm*, or ``None``."""
        if nm not in self.ship.mission_state:
            return None
        return self.ship.mission_state[nm]

    def status(self, nw=None):
        """Get the current status, or set it when *nw* is given.

        Setting a name that is neither a built-in status nor a key of
        ``steps()`` logs the problem and puts the mission in 'error'.
        Entering a regular step also installs that step's optional
        ``wait_for`` callable.
        """
        if nw is None:
            return self.ship.mission_status
        # Built-in states never need the step table, so it is not built for
        # them; recording 'error' must not depend on steps() succeeding.
        if nw in ('init', 'done', 'error'):
            self.ship.mission_status = nw
            return
        steps = self.steps()
        if nw not in steps:
            self.ship.log(f"Invalid mission status {nw}", 1)
            self.ship.mission_status = 'error'
            return
        step = steps[nw]
        self.wait_for = step[2] if len(step) > 2 else None
        self.ship.mission_status = nw

    def start_state(self):
        """Return the status entered after a successful ``init_state``."""
        return 'done'

    def error(self, msg):
        """Put the mission in the 'error' status and print *msg*."""
        self.status('error')
        print(msg)

    def init_state(self):
        """Validate required parameters, then enter ``start_state()``.

        A parameter that is required and has no default must be present in
        the ship's mission state; otherwise the mission goes to 'error'.
        """
        for name, param in self.params().items():
            if param.required and param.default is None:
                if name not in self.ship.mission_state:
                    return self.error(f'Param {name} not set')
        self.status(self.start_state())

    def steps(self):
        """Return the step table; empty for the base class."""
        return {}

    def step_done(self):
        """Log the mission class name together with ``self.balance()``.

        ``balance`` is not defined on ``Mission`` itself; it has to come from
        a subclass (``BaseMission`` in this file provides one).
        """
        self.ship.log(f'mission {type(self).__name__} finished with balance {self.balance()}', 3)

    def get_prio(self):
        """Return the scheduling priority; 0 means "not ready".

        The mission is not ready while ``next_step``, the ship's cooldown or
        the ship's arrival lies in the future. If the current step has a
        ``wait_for`` callable, its integer result is the priority and the
        callable is dropped once it returns a positive value. Otherwise the
        priority is 3.
        """
        now = time()
        if self.next_step > now or self.ship.cooldown > now or self.ship.arrival > now:
            return 0
        if self.wait_for is not None:
            prio = int(self.wait_for())
            if prio > 0:
                self.wait_for = None
            return prio
        return 3

    def is_finished(self):
        """Return True when the status is 'done' or 'error'."""
        return self.status() in ('done', 'error')

    def is_ready(self):
        """Return 0 for a finished mission, else ``get_prio()``."""
        if self.is_finished():
            return 0
        return self.get_prio()

    def step(self):
        """Run the handler of the current status and move to the next one.

        Any exception raised by the handler is logged together with
        ``api.last_result`` and puts the mission in 'error'.
        """
        steps = self.steps()
        if self.status() == 'init':
            self.init_state()
            # init_state() may end the mission right away ('error' for a
            # missing parameter, or a 'done' start state). Such a terminal
            # status is not an invalid step and must not be turned into
            # 'error' by the lookup below.
            if self.is_finished() and self.status() not in steps:
                return
        status = self.status()
        if status not in steps:
            self.ship.log(f"Invalid mission status {status}", 1)
            self.status('error')
            return
        handler = steps[status][0]
        next_step = steps[status][1]
        try:
            result = handler()
        except Exception as e:
            self.ship.log(fmtex(e))
            self.ship.log(self.api.last_result)
            self.status('error')
            return
        if isinstance(next_step, str):
            self.status(next_step)
        elif isinstance(next_step, dict):
            # Normalise before the membership test so that a handler
            # returning None matches the '' key of the transition table.
            if result is None:
                result = ''
            if result not in next_step:
                self.ship.log(f'Invalid step result {result}', 1)
                self.status('error')
                return
            self.status(next_step[result])
        self.ship.log(f'{status} {result} -> {self.status()}', 8)
class BaseMission(Mission):
    """Mission with reusable step handlers for travel, trade and docking.

    The handlers read their inputs from the ship's mission state (keys
    'destination', 'site', 'delivery', 'contract', 'traject', 'balance') and
    act through ``self.api``. ``travel_steps`` builds a ready-made group of
    step table entries for travelling to a stored destination.
    """

    def _ship_location(self):
        """Return the ship's current location.

        NOTE(review): the two go-steps originally called
        ``self.ship.location()`` while every other handler read
        ``self.ship.location`` as a plain attribute. Both spellings are
        accepted here; confirm against the Ship model and simplify.
        """
        loc = self.ship.location
        return loc() if callable(loc) else loc

    def _go_to_stored_waypoint(self, key):
        """Navigate to the waypoint stored under *key* unless already there."""
        target = self.rst(Waypoint, key)
        if self._ship_location() == target:
            return
        self.api.navigate(self.ship, target)
        # Do not schedule the next step before the ship has arrived.
        self.next_step = self.ship.arrival

    def balance(self, amt=0):
        """Add *amt* to the 'balance' mission state and return the new total.

        A dict is first converted with ``self.api.transaction_cost``.
        """
        if isinstance(amt, dict):
            amt = self.api.transaction_cost(amt)
        balance = self.st('balance')
        if balance is None:
            balance = 0
        balance += amt
        self.sts('balance', balance)
        return balance

    def step_pass(self):
        """Do nothing."""
        pass

    def step_go_dest(self):
        """Navigate to the waypoint stored as 'destination'."""
        self._go_to_stored_waypoint('destination')

    def step_go_site(self):
        """Navigate to the waypoint stored as 'site'."""
        self._go_to_stored_waypoint('site')

    def step_market(self):
        """Query the marketplace at the ship's location through the API."""
        self.api.marketplace(self._ship_location())

    def step_shipyard(self):
        """Query the shipyard at the ship's location if it has one."""
        loc = self._ship_location()
        if 'SHIPYARD' in loc.traits:
            self.api.shipyard(loc)

    def step_unload(self):
        """Deliver one cargo type to the contract, or sell instead.

        When the 'delivery' state is 'sell' this delegates to ``step_sell``.
        Returns 'done' when nothing (more) is left to deliver, else 'more'.
        """
        if self.st('delivery') == 'sell':
            return self.step_sell(False)
        contract = self.rst(Contract, 'contract')
        typs = self.ship.deliverable_cargo(contract)
        if len(typs) == 0:
            return 'done'
        self.api.deliver(self.ship, typs[0], contract)
        return 'done' if len(typs) == 1 else 'more'

    def step_sell(self, except_resource=True):
        """Sell one sellable cargo item at the local market.

        At most the market's volume for that item is sold per call and the
        proceeds are added to the balance. Returns 'done' when this call sold
        the last sellable cargo completely, else 'more'.
        ``except_resource`` is accepted but not used.
        """
        # NOTE(review): this lookup passes the type as the string
        # 'Marketplace' whereas other lookups pass a class; left unchanged
        # because the store's contract is not visible here.
        market = self.store.get('Marketplace', self._ship_location().symbol)
        sellables = market.sellable_items(self.ship.cargo.keys())
        if len(sellables) == 0:
            return 'done'
        resource = sellables[0]
        amt_cargo = self.ship.get_cargo(resource)
        amount = min(amt_cargo, market.volume(resource))
        res = self.api.sell(self.ship, resource, amount)
        self.balance(res)
        if len(sellables) == 1 and amt_cargo == amount:
            return 'done'
        return 'more'

    def step_travel(self):
        """Execute the first hop of the stored 'traject'.

        A ``Waypoint`` hop is navigated to, anything else is jumped to. The
        next step is delayed until arrival or cooldown respectively, and the
        shortened traject is stored back.
        """
        traject = self.st('traject')
        if not traject:
            return
        hop = traject.pop(0)
        if isinstance(hop, Waypoint):
            self.api.navigate(self.ship, hop)
            self.next_step = self.ship.arrival
        else:
            self.api.jump(self.ship, hop)
            self.next_step = self.ship.cooldown
        self.sts('traject', traject)

    def step_navigate_traject(self):
        """Return 'done' when the traject is empty or ends at the ship, else 'more'."""
        traject = self.st('traject')
        if not traject:
            return 'done'
        if traject[-1] == self._ship_location():
            return 'done'
        return 'more'

    def step_calculate_traject(self, dest):
        """Plan the route to *dest* and store it as 'traject'.

        *dest* may be a waypoint or a waypoint symbol. Within one system the
        route comes from ``find_nav_path``. Across systems it is: the local
        jump gate (unless the ship is already on it), the systems returned by
        ``find_jump_path`` without the first entry, and finally *dest*
        (unless it is the destination system's jump gate).

        Returns 'done' when an in-system route is empty, else 'more'.
        """
        if isinstance(dest, str):
            dest = self.store.get(Waypoint, dest)
        loc = self._ship_location()
        loc_sys = loc.system
        dest_sys = dest.system
        # In-system travel needs no jump gates, so handle it before looking
        # them up; the gate lookups then cannot fail an in-system route.
        if dest_sys == loc_sys:
            result = find_nav_path(self.c, loc, dest, self.ship.range())
            self.sts('traject', result)
            return 'done' if len(result) == 0 else 'more'
        loc_jg = get_jumpgate(self.c, loc_sys)
        dest_jg = get_jumpgate(self.c, dest_sys)
        path = find_jump_path(self.c, loc_sys, dest_sys)
        result = []
        if loc.symbol != loc_jg.symbol:
            result.append(self.store.get(Waypoint, loc_jg.symbol))
        result.extend(path[1:])
        if dest_jg.symbol != dest.symbol:
            result.append(dest)
        self.sts('traject', result)
        # NOTE(review): looks like leftover debug output; kept as is.
        print(result)
        return 'more'

    def step_dock(self):
        """Dock the ship unless it is already docked."""
        if self.ship.status == 'DOCKED':
            return
        self.api.dock(self.ship)

    def step_refuel(self):
        """Refuel the ship on a best-effort basis.

        Ships with a fuel capacity of 0 are skipped. A failing refuel does
        not fail the step.
        """
        if self.ship.fuel_capacity == 0:
            return
        # NOTE: refuels unconditionally; an earlier draft only refuelled when
        # more than 100 units of fuel were missing.
        try:
            self.api.refuel(self.ship)
        except Exception as e:
            # Still best-effort, but record the failure instead of dropping
            # it. Level 8 is the level Mission.step uses for its trace line.
            self.ship.log(f'refuel failed: {e}', 8)

    def step_orbit(self):
        """Move the ship into orbit if it is currently docked."""
        if self.ship.status != 'DOCKED':
            return
        self.api.orbit(self.ship)

    def travel_steps(self, nm, destination, next_step):
        """Build the step table entries for travelling to a stored destination.

        Args:
            nm: suffix used in the generated step names (``travel-<nm>`` ...).
            destination: mission state key holding the destination.
            next_step: status to enter once the destination is reached.

        Ships with a fuel capacity of 0 get a shorter cycle without the dock
        and refuel steps.
        """
        destination = self.st(destination)
        calc = partial(self.step_calculate_traject, destination)
        if self.ship.fuel_capacity == 0:
            return {
                f'travel-{nm}': (calc, f'orbit-{nm}'),
                f'orbit-{nm}': (self.step_orbit, f'go-{nm}'),
                f'go-{nm}': (self.step_travel, f'nav-{nm}'),
                f'nav-{nm}': (self.step_navigate_traject, {
                    'done': next_step,
                    'more': f'go-{nm}',
                }),
            }
        return {
            f'travel-{nm}': (calc, {
                'more': f'dock-{nm}',
                'done': next_step,
            }),
            f'dock-{nm}': (self.step_dock, f'refuel-{nm}'),
            f'refuel-{nm}': (self.step_refuel, f'orbit-{nm}'),
            f'orbit-{nm}': (self.step_orbit, f'go-{nm}'),
            f'go-{nm}': (self.step_travel, f'nav-{nm}'),
            f'nav-{nm}': (self.step_navigate_traject, {
                'done': next_step,
                'more': f'dock-{nm}',
            }),
        }