diff --git a/src/combined.py b/src/combined.py index 5467c22..4f04632 100644 --- a/src/combined.py +++ b/src/combined.py @@ -8,7 +8,7 @@ # will be much nicer to use. import time -from energenie import Messages, OpenThings, radio, encoder, Devices +from energenie import Messages, OpenThings, radio, TwoBit, Devices # build FSK messages for MiHome purple @@ -29,8 +29,8 @@ # build OOK messages for legacy green button -GREEN_ON = encoder.build_switch_msg(True, device_address=1) -GREEN_OFF = encoder.build_switch_msg(False, device_address=1) +GREEN_ON = TwoBit.build_switch_msg(True, device_address=1) +GREEN_OFF = TwoBit.build_switch_msg(False, device_address=1) def switch_loop(): diff --git a/src/energenie/Devices.py b/src/energenie/Devices.py index 9a55caa..e869283 100644 --- a/src/energenie/Devices.py +++ b/src/energenie/Devices.py @@ -218,7 +218,7 @@ def incoming_message(self, payload): """Handle incoming messages for this device""" #we know at this point that it's a FSK message - #TODO: do we OpenThings.decrypt() here? + #TODO: do we OpenThings.decrypt() here? Done by OpenThings.decode() by default #TODO: do we OpenThings.decode() here into a pydict header/recs?? #TODO join request might be handled generically here @@ -229,7 +229,7 @@ #TODO: at what point is the payload turned into a pydict? #TODO: We know it's going over OpenThings, #do we call OpenThings.encode(payload) here? - #also OpenThings.encrypt() + #also OpenThings.encrypt() - done by encode() as default if self.air_interface != None: #TODO: might want to send the config, either as a send parameter, #or by calling air_interface.configure() first? diff --git a/src/energenie/TwoBit.py b/src/energenie/TwoBit.py new file mode 100644 index 0000000..484a914 --- /dev/null +++ b/src/energenie/TwoBit.py @@ -0,0 +1,169 @@ +# encoder.py 27/03/2016 D.J.Whale +# +# payload encoder for use with OOK payloads + +ALL_SOCKETS = 0 + +# The preamble is now stored in the payload, +# this is more predictable than using the radio sync feature +PREAMBLE = [0x80, 0x00, 0x00, 0x00] + +# This generates a 20*4 bit address i.e. 10 bytes +# The number generated is always the same +# This is the random 'Energenie address prefix' +# The switch number is encoded in the payload +# 0000 00BA gets encoded as: +# 128 64 32 16 8 4 2 1 +# 1 B B 0 1 A A 0 +#payload = [] +#for i in range(10): +# j = i + 5 +# payload.append(8 + (j&1) * 6 + 128 + (j&2) * 48) +#dumpPayloadAsHex(payload) + +#this is just a fixed address generator, from the C code +#payload = [] +#for i in range(10): +# j = i + 5 +# payload.append(8 + (j&1) * 6 + 128 + (j&2) * 48) +#dumpPayloadAsHex(payload) +# binary = 0110 1100 0110 1100 0110 +# hex = 6 C 6 C 6 + +DEFAULT_ADDR = 0x6C6C6 + +# 5 6 7 8 9 10 11 12 13 14 +# 1(01) 1(10) 1(11) 0(00) 0(01) 0(10) 0(11) 1(00) 1(01) 1(10) +DEFAULT_ADDR_ENC = [0x8e, 0xe8, 0xee, 0x88, 0x8e, 0xe8, 0xee, 0x88, 0x8e, 0xe8] + +# D0=high, D1=high, D2-high, D3=high (S1 on) sent as:(D0D1D2D3) +# 128 64 32 16 8 4 2 1 128 64 32 16 8 4 2 1 +# 1 B B 0 1 A A 0 1 B B 0 1 A A 0 +# 1 1 1 0 1 1 1 0 1 1 1 0 1 1 1 0 + +SW1_ON_ENC = [0xEE, 0xEE] # 1111 sent as 1111 + +# D0=high, D1=high, D2=high, D3=low (S1 off) +# 128 64 32 16 8 4 2 1 128 64 32 16 8 4 2 1 +# 1 B B 0 1 A A 0 1 B B 0 1 A A 0 +# 1 1 1 0 1 1 1 0 1 1 1 0 1 0 0 0 + +SW1_OFF_ENC = [0xEE, 0xE8] # 1110 sent as 0111 + + +def ashex(payload): + line = "" + for b in payload: + line += str(hex(b)) + " " + return line + + +def build_relay_msg(relayState=False): + """Temporary test code to prove we can turn the relay on or off""" + + payload = PREAMBLE #TODO: + DEFAULT_ADDR_ENC ?? + + if relayState: # ON + payload += SW1_ON_ENC + + else: # OFF + payload += SW1_OFF_ENC + + return payload + + +def build_test_message(pattern): + """build a test message for a D3D2D1D0 control patter""" + payload = PREAMBLE + DEFAULT_ADDR_ENC + pattern &= 0x0F + control = encode_bits(pattern, 4) + payload += control + return payload + + +def build_switch_msg(state, device_address=ALL_SOCKETS, house_address=None): + """Build a message to turn a switch on or off""" + #print("build: state:%s, device:%d, house:%s" % (str(state), device_address, str(house_address))) + + if house_address == None: + house_address = DEFAULT_ADDR + + payload = [] + PREAMBLE + payload += encode_bits((house_address & 0x0F0000) >> 16, 4) + payload += encode_bits((house_address & 0x00FF00) >> 8, 8) + payload += encode_bits((house_address & 0x0000FF), 8) + + # Coded as per the (working) C code, as it is transmitted D0D1D2D3: + # D 0123 + # b 3210 + # 0000 UNUSED 0 + # 0001 UNUSED 1 + # 0010 socket 4 off 2 + # 0011 socket 4 on 3 + # 0100 UNUSED 4 + # 0101 UNUSED 5 + # 0110 socket 2 off 6 + # 0111 socket 2 on 7 + # 1000 UNUSED 8 + # 1001 UNUSED 9 + # 1010 socket 3 off A + # 1011 socket 3 on B + # 1100 all off C + # 1101 all on D + # 1110 socket 1 off E + # 1111 socket 1 on F + + if not state: # OFF + bits = 0x00 + else: # ON + bits = 0x01 + + if device_address == ALL_SOCKETS: + bits |= 0x0C # ALL + elif device_address == 1: + bits |= 0x0E + elif device_address == 2: + bits |= 0x06 + elif device_address == 3: + bits |= 0x0A + elif device_address == 4: + bits |= 0x02 + + payload += encode_bits(bits, 4) + #print("encoded as:%s" % ashex(payload)) + return payload + + +def encode_bytes(data): + """Turn a list of bytes into a modulated pattern equivalent""" + #print("modulate_bytes: %s" % ashex(data)) + payload = [] + for b in data: + payload += encode_bits(b, 8) + #print(" returns: %s" % ashex(payload)) + return payload + + +ENCODER = [0x88, 0x8E, 0xE8, 0xEE] + +def encode_bits(data, number): + """Turn bits into n bytes of modulation patterns""" + # 0000 00BA gets encoded as: + # 128 64 32 16 8 4 2 1 + # 1 B B 0 1 A A 0 + # i.e. a 0 is a short pulse, a 1 is a long pulse + #print("modulate_bits %s (%s)" % (ashex(data), str(number))) + + shift = number-2 + encoded = [] + for i in range(int(number/2)): + bits = (data >> shift) & 0x03 + #print(" shift %d bits %d" % (shift, bits)) + encoded.append(ENCODER[bits]) + shift -= 2 + #print(" returns:%s" % ashex(encoded)) + return encoded + + +# END + diff --git a/src/energenie/TwoBit_test.py b/src/energenie/TwoBit_test.py new file mode 100644 index 0000000..34e8855 --- /dev/null +++ b/src/energenie/TwoBit_test.py @@ -0,0 +1,41 @@ +# test_encoder.py 27/03/2016 D.J.Whale +# +# Test the OOK message encoder + +import TwoBit + +def ashex(data): + if type(data) == list: + line = "" + for b in data: + line += str(hex(b)) + " " + return line + else: + return str(hex(data)) + + +#----- TEST APPLICATION ------------------------------------------------------- + +print("*" * 80) + +ALL_ON = TwoBit.build_switch_msg(True) +ALL_OFF = TwoBit.build_switch_msg(False) +ONE_ON = TwoBit.build_switch_msg(True, device_address=1) +ONE_OFF = TwoBit.build_switch_msg(False, device_address=1) +TWO_ON = TwoBit.build_switch_msg(True, device_address=2) +TWO_OFF = TwoBit.build_switch_msg(False, device_address=2) +THREE_ON = TwoBit.build_switch_msg(True, device_address=3) +THREE_OFF = TwoBit.build_switch_msg(False, device_address=3) +FOUR_ON = TwoBit.build_switch_msg(True, device_address=4) +FOUR_OFF = TwoBit.build_switch_msg(False, device_address=4) +MYHOUSE_ALL_ON = TwoBit.build_switch_msg(True, house_address=0x12345) + +tests = [ALL_ON, ALL_OFF, ONE_ON, ONE_OFF, TWO_ON, TWO_OFF, THREE_ON, THREE_OFF, FOUR_ON, FOUR_OFF, MYHOUSE_ALL_ON] + +for t in tests: + print('') + print(ashex(t)) + +# END + + diff --git a/src/energenie/encoder.py b/src/energenie/encoder.py deleted file mode 100644 index 4cc0cec..0000000 --- a/src/energenie/encoder.py +++ /dev/null @@ -1,169 +0,0 @@ -# encoder.py 27/03/2016 D.J.Whale -# -# payload encoder for use with OOK payloads - -ALL_SOCKETS = 0 - -# The preamble is now stored in the payload, -# this is more predictable than using the radio sync feature -PREAMBLE = [0x80, 0x00, 0x00, 0x00] - -# This generates a 20*4 bit address i.e. 10 bytes -# The number generated is always the same -# This is the random 'Energenie address prefix' -# The switch number is encoded in the payload -# 0000 00BA gets encoded as: -# 128 64 32 16 8 4 2 1 -# 1 B B 0 1 A A 0 -#payload = [] -#for i in range(10): -# j = i + 5 -# payload.append(8 + (j&1) * 6 + 128 + (j&2) * 48) -#dumpPayloadAsHex(payload) - -#this is just a fixed address generator, from the C code -#payload = [] -#for i in range(10): -# j = i + 5 -# payload.append(8 + (j&1) * 6 + 128 + (j&2) * 48) -#dumpPayloadAsHex(payload) -# binary = 0110 1100 0110 1100 0110 -# hex = 6 C 6 C 6 - -DEFAULT_ADDR = 0x6C6C6 - -# 5 6 7 8 9 10 11 12 13 14 -# 1(01) 1(10) 1(11) 0(00) 0(01) 0(10) 0(11) 1(00) 1(01) 1(10) -DEFAULT_ADDR_ENC = [0x8e, 0xe8, 0xee, 0x88, 0x8e, 0xe8, 0xee, 0x88, 0x8e, 0xe8] - -# D0=high, D1=high, D2-high, D3=high (S1 on) sent as:(D0D1D2D3) -# 128 64 32 16 8 4 2 1 128 64 32 16 8 4 2 1 -# 1 B B 0 1 A A 0 1 B B 0 1 A A 0 -# 1 1 1 0 1 1 1 0 1 1 1 0 1 1 1 0 - -SW1_ON_ENC = [0xEE, 0xEE] # 1111 sent as 1111 - -# D0=high, D1=high, D2=high, D3=low (S1 off) -# 128 64 32 16 8 4 2 1 128 64 32 16 8 4 2 1 -# 1 B B 0 1 A A 0 1 B B 0 1 A A 0 -# 1 1 1 0 1 1 1 0 1 1 1 0 1 0 0 0 - -SW1_OFF_ENC = [0xEE, 0xE8] # 1110 sent as 0111 - - -def ashex(payload): - line = "" - for b in payload: - line += str(hex(b)) + " " - return line - - -def build_relay_msg(relayState=False): - """Temporary test code to prove we can turn the relay on or off""" - - payload = PREAMBLE - - if relayState: # ON - payload += SW1_ON_ENC - - else: # OFF - payload += SW1_OFF_ENC - - return payload - - -def build_test_message(pattern): - """build a test message for a D3D2D1D0 control patter""" - payload = PREAMBLE + DEFAULT_ADDR_ENC - pattern &= 0x0F - control = encode_bits(pattern, 4) - payload += control - return payload - - -def build_switch_msg(state, device_address=ALL_SOCKETS, house_address=None): - """Build a message to turn a switch on or off""" - #print("build: state:%s, device:%d, house:%s" % (str(state), device_address, str(house_address))) - - if house_address == None: - house_address = DEFAULT_ADDR - - payload = [] + PREAMBLE - payload += encode_bits((house_address & 0x0F0000) >> 16, 4) - payload += encode_bits((house_address & 0x00FF00) >> 8, 8) - payload += encode_bits((house_address & 0x0000FF), 8) - - # Coded as per the (working) C code, as it is transmitted D0D1D2D3: - # D 0123 - # b 3210 - # 0000 UNUSED 0 - # 0001 UNUSED 1 - # 0010 socket 4 off 2 - # 0011 socket 4 on 3 - # 0100 UNUSED 4 - # 0101 UNUSED 5 - # 0110 socket 2 off 6 - # 0111 socket 2 on 7 - # 1000 UNUSED 8 - # 1001 UNUSED 9 - # 1010 socket 3 off A - # 1011 socket 3 on B - # 1100 all off C - # 1101 all on D - # 1110 socket 1 off E - # 1111 socket 1 on F - - if not state: # OFF - bits = 0x00 - else: # ON - bits = 0x01 - - if device_address == ALL_SOCKETS: - bits |= 0x0C # ALL - elif device_address == 1: - bits |= 0x0E - elif device_address == 2: - bits |= 0x06 - elif device_address == 3: - bits |= 0x0A - elif device_address == 4: - bits |= 0x02 - - payload += encode_bits(bits, 4) - #print("encoded as:%s" % ashex(payload)) - return payload - - -def encode_bytes(data): - """Turn a list of bytes into a modulated pattern equivalent""" - #print("modulate_bytes: %s" % ashex(data)) - payload = [] - for b in data: - payload += encode_bits(b, 8) - #print(" returns: %s" % ashex(payload)) - return payload - - -ENCODER = [0x88, 0x8E, 0xE8, 0xEE] - -def encode_bits(data, number): - """Turn bits into n bytes of modulation patterns""" - # 0000 00BA gets encoded as: - # 128 64 32 16 8 4 2 1 - # 1 B B 0 1 A A 0 - # i.e. a 0 is a short pulse, a 1 is a long pulse - #print("modulate_bits %s (%s)" % (ashex(data), str(number))) - - shift = number-2 - encoded = [] - for i in range(int(number/2)): - bits = (data >> shift) & 0x03 - #print(" shift %d bits %d" % (shift, bits)) - encoded.append(ENCODER[bits]) - shift -= 2 - #print(" returns:%s" % ashex(encoded)) - return encoded - - -# END - diff --git a/src/energenie/encoder_test.py b/src/energenie/encoder_test.py deleted file mode 100644 index 2967bd6..0000000 --- a/src/energenie/encoder_test.py +++ /dev/null @@ -1,41 +0,0 @@ -# test_encoder.py 27/03/2016 D.J.Whale -# -# Test the OOK message encoder - -import encoder - -def ashex(data): - if type(data) == list: - line = "" - for b in data: - line += str(hex(b)) + " " - return line - else: - return str(hex(data)) - - -#----- TEST APPLICATION ------------------------------------------------------- - -print("*" * 80) - -ALL_ON = encoder.build_switch_msg(True) -ALL_OFF = encoder.build_switch_msg(False) -ONE_ON = encoder.build_switch_msg(True, device_address=1) -ONE_OFF = encoder.build_switch_msg(False, device_address=1) -TWO_ON = encoder.build_switch_msg(True, device_address=2) -TWO_OFF = encoder.build_switch_msg(False, device_address=2) -THREE_ON = encoder.build_switch_msg(True, device_address=3) -THREE_OFF = encoder.build_switch_msg(False, device_address=3) -FOUR_ON = encoder.build_switch_msg(True, device_address=4) -FOUR_OFF = encoder.build_switch_msg(False, device_address=4) -MYHOUSE_ALL_ON = encoder.build_switch_msg(True, house_address=0x12345) - -tests = [ALL_ON, ALL_OFF, ONE_ON, ONE_OFF, TWO_ON, TWO_OFF, THREE_ON, THREE_OFF, FOUR_ON, FOUR_OFF, MYHOUSE_ALL_ON] - -for t in tests: - print('') - print(ashex(t)) - -# END - - diff --git a/src/energenie/radio_test.py b/src/energenie/radio_test.py index b02f117..77a19ff 100644 --- a/src/energenie/radio_test.py +++ b/src/energenie/radio_test.py @@ -17,7 +17,7 @@ # The 'radio' module knows nothing about the Energenie (HS1527) bit encoding, # so this test code manually encodes the bits. -# For the full Python stack, there is an encoder module that can generate +# For the full Python stack, there is a TwoBit module that can generate # specific payloads. Repeats are done in radio_transmitter. # The HRF preamble feature is no longer used, it's more predictable to # put the preamble in the payload. diff --git a/src/legacy.py b/src/legacy.py index 4bf3ab7..6759a41 100644 --- a/src/legacy.py +++ b/src/legacy.py @@ -8,7 +8,7 @@ import time -from energenie import encoder, radio +from energenie import TwoBit, radio # How many times to send messages in the driver fast loop # Present version of driver limits to 15 @@ -39,18 +39,18 @@ HOUSE_ADDRESS = None # Use default energenie quasi-random address 0x6C6C6 ##HOUSE_ADDRESS = 0xA0170 # Captured address of David's RF hand controller -ALL_ON = encoder.build_switch_msg(True, house_address=HOUSE_ADDRESS) -ONE_ON = encoder.build_switch_msg(True, device_address=1, house_address=HOUSE_ADDRESS) -TWO_ON = encoder.build_switch_msg(True, device_address=2, house_address=HOUSE_ADDRESS) -THREE_ON = encoder.build_switch_msg(True, device_address=3, house_address=HOUSE_ADDRESS) -FOUR_ON = encoder.build_switch_msg(True, device_address=4, house_address=HOUSE_ADDRESS) +ALL_ON = TwoBit.build_switch_msg(True, house_address=HOUSE_ADDRESS) +ONE_ON = TwoBit.build_switch_msg(True, device_address=1, house_address=HOUSE_ADDRESS) +TWO_ON = TwoBit.build_switch_msg(True, device_address=2, house_address=HOUSE_ADDRESS) +THREE_ON = TwoBit.build_switch_msg(True, device_address=3, house_address=HOUSE_ADDRESS) +FOUR_ON = TwoBit.build_switch_msg(True, device_address=4, house_address=HOUSE_ADDRESS) ON_MSGS = [ALL_ON, ONE_ON, TWO_ON, THREE_ON, FOUR_ON] -ALL_OFF = encoder.build_switch_msg(False, house_address=HOUSE_ADDRESS) -ONE_OFF = encoder.build_switch_msg(False, device_address=1, house_address=HOUSE_ADDRESS) -TWO_OFF = encoder.build_switch_msg(False, device_address=2, house_address=HOUSE_ADDRESS) -THREE_OFF = encoder.build_switch_msg(False, device_address=3, house_address=HOUSE_ADDRESS) -FOUR_OFF = encoder.build_switch_msg(False, device_address=4, house_address=HOUSE_ADDRESS) +ALL_OFF = TwoBit.build_switch_msg(False, house_address=HOUSE_ADDRESS) +ONE_OFF = TwoBit.build_switch_msg(False, device_address=1, house_address=HOUSE_ADDRESS) +TWO_OFF = TwoBit.build_switch_msg(False, device_address=2, house_address=HOUSE_ADDRESS) +THREE_OFF = TwoBit.build_switch_msg(False, device_address=3, house_address=HOUSE_ADDRESS) +FOUR_OFF = TwoBit.build_switch_msg(False, device_address=4, house_address=HOUSE_ADDRESS) OFF_MSGS = [ALL_OFF, ONE_OFF, TWO_OFF, THREE_OFF, FOUR_OFF] @@ -127,8 +127,8 @@ while True: p = readin("number 0..F") p = int(p, 16) - msg = encoder.build_test_message(p) - print("pattern %s payload %s" % (str(hex(p)), encoder.ashex(msg))) + msg = TwoBit.build_test_message(p) + print("pattern %s payload %s" % (str(hex(p)), TwoBit.ashex(msg))) radio.send_payload(msg, OUTER_TIMES, INNER_TIMES)