Newer
Older
pyenergenie / src / energenie / Devices.py
# Devices.py  30/09/2015  D.J.Whale
#
# Information about specific Energenie devices
# This table is mostly reverse-engineered from various websites and web catalogues.

import OnAir
import OpenThings

# This level of indirection allows easy mocking for testing
ook_interface = OnAir.TwoBitAirInterface()
fsk_interface = OnAir.OpenThingsAirInterface()


MFRID_ENERGENIE                  = 0x04
MFRID                            = MFRID_ENERGENIE

#PRODUCTID_MIHO001               =        #         Home Hub
#PRODUCTID_MIHO002               =        #         Control only (Uses Legacy OOK protocol)
#PRODUCTID_MIHO003               = 0x0?   #         Hand Controller
PRODUCTID_MIHO004                = 0x01   #         Monitor only
PRODUCTID_MIHO005                = 0x02   #         Adaptor Plus
PRODUCTID_MIHO006                = 0x05   #         House Monitor
#PRODUCTID_MIHO007               = 0x0?   #         Double Wall Socket White
#PRODUCTID_MIHO008               = 0x0?   #         Single light switch
#PRODUCTID_MIHO009 not used
#PRODUCTID_MIHO010 not used
#PRODUCTID_MIHO011 not used
#PRODUCTID_MIHO012 not used
PRODUCTID_MIHO013                = 0x03   #         eTRV
#PRODUCTID_MIHO014               = 0x0?   #         In-line Relay
#PRODUCTID_MIHO015 not used
#PRODUCTID_MIHO016 not used
#PRODUCTID_MIHO017
#PRODUCTID_MIHO018
#PRODUCTID_MIHO019
#PRODUCTID_MIHO020
#PRODUCTID_MIHO021               = 0x0?   #         Double Wall Socket Nickel
#PRODUCTID_MIHO022               = 0x0?   #         Double Wall Socket Chrome
#PRODUCTID_MIHO023               = 0x0?   #         Double Wall Socket Brushed Steel
#PRODUCTID_MIHO024               = 0x0?   #         Style Light Nickel
#PRODUCTID_MIHO025               = 0x0?   #         Style Light Chrome
#PRODUCTID_MIHO026               = 0x0?   #         Style Light Steel
#PRODUCTID_MIHO027 starter pack bundle
#PRODUCTID_MIHO028 eco starter pack
#PRODUCTID_MIHO029 heating bundle
#PRODUCTID_MIHO030 not used
#PRODUCTID_MIHO031 not used
#PRODUCTID_MIHO032 not used
#PRODUCTID_MIHO033 not used
#PRODUCTID_MIHO034 not used
#PRODUCTID_MIHO035 not used
#PRODUCTID_MIHO036 not used
#PRODUCTID_MIHO037 Adaptor Plus Bundle
#PRODUCTID_MIHO038 2-gang socket Bundle
#PRODUCTID_MIHO039 2-gang socket Bundle black nickel
#PRODUCTID_MIHO040 2-gang socket Bundle chrome
#PRODUCTID_MIHO041 2-gang socket Bundle stainless steel

# Default keys for OpenThings encryption and decryption
CRYPT_PID                        = 242
CRYPT_PIP                        = 0x0100

# OpenThings does not support a broadcast id,
# but Energenie added one for their MiHome Adaptors.
# This makes simple discovery possible.
BROADCAST_ID                     = 0xFFFFFF # Energenie broadcast

#TODO: This might be deprecated now, and replaced with the DeviceFactory?
def getDescription(mfrid, productid):
    if mfrid == MFRID_ENERGENIE:
        mfr = "Energenie"
        if productid == PRODUCTID_MIHO004:
            product = "MIHO004 MONITOR"
        elif productid == PRODUCTID_MIHO005:
            product = "MIHO005 ADAPTOR PLUS"
        elif productid == PRODUCTID_MIHO006:
            product = "MIHO006 HOUSE MONITOR"
        elif productid == PRODUCTID_MIHO013:
            product = "MIHO013 ETRV"
        else:
            product = "UNKNOWN_%s" % str(hex(productid))
    else:
        mfr     = "UNKNOWN_%s" % str(hex(mfrid))
        product = "UNKNOWN_%s" % str(hex(productid))

    return "Manufacturer:%s Product:%s" % (mfr, product)


