# OpenHEMS.py 27/09/2015 D.J.Whale # # Implement OpenHEMS message encoding and decoding import crypto class OpenHEMSException(Exception): def __init__(self, value): self.value = value def __str__(self): return repr(self.value) # report has bit 7 clear # command has bit 7 set PARAM_ALARM = 0x21 PARAM_DEBUG_OUTPUT = 0x2D PARAM_IDENTIFY = 0x3F PARAM_SOURCE_SELECTOR = 0x40 # command only PARAM_WATER_DETECTOR = 0x41 PARAM_GLASS_BREAKAGE = 0x42 PARAM_CLOSURES = 0x43 PARAM_DOOR_BELL = 0x44 PARAM_ENERGY = 0x45 PARAM_FALL_SENSOR = 0x46 PARAM_GAS_VOLUME = 0x47 PARAM_AIR_PRESSURE = 0x48 PARAM_ILLUMINANCE = 0x49 PARAM_LEVEL = 0x4C PARAM_RAINFALL = 0x4D PARAM_APPARENT_POWER = 0x50 PARAM_POWER_FACTOR = 0x51 PARAM_REPORT_PERIOD = 0x52 PARAM_SMOKE_DETECTOR = 0x53 PARAM_TIME_AND_DATE = 0x54 PARAM_VIBRATION = 0x56 PARAM_WATER_VOLUME = 0x57 PARAM_WIND_SPEED = 0x58 PARAM_GAS_PRESSURE = 0x61 PARAM_BATTERY_LEVEL = 0x62 PARAM_CO_DETECTOR = 0x63 PARAM_DOOR_SENSOR = 0x64 PARAM_EMERGENCY = 0x65 PARAM_FREQUENCY = 0x66 PARAM_GAS_FLOW_RATE = 0x67 PARAM_CURRENT = 0x69 PARAM_JOIN = 0x6A PARAM_LIGHT_LEVEL = 0x6C PARAM_MOTION_DETECTOR = 0x6D PARAM_OCCUPANCY = 0x6F PARAM_REAL_POWER = 0x70 PARAM_REACTIVE_POWER = 0x71 PARAM_ROTATION_SPEED = 0x72 PARAM_SWITCH_STATE = 0x73 PARAM_TEMPERATURE = 0x74 PARAM_VOLTAGE = 0x76 PARAM_WATER_FLOW_RATE = 0x77 PARAM_WATER_PRESSURE = 0x78 PARAM_TEST = 0xAA param_names = { PARAM_ALARM : "ALARM", PARAM_DEBUG_OUTPUT : "DEBUG_OUTPUT", PARAM_IDENTIFY : "IDENTIFY", PARAM_SOURCE_SELECTOR : "SOURCE_SELECTOR", PARAM_WATER_DETECTOR : "WATER_DETECTOR", PARAM_GLASS_BREAKAGE : "GLASS_BREAKAGE", PARAM_CLOSURES : "CLOSURES", PARAM_DOOR_BELL : "DOOR_BELL", PARAM_ENERGY : "ENERGY", PARAM_FALL_SENSOR : "FALL_SENSOR", PARAM_GAS_VOLUME : "GAS_VOLUME", PARAM_AIR_PRESSURE : "AIR_PRESSURE", PARAM_ILLUMINANCE : "ILLUMINANCE", PARAM_LEVEL : "LEVEL", PARAM_RAINFALL : "RAINFALL", PARAM_APPARENT_POWER : "APPARENT_POWER", PARAM_POWER_FACTOR : "POWER_FACTOR", PARAM_REPORT_PERIOD : "REPORT_PERIOD", PARAM_SMOKE_DETECTOR : "SMOKE_DETECTOR", PARAM_TIME_AND_DATE : "TIME_AND_DATE", PARAM_VIBRATION : "VIBRATION", PARAM_WATER_VOLUME : "WATER_VOLUME", PARAM_WIND_SPEED : "WIND_SPEED", PARAM_GAS_PRESSURE : "GAS_PRESSURE", PARAM_BATTERY_LEVEL : "BATTERY_LEVEL", PARAM_CO_DETECTOR : "CO_DETECTOR", PARAM_DOOR_SENSOR : "DOOR_SENSOR", PARAM_EMERGENCY : "EMERGENCY", PARAM_FREQUENCY : "FREQUENCY", PARAM_GAS_FLOW_RATE : "GAS_FLOW_RATE", PARAM_CURRENT : "CURRENT", PARAM_JOIN : "JOIN", PARAM_LIGHT_LEVEL : "LIGHT_LEVEL", PARAM_MOTION_DETECTOR : "MOTION_DETECTOR", PARAM_OCCUPANCY : "OCCUPANCY", PARAM_REAL_POWER : "REAL_POWER", PARAM_REACTIVE_POWER : "REACTIVE_POWER", PARAM_ROTATION_SPEED : "ROTATION_SPEED", PARAM_SWITCH_STATE : "SWITCH_STATE", PARAM_TEMPERATURE : "TEMPERATURE", PARAM_VOLTAGE : "VOLTAGE", PARAM_WATER_FLOW_RATE : "WATER_FLOW_RATE", PARAM_WATER_PRESSURE : "WATER_PRESSURE" } crypt_pid = None def init(pid): global crypt_pid crypt_pid = pid def warning(msg): print("warning:" + str(msg)) #----- MESSAGE DECODER -------------------------------------------------------- #TODO if silly lengths or silly types seen in decode, this might imply #we're trying to process an encrypted packet without decrypting it. #the code should be more robust to this (by checking the CRC) def decode(payload, decrypt=True): """Decode a raw buffer into an OpenHEMS pydict""" #Note, decrypt must already have run on this for it to work length = payload[0] # CHECK LENGTH if length+1 != len(payload) or length < 10: raise OpenHEMSException("bad payload length") #return { # "type": "BADLEN", # "len_actual": len(payload), # "len_expected": length, # "payload": payload[1:] #} # DECODE HEADER mfrId = payload[1] productId = payload[2] encryptPIP = (payload[3]<<8) + payload[4] header = { "mfrid" : mfrId, "productid" : productId, "encryptPIP": encryptPIP } if decrypt: # DECRYPT PAYLOAD # [0]len,mfrid,productid,pipH,pipL,[5] crypto.init(crypt_pid, encryptPIP) crypto.cryptPayload(payload, 5, len(payload)-5) # including CRC # sensorId is in encrypted region sensorId = (payload[5]<<16) + (payload[6]<<8) + payload[7] header["sensorid"] = sensorId # CHECK CRC crc_actual = (payload[-2]<<8) + payload[-1] crc_expected = calcCRC(payload, 5, len(payload)-(5+2)) #print("crc actual:%s, expected:%s" %(hex(crc_actual), hex(crc_expected))) if crc_actual != crc_expected: raise OpenHEMSException("bad CRC") #return { # "type": "BADCRC", # "crc_actual": crc_actual, # "crc_expected": crc_expected, # "payload": payload[1:], #} # DECODE RECORDS i = 8 recs = [] while i < length and payload[i] != 0: # PARAM param = payload[i] wr = ((param & 0x80) == 0x80) paramid = param & 0x7F if param_names.has_key(paramid): paramname = param_names[paramid] else: paramname = "UNKNOWN_" + hex(paramid) i += 1 # TYPE/LEN typeid = (payload[i] & 0xF0)>>4 plen = payload[i] & 0x0F i += 1 if plen == 0: continue # no more data in this record # VALUE valuebytes = [] for x in range(plen): valuebytes.append(payload[i]) i += 1 value = "TODO" # TODO decode based on type and length # store rec recs.append({ "wr": wr, "paramid": paramid, "paramname": paramname, "typeid": typeid, "length": plen, "valuebytes": valuebytes, "value": value }) return { "type": "OK", "header": header, "recs": recs } #----- MESSAGE ENCODER -------------------------------------------------------- # # Encodes a message using the OpenHEMS message payload structure # R1 message product id 0x02 monitor and control (in switching program?) # C1 message product id 0x01 monitor only (in listening program) def encode(spec, encrypt=True): """Encode a pydict specification into a OpenHEMS binary payload""" # The message is not encrypted, but the CRC is generated here. payload = [] # HEADER payload.append(0) # length, fixup later when known header = spec["header"] payload.append(header["mfrid"]) payload.append(header["productid"]) if encrypt: if not header.has_key("encryptPIP"): warning("no encryptPIP in header, assuming 0x0100") encryptPIP = 0x0100 else: encryptPIP = header["encryptPIP"] payload.append((encryptPIP&0xFF00)>>8) # MSB payload.append((encryptPIP&0xFF)) # LSB else: payload.append(0) payload.append(0) sensorId = header["sensorid"] payload.append((sensorId>>16) & 0xFF) # HIGH payload.append((sensorId>>8) & 0xFF) # MID payload.append((sensorId) & 0XFF) # LOW # RECORDS for rec in spec["recs"]: wr = rec["wr"] paramid = rec["paramid"] typeid = rec["typeid"] length = rec["length"] value = rec["value"] # PARAMID if wr: payload.append(0x80 + paramid) # WRITE else: payload.append(paramid) # READ # TYPE/LENGTH payload.append((typeid<<4) | length) # VALUE for i in range(length): payload.append(0) # TODO encoding depends on typeid and length # FOOTER payload.append(0) # NUL crc = calcCRC(payload, 5, len(payload)-5) payload.append((crc>>8) & 0xFF) # MSB payload.append(crc&0xFF) # LSB # back-patch the length byte so it is correct payload[0] = len(payload)-1 if encrypt: # ENCRYPT # [0]len,mfrid,productid,pipH,pipL,[5] crypto.init(crypt_pid, encryptPIP) crypto.cryptPayload(payload, 5, len(payload)-5) # including CRC return payload #----- CRC CALCULATION -------------------------------------------------------- #int16_t crc(uint8_t const mes[], unsigned char siz) #{ # uint16_t rem = 0; # unsigned char byte, bit; # # for (byte = 0; byte < siz; ++byte) # { # rem ^= (mes[byte] << 8); # for (bit = 8; bit > 0; --bit) # { # rem = ((rem & (1 << 15)) ? ((rem << 1) ^ 0x1021) : (rem << 1)); # } # } # return rem; #} def calcCRC(payload, start, length): rem = 0 for b in payload[start:start+length]: rem ^= (b<<8) for bit in range(8): if rem & (1<<15) != 0: # bit is set rem = ((rem<<1) ^ 0x1021) & 0xFFFF # always maintain U16 else: # bit is clear rem = (rem<<1) & 0xFFFF # always maintain U16 return rem #----- TEST HARNESS ----------------------------------------------------------- def printhex(payload): line = "" for b in payload: line += hex(b) + " " print line if __name__ == "__main__": TEST_PAYLOAD = [ 0x1C, #len 16 + 10 + 2 = 0001 1100 0x04, #mfrid 0x02, #prodid 0x01, #pipmsb 0x00, #piplsb 0x00, 0x06, 0x8B, #sensorid 0x70, 0x82, 0x00, 0x07, #power 0x71, 0x82, 0xFF, 0xFD, #reactive_power 0x76, 0x01, 0xF0, #voltage 0x66, 0x22, 0x31, 0xDA, #freq 0x73, 0x01, 0x01, #switch_state 0x00, #NUL 0x97, 0x64 #CRC ] import pprint init(242) printhex(TEST_PAYLOAD) spec = decode(TEST_PAYLOAD, decrypt=False) pprint.pprint(spec) payload = encode(spec, encrypt=True) printhex(payload) spec2 = decode(payload, decrypt=True) pprint.pprint(spec2) # END