# nullptr/store.py

from nullptr.models import *
import os
from .util import *
from time import time
import pickle
from struct import unpack, pack
from functools import partial
from io import BytesIO
from copy import copy

class StorePickler(pickle.Pickler):
    def persistent_id(self, obj):
        # Serialize the Store itself by reference, never by value.
        return "STORE" if type(obj) == Store else None


class StoreUnpickler(pickle.Unpickler):
    def __init__(self, stream, store):
        self.store = store
        super().__init__(stream)

    def persistent_load(self, pers_id):
        if pers_id == "STORE":
            return self.store
        raise pickle.UnpicklingError(f"Unknown persistent id: {pers_id!r}")
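

# This pickler/unpickler pair keeps the Store out of the serialized stream:
# pickle's persistent-id hook replaces any reference to the Store with the
# "STORE" marker on the way out, and persistent_load() patches the live
# Store instance back in when a chunk is loaded, so stored objects can keep
# a back-reference to their Store without it being written to disk.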

CHUNK_MAGIC = b'ChNkcHnK'


class ChunkHeader:
    def __init__(self):
        self.magic = CHUNK_MAGIC
        self.offset = 0
        self.in_use = True
        self.size = 0
        self.used = 0

    @classmethod
    def parse(cls, fil):
        offset = fil.tell()
        d = fil.read(24)
        if len(d) < 24:
            return None
        o = cls()
        o.offset = offset
        o.magic, sz, o.used = unpack('<8sQQ', d)
        # The top bit of the size field doubles as the in-use flag.
        o.size = sz & 0x7fffffffffffffff
        o.in_use = sz & 0x8000000000000000 != 0
        if o.magic != CHUNK_MAGIC:
            raise ValueError(f"Invalid chunk magic: {o.magic}")
        return o

    def write(self, f):
        sz = self.size
        if self.in_use:
            sz |= 1 << 63
        f.write(pack('<8sQQ', self.magic, sz, self.used))

    def __repr__(self):
        return f'chunk {self.in_use} {self.size} {self.used}'

    def f(self, detail=1):
        if detail == 1:
            return f'chunk {self.offset} {self.used}/{self.size}'
        else:
            r = f'Stored at: {self.offset}\n'
            slack = self.size - self.used
            r += f'Used: {self.used}/{self.size} (slack {slack})'
            return r
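

# On-disk layout, as implied by parse() and write(): each chunk is a
# 24-byte little-endian header (8-byte magic, 8-byte size whose most
# significant bit is the in-use flag, 8-byte used count) followed by
# `size` bytes of payload space, of which the first `used` bytes hold
# one pickled object; the rest is slack for in-place growth.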

class Store:
    def __init__(self, data_file, verbose=False):
        self.init_models()
        self.data_file = data_file
        self.data_dir = os.path.dirname(data_file)
        self.fil = open_file(data_file)
        self.data = {m: {} for m in self.models}
        self.system_members = {}
        self.dirty_objects = set()
        self.cleanup_interval = 600
        self.last_cleanup = 0
        # New chunks get 10% slack, clamped to [slack_min, slack_max] bytes.
        self.slack = 0.1
        self.slack_min = 64
        self.slack_max = 1024
        self.verbose = verbose
        self.load()

    def p(self, m):
        if not self.verbose:
            return
        print(m)

    def f(self, detail):
        return f'Store {self.data_file}'

    def close(self):
        self.flush()
        self.fil.close()

    def init_models(self):
        self.models = all_subclasses(Base)
        self.extensions = {c.ext(): c for c in self.models}
        self.model_names = {c.__name__: c for c in self.models}

    def dirty(self, obj):
        self.dirty_objects.add(obj)

    def dump_object(self, obj):
        buf = BytesIO()
        p = StorePickler(buf)
        p.dump(obj)
        return buf.getvalue()

    def load_object(self, data, offset):
        buf = BytesIO(data)
        p = StoreUnpickler(buf, self)
        obj = p.load()
        # If an object with this symbol is already held, drop its pending
        # dirty state; the freshly loaded copy supersedes it.
        x = self.get(type(obj), obj.symbol)
        if x is not None and x in self.dirty_objects:
            self.dirty_objects.remove(x)
        obj._file_offset = offset
        self.hold(obj)

    def load(self):
        cnt = 0
        start_time = time()
        total = 0
        free = 0
        self.fil.seek(0)
        offset = 0
        while (hdr := ChunkHeader.parse(self.fil)):
            self.p(hdr)
            total += hdr.size
            if not hdr.in_use:
                # Skip freed chunks, counting them as fragmented space.
                self.fil.seek(hdr.size, 1)
                free += hdr.size
            else:
                data = self.fil.read(hdr.used)
                self.load_object(data, offset)
                # Skip the slack between used and allocated size.
                self.fil.seek(hdr.size - hdr.used, 1)
                cnt += 1
            offset = self.fil.tell()
        dur = time() - start_time
        # just in case any temp objects were created
        self.dirty_objects = set()
        self.p(f'Loaded {cnt} objects in {dur:.2f} seconds')
        self.p(f'Fragmented space: {free} / {total} bytes')

    def allocate_chunk(self, sz):
        used = sz
        slack = sz * self.slack
        slack = min(slack, self.slack_max)
        slack = max(slack, self.slack_min)
        sz += int(slack)
        # Always append new chunks at the end of the file.
        self.fil.seek(0, 2)
        offset = self.fil.tell()
        h = ChunkHeader()
        h.size = sz
        h.used = used
        h.offset = offset
        h.write(self.fil)
        return offset, h
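
    # Example: a 100-byte object gets a 164-byte chunk (its 10% slack is
    # raised to the 64-byte floor), while a 100 KiB object gets only the
    # 1024-byte cap on top.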

    def get_header(self, obj):
        if obj._file_offset is None:
            return None
        self.fil.seek(obj._file_offset)
        hdr = ChunkHeader.parse(self.fil)
        return hdr

    def purge(self, obj):
        if obj._file_offset is None:
            return
        # Mark the on-disk chunk as free and forget the object.
        self.fil.seek(obj._file_offset)
        hdr = ChunkHeader.parse(self.fil)
        hdr.in_use = False
        self.fil.seek(obj._file_offset)
        hdr.write(self.fil)
        if type(obj) in self.data and obj.symbol in self.data[type(obj)]:
            del self.data[type(obj)][obj.symbol]
        if obj in self.dirty_objects:
            self.dirty_objects.remove(obj)
        obj._file_offset = None

    def store(self, obj):
        data = self.dump_object(obj)
        osize = len(data)
        # Is there an existing chunk for this object?
        if obj._file_offset is not None:
            self.fil.seek(obj._file_offset)
            hdr = ChunkHeader.parse(self.fil)
            csize = hdr.size
            if csize < osize:
                # The chunk is too small: mark it free on disk and force
                # allocation of a new one.
                hdr.in_use = False
                self.fil.seek(hdr.offset)
                hdr.write(self.fil)
                obj._file_offset = None
            else:
                # It is big enough: update the used field in place.
                hdr.used = osize
                self.fil.seek(hdr.offset)
                hdr.write(self.fil)
        if obj._file_offset is None:
            obj._file_offset, hdr = self.allocate_chunk(osize)
        self.fil.write(data)
        # Zero-fill the slack after the payload.
        slack = b'\x00' * (hdr.size - hdr.used)
        self.fil.write(slack)
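
    # store() rewrites an object in place while its chunk still fits and
    # otherwise frees the old chunk and appends a fresh one; freed chunks
    # linger as gaps until defrag() compacts the file.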

    def hold(self, obj):
        typ = type(obj)
        symbol = obj.symbol
        obj.store = self
        self.data[typ][symbol] = obj
        # Index system-bound objects for fast per-system lookups.
        if type(obj).__name__ in ['Waypoint', 'Marketplace', 'Jumpgate', 'Survey']:
            system_str = obj.system.symbol
            if system_str not in self.system_members:
                self.system_members[system_str] = set()
            self.system_members[system_str].add(obj)

    def create(self, typ, symbol):
        obj = typ(symbol, self)
        obj.created()
        self.hold(obj)
        self.dirty(obj)
        return obj

    def get(self, typ, symbol, create=False):
        if type(typ) == str and typ in self.model_names:
            typ = self.model_names[typ]
        symbol = symbol.upper()
        if typ not in self.data:
            return None
        if symbol not in self.data[typ]:
            if create:
                return self.create(typ, symbol)
            else:
                return None
        return self.data[typ][symbol]

    def getter(self, typ, create=False):
        if type(typ) == str and typ in self.model_names:
            typ = self.model_names[typ]
        return partial(self.get, typ, create=create)

    def update(self, typ, data, symbol=None):
        if type(typ) == str and typ in self.model_names:
            typ = self.model_names[typ]
        if symbol is None:
            symbol = mg(data, typ.identifier)
        obj = self.get(typ, symbol, True)
        obj.update(data)
        return obj

    def update_list(self, typ, lst):
        return [self.update(typ, d) for d in lst]

    def all(self, typ):
        if type(typ) == str and typ in self.model_names:
            typ = self.model_names[typ]
        for m in self.data[typ].values():
            if m.is_expired():
                self.dirty(m)
                continue
            yield m

    def all_members(self, system, typ=None):
        if type(typ) == str and typ in self.model_names:
            typ = self.model_names[typ]
        if type(system) == System:
            system = system.symbol
        if system not in self.system_members:
            return
        # Collect expired members while iterating; drop them afterwards.
        garbage = set()
        for m in self.system_members[system]:
            if m.is_expired():
                self.dirty(m)
                garbage.add(m)
                continue
            if typ is None or type(m) == typ:
                yield m
        for m in garbage:
            self.system_members[system].remove(m)

    def cleanup(self):
        if time() < self.last_cleanup + self.cleanup_interval:
            return
        self.last_cleanup = time()
        start_time = time()
        expired = list()
        # Scan the raw dicts: all() filters expired objects out, so it can
        # never yield the ones we want to collect here.
        for t in self.data:
            for o in self.data[t].values():
                if o.is_expired():
                    expired.append(o)
        for o in expired:
            # purge() also drops the object from self.data.
            self.purge(o)
        dur = time() - start_time
        self.p(f'cleaned {len(expired)} in {dur:.03f} seconds')
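
    # cleanup() is rate-limited to once per cleanup_interval (600 s by
    # default) and runs automatically at the start of every flush().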

    def flush(self):
        self.cleanup()
        it = 0
        start_time = time()
        # Iterate over a copy: purge() mutates dirty_objects.
        for obj in copy(self.dirty_objects):
            it += 1
            if obj.is_expired():
                self.purge(obj)
            else:
                self.store(obj)
        self.fil.flush()
        self.dirty_objects = set()
        dur = time() - start_time
        self.p(f'flush done {it} items {dur:.2f}')

    def defrag(self):
        # Compact the file: move it aside as a .bak, reopen a fresh file
        # and rewrite every live object back to back.
        nm = self.fil.name
        self.fil.close()
        bakfile = nm + '.bak'
        if os.path.isfile(bakfile):
            os.remove(bakfile)
        os.rename(nm, nm + '.bak')
        self.fil = open_file(nm)
        for t in self.data:
            for o in self.all(t):
                o._file_offset = None
                self.store(o)
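

# A minimal usage sketch (hypothetical model name and symbols; assumes
# open_file() creates the backing file when it does not exist yet):
#
#   store = Store('data/universe.db', verbose=True)
#   ship = store.get('Ship', 'AGENT-1', create=True)   # 'Ship' is assumed
#   store.flush()    # persists dirty objects, runs rate-limited cleanup
#   store.defrag()   # optional: compact freed chunks out of the file
#   store.close()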