mirror of
https://github.com/tcsenpai/emesh.git
synced 2025-06-03 01:00:03 +00:00
233 lines
8.0 KiB
Python
Executable File
233 lines
8.0 KiB
Python
Executable File
import os
|
|
import time
|
|
from textual.app import App, ComposeResult
|
|
from textual.widgets import Header, Footer
|
|
from textual.widgets import Input, Label
|
|
from textual.widgets import Button, RichLog, Sparkline, Checkbox
|
|
from textual.containers import Horizontal, VerticalScroll
|
|
from textual import events
|
|
import threading
|
|
import term
|
|
|
|
|
|
class MeshTerm(App):
    """Textual front-end for the ``term`` / ``term.emesh`` radio session.

    The app builds a form (serial port, connect/send/exit buttons, beaconing
    checkbox), two logs and a status label, then runs a background "watchdog"
    thread that once per second copies state between the widgets and the
    ``term`` module.
    """

    CSS_PATH = "tcss/meshterm.tcss"

    # Set to True to make the watcher loop stop after its current iteration.
    stopWatchdog = False
    # One-shot status text set by the GUI; the watcher displays it and then
    # resets it to None so that term.messageToShow is shown afterwards.
    messageToShow = None

    # INFO Composing the app
    def compose(self) -> ComposeResult:
        """Create child widgets for the app and start the watcher thread."""
        yield Header()
        yield Footer()

        # Inputs
        yield Horizontal(
            VerticalScroll(
                Label("Enter the serial port to connect to: "),
                Input(placeholder="/dev/ttyUSB0", id="port"),
                Button("Connect to radio", id="connect"),
                Checkbox("Enable beaconing:", True, id="beaconingBox"),
            ),
            VerticalScroll(
                Label("Unknown Radio Name", id="radio_name"),
                Label(""),
                Input(placeholder="Send something...", id="msg"),
                Button("Send", id="send", disabled=True),
            ),
        )

        yield Horizontal(
            VerticalScroll(
                Button("Exit", id="exit"),
                Label("CONNECTED RADIO INFO"),
                VerticalScroll(
                    Label("No radio connected", id="radio_namebox"),
                    Label("", id="radio_id"),
                    Label("", id="radio_user"),
                ),
            ),
            VerticalScroll(
                Sparkline(
                    [1, 2, 3, 3, 3, 3, 3],
                    summary_function=min,
                ),
                Label("Received messages:"),
                RichLog(id="received_messages", auto_scroll=True),
            ),
        )
        yield Label("", id="message_to_show")
        yield Sparkline(
            [
                1, 2, 3, 3, 3, 3, 3, 3, 4, 4, 5, 5, 6,
                5, 5, 4, 4, 3, 3, 3, 3, 3, 3, 3, 2, 1,
            ],
            summary_function=min,
        )
        # Main log
        yield RichLog(id="main_log", auto_scroll=True)

        # NOTE Here we start the watcher thread.
        # It is a daemon thread so that it cannot keep the interpreter alive
        # when the app terminates through any path that does not set
        # stopWatchdog (only the Exit button does).
        self.watchdog = threading.Thread(
            name="watchdog", target=self.watcher, daemon=True
        )
        self.watchdog.start()

    # SECTION Actions

    def on_key(self, event: events.Key) -> None:
        """Handle key events (currently a no-op)."""

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dispatch a button press to the action matching the button id.

        "exit" asks the ``term`` module and the watcher to stop and then
        terminates the process with exit status 1; "connect" and "send"
        delegate to :meth:`connect` and :meth:`send`. Other ids are ignored.
        """
        action = str(event.button.id).lower()
        if action == "exit":
            try:
                term.forceQuit = True
                self.stopWatchdog = True
            except Exception as e:
                print("[SYSTEM] Failed to stop thread: %s" % e)
            exit(1)
        elif action == "connect":
            self.connect()
        elif action == "send":
            self.send()

    # INFO Sending a message to the device
    def send(self) -> None:
        """Send the content of the ``#msg`` input through ``term.emesh``.

        When ``term.emesh.connected`` is false nothing is sent and a status
        message is queued instead. On success the input is cleared and the
        text is echoed to both logs.
        """
        if not term.emesh.connected:
            self.messageToShow = "CANNOT SEND MESSAGE: No connection"
            return

        msg_input = self.query_one("#msg")
        textToSend = msg_input.value
        term.emesh.sendRaw(textToSend)
        msg_input.value = ""

        self.messageToShow = "MESSAGE SENT: " + textToSend
        self.query_one("#main_log").write(self.messageToShow)
        self.query_one("#received_messages").write("[You] > " + textToSend)

    # INFO Managing connection to the device
    def connect(self) -> None:
        """Start ``term.main`` in a new thread using the port typed by the user.

        The connect button is disabled and relabelled while connecting. The
        stripped port is stored in ``self.port`` (``None`` when the field is
        empty) and the thread in ``self.instance``.
        """
        connect_button = self.query_one("#connect")
        connect_button.disabled = True
        # A Button shows its `label`; the previous `.value` assignment only
        # created an unused attribute, so the text never changed.
        connect_button.label = "CONNECTING..."

        port = self.query_one("#port").value.strip()
        self.messageToShow = "CONNECTING TO " + port + "..."
        # An empty field is stored as None.
        # NOTE(review): presumably None lets `term` pick a default port —
        # how `term.main` reads the port is not visible here; confirm.
        self.port = port or None

        self.instance = threading.Thread(target=term.main)
        self.instance.start()

    def change_value(self, id, replacement):
        """Call ``update(replacement)`` on the widget matched by selector ``id``."""
        self.query_one(id).update(replacement)

    # !SECTION Actions

    def loadEnv(self) -> dict:
        """Parse ``.env`` in the working directory into ``self.env``.

        Each ``KEY=VALUE`` line becomes one entry with surrounding whitespace
        stripped from key and value. The line is split on the first ``=``
        only, so values may contain ``=``. Blank lines, lines starting with
        ``#`` and lines without ``=`` are skipped instead of raising.

        Returns:
            The freshly populated ``self.env`` dictionary.

        Raises:
            OSError: if ``.env`` cannot be opened.
        """
        self.env = {}
        with open(".env", "r") as f:
            for line in f:
                entry = line.strip()
                if not entry or entry.startswith("#") or "=" not in entry:
                    continue
                key, _, value = entry.partition("=")
                self.env[key.strip()] = value.strip()
        return self.env

    def saveEnv(self) -> dict:
        """Write ``self.env`` to ``.env`` as ``KEY=VALUE`` lines.

        Returns:
            ``self.env`` unchanged.

        Raises:
            OSError: if ``.env`` cannot be written.
        """
        preparedEnv = "".join(
            f"{key}={value}\n" for key, value in self.env.items()
        )
        with open(".env", "w") as f:
            f.write(preparedEnv)
        return self.env

    def watcher(self) -> None:
        """Background loop: once per second sync GUI and ``term`` state.

        Runs until ``self.stopWatchdog`` becomes true. Failures in either
        step are reported and never terminate the loop.

        NOTE(review): the widgets are touched from this background thread;
        Textual documents ``App.call_from_thread`` for cross-thread updates —
        consider routing the updates through it.
        """
        while not self.stopWatchdog:
            time.sleep(1)
            self._sync_beaconing()
            try:
                self._refresh_gui()
            except Exception as e:
                try:
                    self.change_value("#message_to_show", "ERROR: " + str(e))
                except Exception as nested:
                    # The status label itself can be unavailable (e.g. before
                    # mount or during shutdown); do not let that kill the loop.
                    print("[WARNING] Could not display error: " + str(nested))

    def _sync_beaconing(self) -> None:
        """Copy the beaconing checkbox state to ``term.emesh`` and the environment."""
        # Refreshing the environment variables and setting ours if needed
        try:
            # NOTE(review): presumably stops emesh's own settings from taking
            # priority over the checkbox — confirm against term.emesh.
            term.emesh.beaconingPrioritySettings = False
            term.emesh.beaconOn = self.query_one("#beaconingBox").value
            print("[WATCHDOG] Refreshing environment variables...")
            os.environ["BEACONING"] = str(term.emesh.beaconOn)
            print(
                "[WATCHDOG] Environment variables refreshed: "
                + str(os.environ["BEACONING"])
            )
        except Exception as e:
            print(
                "[WARNING] beaconingBox element is not reachable - this may be temporary."
            )
            print("[WARNING] Error: " + str(e))

    def _refresh_gui(self) -> None:
        """Push new ``term`` output, the status message and radio info to the widgets."""
        # Loading messages into the gui
        if term.outputs != term.last_output:
            term.last_output = term.outputs
            self.query_one("#main_log").write(term.outputs)

        # Priority to us here: a message queued by the GUI is shown once,
        # otherwise the one provided by the term module is displayed.
        if self.messageToShow:
            messageToShow = self.messageToShow
            self.messageToShow = None
        else:
            messageToShow = term.messageToShow
        self.change_value("#message_to_show", messageToShow)

        # If we are connected we should get our variables
        if term.emesh.connected:
            self._show_radio_info()
            self._show_received_messages()

    def _show_radio_info(self) -> None:
        """Re-enable the controls and display the connected radio's identity."""
        interface = term.emesh.interface
        name = interface.getShortName()

        connect_button = self.query_one("#connect")
        connect_button.disabled = False
        # `label` (not `.value`) is what a Button displays.
        connect_button.label = "Reconnect"
        self.query_one("#radio_name").update(f"Connected to: {name}")
        self.query_one("#send").disabled = False

        # Also updating our infos
        self.query_one("#radio_namebox").update(f"Radio NAME: {name}")
        self.query_one("#radio_id").update(
            f"Radio ID (long name): {interface.getLongName()}"
        )
        self.query_one("#radio_user").update(
            f"Radio USER: {interface.getMyUser()}"
        )

    def _show_received_messages(self) -> None:
        """Write pending text messages to the received log, then clear the queue."""
        received_log = self.query_one("#received_messages")
        for received in term.emesh.msg_received:
            # Only packets whose portnum is TEXT_MESSAGE_APP are displayed.
            if received["portnum"] == "TEXT_MESSAGE_APP":
                received_log.write(
                    f"[{received['from']} -> {received['to']}] > {received['text']}"
                )
        term.emesh.msg_received = []
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the MeshTerm application and run it.
    app = MeshTerm()
    app.run()
|