Newer
Older
pyenergenie / src / energenie / OpenThings_test.py
# OpenThings_test.py  21/05/2016  D.J.Whale
#
# Test harness for OpenThings protocol encoder and decoder

#TODO: Turn this into unittest.TestCase


from OpenThings import *
import pprint
import unittest


def printhex(payload):
	line = ""
	for b in payload:
		line += hex(b) + " "

	print(line)


TEST_PAYLOAD = [
	0x1C, 						#len   16 + 10 + 2  = 0001 1100
	0x04, 						#mfrid
	0x02, 						#prodid
	0x01, 						#pipmsb
	0x00, 						#piplsb
	0x00, 0x06, 0x8B,        	#sensorid
	0x70, 0x82, 0x00, 0x07, 	#SINT(2)     power
	0x71, 0x82, 0xFF, 0xFD,     #SINT(2)     reactive_power
	0x76, 0x01, 0xF0,    		#UINT(1)     voltage
	0x66, 0x22, 0x31, 0xDA,		#UINT_BP8(2) freq
	0x73, 0x01, 0x01,			#UINT(1)     switch_state
	0x00,						#NUL
	0x97, 0x64					#CRC

]



def test_payload_unencrypted():
	init(242)

	printhex(TEST_PAYLOAD)
	spec = decode(TEST_PAYLOAD, decrypt=False)
	pprint.pprint(spec)

	payload = encode(spec, encrypt=False)
	printhex(payload)

	spec2 = decode(payload, decrypt=False)
	pprint.pprint(spec2)

	payload2 = encode(spec2, encrypt=False)

	printhex(TEST_PAYLOAD)
	printhex(payload2)

	if TEST_PAYLOAD != payload:
		print("FAILED")
	else:
		print("PASSED")


def test_payload_encrypted():
	init(242)

	printhex(TEST_PAYLOAD)
	spec = decode(TEST_PAYLOAD, decrypt=False)
	pprint.pprint(spec)

	payload = encode(spec, encrypt=True)
	printhex(payload)

	spec2 = decode(payload, decrypt=True)
	pprint.pprint(spec2)

	payload2 = encode(spec2, encrypt=False)

	printhex(TEST_PAYLOAD)
	printhex(payload2)

	if TEST_PAYLOAD != payload:
		print("FAILED")
	else:
		print("PASSED")


def test_value_encoder():
	pass
	# test cases (auto, forced, overflow, -min, -min-1, 0, 1, +max, +max+1
	# UINT
	# UINT_BP4
	# UINT_BP8
	# UINT_BP12
	# UINT_BP16
	# UINT_BP20
	# UINT_BP24
	# SINT
	# SINT(2)
	vin = [1,255,256,32767,32768,0,-1,-2,-3,-127,-128,-129,-32767,-32768]
	for v in vin:
		vout = Value.encode(v, Value.SINT)
		print("encode " + str(v) + " " + str(vout))
	# SINT_BP8
	# SINT_BP16
	# SINT_BP24
	# CHAR
	# FLOAT


def test_value_decoder():
	pass
	# test cases (auto, forced, overflow, -min, -min-1, 0, 1, +max, +max+1
	# UINT
	# UINT_BP4
	# UINT_BP8
	# UINT_BP12
	# UINT_BP16
	# UINT_BP20
	# UINT_BP24
	# SINT
	vin = [255, 253]
	print("input value:" + str(vin))
	vout = Value.decode(vin, Value.SINT, 2)
	print("encoded as:" + str(vout))

	# SINT_BP8
	# SINT_BP16
	# SINT_BP24
	# CHAR
	# FLOAT


#----- UNIT TEST FOR MESSAGE --------------------------------------------------

import Devices

class TestMessage(unittest.TestCase):
	def test_blank(self):
		# create a blank message
		msg = Message()
		msg.dump()

	def XXXtest_blank_create_header(self):
		# create a blank message and add some header fields at creation time
		msg = Message(header_mfrid=123, header_productid=456, header_sensorid=789)
		msg.dump()

	def XXXtest_add_header_dict(self):
		# add header fields to a message after creation like a pydict
		msg = Message()
		msg["header"]["mfrid"] = 123
		msg.dump()

	def XXXtest_add_rec_dict(self):
		# add rec fields to a message after creation like a pydict
		msg = Message()
		msg["recs"][0] = {"paramid": PARAM_SWITCH_STATE, "value": 1}
		msg.dump()

	def XXXtest_add_header_path(self):
		# add header fields to a message after creation via pathed keys
		msg = Message()
		msg.add(header_mfrid=123, header_productid=456)
		msg.dump()

	def XXXtest_add_rec_path(self):
		# add rec fields to a message after creation via pathed indexed keys
		msg = Message()
		msg.add(recs_0_paramid=PARAM_SWITCH_STATE, recs_0_value=1)
		msg.dump()

	def XXXtest_add_rec_fn(self):
		# add rec fields to a message after creation via pathed PARAM name keys
		msg = Message()
		msg.add_rec(PARAM_SWITCH_STATE, value=1)
		msg.dump()

	def XXXtest_create_template(self):
		# create a message from a template
		msg = Message(Devices.MIHO005_REPORT)
		msg.dump()

	def XXXtest_alter_rec_template(self):
		# alter rec fields in a template
		msg = Message(Devices.MIHO005_REPORT)
		msg.alter(header_productid=123)
		msg.alter(recs_SWITCH_STATE_value=1)
		msg.dump()

	def XXXtest_dump(self):
		## dump a message in printable format
		msg = Message(Devices.MIHO005_REPORT)
		print(msg)
		print(str(msg))

	def XXXtest_pydict_read(self):
		## access a specific keyed entry like a normal pydict, for read
		msg = Message(Devices.MIHO005_REPORT)
		print(msg["header"])
		print(msg["header"]["mfrid"])

	def XXXtest_pydict_write(self):
		## access a specific keyed entry like a normal pydict, for write
		msg = Message(Devices.MIHO005_REPORT)
		msg["header"]["mfrid"] = 222
		msg.dump()

	def XXXtest_paramid_read_struct(self):
		# access a paramid entry for read of the whole structure
		msg = Message(Devices.MIHO005_REPORT)
		print(msg[PARAM_SWITCH_STATE])

	def XXXtest_paramid_read_field(self):
		## read a value from a param id field that exists
		msg = Message(Devices.MIHO005_REPORT)
		print(msg[PARAM_SWITCH_STATE]["value"])

	def XXXtest_paramid_write(self):
		## write a value to a param id field that exists
		msg = Message(Devices.MIHO005_REPORT)
		msg[PARAM_SWITCH_STATE]["value"] = 1
		msg.dump()


def test_message():
	import unittest
	unittest.main()

if __name__ == "__main__":
	##TODO: Change these into unittest test cases
	##test_value_encoder()
	##test_value_decoder()
	##test_payload_unencrypted()
	##test_payload_encrypted()

	test_message()

# END