0ptr/nullptr/store.py
2023-12-25 07:54:19 +01:00

302 lines
7.5 KiB
Python

from nullptr.models import *
from os.path import isfile, dirname, isdir
import os
from os.path import basename
import json
from .util import *
from time import time
import pickle
from struct import unpack, pack
from functools import partial
from io import BytesIO
class StorePickler(pickle.Pickler):
    """Pickler that substitutes a persistent id for the Store reference
    held by stored objects, so the store itself is never serialized
    into a chunk (StoreUnpickler resolves it back on load)."""

    def persistent_id(self, obj):
        # Tag the Store instance; everything else pickles normally.
        # isinstance() is the idiomatic type check (was type(obj) == Store).
        return "STORE" if isinstance(obj, Store) else None
class StoreUnpickler(pickle.Unpickler):
    """Unpickler counterpart of StorePickler: resolves the "STORE"
    persistent id back to the live Store instance."""

    def __init__(self, stream, store):
        # store: the live Store to substitute for the "STORE" id.
        self.store = store
        super().__init__(stream)

    def persistent_load(self, pers_id):
        if pers_id == "STORE":
            return self.store
        # Any other id means corruption or a foreign pickle stream.
        # (Fixed garbled error message "I don know the persid!")
        raise pickle.UnpicklingError(f'unknown persistent id: {pers_id!r}')
class ChunkHeader:
    """Fixed 16-byte header in front of every chunk in the data file.

    Layout (little-endian): two unsigned 64-bit words.  The first word
    carries the chunk capacity in its low 63 bits and the in-use flag
    in the top bit; the second word is the payload byte count.
    """

    def __init__(self):
        self.offset = 0      # file offset of the header itself
        self.in_use = True   # cleared when the chunk is freed
        self.size = 0        # capacity of the chunk payload
        self.used = 0        # payload bytes actually occupied

    @classmethod
    def parse(cls, fil):
        """Read a header at fil's current position; None at end of file."""
        pos = fil.tell()
        raw = fil.read(16)
        if len(raw) < 16:
            return None
        word, used = unpack('<QQ', raw)
        hdr = cls()
        hdr.offset = pos
        hdr.used = used
        hdr.size = word & 0x7fffffffffffffff
        hdr.in_use = (word >> 63) == 1
        return hdr

    def write(self, f):
        """Serialize this header at f's current position."""
        word = self.size
        if self.in_use:
            word |= 1 << 63
        f.write(pack('<QQ', word, self.used))

    def __repr__(self):
        return f'chunk {self.in_use} {self.size} {self.used}'
class Store:
def __init__(self, data_file, verbose=False):
    """Open (or create) the data file and load every stored object."""
    self.verbose = verbose
    self.init_models()
    # per-model symbol -> object index, plus a per-system member index
    self.data = dict()
    for model in self.models:
        self.data[model] = {}
    self.system_members = {}
    self.dirty_objects = set()
    # flush/cleanup tuning knobs
    self.cleanup_interval = 600
    self.last_cleanup = 0
    self.slack = 0.1       # extra chunk capacity, as a fraction of size
    self.slack_min = 64    # bytes
    self.slack_max = 1024  # bytes
    self.fil = open_file(data_file)
    self.load()
def p(self, m):
if not self.verbose:
return
print(m)
def close(self):
    # Persist all pending changes, then close the underlying data file.
    self.flush()
    self.fil.close()
def init_models(self):
    """Build lookup tables for every model class derived from Base."""
    self.models = all_subclasses(Base)
    # extension -> class and class-name -> class, for fast resolution
    self.extensions = {}
    self.model_names = {}
    for model in self.models:
        self.extensions[model.ext()] = model
        self.model_names[model.__name__] = model
def dirty(self, obj):
    # Mark obj so the next flush() rewrites (or purges) it on disk.
    self.dirty_objects.add(obj)
def dump_object(self, obj):
    """Pickle obj to bytes, with the back-reference to this store
    replaced by a persistent id (see StorePickler)."""
    sink = BytesIO()
    StorePickler(sink).dump(obj)
    return sink.getvalue()
def load_object(self, data, offset):
    """Unpickle one chunk payload and register the object in memory."""
    obj = StoreUnpickler(BytesIO(data), self).load()
    # Remember where the object lives on disk so store() can reuse
    # its chunk, and re-enable dirty tracking after deserialization.
    obj.file_offset = offset
    obj.disable_dirty = False
    self.hold(obj)
def load(self):
    """Scan the whole data file and load every in-use chunk."""
    t0 = time()
    loaded = 0
    total_bytes = 0
    free_bytes = 0
    self.fil.seek(0)
    chunk_start = 0
    while True:
        hdr = ChunkHeader.parse(self.fil)
        if hdr is None:
            break
        self.p(hdr)
        total_bytes += hdr.size
        if hdr.in_use:
            payload = self.fil.read(hdr.used)
            self.load_object(payload, chunk_start)
            # skip the unused slack at the tail of the chunk
            self.fil.seek(hdr.size - hdr.used, 1)
            loaded += 1
        else:
            free_bytes += hdr.size
            self.fil.seek(hdr.size, 1)
        chunk_start = self.fil.tell()
    dur = time() - t0
    self.p(f'Loaded {loaded} objects in {dur:.2f} seconds')
    self.p(f'Fragmented space: {free_bytes} / {total_bytes} bytes')
