# OpenThings_test.py 21/05/2016 D.J.Whale
#
# Test harness for OpenThings protocol encoder and decoder
#TODO: Turn this into unittest.TestCase
from OpenThings import *
import pprint
import unittest
def printhex(payload):
    """Print every item of payload in hex() notation on a single line.

    Each item, including the last one, is followed by one space, so an
    empty payload produces an empty line.
    """
    print("".join(hex(octet) + " " for octet in payload))
# Reference message used by the payload round-trip tests in this file.
# The first byte is the length: 28 bytes follow it (29 items in total).
TEST_PAYLOAD = [
    # --- header ---
    0x1C,                      # len 16 + 10 + 2 = 0001 1100
    0x04,                      # mfrid
    0x02,                      # prodid
    0x01, 0x00,                # pip (msb, lsb)
    0x00, 0x06, 0x8B,          # sensorid
    # --- records ---
    0x70, 0x82, 0x00, 0x07,    # SINT(2) power
    0x71, 0x82, 0xFF, 0xFD,    # SINT(2) reactive_power
    0x76, 0x01, 0xF0,          # UINT(1) voltage
    0x66, 0x22, 0x31, 0xDA,    # UINT_BP8(2) freq
    0x73, 0x01, 0x01,          # UINT(1) switch_state
    # --- trailer ---
    0x00,                      # NUL
    0x97, 0x64,                # CRC
]
def test_payload_unencrypted():
    """Round-trip TEST_PAYLOAD through decode/encode twice, unencrypted.

    Every intermediate payload and decoded spec is printed. The final line
    printed is "PASSED" when the first re-encoded payload equals
    TEST_PAYLOAD, otherwise "FAILED".
    """
    init(242)

    # First pass: reference bytes -> spec -> bytes.
    printhex(TEST_PAYLOAD)
    first_spec = decode(TEST_PAYLOAD, decrypt=False)
    pprint.pprint(first_spec)
    first_payload = encode(first_spec, encrypt=False)
    printhex(first_payload)

    # Second pass: feed the re-encoded bytes through the same cycle again.
    second_spec = decode(first_payload, decrypt=False)
    pprint.pprint(second_spec)
    second_payload = encode(second_spec, encrypt=False)

    # Show the reference and the second-pass result next to each other.
    printhex(TEST_PAYLOAD)
    printhex(second_payload)

    # NOTE(review): only the first-pass payload is checked; the second-pass
    # payload is printed above for visual comparison but not verified.
    print("FAILED" if TEST_PAYLOAD != first_payload else "PASSED")
def test_payload_encrypted():
    """Round-trip TEST_PAYLOAD through an encrypted encode/decode cycle.

    The reference payload is decoded, re-encoded with encrypt=True, decoded
    again with decrypt=True and finally re-encoded without encryption.
    Every intermediate payload and decoded spec is printed. The final line
    printed is "PASSED" when that last unencrypted payload equals
    TEST_PAYLOAD, otherwise "FAILED".
    """
    init(242)

    printhex(TEST_PAYLOAD)
    spec = decode(TEST_PAYLOAD, decrypt=False)
    pprint.pprint(spec)

    # Encrypted form of the message; printed for inspection only.
    payload = encode(spec, encrypt=True)
    printhex(payload)

    spec2 = decode(payload, decrypt=True)
    pprint.pprint(spec2)

    # Back to an unencrypted payload so it can be compared with the reference.
    payload2 = encode(spec2, encrypt=False)

    printhex(TEST_PAYLOAD)
    printhex(payload2)

    # Compare the decrypted round-trip result (payload2) with the reference.
    # `payload` was encoded with encrypt=True, so it is not expected to match
    # the unencrypted TEST_PAYLOAD and must not be used for the verdict.
    if TEST_PAYLOAD != payload2:
        print("FAILED")
    else:
        print("PASSED")
def test_value_encoder():
    """Print the Value.SINT encoding of a list of sample integers.

    Only SINT is exercised at present; the other type names in the comments
    below are placeholders for cases that have not been written yet.
    """
    # test cases (auto, forced, overflow, -min, -min-1, 0, 1, +max, +max+1
    # Still to do before SINT:
    #   UINT, UINT_BP4, UINT_BP8, UINT_BP12, UINT_BP16, UINT_BP20, UINT_BP24

    # SINT
    # SINT(2)
    samples = [1, 255, 256, 32767, 32768, 0, -1, -2, -3,
               -127, -128, -129, -32767, -32768]
    for sample in samples:
        encoded = Value.encode(sample, Value.SINT)
        print(" ".join(["encode", str(sample), str(encoded)]))

    # Still to do after SINT:
    #   SINT_BP8, SINT_BP16, SINT_BP24, CHAR, FLOAT