#TODO this might be deprecated now, and replaced with the Device classes.
#e.g. if there is a turn_on method or get_switch method, it has a switch.
def hasSwitch(mfrid, productid):
    if mfrid != MFRID:                  return False
    if productid == PRODUCTID_MIHO005:  return True
    return False


#----- DEFINED MESSAGE TEMPLATES ----------------------------------------------

import copy

def create_message(message):
    return copy.deepcopy(message)

SWITCH = {
    "header": {
        "mfrid":       MFRID_ENERGENIE,
        "productid":   PRODUCTID_MIHO005,
        "encryptPIP":  CRYPT_PIP,
        "sensorid":    0 # FILL IN
    },
    "recs": [
        {
            "wr":      True,
            "paramid": OpenThings.PARAM_SWITCH_STATE,
            "typeid":  OpenThings.Value.UINT,
            "length":  1,
            "value":   0 # FILL IN
        }
    ]
}


JOIN_ACK = {
    "header": {
        "mfrid":       0, # FILL IN
        "productid":   0, # FILL IN
        "encryptPIP":  CRYPT_PIP,
        "sensorid":    0 # FILL IN
    },
    "recs": [
        {
            "wr":      False,
            "paramid": OpenThings.PARAM_JOIN,
            "typeid":  OpenThings.Value.UINT,
            "length":  0
        }
    ]
}


REGISTERED_SENSOR = {
    "header": {
        "mfrid":       MFRID_ENERGENIE,
        "productid":   0, # FILL IN
        "encryptPIP":  CRYPT_PIP,
        "sensorid":    0 # FILL IN
    }
}


def send_join_ack(radio, mfrid, productid, sensorid):
    # send back a JOIN ACK, so that join light stops flashing
    response = OpenThings.alterMessage(create_message(JOIN_ACK),
        header_mfrid=mfrid,
        header_productid=productid,
        header_sensorid=sensorid)
    p = OpenThings.encode(response)
    radio.transmitter()
    radio.transmit(p, inner_times=2)
    radio.receiver()



#----- CONTRACT WITH AIR-INTERFACE --------------------------------------------

# this might be a real air_interface (a radio), or an adaptor interface
# (a message scheduler with a queue).
#
#   synchronous send
#   synchronous receive
#   TODO: asynchronous send (deferred)    - implies a callback on 'done, fail, timeout'
#   TODO: asynchronous receive (deferred) - implies a callback on 'done, fail, timeout'

# air_interface has:
#   configure(parameters)
#   send(payload)
#   send(payload, parameters)
#   receive() -> (radio_measurements, address, payload)


#----- NEW DEVICE CLASSES -----------------------------------------------------

class Device():
    """A generic connected device abstraction"""
    def __init__(self, air_interface):
        self.air_interface = air_interface
        class Config(): pass
        self.config = Config()
        class Capabilities(): pass
        self.capabilities = Capabilities()

    def has_switch(self):
        return hasattr(self.capabilities, "switch")

    def can_send(self):
        return hasattr(self.capabilities, "send")

    def can_receive(self):
        return hasattr(self.capabilities, "receive")

    def get_radio_config(self):
        return self.config

    def get_last_receive_time(self): # ->timestamp
        """The timestamp of the last time any message was received by this device"""
        return self.last_receive_time

    def get_next_receive_time(self): # -> timestamp
        """An estimate of the next time we expect a message from this device"""
        pass

    def incoming_message(self, payload):
        # incoming_message (OOK or OpenThings as appropriate, stripped of header? decrypted, decoded to pydict)
        # default action of base class is to just print the payload
        print("incoming(unhandled):%s" % payload)

    def send_message(self, payload):
        print("send_message %s" % payload)
        # A raw device has no knowledge of how to send, the sub class provides that.

    def __repr__(self):
        return "Device()"


class EnergenieDevice(Device):
    """An abstraction for any kind of Energenie connected device"""
    def __init__(self, air_interface, device_id=None):
        Device.__init__(self, air_interface)
        self.device_id = device_id

    def get_device_id(self): # -> id:int
        return self.device_id

    def __repr__(self):
        return "Device(%s)" % str(self.device_id)