def allocate_chunk(self, sz):
    """Append a fresh chunk big enough for sz payload bytes, plus a
    bounded slack margin for in-place growth; return (offset, header)."""
    # slack is proportional to the payload, clamped to [slack_min, slack_max]
    slack = max(min(sz * self.slack, self.slack_max), self.slack_min)
    hdr = ChunkHeader()
    hdr.used = sz
    hdr.size = sz + int(slack)
    self.fil.seek(0, 2)  # append at end of file
    start = self.fil.tell()
    hdr.offset = start
    hdr.write(self.fil)
    return start, hdr
def purge(self, obj):
    """Mark obj's on-disk chunk as free; no-op if it was never stored."""
    offset = obj.file_offset
    if offset is None:
        return
    self.fil.seek(offset)
    hdr = ChunkHeader.parse(self.fil)
    hdr.in_use = False
    # rewrite the header in place with the in-use bit cleared
    self.fil.seek(offset)
    hdr.write(self.fil)
    obj.file_offset = None
def store(self, obj):
data = self.dump_object(obj)
osize = len(data)
# is there an existing chunk for this obj?
if obj.file_offset is not None:
# read chunk hdr
self.fil.seek(obj.file_offset)
hdr = ChunkHeader.parse(self.fil)
csize = hdr.size
# if the chunk is too small
if csize < osize:
# free the chunk
hdr.in_use = False
# force a new chunk
obj.file_offset = None
else:
# if it is big enough, update the used field
hdr.used = osize
self.fil.seek(hdr.offset)
hdr.write(self.fil)
if obj.file_offset is None:
obj.file_offset, hdr = self.allocate_chunk(osize)
# print(type(obj).__name__, hdr)
self.fil.write(data)
slack = b'\x00' * (hdr.size - hdr.used)
self.fil.write(slack)
def hold(self, obj):
typ = type(obj)
symbol = obj.symbol
obj.store = self
self.data[typ][symbol] = obj
if hasattr(obj, 'system'):
system_str = obj.get_system().symbol
if system_str not in self.system_members:
self.system_members[system_str] = set()
self.system_members[system_str].add(obj)
def create(self, typ, symbol):
obj = typ(symbol, self)
self.hold(obj)
self.dirty(obj)
return obj
def get(self, typ, symbol, create=False):
if type(typ) == str and typ in self.model_names:
typ = self.model_names[typ]
symbol = symbol.upper()
if typ not in self.data:
return None
if symbol not in self.data[typ]:
if create:
return self.create(typ, symbol)
else:
return None
return self.data[typ][symbol]
def getter(self, typ, create=False):
if type(typ) == str and typ in self.model_names:
typ = self.model_names[typ]
return partial(self.get, typ, create=create)
def update(self, typ, data, symbol=None):
if type(typ) == str and typ in self.model_names:
typ = self.model_names[typ]
if symbol is None:
symbol = mg(data, typ.identifier)
obj = self.get(typ, symbol, True)
obj.update(data)
return obj
def update_list(self, typ, lst):
return [self.update(typ, d) for d in lst]
def all(self, typ):
if type(typ) == str and typ in self.model_names:
typ = self.model_names[typ]
for m in self.data[typ].values():
if m.is_expired():
self.dirty(m)
continue
yield m
def all_members(self, system, typ=None):
    """Yield the members of a system, optionally filtered by model
    type.  Expired members are marked dirty and dropped from the
    system index once iteration finishes."""
    if type(typ) == str and typ in self.model_names:
        typ = self.model_names[typ]
    if type(system) == System:
        system = system.symbol
    members = self.system_members.get(system)
    if members is None:
        return
    expired = set()
    for member in members:
        if member.is_expired():
            self.dirty(member)
            expired.add(member)
        elif typ is None or type(member) == typ:
            yield member
    # Remove expired members only after the loop: the set must not
    # change size while it is being iterated.
    members -= expired
def cleanup(self):
if time() < self.last_cleanup + self.cleanup_interval:
return
self.last_cleanup = time()
start_time = time()
expired = list()
for t in self.data:
for o in self.all(t):
if o.is_expired():
expired.append(o)
for o in expired:
self.purge(obj)
del self.data[type(o)][o.symbol]
dur = time() - start_time
self.p(f'cleaned {len(expired)} in {dur:.03f} seconds')
def flush(self):
self.cleanup()
it = 0
start_time = time()
for obj in self.dirty_objects:
it += 1
if obj.is_expired():
self.purge(obj)
else:
self.store(obj)
self.fil.flush()
self.dirty_objects = set()
dur = time() - start_time
# print(f'flush done {it} items {dur:.2f}')
def defrag(self):
    # Rewrite the data file compactly: keep the old file as a
    # '<name>.bak' backup, open a fresh file in its place, and
    # re-store every live object back-to-back (zero fragmentation).
    nm = self.fil.name
    self.fil.close()
    os.rename(nm, nm + '.bak')
    # NOTE(review): 'ab+' opens in append mode; on POSIX, writes then
    # always land at end-of-file regardless of seek position.  That is
    # fine for the sequential re-store below, but later in-place header
    # updates through this same handle would silently go to EOF —
    # 'rb+' on a pre-created file looks safer.  TODO confirm.
    self.fil = open(nm, 'ab+')
    for t in self.data:
        for o in self.all(t):
            # Forget the old chunk so store() appends a fresh one.
            o.file_offset = None
            self.store(o)