Newer
Older
pyenergenie / src / switch.py
# switch.py  17/03/2016  D.J.Whale
#
# Control Energenie switches.
# Note, at the moment, this only works with MiHome Adaptor Plus devices
# because the 'sensorid' is discovered via the monitor message.
# You could probably fake it by preloading the directory with your sensorid
# if you know what it is.

# Note, this is *only* a test program, to exercise the lower level code.
# Don't expect this to be a good starting point for an application.
# Consider waiting for me to finish developing the device object interface first.

import time
from energenie import Devices, Messages, Registry, OpenThings, radio
from Timer import Timer


# Increase this if you have lots of switches, so that the receiver has enough
# time to receive update messages, otherwise your devices won't make it into
# the device directory.
TX_RATE = 2 # seconds between each switch change cycle


def warning(msg):
    print("warning:%s" % str(msg))


def trace(msg):
    print("monitor:%s" % str(msg))


#----- TEST APPLICATION -------------------------------------------------------

def switch_sniff_loop():
    """Listen to sensor messages and add them to the Registry"""

    # See if there is a payload, and if there is, process it
    if radio.is_receive_waiting():
        ##trace("receiving payload")
        payload = radio.receive()
        try:
            decoded = OpenThings.decode(payload)
            now = time.time()
        except OpenThings.OpenThingsException as e:
            warning("Can't decode payload:" + str(e))
            return

        OpenThings.showMessage(decoded, timestamp=now)
        # Any device that reports will be added to the non-persistent directory
        Registry.update(decoded)
        ##trace(decoded)

        # Process any JOIN messages by sending back a JOIN-ACK to turn the LED off
        if len(decoded["recs"]) == 0:
            # handle messages with zero recs in them silently
            print("Empty record:%s" % decoded)
        else:
            # assume only 1 rec in a join, for now
            if decoded["recs"][0]["paramid"] == OpenThings.PARAM_JOIN:
                mfrid     = OpenThings.getFromMessage(decoded, "header_mfrid")
                productid = OpenThings.getFromMessage(decoded, "header_productid")
                sensorid  = OpenThings.getFromMessage(decoded, "header_sensorid")
                Messages.send_join_ack(radio, mfrid, productid, sensorid)


def switch_toggle_loop():
    """Toggle the switch on all devices in the directory"""

    global switch_state

    if Registry.size() > 0 and sendSwitchTimer.check():
        print("transmit")
        radio.transmitter()

        for sensorid in Registry.get_sensorids():
            # Only try to toggle the switch for devices that actually have a switch
            header = Registry.get_info(sensorid)["header"]
            mfrid = header["mfrid"]
            productid = header["productid"]

            if Devices.hasSwitch(mfrid, productid):
                request = OpenThings.alterMessage(Messages.SWITCH,
                    header_sensorid=sensorid,
                    recs_0_value=switch_state)
                p = OpenThings.encode(request)
                print("Sending switch message to %s %s" % (hex(productid), hex(sensorid)))
                # Transmit multiple times, hope one of them gets through
                radio.transmit(p, inner_times=2)

        radio.receiver()
        switch_state = (switch_state+1) % 2 # toggle
        

if __name__ == "__main__":
    
    trace("starting switch tester")
    radio.init()
    OpenThings.init(Devices.CRYPT_PID)

    # Seed the registry with a known device, to simplify tx-only testing
    SENSOR_ID = 0x68B # captured from a real device
    device_header = OpenThings.alterMessage(Messages.REGISTERED_SENSOR,
        header_mfrid     = Devices.MFRID,
        header_productid = Devices.PRODUCTID_MIHO005, # adaptor plus
        header_sensorid  = SENSOR_ID)
    Registry.update(device_header)


    sendSwitchTimer    = Timer(TX_RATE, 1)   # every n seconds offset by initial 1
    switch_state       = 0 # OFF
    radio.receiver()

    try:
        while True:
            switch_sniff_loop()
            switch_toggle_loop()

    finally:
        radio.finished()

# END