class LegacyDevice(EnergenieDevice):
    """An abstraction for Energenie green button legacy OOK devices"""
    def __init__(self):
        EnergenieDevice.__init__(self, ook_interface)
        self.config.frequency  = 433.92
        self.config.modulation = "OOK"
        self.config.codec      = "4bit"

    def __repr__(self):
        return "LegacyDevice(%s)" % str(self.device_id)

    def send_message(self, payload):
        ####HERE#### interface with air_interface
        # Encode the payload two bits per byte as per OOK spec
        #TODO, should we just pass a payload (as a pydict or tuple) to the air_interface adaptor
        #and let it encode it, to be consistent with the FSK MiHome devices?
        #payload could be a 3-tuple of (house_address, device_address, state)
        #bytes = TwoBit.build_switch_msg(payload, house_address=self.device_id[0], device_address=self.device_id[1])

        if self.air_interface != None:
            #TODO: might want to send the config, either as a send parameter,
            #or by calling air_interface.configure() first?
            #i.e. radio.modulation(MODULATION_OOK)
            self.air_interface.send(payload) #TODO: or (ha, da, s)
        else:
            d = self.device_id
            print("send_message(mock[%s]):%s" % (str(d), payload))


class MiHomeDevice(EnergenieDevice):
    """An abstraction for Energenie new style MiHome FSK devices"""
    def __init__(self, device_id=None):
        EnergenieDevice.__init__(self, fsk_interface, device_id)
        self.config.frequency  = 433.92
        self.config.modulation = "FSK"
        self.config.codec      = "OpenThings"
        self.manufacturer_id   = MFRID_ENERGENIE
        self.product_id        = None

        #Different devices might have different PIP's
        #if we are cycling codes on each message?
        #self.config.encryptPID = CRYPT_PID
        #self.config.encryptPIP = CRYPT_PIP

    def __repr__(self):
        return "MiHomeDevice(%s,%s,%s)" % (str(self.manufacturer_id), str(self.product_id), str(self.device_id))

    def get_manufacturer_id(self): # -> id:int
        return self.manufacturer_id

    def get_product_id(self): # -> id:int
        return self.product_id

    def join_ack(self):
        """Send a join-ack to the real device"""
        self.send_message("join ack") # TODO

    def incoming_message(self, payload):
        ####HERE#### interface with air_interface
        """Handle incoming messages for this device"""
        #NOTE: we must have already decoded the message with OpenThings to be able to get the addresses out
        # so payload at this point must be a pydict?

        #we know at this point that it's a FSK message
        #OpenThingsAirInterface has already decrypted and decoded
        #so we get a pydict payload here with header and recs in it
        #the header has the address which is used for routing

        #TODO join request might be handled generically here
        #TODO: subclass can override and call back to this if it wants to
        raise RuntimeError("Method unimplemented") #TODO

    def send_message(self, payload):
        ####HERE#### interface with air_interface
        #is payload a pydict with header at this point, and we have to call OpenThings.encode?
        #should the encode be done here, or in the air_interface adaptor?

        #TODO: at what point is the payload turned into a pydict?
        #TODO: We know it's going over OpenThings,
        #do we call OpenThings.encode(payload) here?
        #also OpenThings.encrypt() - done by encode() as default
        if self.air_interface != None:
            #TODO: might want to send the config, either as a send parameter,
            #or by calling air_interface.configure() first?
            self.air_interface.send(payload)
        else:
            m = self.manufacturer_id
            p = self.product_id
            d = self.device_id
            print("send_message(mock[%s %s %s]):%s" % (str(m), str(p), str(d), payload))


class ENER002(LegacyDevice):
    """A green-button switch"""
    def __init__(self, device_id=None):
        LegacyDevice.__init__(self)
        self.device_id = device_id # (house_address, device_index)
        self.config.tx_repeats = 8
        self.capabilities.switch = True
        self.capabilities.receive = True

    def turn_on(self):
        #TODO should this be here, or in LegacyDevice??
        #addressing should probably be in LegacyDevice
        #child devices might interpret the command differently
        payload = {
            "house_address":  self.device_id[0],
            "device_index":   self.device_id[1],
            "on":             True
        }
        self.send_message(payload)

    def turn_off(self):
        #TODO: should this be here, or in LegacyDevice???
        #addressing should probably be in LegacyDevice
        #child devices might interpret the command differently
        payload = {
            "house_address":  self.device_id[0],
            "device_index":   self.device_id[1],
            "on":             False
        }
        self.send_message(payload)


