Implemented a standalone switch exerciser demo.
Note this only really works with Adaptor Plus
(unless you seed the directory with your device id's first)
1 parent 8f6cc1c commit 5da75456b1ced4756633eb6d60249d996e9be3ab
@David Whale David Whale authored on 17 Mar 2016
Showing 4 changed files
View
6
src/energenie/Devices.py
 
return "Manufacturer:%s Product:%s" % (mfr, product)
 
 
def hasSwitch(mfrid, productid):
if mfrid != MFRID: return False
if productid == PRODUCTID_R1_MONITOR_AND_CONTROL: return True
return False
 
 
# END
View
16
src/legacy.py
# legacy.py 17/03/2016 D.J.Whale
#
# Placeholder for a legacy plug (OOK) test harness
#
# This is a temporary piece of code, that will be thrown away once the device object interface
# is written.
#
# Please don't use this as the basis for an application, it is only designed to be used
# to test the lower level code.
 
#TODO
 
 
# END
 
View
18
src/monitor.py
# monitor.py 27/09/2015 D.J.Whale
#
# Monitor Energine MiHome plugs
 
# Note, this is *only* a test program, to exercise the lower level code.
# Don't expect this to be a good starting point for an application.
# Consider waiting for me to finish developing the device object interface first.
#
# However, it will log all messages from MiHome monitor, adaptor plus and house monitor
# to a CSV log file, so could be the basis for a non-controlling energy logging app.
 
import time
 
from energenie import OpenHEMS, Devices
LOG_FILENAME = "energenie.csv"
 
def warning(msg):
print("warning:%s" % str(msg))
 
def trace(msg):
print("monitor:%s" % str(msg))
 
log_file = None
radio.transmit(p)
radio.receiver()
 
 
def monitor():
def monitor_loop():
"""Capture any incoming messages and log to CSV file"""
 
radio.receiver()
 
warning("Can't decode payload:" + str(e))
continue
OpenHEMS.showMessage(decoded)
# Any device that reports will be added to the non-persistent directory
updateDirectory(decoded)
#trace(decoded)
logMessage(decoded)
 
#trace(decoded)
# Process any JOIN messages by sending back a JOIN-ACK to turn the LED off
if len(decoded["recs"]) == 0:
# handle messages with zero recs in them silently
print("Empty record:%s" % decoded)
else:
 
 
if __name__ == "__main__":
trace("starting monitor")
trace("starting monitor tester")
radio.init()
OpenHEMS.init(Devices.CRYPT_PID)
 
try:
monitor()
monitor_loop()
 
finally:
radio.finished()
 
View
205
src/switch.py
# monitor.py 27/09/2015 D.J.Whale
# switch.py 17/03/2016 D.J.Whale
#
# Monitor settings of Energine MiHome plugs
# Control Energenie switches.
# Note, at the moment, this only works with MiHome Adaptor Plus devices
# because the 'sensorid' is discovered via the monitor message.
# You could probably fake it by preloading the directory with your sensorid
# if you know what it is.
 
# Note, this is *only* a test program, to exercise the lower level code.
# Don't expect this to be a good starting point for an application.
# Consider waiting for me to finish developing the device object interface first.
 
import time
 
from energenie import OpenHEMS, Devices
from energenie import radio
from Timer import Timer
import os
 
LOG_FILENAME = "energenie.csv"
# Increase this if you have lots of switches, so that the receiver has enough
# time to receive update messages, otherwise your devices won't make it into
# the device directory.
TX_RATE = 10 # seconds between each switch change cycle
 
def warning(msg):
print("warning:%s" % str(msg))
 
def trace(msg):
print("monitor:%s" % str(msg))
 
log_file = None
 
def logMessage (msg):
HEADINGS = 'timestamp,mfrid,prodid,sensorid,flags,switch,voltage,freq,reactive,real'
 
global log_file
if log_file == None:
if not os.path.isfile(LOG_FILENAME):
log_file = open(LOG_FILENAME, 'w')
log_file.write(HEADINGS + '\n')
else:
log_file = open(LOG_FILENAME, 'a') # append
 
# get the header
header = msg['header']
timestamp = time.time()
mfrid = header['mfrid']
productid = header['productid']
sensorid = header['sensorid']
 
# set defaults for any data that doesn't appear in this message
# but build flags so we know which ones this contains
flags = [0 for i in range(7)]
switch = None
voltage = None
freq = None
reactive = None
real = None
apparent = None
current = None
 
# capture any data that we want
#print(msg)
for rec in msg['recs']:
paramid = rec['paramid']
try:
value = rec['value']
except:
value = None
if paramid == OpenHEMS.PARAM_SWITCH_STATE:
switch = value
flags[0] = 1
elif paramid == OpenHEMS.PARAM_VOLTAGE:
flags[1] = 1
voltage = value
elif paramid == OpenHEMS.PARAM_FREQUENCY:
flags[2] = 1
freq = value
elif paramid == OpenHEMS.PARAM_REACTIVE_POWER:
flags[3] = 1
reactive = value
elif paramid == OpenHEMS.PARAM_REAL_POWER:
flags[4] = 1
real = value
elif paramid == OpenHEMS.PARAM_APPARENT_POWER:
flags[5] = 1
apparent = value
elif paramid == OpenHEMS.PARAM_CURRENT:
flags[6] = 1
current = value
 
# generate a line of CSV
flags = "".join([str(a) for a in flags])
csv = "%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s" % (timestamp, mfrid, productid, sensorid, flags, switch, voltage, freq, reactive, real, apparent, current)
log_file.write(csv + '\n')
log_file.flush()
trace(csv) # testing
 
 
#----- TEST APPLICATION -------------------------------------------------------
 
]
}
 
 
def send_join_ack(mfrid, productid, sensorid):
# send back a JOIN ACK, so that join light stops flashing
response = OpenHEMS.alterMessage(JOIN_ACK_MESSAGE,
header_mfrid=mfrid,
header_productid=productid,
header_sensorid=sensorid)
p = OpenHEMS.encode(response)
radio.transmitter()
radio.transmit(p)
radio.receiver()
 
