diff --git a/src/energenie/modulator.py b/src/energenie/modulator.py index 251bb2c..bd6b96e 100644 --- a/src/energenie/modulator.py +++ b/src/energenie/modulator.py @@ -1,6 +1,6 @@ # modulator.py 27/03/2016 D.J.Whale # -# OOK modulator +# modulator for use with OOK payloads ALL_SOCKETS = 0 diff --git a/src/energenie/radio.py b/src/energenie/radio.py index 2420d0d..45589ce 100644 --- a/src/energenie/radio.py +++ b/src/energenie/radio.py @@ -289,31 +289,6 @@ warning("Failed to send payload to HRF") -def send_payload_repeat(payload, times=0): - """Send a payload multiple times""" - - # 32 bits enclosed in sync words - HRF_pollreg(ADDR_IRQFLAGS1, MASK_MODEREADY|MASK_TXREADY, MASK_MODEREADY|MASK_TXREADY) - - #write first payload without sync preamble - HRF_writefifo_burst(payload) - - # preceed all future payloads with a sync-word preamble - if times > 0: - preamble = [0x00,0x80,0x00,0x00,0x00] - preamble_payload = preamble + payload - for i in range(times): # Repeat the message a number of times - HRF_pollreg(ADDR_IRQFLAGS2, MASK_FIFOLEVEL, 0) - HRF_writefifo_burst(preamble_payload) - - HRF_pollreg(ADDR_IRQFLAGS2, MASK_PACKETSENT, MASK_PACKETSENT) # wait for Packet sent - - reg = HRF_readreg(ADDR_IRQFLAGS2) - trace(" irqflags2=%s" % hex(reg)) - if (reg & (MASK_FIFONOTEMPTY) != 0) or ((reg & MASK_FIFOOVERRUN) != 0): - warning("Failed to send repeated payload to HRF") - - def dumpPayloadAsHex(payload): length = payload[0] print(hex(length)) @@ -324,47 +299,32 @@ print("[%d] = %s" % (i, hex(payload[i]))) -def build_OOK_relay_msg(relayState=False): - """Temporary test code to prove we can turn the relay on or off""" - #This code does not live in this module, it lives in an EnergenieLegacy codec module - #also there are 4 switches, so should pass in up to 4 relayState values +def HRF_send_OOK_payload_repeat(payload, times=0): + """Send a payload multiple times""" - # This generates a 20*4 bit address i.e. 10 bytes - # The number generated is always the same - # Presumably this is the 'Energenie address prefix' - # The switch number is encoded in the payload - # This code looks screwy, but it is correct compared to the C code. - # It's probably doing bit encoding on the fly, manchester or sync bits or something. - # Wait until we get the OOK spec from Energenie to better document this. + HRF_pollreg(ADDR_IRQFLAGS1, MASK_MODEREADY|MASK_TXREADY, MASK_MODEREADY|MASK_TXREADY) - # Looks like: 0000 00BA gets encoded as: - # 128 64 32 16 8 4 2 1 - # 1 0 B B 1 A A 0 + # A payload is 32 bits enclosed in sync words + # write first payload without sync preamble + HRF_writefifo_burst(payload) - #payload = [] - #for i in range(10): - # j = i + 5 - # payload.append(8 + (j&1) * 6 + 128 + (j&2) * 48) - #dumpPayloadAsHex(payload) - # - # 5 6 7 8 9 10 11 12 13 14 - # 1(01) 1(10) 1(11) 0(00) 0(01) 0(10) 0(11) 1(00) 1(01) 1(10) - payload = [0x8e, 0xe8, 0xee, 0x88, 0x8e, 0xe8, 0xee, 0x88, 0x8e, 0xe8] + # preceed all future payloads with a sync-word preamble + if times > 0: + SYNC_PREAMBLE = [0x00,0x80,0x00,0x00,0x00] + preamble_payload = SYNC_PREAMBLE + payload + for i in range(times): # Repeat the message a number of times + HRF_pollreg(ADDR_IRQFLAGS2, MASK_FIFOLEVEL, 0) + HRF_writefifo_burst(preamble_payload) - if relayState: # ON - # D0=high, D1=high, D2-high, D3=high (S1 on) - # 128 64 32 16 8 4 2 1 128 64 32 16 8 4 2 1 - # 1 0 B B 1 A A 0 1 0 B B 1 A A 0 - # 1 1 1 0 1 1 1 0 1 1 1 0 1 1 1 0 = 10 11 10 11 - payload += [0xEE, 0xEE] - else: # OFF - # D0=high, D1=high, D2=high, D3=low (S1 off) - # 128 64 32 16 8 4 2 1 128 64 32 16 8 4 2 1 - # 1 0 B B 1 A A 0 1 0 B B 1 A A 0 - # 1 1 1 0 1 1 1 0 1 1 1 0 1 0 0 0 = 10 11 10 00 - payload += [0xEE, 0xE8] + #TODO: this will be unreliable, as the IRQ might fire on any single packet + #need to check how big the FIFO is, and just build a single payload and burst load it + HRF_pollreg(ADDR_IRQFLAGS2, MASK_PACKETSENT, MASK_PACKETSENT) # wait for Packet sent - return payload + reg = HRF_readreg(ADDR_IRQFLAGS2) + trace(" irqflags2=%s" % hex(reg)) + if (reg & (MASK_FIFONOTEMPTY) != 0) or ((reg & MASK_FIFOOVERRUN) != 0): + warning("Failed to send repeated OOK payload to HRF") + #----- USER API --------------------------------------------------------------- @@ -425,7 +385,10 @@ def transmit(payload): """Transmit a single payload using the present modulation scheme""" - HRF_send_payload(payload) + if not modulation_fsk: + HRF_send_OOK_payload_repeat(payload, times=4) + else: + HRF_send_payload(payload) def receiver(fsk=None, ook=None): diff --git a/src/energenie/test_modulator.py b/src/energenie/test_modulator.py index f05ee5f..6e0d621 100644 --- a/src/energenie/test_modulator.py +++ b/src/energenie/test_modulator.py @@ -2,6 +2,7 @@ # # Test the OOK modulator +import modulator def ashex(data): if type(data) == list: @@ -12,112 +13,27 @@ else: return str(hex(data)) -ALL_SOCKETS = 0 - -def build_OOK_relay_msg(state, device_address=ALL_SOCKETS, house_address=None): - """Temporary test code to prove we can turn the relay on or off""" - #This code does not live in this module, it lives in an EnergenieLegacy codec module - print("build: state:%s, device:%d, house:%s" % (str(state), device_address, str(house_address))) - - if house_address == None: - #this is just a fixed address generator, from the C code - #payload = [] - #for i in range(10): - # j = i + 5 - # payload.append(8 + (j&1) * 6 + 128 + (j&2) * 48) - #dumpPayloadAsHex(payload) - # binary = 0110 1100 0110 1100 0110 - # hex = 6 C 6 C 6 - house_address = 0x6C6C6 - - payload = modulate_bits((house_address & 0x0F0000) >> 16, 4) - payload += modulate_bits((house_address & 0x00FF00) >> 8, 8) - payload += modulate_bits((house_address & 0x0000FF), 8) - - # Turn switch request into a 4 bit switch command, and add to payload - # D 3210 - # 0000 UNUSED - # 0001 UNUSED - # 0010 UNUSED - # 0011 All off (3) - # 0100 socket 4 off (4) - # 0101 socket 3 off (5) - # 0110 socket 2 off (6) - # 0111 socket 1 off (7) - # 1000 UNUSED - # 1101 UNUSED - # 1110 UNUSED - # 1011 All on (3) - # 1100 socket 4 on (4) - # 1101 socket 3 on (5) - # 1110 socket 2 on (6) - # 1111 socket 1 on (7) - - if not state: # OFF - bits = 0x00 - else: # ON - bits = 0x08 - - if device_address == ALL_SOCKETS: - bits |= 0x03 # ALL - else: - bits += 7-((device_address-1) & 0x03) - - payload += modulate_bits(bits, 4) - return payload - - -def modulate_bytes(data): - """Turn a list of bytes into a modulated pattern equivalent""" - #print("modulate_bytes: %s" % ashex(data)) - payload = [] - for b in data: - payload += modulate_bits(b, 8) - #print(" returns: %s" % ashex(payload)) - return payload - - -MODULATOR = [0x88, 0x8E, 0xE8, 0xEE] - -def modulate_bits(data, number): - """Turn bits into n bytes of modulation patterns""" - # 0000 00BA gets encoded as: - # 128 64 32 16 8 4 2 1 - # 1 B B 0 1 A A 0 - # i.e. a 0 is a short pulse, a 1 is a long pulse - #print("modulate_bits %s (%s)" % (ashex(data), str(number))) - - shift = number-2 - modulated = [] - for i in range(number/2): - bits = (data >> shift) & 0x03 - #print(" shift %d bits %d" % (shift, bits)) - modulated.append(MODULATOR[bits]) - shift -= 2 - #print(" returns:%s" % ashex(modulated)) - return modulated - #----- TEST APPLICATION ------------------------------------------------------- print("*" * 80) -ALL_ON = build_OOK_relay_msg(True) -ALL_OFF = build_OOK_relay_msg(False) +ALL_ON = modulator.build_switch_msg(True) +ALL_OFF = modulator.build_switch_msg(False) -ONE_ON = build_OOK_relay_msg(True, device_address=1) -ONE_OFF = build_OOK_relay_msg(False, device_address=1) +ONE_ON = modulator.build_switch_msg(True, device_address=1) +ONE_OFF = modulator.build_switch_msg(False, device_address=1) -TWO_ON = build_OOK_relay_msg(True, device_address=2) -TWO_OFF = build_OOK_relay_msg(False, device_address=2) +TWO_ON = modulator.build_switch_msg(True, device_address=2) +TWO_OFF = modulator.build_switch_msg(False, device_address=2) -THREE_ON = build_OOK_relay_msg(True, device_address=3) -THREE_OFF = build_OOK_relay_msg(False, device_address=3) +THREE_ON = modulator.build_switch_msg(True, device_address=3) +THREE_OFF = modulator.build_switch_msg(False, device_address=3) -FOUR_ON = build_OOK_relay_msg(True, device_address=4) -FOUR_OFF = build_OOK_relay_msg(False, device_address=4) +FOUR_ON = modulator.build_switch_msg(True, device_address=4) +FOUR_OFF = modulator.build_switch_msg(False, device_address=4) -MYHOUSE_ALL_ON = build_OOK_relay_msg(True, house_address=0x12345) +MYHOUSE_ALL_ON = modulator.build_switch_msg(True, house_address=0x12345) tests = [ALL_ON, ALL_OFF, ONE_ON, ONE_OFF, TWO_ON, TWO_OFF, THREE_ON, THREE_OFF, FOUR_ON, FOUR_OFF, MYHOUSE_ALL_ON] diff --git a/src/legacy.py b/src/legacy.py index 76f3592..182ed5d 100644 --- a/src/legacy.py +++ b/src/legacy.py @@ -1,79 +1,105 @@ # legacy.py 17/03/2016 D.J.Whale # -# Placeholder for a legacy plug (OOK) test harness +# Simple legacy plug test harness. +# Legacy plugs have a big green button on the front, +# and usually come in kits with a handheld remote control. # -# This is a temporary piece of code, that will be thrown away once the device object interface -# is written. +# Note: This is a temporary piece of code, that will be thrown away +# once the device object interface is written. # -# Please don't use this as the basis for an application, it is only designed to be used -# to test the lower level code. - -# switch.py 17/03/2016 D.J.Whale -# -# Control Energenie switches. -# Note, at the moment, this only works with MiHome Adaptor Plus devices -# because the 'sensorid' is discovered via the monitor message. -# You could probably fake it by preloading the directory with your sensorid -# if you know what it is. - -# Note, this is *only* a test program, to exercise the lower level code. -# Don't expect this to be a good starting point for an application. -# Consider waiting for me to finish developing the device object interface first. +# Please don't use this as the basis for an application, +# it is only designed to be used to test the lower level code. import time -from energenie import Devices, radio +from energenie import modulator from energenie import radio -from Timer import Timer - -TX_RATE = 10 # seconds between each switch change cycle - -def warning(msg): - print("warning:%s" % str(msg)) - -def trace(msg): - print("monitor:%s" % str(msg)) #----- TEST APPLICATION ------------------------------------------------------- +# Prebuild all possible message up front, to make switching code faster + +HOUSE_ADDRESS = None # default + +ALL_ON = modulator.build_switch_msg(True, house_address=HOUSE_ADDRESS) +ONE_ON = modulator.build_switch_msg(True, device_address=1, house_address=HOUSE_ADDRESS) +TWO_ON = modulator.build_switch_msg(True, device_address=2, house_address=HOUSE_ADDRESS) +THREE_ON = modulator.build_switch_msg(True, device_address=3, house_address=HOUSE_ADDRESS) +FOUR_ON = modulator.build_switch_msg(True, device_address=4, house_address=HOUSE_ADDRESS) +ON_MSGS = [ALL_ON, ONE_ON, TWO_ON, THREE_ON, FOUR_ON] + +ALL_OFF = modulator.build_switch_msg(False, house_address=HOUSE_ADDRESS) +ONE_OFF = modulator.build_switch_msg(False, device_address=1, house_address=HOUSE_ADDRESS) +TWO_OFF = modulator.build_switch_msg(False, device_address=2, house_address=HOUSE_ADDRESS) +THREE_OFF = modulator.build_switch_msg(False, device_address=3, house_address=HOUSE_ADDRESS) +FOUR_OFF = modulator.build_switch_msg(False, device_address=4, house_address=HOUSE_ADDRESS) +OFF_MSGS = [ALL_OFF, ONE_OFF, TWO_OFF, THREE_OFF, FOUR_OFF] + + +def get_yes_no(): + """Get a simple yes or no answer""" + answer = raw_input() # python2 + if answer.upper() in ['Y', 'YES']: + return True + return False + + +def legacy_learn_mode(): + """Give the user a chance to learn any switches""" + print("Do you want to program any switches?") + y = get_yes_no() + if not y: + return + + for switch_no in range(1,5): + print("Learn switch %d?" % switch_no) + y = get_yes_no() + if y: + print("Press the LEARN button on any switch %d for 5 secs until LED flashes" % switch_no) + raw_input("press ENTER when LED is flashing") + + radio.transmit(OFF_MSGS[switch_no]) + + print("Device should now be programmed") + + print("Testing....") + for i in range(4): + time.sleep(1) + radio.transmit(ON_MSGS[switch_no]) + time.sleep(1) + radio.transmit(OFF_MSGS[switch_no]) + print("Test completed") + + def legacy_switch_loop(): - """Listen to sensor messages, and turn switches on and off every few seconds""" - - radio.transmitter(ook=True) - - # Transmit a code multiple times and program it into the switch - print("Press the LEARN button on the switch for 5 secs until LED flashes") - raw_input("press ENTER when LED is flashing") - - ON = radio.build_OOK_relay_msg(True) - OFF = radio.build_OOK_relay_msg(False) + """Turn all switches on or off every few seconds""" while True: - print("sending ON") - radio.send_payload_repeat(ON, 8) + print("sending ALL ON") + radio.transmit(ALL_ON) print("waiting") time.sleep(2) - print("sending OFF") - radio.send_payload_repeat(OFF, 8) + print("sending ALL OFF") + radio.transmit(ALL_OFF) print("waiting") time.sleep(2) if __name__ == "__main__": - trace("starting legacy switch tester") + print("starting legacy switch tester") radio.init() + radio.transmitter(ook=True) try: + legacy_learn_mode() legacy_switch_loop() finally: radio.finished() -# END - # END