302 lines
7.5 KiB
Python
302 lines
7.5 KiB
Python
from nullptr.models import *
|
|
from os.path import isfile, dirname, isdir
|
|
import os
|
|
from os.path import basename
|
|
import json
|
|
from .util import *
|
|
from time import time
|
|
import pickle
|
|
from struct import unpack, pack
|
|
from functools import partial
|
|
from io import BytesIO
|
|
|
|
class StorePickler(pickle.Pickler):
    """Pickler that swaps any Store reference for a persistent-id marker.

    The owning Store must not be serialized into every object; instead
    it is written as the token "STORE" and resolved again on load by
    StoreUnpickler.
    """

    def persistent_id(self, obj):
        # Only the Store itself is replaced; everything else pickles normally.
        if type(obj) == Store:
            return "STORE"
        return None
|
|
|
|
class StoreUnpickler(pickle.Unpickler):
    """Unpickler that resolves the "STORE" persistent id back to a Store.

    Counterpart of StorePickler: wherever an object was serialized with
    the "STORE" marker, the store instance supplied here is substituted
    during deserialization.
    """

    def __init__(self, stream, store):
        # Store to substitute for every "STORE" marker in the stream.
        self.store = store
        super().__init__(stream)

    def persistent_load(self, pers_id):
        """Resolve a persistent id written by StorePickler.

        Raises pickle.UnpicklingError for any id other than "STORE"
        (a corrupt or foreign pickle stream).
        """
        if pers_id == "STORE":
            return self.store
        # Fixed garbled message ("I don know the persid!").
        raise pickle.UnpicklingError(f"unknown persistent id: {pers_id!r}")
|
|
|
|
|
|
class ChunkHeader:
    """Fixed 16-byte on-disk record that prefixes every storage chunk.

    Layout (little-endian): a u64 whose top bit flags the chunk as
    in-use and whose low 63 bits hold the chunk capacity, followed by
    a u64 with the number of payload bytes actually used.
    """

    def __init__(self):
        # Defaults describe a fresh, empty, in-use chunk at offset 0.
        self.offset = 0
        self.in_use = True
        self.size = 0
        self.used = 0

    @classmethod
    def parse(cls, fil):
        """Read a header at the file's current position.

        Returns a ChunkHeader, or None on a short read (end of file).
        """
        start = fil.tell()
        raw = fil.read(16)
        if len(raw) < 16:
            return None
        hdr = cls()
        hdr.offset = start
        flagged_size, hdr.used = unpack('<QQ', raw)
        # Bit 63 of the first word carries the in-use flag; mask it off
        # to recover the real chunk capacity.
        hdr.size = flagged_size & 0x7fffffffffffffff
        hdr.in_use = bool(flagged_size & 0x8000000000000000)
        return hdr

    def write(self, f):
        """Serialize this header at the file's current position."""
        word = self.size | (1 << 63) if self.in_use else self.size
        f.write(pack('<QQ', word, self.used))

    def __repr__(self):
        return f'chunk {self.in_use} {self.size} {self.used}'
