diff --git a/gui.py b/gui.py index a72bed6..c203105 100755 --- a/gui.py +++ b/gui.py @@ -1,20 +1,17 @@ -import json import os import time from textual.app import App, ComposeResult from textual.widgets import Header, Footer -from textual.widgets import Input, Label, Pretty -from textual.widgets import Button, Static, RichLog, Sparkline, Checkbox +from textual.widgets import Input, Label +from textual.widgets import Button, RichLog, Sparkline, Checkbox from textual.containers import Horizontal, VerticalScroll -from textual.validation import Function, Number, ValidationResult, Validator -from textual import events, on +from textual import events import threading import term -from dotenv import load_dotenv class MeshTerm(App): - CSS_PATH = "meshterm.tcss" + CSS_PATH = "tcss/meshterm.tcss" stopWatchdog = False messageToShow = None @@ -26,36 +23,72 @@ class MeshTerm(App): yield Footer() # Inputs - yield Horizontal(VerticalScroll( - Label("Enter the serial port to connect to: "), - Input(placeholder="/dev/ttyUSB0", id="port"), - Button("Connect to radio", id="connect"), - Checkbox("Enable beaconing:", True, id="beaconingBox"), - - ), + yield Horizontal( + VerticalScroll( + Label("Enter the serial port to connect to: "), + Input(placeholder="/dev/ttyUSB0", id="port"), + Button("Connect to radio", id="connect"), + Checkbox("Enable beaconing:", True, id="beaconingBox"), + ), VerticalScroll( Label("Unknown Radio Name", id="radio_name"), Label(""), Input(placeholder="Send something...", id="msg"), - Button("Send", id="send", disabled=True) - )) - - yield Horizontal(VerticalScroll( - Button("Exit", id="exit"), - Label("CONNECTED RADIO INFO"), + Button("Send", id="send", disabled=True), + ), + ) + + yield Horizontal( VerticalScroll( - Label("No radio connected", id="radio_namebox"), - Label("", id="radio_id"), - Label("", id="radio_user"), - ) - ), + Button("Exit", id="exit"), + Label("CONNECTED RADIO INFO"), + VerticalScroll( + Label("No radio connected", id="radio_namebox"), + Label("", id="radio_id"), + Label("", id="radio_user"), + ), + ), VerticalScroll( - Sparkline([1, 2, 3, 3, 3, 3, 3], summary_function=min,), - Label("Received messages:"), - RichLog(id="received_messages", auto_scroll=True) - )) + Sparkline( + [1, 2, 3, 3, 3, 3, 3], + summary_function=min, + ), + Label("Received messages:"), + RichLog(id="received_messages", auto_scroll=True), + ), + ) yield Label("", id="message_to_show") - yield Sparkline([1, 2, 3, 3, 3, 3, 3, 3, 4, 4, 5, 5, 6, 5, 5, 4, 4, 3, 3, 3, 3, 3, 3, 3, 2, 1], summary_function=min,) + yield Sparkline( + [ + 1, + 2, + 3, + 3, + 3, + 3, + 3, + 3, + 4, + 4, + 5, + 5, + 6, + 5, + 5, + 4, + 4, + 3, + 3, + 3, + 3, + 3, + 3, + 3, + 2, + 1, + ], + summary_function=min, + ) # Main log yield RichLog(id="main_log", auto_scroll=True) @@ -71,14 +104,14 @@ class MeshTerm(App): def on_button_pressed(self, event: Button.Pressed) -> None: """Handle button events.""" - text_log = self.query_one("#main_log") + # text_log = self.query_one("#main_log") action = str(event.button.id).lower() if action == "exit": try: term.forceQuit = True self.stopWatchdog = True - except: - print("[SYSTEM] Failed to stop thread") + except Exception as e: + print("[SYSTEM] Failed to stop thread: %s" % e) exit(1) elif action == "connect": self.connect() @@ -96,7 +129,7 @@ class MeshTerm(App): self.messageToShow = "MESSAGE SENT: " + textToSend self.query_one("#main_log").write(self.messageToShow) self.query_one("#received_messages").write("[You] > " + textToSend) - + # INFO Managing connection to the device def connect(self): self.query_one("#connect").disabled = True @@ -111,8 +144,9 @@ class MeshTerm(App): def change_value(self, id, replacement): self.query_one(id).update(replacement) + # !SECTION Actions - + def loadEnv(self): self.env = {} with open(".env", "r") as f: @@ -130,7 +164,6 @@ class MeshTerm(App): f.write(preparedEnv) f.flush() return self.env - def watcher(self): while not self.stopWatchdog: @@ -140,17 +173,23 @@ class MeshTerm(App): term.emesh.beaconingPrioritySettings = False term.emesh.beaconOn = self.query_one("#beaconingBox").value print("[WATCHDOG] Refreshing environment variables...") - os.environ['BEACONING'] = str(term.emesh.beaconOn) - print("[WATCHDOG] Environment variables refreshed: " + str(os.environ['BEACONING'])) + os.environ["BEACONING"] = str(term.emesh.beaconOn) + print( + "[WATCHDOG] Environment variables refreshed: " + + str(os.environ["BEACONING"]) + ) except Exception as e: - print("[WARNING] beaconingBox element is not reachable - this may be temporary.") + print( + "[WARNING] beaconingBox element is not reachable - this may be temporary." + ) + print("[WARNING] Error: " + str(e)) # Loading messages into the gui try: - if (term.outputs != term.last_output): + if term.outputs != term.last_output: term.last_output = term.outputs self.query_one("#main_log").write(term.outputs) # Priority to us here - if (self.messageToShow): + if self.messageToShow: messageToShow = self.messageToShow self.messageToShow = None else: @@ -174,7 +213,13 @@ class MeshTerm(App): # Populating the received messages for receivd in term.emesh.msg_received: if receivd["portnum"] == "TEXT_MESSAGE_APP": - headerMessage = "[" + str(receivd["from"]) + " -> " + str(receivd["to"]) + "] > " + headerMessage = ( + "[" + + str(receivd["from"]) + + " -> " + + str(receivd["to"]) + + "] > " + ) textToShow = headerMessage + receivd["text"] self.query_one("#received_messages").write(textToShow) term.emesh.msg_received = [] diff --git a/emesh.py b/libs/emesh.py similarity index 97% rename from emesh.py rename to libs/emesh.py index 283a256..e192d61 100755 --- a/emesh.py +++ b/libs/emesh.py @@ -35,8 +35,9 @@ def onReceive(packet, interface): decoded = packet["decoded"] decoded["from"] = packet["from"] decoded["to"] = packet["to"] - except: + except Exception as e: print("[ERROR] Could not decode packet: discarding it") + print("[ERROR] " + str(e)) return # ANCHOR We have received a packet and we decoded it print(decoded) diff --git a/keys.py b/libs/keys.py similarity index 100% rename from keys.py rename to libs/keys.py diff --git a/meshterm.tcss b/tcss/meshterm.tcss similarity index 100% rename from meshterm.tcss rename to tcss/meshterm.tcss diff --git a/term.py b/term.py index 91455d0..e4db0ba 100755 --- a/term.py +++ b/term.py @@ -1,4 +1,5 @@ -import emesh +import libs.emesh as emesh +import builtins as __builtin__ import time import os from dotenv import load_dotenv @@ -15,29 +16,32 @@ forceQuit = False beaconCooldown = 0 -import builtins as __builtin__ + # Overriding print for the GUI def print(*args, **kwargs): global outputs outputs = "".join(map(str, args)) __builtin__.print(*args, **kwargs) + # INFO Initializing the emesh structure def init(): print("[SYSTEM] Starting EMesh...") vars = preparse() - emesh.connect(vars['port']) + emesh.connect(vars["port"]) print("[LOADER] Initialized") + # INFO Parsing our environment variables def preparse(): load_dotenv() vars = {} # Parsing the port - if not os.getenv('PORT') == "default": - vars['port'] = os.getenv('PORT') - print(os.getenv('PORT')) - return vars + if not os.getenv("PORT") == "default": + vars["port"] = os.getenv("PORT") + print(os.getenv("PORT")) + return vars + def main(): global beaconCooldown @@ -49,10 +53,10 @@ def main(): print("[MAIN CYCLE] Starting watchdog...") was_connected = False cooldownHeader = False - while not ((os.getenv('FORCE_QUIT')=="True") or forceQuit): + while not ((os.getenv("FORCE_QUIT") == "True") or forceQuit): # This is just a way to check if we need to notify the gui are_connected = emesh.connected - if (are_connected!= was_connected): + if are_connected != was_connected: print("[GUI] Changed connection status") messageToShow = "CONNECTION ESTABLISHED" was_connected = are_connected @@ -61,9 +65,9 @@ def main(): # NOTE Overriding is always possible, otherwise we have to rely on gui.py if emesh.beaconingPrioritySettings: print("[MAIN CYCLE] Terminal mode: getting beaconing from .env...") - emesh.beaconOn = (os.getenv('BEACONING')=="True") + emesh.beaconOn = os.getenv("BEACONING") == "True" else: - print("[MAIN CYCLE] GUI mode: getting beaconing from GUI...") + print("[MAIN CYCLE] GUI mode: getting beaconing from GUI...") print(f"[MAIN CYCLE] Beaconing: {emesh.beaconOn}") # NOTE As the scenarios can include long range radios, we have low bandwidth. # By waiting N seconds between beacons, we ensure that we are not beaconing @@ -71,27 +75,28 @@ def main(): if emesh.beaconOn: print("[MAIN CYCLE] Checking for beacon cooldown...") # The following keeps the code running while we cooldown beaconing too - if (beaconCooldown > 0): + if beaconCooldown > 0: if not cooldownHeader: print("+++ COOLDOWN ACTIVE +++") cooldownHeader = True - isMultipleOfTen = (beaconCooldown % 10 == 0) + isMultipleOfTen = beaconCooldown % 10 == 0 if isMultipleOfTen: print(f"[MAIN CYCLE] Beacon cooldown: {str(beaconCooldown)}") beaconCooldown -= 1 else: print("*** COOLDOWN COMPLETE ***") print("[MAIN CYCLE] Beaconing is activated, proceeding...") - beaconCooldown = int(os.getenv('BEACONING_INTERVAL')) + beaconCooldown = int(os.getenv("BEACONING_INTERVAL")) emesh.beacon() print("[MAIN CYCLE] Beacon emitted. Proceeding to the next cycle...") else: print("[MAIN CYCLE] Beaconing is not activated, proceeding...") # Sleep for N seconds # print("[MAIN CYCLE] Sleeping for " + os.getenv('SLEEP_INTERVAL') + " seconds") - time.sleep(int(os.getenv('SLEEP_INTERVAL'))) + time.sleep(int(os.getenv("SLEEP_INTERVAL"))) # print("[MAIN CYCLE] Sleeping complete. Proceeding to the next cycle...") - + + print("[SYSTEM] Ready to start.")