diff --git a/src/energenie/Devices_test.py b/src/energenie/Devices_test.py index 5053eeb..70d5552 100644 --- a/src/energenie/Devices_test.py +++ b/src/energenie/Devices_test.py @@ -2,15 +2,24 @@ # # Test harness for Devices module -import Devices import unittest from lifecycle import * -import OpenThings +try: + # Python 2 + import Devices + import OpenThings + import radio + +except ImportError: + # Python 3 + from . import Devices + from . import OpenThings + from . import radio class TestDevices(unittest.TestCase): - @test_0 + @test_1 def test_without_registry(self): """A simple on/off test with some devices from the device factory""" tv = Devices.DeviceFactory.get_device_from_name("GreenButton", device_id=(0xC8C8C, 1)) @@ -41,7 +50,6 @@ def init(): """Start the Energenie system running""" - import OpenThings, radio radio.DEBUG = True radio.init() OpenThings.init(Devices.CRYPT_PID) diff --git a/src/energenie/KVS_test.py b/src/energenie/KVS_test.py index d6261cd..42fa5bd 100644 --- a/src/energenie/KVS_test.py +++ b/src/energenie/KVS_test.py @@ -6,7 +6,7 @@ from lifecycle import * from KVS import KVS, NotPersistableError -# A dummy test class +#---- DUMMY TEST CLASSES ------------------------------------------------------ class TV(): def __init__(self, id): @@ -29,6 +29,7 @@ raise ValueError("Unknown device name %s" % name) +#----- FILE HELPERS ----------------------------------------------------------- def remove_file(filename): import os @@ -51,6 +52,10 @@ f.write(line + '\n') +#----- TEST KVS MEMORY -------------------------------------------------------- +# +# Test the KVS in-memory only configuration (no persistence to file) + class TestKVSMemory(unittest.TestCase): @test_1 @@ -119,6 +124,10 @@ print(kvs.keys()) +#----- TEST KVS PERSISTED ----------------------------------------------------- +# +# Test the KVS persisted to a file + class TestKVSPersisted(unittest.TestCase): KVS_FILENAME = "test.kvs" diff --git a/src/energenie/OpenThings.py b/src/energenie/OpenThings.py index 04d0070..a1cf501 100644 --- a/src/energenie/OpenThings.py +++ b/src/energenie/OpenThings.py @@ -498,7 +498,7 @@ bits = (((bits-1)/8)+1)*8 # snap to nearest byte boundary ##trace("snap bits to 8:" + str(bits)) - value &= ((2**bits)-1) + value &= ((2**int(bits))-1) neg = True else: neg = False diff --git a/src/energenie/OpenThings_test.py b/src/energenie/OpenThings_test.py index 7bfba56..986dd46 100644 --- a/src/energenie/OpenThings_test.py +++ b/src/energenie/OpenThings_test.py @@ -2,14 +2,13 @@ # # Test harness for OpenThings protocol encoder and decoder -#TODO: Turn this into unittest.TestCase - - -from OpenThings import * import pprint import unittest from lifecycle import * +from OpenThings import * +import Devices + def printhex(payload): line = "" @@ -37,105 +36,103 @@ ] +class TestPayloads(unittest.TestCase): + @test_1 + def test_payload_unencrypted(self): + init(242) -def test_payload_unencrypted(): - init(242) + printhex(TEST_PAYLOAD) + spec = decode(TEST_PAYLOAD, decrypt=False) + pprint.pprint(spec) - printhex(TEST_PAYLOAD) - spec = decode(TEST_PAYLOAD, decrypt=False) - pprint.pprint(spec) + payload = encode(spec, encrypt=False) + printhex(payload) - payload = encode(spec, encrypt=False) - printhex(payload) + spec2 = decode(payload, decrypt=False) + pprint.pprint(spec2) - spec2 = decode(payload, decrypt=False) - pprint.pprint(spec2) + payload2 = encode(spec2, encrypt=False) - payload2 = encode(spec2, encrypt=False) + printhex(TEST_PAYLOAD) + printhex(payload2) - printhex(TEST_PAYLOAD) - printhex(payload2) + if TEST_PAYLOAD != payload: + print("FAILED") + else: + print("PASSED") - if TEST_PAYLOAD != payload: - print("FAILED") - else: - print("PASSED") + @test_1 + def test_payload_encrypted(self): + init(242) + printhex(TEST_PAYLOAD) + spec = decode(TEST_PAYLOAD, decrypt=False) + pprint.pprint(spec) -def test_payload_encrypted(): - init(242) + payload = encode(spec, encrypt=True) + printhex(payload) - printhex(TEST_PAYLOAD) - spec = decode(TEST_PAYLOAD, decrypt=False) - pprint.pprint(spec) + spec2 = decode(payload, decrypt=True) + pprint.pprint(spec2) - payload = encode(spec, encrypt=True) - printhex(payload) + payload2 = encode(spec2, encrypt=False) - spec2 = decode(payload, decrypt=True) - pprint.pprint(spec2) + printhex(TEST_PAYLOAD) + printhex(payload2) - payload2 = encode(spec2, encrypt=False) + if TEST_PAYLOAD != payload: + print("FAILED") + else: + print("PASSED") - printhex(TEST_PAYLOAD) - printhex(payload2) + @test_1 + def test_value_encoder(self): + pass + # test cases (auto, forced, overflow, -min, -min-1, 0, 1, +max, +max+1 + # UINT + # UINT_BP4 + # UINT_BP8 + # UINT_BP12 + # UINT_BP16 + # UINT_BP20 + # UINT_BP24 + # SINT + # SINT(2) + vin = [1,255,256,32767,32768,0,-1,-2,-3,-127,-128,-129,-32767,-32768] + for v in vin: + vout = Value.encode(v, Value.SINT) + print("encode " + str(v) + " " + str(vout)) + # SINT_BP8 + # SINT_BP16 + # SINT_BP24 + # CHAR + # FLOAT - if TEST_PAYLOAD != payload: - print("FAILED") - else: - print("PASSED") + @test_1 + def test_value_decoder(self): + pass + # test cases (auto, forced, overflow, -min, -min-1, 0, 1, +max, +max+1 + # UINT + # UINT_BP4 + # UINT_BP8 + # UINT_BP12 + # UINT_BP16 + # UINT_BP20 + # UINT_BP24 + # SINT + vin = [255, 253] + print("input value:" + str(vin)) + vout = Value.decode(vin, Value.SINT, 2) + print("encoded as:" + str(vout)) - -def test_value_encoder(): - pass - # test cases (auto, forced, overflow, -min, -min-1, 0, 1, +max, +max+1 - # UINT - # UINT_BP4 - # UINT_BP8 - # UINT_BP12 - # UINT_BP16 - # UINT_BP20 - # UINT_BP24 - # SINT - # SINT(2) - vin = [1,255,256,32767,32768,0,-1,-2,-3,-127,-128,-129,-32767,-32768] - for v in vin: - vout = Value.encode(v, Value.SINT) - print("encode " + str(v) + " " + str(vout)) - # SINT_BP8 - # SINT_BP16 - # SINT_BP24 - # CHAR - # FLOAT - - -def test_value_decoder(): - pass - # test cases (auto, forced, overflow, -min, -min-1, 0, 1, +max, +max+1 - # UINT - # UINT_BP4 - # UINT_BP8 - # UINT_BP12 - # UINT_BP16 - # UINT_BP20 - # UINT_BP24 - # SINT - vin = [255, 253] - print("input value:" + str(vin)) - vout = Value.decode(vin, Value.SINT, 2) - print("encoded as:" + str(vout)) - - # SINT_BP8 - # SINT_BP16 - # SINT_BP24 - # CHAR - # FLOAT + # SINT_BP8 + # SINT_BP16 + # SINT_BP24 + # CHAR + # FLOAT #----- UNIT TEST FOR MESSAGE -------------------------------------------------- - -import Devices - # ACCESSOR VARIANTS # as method parameters: # 1 {pydict} @@ -155,18 +152,16 @@ #TODO: Break dependence on Devices.MIHO005_REPORT (makes tests brittle) - - class TestMessage(unittest.TestCase): - @test_0 + @test_1 def test_blank(self): """CREATE a completely blank message""" msg = Message() print(msg) msg.dump() - @test_0 + @test_1 def test_blank_template(self): """CREATE a message from a simple pydict template""" # This is useful to simplify all other tests @@ -174,21 +169,21 @@ print(msg) msg.dump() - @test_0 + @test_1 def test_blank_create_dict(self): #1 {pydict} """CREATE a blank message with a dict parameter""" msg = Message({"header":{}, "recs":[{"wr":False, "parmid":PARAM_SWITCH_STATE, "value":1}]}) print(msg) msg.dump() - @test_0 + @test_1 def test_blank_create_header_dict(self): #2 header={pydict} """CREATE a blank message and add a header at creation time from a dict""" msg = Message(header={"mfrid":123, "productid":456, "sensorid":789}) print(msg) msg.dump() - @test_0 + @test_1 def test_create_big_template(self): #1 {pydict} """CREATE from a large template message""" # create a message from a template @@ -196,7 +191,7 @@ print(msg) msg.dump() - @test_0 + @test_1 def test_add_rec_dict(self): #1 {pydict} """UPDATE(APPEND) rec fields from a dict parameter""" msg = Message(Message.BLANK) @@ -205,7 +200,7 @@ print(msg) msg.dump() - @test_0 + @test_1 def test_add_header_dict(self): #2 header={pydict} """UPDATE(SET) a new header to an existing message""" msg = Message() @@ -213,7 +208,7 @@ print(msg) msg.dump() - @test_0 + @test_1 def test_add_recs_dict(self): """UPDATE(SET) recs to an existing message""" msg = Message() @@ -221,7 +216,7 @@ print(msg) msg.dump() - @test_0 + @test_1 def test_add_path(self): """UPDATE(SET) a pathed key to an existing message""" msg = Message() @@ -229,7 +224,7 @@ print(msg) msg.dump() - @test_0 + @test_1 def test_alter_template(self): #3 header_mfrid=123 """UPDATE(SET) an existing key with a path""" msg = Message(Devices.MIHO005_REPORT) @@ -237,7 +232,7 @@ print(msg) msg.dump() - @test_0 + @test_1 def test_alter_template_multiple(self): """UPDATE(SET) multiple keys with paths""" msg = Message(Devices.MIHO005_REPORT) @@ -245,14 +240,14 @@ print(msg) msg.dump() - @test_0 + @test_1 def test_blank_create_header_paths(self): #3 header_mfrid=123 (CREATE) """CREATE message with pathed keys in constructor""" msg = Message(header_mfrid=123, header_productid=456, header_sensorid=789) print(msg) msg.dump() - @test_0 + @test_1 def test_blank_create_recs_paths(self): """CREATE message with pathed keys in constructor""" # uses integer path component to mean array index @@ -261,7 +256,7 @@ print(msg) msg.dump() - @test_0 + @test_1 def test_add_rec_path(self): #5 recs_0_paramid=PARAM_SWITCH_STATE """UPDATE(SET) records in a message""" msg = Message(recs_0={}) # must create rec before you can change it @@ -270,7 +265,7 @@ print(msg) msg.dump() - @test_0 + @test_1 def test_add_rec_fn_pydict(self): #6 SWITCH_STATE={pydict} """UPDATE(ADD) a rec to message using PARAM constants as keys""" #always creates a new rec at the end and then populates @@ -279,7 +274,7 @@ print(msg) msg.dump() - @test_0 + @test_1 def test_add_rec_fn_keyed(self): #7 SWITCH_STATE,value=1 (ADD) """UPDATE(ADD) a rec to message using PARAM const and keyed values""" msg = Message() @@ -287,69 +282,69 @@ print(msg) msg.dump() - @test_0 + @test_1 def test_get_pathed(self): """READ from the message with a path key""" msg = Message(Devices.MIHO005_REPORT) print(msg.get("header_mfrid")) - @test_0 + @test_1 def test_attr_header(self): #9 msg["header"] msg["recs"][0] """READ(attr) the header""" # access a specific keyed entry like a normal pydict, for read msg = Message(Devices.MIHO005_REPORT) print(msg["header"]) - @test_0 + @test_1 def test_attr_header_field(self): #9 msg["header"] msg["recs"][0] """READ(attr) a field within the header""" # access a specific keyed entry like a normal pydict, for read msg = Message(Devices.MIHO005_REPORT) print(msg["header"]["mfrid"]) - @test_0 + @test_1 def test_attr_recs(self): #9 msg["header"] msg["recs"][0] """READ(attr) all recs""" # access a specific keyed entry like a normal pydict, for read msg = Message(Devices.MIHO005_REPORT) print(msg["recs"]) - @test_0 + @test_1 def test_attr_rec(self): #9 msg["header"] msg["recs"][0] """READ(attr) a single reg""" # access a specific keyed entry like a normal pydict, for read msg = Message(Devices.MIHO005_REPORT) print(msg["recs"][0]) - @test_0 + @test_1 def test_attr_rec_field(self): #9 msg["header"] msg["recs"][0] """READ(attr) a field in a rec""" # access a specific keyed entry like a normal pydict, for read msg = Message(Devices.MIHO005_REPORT) print(msg["recs"][0]["value"]) - @test_0 + @test_1 def test_setattr_header(self): """UPDATE(SET) the header from a setattr key index""" msg = Message(Devices.MIHO005_REPORT) msg["header"] = {"mfrid":111, "productid":222, "sensorid":333} print(msg) - @test_0 + @test_1 def test_setattr_header_field_overwrite(self): """UPDATE(SET) overwrite an existing header field""" msg = Message(Devices.MIHO005_REPORT) msg["header"]["productid"] = 999 print(msg) - @test_0 + @test_1 def test_setattr_header_field_add(self): """UPDATE(SET) add a new header field""" msg = Message(Devices.MIHO005_REPORT) msg["header"]["timestamp"] = 1234 print(msg) - @test_0 + @test_1 def test_setattr_recs(self): """UPDATE(SET) overwrite all recs""" msg = Message(Devices.MIHO005_REPORT) @@ -357,42 +352,42 @@ {"paramid":PARAM_AIR_PRESSURE, "wr":True, "value":33}] print(msg) - @test_0 + @test_1 def test_setattr_rec_overwrite(self): """UPDATE(SET) overwrite a single rec""" msg = Message(Devices.MIHO005_REPORT) msg["recs"][0] = {"paramid":9999, "wr":False, "value":9999} print(msg) - @test_0 + @test_1 def test_setattr_rec_append(self): """UPDATE(SET) add a new rec by appending""" msg = Message(Devices.MIHO005_REPORT) msg["recs"].append({"paramid":9999, "wr":True, "value":9999}) print(msg) - @test_0 - def Xtest_setattr_rec_field_overwrite(self): + @test_1 + def test_setattr_rec_field_overwrite(self): """UPDATE(SET) overwrite an existing rec field""" msg = Message(Devices.MIHO005_REPORT) msg["recs"][0]["value"] = 9999 print(msg) - @test_0 + @test_1 def test_setattr_rec_field_append(self): """UPDATE(SET) append a new field""" msg = Message(Devices.MIHO005_REPORT) msg["recs"][0]["colour"] = "**RED**" print(msg) - @test_0 + @test_1 def test_paramid_read_rec(self): #10 msg[PARAM_SWITCH_STATE] """READ a rec field keyed by the PARAMID""" #This triggers a sequential search through the recs for the first matching paramid msg = Message(Devices.MIHO005_REPORT) print(msg[PARAM_SWITCH_STATE]) - @test_0 + @test_1 def test_paramid_read_missing_rec(self): #10 msg[PARAM_SWITCH_STATE] """READ a rec field keyed by the PARAMID""" #This triggers a sequential search through the recs for the first matching paramid @@ -403,13 +398,13 @@ except Exception as e: pass #expected - @test_0 + @test_1 def test_paramid_read_field(self): """READ a field within a parameter rec""" msg = Message(Devices.MIHO005_REPORT) print(msg[PARAM_SWITCH_STATE]["value"]) - @test_0 + @test_1 def test_paramid_write_rec_overwrite(self): """WRITE (overwrite) a whole paramid rec""" msg = Message(recs_0={"paramid":PARAM_SWITCH_STATE, "wr":True, "value":33}) @@ -418,7 +413,7 @@ ##print(msg) print(msg[PARAM_SWITCH_STATE]) - @test_0 + @test_1 def test_paramid_write_rec_add(self): """WRITE (add) a whole paramid rec""" msg = Message(Devices.MIHO005_REPORT) @@ -426,33 +421,33 @@ ##print(msg) print(msg[PARAM_AIR_PRESSURE]) - @test_0 + @test_1 def test_paramid_write_field_change(self): """WRITE (change) a field in a paramid rec""" msg = Message(recs_0={"paramid":PARAM_SWITCH_STATE, "wr":True, "value":33}) msg[PARAM_SWITCH_STATE]["value"] = 123 print(msg) - @test_0 + @test_1 def test_paramid_write_field_add(self): """WRITE(add) a field to a paramid rec""" msg = Message(Devices.MIHO005_REPORT) msg[PARAM_SWITCH_STATE]["colour"] = "***RED***" print(msg) - @test_0 + @test_1 def test_repr(self): ## dump a message in printable format msg = Message(Devices.MIHO005_REPORT) print(msg) - @test_0 + @test_1 def test_str(self): ## dump a message in printable format msg = Message(Devices.MIHO005_REPORT) print(str(msg)) - @test_0 + @test_1 def test_alter_PARAM_NAME_rec(self): #8 SWITCH_STATE_value=1 """UPDATE(alter) a complete rec from a PARAM_NAME index""" msg = Message(Message.BLANK) @@ -460,7 +455,7 @@ msg.set(recs_SWITCH_STATE={"wr":False, "value":99}) print(msg) - @test_0 + @test_1 def test_alter_PARAM_NAME_field(self): #8 SWITCH_STATE_value=1 """UPDATE(alter) a rec field from a PARAM_NAME index""" msg = Message(Message.BLANK) @@ -469,17 +464,7 @@ print(msg) -def test_message(): - import unittest - unittest.main() - if __name__ == "__main__": - #TODO: Change these into unittest test cases - ##test_value_encoder() - ##test_value_decoder() - ##test_payload_unencrypted() - ##test_payload_encrypted() - - test_message() + unittest.main() # END \ No newline at end of file diff --git a/src/energenie/TwoBit_test.py b/src/energenie/TwoBit_test.py index d01a163..9226864 100644 --- a/src/energenie/TwoBit_test.py +++ b/src/energenie/TwoBit_test.py @@ -2,6 +2,7 @@ # # Test the OOK message encoder +import unittest import TwoBit def ashex(data): @@ -16,25 +17,30 @@ #----- TEST APPLICATION ------------------------------------------------------- -print("*" * 80) +class TestTwoBit(unittest.TestCase): + ALL_ON = TwoBit.encode_switch_message(True) + ALL_OFF = TwoBit.encode_switch_message(False) + ONE_ON = TwoBit.encode_switch_message(True, device_address=1) + ONE_OFF = TwoBit.encode_switch_message(False, device_address=1) + TWO_ON = TwoBit.encode_switch_message(True, device_address=2) + TWO_OFF = TwoBit.encode_switch_message(False, device_address=2) + THREE_ON = TwoBit.encode_switch_message(True, device_address=3) + THREE_OFF = TwoBit.encode_switch_message(False, device_address=3) + FOUR_ON = TwoBit.encode_switch_message(True, device_address=4) + FOUR_OFF = TwoBit.encode_switch_message(False, device_address=4) + MYHOUSE_ALL_ON = TwoBit.encode_switch_message(True, house_address=0x12345) -ALL_ON = TwoBit.encode_switch_message(True) -ALL_OFF = TwoBit.encode_switch_message(False) -ONE_ON = TwoBit.encode_switch_message(True, device_address=1) -ONE_OFF = TwoBit.encode_switch_message(False, device_address=1) -TWO_ON = TwoBit.encode_switch_message(True, device_address=2) -TWO_OFF = TwoBit.encode_switch_message(False, device_address=2) -THREE_ON = TwoBit.encode_switch_message(True, device_address=3) -THREE_OFF = TwoBit.encode_switch_message(False, device_address=3) -FOUR_ON = TwoBit.encode_switch_message(True, device_address=4) -FOUR_OFF = TwoBit.encode_switch_message(False, device_address=4) -MYHOUSE_ALL_ON = TwoBit.encode_switch_message(True, house_address=0x12345) + tests = [ALL_ON, ALL_OFF, ONE_ON, ONE_OFF, TWO_ON, TWO_OFF, THREE_ON, THREE_OFF, FOUR_ON, FOUR_OFF, MYHOUSE_ALL_ON] -tests = [ALL_ON, ALL_OFF, ONE_ON, ONE_OFF, TWO_ON, TWO_OFF, THREE_ON, THREE_OFF, FOUR_ON, FOUR_OFF, MYHOUSE_ALL_ON] + def test_all(self): + for t in self.tests: + print('') + print(ashex(t)) -for t in tests: - print('') - print(ashex(t)) + +if __name__ == "__main__": + unittest.main() + # END