From 05929a99c470c423f00f11754675e0399ba97f9c Mon Sep 17 00:00:00 2001 From: tcsenpai Date: Wed, 19 Feb 2025 11:07:39 +0100 Subject: [PATCH] first version --- .env | 1 + .gitignore | 7 ++ data/banner.txt | 13 +++ data/users.json | 6 ++ libs/banner.py | 21 ++++ libs/broadcast.py | 23 +++++ libs/process_message.py | 219 ++++++++++++++++++++++++++++++++++++++++ libs/user_manager.py | 134 ++++++++++++++++++++++++ main.py | 215 +++++++++++++++++++++++++++++++++++++++ requirements.txt | 1 + 10 files changed, 640 insertions(+) create mode 100644 .env create mode 100644 .gitignore create mode 100644 data/banner.txt create mode 100644 data/users.json create mode 100644 libs/banner.py create mode 100644 libs/broadcast.py create mode 100644 libs/process_message.py create mode 100644 libs/user_manager.py create mode 100644 main.py create mode 100644 requirements.txt diff --git a/.env b/.env new file mode 100644 index 0000000..b2b0d72 --- /dev/null +++ b/.env @@ -0,0 +1 @@ +MAX_CONNECTIONS=10 \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8f1a48a --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +_pycache_ +__pycache__ +*.pyc +*.pyo +*.pyd +*.pyw +*.pyz \ No newline at end of file diff --git a/data/banner.txt b/data/banner.txt new file mode 100644 index 0000000..ae1fd82 --- /dev/null +++ b/data/banner.txt @@ -0,0 +1,13 @@ ++------------------+ +# TCSERVER v1.0 # +# # +# ######## # +# # # # +# # #### # # +# # # # # # +# # #### # # +# # # # +# ######## # +# # +# C64 TELNET SRV # ++------------------+ \ No newline at end of file diff --git a/data/users.json b/data/users.json new file mode 100644 index 0000000..ce1d775 --- /dev/null +++ b/data/users.json @@ -0,0 +1,6 @@ +{ + "admin": { + "password": "8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918", + "role": "admin" + } +} \ No newline at end of file diff --git a/libs/banner.py b/libs/banner.py new file mode 100644 index 0000000..f808110 --- /dev/null +++ b/libs/banner.py @@ -0,0 +1,21 @@ +from pathlib import Path + + +def load_banner(): + """Load and return the banner from file.""" + banner_file = Path("data/banner.txt") + + if not banner_file.exists(): + # Return a simple default banner if file doesn't exist + return """ +/======================================\\ +| Welcome to TCServer | +\\======================================/ +""" + + try: + with open(banner_file, "r") as f: + return f.read() + except: + # Return simple banner if there's any error + return "Welcome to TCServer\r\n" diff --git a/libs/broadcast.py b/libs/broadcast.py new file mode 100644 index 0000000..63b089c --- /dev/null +++ b/libs/broadcast.py @@ -0,0 +1,23 @@ +def broadcast_message(connections, message, sender_addr=None): + """ + Broadcasts a message to all connected clients. + + Args: + connections (dict): Dictionary of active connections {addr: socket} + message (str): Message to broadcast + sender_addr (tuple, optional): Address of the sender to exclude + """ + formatted_message = f"\r\n{message}\r\n" + encoded_message = formatted_message.encode("ascii") + + for addr, conn in connections.items(): + try: + # Don't send back to the sender if specified + if sender_addr and addr == sender_addr: + continue + conn.sendall(encoded_message) + except (ConnectionError, BrokenPipeError): + print(f"Error sending message to {addr}") + except: + # If sending fails, we'll let the main loop handle the disconnection + pass diff --git a/libs/process_message.py b/libs/process_message.py new file mode 100644 index 0000000..f22ac57 --- /dev/null +++ b/libs/process_message.py @@ -0,0 +1,219 @@ +from typing import Tuple + + +class CommandProcessor: + def __init__(self, user_manager): + self.user_manager = user_manager + self.commands = { + "help": self.cmd_help, + "login": self.cmd_login, + "register": self.cmd_register, + "whoami": self.cmd_whoami, + "users": self.cmd_users, + "op": self.cmd_op, + "deop": self.cmd_deop, + "kick": self.cmd_kick, + "ban": self.cmd_ban, + "broadcast": self.cmd_broadcast, + "passwd": self.cmd_passwd, + } + + def process_command(self, message: str, addr: Tuple) -> str: + """Process a command and return the response.""" + parts = message.strip().split() + if not parts: + return "" + + cmd = parts[0].lower() + args = parts[1:] + + if cmd in self.commands: + return self.commands[cmd](args, addr) + return f"Unknown command: {cmd}. Type 'help' for available commands." + + def cmd_help(self, args, addr): + """Show available commands based on user's role.""" + is_admin = self.user_manager.is_admin(addr) + is_authenticated = not self.user_manager.get_username(addr).startswith("guest_") + + # Commands for all users (including guests) + guest_commands = { + "help": "Show this help message", + "login": "Login with username and password", + "whoami": "Show current username", + "register": "Register new user", + } + + # Commands for authenticated users + auth_commands = { + "broadcast": "Broadcast a message to all users", + "users": "List online users", + } + + # Admin commands + admin_commands = { + "op": "Give admin privileges to user", + "deop": "Remove admin privileges from user", + "kick": "Disconnect a user from the server", + "ban": "Ban a username from the server", + } + + # Build help message + help_msg = [ + "+-------------COMMANDS-------------+", + "# Basic Commands: #", + ] + + # Add guest commands + for cmd, desc in guest_commands.items(): + help_msg.append(f"# /{cmd:<10} - {desc:<15} #") + + # Add authenticated user commands + if is_authenticated: + help_msg.append("# #") + help_msg.append("# User Commands: #") + for cmd, desc in auth_commands.items(): + help_msg.append(f"# /{cmd:<10} - {desc:<15} #") + + # Add admin commands + if is_admin: + help_msg.append("# #") + help_msg.append("# Admin Commands: #") + for cmd, desc in admin_commands.items(): + help_msg.append(f"# /{cmd:<10} - {desc:<15} #") + + help_msg.append("+--------------------------------+") + return "\n".join(help_msg) + + def cmd_login(self, args, addr): + """Handle login command.""" + if len(args) != 2: + return "Usage: login " + + username, password = args + if self.user_manager.authenticate(username, password): + self.user_manager.register_session(addr, username) + self.user_manager.message_timestamps[addr] = [] # Reset rate limit + return f"Successfully logged in as {username}" + return "Invalid username or password" + + def cmd_register(self, args, addr): + """Handle register command.""" + if len(args) != 2: + return "Usage: register " + + username, password = args + if self.user_manager.add_user(username, password): + return f"User {username} registered successfully" + return "Username already exists" + + def cmd_whoami(self, args, addr): + """Show current username.""" + username = self.user_manager.get_username(addr) + return f"You are: {username}" + + def cmd_users(self, args, addr): + """List online users.""" + # Check if user is authenticated (not a guest) + username = self.user_manager.get_username(addr) + if username.startswith("guest_"): + return "You must be logged in to list users" + + users = [ + f"{addr}: {user}" + for addr, user in self.user_manager.active_sessions.items() + ] + return "Online users:\n" + "\n".join(users) + + def cmd_op(self, args, addr): + """Make a user admin.""" + if not self.user_manager.is_admin(addr): + return "You don't have permission to use this command" + if len(args) != 1: + return "Usage: op " + username = args[0] + if username in self.user_manager.users: + self.user_manager.users[username]["role"] = "admin" + self.user_manager._save_users() + return f"User {username} is now an admin" + return "User not found" + + def cmd_deop(self, args, addr): + """Remove admin status from user.""" + if not self.user_manager.is_admin(addr): + return "You don't have permission to use this command" + if len(args) != 1: + return "Usage: deop " + username = args[0] + if username in self.user_manager.users: + self.user_manager.users[username]["role"] = "user" + self.user_manager._save_users() + return f"User {username} is no longer an admin" + return "User not found" + + def cmd_kick(self, args, addr): + """Kick a user from the server.""" + if not self.user_manager.is_admin(addr): + return "You don't have permission to use this command" + if len(args) != 1: + return "Usage: kick " + username = args[0] + + # Check if target is admin + for client_addr, client_name in self.user_manager.active_sessions.items(): + if client_name == username: + if ( + username in self.user_manager.users + and self.user_manager.users[username]["role"] == "admin" + ): + return "Cannot kick an admin" + return f"@KICK@{client_addr}" + return "User not found or not online" + + def cmd_ban(self, args, addr): + """Ban a username.""" + if not self.user_manager.is_admin(addr): + return "You don't have permission to use this command" + if len(args) != 1: + return "Usage: ban " + username = args[0] + if ( + username in self.user_manager.users + and self.user_manager.users[username]["role"] == "admin" + ): + return "Cannot ban an admin" + self.user_manager.ban_user(username) + return f"User {username} has been banned" + + def cmd_broadcast(self, args, addr): + """Broadcast a message to all users.""" + if not args: + return "Usage: broadcast " + + # Check if user is authenticated (not a guest) + username = self.user_manager.get_username(addr) + if username.startswith("guest_"): + return "You must be logged in to broadcast messages" + + if self.user_manager.is_rate_limited(addr): + return "Rate limit exceeded. Please wait a moment." + + return f"@BROADCAST@{' '.join(args)}" + + def cmd_passwd(self, args, addr): + """Change password for current user.""" + if len(args) != 2: + return "Usage: passwd " + + old_password, new_password = args + username = self.user_manager.get_username(addr) + + if username.startswith("guest_"): + return "You must be logged in to change password" + + if not self.user_manager.authenticate(username, old_password): + return "Current password is incorrect" + + if self.user_manager.change_password(username, new_password): + return "Password changed successfully" + return "Failed to change password" diff --git a/libs/user_manager.py b/libs/user_manager.py new file mode 100644 index 0000000..8b03ac3 --- /dev/null +++ b/libs/user_manager.py @@ -0,0 +1,134 @@ +import json +import os +import random +import string +from pathlib import Path +import time +import hashlib + + +class UserManager: + def __init__(self): + self.users = {} # {username: {'password': hash, 'role': role}} + self.active_sessions = {} # {addr: username} + self.users_file = Path("data/users.json") + self.message_timestamps = {} # {addr: [timestamps]} + self.banned_users = set() # Store banned usernames + self._load_users() + + def _hash_password(self, password): + """Hash a password using SHA-256.""" + return hashlib.sha256(password.encode()).hexdigest() + + def _load_users(self): + """Load users from JSON file.""" + if self.users_file.exists(): + with open(self.users_file) as f: + self.users = json.load(f) + # Convert any plain text passwords to hashed + needs_update = False + for username, data in self.users.items(): + if len(data["password"]) != 64: # Not a SHA-256 hash + data["password"] = self._hash_password(data["password"]) + needs_update = True + if needs_update: + self._save_users() + else: + # Create default admin user if no users exist + self.users_file.parent.mkdir(exist_ok=True) + default_password = self._hash_password("admin") # Hash the default password + self.users = { + "admin": { + "password": default_password, + "role": "admin", + } + } + self._save_users() + + def _save_users(self): + """Save users to JSON file.""" + with open(self.users_file, "w") as f: + json.dump(self.users, f, indent=2) + + def generate_guest_name(self): + """Generate a random guest username.""" + suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=4)) + return f"guest_{suffix}" + + def authenticate(self, username, password): + """Check if username and password match.""" + if username in self.users: + hashed_password = self._hash_password(password) + return self.users[username]["password"] == hashed_password + return False + + def register_session(self, addr, username=None): + """Register a new session for an address.""" + if not username: + username = self.generate_guest_name() + self.active_sessions[addr] = username + return username + + def get_username(self, addr): + """Get username for an address.""" + return self.active_sessions.get(addr) + + def remove_session(self, addr): + """Remove a session.""" + if addr in self.active_sessions: + del self.active_sessions[addr] + + def add_user(self, username, password, role="user"): + """Add a new user.""" + if username not in self.users: + self.users[username] = { + "password": self._hash_password(password), + "role": role, + } + self._save_users() + return True + return False + + def is_admin(self, addr): + """Check if user is admin.""" + username = self.get_username(addr) + return username in self.users and self.users[username]["role"] == "admin" + + def is_rate_limited(self, addr): + """Check if user is rate limited (2 messages per second).""" + if self.is_admin(addr): + return False + + now = time.time() + timestamps = self.message_timestamps.setdefault(addr, []) + + # Remove timestamps older than 1 second + timestamps = [t for t in timestamps if now - t < 1] + self.message_timestamps[addr] = timestamps + + # Check if more than 2 messages in the last second + if len(timestamps) >= 2: + return True + + timestamps.append(now) + return False + + def ban_user(self, username): + """Ban a username.""" + self.banned_users.add(username.lower()) + + def unban_user(self, username): + """Unban a username.""" + self.banned_users.discard(username.lower()) + + def is_banned(self, username): + """Check if username is banned.""" + return username.lower() in self.banned_users + + def change_password(self, username, new_password): + """Change a user's password.""" + if username in self.users: + self.users[username]["password"] = self._hash_password(new_password) + self._save_users() + return True + return False diff --git a/main.py b/main.py new file mode 100644 index 0000000..3bee012 --- /dev/null +++ b/main.py @@ -0,0 +1,215 @@ +import socket +import threading +import os +from dotenv import load_dotenv +from libs.broadcast import broadcast_message +from libs.user_manager import UserManager +from libs.process_message import CommandProcessor +from libs.banner import load_banner + +# Load environment variables +load_dotenv() + +# Server configuration +HOST = "0.0.0.0" # Listen on all available network interfaces +PORT = 2323 # Standard Telnet alternative port +MAX_CONNECTIONS = int( + os.getenv("MAX_CONNECTIONS", "5") +) # Default to 5 if not specified + +# Dictionary to store active connections +active_connections = {} +connections_lock = threading.Lock() # Thread-safe operations on active_connections + +# Initialize user management +user_manager = UserManager() +command_processor = CommandProcessor(user_manager) + + +def handle_backspace(display_buffer, conn): + """Handle backspace/delete character input.""" + if display_buffer: + display_buffer = display_buffer[:-1] + # Send backspace, space, backspace to clear the character + conn.sendall(b"\x08 \x08") + return display_buffer + + +def process_input_byte(byte, display_buffer, conn): + """Process a single byte of input and handle display.""" + if byte in (127, 8): # Backspace/delete + return handle_backspace(display_buffer, conn) + + # Add character to display buffer and echo back + display_buffer += bytes([byte]) + conn.sendall(bytes([byte])) + return display_buffer + + +def process_complete_line(line, addr, active_connections, conn): + """Process a complete line of input and broadcast if valid.""" + try: + # Clean up the input by processing backspaces + cleaned = [] + for char in line: + if char in (8, 127): # Backspace/delete + if cleaned: + cleaned.pop() + else: + cleaned.append(char) + + message = bytes(cleaned).decode("ascii", "ignore").strip() + print(f"Client {addr}: {message}") + + # Process commands + if message.startswith("/"): + response = command_processor.process_command(message[1:], addr) + + # Handle special responses + if response.startswith("@KICK@"): + _, kick_addr = response.split("@", 2) + # Convert string addr back to tuple + kick_addr = eval(kick_addr) # Safe here as we control the input + if kick_addr in active_connections: + active_connections[kick_addr].sendall( + b"\r\nYou have been kicked from the server.\r\n" + ) + active_connections[kick_addr].close() + return + + if response.startswith("@BROADCAST@"): + parts = response.split("@", 2) # Split into max 3 parts + broadcast_msg = parts[2] if len(parts) > 2 else "" + username = user_manager.get_username(addr) + broadcast_message( + active_connections, f"[BROADCAST] {username}: {broadcast_msg}" + ) + return + + conn.sendall(f"\r\n{response}\r\n".encode("ascii")) + else: + # Regular chat message + username = user_manager.get_username(addr) + + # Check if user is authenticated (not a guest) + if username.startswith("guest_"): + conn.sendall( + b"\r\nYou must be logged in to chat. Use /help for commands.\r\n" + ) + return + + # Check rate limit + if user_manager.is_rate_limited(addr): + conn.sendall(b"\r\nRate limit exceeded. Please wait a moment.\r\n") + return + + # Broadcast the message + broadcast_message(active_connections, f"[{username}]: {message}") + except UnicodeDecodeError: + print("UnicodeDecodeError: ", line) + + +def handle_client(conn, addr): + """ + Handles individual client connections. + + Args: + conn (socket): Client socket connection + addr (tuple): Client address information + """ + print(f"Connected by {addr}") + + # Register as guest initially + username = user_manager.register_session(addr) + + # Check if username is banned + if user_manager.is_banned(username): + conn.sendall(b"You are banned from this server.\r\n") + conn.close() + return + + with connections_lock: + active_connections[addr] = conn + + # Send banner and welcome message + banner = load_banner() + welcome_msg = ( + f"\r\n{banner}\r\n" + f"You are connected as: {username}\r\n" + "Type '/help' for available commands.\r\n" + ) + conn.sendall(welcome_msg.encode("ascii")) + broadcast_message( + active_connections, f"* New user connected from {addr[0]} as {username}", addr + ) + + # Initialize buffers + input_buffer = b"" + display_buffer = b"" + + try: + while True: + # Receive data from client + data = conn.recv(1024) + if not data: + break + + # Process each byte for display + for byte in data: + display_buffer = process_input_byte(byte, display_buffer, conn) + + # Handle complete lines + input_buffer += data + if b"\r" in input_buffer: + # Split into complete line and remaining buffer + line, *remaining = input_buffer.split(b"\r", 1) + input_buffer = remaining[0] if remaining else b"" + display_buffer = b"" + + # Remove trailing LF if present + if input_buffer.startswith(b"\n"): + input_buffer = input_buffer[1:] + + process_complete_line(line, addr, active_connections, conn) + + finally: + # Clean up disconnected client + cleanup_client_connection(addr) + + +def cleanup_client_connection(addr): + """Clean up resources when a client disconnects.""" + print(f"Client {addr} disconnected") + with connections_lock: + del active_connections[addr] + broadcast_message(active_connections, f"* User {addr[0]} disconnected") + + +def start_server(): + """ + Starts the server and listens for incoming connections. + """ + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server: + server.bind((HOST, PORT)) + server.listen(MAX_CONNECTIONS) + print(f"Listening on port {PORT}...") + print(f"Maximum connections allowed: {MAX_CONNECTIONS}") + + while True: + # Accept new connection + conn, addr = server.accept() + + # Check if maximum connections reached + if len(active_connections) >= MAX_CONNECTIONS: + conn.sendall(b"Server is full. Please try again later.\r\n") + conn.close() + continue + + # Create a new thread for each client + client_thread = threading.Thread(target=handle_client, args=(conn, addr)) + client_thread.daemon = True # Thread will close when main program exits + client_thread.start() + + +if __name__ == "__main__": + start_server() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..3e338bf --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +python-dotenv \ No newline at end of file