# KVS_test.py 27/05/2016 D.J.Whale # # Tester for Key Value Store import unittest from lifecycle import * from KVS import KVS # A dummy test class class TV(): def __init__(self, id): self.id = id def __repr__(self): return "TV(%s)" % self.id def get_config(self): return { "id": self.id } def remove_file(filename): import os os.unlink(filename) def show_file(filename): """Show the contents of a file on screen""" with open(filename) as f: for l in f.readlines(): l = l.strip() # remove nl print(l) class TestKVSMemory(unittest.TestCase): @test_0 def test_create_blank(self): """Create a blank kvs, not bound to any external file""" kvs = KVS() # it should not fall over @test_0 def test_add(self): """Add an object into the kvs store""" kvs = KVS() kvs["tv1"] = TV(1) kvs["tv2"] = TV(2) print(kvs.store) @test_0 def test_change(self): """Change the value associated with an existing key""" kvs = KVS() kvs["tv1"] = TV(1) kvs["tv1"] = TV(111) # change it print(kvs.store) @test_0 def test_get(self): """Get the object associated with a key in the store""" kvs = KVS() kvs["tv1"] = TV(1) t = kvs["tv1"] print(t) @test_0 def test_delete(self): """Delete an existing key in the store, and a missing key for error""" kvs = KVS() kvs["tv1"] = TV(1) del kvs["tv1"] print(kvs.store) try: del kvs["tv1"] # expect error self.fail("Did not get expected KeyError exception") except KeyError: pass # expected @test_0 def test_size(self): """How big is the kvs""" kvs = KVS() kvs["tv1"] = TV(1) print(len(kvs)) kvs["tv2"] = TV(2) print(len(kvs)) @test_0 def test_keys(self): """Get out all keys of the kvs""" kvs = KVS() kvs["tv1"] = TV(1) kvs["tv2"] = TV(2) kvs["tv3"] = TV(3) print(kvs.keys()) class TestKVSPersisted(unittest.TestCase): KVS_FILENAME = "test.kvs" @test_0 def test_write(self): """Write an in memory KVS to a file""" remove_file(self.KVS_FILENAME) kvs = KVS() kvs["tv1"] = TV(1) kvs.write(self.KVS_FILENAME) show_file(self.KVS_FILENAME) @test_0 def test_load_cache(self): """Load record from a kvs file into the kvs cache""" # create a file to test against remove_file(self.KVS_FILENAME) kvs = KVS() kvs["tv1"] = TV(1) kvs.write(self.KVS_FILENAME) kvs = KVS() # clear it out again # load the file kvs.load(self.KVS_FILENAME) # check the state of the kvs memory print(kvs.store) # check state of the kvs file at end show_file(self.KVS_FILENAME) @test_0 def test_add(self): """Add a new record to a persisted KVS""" remove_file(self.KVS_FILENAME) kvs = KVS(self.KVS_FILENAME) kvs["tv1"] = TV(1) print(kvs.store) show_file(self.KVS_FILENAME) #---- HERE ---- @test_1 def test_delete(self): """Delete an existing key from the persistent version""" remove_file(self.KVS_FILENAME) kvs = KVS(self.KVS_FILENAME) kvs["tv1"] = TV(1) show_file(self.KVS_FILENAME) del kvs["tv1"] ####FAIL remove() needs implementing @test_0 def test_change(self): """Change an existing record in a persisted KVS""" remove_file(self.KVS_FILENAME) kvs = KVS(self.KVS_FILENAME) kvs["tv1"] = TV(1) show_file(self.KVS_FILENAME) kvs["tv1"] = TV(2) ####FAIL, need to implement kvs.remove() first show_file(self.KVS_FILENAME) @test_0 def test_ADD(self): pass #TODO: do ADD records get added when parsing the file? @test_0 def test_IGN(self): pass #TODO: do IGN records get ignored when parsing the file? @test_0 def test_DEL(self): pass #TODO: do DEL records get processed when parsing the file? @test_0 def test_load_process(self): """Load and process a file with lots of records in it""" #including ADD, IGN, DEL #make sure callback processing is working too for object creation #as the callback will create the object that is stored in the cache pass #TODO if __name__ == "__main__": unittest.main() # END