Newer
Older
pyenergenie / src / energenie / OpenHEMS.py
# OpenHEMS.py  27/09/2015  D.J.Whale
#
# Implement OpenHEMS message encoding and decoding

import crypto

class OpenHEMSException(Exception):
	def __init__(self, value):
		self.value = value

	def __str__(self):
		return repr(self.value)


# report has bit 7 clear
# command has bit 7 set

PARAM_ALARM           = 0x21
PARAM_DEBUG_OUTPUT    = 0x2D
PARAM_IDENTIFY        = 0x3F
PARAM_SOURCE_SELECTOR = 0x40 # command only
PARAM_WATER_DETECTOR  = 0x41
PARAM_GLASS_BREAKAGE  = 0x42
PARAM_CLOSURES        = 0x43
PARAM_DOOR_BELL       = 0x44
PARAM_ENERGY          = 0x45
PARAM_FALL_SENSOR     = 0x46
PARAM_GAS_VOLUME      = 0x47
PARAM_AIR_PRESSURE    = 0x48
PARAM_ILLUMINANCE     = 0x49
PARAM_LEVEL           = 0x4C
PARAM_RAINFALL        = 0x4D
PARAM_APPARENT_POWER  = 0x50
PARAM_POWER_FACTOR    = 0x51
PARAM_REPORT_PERIOD   = 0x52
PARAM_SMOKE_DETECTOR  = 0x53
PARAM_TIME_AND_DATE   = 0x54
PARAM_VIBRATION       = 0x56
PARAM_WATER_VOLUME    = 0x57
PARAM_WIND_SPEED      = 0x58
PARAM_GAS_PRESSURE    = 0x61
PARAM_BATTERY_LEVEL   = 0x62
PARAM_CO_DETECTOR     = 0x63
PARAM_DOOR_SENSOR     = 0x64
PARAM_EMERGENCY       = 0x65
PARAM_FREQUENCY       = 0x66
PARAM_GAS_FLOW_RATE   = 0x67
PARAM_CURRENT         = 0x69
PARAM_JOIN            = 0x6A
PARAM_LIGHT_LEVEL     = 0x6C
PARAM_MOTION_DETECTOR = 0x6D
PARAM_OCCUPANCY       = 0x6F
PARAM_REAL_POWER      = 0x70
PARAM_REACTIVE_POWER  = 0x71
PARAM_ROTATION_SPEED  = 0x72
PARAM_SWITCH_STATE    = 0x73
PARAM_TEMPERATURE     = 0x74
PARAM_VOLTAGE         = 0x76
PARAM_WATER_FLOW_RATE = 0x77
PARAM_WATER_PRESSURE  = 0x78

PARAM_TEST            = 0xAA

param_names = {
	PARAM_ALARM           : "ALARM",
	PARAM_DEBUG_OUTPUT    : "DEBUG_OUTPUT",
	PARAM_IDENTIFY        : "IDENTIFY",
	PARAM_SOURCE_SELECTOR : "SOURCE_SELECTOR",
	PARAM_WATER_DETECTOR  : "WATER_DETECTOR",
	PARAM_GLASS_BREAKAGE  : "GLASS_BREAKAGE",
	PARAM_CLOSURES        : "CLOSURES",
	PARAM_DOOR_BELL       : "DOOR_BELL",
	PARAM_ENERGY          : "ENERGY",
	PARAM_FALL_SENSOR     : "FALL_SENSOR",
	PARAM_GAS_VOLUME      : "GAS_VOLUME",
	PARAM_AIR_PRESSURE    : "AIR_PRESSURE",
	PARAM_ILLUMINANCE     : "ILLUMINANCE",
	PARAM_LEVEL           : "LEVEL",
	PARAM_RAINFALL        : "RAINFALL",
	PARAM_APPARENT_POWER  : "APPARENT_POWER",
	PARAM_POWER_FACTOR    : "POWER_FACTOR",
	PARAM_REPORT_PERIOD   : "REPORT_PERIOD",
	PARAM_SMOKE_DETECTOR  : "SMOKE_DETECTOR",
	PARAM_TIME_AND_DATE   : "TIME_AND_DATE",
	PARAM_VIBRATION       : "VIBRATION",
	PARAM_WATER_VOLUME    : "WATER_VOLUME",
	PARAM_WIND_SPEED      : "WIND_SPEED",
	PARAM_GAS_PRESSURE    : "GAS_PRESSURE",
	PARAM_BATTERY_LEVEL   : "BATTERY_LEVEL",
	PARAM_CO_DETECTOR     : "CO_DETECTOR",
	PARAM_DOOR_SENSOR     : "DOOR_SENSOR",
	PARAM_EMERGENCY       : "EMERGENCY",
	PARAM_FREQUENCY       : "FREQUENCY",
	PARAM_GAS_FLOW_RATE   : "GAS_FLOW_RATE",
	PARAM_CURRENT         : "CURRENT",
	PARAM_JOIN            : "JOIN",
	PARAM_LIGHT_LEVEL     : "LIGHT_LEVEL",
	PARAM_MOTION_DETECTOR : "MOTION_DETECTOR",
	PARAM_OCCUPANCY       : "OCCUPANCY",
	PARAM_REAL_POWER      : "REAL_POWER",
	PARAM_REACTIVE_POWER  : "REACTIVE_POWER",
	PARAM_ROTATION_SPEED  : "ROTATION_SPEED",
	PARAM_SWITCH_STATE    : "SWITCH_STATE",
	PARAM_TEMPERATURE     : "TEMPERATURE",
	PARAM_VOLTAGE         : "VOLTAGE",
	PARAM_WATER_FLOW_RATE : "WATER_FLOW_RATE",
	PARAM_WATER_PRESSURE  : "WATER_PRESSURE"
}