def test_value_decoder():
    """Decode one two-byte SINT sample with Value.decode and print the result.

    Only SINT is exercised at present; the other type names in the comments
    below are placeholders for cases that have not been written yet.
    """
    # test cases (auto, forced, overflow, -min, -min-1, 0, 1, +max, +max+1
    # UINT
    # UINT_BP4
    # UINT_BP8
    # UINT_BP12
    # UINT_BP16
    # UINT_BP20
    # UINT_BP24

    # SINT
    # Same two bytes (0xFF, 0xFD) as the reactive_power value in TEST_PAYLOAD.
    vin = [255, 253]
    print("input value:" + str(vin))
    vout = Value.decode(vin, Value.SINT, 2)
    # vout is the result of decoding, so label it as such (the previous
    # label "encoded as:" described the opposite operation).
    print("decoded as:" + str(vout))

    # SINT_BP8
    # SINT_BP16
    # SINT_BP24
    # CHAR
    # FLOAT
#----- UNIT TEST FOR MESSAGE --------------------------------------------------
import Devices
class TestMessage(unittest.TestCase):
    """Exercises of the Message API.

    Only test_blank is collected by unittest. Every method whose name starts
    with XXX is skipped by the default loader, because its name does not
    begin with "test".
    """

    def test_blank(self):
        # create a blank message
        Message().dump()

    def XXXtest_blank_create_header(self):
        # create a blank message and add some header fields at creation time
        message = Message(header_mfrid=123,
                          header_productid=456,
                          header_sensorid=789)
        message.dump()

    def XXXtest_add_header_dict(self):
        # add header fields to a message after creation like a pydict
        message = Message()
        message["header"]["mfrid"] = 123
        message.dump()

    def XXXtest_add_rec_dict(self):
        # add rec fields to a message after creation like a pydict
        message = Message()
        message["recs"][0] = {"paramid": PARAM_SWITCH_STATE, "value": 1}
        message.dump()

    def XXXtest_add_header_path(self):
        # add header fields to a message after creation via pathed keys
        message = Message()
        message.add(header_mfrid=123, header_productid=456)
        message.dump()

    def XXXtest_add_rec_path(self):
        # add rec fields to a message after creation via pathed indexed keys
        message = Message()
        message.add(recs_0_paramid=PARAM_SWITCH_STATE, recs_0_value=1)
        message.dump()

    def XXXtest_add_rec_fn(self):
        # add rec fields to a message after creation via pathed PARAM name keys
        message = Message()
        message.add_rec(PARAM_SWITCH_STATE, value=1)
        message.dump()

    def XXXtest_create_template(self):
        # create a message from a template
        Message(Devices.MIHO005_REPORT).dump()

    def XXXtest_alter_rec_template(self):
        # alter rec fields in a template
        message = Message(Devices.MIHO005_REPORT)
        message.alter(header_productid=123)
        message.alter(recs_SWITCH_STATE_value=1)
        message.dump()

    def XXXtest_dump(self):
        ## dump a message in printable format
        message = Message(Devices.MIHO005_REPORT)
        print(message)
        print(str(message))

    def XXXtest_pydict_read(self):
        ## access a specific keyed entry like a normal pydict, for read
        message = Message(Devices.MIHO005_REPORT)
        print(message["header"])
        print(message["header"]["mfrid"])

    def XXXtest_pydict_write(self):
        ## access a specific keyed entry like a normal pydict, for write
        message = Message(Devices.MIHO005_REPORT)
        message["header"]["mfrid"] = 222
        message.dump()

    def XXXtest_paramid_read_struct(self):
        # access a paramid entry for read of the whole structure
        message = Message(Devices.MIHO005_REPORT)
        print(message[PARAM_SWITCH_STATE])

    def XXXtest_paramid_read_field(self):
        ## read a value from a param id field that exists
        message = Message(Devices.MIHO005_REPORT)
        print(message[PARAM_SWITCH_STATE]["value"])

    def XXXtest_paramid_write(self):
        ## write a value to a param id field that exists
        message = Message(Devices.MIHO005_REPORT)
        message[PARAM_SWITCH_STATE]["value"] = 1
        message.dump()
def test_message():
    """Hand control to the unittest command-line runner.

    unittest.main() loads the test cases from the __main__ module, runs
    them, and by default exits the interpreter when they finish.
    """
    # unittest is already imported at the top of this file, so no local
    # import is needed here.
    unittest.main()
if __name__ == "__main__":
    # Script entry point: only the unittest-based Message tests are run.
    ##TODO: Change these into unittest test cases
    # The print-based harness functions below stay disabled until then.
    ##test_value_encoder()
    ##test_value_decoder()
    ##test_payload_unencrypted()
    ##test_payload_encrypted()
    test_message()
# END