| |
---|
| | class Device(): |
---|
| | """A generic connected device abstraction""" |
---|
| | def __init__(self, device_id=None, air_interface=None): |
---|
| | self.air_interface = air_interface |
---|
| | self.device_id = device_id |
---|
| | self.device_id = self.parse_device_id(device_id) |
---|
| | class Config(): pass |
---|
| | self.config = Config() |
---|
| | class Capabilities(): pass |
---|
| | self.capabilities = Capabilities() |
---|
| | self.updated_cb = None |
---|
| | |
---|
| | def get_config(self): |
---|
| | raise RuntimeError("There is no configuration for a base Device") |
---|
| | |
---|
| | @staticmethod |
---|
| | def parse_device_id(device_id): |
---|
| | """device_id could be a number, a hex string or a decimal string""" |
---|
| | ##print("**** parsing: %s" % str(device_id)) |
---|
| | if device_id == None: |
---|
| | raise ValueError("device_id is None, not allowed") |
---|
| | |
---|
| | if type(device_id) == int: |
---|
| | return device_id # does not need to be parsed |
---|
| | |
---|
| | if type(device_id) == tuple or type(device_id) == list: |
---|
| | # each part of the tuple could be encoded |
---|
| | res = [] |
---|
| | for p in device_id: |
---|
| | res.append(Device.parse_device_id(p)) |
---|
| | #TODO: could usefully convert to tuple here to be helpful |
---|
| | return res |
---|
| | |
---|
| | if type(device_id) == str: |
---|
| | # could be hex or decimal or strtuple or strlist |
---|
| | if device_id == "": |
---|
| | raise ValueError("device_id is blank, not allowed") |
---|
| | elif device_id.startswith("0x"): |
---|
| | return int(device_id, 16) |
---|
| | elif device_id[0] == '(' and device_id[-1] == ')': |
---|
| | ##print("**** parse tuple") |
---|
| | inner = device_id[1:-1] |
---|
| | parts = inner.split(',') |
---|
| | ##print(parts) |
---|
| | res = [] |
---|
| | for p in parts: |
---|
| | res.append(Device.parse_device_id(p)) |
---|
| | ##print(res) |
---|
| | return res |
---|
| | |
---|
| | elif device_id[0] == '[' and device_id[-1] == ']': |
---|
| | ##print("**** parse list") |
---|
| | inner = device_id[1:-1] |
---|
| | parts = inner.split(',') |
---|
| | ##print(parts) |
---|
| | res = [] |
---|
| | for p in parts: |
---|
| | res.append(Device.parse_device_id(p)) |
---|
| | #TODO: could usefully change to tuple here |
---|
| | ##print(res) |
---|
| | return res |
---|
| | else: |
---|
| | return int(device_id, 10) |
---|
| | |
---|
| | else: |
---|
| | raise ValueError("device_id unsupported type or format, got: %s %s" % (type(device_id), str(device_id))) |
---|
| | |
---|
| | |
---|
| | def has_switch(self): |
---|
| | return hasattr(self.capabilities, "switch") |
---|
| | |
---|
| |
---|
| | def get_config(self): |
---|
| | """Get the persistable config, enough to reconstruct this class from a factory""" |
---|
| | return { |
---|
| | "type": self.__class__.__name__, |
---|
| | "manufacturer_id": self.manufacturer_id, |
---|
| | "product_id": self.product_id, |
---|
| | ##"manufacturer_id": self.manufacturer_id, # not needed, known by class |
---|
| | ##"product_id": self.product_id, # not needed, known by class |
---|
| | "device_id": self.device_id |
---|
| | } |
---|
| | |
---|
| | def __repr__(self): |
---|
| |
---|
| | self.config.tx_repeats = 8 |
---|
| | self.capabilities.switch = True |
---|
| | self.capabilities.receive = True |
---|
| | |
---|
| | def __repr__(self): |
---|
| | return "ENER002(%s,%s)" % (str(hex(self.device_id[0])), str(hex(self.device_id[1]))) |
---|
| | |
---|
| | |
---|
| | def turn_on(self): |
---|
| | #TODO should this be here, or in LegacyDevice?? |
---|
| | #addressing should probably be in LegacyDevice |
---|
| | #child devices might interpret the command differently |
---|
| |
---|
| | "on": False |
---|
| | } |
---|
| | self.send_message(payload) |
---|
| | |
---|
| | def __repr__(self): |
---|
| | return "ENER002(%s,%s)" % (str(hex(self.device_id[0]), str(self.device_id[1]))) |
---|
| | |
---|
| | |
---|
| | class MIHO004(MiHomeDevice): |
---|
| | """Monitor only Adaptor""" |
---|
| |
---|
| | def keys(): |
---|
| | return DeviceFactory.device_from_name.keys() |
---|
| | |
---|
| | @staticmethod |
---|
| | def get_device_from_name(name, device_id=None, air_interface=None): |
---|
| | def get_device_from_name(name, device_id=None, air_interface=None, **kwargs): |
---|
| | """Get a device by name, construct a new instance""" |
---|
| | # e.g. This is useful when creating device class instances from a human readable config |
---|
| | if not name in DeviceFactory.device_from_name: |
---|
| | raise ValueError("Unsupported device:%s" % name) |
---|
| | |
---|
| | c = DeviceFactory.device_from_name[name] |
---|
| | if air_interface == None: |
---|
| | air_interface = DeviceFactory.default_air_interface |
---|
| | return c(device_id, air_interface) |
---|
| | return c(device_id, air_interface, **kwargs) |
---|
| | |
---|
| | @staticmethod |
---|
| | def get_device_from_id(id, device_id=None, air_interface=None): |
---|
| | """Get a device by it's id, construct a new instance""" |
---|
| |
---|
| | |