diff --git a/doc/devices_classes_branch.txt b/doc/devices_classes_branch.txt index a5ce070..8ee52d7 100644 --- a/doc/devices_classes_branch.txt +++ b/doc/devices_classes_branch.txt @@ -403,19 +403,15 @@ Router written and integrated in energenie.loop() Design for discovery done +Test cases for 5 discovery variants done -------------------------------------------------------------------------------- TODO NEXT -DONE auto discovery, accept and reject +Code up join auto and join ask, and get tests passing. -* write test cases in Registry_test.py to test the 4 different discovery types - (might need some mocking to soft-test the join/ack mechanism, probably just - route in a synthetic join_req, and capture the tx log to see the join_ack - going out) - ---- PERSISTENT REGISTRY @@ -431,6 +427,8 @@ later iterate through all devices and decide what to do with them, such as displaying them all in a GUI selection list. + (Note, Registry.devices() already gives us the list and Registry.keys() + already gives us the name keys) ---- NOTIFY, UPDATE, or DATA SEQUENCE? diff --git a/src/energenie/Registry_test.py b/src/energenie/Registry_test.py index e28318c..3a38177 100644 --- a/src/energenie/Registry_test.py +++ b/src/energenie/Registry_test.py @@ -14,7 +14,8 @@ radio.DEBUG=True -class TestRegistry(unittest.TestCase): +class Dis: +##class TestRegistry(unittest.TestCase): def setUp(self): # seed the registry registry.add(Devices.MIHO005(device_id=0x68b), "tv") @@ -81,6 +82,112 @@ print("switch %s" % switch) +import OpenThings + +UNKNOWN_SENSOR_ID = 0x111 + +class TestDiscovery(unittest.TestCase): + def setUp(self): + # build a synthetic message + self.msg = OpenThings.Message(Devices.MIHO005_REPORT) + self.msg[OpenThings.PARAM_VOLTAGE]["value"] = 240 + + def XXtest_discovery_none(self): + discovery_none() + + # Poke synthetic unknown into the router and let it route to unknown handler + self.msg.set(header_sensorid=UNKNOWN_SENSOR_ID) + fsk_router.incoming_message( + (Devices.MFRID_ENERGENIE, Devices.PRODUCTID_MIHO005, UNKNOWN_SENSOR_ID), self.msg) + + # expect unknown handler to fire + + def XXtest_discovery_auto(self): + discovery_auto() + + # Poke synthetic unknown into the router and let it route to unknown handler + self.msg.set(header_sensorid=UNKNOWN_SENSOR_ID) + fsk_router.incoming_message( + (Devices.MFRID_ENERGENIE, Devices.PRODUCTID_MIHO005, UNKNOWN_SENSOR_ID), self.msg) + + # expect auto accept logic to fire + registry.list() + fsk_router.list() + + def XXtest_discovery_ask(self): + def yes(a,b): return True + def no(a,b): return False + + discovery_ask(no) + + # Poke synthetic unknown into the router and let it route to unknown handler + self.msg.set(header_sensorid=UNKNOWN_SENSOR_ID) + fsk_router.incoming_message( + (Devices.MFRID_ENERGENIE, Devices.PRODUCTID_MIHO005, UNKNOWN_SENSOR_ID), self.msg) + + # expect a reject + + discovery_ask(yes) + + # Poke synthetic unknown into the router and let it route to unknown handler + self.msg.set(header_sensorid=UNKNOWN_SENSOR_ID) + fsk_router.incoming_message( + (Devices.MFRID_ENERGENIE, Devices.PRODUCTID_MIHO005, UNKNOWN_SENSOR_ID), self.msg) + + # expect a accept + registry.list() + fsk_router.list() + + #---- HERE ---- + + def test_discovery_autojoin(self): + discovery_autojoin() + + # Poke synthetic unknown JOIN into the router and let it route to unknown handler + self.msg = OpenThings.Message(header_mfrid=Devices.MFRID_ENERGENIE, + header_productid=Devices.MIHO005, + header_sensorid=UNKNOWN_SENSOR_ID) + self.msg[OpenThings.PARAM_JOIN] = {} + + fsk_router.incoming_message( + (Devices.MFRID_ENERGENIE, Devices.PRODUCTID_MIHO005, UNKNOWN_SENSOR_ID), self.msg) + + ####FAIL - join detect is not written + # expect auto accept and join_ack logic to fire + ##registry.list() + ##fsk_router.list() + + def XXXXtest_discovery_askjoin(self): + def no(a,b): return False + def yes(a,b): return True + + discovery_askjoin(no) + + # Poke synthetic unknown JOIN into the router and let it route to unknown handler + self.msg = OpenThings.Message(header_mfrid=Devices.MFRID_ENERGENIE, + header_productid=Devices.MIHO005, + header_sensorid=UNKNOWN_SENSOR_ID) + self.msg[OpenThings.PARAM_JOIN] = {} + + fsk_router.incoming_message( + (Devices.MFRID_ENERGENIE, Devices.PRODUCTID_MIHO005, UNKNOWN_SENSOR_ID), self.msg) + + ####FAIL - join detect is not written + # expect auto accept and join_ack logic to fire + ##registry.list() + ##fsk_router.list() + + discovery_askjoin(no) + + fsk_router.incoming_message( + (Devices.MFRID_ENERGENIE, Devices.PRODUCTID_MIHO005, UNKNOWN_SENSOR_ID), self.msg) + + ####FAIL - join detect is not written + # expect auto accept and join_ack logic to fire + ##registry.list() + ##fsk_router.list() + + if __name__ == "__main__": import OpenThings OpenThings.init(Devices.CRYPT_PID)