def monitor():
"""Send discovery and monitor messages, and capture any responses"""
 
def switch_loop():
"""Listen to sensor messages, and turn switches on and off every few seconds"""
 
# Define the schedule of message polling
sendSwitchTimer = Timer(5, 1) # every n seconds offset by initial 1
sendSwitchTimer = Timer(TX_RATE, 1) # every n seconds offset by initial 1
switch_state = 0 # OFF
radio.receiver()
decoded = None
 
while True:
# See if there is a payload, and if there is, process it
if radio.isReceiveWaiting():
warning("Can't decode payload:" + str(e))
continue
OpenHEMS.showMessage(decoded)
# Any device that reports will be added to the non-persistent directory
updateDirectory(decoded)
logMessage(decoded)
#TODO: Should remember report time of each device,
#and reschedule command messages to avoid their transmit slot
#making it less likely to miss an incoming message due to
#the radio being in transmit mode
#trace(decoded)
 
# handle messages with zero recs in them silently
#trace(decoded)
# Process any JOIN messages by sending back a JOIN-ACK to turn the LED off
if len(decoded["recs"]) == 0:
# handle messages with zero recs in them silently
print("Empty record:%s" % decoded)
else:
# assume only 1 rec in a join, for now
#TODO: write OpenHEMS.getFromMessage("header_mfrid")
if decoded["recs"][0]["paramid"] == OpenHEMS.PARAM_JOIN:
#TODO: write OpenHEMS.getFromMessage("header_mfrid")
# send back a JOIN ACK, so that join light stops flashing
response = OpenHEMS.alterMessage(JOIN_ACK_MESSAGE,
header_mfrid=decoded["header"]["mfrid"],
header_productid=decoded["header"]["productid"],
header_sensorid=decoded["header"]["sensorid"])
p = OpenHEMS.encode(response)
radio.transmitter()
radio.transmit(p)
radio.receiver()
header = decoded["header"]
mfrid = header["mfrid"]
productid = header["productid"]
sensorid = header["sensorid"]
send_join_ack(mfrid, productid, sensorid)
 
if sendSwitchTimer.check() and decoded != None:
request = OpenHEMS.alterMessage(SWITCH_MESSAGE,
header_sensorid=decoded["header"]["sensorid"],
recs_0_value=switch_state)
p = OpenHEMS.encode(request)
 
# Toggle the switch on all devices in the directory
if len(directory) > 0 and sendSwitchTimer.check():
print("transmit")
radio.transmitter()
radio.transmit(p)
 
for sensorid in directory.keys():
# Only try to toggle the switch for devices that actually have a switch
header = directory[sensorid]["header"]
mfrid = header["mfrid"]
productid = header["productid"]
 
if Devices.hasSwitch(mfrid, productid):
request = OpenHEMS.alterMessage(SWITCH_MESSAGE,
header_sensorid=sensorid,
recs_0_value=switch_state)
p = OpenHEMS.encode(request)
print("Sending switch message to %s %s" % (hex(productid), hex(sensorid)))
# Transmit multiple times, hope one of them gets through
for i in range(4):
radio.transmit(p)
 
radio.receiver()
print("receive")
switch_state = (switch_state+1) % 2 # toggle
 
if __name__ == "__main__":
trace("starting monitor")
trace("starting switch tester")
radio.init()
OpenHEMS.init(Devices.CRYPT_PID)
 
try:
monitor()
switch_loop()
 
finally:
radio.finished()