diff --git a/src/energenie/Devices.py b/src/energenie/Devices.py index 9999641..fb10d24 100644 --- a/src/energenie/Devices.py +++ b/src/energenie/Devices.py @@ -597,59 +597,5 @@ air_interface = DeviceFactory.default_air_interface return c(air_interface, device_id) - -#----- TEMPORARY TEST HARNESS ------------------------------------------------- - -#hmm: need two addresses for legacy - use a tuple (house_address, index) - -#unless we have an adaptor class for air_interface which represents the -#collective house address for a house code. So if you use more than one -#house address, you create multiple air interface adaptors with different -#house codes, that just delegate to the same actual radio air interface? -#bit like a little local router? - -#legacy1 = AirInterface.create("OOK", address=0xC8C8C, energenie_radio) - -# Could also consider this a local network, with common parameters shared -# by all devices that use it. -#air2 = AirInterface.create("FSK", energenie_radio) - -# scheduling would then become -# scheduler = Scheduler(energenie_radio) -# legacy1 = AirInterface.create("OOK", address=0xC8C8C, scheduler) -# air2 = AirInterface.create("FSK", scheduler -# so that when a device tries to transmit, it gets air interface specific -# settings added to it as appropriate, then the scheduler decides when -# to send and receive - -# Somehow we need to associate devices with an air interface -# This might allow us to support multiple radios in the future too? -#legacy1.add(tv) - -# cooperative loop could be energenie_radio.loop() -# or wrap a thread around it with start() but beware of thread context -# and thread safety. - -import time - -def test_without_registry(): - - tv = DeviceFactory.get_device("GreenButton", device_id=(0xC8C8C, 1)) - fan = DeviceFactory.get_device("AdaptorPlus", device_id=0x68b) - - while True: - print("ON") - tv.turn_on() - fan.turn_off() - time.sleep(2) - - print("OFF") - tv.turn_off() - fan.turn_on() - time.sleep(1) - - -if __name__ == "__main__": - test_without_registry() - # END + diff --git a/src/energenie/Devices_test.py b/src/energenie/Devices_test.py new file mode 100644 index 0000000..a373800 --- /dev/null +++ b/src/energenie/Devices_test.py @@ -0,0 +1,65 @@ +# Devices_test.py 21/05/2016 D.J.Whale +# +# Test harness for Devices module + +#TODO: Turn into unittest.TestCase + + +import time +from Devices import * + +#hmm: need two addresses for legacy - use a tuple (house_address, index) + +#unless we have an adaptor class for air_interface which represents the +#collective house address for a house code. So if you use more than one +#house address, you create multiple air interface adaptors with different +#house codes, that just delegate to the same actual radio air interface? +#bit like a little local router? + +#legacy1 = AirInterface.create("OOK", address=0xC8C8C, energenie_radio) + +# Could also consider this a local network, with common parameters shared +# by all devices that use it. +#air2 = AirInterface.create("FSK", energenie_radio) + +# scheduling would then become +# scheduler = Scheduler(energenie_radio) +# legacy1 = AirInterface.create("OOK", address=0xC8C8C, scheduler) +# air2 = AirInterface.create("FSK", scheduler +# so that when a device tries to transmit, it gets air interface specific +# settings added to it as appropriate, then the scheduler decides when +# to send and receive + +# Somehow we need to associate devices with an air interface +# This might allow us to support multiple radios in the future too? +#legacy1.add(tv) + +# cooperative loop could be energenie_radio.loop() +# or wrap a thread around it with start() but beware of thread context +# and thread safety. + + +def test_without_registry(): + + ##TODO: Problem, some devices have air_interface, some have adaptor, some have nothing, + #so the DeviceFactory constructor (c) doesn't work correctly with all devices + tv = DeviceFactory.get_device("GreenButton", device_id=(0xC8C8C, 1)) + fan = DeviceFactory.get_device("AdaptorPlus", device_id=0x68b) + + while True: + print("ON") + tv.turn_on() + fan.turn_off() + time.sleep(2) + + print("OFF") + tv.turn_off() + fan.turn_on() + time.sleep(1) + + +if __name__ == "__main__": + test_without_registry() + +# END + diff --git a/src/energenie/OpenThings.py b/src/energenie/OpenThings.py index eecf089..dcd447c 100644 --- a/src/energenie/OpenThings.py +++ b/src/energenie/OpenThings.py @@ -640,134 +640,5 @@ return message[path[-1]] -#----- TEST HARNESS ----------------------------------------------------------- - -def printhex(payload): - line = "" - for b in payload: - line += hex(b) + " " - - print(line) - - -TEST_PAYLOAD = [ - 0x1C, #len 16 + 10 + 2 = 0001 1100 - 0x04, #mfrid - 0x02, #prodid - 0x01, #pipmsb - 0x00, #piplsb - 0x00, 0x06, 0x8B, #sensorid - 0x70, 0x82, 0x00, 0x07, #SINT(2) power - 0x71, 0x82, 0xFF, 0xFD, #SINT(2) reactive_power - 0x76, 0x01, 0xF0, #UINT(1) voltage - 0x66, 0x22, 0x31, 0xDA, #UINT_BP8(2) freq - 0x73, 0x01, 0x01, #UINT(1) switch_state - 0x00, #NUL - 0x97, 0x64 #CRC - -] - -import pprint - - -def test_payload_unencrypted(): - init(242) - - printhex(TEST_PAYLOAD) - spec = decode(TEST_PAYLOAD, decrypt=False) - pprint.pprint(spec) - - payload = encode(spec, encrypt=False) - printhex(payload) - - spec2 = decode(payload, decrypt=False) - pprint.pprint(spec2) - - payload2 = encode(spec2, encrypt=False) - - printhex(TEST_PAYLOAD) - printhex(payload2) - - if TEST_PAYLOAD != payload: - print("FAILED") - else: - print("PASSED") - - -def test_payload_encrypted(): - init(242) - - printhex(TEST_PAYLOAD) - spec = decode(TEST_PAYLOAD, decrypt=False) - pprint.pprint(spec) - - payload = encode(spec, encrypt=True) - printhex(payload) - - spec2 = decode(payload, decrypt=True) - pprint.pprint(spec2) - - payload2 = encode(spec2, encrypt=False) - - printhex(TEST_PAYLOAD) - printhex(payload2) - - if TEST_PAYLOAD != payload: - print("FAILED") - else: - print("PASSED") - - -def test_value_encoder(): - pass - # test cases (auto, forced, overflow, -min, -min-1, 0, 1, +max, +max+1 - # UINT - # UINT_BP4 - # UINT_BP8 - # UINT_BP12 - # UINT_BP16 - # UINT_BP20 - # UINT_BP24 - # SINT - # SINT(2) - vin = [1,255,256,32767,32768,0,-1,-2,-3,-127,-128,-129,-32767,-32768] - for v in vin: - vout = Value.encode(v, Value.SINT) - print("encode " + str(v) + " " + str(vout)) - # SINT_BP8 - # SINT_BP16 - # SINT_BP24 - # CHAR - # FLOAT - - -def test_value_decoder(): - pass - # test cases (auto, forced, overflow, -min, -min-1, 0, 1, +max, +max+1 - # UINT - # UINT_BP4 - # UINT_BP8 - # UINT_BP12 - # UINT_BP16 - # UINT_BP20 - # UINT_BP24 - # SINT - vin = [255, 253] - print("input value:" + str(vin)) - vout = Value.decode(vin, Value.SINT, 2) - print("encoded as:" + str(vout)) - - # SINT_BP8 - # SINT_BP16 - # SINT_BP24 - # CHAR - # FLOAT - - -if __name__ == "__main__": - #test_value_encoder() - #test_value_decoder() - test_payload_unencrypted() - #test_payload_encrypted() # END diff --git a/src/energenie/OpenThings_test.py b/src/energenie/OpenThings_test.py new file mode 100644 index 0000000..958e627 --- /dev/null +++ b/src/energenie/OpenThings_test.py @@ -0,0 +1,139 @@ +# OpenThings_test.py 21/05/2016 D.J.Whale +# +# Test harness for OpenThings protocol encoder and decoder + +#TODO: Turn this into unittest.TestCase + + +from OpenThings import * +import pprint + + +def printhex(payload): + line = "" + for b in payload: + line += hex(b) + " " + + print(line) + + +TEST_PAYLOAD = [ + 0x1C, #len 16 + 10 + 2 = 0001 1100 + 0x04, #mfrid + 0x02, #prodid + 0x01, #pipmsb + 0x00, #piplsb + 0x00, 0x06, 0x8B, #sensorid + 0x70, 0x82, 0x00, 0x07, #SINT(2) power + 0x71, 0x82, 0xFF, 0xFD, #SINT(2) reactive_power + 0x76, 0x01, 0xF0, #UINT(1) voltage + 0x66, 0x22, 0x31, 0xDA, #UINT_BP8(2) freq + 0x73, 0x01, 0x01, #UINT(1) switch_state + 0x00, #NUL + 0x97, 0x64 #CRC + +] + + + +def test_payload_unencrypted(): + init(242) + + printhex(TEST_PAYLOAD) + spec = decode(TEST_PAYLOAD, decrypt=False) + pprint.pprint(spec) + + payload = encode(spec, encrypt=False) + printhex(payload) + + spec2 = decode(payload, decrypt=False) + pprint.pprint(spec2) + + payload2 = encode(spec2, encrypt=False) + + printhex(TEST_PAYLOAD) + printhex(payload2) + + if TEST_PAYLOAD != payload: + print("FAILED") + else: + print("PASSED") + + +def test_payload_encrypted(): + init(242) + + printhex(TEST_PAYLOAD) + spec = decode(TEST_PAYLOAD, decrypt=False) + pprint.pprint(spec) + + payload = encode(spec, encrypt=True) + printhex(payload) + + spec2 = decode(payload, decrypt=True) + pprint.pprint(spec2) + + payload2 = encode(spec2, encrypt=False) + + printhex(TEST_PAYLOAD) + printhex(payload2) + + if TEST_PAYLOAD != payload: + print("FAILED") + else: + print("PASSED") + + +def test_value_encoder(): + pass + # test cases (auto, forced, overflow, -min, -min-1, 0, 1, +max, +max+1 + # UINT + # UINT_BP4 + # UINT_BP8 + # UINT_BP12 + # UINT_BP16 + # UINT_BP20 + # UINT_BP24 + # SINT + # SINT(2) + vin = [1,255,256,32767,32768,0,-1,-2,-3,-127,-128,-129,-32767,-32768] + for v in vin: + vout = Value.encode(v, Value.SINT) + print("encode " + str(v) + " " + str(vout)) + # SINT_BP8 + # SINT_BP16 + # SINT_BP24 + # CHAR + # FLOAT + + +def test_value_decoder(): + pass + # test cases (auto, forced, overflow, -min, -min-1, 0, 1, +max, +max+1 + # UINT + # UINT_BP4 + # UINT_BP8 + # UINT_BP12 + # UINT_BP16 + # UINT_BP20 + # UINT_BP24 + # SINT + vin = [255, 253] + print("input value:" + str(vin)) + vout = Value.decode(vin, Value.SINT, 2) + print("encoded as:" + str(vout)) + + # SINT_BP8 + # SINT_BP16 + # SINT_BP24 + # CHAR + # FLOAT + + +if __name__ == "__main__": + #test_value_encoder() + #test_value_decoder() + test_payload_unencrypted() + #test_payload_encrypted() + +# END \ No newline at end of file diff --git a/src/energenie/Registry.py b/src/energenie/Registry.py index 7a6f2eb..0c5638c 100644 --- a/src/energenie/Registry.py +++ b/src/energenie/Registry.py @@ -274,162 +274,5 @@ ook_router = Router("ook") -#----- SIMPLE TEST HARNESS ---------------------------------------------------- - -def test_with_registry_tx(): - """Test sending a message to a green button and a MiHome Adaptor Plus switch""" - - #TODO need a way to separate device creation from device restoration - #and the app needs to know what mode it is in. - #creation is probably just a test feature, as a user would either - #install the device, configure it, or learn it. - - - # seed the registry - registry.add(Devices.MIHO005(device_id=0x68b), "tv") - registry.add(Devices.ENER002(device_id=(0xC8C8C, 1)), "fan") - - # test the auto create mechanism - import sys - registry.auto_create(sys.modules[__name__]) - - # variables should now be created in module scope - #print("tv %s" % tv) - #print("fan %s" % fan) - - tv.turn_on() - tv.turn_off() - fan.turn_on() - fan.turn_off() - - #print("tv switch:%s" % tv.has_switch()) - #print("tv send:%s" % tv.can_send()) - #print("tv receive:%s" % tv.can_receive()) - - #print("fan switch:%s" % fan.has_switch()) - #print("fan send:%s" % fan.can_send()) - #print("fan receive:%s" % fan.can_receive()) - - -def test_with_registry_rx(): - """Test receiving a dummy message on a MiHome adaptor plus""" - - # seed the registry - registry.add(Devices.MIHO005(device_id=0x68b), "tv") - - # test the auto create mechanism - import sys - registry.auto_create(sys.modules[__name__]) - - tv.turn_on() - - #synthesise receiving a report message - #push it down the receive pipeline - #radio.receive() - # ->OpenThingsAirInterface.incoming - # ->OpenThings.decrypt - # ->OpenThings.decode - # ->OpenThingsAirInterface->route - # ->MIHO005.incoming_message() - # - #it should update voltage, power etc - ## poor mans incoming synthetic message - - MIHO005_REPORT = { - "header": { - "mfrid": Devices.MFRID_ENERGENIE, - "productid": Devices.PRODUCTID_MIHO005, - "encryptPIP": Devices.CRYPT_PIP, - "sensorid": 0 # FILL IN - }, - "recs": [ - { - "wr": False, - "paramid": OpenThings.PARAM_SWITCH_STATE, - "typeid": OpenThings.Value.UINT, - "length": 1, - "value": 0 # FILL IN - }, - { - "wr": False, - "paramid": OpenThings.PARAM_VOLTAGE, - "typeid": OpenThings.Value.UINT, - "length": 1, - "value": 0 # FILL IN - }, - { - "wr": False, - "paramid": OpenThings.PARAM_CURRENT, - "typeid": OpenThings.Value.UINT, - "length": 1, - "value": 0 # FILL IN - }, - { - "wr": False, - "paramid": OpenThings.PARAM_FREQUENCY, - "typeid": OpenThings.Value.UINT, - "length": 1, - "value": 0 # FILL IN - }, - { - "wr": False, - "paramid": OpenThings.PARAM_REAL_POWER, - "typeid": OpenThings.Value.UINT, - "length": 1, - "value": 0 # FILL IN - }, - { - "wr": False, - "paramid": OpenThings.PARAM_REACTIVE_POWER, - "typeid": OpenThings.Value.UINT, - "length": 1, - "value": 0 # FILL IN - }, - { - "wr": False, - "paramid": OpenThings.PARAM_APPARENT_POWER, - "typeid": OpenThings.Value.UINT, - "length": 1, - "value": 0 # FILL IN - }, - - ] - } - - report = Devices.create_message(MIHO005_REPORT) - report = OpenThings.alterMessage( - report, - recs_0_value=240, # voltage - recs_1_value=2, # current - recs_2_value=50, # frequency - recs_3_value=100, # real power - recs_4_value=0, # reactive power - recs_5_value=100, # apparent power - ) - tv.incoming_message(report) - - - - # get readings from device - voltage = tv.get_voltage() - frequency = tv.get_frequency() - power = tv.get_real_power() - switch = tv.is_on() - - print("voltage %f" % voltage) - print("frequency %f" % frequency) - print("real power %f" % power) - print("switch %s" % switch) - - -if __name__ == "__main__": - import OpenThings - - OpenThings.init(Devices.CRYPT_PID) - - ##test_with_registry_tx() - - test_with_registry_rx() - # END diff --git a/src/energenie/Registry_test.py b/src/energenie/Registry_test.py new file mode 100644 index 0000000..8c85c79 --- /dev/null +++ b/src/energenie/Registry_test.py @@ -0,0 +1,148 @@ +# Registry_Test.py 21/05/2016 D.J.Whale +# +# Test harness for the Registry + +#TODO: need a way to separate device creation from device restoration +#and the app needs to know what mode it is in. +#creation is probably just a test feature, as a user would either +#install the device, configure it, or learn it. + + +import unittest +from Registry import * + +class TestRegistry(unittest.TestCase): + def setUp(self): + # seed the registry + registry.add(Devices.MIHO005(device_id=0x68b), "tv") + registry.add(Devices.ENER002(device_id=(0xC8C8C, 1)), "fan") + + # test the auto create mechanism + #import sys + #registry.auto_create(sys.modules[__name__]) + registry.auto_create(self) + + def test_capabilities(self): + print("tv switch:%s" % self.tv.has_switch()) + print("tv send:%s" % self.tv.can_send()) + print("tv receive:%s" % self.tv.can_receive()) + + print("fan switch:%s" % self.fan.has_switch()) + print("fan send:%s" % self.fan.can_send()) + print("fan receive:%s" % self.fan.can_receive()) + + def text_tx(self): + """Test the transmit pipeline""" + + tv.turn_on() + tv.turn_off() + fan.turn_on() + fan.turn_off() + + def test_fsk_rx(self): + """Test the receive pipeline for FSK MiHome adaptor""" + + #synthesise receiving a report message + #push it down the receive pipeline + #radio.receive() + # ->OpenThingsAirInterface.incoming + # ->OpenThings.decrypt + # ->OpenThings.decode + # ->OpenThingsAirInterface->route + # ->MIHO005.incoming_message() + # + #it should update voltage, power etc + ## poor mans incoming synthetic message + + MIHO005_REPORT = { + "header": { + "mfrid": Devices.MFRID_ENERGENIE, + "productid": Devices.PRODUCTID_MIHO005, + "encryptPIP": Devices.CRYPT_PIP, + "sensorid": 0 # FILL IN + }, + "recs": [ + { + "wr": False, + "paramid": OpenThings.PARAM_SWITCH_STATE, + "typeid": OpenThings.Value.UINT, + "length": 1, + "value": 0 # FILL IN + }, + { + "wr": False, + "paramid": OpenThings.PARAM_VOLTAGE, + "typeid": OpenThings.Value.UINT, + "length": 1, + "value": 0 # FILL IN + }, + { + "wr": False, + "paramid": OpenThings.PARAM_CURRENT, + "typeid": OpenThings.Value.UINT, + "length": 1, + "value": 0 # FILL IN + }, + { + "wr": False, + "paramid": OpenThings.PARAM_FREQUENCY, + "typeid": OpenThings.Value.UINT, + "length": 1, + "value": 0 # FILL IN + }, + { + "wr": False, + "paramid": OpenThings.PARAM_REAL_POWER, + "typeid": OpenThings.Value.UINT, + "length": 1, + "value": 0 # FILL IN + }, + { + "wr": False, + "paramid": OpenThings.PARAM_REACTIVE_POWER, + "typeid": OpenThings.Value.UINT, + "length": 1, + "value": 0 # FILL IN + }, + { + "wr": False, + "paramid": OpenThings.PARAM_APPARENT_POWER, + "typeid": OpenThings.Value.UINT, + "length": 1, + "value": 0 # FILL IN + }, + + ] + } + + report = Devices.create_message(MIHO005_REPORT) + report = OpenThings.alterMessage( + report, + recs_0_value=240, # voltage + recs_1_value=2, # current + recs_2_value=50, # frequency + recs_3_value=100, # real power + recs_4_value=0, # reactive power + recs_5_value=100, # apparent power + ) + self.tv.incoming_message(report) + + # get readings from device + voltage = self.tv.get_voltage() + frequency = self.tv.get_frequency() + power = self.tv.get_real_power() + switch = self.tv.is_on() + + print("voltage %f" % voltage) + print("frequency %f" % frequency) + print("real power %f" % power) + print("switch %s" % switch) + + +if __name__ == "__main__": + import OpenThings + OpenThings.init(Devices.CRYPT_PID) + + unittest.main() + +# END \ No newline at end of file diff --git a/src/energenie/crypto_test.py b/src/energenie/crypto_test.py new file mode 100644 index 0000000..950b495 --- /dev/null +++ b/src/energenie/crypto_test.py @@ -0,0 +1,7 @@ +# crypto_test.py 21/05/2016 D.J.Whale +# +# Placeholder for test harness for crypto.py + +#TODO: + +# END