from nullptr.models.marketplace import Marketplace from nullptr.models.jumpgate import Jumpgate from nullptr.models.system import System from nullptr.models.waypoint import Waypoint from dataclasses import dataclass from copy import copy class AnalyzerException(Exception): pass def path_dist(m): t = 0 o = Point(0,0) for w in m: t +=w.distance(o) o = w return t @dataclass class Point: x: int y: int @dataclass class TradeOption: resource: str source: Waypoint dest: Waypoint margin: int dist: int score: float @dataclass class SearchNode: system: System parent: 'SearchNode' def __hash__(self): return hash(self.system.symbol) def path(self): result = [] n = self while n is not None: result.append(n.system) n = n.parent result.reverse() return result def __repr__(self): return self.system.symbol class Analyzer: def __init__(self, store): self.store = store def find_markets(self, resource, sellbuy): for m in self.store.all(Marketplace): if 'sell' in sellbuy and resource in m.imports: yield ('sell', m) elif 'buy' in sellbuy and resource in m.exports: yield ('buy', m) elif 'exchange' in sellbuy and resource in m.exchange: yield ('exchange', m) def find_closest_markets(self, resource, sellbuy, location): if type(location) == str: location = self.store.get(Waypoint, location) mkts = self.find_markets(resource, sellbuy) candidates = [] origin = location.system for typ, m in mkts: system = m.waypoint.system d = origin.distance(system) candidates.append((typ, m, d)) possibles = sorted(candidates, key=lambda m: m[2]) possibles = possibles[:10] results = [] for typ,m,d in possibles: system = m.waypoint.system p = self.find_path(origin, system) if p is None: continue results.append((typ,m,d,len(p))) return results def solve_tsp(self, waypoints): wps = copy(waypoints) path = [] cur = Point(0,0) while len(wps) > 0: closest = wps[0] for w in wps: if w.distance(cur) < closest.distance(cur): closest = w cur = closest path.append(closest) wps.remove(closest) return path def get_jumpgate(self, system): gates = self.store.all_members(system, Jumpgate) return next(gates, None) # dijkstra shmijkstra def find_nav_path(self, orig, to, ran): path = [] mkts = [m.waypoint for m in self.store.all_members(orig.system, Marketplace)] cur = orig if orig == to: return [] while cur != to: best = cur bestdist = cur.distance(to) if bestdist < ran: path.append(to) break for m in mkts: dist = m.distance(to) if dist < bestdist and cur.distance(m) < ran: best = m bestdist = dist if best == cur: raise AnalyzerException(f'no path to {to}') cur = best path.append(cur) return path def find_jump_path(self, orig, to, depth=100, seen=None): if depth < 1: return None if seen is None: seen = set() if type(orig) == System: orig = set([SearchNode(orig,None)]) result = [n for n in orig if n==to] if len(result) > 0: return result[0].path() dest = set() for o in orig: jg = self.get_jumpgate(o) if jg is None: continue for s in jg.connections: if s in seen: continue seen.add(s) dest.add(SearchNode(s, o)) if len(dest) == 0: return None return self.find_path(dest, to, depth-1, seen) def prices(self, system): prices = {} for m in self.store.all_members(system, Marketplace): for p in m.prices.values(): r = p['symbol'] if not r in prices: prices[r] = [] prices[r].append({ 'wp': m.waypoint, 'buy': p['buy'], 'sell': p['sell'] }) return prices def find_trade(self, system): prices = self.prices(system) occupied_resources = set() for s in self.store.all('Ship'): if s.mission != 'haul': continue occupied_resources.add(s.mission_state['resource']) best = None for resource, markets in prices.items(): if resource in occupied_resources: continue source = sorted(markets, key=lambda x: x['buy'])[0] dest = sorted(markets, key=lambda x: x['sell'])[-1] margin = dest['sell'] -source['buy'] dist = source['wp'].distance(dest['wp']) dist = max(dist, 0.0001) score = margin / dist if margin < 0: continue o = TradeOption(resource, source['wp'], dest['wp'], margin, dist, score) if best is None or best.score < o.score: best = o return best