mirror of
https://github.com/tcsenpai/telnet_retro_chat.git
synced 2025-06-06 03:05:35 +00:00
269 lines
9.6 KiB
Python
269 lines
9.6 KiB
Python
from typing import Tuple
|
|
|
|
|
|
class CommandProcessor:
|
|
def __init__(self, user_manager, room_manager):
|
|
self.user_manager = user_manager
|
|
self.room_manager = room_manager
|
|
self.commands = {
|
|
"help": self.cmd_help,
|
|
"login": self.cmd_login,
|
|
"register": self.cmd_register,
|
|
"whoami": self.cmd_whoami,
|
|
"users": self.cmd_users,
|
|
"op": self.cmd_op,
|
|
"deop": self.cmd_deop,
|
|
"kick": self.cmd_kick,
|
|
"ban": self.cmd_ban,
|
|
"broadcast": self.cmd_broadcast,
|
|
"passwd": self.cmd_passwd,
|
|
"join": self.cmd_join,
|
|
"rooms": self.cmd_rooms,
|
|
"createroom": self.cmd_createroom,
|
|
"quit": self.cmd_quit,
|
|
}
|
|
|
|
def process_command(self, message: str, addr: Tuple) -> str:
|
|
"""Process a command and return the response."""
|
|
parts = message.strip().split()
|
|
if not parts:
|
|
return ""
|
|
|
|
cmd = parts[0].lower()
|
|
args = parts[1:]
|
|
|
|
if cmd in self.commands:
|
|
return self.commands[cmd](args, addr)
|
|
return f"Unknown command: {cmd}. Type 'help' for available commands."
|
|
|
|
def cmd_help(self, args, addr):
|
|
"""Show available commands based on user's role."""
|
|
is_admin = self.user_manager.is_admin(addr)
|
|
is_authenticated = not self.user_manager.get_username(addr).startswith("guest_")
|
|
|
|
# Commands for all users (including guests)
|
|
guest_commands = {
|
|
"help": "Show this help message",
|
|
"login": "Login with username and password",
|
|
"whoami": "Show current username",
|
|
"register": "Register new user",
|
|
"quit": "Disconnect from server",
|
|
}
|
|
|
|
# Commands for authenticated users
|
|
auth_commands = {
|
|
"broadcast": "Broadcast a message to all users",
|
|
"users": "List online users",
|
|
"join": "Join a chat room",
|
|
"rooms": "List available rooms",
|
|
}
|
|
|
|
# Admin commands
|
|
admin_commands = {
|
|
"op": "Give admin privileges to user",
|
|
"deop": "Remove admin privileges from user",
|
|
"kick": "Disconnect a user from the server",
|
|
"ban": "Ban a username from the server",
|
|
"createroom": "Create a new chat room",
|
|
}
|
|
|
|
# Build help message
|
|
help_msg = [
|
|
"+-------------COMMANDS-------------+",
|
|
"# Basic Commands: #",
|
|
]
|
|
|
|
# Add guest commands
|
|
for cmd, desc in guest_commands.items():
|
|
help_msg.append(f"# /{cmd:<10} - {desc:<15} #")
|
|
|
|
# Add authenticated user commands
|
|
if is_authenticated:
|
|
help_msg.append("# #")
|
|
help_msg.append("# User Commands: #")
|
|
for cmd, desc in auth_commands.items():
|
|
help_msg.append(f"# /{cmd:<10} - {desc:<15} #")
|
|
|
|
# Add admin commands
|
|
if is_admin:
|
|
help_msg.append("# #")
|
|
help_msg.append("# Admin Commands: #")
|
|
for cmd, desc in admin_commands.items():
|
|
help_msg.append(f"# /{cmd:<10} - {desc:<15} #")
|
|
|
|
help_msg.append("+--------------------------------+")
|
|
return "\n".join(help_msg)
|
|
|
|
def cmd_login(self, args, addr):
|
|
"""Handle login command."""
|
|
if len(args) != 2:
|
|
return "Usage: login <username> <password>"
|
|
|
|
username, password = args
|
|
if self.user_manager.authenticate(username, password):
|
|
self.user_manager.register_session(addr, username)
|
|
self.user_manager.message_timestamps[addr] = [] # Reset rate limit
|
|
return f"Successfully logged in as {username}"
|
|
return "Invalid username or password"
|
|
|
|
def cmd_register(self, args, addr):
|
|
"""Handle register command."""
|
|
if len(args) != 2:
|
|
return "Usage: register <username> <password>"
|
|
|
|
username, password = args
|
|
if self.user_manager.add_user(username, password):
|
|
return f"User {username} registered successfully"
|
|
return "Username already exists"
|
|
|
|
def cmd_whoami(self, args, addr):
|
|
"""Show current username."""
|
|
username = self.user_manager.get_username(addr)
|
|
return f"You are: {username}"
|
|
|
|
def cmd_users(self, args, addr):
|
|
"""List online users."""
|
|
# Check if user is authenticated (not a guest)
|
|
username = self.user_manager.get_username(addr)
|
|
if username.startswith("guest_"):
|
|
return "You must be logged in to list users"
|
|
|
|
users = [
|
|
f"{addr}: {user}"
|
|
for addr, user in self.user_manager.active_sessions.items()
|
|
]
|
|
return "Online users:\n" + "\n".join(users)
|
|
|
|
def cmd_op(self, args, addr):
|
|
"""Make a user admin."""
|
|
if not self.user_manager.is_admin(addr):
|
|
return "You don't have permission to use this command"
|
|
if len(args) != 1:
|
|
return "Usage: op <username>"
|
|
username = args[0]
|
|
if username in self.user_manager.users:
|
|
self.user_manager.users[username]["role"] = "admin"
|
|
self.user_manager._save_users()
|
|
return f"User {username} is now an admin"
|
|
return "User not found"
|
|
|
|
def cmd_deop(self, args, addr):
|
|
"""Remove admin status from user."""
|
|
if not self.user_manager.is_admin(addr):
|
|
return "You don't have permission to use this command"
|
|
if len(args) != 1:
|
|
return "Usage: deop <username>"
|
|
username = args[0]
|
|
if username in self.user_manager.users:
|
|
self.user_manager.users[username]["role"] = "user"
|
|
self.user_manager._save_users()
|
|
return f"User {username} is no longer an admin"
|
|
return "User not found"
|
|
|
|
def cmd_kick(self, args, addr):
|
|
"""Kick a user from the server."""
|
|
if not self.user_manager.is_admin(addr):
|
|
return "You don't have permission to use this command"
|
|
if len(args) != 1:
|
|
return "Usage: kick <username>"
|
|
username = args[0]
|
|
|
|
# Check if target is admin
|
|
for client_addr, client_name in self.user_manager.active_sessions.items():
|
|
if client_name == username:
|
|
if (
|
|
username in self.user_manager.users
|
|
and self.user_manager.users[username]["role"] == "admin"
|
|
):
|
|
return "Cannot kick an admin"
|
|
return f"@KICK@{client_addr}"
|
|
return "User not found or not online"
|
|
|
|
def cmd_ban(self, args, addr):
|
|
"""Ban a username."""
|
|
if not self.user_manager.is_admin(addr):
|
|
return "You don't have permission to use this command"
|
|
if len(args) != 1:
|
|
return "Usage: ban <username>"
|
|
username = args[0]
|
|
if (
|
|
username in self.user_manager.users
|
|
and self.user_manager.users[username]["role"] == "admin"
|
|
):
|
|
return "Cannot ban an admin"
|
|
self.user_manager.ban_user(username)
|
|
return f"User {username} has been banned"
|
|
|
|
def cmd_broadcast(self, args, addr):
|
|
"""Broadcast a message to all users."""
|
|
if not args:
|
|
return "Usage: broadcast <message>"
|
|
|
|
# Check if user is admin
|
|
if not self.user_manager.is_admin(addr):
|
|
return "You must be an admin to broadcast messages"
|
|
|
|
if self.user_manager.is_rate_limited(addr):
|
|
return "Rate limit exceeded. Please wait a moment."
|
|
|
|
return f"@BROADCAST@{' '.join(args)}"
|
|
|
|
def cmd_passwd(self, args, addr):
|
|
"""Change password for current user."""
|
|
if len(args) != 2:
|
|
return "Usage: passwd <old_password> <new_password>"
|
|
|
|
old_password, new_password = args
|
|
username = self.user_manager.get_username(addr)
|
|
|
|
if username.startswith("guest_"):
|
|
return "You must be logged in to change password"
|
|
|
|
if not self.user_manager.authenticate(username, old_password):
|
|
return "Current password is incorrect"
|
|
|
|
if self.user_manager.change_password(username, new_password):
|
|
return "Password changed successfully"
|
|
return "Failed to change password"
|
|
|
|
def cmd_join(self, args, addr):
|
|
"""Join a chat room."""
|
|
if len(args) != 1:
|
|
return "Usage: join <room>"
|
|
|
|
room_name = args[0]
|
|
if self.room_manager.join_room(addr, room_name):
|
|
return f"Joined room: {room_name}"
|
|
return "Room not found"
|
|
|
|
def cmd_rooms(self, args, addr):
|
|
"""List available rooms."""
|
|
rooms = self.room_manager.list_rooms()
|
|
current_room = self.room_manager.get_user_room(addr)
|
|
|
|
room_list = ["+--------AVAILABLE ROOMS--------+"]
|
|
for name, desc in rooms.items():
|
|
current = " (current)" if name == current_room else ""
|
|
room_list.append(f"# {name:<15} - {desc[:20]}{current}")
|
|
room_list.append("+-----------------------------+")
|
|
return "\n".join(room_list)
|
|
|
|
def cmd_createroom(self, args, addr):
|
|
"""Create a new room (admin only)."""
|
|
if not self.user_manager.is_admin(addr):
|
|
return "You don't have permission to create rooms"
|
|
|
|
if len(args) < 1:
|
|
return "Usage: createroom <name> [description]"
|
|
|
|
name = args[0]
|
|
description = " ".join(args[1:]) if len(args) > 1 else ""
|
|
|
|
if self.room_manager.create_room(name, description):
|
|
return f"Room {name} created successfully"
|
|
return "Room already exists"
|
|
|
|
def cmd_quit(self, args, addr):
|
|
"""Disconnect from the server."""
|
|
return "@QUIT@"
|