237dcc8c14
Commander cleanup First impl of ship logs Ship display improved Store debugged
170 lines
4.4 KiB
Python
170 lines
4.4 KiB
Python
import unittest
|
|
import tempfile
|
|
from nullptr.store import Store, ChunkHeader
|
|
from nullptr.models import Base
|
|
from io import BytesIO
|
|
import os
|
|
from nullptr.store_analyzer import StoreAnalyzer
|
|
|
|
class Dummy(Base):
|
|
def define(self):
|
|
self.count: int = 0
|
|
self.data: str = ""
|
|
|
|
def update(self, d):
|
|
self.seta('count', d)
|
|
|
|
@classmethod
|
|
def ext(self):
|
|
return 'dum'
|
|
|
|
def f(self, detail=1):
|
|
r = super().f(detail) + '.' + self.ext()
|
|
if detail >2:
|
|
r += f' c:{self.count}'
|
|
return r
|
|
|
|
class TestStore(unittest.TestCase):
|
|
def setUp(self):
|
|
self.store_file = tempfile.NamedTemporaryFile()
|
|
self.s = Store(self.store_file.name, False)
|
|
|
|
def tearDown(self):
|
|
self.s.close()
|
|
self.store_file.close()
|
|
|
|
def reopen(self):
|
|
self.s.flush()
|
|
self.s.close()
|
|
self.s = Store(self.store_file.name, False)
|
|
|
|
def test_single(self):
|
|
dum = self.s.get(Dummy, "5", create=True)
|
|
dum.count = 1337
|
|
dum.data = "A" * 1000
|
|
self.reopen()
|
|
|
|
dum = self.s.get(Dummy, "5")
|
|
self.assertEqual(1337, dum.count)
|
|
|
|
def test_grow(self):
|
|
dum = self.s.get(Dummy, "5", create=True)
|
|
dum.data = "A"
|
|
dum2 = self.s.get(Dummy, "7",create=True)
|
|
self.reopen()
|
|
dum = self.s.get(Dummy, "5")
|
|
old_off = dum._file_offset
|
|
self.assertTrue(old_off is not None)
|
|
dum.data = "A" * 1000
|
|
dum.count = 1337
|
|
self.s.flush()
|
|
new_off = dum._file_offset
|
|
self.assertTrue(new_off is not None)
|
|
self.assertNotEqual(old_off, new_off)
|
|
self.reopen()
|
|
dum = self.s.get(Dummy, "5")
|
|
newer_off = dum._file_offset
|
|
self.assertTrue(newer_off is not None)
|
|
self.assertEqual(new_off, newer_off)
|
|
self.assertEqual(1337, dum.count)
|
|
|
|
def test_purge(self):
|
|
dum = self.s.get(Dummy, "5", create=True)
|
|
dum.data = "A"
|
|
dum2 = self.s.get(Dummy, "7",create=True)
|
|
dum2.count = 1337
|
|
self.s.flush()
|
|
self.s.purge(dum)
|
|
self.reopen()
|
|
dum = self.s.get(Dummy, "5")
|
|
self.assertIsNone(dum)
|
|
dum2 = self.s.get(Dummy, "7")
|
|
self.assertEqual(1337, dum2.count)
|
|
|
|
def test_grow_last(self):
|
|
dum = self.s.get(Dummy, "5", create=True)
|
|
dum.data = "A"
|
|
dum2 = self.s.get(Dummy, "7",create=True)
|
|
self.reopen()
|
|
dum2 = self.s.get(Dummy, "7")
|
|
dum2.data = "A" * 1000
|
|
dum2.count = 1337
|
|
dum3 = self.s.get(Dummy, "9",create=True)
|
|
dum3.count = 1338
|
|
self.reopen()
|
|
dum2 = self.s.get(Dummy, "7")
|
|
self.assertEqual(1337, dum2.count)
|
|
dum3 = self.s.get(Dummy, "9")
|
|
self.assertEqual(1338, dum3.count)
|
|
|
|
def test_purge_last(self):
|
|
dum = self.s.get(Dummy, "5", create=True)
|
|
dum.data = "A"
|
|
dum2 = self.s.get(Dummy, "7",create=True)
|
|
self.reopen()
|
|
dum2 = self.s.get(Dummy, "7")
|
|
self.s.purge(dum2)
|
|
dum3 = self.s.get(Dummy, "9",create=True)
|
|
dum3.count = 1338
|
|
self.reopen()
|
|
dum2 = self.s.get(Dummy, "7")
|
|
self.assertIsNone(dum2)
|
|
dum3 = self.s.get(Dummy, "9")
|
|
self.assertEqual(1338, dum3.count)
|
|
|
|
def test_dont_relocate(self):
|
|
dum = self.s.get(Dummy, "5", create=True)
|
|
dum.data = "A"
|
|
self.s.flush()
|
|
old_off = dum._file_offset
|
|
self.reopen()
|
|
dum2 = self.s.get(Dummy, "5")
|
|
dum2.data = "BCDE"
|
|
self.s.flush()
|
|
new_off = dum._file_offset
|
|
self.assertEqual(old_off, new_off)
|
|
|
|
def test_chunk_header(self):
|
|
a = ChunkHeader()
|
|
a.size = 123
|
|
a.used = 122
|
|
a.in_use = True
|
|
b = BytesIO()
|
|
a.write(b)
|
|
b.seek(0)
|
|
c = ChunkHeader.parse(b)
|
|
self.assertEqual(c.size, a.size)
|
|
self.assertEqual(c.used, a.used)
|
|
self.assertEqual(c.in_use, True)
|
|
c.in_use = False
|
|
b.seek(0)
|
|
c.write(b)
|
|
b.seek(0)
|
|
d = ChunkHeader.parse(b)
|
|
self.assertEqual(d.size, a.size)
|
|
self.assertEqual(d.used, a.used)
|
|
self.assertEqual(d.in_use, False)
|
|
|
|
def test_mass(self):
|
|
num = 50
|
|
for i in range(num):
|
|
dum = self.s.get(Dummy, str(i), create=True)
|
|
dum.data = str(i)
|
|
dum.count = 0
|
|
self.reopen()
|
|
sz = os.stat(self.store_file.name).st_size
|
|
for j in range(50):
|
|
for i in range(num):
|
|
dum = self.s.get(Dummy, str(i))
|
|
# this works because j is max 49, and the slack is 64
|
|
# so no growing is needed
|
|
self.assertEqual(dum.data, "B" * j + str(i))
|
|
self.assertEqual(dum.count, j)
|
|
dum.data = "B" * (j+1) + str(i)
|
|
dum.count += 1
|
|
self.reopen()
|
|
sz2 = os.stat(self.store_file.name).st_size
|
|
self.assertEqual(sz, sz2)
|
|
an = StoreAnalyzer().run(self.store_file)
|
|
self.assertTrue(an)
|
|
|