# switch.py 17/03/2016 D.J.Whale
#
# Control Energenie switches.
# Note, at the moment, this only works with MiHome Adaptor Plus devices
# because the 'sensorid' is discovered via the monitor message.
# You could probably fake it by preloading the directory with your sensorid
# if you know what it is.
# Note, this is *only* a test program, to exercise the lower level code.
# Don't expect this to be a good starting point for an application.
# Consider waiting for me to finish developing the device object interface first.
import time
from energenie import OpenHEMS, Devices
from energenie import radio
from Timer import Timer
import os
# Increase this if you have lots of switches, so that the receiver has enough
# time to receive update messages, otherwise your devices won't make it into
# the device directory.
TX_RATE = 10 # seconds between each switch change cycle
def warning(msg):
print("warning:%s" % str(msg))
def trace(msg):
print("monitor:%s" % str(msg))
#----- TEST APPLICATION -------------------------------------------------------
directory = {}
def allkeys(d):
result = ""
for k in d:
if len(result) != 0:
result += ','
result += str(k)
return result
def updateDirectory(message):
"""Update the local directory with information about this device"""
now = time.time()
header = message["header"]
sensorId = header["sensorid"]
if not directory.has_key(sensorId):
# new device discovered
desc = Devices.getDescription(header["mfrid"], header["productid"])
print("ADD device:%s %s" % (hex(sensorId), desc))
directory[sensorId] = {"header": message["header"]}
#trace(allkeys(directory))
directory[sensorId]["time"] = now
#TODO would be good to keep recs, but need to iterate through all and key by paramid,
#not as a list index, else merging will be hard.
SWITCH_MESSAGE = {
"header": {
"mfrid": Devices.MFRID,
"productid": Devices.PRODUCTID_R1_MONITOR_AND_CONTROL,
"encryptPIP": Devices.CRYPT_PIP,
"sensorid": 0 # FILL IN
},
"recs": [
{
"wr": True,
"paramid": OpenHEMS.PARAM_SWITCH_STATE,
"typeid": OpenHEMS.Value.UINT,
"length": 1,
"value": 0 # FILL IN
}
]
}
JOIN_ACK_MESSAGE = {
"header": {
"mfrid": 0, # FILL IN
"productid": 0, # FILL IN
"encryptPIP": Devices.CRYPT_PIP,
"sensorid": 0 # FILL IN
},
"recs": [
{
"wr": False,
"paramid": OpenHEMS.PARAM_JOIN,
"typeid": OpenHEMS.Value.UINT,
"length": 0
}
]
}
def send_join_ack(mfrid, productid, sensorid):
# send back a JOIN ACK, so that join light stops flashing
response = OpenHEMS.alterMessage(JOIN_ACK_MESSAGE,
header_mfrid=mfrid,
header_productid=productid,
header_sensorid=sensorid)
p = OpenHEMS.encode(response)
radio.transmitter()
radio.transmit(p)
radio.receiver()
def switch_loop():
"""Listen to sensor messages, and turn switches on and off every few seconds"""
# Define the schedule of message polling
sendSwitchTimer = Timer(TX_RATE, 1) # every n seconds offset by initial 1
switch_state = 0 # OFF
radio.receiver()
while True:
# See if there is a payload, and if there is, process it
if radio.isReceiveWaiting():
#trace("receiving payload")
payload = radio.receive()
try:
decoded = OpenHEMS.decode(payload)
except OpenHEMS.OpenHEMSException as e:
warning("Can't decode payload:" + str(e))
continue
OpenHEMS.showMessage(decoded)
# Any device that reports will be added to the non-persistent directory
updateDirectory(decoded)
#trace(decoded)
# Process any JOIN messages by sending back a JOIN-ACK to turn the LED off
if len(decoded["recs"]) == 0:
# handle messages with zero recs in them silently
print("Empty record:%s" % decoded)
else:
# assume only 1 rec in a join, for now
#TODO: write OpenHEMS.getFromMessage("header_mfrid")
if decoded["recs"][0]["paramid"] == OpenHEMS.PARAM_JOIN:
header = decoded["header"]
mfrid = header["mfrid"]
productid = header["productid"]
sensorid = header["sensorid"]
send_join_ack(mfrid, productid, sensorid)
# Toggle the switch on all devices in the directory
if len(directory) > 0 and sendSwitchTimer.check():
print("transmit")
radio.transmitter()
for sensorid in directory.keys():
# Only try to toggle the switch for devices that actually have a switch
header = directory[sensorid]["header"]
mfrid = header["mfrid"]
productid = header["productid"]
if Devices.hasSwitch(mfrid, productid):
request = OpenHEMS.alterMessage(SWITCH_MESSAGE,
header_sensorid=sensorid,
recs_0_value=switch_state)
p = OpenHEMS.encode(request)
print("Sending switch message to %s %s" % (hex(productid), hex(sensorid)))
# Transmit multiple times, hope one of them gets through
for i in range(4):
radio.transmit(p)
radio.receiver()
print("receive")
switch_state = (switch_state+1) % 2 # toggle
if __name__ == "__main__":
trace("starting switch tester")
radio.init()
OpenHEMS.init(Devices.CRYPT_PID)
try:
switch_loop()
finally:
radio.finished()
# END