class MIHO005(MiHomeDevice):
    """An Energenie MiHome Adaptor Plus"""
    def __init__(self, device_id=None):
        MiHomeDevice.__init__(self)
        self.product_id = PRODUCTID_MIHO005
        self.device_id  = device_id
        class Readings():
            switch         = None
            voltage        = None
            frequency      = None
            apparent_power = None
            reactive_power = None
            real_power     = None
        self.readings = Readings()
        self.config.tx_repeats = 4
        self.capabilities.send = True
        self.capabilities.receive = True
        self.capabilities.switch = True

    def incoming_message(self, payload):
        ####HERE####
        for rec in payload["recs"]:
            paramid = rec["paramid"]
            if paramid in [OpenThings.PARAM_SWITCH_STATE, OpenThings.PARAM_VOLTAGE]: #TODO store in cache in init
                # TODO consider making this table driven and allowing our base class to fill our readings in for us
                # then just define the mapping table in __init__ (i.e. paramid->Readings field name)
                value = rec["value"]
                if paramid == OpenThings.PARAM_SWITCH_STATE:
                    self.readings.switch = ((value == True) or (value != 0))
                    print("#### Switch state updated")

    def get_readings(self): # -> readings:pydict
        """A way to get all readings as a single consistent set"""
        return self.readings

    def turn_on(self):
        #TODO: header construction should be in MiHomeDevice as it is shared
        payload = OpenThings.alterMessage(
            create_message(SWITCH),
            header_productid = self.product_id,
            header_sensorid  = self.device_id,
            recs_0_value     = True)
        self.send_message(payload)

    def turn_off(self):
        #TODO: header construction should be in MiHomeDevice as it is shared
        payload = OpenThings.alterMessage(
            create_message(SWITCH),
            header_productid = self.product_id,
            header_sensorid  = self.device_id,
            recs_0_value     = False)
        self.send_message(payload)

    #TODO: difference between 'is on and 'is requested on'
    #TODO: difference between 'is off' and 'is requested off'
    #TODO: switch state might be 'unknown' if not heard.
    #TODO: switch state might be 'turning_on' or 'turning_off' if send request and not heard response yet

    def is_on(self): # -> boolean
        """True, False, or None if unknown"""
        s = self.get_switch()
        if s == None: return None
        return s

    def is_off(self): # -> boolean
        """True, False, or None if unknown"""
        s = self.get_switch()
        if s == None: return None
        return not s

    def get_switch(self): # -> boolean
        """Last stored state of the switch, might be None if unknown"""
        return self.readings.switch

    def get_voltage(self): # -> voltage:float
        """Last stored state of voltage reading, None if unknown"""
        if self.readings.voltage == None:
            raise RuntimeError("No voltage reading received yet")
        return self.readings.voltage

    def get_frequency(self): # -> frequency:float
        """Last stored state of frequency reading, None if unknown"""
        if self.readings.frequency == None:
            raise RuntimeError("No frequency reading received yet")
        return self.readings.frequency

    def get_apparent_power(self): # ->power:float
        """Last stored state of apparent power reading, None if unknown"""
        if self.readings.apparent_power == None:
            raise RuntimeError("No apparent power reading received yet")
        return self.readings.apparent_power

    def get_reactive_power(self): # -> power:float
        """Last stored state of reactive power reading, None if unknown"""
        if self.readings.reactive_power == None:
            raise RuntimeError("No reactive power reading received yet")
        return self.readings.reactive_power

    def get_real_power(self): #-> power:float
        """Last stored state of real power reading, None if unknown"""
        if self.readings.real_power == None:
            raise RuntimeError("No real power reading received yet")
        return self.readings.real_power


class MIHO006(MiHomeDevice):
    """An Energenie MiHome Home Monitor"""
    def __init__(self, device_id=None):
        MiHomeDevice.__init__(self)
        self.product_id = PRODUCTID_MIHO006
        self.device_id  = device_id
        class Readings():
            battery_voltage = None
            current         = None
        self.readings = Readings()
        self.capabilities.send = True

    def get_battery_voltage(self): # -> voltage:float
        return self.readings.battery_voltage

    def get_current(self): # -> current:float
        return self.readings.current


