| | # setup_tool.py 28/05/2016 D.J.Whale |
---|
| | # |
---|
| | # A simple menu-driven setup tool for the Energenie Python library. |
---|
| | |
---|
| | # |
---|
| | # Just be a simple menu system. |
---|
| | # This then means you don't have to have all this in the demo apps |
---|
| | # and the demo apps can just refer to object variables names |
---|
| | # from an assumed auto_create registry, that is built using this setup tool. |
---|
| | |
---|
| | #TODO: Choose a better way to interrupt other than Ctrl-C |
---|
| | #It's very platform independent |
---|
| | #also, the foreground thread gets the KeyboardInterrupt, |
---|
| | #so as soon as threads are added, its a bad way to terminate user entry |
---|
| | #or processing. |
---|
| | |
---|
| | import time |
---|
| | import energenie |
---|
| | from energenie.lifecycle import * |
---|
| | |
---|
| | |
---|
| | #===== GLOBALS ===== |
---|
| | |
---|
| | quit = False |
---|
| | |
---|
| | |
---|
| | #===== INPUT METHODS ========================================================== |
---|
| | |
---|
| | try: |
---|
| | readin = raw_input # Python 2 |
---|
| | except NameError: |
---|
| | readin = input # Python 3 |
---|
| | |
---|
| | quit = False |
---|
| | |
---|
| | |
---|
| | def do_legacy_learn(): |
---|
| | """Repeatedly broadcast a legacy switch message, so you can learn a socket to the pattern""" |
---|
| | # get house code, default to energenie code |
---|
| | hc = readin("House code? (ENTER for default) ") |
---|
| | if hc == "": |
---|
| | house_code = None |
---|
| | else: |
---|
| | house_code = int(hc, 16) |
---|
| | #TODO: error check |
---|
| | |
---|
| | # get switch index, default 1 (0,1,2,3,4) |
---|
| | si = readin("Switch index 1..4? (ENTER for all)") |
---|
| | if si == "": |
---|
| | switch_index = 0 #ALL |
---|
| | else: |
---|
| | switch_index = int(si) |
---|
| | #TODO: error check |
---|
| | |
---|
| | device = energenie.Devices.ENER002((house_code, switch_index)) |
---|
| | |
---|
| | # in a loop until Ctrl-C |
---|
| | try: |
---|
| | while True: #TODO: detect Ctrl-C |
---|
| | print("ON") |
---|
| | device.turn_on() |
---|
| | time.sleep(0.5) |
---|
| | print("OFF") |
---|
| | device.turn_off() |
---|
| | time.sleep(0.5) |
---|
| | except KeyboardInterrupt: |
---|
| | pass # user exit |
---|
| | |
---|
| | |
---|
| | def do_mihome_discovery(): |
---|
| | """Discover any mihome device when it sends reports""" |
---|
| | print("Discovery mode, press Ctrl-C to stop") |
---|
| | energenie.Registry.discovery_ask(energenie.Registry.ask) |
---|
| | try: |
---|
| | while True: |
---|
| | energenie.loop() # Allow receive processing |
---|
| | time.sleep(0.25) # tick fast enough to get messages in quite quickly |
---|
| | except KeyboardInterrupt: |
---|
| | print("Discovery stopped") |
---|
| | |
---|
| | def get_house_code(): |
---|
| | """Get a house code or default to Energenie code""" |
---|
| | |
---|
| | while True: |
---|
| | try: |
---|
| | hc = readin("House code? (ENTER for default) ") |
---|
| | if hc == "": return None |
---|
| | |
---|
| | except KeyboardInterrupt: |
---|
| | return None # user abort |
---|
| | |
---|
| | try: |
---|
| | house_code = int(hc, 16) |
---|
| | return house_code |
---|
| | |
---|
| | except ValueError: |
---|
| | print("Must enter a number") |
---|
| | |
---|
| | |
---|
| | def get_device_index(): |
---|
| | """get switch index, default 1 (0,1,2,3,4)""" |
---|
| | |
---|
| | while True: |
---|
| | try: |
---|
| | di = readin("Device index 1..4? (ENTER for all)") |
---|
| | except KeyboardInterrupt: |
---|
| | return None # user abort |
---|
| | |
---|
| | if di == "": return 0 # ALL |
---|
| | try: |
---|
| | device_index = int(di) |
---|
| | return device_index |
---|
| | |
---|
| | except ValueError: |
---|
| | print("Must enter a number") |
---|
| | |
---|
| | |
---|
| | def show_registry(): |
---|
| | """Show the registry as a numbered list""" |
---|
| | |
---|
| | names = energenie.registry.names() |
---|
| | i=1 |
---|
| | for name in names: |
---|
| | print("%d. %s %s" % (i, name, energenie.registry.get(name))) |
---|
| | i += 1 |
---|
| | return names |
---|
| | |
---|
| | |
---|
| | def get_device_name(): |
---|
| | """Give user a list of devices and choose one from the list""" |
---|
| | |
---|
| | names = show_registry() |
---|
| | |
---|
| | try: |
---|
| | while True: |
---|
| | i = readin("Which device %s to %s" % (1,len(names))) |
---|
| | try: |
---|
| | device_index = int(i) |
---|
| | break # got it |
---|
| | except ValueError: |
---|
| | print("Must enter a number") |
---|
| | |
---|
| | except KeyboardInterrupt: |
---|
| | return None # nothing chosen, user aborted |
---|
| | |
---|
| | name = names[device_index-1] |
---|
| | print("selected: %s" % name) |
---|
| | |
---|
| | return name |
---|
| | |
---|
| | |
---|
| | #===== ACTION ROUTINES ======================================================== |
---|
| | |
---|
| | def do_legacy_learn(): |
---|
| | """Repeatedly broadcast a legacy switch message, so you can learn a socket to the pattern""" |
---|
| | |
---|
| | # get device |
---|
| | house_code = get_house_code() |
---|
| | device_index = get_device_index() |
---|
| | device = energenie.Devices.ENER002((house_code, device_index)) |
---|
| | |
---|
| | # in a loop until Ctrl-C |
---|
| | try: |
---|
| | while True: |
---|
| | print("ON") |
---|
| | device.turn_on() |
---|
| | time.sleep(0.5) |
---|
| | |
---|
| | print("OFF") |
---|
| | device.turn_off() |
---|
| | time.sleep(0.5) |
---|
| | |
---|
| | except KeyboardInterrupt: |
---|
| | pass # user exit |
---|
| | |
---|
| | |
---|
| | def do_mihome_discovery(): |
---|
| | """Discover any mihome device when it sends reports""" |
---|
| | |
---|
| | print("Discovery mode, press Ctrl-C to stop") |
---|
| | energenie.Registry.discovery_ask(energenie.Registry.ask) |
---|
| | try: |
---|
| | while True: |
---|
| | energenie.loop() # Allow receive processing |
---|
| | time.sleep(0.25) # tick fast enough to get messages in quite quickly |
---|
| | |
---|
| | except KeyboardInterrupt: |
---|
| | print("Discovery stopped") |
---|
| | |
---|
| | |
---|
| | def do_list_registry(): |
---|
| | """List the entries in the registry""" |
---|
| | |
---|
| | show_registry() |
---|
| | |
---|
| | |
---|
| | def do_switch_device(): |
---|
| | """Turn the switch on a socket on and off, to test it""" |
---|
| | |
---|
| | global quit |
---|
| | """Turn the switch on a socket on and off, to test it""" |
---|
| | |
---|
| | #TODO DRY name = select_device() |
---|
| | # select the device from a menu of numbered devices |
---|
| | names = show_registry() |
---|
| | |
---|
| | # ask user for on/off/another/done |
---|
| | i = readin("Which device %s to %s" % (1,len(names))) |
---|
| | device_index = int(i) |
---|
| | #TODO: error check |
---|
| | #TODO: Ctrl-C check |
---|
| | |
---|
| | name = names[device_index-1] |
---|
| | print("selected: %s" % name) |
---|
| | |
---|
| | |
---|
| | name = get_device_name() |
---|
| | device = energenie.registry.get(name) |
---|
| | #TODO: DRY END |
---|
| | |
---|
| | #TODO could list all action methods by introspecting device class |
---|
| | # and build a device specific menu |
---|
| | |
---|
| | def on(): |
---|
| | print("Turning on") |
---|
| | device.turn_on |
---|
| | device.turn_on() |
---|
| | |
---|
| | def off(): |
---|
| | print("Turning off") |
---|
| | device.turn_off |
---|
| | device.turn_off() |
---|
| | |
---|
| | MENU = [ |
---|
| | ("on", on), |
---|
| | ("off", off) |
---|
| |
---|
| | show_menu(MENU) |
---|
| | choice = get_choice((1,len(MENU))) |
---|
| | if choice != None: |
---|
| | handle_choice(MENU, choice) |
---|
| | |
---|
| | except KeyboardInterrupt: |
---|
| | pass # user exit |
---|
| | quit = False |
---|
| | |
---|
| |
---|
| | @untested |
---|
| | def do_show_device_status(): |
---|
| | """Show the readings associated with a device""" |
---|
| | |
---|
| | #TODO DRY name = select_device() |
---|
| | names = show_registry() |
---|
| | |
---|
| | energenie.loop() # allow receive processing #TODO: not long enough for >1 msg?? |
---|
| | |
---|
| | # ask user for on/off/another/done |
---|
| | i = readin("Which device %s to %s" % (1,len(names))) |
---|
| | device_index = int(i) |
---|
| | #TODO: error check |
---|
| | #TODO: Ctrl-C check |
---|
| | |
---|
| | name = names[device_index-1] |
---|
| | print("selected: %s" % name) |
---|
| | name = get_device_name() |
---|
| | device = energenie.registry.get(name) |
---|
| | #TODO: DRY END |
---|
| | |
---|
| | readings = device.get_readings_summary() |
---|
| | print(readings) |
---|
| | |
---|
| | |
---|
| | @untested |
---|
| | def do_watch_devices(): |
---|
| | """Repeatedly show readings for all devices""" |
---|
| | |
---|
| | try: |
---|
| | while True: |
---|
| | energenie.loop() # allow receive processing |
---|
| | print('-' * 80) |
---|
| |
---|
| | |
---|
| | @untested |
---|
| | def do_rename_device(): |
---|
| | """Rename a device in the registry to a different name""" |
---|
| | #This is useful when turning auto discovered names into your own names |
---|
| | |
---|
| | #TODO DRY name = select_device() |
---|
| | names = show_registry() |
---|
| | |
---|
| | # ask user for on/off/another/done |
---|
| | i = readin("Which device %s to %s" % (1,len(names))) |
---|
| | device_index = int(i) |
---|
| | #TODO: error check |
---|
| | #TODO: Ctrl-C check |
---|
| | |
---|
| | name = names[device_index-1] |
---|
| | print("selected: %s" % name) |
---|
| | #TODO: DRY END |
---|
| | |
---|
| | |
---|
| | # This is useful when turning auto discovered names into your own names |
---|
| | |
---|
| | old_name = get_device_name() |
---|
| | new_name = readin("New name?") |
---|
| | |
---|
| | energenie.registry.rename(name, new_name) |
---|
| | energenie.registry.rename(old_name, new_name) |
---|
| | |
---|
| | |
---|
| | @untested |
---|
| | def do_delete_device(): |
---|
| | """Delete a device from the registry so it is no longer recognised""" |
---|
| | |
---|
| | #TODO DRY name = select_device() |
---|
| | names = show_registry() |
---|
| | |
---|
| | # ask user for on/off/another/done |
---|
| | i = readin("Which device %s to %s" % (1,len(names))) |
---|
| | device_index = int(i) |
---|
| | #TODO: error check |
---|
| | #TODO: Ctrl-C check |
---|
| | |
---|
| | name = names[device_index-1] |
---|
| | print("selected: %s" % name) |
---|
| | #TODO: DRY END |
---|
| | name = get_device_name() |
---|
| | |
---|
| | energenie.registry.delete(name) |
---|
| | |
---|
| | |
---|
| | @untested |
---|
| | def do_logging(): |
---|
| | """Enter a mode where all communications are logged to screen and a file""" |
---|
| | |
---|
| | import Logger |
---|
| | |
---|
| | # provide a default incoming message handler for all fsk messages |
---|
| | def incoming(address, message): |
---|
| |
---|
| | |
---|
| | |
---|
| | def do_quit(): |
---|
| | """Finished with the program, so exit""" |
---|
| | |
---|
| | global quit |
---|
| | quit = True |
---|
| | |
---|
| | |
---|
| | #===== MENU =================================================================== |
---|
| | |
---|
| | def show_menu(menu): |
---|
| | """Display a menu on the console""" |
---|
| | |
---|
| | i = 1 |
---|
| | for item in menu: |
---|
| | print("%d. %s" % (i, item[0])) |
---|
| | i += 1 |
---|
| | |
---|
| | |
---|
| | def get_choice(choices): |
---|
| | """Get and validate a numberic choice from the tuple choices(first, last)""" |
---|
| | |
---|
| | first = choices[0] |
---|
| | last = choices[1] |
---|
| | try: |
---|
| | while True: |
---|
| |
---|
| | |
---|
| | |
---|
| | def handle_choice(menu, choice): |
---|
| | """Route to the handler for the given menu choice""" |
---|
| | |
---|
| | menu[choice-1][1]() |
---|
| | |
---|
| | |
---|
| | MAIN_MENU = [ |
---|
| |
---|
| | ("quit", do_quit) |
---|
| | ] |
---|
| | |
---|
| | |
---|
| | #===== MAIN PROGRAM =========================================================== |
---|
| | |
---|
| | def setup_tool(): |
---|
| | """The main program loop""" |
---|
| | |
---|
| | while not quit: |
---|
| | print("MAIN MENU") |
---|
| | show_menu(MAIN_MENU) |
---|
| | choice = get_choice((1,len(MAIN_MENU))) |
---|
| | if not quit: |
---|
| | handle_choice(MAIN_MENU, choice) |
---|
| | |
---|
| | |
---|
| | #----- MAIN ENTRY POINT ------------------------------------------------------- |
---|
| | |
---|
| | if __name__ == "__main__": |
---|
| | |
---|
| | energenie.init() |
---|
| | try: |
---|
| | setup_tool() |
---|
| | finally: |
---|
| | energenie.finished() |
---|
| | |
---|
| | |
---|
| | # END |
---|
| | |
---|
| | |
---|
| | |
---|
| | |