crypt_pid = None

def init(pid):
	global crypt_pid
	crypt_pid = pid


def warning(msg):
	print("warning:" + str(msg))


#----- MESSAGE DECODER --------------------------------------------------------

#TODO if silly lengths or silly types seen in decode, this might imply
#we're trying to process an encrypted packet without decrypting it.
#the code should be more robust to this (by checking the CRC)

def decode(payload, decrypt=True):
	"""Decode a raw buffer into an OpenHEMS pydict"""
	#Note, decrypt must already have run on this for it to work
	length = payload[0]

	# CHECK LENGTH
	if length+1 != len(payload) or length < 10:
		raise OpenHEMSException("bad payload length")
		#return {
		#	"type":         "BADLEN",
		#	"len_actual":   len(payload),
		#	"len_expected": length,
		#	"payload":      payload[1:]
		#}

	# DECODE HEADER
	mfrId      = payload[1]
	productId  = payload[2]
	encryptPIP = (payload[3]<<8) + payload[4]
	header = {
		"mfrid"     : mfrId,
		"productid" : productId,
		"encryptPIP": encryptPIP
	}


	if decrypt:
		# DECRYPT PAYLOAD
		# [0]len,mfrid,productid,pipH,pipL,[5]
		crypto.init(crypt_pid, encryptPIP)
		crypto.cryptPayload(payload, 5, len(payload)-5) # including CRC

	# sensorId is in encrypted region
	sensorId = (payload[5]<<16) + (payload[6]<<8) + payload[7]
	header["sensorid"] = sensorId


	# CHECK CRC
	crc_actual  = (payload[-2]<<8) + payload[-1]
	crc_expected = calcCRC(payload, 5, len(payload)-(5+2))
	#print("crc actual:%s, expected:%s" %(hex(crc_actual), hex(crc_expected)))

	if crc_actual != crc_expected:
		raise OpenHEMSException("bad CRC")
		#return {
		#	"type":         "BADCRC",
		#	"crc_actual":   crc_actual,
		#	"crc_expected": crc_expected,
		#	"payload":      payload[1:],
		#}


	# DECODE RECORDS
	i = 8
	recs = []
	while i < length and payload[i] != 0:
		# PARAM
		param = payload[i]
		wr = ((param & 0x80) == 0x80)
		paramid = param & 0x7F
		if param_names.has_key(paramid):
			paramname = param_names[paramid]
		else:
			paramname = "UNKNOWN_" + hex(paramid)
		i += 1

		# TYPE/LEN
		typeid = (payload[i] & 0xF0)>>4
		plen = payload[i] & 0x0F
		i += 1

		if plen == 0: continue # no more data in this record

		# VALUE
		valuebytes = []
		for x in range(plen):
			valuebytes.append(payload[i])
			i += 1
		value = "TODO" # TODO decode based on type and length

		# store rec
		recs.append({
			"wr":         wr,
			"paramid":    paramid,
			"paramname":  paramname,
			"typeid":     typeid,
			"length":     plen,
			"valuebytes": valuebytes,
			"value":      value
		})

	return {
		"type":    "OK",
		"header":  header,
		"recs":    recs
	}