class MIHO013(MiHomeDevice):
    """An Energenie MiHome eTRV Radiator Valve"""
    def __init__(self, device_id=None):
        MiHomeDevice.__init__(self)
        self.product_id = PRODUCTID_MIHO013
        self.device_id  = device_id
        class Readings():
            battery_voltage      = None
            ambient_temperature  = None
            pipe_temperature     = None
            setpoint_temperature = None
            valve_position       = None
        self.readings = Readings()
        self.config.tx_repeats = 10
        self.capabilities.send = True
        self.capabilities.receive = True

    def get_battery_voltage(self): # ->voltage:float
        return self.readings.battery_voltage

    def get_ambient_temperature(self): # -> temperature:float
        return self.readings.ambient_temperature

    def get_pipe_temperature(self): # -> temperature:float
        return self.readings.pipe_temperature

    def get_setpoint_temperature(self): #-> temperature:float
        return self.readings.setpoint_temperature

    def set_setpoint_temperature(self, temperature):
        self.send_message("set setpoint temp") # TODO command

    def get_valve_position(self): # -> position:int?
        pass # TODO is this possible?

    def set_valve_position(self, position):
        pass # TODO command, is this possible?
        self.send_message("set valve pos") #TODO

    #TODO: difference between 'is on and 'is requested on'
    #TODO: difference between 'is off' and 'is requested off'
    #TODO: switch state might be 'unknown' if not heard.
    #TODO: switch state might be 'turning_on' or 'turning_off' if send request and not heard response yet

    def turn_on(self): # command
        pass # TODO command i.e. valve position?
        self.send_message("turn on") #TODO

    def turn_off(self): # command
        pass # TODO command i.e. valve position?
        self.send_message("turn off") #TODO

    def is_on(self): # query last known reported state (unknown if changing?)
        pass # TODO i.e valve is not completely closed?

    def is_off(self): # query last known reported state (unknown if changing?)
        pass # TODO i.e. valve is completely closed?


#----- DEVICE FACTORY ---------------------------------------------------------

# This is a singleton, but might not be in the future.
# i.e. we might have device factories for lots of different devices.
# and a DeviceFactory could auto configure it's set of devices
# with a specific air_interface for us.
# i.e. this might be the EnergenieDeviceFactory, there might be others

class DeviceFactory():
    """A place to come to, to get instances of device classes"""
    devices = {
        # official name            friendly name
        "ENER002":     ENER002,    "GreenButton": ENER002,
        "MIHO005":     MIHO005,    "AdaptorPlus": MIHO005,
        "MIHO006":     MIHO006,    "HomeMonitor": MIHO006,
        "MIHO013":     MIHO013,    "eTRV":        MIHO013,
    }
    default_air_interface = None

    @staticmethod
    def set_default_air_interface(air_interface):
        DeviceFactory.default_air_interface = air_interface

    @staticmethod
    def keys():
        return DeviceFactory.devices.keys()

    @staticmethod
    def get_device(name, air_interface=None, device_id=None):
        """Get a device by name, construct a new instance"""
        if not name in DeviceFactory.devices:
            raise ValueError("Unsupported device:%s" % name)

        c = DeviceFactory.devices[name]
        if air_interface == None:
            air_interface = DeviceFactory.default_air_interface
        return c(air_interface, device_id)


#----- TEMPORARY TEST HARNESS -------------------------------------------------

#hmm: need two addresses for legacy - use a tuple (house_address, index)

#unless we have an adaptor class for air_interface which represents the
#collective house address for a house code. So if you use more than one
#house address, you create multiple air interface adaptors with different
#house codes, that just delegate to the same actual radio air interface?
#bit like a little local router?

#legacy1 = AirInterface.create("OOK", address=0xC8C8C, energenie_radio)

# Could also consider this a local network, with common parameters shared
# by all devices that use it.
#air2    = AirInterface.create("FSK", energenie_radio)

# scheduling would then become
# scheduler = Scheduler(energenie_radio)
# legacy1 = AirInterface.create("OOK", address=0xC8C8C, scheduler)
# air2    = AirInterface.create("FSK", scheduler
# so that when a device tries to transmit, it gets air interface specific
# settings added to it as appropriate, then the scheduler decides when
# to send and receive

# Somehow we need to associate devices with an air interface
# This might allow us to support multiple radios in the future too?
#legacy1.add(tv)

# cooperative loop could be energenie_radio.loop()
# or wrap a thread around it with start() but beware of thread context
# and thread safety.

import time

def test_without_registry():

    tv  = DeviceFactory.get_device("GreenButton", device_id=(0xC8C8C, 1))
    fan = DeviceFactory.get_device("AdaptorPlus", device_id=0x68b)

    while True:
        print("ON")
        tv.turn_on()
        fan.turn_off()
        time.sleep(2)

        print("OFF")
        tv.turn_off()
        fan.turn_on()
        time.sleep(1)


if __name__ == "__main__":
    test_without_registry()

# END