|
|
|
|
class Store:
    """Simple object database persisted in a single chunked binary file.

    Each object is pickled into a chunk prefixed by a ChunkHeader.
    Chunks are over-allocated with some slack so objects can grow in
    place; space freed by purging is only reclaimed by defrag().
    """

    def __init__(self, data_file, verbose=False):
        self.init_models()
        self.fil = open_file(data_file)
        # Primary index: model class -> {symbol: object}.
        self.data = {m: {} for m in self.models}
        # Secondary index: system symbol -> set of member objects.
        self.system_members = {}
        self.dirty_objects = set()
        self.cleanup_interval = 600  # seconds between expiry sweeps
        self.last_cleanup = 0
        # Chunk over-allocation: fraction of the payload size, clamped
        # to [slack_min, slack_max] bytes.
        self.slack = 0.1
        self.slack_min = 64
        self.slack_max = 1024
        self.verbose = verbose
        self.load()

    def p(self, m):
        """Print *m* only when verbose mode is enabled."""
        if not self.verbose:
            return
        print(m)

    def close(self):
        """Flush pending changes and close the backing file."""
        self.flush()
        self.fil.close()

    def init_models(self):
        """Discover all model classes and build lookup tables."""
        self.models = all_subclasses(Base)
        self.extensions = {c.ext(): c for c in self.models}
        self.model_names = {c.__name__: c for c in self.models}

    def dirty(self, obj):
        """Mark *obj* to be written out on the next flush()."""
        self.dirty_objects.add(obj)

    def dump_object(self, obj):
        """Pickle *obj* to bytes, replacing store refs with a marker."""
        buf = BytesIO()
        StorePickler(buf).dump(obj)
        return buf.getvalue()

    def load_object(self, data, offset):
        """Unpickle one object from *data* read at file *offset* and index it."""
        buf = BytesIO(data)
        obj = StoreUnpickler(buf, self).load()
        obj.file_offset = offset
        obj.disable_dirty = False
        self.hold(obj)
        return obj

    def load(self):
        """Scan the whole backing file and load every in-use chunk."""
        cnt = 0
        start_time = time()
        total = 0
        free = 0
        self.fil.seek(0)
        offset = 0
        while (hdr := ChunkHeader.parse(self.fil)):
            self.p(hdr)
            total += hdr.size
            if not hdr.in_use:
                # Freed chunk: skip the payload entirely.
                self.fil.seek(hdr.size, 1)
                free += hdr.size
            else:
                data = self.fil.read(hdr.used)
                self.load_object(data, offset)
                # Skip the unused slack at the end of the chunk.
                self.fil.seek(hdr.size - hdr.used, 1)
                cnt += 1
            offset = self.fil.tell()

        dur = time() - start_time
        self.p(f'Loaded {cnt} objects in {dur:.2f} seconds')
        self.p(f'Fragmented space: {free} / {total} bytes')

    def allocate_chunk(self, sz):
        """Append a new chunk sized for *sz* payload bytes plus slack.

        Returns (offset, header); the file position is left just past
        the freshly written header, ready for the payload write.
        """
        used = sz
        # Over-allocate so the object can grow without relocating.
        slack = sz * self.slack
        slack = min(slack, self.slack_max)
        slack = max(slack, self.slack_min)
        sz += int(slack)
        self.fil.seek(0, 2)
        offset = self.fil.tell()
        h = ChunkHeader()
        h.size = sz
        h.used = used
        h.offset = offset
        h.write(self.fil)
        return offset, h

    def purge(self, obj):
        """Mark the chunk holding *obj* as free (no-op if not stored)."""
        if obj.file_offset is None:
            return
        self.fil.seek(obj.file_offset)
        hdr = ChunkHeader.parse(self.fil)
        hdr.in_use = False
        self.fil.seek(obj.file_offset)
        hdr.write(self.fil)
        obj.file_offset = None

    def store(self, obj):
        """Write *obj* into its chunk, relocating if it no longer fits."""
        data = self.dump_object(obj)
        osize = len(data)
        # Is there an existing chunk for this object?
        if obj.file_offset is not None:
            self.fil.seek(obj.file_offset)
            hdr = ChunkHeader.parse(self.fil)
            if hdr.size < osize:
                # Too small: free the old chunk and force a new one.
                hdr.in_use = False
                obj.file_offset = None
            else:
                # Big enough: just refresh the used byte count.
                hdr.used = osize
            # BUGFIX: persist the header in BOTH branches. Previously
            # the freed header was never written back, so the abandoned
            # chunk stayed marked in-use on disk and reloaded forever.
            self.fil.seek(hdr.offset)
            hdr.write(self.fil)

        if obj.file_offset is None:
            obj.file_offset, hdr = self.allocate_chunk(osize)
        self.fil.write(data)
        # Zero-fill the remaining slack of the chunk.
        self.fil.write(b'\x00' * (hdr.size - hdr.used))

    def hold(self, obj):
        """Index *obj* in memory by (type, symbol) and by system."""
        typ = type(obj)
        obj.store = self
        self.data[typ][obj.symbol] = obj
        if hasattr(obj, 'system'):
            system_str = obj.get_system().symbol
            if system_str not in self.system_members:
                self.system_members[system_str] = set()
            self.system_members[system_str].add(obj)

    def create(self, typ, symbol):
        """Create, index and dirty a new object of *typ*."""
        obj = typ(symbol, self)
        self.hold(obj)
        self.dirty(obj)
        return obj

    def get(self, typ, symbol, create=False):
        """Look up an object by type (class or class name) and symbol.

        Returns None when missing, unless *create* is true, in which
        case a fresh object is created and returned.
        """
        if isinstance(typ, str) and typ in self.model_names:
            typ = self.model_names[typ]
        symbol = symbol.upper()
        if typ not in self.data:
            return None
        if symbol not in self.data[typ]:
            return self.create(typ, symbol) if create else None
        return self.data[typ][symbol]

    def getter(self, typ, create=False):
        """Return get() partially applied to *typ*."""
        if isinstance(typ, str) and typ in self.model_names:
            typ = self.model_names[typ]
        return partial(self.get, typ, create=create)

    def update(self, typ, data, symbol=None):
        """Get-or-create an object and update it from the dict *data*."""
        if isinstance(typ, str) and typ in self.model_names:
            typ = self.model_names[typ]
        if symbol is None:
            symbol = mg(data, typ.identifier)
        obj = self.get(typ, symbol, True)
        obj.update(data)
        return obj

    def update_list(self, typ, lst):
        """update() every dict in *lst*; returns the resulting objects."""
        return [self.update(typ, d) for d in lst]

    def all(self, typ):
        """Yield all live objects of *typ*, dirtying expired ones."""
        if isinstance(typ, str) and typ in self.model_names:
            typ = self.model_names[typ]
        for m in self.data[typ].values():
            if m.is_expired():
                # Dirty it so flush() purges the stale chunk.
                self.dirty(m)
                continue
            yield m

    def all_members(self, system, typ=None):
        """Yield live members of *system*, optionally filtered by exact type."""
        if isinstance(typ, str) and typ in self.model_names:
            typ = self.model_names[typ]

        if isinstance(system, System):
            system = system.symbol

        if system not in self.system_members:
            return

        garbage = set()
        for m in self.system_members[system]:
            if m.is_expired():
                self.dirty(m)
                garbage.add(m)
                continue
            if typ is None or type(m) == typ:
                yield m

        # Drop expired members from the secondary index.
        for m in garbage:
            self.system_members[system].remove(m)

    def cleanup(self):
        """Purge expired objects, at most once per cleanup_interval."""
        if time() < self.last_cleanup + self.cleanup_interval:
            return
        self.last_cleanup = time()
        start_time = time()
        expired = []
        # BUGFIX: iterate the raw index. The old code looped over
        # self.all(t), which already filters expired objects out, so
        # the expired list was always empty and nothing was collected.
        for t in self.data:
            for o in self.data[t].values():
                if o.is_expired():
                    expired.append(o)
        for o in expired:
            # BUGFIX: was self.purge(obj) -- NameError on an undefined name.
            self.purge(o)
            del self.data[type(o)][o.symbol]
        dur = time() - start_time
        self.p(f'cleaned {len(expired)} in {dur:.03f} seconds')

    def flush(self):
        """Write every dirty object (or purge it if it has expired)."""
        self.cleanup()
        it = 0
        start_time = time()
        for obj in self.dirty_objects:
            it += 1
            if obj.is_expired():
                self.purge(obj)
            else:
                self.store(obj)
        self.fil.flush()
        self.dirty_objects = set()
        dur = time() - start_time
        self.p(f'flush done {it} items {dur:.2f}')

    def defrag(self):
        """Rewrite all live objects into a fresh file, dropping freed space."""
        nm = self.fil.name
        self.fil.close()
        os.rename(nm, nm + '.bak')
        # BUGFIX: open 'w+b', not 'ab+'. In append mode every later
        # in-place header rewrite (purge/store) would be forced to the
        # end of the file, corrupting the store.
        self.fil = open(nm, 'w+b')
        for t in self.data:
            for o in self.data[t].values():
                # BUGFIX: old offsets refer to the renamed file;
                # invalidate them so nothing patches stale positions
                # in the new file.
                o.file_offset = None
                if o.is_expired():
                    self.dirty(o)
                    continue
                self.store(o)
|