#----- MESSAGE ENCODER --------------------------------------------------------
#
# Encodes a message using the OpenHEMS message payload structure

# R1 message product id 0x02 monitor and control (in switching program?)
# C1 message product id 0x01 monitor only (in listening program)

def encode(spec, encrypt=True):
	"""Encode a pydict specification into a OpenHEMS binary payload"""
	# The message is not encrypted, but the CRC is generated here.

	payload = []

	# HEADER
	payload.append(0) # length, fixup later when known
	header = spec["header"]

	payload.append(header["mfrid"])
	payload.append(header["productid"])

	if encrypt:
		if not header.has_key("encryptPIP"):
			warning("no encryptPIP in header, assuming 0x0100")
			encryptPIP = 0x0100
		else:
			encryptPIP = header["encryptPIP"]
		payload.append((encryptPIP&0xFF00)>>8) # MSB
		payload.append((encryptPIP&0xFF))      # LSB
	else:
		payload.append(0)
		payload.append(0)

	sensorId = header["sensorid"]
	payload.append((sensorId>>16) & 0xFF) # HIGH
	payload.append((sensorId>>8) & 0xFF)  # MID
	payload.append((sensorId) & 0XFF)     # LOW

	# RECORDS
	for rec in spec["recs"]:
		wr      = rec["wr"]
		paramid = rec["paramid"]
		typeid  = rec["typeid"]
		length  = rec["length"]
		value   = rec["value"]

		# PARAMID
		if wr:
			payload.append(0x80 + paramid) # WRITE
		else:
			payload.append(paramid)        # READ

		# TYPE/LENGTH
		payload.append((typeid<<4) | length)

		# VALUE
		for i in range(length):
			payload.append(0) # TODO encoding depends on typeid and length

	# FOOTER
	payload.append(0) # NUL
	crc = calcCRC(payload, 5, len(payload)-5)
	payload.append((crc>>8) & 0xFF) # MSB
	payload.append(crc&0xFF)        # LSB

	# back-patch the length byte so it is correct
	payload[0] = len(payload)-1

	if encrypt:
		# ENCRYPT
		# [0]len,mfrid,productid,pipH,pipL,[5]
		crypto.init(crypt_pid, encryptPIP)
		crypto.cryptPayload(payload, 5, len(payload)-5) # including CRC

	return payload



#----- CRC CALCULATION --------------------------------------------------------

#int16_t crc(uint8_t const mes[], unsigned char siz)
#{
#	uint16_t rem = 0;
#	unsigned char byte, bit;
#
#	for (byte = 0; byte < siz; ++byte)
#	{
#		rem ^= (mes[byte] << 8);
#		for (bit = 8; bit > 0; --bit)
#		{
#			rem = ((rem & (1 << 15)) ? ((rem << 1) ^ 0x1021) : (rem << 1));
#		}
#	}
#	return rem;
#}

def calcCRC(payload, start, length):
	rem = 0
	for b in payload[start:start+length]:
		rem ^= (b<<8)
		for bit in range(8):
			if rem & (1<<15) != 0:
				# bit is set
				rem = ((rem<<1) ^ 0x1021) & 0xFFFF # always maintain U16
			else:
				# bit is clear
				rem = (rem<<1) & 0xFFFF # always maintain U16
	return rem


#----- TEST HARNESS -----------------------------------------------------------

def printhex(payload):
	line = ""
	for b in payload:
		line += hex(b) + " "

	print line


if __name__ == "__main__":
	TEST_PAYLOAD = [
		0x1C, 						#len   16 + 10 + 2  = 0001 1100
		0x04, 						#mfrid
		0x02, 						#prodid
		0x01, 						#pipmsb
		0x00, 						#piplsb
		0x00, 0x06, 0x8B,        	#sensorid
		0x70, 0x82, 0x00, 0x07, 	#power
		0x71, 0x82, 0xFF, 0xFD,     #reactive_power
		0x76, 0x01, 0xF0,    		#voltage
		0x66, 0x22, 0x31, 0xDA,		#freq
		0x73, 0x01, 0x01,			#switch_state
		0x00,						#NUL
		0x97, 0x64					#CRC

	]

	import pprint
	init(242)

	printhex(TEST_PAYLOAD)

	spec = decode(TEST_PAYLOAD, decrypt=False)
	pprint.pprint(spec)

	payload = encode(spec, encrypt=True)
	printhex(payload)

	spec2 = decode(payload, decrypt=True)
	pprint.pprint(spec2)

# END