Initial commit

This commit is contained in:
thecookingsenpai 2023-12-25 13:27:02 +01:00
commit 3ce77673da
13 changed files with 649 additions and 0 deletions

5
.env Executable file
View File

@ -0,0 +1,5 @@
BEACONING=True
BEACONING_INTERVAL=60
SLEEP_INTERVAL=1
PORT=/dev/serial/by-id/usb-Silicon_Labs_CP2102_USB_to_UART_Bridge_Controller_0001-if00-port0
FORCE_QUIT=False

3
.gitignore vendored Executable file
View File

@ -0,0 +1,3 @@
*.key
*.pem
textual

21
LICENSE.md Normal file
View File

@ -0,0 +1,21 @@
MIT LIcense
Copyright (c) 2023 TheCookingSenpai
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

61
README.md Executable file
View File

@ -0,0 +1,61 @@
# eMesh
## A human-usable fast and universal GUI for Meshtastic nodes
### What is eMesh?
eMesh is an interface initially made for myself to be able to better control, understand and play with devices used in the [Meshtastic Project](https://meshtastic.org/). A brief and absolutely not definitive list of supported devices is provided in the next section.
### Compatible devices
Any device that is compatible with the current Meshtastic for Python version should be supported without any problems. However, is important to note that our tests have been made against the following devices:
• LilyGo LORA32
### Features
- [x] A fully functional GUI for Meshtastic even if you are using the terminal (thanks [Textualize for its Textual library](https://github.com/Textualize/textual))
- [x] Serial Port connection (serial or usb over serial)
- [ ] Bluetooth connection (not yet, maybe not ever)
- [x] Support for beaconing (emitting a signal every X seconds)
- [ ] Support for beaconing time customization
- [x] Possibility of specifying the serial port to use
- [x] Listening and showing messages in a clear and clean way
- [x] Easy to use chat-like interface with advanced commands
### Installation and usage
git clone https://github.com/thecookingsenpai/emesh
cd emesh
pip install -r requirements.txt
python gui.py
You can also play with term.py and emesh.py and use directly
python term.py
If you really hate GUIs.
### License
MIT License
Copyright (c) 2023 TheCookingSenpai
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

Binary file not shown.

BIN
__pycache__/keys.cpython-39.pyc Executable file

Binary file not shown.

BIN
__pycache__/term.cpython-39.pyc Executable file

Binary file not shown.

105
emesh.py Executable file
View File

@ -0,0 +1,105 @@
import meshtastic
import meshtastic.serial_interface
from pubsub import pub
import time
# Helpers
from hashlib import sha256
import keys
import json
serial_port = None
interface = None
beaconOn = False
# Is set to false on GUI mode so that we can control the beaconing
beaconingPrioritySettings = True
bnum = 0
connected = False
msg_received = []
# NOTE Just an easy wrapper around sha256
def hash(input):
return sha256(input.encode('utf-8')).hexdigest()
def onReceive(packet, interface):
global msg_received
print("[RECEIVED] Received packet: " + str(packet))
# called when a packet arrives
try:
decoded = packet["decoded"]
decoded["from"] = packet["from"]
decoded["to"] = packet["to"]
except:
print("[ERROR] Could not decode packet: discarding it")
return
# ANCHOR We have received a packet and we decoded it
print(decoded)
# Let's take the type of the packet
packet_type = decoded["portnum"]
print("Received packet type: " + packet_type)
msg_received.append(decoded)
def onConnection(interface, topic=pub.AUTO_TOPIC):
global connected
# called when we (re)connect to the radio
# defaults to broadcast, specify a destination ID if you wish
connected = True
theName = json.dumps(interface.getShortName())
interface.sendText(theName + " greets you!")
# INFO Monitor and, if applicable, start beaconing using encrypted messages or plaintext messages
def beacon(encrypted=False):
# If we are supposed to be beaconing, we need to send a beacon and wait 10 seconds
print("[BEACONING] Sending beacon...")
# NOTE Generating a beacon first
our_info = interface.getShortName()
our_timestamp = int(time.time())
global bnum
bnum += 1
beacon = {
"type": "beacon",
"number": bnum,
"timestamp": our_timestamp,
"info": our_info
}
interface.sendText(json.dumps(beacon))
print("[BEACONING] Beacon sent: " + json.dumps(beacon))
def sendRaw(raw):
print("[SEND RAW] Sending raw: " + raw)
interface.sendText(raw)
print("[SEND RAW] Raw sent: " + raw)
def sendRawBytes(raw):
print("[SEND RAW BYTES] Sending raw: " + raw)
interface.sendBytes(raw)
print("[SEND RAW BYTES] Raw sent: " + raw)
def connect(serialPort=None):
global serial_port
global interface
# Ensuring we have an identity
keys.ensure()
# Connecting to the radio
serial_port = serialPort
pub.subscribe(onReceive, "meshtastic.receive")
pub.subscribe(onConnection, "meshtastic.connection.established")
interface = meshtastic.serial_interface.SerialInterface(serial_port)
print("[INITIALIZATION] Connection to radio established")
def listSerials():
# TODO
pass

187
gui.py Executable file
View File

@ -0,0 +1,187 @@
import json
import os
import time
from textual.app import App, ComposeResult
from textual.widgets import Header, Footer
from textual.widgets import Input, Label, Pretty
from textual.widgets import Button, Static, RichLog, Sparkline, Checkbox
from textual.containers import Horizontal, VerticalScroll
from textual.validation import Function, Number, ValidationResult, Validator
from textual import events, on
import threading
import term
from dotenv import load_dotenv
class MeshTerm(App):
CSS_PATH = "meshterm.tcss"
stopWatchdog = False
messageToShow = None
# INFO Composing the app
def compose(self) -> ComposeResult:
"""Create child widgets for the app."""
yield Header()
yield Footer()
# Inputs
yield Horizontal(VerticalScroll(
Label("Enter the serial port to connect to: "),
Input(placeholder="/dev/ttyUSB0", id="port"),
Button("Connect to radio", id="connect"),
Checkbox("Enable beaconing:", True, id="beaconingBox"),
),
VerticalScroll(
Label("Unknown Radio Name", id="radio_name"),
Label(""),
Input(placeholder="Send something...", id="msg"),
Button("Send", id="send", disabled=True)
))
yield Horizontal(VerticalScroll(
Button("Exit", id="exit"),
Label("CONNECTED RADIO INFO"),
VerticalScroll(
Label("No radio connected", id="radio_namebox"),
Label("", id="radio_id"),
Label("", id="radio_user"),
)
),
VerticalScroll(
Sparkline([1, 2, 3, 3, 3, 3, 3], summary_function=min,),
Label("Received messages:"),
RichLog(id="received_messages", auto_scroll=True)
))
yield Label("", id="message_to_show")
yield Sparkline([1, 2, 3, 3, 3, 3, 3, 3, 4, 4, 5, 5, 6, 5, 5, 4, 4, 3, 3, 3, 3, 3, 3, 3, 2, 1], summary_function=min,)
# Main log
yield RichLog(id="main_log", auto_scroll=True)
# NOTE Here we start the watcher thread
self.watchdog = threading.Thread(name="watchdog", target=self.watcher)
self.watchdog.start()
# SECTION Actions
def on_key(self, event: events.Key) -> None:
"""Handle key events."""
pass
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button events."""
text_log = self.query_one("#main_log")
action = str(event.button.id).lower()
if action == "exit":
try:
term.forceQuit = True
self.stopWatchdog = True
except:
print("[SYSTEM] Failed to stop thread")
exit(1)
elif action == "connect":
self.connect()
elif action == "send":
self.send()
# INFO Sending a message to the device
def send(self):
if not term.emesh.connected:
self.messageToShow = "CANNOT SEND MESSAGE: No connection"
return
textToSend = self.query_one("#msg").value
term.emesh.sendRaw(textToSend)
self.query_one("#msg").value = ""
self.messageToShow = "MESSAGE SENT: " + textToSend
self.query_one("#main_log").write(self.messageToShow)
self.query_one("#received_messages").write("[You] > " + textToSend)
# INFO Managing connection to the device
def connect(self):
self.query_one("#connect").disabled = True
self.query_one("#connect").value = "CONNECTING..."
self.port = self.query_one("#port").value
self.port = self.port.strip()
self.messageToShow = "CONNECTING TO " + self.port + "..."
if not self.port or self.port == "":
self.port = None
self.instance = threading.Thread(target=term.main)
self.instance.start()
def change_value(self, id, replacement):
self.query_one(id).update(replacement)
# !SECTION Actions
def loadEnv(self):
self.env = {}
with open(".env", "r") as f:
textenv = f.readlines()
for line in textenv:
key, value = line.split("=")
self.env[key.strip()] = value.strip()
return self.env
def saveEnv(self):
preparedEnv = ""
for key, value in self.env.items():
preparedEnv += key + "=" + value + "\n"
with open(".env", "w") as f:
f.write(preparedEnv)
f.flush()
return self.env
def watcher(self):
while not self.stopWatchdog:
time.sleep(1)
# Refreshing the environment variables and setting ours if needed
try:
term.emesh.beaconingPrioritySettings = False
term.emesh.beaconOn = self.query_one("#beaconingBox").value
print("[WATCHDOG] Refreshing environment variables...")
os.environ['BEACONING'] = str(term.emesh.beaconOn)
print("[WATCHDOG] Environment variables refreshed: " + str(os.environ['BEACONING']))
except Exception as e:
print("[WARNING] beaconingBox element is not reachable - this may be temporary.")
# Loading messages into the gui
try:
if (term.outputs != term.last_output):
term.last_output = term.outputs
self.query_one("#main_log").write(term.outputs)
# Priority to us here
if (self.messageToShow):
messageToShow = self.messageToShow
self.messageToShow = None
else:
messageToShow = term.messageToShow
self.change_value("#message_to_show", messageToShow)
# If we are connected we should get our variables
if term.emesh.connected:
name = term.emesh.interface.getShortName()
self.query_one("#connect").disabled = False
self.query_one("#connect").value = "Reconnect"
self.query_one("#radio_name").update(f"Connected to: {name}")
self.query_one("#send").disabled = False
# Also updating our infos
self.query_one("#radio_namebox").update(f"Radio NAME: {name}")
self.query_one("#radio_id").update(
f"Radio ID (long name): {str(term.emesh.interface.getLongName())}"
)
self.query_one("#radio_user").update(
f"Radio USER: {str(term.emesh.interface.getMyUser())}"
)
# Populating the received messages
for receivd in term.emesh.msg_received:
if receivd["portnum"] == "TEXT_MESSAGE_APP":
headerMessage = "[" + str(receivd["from"]) + " -> " + str(receivd["to"]) + "] > "
textToShow = headerMessage + receivd["text"]
self.query_one("#received_messages").write(textToShow)
term.emesh.msg_received = []
except Exception as e:
self.change_value("#message_to_show", "ERROR: " + str(e))
if __name__ == "__main__":
app = MeshTerm()
app.run()

148
keys.py Executable file
View File

@ -0,0 +1,148 @@
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives import serialization
import rsa
import os
# LINK https://cryptography.io/en/latest/hazmat/primitives/asymmetric/ed25519/
# NOTE Identity keys
privateKey = None
privateBytes = None
publicKey = None
publicBytes = None
# NOTE Encryption keys
privateRSAKey = None
privateRSAPEM = None
publicRSAKey = None
publicRSAPEM = None
# INFO Common entry point to authentication
def ensure():
if(
os.path.exists("private.key")
):
print("[ED25519] Loading ed25519 private key...")
load()
else:
print("[ED25519] Creating ed25519 private key...")
create()
def create():
global privateKey
global privateBytes
# ED25519 Creation
print("[ED25519] Generating ed25519 identity...")
privateKey = ed25519.Ed25519PrivateKey.generate()
print(privateKey)
privateBytes = privateKey.private_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PrivateFormat.Raw,
encryption_algorithm=serialization.NoEncryption()
)
print(privateBytes.hex())
# Public Key Creation
publicDerivation()
# RSA Creation
derive()
# Writing file
save()
def load(filepath="./"):
global privateBytes
# Reading file
with open(filepath + "private.key", "rb") as keyFile:
privateBytes = keyFile.read()
# Loading key
try:
loadBytes(privateBytes)
print("[ED25519] Loaded ed25519 private key from file [+]")
except Exception as e:
print("[ED25519] Could not load ed25519 private key: [X]")
print(e)
exit()
# INFO privateBytesProvided must be the same kind of data as the privateBytes (aka bytes)
def loadBytes(privateBytesProvided: bytes):
global privateKey
print("[ED25519] Loading ed25519 private key from bytes... [*]")
privateKey = ed25519.Ed25519PrivateKey.from_private_bytes(privateBytesProvided)
print("[ED25519] Loaded ed25519 private key from bytes [+]")
#print(privateKey)
# Public Key Creation
publicDerivation()
# RSA Creation
derive()
# INFO Deriving a public key from the private key
def publicDerivation():
global publicKey
global publicBytes
print("[ED25519] Generating ed25519 public key...[*]")
publicKey = privateKey.public_key()
#print(publicKey)
publicBytes = publicKey.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
print("[ED25519] We are: " + publicBytes.hex())
print("[ED25519] Generated ed25519 public key [+]")
#print(publicBytes.hex())
# INFO RSA Derivation
def derive():
global privateRSAKey
global privateRSAPEM
global publicRSAKey
global publicRSAPEM
# RSA Creation
print("[RSA] Generating RSA keys from ed25519 identity... [*]")
privateRSAKey = rsa.generate_key(privateBytes.hex()) # So that the two are linked
privateRSAPEM = privateRSAKey.exportKey("PEM")
publicRSAKey = privateRSAKey.public_key()
publicRSAPEM = publicRSAKey.exportKey("PEM")
print("[RSA] Generated RSA keys from ed25519 identity [+]")
#print(privateRSAPEM)
#print(publicRSAPEM)
# INFO Encrypting a message (returning bytes)
def encrypt(message, publicKey=None):
global publicRSAKey
# Supporting self encryption
if not publicKey:
publicKey = publicRSAKey
# Generating the encrypted message
encrypted = rsa.encrypt(message, publicKey)
return encrypted
# INFO Decrypting a message (returning bytes)
def decrypt(message, privateKey=None):
global privateRSAKey
# Supporting self decryption by default
if not privateKey:
privateKey = privateRSAKey
# Generating the decrypted message
decrypted = rsa.decrypt(message, privateKey)
return decrypted
# INFO Sign a message after encoding it (returning bytes)
def sign(message):
global privateKey
signature = privateKey.sign(message.encode('utf-8'))
return signature
# INFO Verify a message (returning boolean)
def verify(message, signature, publicKeyProvided=None):
global publicKey
# Supporting self verification
if not publicKeyProvided:
publicKeyProvided = publicKey
# Generating the verified result
return publicKey.verify(signature, message.encode('utf-8'))
# ANCHOR Utilities
def save():
global privateBytes
print("[ED25519] Saving ed25519 key...")
with open("private.key", "wb") as f:
f.write(privateBytes)

10
meshterm.tcss Executable file
View File

@ -0,0 +1,10 @@
Button {
margin: 1 2;
}
.header {
margin: 1 0 0 2;
text-style: bold;
}

10
requirements.txt Executable file
View File

@ -0,0 +1,10 @@
pytap2
pubsub
meshtastic
cryptography
deterministic-rsa-keygen
python-dotenv
textual
textual-dev

99
term.py Executable file
View File

@ -0,0 +1,99 @@
import emesh
import time
import os
from dotenv import load_dotenv
# SECTION GUI Variables
outputs = ""
last_output = ""
messageToShow = ""
last_messageToShow = ""
forceQuit = False
# !SECTION GUI Variables
beaconCooldown = 0
import builtins as __builtin__
# Overriding print for the GUI
def print(*args, **kwargs):
global outputs
outputs = "".join(map(str, args))
__builtin__.print(*args, **kwargs)
# INFO Initializing the emesh structure
def init():
print("[SYSTEM] Starting EMesh...")
vars = preparse()
emesh.connect(vars['port'])
print("[LOADER] Initialized")
# INFO Parsing our environment variables
def preparse():
load_dotenv()
vars = {}
# Parsing the port
if not os.getenv('PORT') == "default":
vars['port'] = os.getenv('PORT')
print(os.getenv('PORT'))
return vars
def main():
global beaconCooldown
global messageToShow
global forceQuit
# INFO Entry point
init()
# Main cycle
print("[MAIN CYCLE] Starting watchdog...")
was_connected = False
cooldownHeader = False
while not ((os.getenv('FORCE_QUIT')=="True") or forceQuit):
# This is just a way to check if we need to notify the gui
are_connected = emesh.connected
if (are_connected!= was_connected):
print("[GUI] Changed connection status")
messageToShow = "CONNECTION ESTABLISHED"
was_connected = are_connected
# NOTE Reloading .env ensures that we can control the app cycle externally
load_dotenv()
# NOTE Overriding is always possible, otherwise we have to rely on gui.py
if emesh.beaconingPrioritySettings:
print("[MAIN CYCLE] Terminal mode: getting beaconing from .env...")
emesh.beaconOn = (os.getenv('BEACONING')=="True")
else:
print("[MAIN CYCLE] GUI mode: getting beaconing from GUI...")
print(f"[MAIN CYCLE] Beaconing: {emesh.beaconOn}")
# NOTE As the scenarios can include long range radios, we have low bandwidth.
# By waiting N seconds between beacons, we ensure that we are not beaconing
# too often and spamming the radio channel with beacons.
if emesh.beaconOn:
print("[MAIN CYCLE] Checking for beacon cooldown...")
# The following keeps the code running while we cooldown beaconing too
if (beaconCooldown > 0):
if not cooldownHeader:
print("+++ COOLDOWN ACTIVE +++")
cooldownHeader = True
isMultipleOfTen = (beaconCooldown % 10 == 0)
if isMultipleOfTen:
print(f"[MAIN CYCLE] Beacon cooldown: {str(beaconCooldown)}")
beaconCooldown -= 1
else:
print("*** COOLDOWN COMPLETE ***")
print("[MAIN CYCLE] Beaconing is activated, proceeding...")
beaconCooldown = int(os.getenv('BEACONING_INTERVAL'))
emesh.beacon()
print("[MAIN CYCLE] Beacon emitted. Proceeding to the next cycle...")
else:
print("[MAIN CYCLE] Beaconing is not activated, proceeding...")
# Sleep for N seconds
# print("[MAIN CYCLE] Sleeping for " + os.getenv('SLEEP_INTERVAL') + " seconds")
time.sleep(int(os.getenv('SLEEP_INTERVAL')))
# print("[MAIN CYCLE] Sleeping complete. Proceeding to the next cycle...")
print("[SYSTEM] Ready to start.")
if __name__ == "__main__":
main()