From 366e218e262311c2c11f7310e80dedea4bb03e08 Mon Sep 17 00:00:00 2001 From: SpyDrone Date: Mon, 2 Mar 2026 09:48:49 -0500 Subject: [PATCH] Upload files to "/" --- README.md | 338 +++ config.py | 117 + requirements.txt | 5 + server.py | 5236 +++++++++++++++++++++++++++++++++++++++++++++ setup_windows.bat | 55 + 5 files changed, 5751 insertions(+) create mode 100644 README.md create mode 100644 config.py create mode 100644 requirements.txt create mode 100644 server.py create mode 100644 setup_windows.bat diff --git a/README.md b/README.md new file mode 100644 index 0000000..6afd31c --- /dev/null +++ b/README.md @@ -0,0 +1,338 @@ +# 🚀 Advanced Chat Server & Client + +A feature-rich, production-ready chat server and client built with Python and WebSockets. Supports **both centralized server mode and peer-to-peer (P2P) LAN communication** with **file sharing capabilities**. Works on local development and hosted deployment (Ubuntu). + +## ✨ Features + +### Core Features +- ✅ **Unified Client**: Single client for both Server and P2P LAN modes +- ✅ **Peer-to-Peer (P2P)**: Chat without a server on your local network +- ✅ **File Sharing**: Send and receive files up to 50MB in both modes +- ✅ **Real-time Messaging**: Instant communication with low latency +- ✅ **Multiple Rooms**: Join different channels with password protection +- ✅ **User Authentication**: Secure identification for server-based chat +- ✅ **Admin System**: Comprehensive moderation tools +- ✅ **SSL/TLS Support**: Encrypted connections for server mode + +### Advanced Features +1. **Account & Registration** + - User registration with password hashing + - Secure authentication + - Nickname management + +2. **Room Management** + - Create public and private rooms + - Password-protected rooms + - Room topics and announcements + - User lists per room + +3. **Invite System** + - Generate time-limited invite codes + - Configurable max uses + - Invite tracking and revocation + +4. **Message Search** + - Full-text search across messages + - Filter by room, user, and date + - Search history + +5. **Message Replies** + - Thread-based conversations + - Quote original messages + - Reply tracking + +6. **User Blocks** + - Block/ignore users + - Temporary or permanent blocks + - Block list management + +7. **Command Aliases** + - Create custom command shortcuts + - Alias usage tracking + - Personal alias management + +8. **Room Announcements** + - Broadcast important messages + - Time-limited announcements + - Announcement history + +9. **Scheduled Messages** + - Schedule messages for future delivery + - Multiple time formats (HH:MM, +1h, +30m, +7d) + - Cancel scheduled messages + +10. **Room Bookmarks** + - Save favorite rooms/commands + - Quick access to bookmarks + - Usage statistics + +11. **Message Export** + - Export room history + - Export user messages + - Multiple export formats + +12. **Room Backups** + - Password-protected backups + - Full room state preservation + - Backup restoration + +13. **Notification Rules** + - Keyword notifications + - User-specific notifications + - @mention support + - Cooldown periods + +14. **Offline Messages** + - Queue messages for offline users + - Automatic delivery on reconnect + - Message expiration + +15. **Admin System** + - Admin privilege management + - Clear chat history + - User management + - System monitoring + +## 🛠️ Installation + +### Prerequisites +- Python 3.8 or higher +- pip (Python package manager) +- **Optional (for Advanced Features)**: + - `cryptography`: Required for End-to-End Encryption (E2EE) + - `Pillow`: Required for ASCII Image Art features + +### Windows (Local Development) + +1. Clone or download this repository +2. Run the setup script: + ```cmd + setup_windows.bat + ``` +3. Edit `.env` file with your configuration +4. Start the server: + ```cmd + venv\Scripts\activate + python server.py + ``` +5. In a new terminal, start the client: + ```cmd + venv\Scripts\activate + python client.py + ``` + +### Ubuntu (Production Deployment) + +1. Clone or download this repository +2. Make the deployment script executable: + ```bash + chmod +x deploy_ubuntu.sh + ``` +3. Run the deployment script: + ```bash + ./deploy_ubuntu.sh + ``` +4. Follow the prompts to configure systemd service and firewall +5. Edit `.env` file with your production configuration +6. If using systemd, the server will start automatically +7. Otherwise, start manually: + ```bash + source venv/bin/activate + python3 server.py + ``` + +### Manual Installation + +```bash +# Create virtual environment +python3 -m venv venv + +# Activate virtual environment +# On Windows: +venv\Scripts\activate +# On Linux/Mac: +source venv/bin/activate + +# Install dependencies +pip install -r requirements.txt + +# Copy environment template +cp .env.example .env + +# Edit .env with your configuration +# Start server +python server.py + +# In another terminal, start client +python client.py +``` + +## ⚙️ Configuration + +Edit the `.env` file to configure the server: + +```env +# Server Configuration +HOST=0.0.0.0 # Bind to all interfaces +PORT=8765 # WebSocket port +MAX_HISTORY=100 # Messages to keep in history + +# Admin Configuration +ADMIN_PASSWORD=admin123 # Change this! + +# SSL Configuration (optional) +USE_SSL=false +SSL_CERT_PATH=/path/to/cert.pem +SSL_KEY_PATH=/path/to/key.pem + +# Database +DB_PATH=chat.db + +# Logging +LOG_LEVEL=INFO +LOG_FILE=chat_server.log + +# Security +MAX_MESSAGE_LENGTH=4096 +MAX_NICKNAME_LENGTH=32 +RATE_LIMIT_MESSAGES=120 +RATE_LIMIT_WINDOW=60 + +# Session +SESSION_TIMEOUT=3600 +KEEPALIVE_INTERVAL=30 +RECONNECT_TIMEOUT=300 + +# Backups +BACKUP_DIR=./backups +AUTO_BACKUP_ENABLED=false +AUTO_BACKUP_INTERVAL=86400 +``` + +## 🎮 Usage + +### Server + +Start the server with default settings: +```bash +python server.py +``` + +Or with custom options: +```bash +python server.py --host 0.0.0.0 --port 8765 --admin-pass mypassword +``` + +### Client + +Start the unified client: +```bash +python client.py +``` + +The client will prompt you to choose a mode: +1. **Central Server Mode**: Connect to a central server (ws://...) +2. **P2P LAN Mode**: Connect directly to peers on your local network + +In **P2P Mode**, it will also ask for a port (80, 443, or custom) and automatically discover others in the same room on your LAN. + +### File Sharing + +To share a file: +1. Type `/send ` +2. Others in the room will see a file offer +3. They can type `/accept ` to download + +Files are saved to the `./downloads` folder by default. + +### Commands + +Type `/help` in the client to see all available commands. + +## 🔒 Security Features + +- **Password Hashing**: PBKDF2-HMAC-SHA256 with salt +- **SSL/TLS Support**: Encrypted connections +- **Rate Limiting**: Prevent spam and abuse (120 msg / 60s) +- **Input Validation**: Sanitize all user inputs +- **Session Management**: Automatic timeout and cleanup +- **Admin Controls**: Privilege-based access control + +## 🌐 Deployment + +### Local Network + +1. Start server on your machine +2. Find your local IP address +3. Clients connect to: `ws://YOUR_LOCAL_IP:8765` + +### Public Internet (Ubuntu Server) + +1. Use the deployment script: `./deploy_ubuntu.sh` +2. Configure firewall to allow port 8765 +3. Clients connect to: `wss://yourdomain.com:8765` + +## 📊 Monitoring + +### Systemd Service (Ubuntu) +- `sudo systemctl status chatserver` +- `sudo journalctl -u chatserver -f` + +### Log Files +- `tail -f chat_server.log` + +## 🗄️ Database + +The server uses SQLite for persistence (`chat.db`). + +## 🐛 Troubleshooting + +### Connection Issues +1. **Cannot connect to server**: Check if server is running and firewall status. +2. **SSL/TLS errors**: Verify certificate paths in `.env`. +3. **Database errors**: Check disk space and permissions. + +## 📝 Development + +### Project Structure + +``` +radio/ +├── server.py # Main server application +├── client.py # Unified client (Server & P2P Mode) +├── config.py # Configuration management +├── requirements.txt # Python dependencies +├── .env.example # Environment template +├── .env # Your configuration +├── setup_windows.bat # Windows setup script +├── deploy_ubuntu.sh # Ubuntu deployment script +├── README.md # Main documentation +├── docs/ # Detailed documentation files +├── chat.db # SQLite database +├── backups/ # Room backups directory +└── logs/ # Log files directory +``` + +### Documentation Index + +For more detailed information, please refer to the files in the `docs/` folder: + +- **QUICKSTART.md**: Step-by-step guide for beginners +- **P2P_GUIDE.md**: In-depth guide for P2P and LAN communication +- **P2P_FEATURES.md**: Technical details of the P2P engine +- **IMPROVEMENTS.md**: Summary of latest optimizations and features +- **QUICK_REFERENCE.md**: Most common commands and tips +- **CHANGELOG.md**: Version history and future plans +- **FINAL_SUMMARY.md**: Overview of the unified chat system architecture + +### Contributing + +Contributions are welcome! Please fork the repository, create a feature branch, and submit a pull request. + +## 📜 License + +This project is open source and available under the MIT License. + +--- + +**Made with ❤️ for the chat community** diff --git a/config.py b/config.py new file mode 100644 index 0000000..b41eb54 --- /dev/null +++ b/config.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 +""" +Configuration Management for Chat Server +Supports both environment variables and .env files +""" + +import os +from typing import Optional, Any, cast +from pathlib import Path + +try: + from dotenv import load_dotenv # type: ignore + load_dotenv(override=True) + DOTENV_AVAILABLE = True +except ImportError: + DOTENV_AVAILABLE = False + + +class Config: + """Configuration class with environment variable support""" + + # Server Configuration + HOST: str = os.getenv('HOST', '127.0.0.1') + PORT: int = int(os.getenv('PORT', '8765')) + MAX_HISTORY: int = int(os.getenv('MAX_HISTORY', '100')) + + # Admin Configuration + ADMIN_PASSWORD: str = os.getenv('ADMIN_PASSWORD', '') # Should be set in environment + + # SSL Configuration + USE_SSL: bool = os.getenv('USE_SSL', 'false').lower() == 'true' + SSL_CERT_PATH: Optional[str] = os.getenv('SSL_CERT_PATH') + SSL_KEY_PATH: Optional[str] = os.getenv('SSL_KEY_PATH') + + # Database Configuration + DB_PATH: str = os.getenv('DB_PATH', 'data/chat.db') + + # Logging Configuration + LOG_LEVEL: str = os.getenv('LOG_LEVEL', 'INFO') + LOG_FILE: Optional[str] = os.getenv('LOG_FILE', 'logs/chat_server.log') + + # Security Configuration + MAX_MESSAGE_LENGTH: int = int(os.getenv('MAX_MESSAGE_LENGTH', '4096')) + MAX_NICKNAME_LENGTH: int = int(os.getenv('MAX_NICKNAME_LENGTH', '32')) + RATE_LIMIT_MESSAGES: int = int(os.getenv('RATE_LIMIT_MESSAGES', '120')) + RATE_LIMIT_WINDOW: int = int(os.getenv('RATE_LIMIT_WINDOW', '60')) + + # Session Configuration + SESSION_TIMEOUT: int = int(os.getenv('SESSION_TIMEOUT', '3600')) + KEEPALIVE_INTERVAL: int = int(os.getenv('KEEPALIVE_INTERVAL', '30')) + RECONNECT_TIMEOUT: int = int(os.getenv('RECONNECT_TIMEOUT', '300')) + + # Backup Configuration + BACKUP_DIR: str = os.getenv('BACKUP_DIR', './backups') + AUTO_BACKUP_ENABLED: bool = os.getenv('AUTO_BACKUP_ENABLED', 'false').lower() == 'true' + AUTO_BACKUP_INTERVAL: int = int(os.getenv('AUTO_BACKUP_INTERVAL', '86400')) # 24 hours + + # Room Configuration + ROOM_EXPIRATION_HOURS: int = int(os.getenv('ROOM_EXPIRATION_HOURS', '24')) + AUTO_HISTORY_CLEAR: bool = os.getenv('AUTO_HISTORY_CLEAR', 'false').lower() == 'true' + + @classmethod + def validate(cls) -> bool: + """Validate configuration""" + errors = [] + + # Validate port range + if not (1 <= cls.PORT <= 65535): + errors.append(f"Invalid PORT: {cls.PORT}. Must be between 1-65535") + + # Validate SSL configuration + if cls.USE_SSL: + if not cls.SSL_CERT_PATH or not cls.SSL_KEY_PATH: + errors.append("SSL enabled but SSL_CERT_PATH or SSL_KEY_PATH not set") + elif not Path(cast(str, cls.SSL_CERT_PATH)).exists(): + errors.append(f"SSL certificate not found: {cls.SSL_CERT_PATH}") + elif not Path(cast(str, cls.SSL_KEY_PATH)).exists(): + errors.append(f"SSL key not found: {cls.SSL_KEY_PATH}") + + # Validate positive integers + if cls.MAX_HISTORY < 1: + errors.append(f"MAX_HISTORY must be positive, got {cls.MAX_HISTORY}") + if cls.MAX_MESSAGE_LENGTH < 1: + errors.append(f"MAX_MESSAGE_LENGTH must be positive, got {cls.MAX_MESSAGE_LENGTH}") + if cls.MAX_NICKNAME_LENGTH < 1: + errors.append(f"MAX_NICKNAME_LENGTH must be positive, got {cls.MAX_NICKNAME_LENGTH}") + + # Print errors + if errors: + print("[ERROR] Configuration errors:") + for error in errors: + print(f" - {error}") + return False + + return True + + @classmethod + def display(cls): + """Display current configuration""" + print("\n" + "=" * 70) + print("CHAT SERVER CONFIGURATION") + print("=" * 70) + print(f"Server: {cls.HOST}:{cls.PORT}") + print(f"SSL: {'Enabled' if cls.USE_SSL else 'Disabled'}") + print(f"Database: {cls.DB_PATH}") + print(f"Log Level: {cls.LOG_LEVEL}") + print(f"Log File: {cls.LOG_FILE or 'Console only'}") + print(f"Max History: {cls.MAX_HISTORY} messages") + print(f"Rate Limit: {cls.RATE_LIMIT_MESSAGES} msg/{cls.RATE_LIMIT_WINDOW}s") + print(f"Auto Backup: {'Enabled' if cls.AUTO_BACKUP_ENABLED else 'Disabled'}") + print("=" * 70 + "\n") + + +# Validate configuration on import +if __name__ != "__main__": + if not Config.validate(): + raise ValueError("Invalid configuration. Please check your environment variables or .env file") diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..24bbe02 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +websockets>=13.0 +cryptography>=44.0.0 +Pillow>=11.1.0 +prompt_toolkit>=3.0.40 +plyer>=2.1.0 diff --git a/server.py b/server.py new file mode 100644 index 0000000..7e0bece --- /dev/null +++ b/server.py @@ -0,0 +1,5236 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Complete Chat Server with 15 Features + Admin System +Optimized for both local and hosted deployment (Ubuntu) +[COMPATIBILITY] This file is 100% ASCII to prevent SyntaxErrors on Linux. +Production-ready with rate limiting, error handling, and security features +""" + +import asyncio +import json +import time +import ssl +import logging +import logging.handlers +import hashlib +import secrets +import sqlite3 +import os +import re +import sys +import weakref +from contextlib import contextmanager +from datetime import datetime +from typing import Dict, List, Optional, Set, Any, cast +from collections import defaultdict, deque +import websockets # type: ignore +from pathlib import Path + +# --- Visual Configuration --- +USE_EMOJIS = True + +def E(key): + """Return an emoji or text tag depending on USE_EMOJIS config""" + if not USE_EMOJIS: + if key == 'POUNCE': + return '* **POUNCE**' + return f"[{key}]" + + icons = { + "OK": "\u2705", + "X": "\u274c", + "ADM": "\U0001f451", + "MOD": "\U0001f6e1\ufe0f", + "PROFILE": "\U0001f464", + "SETTINGS": "\u2699\ufe0f", + "BAN": "\U0001f528", + "MUTED": "\U0001f507", + "OUTBOX": "\U0001f4ec", + "BROADCAST": "\U0001f4e2", + "POUNCE": "\U0001f43e", + "SECURE": "\U0001f512", + "INFO": "\u2139\ufe0f", + "WARN": "\u26a0\ufe0f", + "ERROR": "\u274c" + } + return icons.get(key, f"[{key}]") + +# ============================================================================ +# Define Colors class to avoid NameError in whisper messages +# ============================================================================ +class Colors: + MAGENTA = '\033[35m' + YELLOW = '\033[33m' + CYAN = '\033[36m' + GREEN = '\033[32m' + RED = '\033[31m' + BLUE = '\033[34m' + BOLD = '\033[1m' + RESET = '\033[0m' + +# Try to import configuration +try: + from config import Config # type: ignore + USE_CONFIG = True +except ImportError: + USE_CONFIG = False + print(f"{E('X')} config.py not found. Using default configuration.") + +# Configure logging with rotation +# Setup logging with file rotation and duplicate prevention +def setup_logging(log_file: Optional[str] = None, log_level: str = "INFO"): + from logging.handlers import RotatingFileHandler + import io + log_format = "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s" + + # Ensure logs directory exists + os.makedirs("logs", exist_ok=True) + if not log_file: + log_file = "logs/server.log" + + logger = logging.getLogger("ChatServer") + if logger.handlers: + return logger + + logger.setLevel(getattr(logging, log_level.upper())) + + # Console handler - ensure we handle unicode even if console is cp1252/etc + try: + # Wrap stdout with errors='replace' to avoid UnicodeEncodeError on Windows + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding=sys.stdout.encoding or 'utf-8', errors='replace', line_buffering=True) + except Exception: + pass + + ch = logging.StreamHandler(sys.stdout) + ch.setFormatter(logging.Formatter(log_format)) + logger.addHandler(ch) + + # Rotating File handler - Explicitly use utf-8 to store all icons/symbols + try: + fh = RotatingFileHandler(log_file, maxBytes=10*1024*1024, backupCount=5, encoding='utf-8', errors='replace') + fh.setFormatter(logging.Formatter(log_format)) + logger.addHandler(fh) + except Exception: + pass + + return logger + +logger = setup_logging() + +# Create a dummy Config to satisfy type checkers if import fails +try: + from config import Config # type: ignore + USE_CONFIG = True +except ImportError: + USE_CONFIG = False + class Config: # type: ignore + DB_PATH = "data/chat.db" + BACKUP_DIR = "data/backups" + ROOM_EXPIRATION_HOURS = 24 + AUTO_HISTORY_CLEAR = True + HOST = "127.0.0.1" + PORT = 8765 + MAX_HISTORY = 100 + ADMIN_PASSWORD = "" + USE_SSL = False + SSL_CERT_PATH = None + SSL_KEY_PATH = None + LOG_LEVEL = "INFO" + LOG_FILE = "logs/chat_server.log" + MAX_MESSAGE_LENGTH = 4096 + MAX_NICKNAME_LENGTH = 32 + RATE_LIMIT_MESSAGES = 120 + RATE_LIMIT_WINDOW = 60 + @classmethod + def display(cls): pass + +# ============================================================================ +# RATE LIMITER +# ============================================================================ + +class RateLimiter: + """Token bucket rate limiter for smoother message flow control""" + + def __init__(self, capacity: int = 10, refill_rate: float = 1.0): + """ + capacity: Maximum tokens in the bucket (burst size) + refill_rate: Tokens added per second + """ + self.capacity = capacity + self.refill_rate = refill_rate + self.buckets: Dict[str, Dict[str, float]] = defaultdict(lambda: { + 'tokens': float(capacity), + 'last_refill': time.time() + }) + + def is_allowed(self, user_id: str, tokens: float = 1.0) -> tuple: + """Check if user has enough tokens. Returns (allowed: bool, wait_seconds: float)""" + now = time.time() + + # Shared state handling: Access bucket safely + # Note: In CPython asyncio loop, this is safe from races as no 'await' occurs during modification. + if user_id not in self.buckets: + self.buckets[user_id] = {'tokens': float(self.capacity), 'last_refill': now} + + bucket = self.buckets[user_id] + + # Refill + passed = now - bucket['last_refill'] + bucket['tokens'] = min(float(self.capacity), bucket['tokens'] + passed * self.refill_rate) + bucket['last_refill'] = now + + if bucket['tokens'] >= tokens: + bucket['tokens'] -= tokens + return True, 0.0 + + # Calculate wait time: how many seconds until we have 'tokens' + needed = tokens - bucket['tokens'] + wait_time = needed / self.refill_rate + return False, wait_time + + def get_remaining(self, user_id: str) -> int: + """Estimate remaining whole tokens""" + self.is_allowed(user_id, 0) # Trigger refill calculation + return int(self.buckets[user_id]['tokens']) + + def cleanup(self): + """Remove buckets that have been full for a while""" + now = time.time() + # Build removal list then delete to avoid dict-size-change-during-iteration + to_remove = [ + uid for uid, bucket in list(self.buckets.items()) + if bucket['tokens'] >= self.capacity and (now - bucket['last_refill']) > 300 + ] + for user_id in to_remove: + self.buckets.pop(user_id, None) + +# ============================================================================ +# DATABASE MANAGER +# ============================================================================ + +class DatabaseManager: + """SQLite database manager for all persistence""" + + def __init__(self, db_path: str = "data/chat.db", backup_dir: str = "data/backups"): + # Ensure data directory exists + db_dir = Path(db_path).parent + db_dir.mkdir(parents=True, exist_ok=True) + self.db_path = db_path + self.backup_dir = backup_dir + + # Ensure backup directory exists + Path(self.backup_dir).mkdir(parents=True, exist_ok=True) + + # NOTE: init_database() is NOT called here to avoid blocking the + # event loop at import time. Call db.init_database() explicitly + # before starting the server (via asyncio.to_thread in start()). + + @contextmanager + def get_connection(self): + """Get a thread-safe connection with high timeout and WAL mode enabled. + + Hardened version: Ensures transactions are only committed if the block + completes WITHOUT catching internal exceptions and returning. + """ + conn = sqlite3.connect(self.db_path, timeout=30.0) + conn.row_factory = sqlite3.Row + # WAL mode allows concurrent reads and writes + conn.execute("PRAGMA journal_mode=WAL;") + conn.execute("PRAGMA synchronous=NORMAL;") + conn.execute("PRAGMA foreign_keys = ON") + + try: + yield conn + # ONLY commit if the block finished NORMALLY. + # If the caller returns from inside the 'with', this still runs. + # If the caller raises, this is skipped. + conn.commit() + except (sqlite3.Error, Exception) as e: + if conn: + try: + conn.rollback() + except Exception: + pass + if isinstance(e, sqlite3.Error): + logger.error(f"SQLITE ERROR: {e}") + else: + logger.error(f"Unexpected DB error: {e}") + raise + finally: + if conn: + conn.close() + + def init_database(self): + """Initialize all database tables""" + # Set persistent journal mode to WAL once + with sqlite3.connect(self.db_path) as conn: + conn.execute("PRAGMA journal_mode = WAL") + + with self.get_connection() as conn: + # Messages table + conn.execute(""" + CREATE TABLE IF NOT EXISTS messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + message_id TEXT UNIQUE, + room TEXT NOT NULL, + nickname TEXT NOT NULL, + user_id TEXT NOT NULL, + content TEXT NOT NULL, + timestamp REAL NOT NULL, + message_type TEXT DEFAULT 'text', + parent_id INTEGER, + encrypted INTEGER DEFAULT 0, + deleted INTEGER DEFAULT 0, + FOREIGN KEY (parent_id) REFERENCES messages(id) + ) + """) + conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_room_time ON messages(room, timestamp)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_user ON messages(user_id)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_nickname ON messages(nickname)") + + # Migration: Tag legacy encrypted messages (Suggestion 1) + conn.execute("UPDATE messages SET encrypted = 1 WHERE content LIKE 'gAAAAA%' AND encrypted = 0") + + # Users table + conn.execute(""" + CREATE TABLE IF NOT EXISTS users ( + user_id TEXT PRIMARY KEY, + nickname TEXT UNIQUE NOT NULL, + password_hash TEXT, + email TEXT, + registered INTEGER DEFAULT 0, + created_at REAL, + last_seen REAL, + is_admin INTEGER DEFAULT 0, + settings TEXT + ) + """) + + # Rooms table + conn.execute(""" + CREATE TABLE IF NOT EXISTS rooms ( + name TEXT PRIMARY KEY, + topic TEXT, + is_private INTEGER DEFAULT 0, + password_hash TEXT, + created_by TEXT, + created_by_id TEXT, + created_at REAL, + expires_at REAL, + max_users INTEGER DEFAULT 100, + settings TEXT + ) + """) + + # User blocks table + conn.execute(""" + CREATE TABLE IF NOT EXISTS user_blocks ( + user_id TEXT NOT NULL, + blocked_user_id TEXT NOT NULL, + blocked_nickname TEXT, + created_at REAL NOT NULL, + expires_at REAL, + reason TEXT, + PRIMARY KEY (user_id, blocked_user_id) + ) + """) + conn.execute("CREATE INDEX IF NOT EXISTS idx_blocks_user ON user_blocks(user_id)") + + # Invite codes table + conn.execute(""" + CREATE TABLE IF NOT EXISTS room_invites ( + code TEXT PRIMARY KEY, + room TEXT NOT NULL, + created_by TEXT NOT NULL, + created_at REAL NOT NULL, + expires_at REAL, + max_uses INTEGER DEFAULT 1, + used_count INTEGER DEFAULT 0, + is_active INTEGER DEFAULT 1 + ) + """) + + # Invite usage table + conn.execute(""" + CREATE TABLE IF NOT EXISTS invite_usage ( + code TEXT NOT NULL, + user_id TEXT NOT NULL, + used_at REAL NOT NULL, + nickname TEXT, + PRIMARY KEY (code, user_id) + ) + """) + + # Command aliases table + conn.execute(""" + CREATE TABLE IF NOT EXISTS command_aliases ( + user_id TEXT NOT NULL, + alias_name TEXT NOT NULL, + command_text TEXT NOT NULL, + created_at REAL, + usage_count INTEGER DEFAULT 0, + last_used REAL, + PRIMARY KEY (user_id, alias_name) + ) + """) + + # Room announcements table + conn.execute(""" + CREATE TABLE IF NOT EXISTS room_announcements ( + room TEXT PRIMARY KEY, + content TEXT NOT NULL, + created_by TEXT NOT NULL, + created_at REAL NOT NULL, + expires_at REAL, + color TEXT DEFAULT 'yellow' + ) + """) + + # Scheduled messages table + conn.execute(""" + CREATE TABLE IF NOT EXISTS scheduled_messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + room TEXT NOT NULL, + user_id TEXT NOT NULL, + nickname TEXT NOT NULL, + content TEXT NOT NULL, + scheduled_time REAL NOT NULL, + created_at REAL NOT NULL, + is_cancelled INTEGER DEFAULT 0, + is_sent INTEGER DEFAULT 0 + ) + """) + conn.execute("CREATE INDEX IF NOT EXISTS idx_scheduled_time ON scheduled_messages(scheduled_time)") + + # Bookmarks table + conn.execute(""" + CREATE TABLE IF NOT EXISTS user_bookmarks ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id TEXT NOT NULL, + name TEXT NOT NULL, + command TEXT NOT NULL, + description TEXT, + created_at REAL NOT NULL, + usage_count INTEGER DEFAULT 0, + last_used REAL, + UNIQUE(user_id, name) + ) + """) + + # Notification rules table + conn.execute(""" + CREATE TABLE IF NOT EXISTS notification_rules ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id TEXT NOT NULL, + rule_type TEXT NOT NULL, + value TEXT NOT NULL, + created_at REAL NOT NULL, + UNIQUE(user_id, rule_type, value) + ) + """) + + # Audit logs table (Feature 4) + conn.execute(""" + CREATE TABLE IF NOT EXISTS audit_logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + admin_id TEXT NOT NULL, + admin_nickname TEXT NOT NULL, + action TEXT NOT NULL, + target_id TEXT, + target_nickname TEXT, + details TEXT, + timestamp REAL NOT NULL + ) + """) + conn.execute("CREATE INDEX IF NOT EXISTS idx_audit_time ON audit_logs(timestamp)") + + # Offline messages table (Drop old schema if it exists) + offline_cols = [col['name'] for col in conn.execute("PRAGMA table_info(offline_messages)").fetchall()] + if offline_cols and 'target_user_id' not in offline_cols: + conn.execute("DROP TABLE offline_messages") + + conn.execute(""" + CREATE TABLE IF NOT EXISTS offline_messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + target_user_id TEXT NOT NULL, + from_nickname TEXT NOT NULL, + content TEXT NOT NULL, + timestamp REAL NOT NULL, + is_read INTEGER DEFAULT 0 + ) + """) + conn.execute("CREATE INDEX IF NOT EXISTS idx_offline_target ON offline_messages(target_user_id)") + + # Room roles table (RBAC) + conn.execute(""" + CREATE TABLE IF NOT EXISTS room_roles ( + room TEXT NOT NULL, + user_id TEXT NOT NULL, + role TEXT NOT NULL, -- 'owner', 'moderator', 'member' + assigned_at REAL NOT NULL, + assigned_by TEXT, + PRIMARY KEY (room, user_id) + ) + """) + conn.execute("CREATE INDEX IF NOT EXISTS idx_room_roles_user ON room_roles(user_id)") + + # Room backups table + conn.execute(""" + CREATE TABLE IF NOT EXISTS room_backups ( + backup_id TEXT PRIMARY KEY, + room TEXT NOT NULL, + created_by TEXT NOT NULL, + created_at REAL NOT NULL, + file_path TEXT NOT NULL, + message_count INTEGER DEFAULT 0, + password_hash TEXT, + restored_at REAL, + restored_by TEXT + ) + """) + + # Bans table (persistent kick) + conn.execute(""" + CREATE TABLE IF NOT EXISTS bans ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + banned_user_id TEXT, + banned_nickname TEXT NOT NULL, + banned_ip TEXT, + banned_by TEXT NOT NULL, + reason TEXT DEFAULT '', + created_at REAL NOT NULL, + expires_at REAL + ) + """) + conn.execute("CREATE INDEX IF NOT EXISTS idx_bans_uid ON bans(banned_user_id)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_bans_ip ON bans(banned_ip)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_bans_nick ON bans(banned_nickname)") + + # Mutes table + conn.execute(""" + CREATE TABLE IF NOT EXISTS mutes ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id TEXT NOT NULL, + nickname TEXT NOT NULL, + muted_by TEXT NOT NULL, + reason TEXT DEFAULT '', + created_at REAL NOT NULL, + expires_at REAL + ) + """) + conn.execute("CREATE INDEX IF NOT EXISTS idx_mutes_uid ON mutes(user_id)") + + # Pounces table + conn.execute(""" + CREATE TABLE IF NOT EXISTS user_pounces ( + pouncer_uid TEXT NOT NULL, + target_uid TEXT NOT NULL, + created_at REAL NOT NULL, + PRIMARY KEY (pouncer_uid, target_uid) + ) + """) + + default_rooms = [ + ('lobby', 'Main lobby - Welcome everyone!', 0, None, 'system', time.time()), + ('general', 'General discussion', 0, None, 'system', time.time()), + ('help', 'Help and support', 0, None, 'system', time.time()), + ('random', 'Random chat about anything', 0, None, 'system', time.time()) + ] + + for room in default_rooms: + conn.execute(""" + INSERT OR IGNORE INTO rooms (name, topic, is_private, password_hash, created_by, created_at) + VALUES (?, ?, ?, ?, ?, ?) + """, room) + + # --- FTS5 Full-Text Search Implementation --- + # Create virtual table for high-speed searching + conn.execute(""" + CREATE VIRTUAL TABLE IF NOT EXISTS messages_fts USING fts5( + content, + nickname, + room, + content_id UNINDEXED + ) + """) + + # Create triggers to keep FTS index in sync with messages table + conn.execute(""" + CREATE TRIGGER IF NOT EXISTS idx_messages_insert AFTER INSERT ON messages BEGIN + INSERT INTO messages_fts(content, nickname, room, content_id) + VALUES (new.content, new.nickname, new.room, new.id); + END; + """) + conn.execute(""" + CREATE TRIGGER IF NOT EXISTS idx_messages_delete AFTER DELETE ON messages BEGIN + DELETE FROM messages_fts WHERE content_id = old.id; + END; + """) + conn.execute(""" + CREATE TRIGGER IF NOT EXISTS idx_messages_update AFTER UPDATE ON messages BEGIN + UPDATE messages_fts SET content = new.content WHERE content_id = old.id; + END; + """) + + # Initial population if table is new + fts_count = conn.execute("SELECT count(*) FROM messages_fts").fetchone()[0] + if fts_count == 0: + conn.execute(""" + INSERT INTO messages_fts(content, nickname, room, content_id) + SELECT content, nickname, room, id FROM messages + """) + + # ---- Schema migrations (safe: no-op if column already exists) ---- + migrations = [ + "ALTER TABLE rooms ADD COLUMN created_by_id TEXT", + "ALTER TABLE rooms ADD COLUMN expires_at REAL", + "ALTER TABLE rooms ADD COLUMN max_users INTEGER DEFAULT 100", + "ALTER TABLE rooms ADD COLUMN settings TEXT", + "ALTER TABLE messages ADD COLUMN message_type TEXT DEFAULT 'text'", + "ALTER TABLE messages ADD COLUMN parent_id INTEGER", + "ALTER TABLE messages ADD COLUMN encrypted INTEGER DEFAULT 0", + "ALTER TABLE messages ADD COLUMN deleted INTEGER DEFAULT 0", + "ALTER TABLE messages ADD COLUMN edited INTEGER DEFAULT 0", + "ALTER TABLE messages ADD COLUMN edit_history TEXT", + "ALTER TABLE users ADD COLUMN settings TEXT", + "ALTER TABLE users ADD COLUMN is_admin INTEGER DEFAULT 0", + "ALTER TABLE users ADD COLUMN custom_status TEXT", + "ALTER TABLE users ADD COLUMN last_ip TEXT" + ] + for stmt in migrations: + try: + conn.execute(stmt) + except sqlite3.OperationalError as e: + if "duplicate column name" not in str(e).lower(): + logger.error(f"Migration error: {e}") + except Exception as e: + logger.error(f"Unexpected migration error: {e}") + + # ========== MESSAGE METHODS ========== + + def save_message(self, room: str, nickname: str, user_id: str, content: str, + message_type: str = 'text', parent_id: Optional[int] = None, is_encrypted: bool = False) -> int: + """Save message to database""" + import uuid + message_id = str(uuid.uuid4()) + + with self.get_connection() as conn: + cursor = conn.execute(""" + INSERT INTO messages + (message_id, room, nickname, user_id, content, timestamp, message_type, parent_id, encrypted) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, (message_id, room, nickname, user_id, content, time.time(), message_type, parent_id, 1 if is_encrypted else 0)) + return cast(int, cursor.lastrowid) if cursor.lastrowid is not None else 0 + + def save_offline_message(self, target_uid: str, from_nick: str, content: str): + """Save a private message for an offline user""" + with self.get_connection() as conn: + conn.execute(""" + INSERT INTO offline_messages (target_user_id, from_nickname, content, timestamp) + VALUES (?, ?, ?, ?) + """, (target_uid, from_nick, content, time.time())) + + def get_offline_messages(self, user_id: str) -> List[Dict]: + """Atomically retrieve and delete offline messages for a user in one transaction.""" + with self.get_connection() as conn: + rows = conn.execute(""" + SELECT from_nickname, content, timestamp + FROM offline_messages + WHERE target_user_id = ? + ORDER BY timestamp ASC + """, (user_id,)).fetchall() + + # Delete in same transaction - if the connection dies, messages + # are NOT deleted (sqlite WAL will rollback), preventing silent loss. + if rows: + conn.execute("DELETE FROM offline_messages WHERE target_user_id = ?", (user_id,)) + + return [dict(row) for row in rows] + + def get_recent_messages(self, room: str, limit: int = 50) -> List[Dict]: + """Fetch N most recent messages from a room""" + with self.get_connection() as conn: + rows = conn.execute(""" + SELECT id, message_id, room, nickname, user_id, content as message, content, timestamp, message_type, parent_id, encrypted + FROM messages + WHERE room = ? AND deleted = 0 + ORDER BY timestamp DESC LIMIT ? + """, (room, limit)).fetchall() + return [dict(r) for r in rows] + + + def get_message_history(self, room: str, limit: int = 100, since_id: Optional[int] = None) -> List[Dict]: + """Get history with optional starting point for sync (Suggestion 5)""" + with self.get_connection() as conn: + if since_id: + query = "SELECT id, message_id, room, nickname, user_id, content as message, content, timestamp, message_type, parent_id, encrypted FROM messages WHERE room = ? AND deleted = 0 AND id > ? ORDER BY timestamp DESC LIMIT ?" + cursor = conn.execute(query, (room, since_id, limit)) + else: + query = "SELECT id, message_id, room, nickname, user_id, content as message, content, timestamp, message_type, parent_id, encrypted FROM messages WHERE room = ? AND deleted = 0 ORDER BY timestamp DESC LIMIT ?" + cursor = conn.execute(query, (room, limit)) + + return [dict(row) for row in cursor.fetchall()] + + def delete_room_history(self, room: str) -> int: + """Clear all messages in a room""" + with self.get_connection() as conn: + result = conn.execute("DELETE FROM messages WHERE room = ?", (room,)) + return result.rowcount + + def delete_all_history(self) -> int: + """Wipe all history from all rooms""" + with self.get_connection() as conn: + result = conn.execute("DELETE FROM messages") + return result.rowcount + + def search_messages(self, query: str, room: Optional[str] = None, user: Optional[str] = None, + days: Optional[int] = None, limit: int = 50) -> List[Dict]: + """Search messages with high-speed FTS5 MATCH""" + with self.get_connection() as conn: + # We use the MATCH operator for high-speed indexed search. + # This supports boolean logic like "foo AND bar" or "foo -baz" + sql = """ + SELECT m.* FROM messages m + JOIN messages_fts f ON m.id = f.content_id + WHERE messages_fts MATCH ? AND m.deleted = 0 + """ + params: List[Any] = [query] + + if room: + sql += " AND m.room = ?" + params.append(room) + if user: + sql += " AND m.nickname = ?" + params.append(user) + if days: + cutoff = time.time() - (days * 86400) + sql += " AND m.timestamp > ?" + params.append(cutoff) + + sql += " ORDER BY m.timestamp DESC LIMIT ?" + params.append(limit) + + try: + cursor = conn.execute(sql, params) + return [dict(row) for row in cursor.fetchall()] + except sqlite3.OperationalError as e: + # Handle malformed FTS5 queries (e.g. trailing minus or unclosed quote) + logger.warning(f"FTS Search Error: {e}") + return [] + + # ========== USER METHODS ========== + + def is_nickname_registered(self, nickname: str) -> bool: + """Check if nickname is registered (Case-Insensitive)""" + with self.get_connection() as conn: + existing = conn.execute( + "SELECT nickname FROM users WHERE LOWER(nickname) = LOWER(?) AND registered = 1", + (nickname,) + ).fetchone() + return existing is not None + + def is_user_registered_by_id(self, user_id: str) -> bool: + """Check if a specific user_id is a registered account""" + with self.get_connection() as conn: + row = conn.execute( + "SELECT registered FROM users WHERE user_id = ?", + (user_id,) + ).fetchone() + return row and row['registered'] == 1 + + def register_user(self, nickname: str, password: str, email: Optional[str] = None) -> tuple: + """Register new user (Enforces case-insensitive uniqueness and format validation)""" + # Security: Validation of nickname format before registration (Suggestion 6) + if not re.match(r'^[a-zA-Z0-9_-]+$', nickname): + return None, "Invalid nickname format. Use only alphanumeric, underscore, or hyphen." + + with self.get_connection() as conn: + existing = conn.execute( + "SELECT nickname FROM users WHERE LOWER(nickname) = LOWER(?)", + (nickname,) + ).fetchone() + + if existing: + return None, "Nickname already registered" + + # Generate salt and hash + salt = secrets.token_hex(16) + password_hash = hashlib.pbkdf2_hmac( + 'sha256', + password.encode(), + salt.encode(), + 100000 + ).hex() + stored_hash = f"{salt}${password_hash}" + + # Generate user ID + user_id = f"user_{secrets.token_hex(8)}" + + conn.execute(""" + INSERT INTO users + (user_id, nickname, password_hash, email, registered, created_at, last_seen) + VALUES (?, ?, ?, ?, 1, ?, ?) + """, (user_id, nickname, stored_hash, email, time.time(), time.time())) + + return user_id, "Registration successful" + + def authenticate_user(self, nickname: str, password: str) -> tuple: + """Authenticate user (Case-Insensitive lookup)""" + with self.get_connection() as conn: + row = conn.execute( + "SELECT user_id, password_hash FROM users WHERE LOWER(nickname) = LOWER(?) AND registered = 1", + (nickname,) + ).fetchone() + + if not row: + return None, "Nickname not registered" + + stored = row['password_hash'] + if not stored or '$' not in stored: + logger.error(f"Corrupted password_hash for user {row['user_id']}") + return None, "Account data is corrupted. Please contact an admin." + + salt, hash_value = stored.split('$', 1) + test_hash = hashlib.pbkdf2_hmac( + 'sha256', + password.encode(), + salt.encode(), + 100000 + ).hex() + + if test_hash.encode() == hash_value.encode(): # Using string equality first for speed, then constant time + if secrets.compare_digest(test_hash.encode(), hash_value.encode()): + conn.execute( + "UPDATE users SET last_seen = ? WHERE user_id = ?", + (time.time(), row['user_id']) + ) + return row['user_id'], "Authenticated" + + return None, "Invalid password" + + + def change_password(self, user_id: str, old_password: str, new_password: str) -> tuple: + """Change user password securely""" + with self.get_connection() as conn: + row = conn.execute( + "SELECT password_hash FROM users WHERE user_id = ? AND registered = 1", + (user_id,) + ).fetchone() + + if not row: + return False, "User not registered" + + # Verify old password + salt, hash_value = row['password_hash'].split('$') + test_hash = hashlib.pbkdf2_hmac( + 'sha256', + old_password.encode(), + salt.encode(), + 100000 + ).hex() + + if not secrets.compare_digest(test_hash.encode(), (hash_value or "").encode()): + return False, "Invalid current password" + + # Hash and store new password + new_salt = secrets.token_hex(16) + new_password_hash = hashlib.pbkdf2_hmac( + 'sha256', + new_password.encode(), + new_salt.encode(), + 100000 + ).hex() + new_stored_hash = f"{new_salt}${new_password_hash}" + + conn.execute( + "UPDATE users SET password_hash = ? WHERE user_id = ?", + (new_stored_hash, user_id) + ) + return True, "Password changed successfully" + + def get_user_id_by_nickname(self, nickname: str) -> Optional[str]: + """Get user ID from nickname (Case-Insensitive)""" + with self.get_connection() as conn: + row = conn.execute( + "SELECT user_id FROM users WHERE LOWER(nickname) = LOWER(?)", + (nickname,) + ).fetchone() + return row['user_id'] if row else None + + def list_registered_users(self) -> List[Dict]: + """Get a list of all registered users""" + with self.get_connection() as conn: + rows = conn.execute(""" + SELECT user_id, nickname, registered, is_admin, created_at, last_seen + FROM users + ORDER BY registered DESC, last_seen DESC + """).fetchall() + return [dict(row) for row in rows] + + def get_user_settings(self, user_id: str) -> dict: + """Get user settings from DB""" + with self.get_connection() as conn: + row = conn.execute("SELECT settings FROM users WHERE user_id = ?", (user_id,)).fetchone() + if row and row['settings']: + try: + return json.loads(row['settings']) + except json.JSONDecodeError: + return {} + return {} + + def update_user_settings(self, user_id: str, settings: dict) -> bool: + """Save user settings to DB""" + try: + settings_json = json.dumps(settings) + with self.get_connection() as conn: + conn.execute("UPDATE users SET settings = ? WHERE user_id = ?", (settings_json, user_id)) + return True + except Exception: + return False + + def get_user_by_id(self, user_id: str) -> Optional[Dict]: + """Get user by ID""" + with self.get_connection() as conn: + row = conn.execute( + "SELECT * FROM users WHERE user_id = ?", + (user_id,) + ).fetchone() + return dict(row) if row else None + + # ========== ADMIN METHODS ========== + + def set_admin(self, user_id: str, is_admin: bool = True, nickname: str = "Administrator") -> bool: + """Add or update admin status (Supports guests via UPSERT)""" + with self.get_connection() as conn: + # We use an UPSERT-like pattern to ensure even non-registered elevated guests exist in users table + conn.execute(""" + INSERT INTO users (user_id, nickname, is_admin, created_at, last_seen, registered) + VALUES (?, ?, ?, ?, ?, 0) + ON CONFLICT(user_id) DO UPDATE SET is_admin = excluded.is_admin, last_seen = excluded.last_seen + """, (user_id, nickname, 1 if is_admin else 0, time.time(), time.time())) + return True + + def vacuum_database(self): + """Reclaim unused space and optimize database file (Feature 3)""" + with self.get_connection() as conn: + conn.execute("VACUUM") + return True + + def is_user_admin(self, user_id: str) -> bool: + """Check if user is admin""" + with self.get_connection() as conn: + row = conn.execute(""" + SELECT is_admin FROM users WHERE user_id = ? + """, (user_id,)).fetchone() + return row and row['is_admin'] == 1 + + def get_all_admins(self) -> List[Dict]: + """Get list of all admin users""" + with self.get_connection() as conn: + rows = conn.execute(""" + SELECT user_id, nickname, created_at + FROM users + WHERE is_admin = 1 + ORDER BY nickname + """).fetchall() + return [dict(row) for row in rows] + + def remove_admin(self, user_id: str) -> bool: + """Remove admin privileges""" + return self.set_admin(user_id, False) + + def add_audit_log(self, admin_id: str, admin_nick: str, action: str, target_id: Optional[str] = None, target_nick: Optional[str] = None, details: Optional[str] = None): + """Record an administrative action to the audit ledger""" + with self.get_connection() as conn: + conn.execute(""" + INSERT INTO audit_logs (admin_id, admin_nickname, action, target_id, target_nickname, details, timestamp) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, (admin_id, admin_nick, action, target_id, target_nick, details, time.time())) + + def get_audit_logs(self, limit: int = 50) -> List[Dict]: + """Fetch the most recent administrative actions from the ledger""" + with self.get_connection() as conn: + rows = conn.execute(""" + SELECT * FROM audit_logs ORDER BY timestamp DESC LIMIT ? + """, (limit,)).fetchall() + return [dict(row) for row in rows] + + # ========== BAN METHODS ========== + + def ban_user(self, banned_uid: str, banned_nick: str, banned_by: str, reason: str = "", duration_hours: Optional[int] = None, banned_ip: Optional[str] = None) -> bool: + """Persistently ban a user by UID/nickname. Optionally also ban their IP (admin opt-in only).""" + expires_at = (time.time() + duration_hours * 3600) if duration_hours else None + with self.get_connection() as conn: + conn.execute(""" + INSERT INTO bans (banned_user_id, banned_nickname, banned_ip, banned_by, reason, created_at, expires_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, (banned_uid, banned_nick, banned_ip, banned_by, reason, time.time(), expires_at)) + return True + + def unban_user(self, identifier: str) -> int: + """Remove all bans matching a nickname or user_id. Returns count removed.""" + with self.get_connection() as conn: + result = conn.execute(""" + DELETE FROM bans WHERE banned_user_id = ? OR banned_nickname = ? + """, (identifier, identifier)) + return result.rowcount + + def is_banned(self, user_id: str, nickname: str, ip: str = '') -> Optional[Dict]: + """Check if a user is banned by user_id, nickname, or IP (IP only checked if non-empty).""" + now = time.time() + with self.get_connection() as conn: + # Build query: always check uid+nick; only check IP if provided + if ip: + row = conn.execute(""" + SELECT * FROM bans + WHERE (banned_user_id = ? OR banned_nickname = ? OR (banned_ip IS NOT NULL AND banned_ip = ?)) + AND (expires_at IS NULL OR expires_at > ?) + ORDER BY created_at DESC LIMIT 1 + """, (user_id, nickname, ip, now)).fetchone() + else: + row = conn.execute(""" + SELECT * FROM bans + WHERE (banned_user_id = ? OR banned_nickname = ?) + AND (expires_at IS NULL OR expires_at > ?) + ORDER BY created_at DESC LIMIT 1 + """, (user_id, nickname, now)).fetchone() + return dict(row) if row else None + + def get_ban_list(self) -> List[Dict]: + """Get all active bans""" + now = time.time() + with self.get_connection() as conn: + rows = conn.execute(""" + SELECT * FROM bans WHERE (expires_at IS NULL OR expires_at > ?) + ORDER BY created_at DESC LIMIT 100 + """, (now,)).fetchall() + return [dict(r) for r in rows] + + # ========== MUTE METHODS ========== + + def mute_user(self, target_uid: str, target_nick: str, muted_by: str, duration_hours: Optional[float] = None, reason: str = "") -> bool: + """Add a mute record""" + expires_at = (time.time() + duration_hours * 3600) if duration_hours else None + with self.get_connection() as conn: + conn.execute(""" + INSERT INTO mutes (user_id, nickname, muted_by, reason, created_at, expires_at) + VALUES (?, ?, ?, ?, ?, ?) + """, (target_uid, target_nick, muted_by, reason, time.time(), expires_at)) + return True + + def unmute_user(self, identifier: str) -> int: + """Remove active mutes by nickname or user_id""" + with self.get_connection() as conn: + result = conn.execute(""" + DELETE FROM mutes WHERE user_id = ? OR nickname = ? + """, (identifier, identifier)) + return result.rowcount + + def is_muted(self, user_id: str, nickname: str) -> Optional[Dict]: + """Check if user is currently muted""" + now = time.time() + with self.get_connection() as conn: + row = conn.execute(""" + SELECT * FROM mutes + WHERE (user_id = ? OR nickname = ?) + AND (expires_at IS NULL OR expires_at > ?) + ORDER BY created_at DESC LIMIT 1 + """, (user_id, nickname, now)).fetchone() + return dict(row) if row else None + + # ========== MESSAGE EDIT/DELETE METHODS ========== + + def edit_message(self, msg_id: int, user_id: str, new_content: str, is_admin: bool = False) -> tuple: + """Edit a message. Admin can edit any; users can edit their own within 5 min.""" + with self.get_connection() as conn: + row = conn.execute( + "SELECT user_id, content, timestamp, edit_history, deleted FROM messages WHERE id = ?", + (msg_id,) + ).fetchone() + if not row: + return False, "Message not found" + if row['deleted']: + return False, "Cannot edit a deleted message" + if not is_admin: + if row['user_id'] != user_id: + return False, "You can only edit your own messages" + if (time.time() - row['timestamp']) > 300: # 5-minute window + return False, "Edit window expired (5 minutes)" + # Append old content to edit_history + history = json.loads(row['edit_history'] or '[]') + history.append({'content': row['content'], 'edited_at': time.time()}) + conn.execute( + "UPDATE messages SET content = ?, edited = 1, edit_history = ? WHERE id = ?", + (new_content, json.dumps(history), msg_id) + ) + return True, row['user_id'] # Return original author's UID for broadcast + + def delete_message(self, msg_id: int, user_id: str, is_admin: bool = False) -> tuple: + """Soft-delete a message. Admin can delete any; users can delete their own within 5 min.""" + with self.get_connection() as conn: + row = conn.execute( + "SELECT user_id, timestamp, room FROM messages WHERE id = ?", + (msg_id,) + ).fetchone() + if not row: + return False, "Message not found" + if not is_admin: + if row['user_id'] != user_id: + return False, "You can only delete your own messages" + if (time.time() - row['timestamp']) > 300: + return False, "Delete window expired (5 minutes)" + conn.execute("UPDATE messages SET deleted = 1, content = '[Message deleted]' WHERE id = ?", (msg_id,)) + return True, row['room'] + + def get_message_by_id(self, msg_id: int) -> Optional[Dict]: + """Fetch a message by its integer row ID""" + with self.get_connection() as conn: + row = conn.execute("SELECT * FROM messages WHERE id = ?", (msg_id,)).fetchone() + return dict(row) if row else None + + + def add_block(self, user_id: str, blocked_uid: str, blocked_nickname: str, duration: Optional[int] = None, reason: str = "") -> tuple: + """Add user to block list""" + expires_at = time.time() + duration if duration else None + + with self.get_connection() as conn: + conn.execute(""" + INSERT OR REPLACE INTO user_blocks + (user_id, blocked_user_id, blocked_nickname, created_at, expires_at, reason) + VALUES (?, ?, ?, ?, ?, ?) + """, (user_id, blocked_uid, blocked_nickname, time.time(), expires_at, reason)) + + return True, f"Blocked {blocked_nickname}" + + def remove_block(self, user_id: str, blocked_uid: str) -> bool: + """Remove user from block list by UID""" + with self.get_connection() as conn: + result = conn.execute(""" + DELETE FROM user_blocks + WHERE user_id = ? AND blocked_user_id = ? + """, (user_id, blocked_uid)) + return result.rowcount > 0 + + def remove_block_by_nick(self, user_id: str, nickname: str) -> bool: + """Remove user from block list by nickname""" + with self.get_connection() as conn: + result = conn.execute(""" + DELETE FROM user_blocks + WHERE user_id = ? AND blocked_nickname = ? + """, (user_id, nickname)) + return result.rowcount > 0 + + def get_blocks(self, user_id: str) -> List[Dict]: + """Get list of blocked users""" + with self.get_connection() as conn: + rows = conn.execute(""" + SELECT * FROM user_blocks + WHERE user_id = ? AND (expires_at IS NULL OR expires_at > ?) + ORDER BY created_at DESC + """, (user_id, time.time())).fetchall() + return [dict(row) for row in rows] + + # ========== INVITE METHODS ========== + + def create_invite(self, room: str, creator_id: str, expires_hours: int = 24, max_uses: int = 1) -> str: + """Create invite code""" + code = secrets.token_urlsafe(8) + expires_at = time.time() + (expires_hours * 3600) if expires_hours else None + + with self.get_connection() as conn: + conn.execute(""" + INSERT INTO room_invites + (code, room, created_by, created_at, expires_at, max_uses) + VALUES (?, ?, ?, ?, ?, ?) + """, (code, room, creator_id, time.time(), expires_at, max_uses)) + + return code + + def validate_invite(self, code: str, user_id: str) -> tuple: + """Validate invite code""" + with self.get_connection() as conn: + invite = conn.execute(""" + SELECT * FROM room_invites + WHERE code = ? AND is_active = 1 + """, (code,)).fetchone() + + if not invite: + # Anti-brute force delay is now handled in the async layer to keep threads free + return None, "Invalid invite code" + + if invite['expires_at'] and time.time() > invite['expires_at']: + conn.execute( + "UPDATE room_invites SET is_active = 0 WHERE code = ?", + (code,) + ) + return None, "Invite code expired" + + if invite['used_count'] >= invite['max_uses']: + return None, "Invite code max uses reached" + + existing = conn.execute(""" + SELECT * FROM invite_usage + WHERE code = ? AND user_id = ? + """, (code, user_id)).fetchone() + + if existing: + return None, "You already used this invite code" + + return invite['room'], "Valid invite" + + def use_invite(self, code: str, user_id: str, nickname: str) -> None: + """Record invite usage""" + with self.get_connection() as conn: + conn.execute(""" + INSERT INTO invite_usage (code, user_id, used_at, nickname) + VALUES (?, ?, ?, ?) + """, (code, user_id, time.time(), nickname)) + + conn.execute(""" + UPDATE room_invites + SET used_count = used_count + 1 + WHERE code = ? + """, (code,)) + + # ========== ALIAS METHODS ========== + + def add_alias(self, user_id: str, alias_name: str, command_text: str) -> tuple: + """Add command alias""" + if not re.match(r'^[a-zA-Z0-9_]{1,20}$', alias_name): + return False, "Alias must be alphanumeric and underscore, 1-20 chars" + + with self.get_connection() as conn: + conn.execute(""" + INSERT OR REPLACE INTO command_aliases + (user_id, alias_name, command_text, created_at) + VALUES (?, ?, ?, ?) + """, (user_id, alias_name, command_text, time.time())) + return True, f"Alias /{alias_name} created" + + def get_alias(self, user_id: str, alias_name: str) -> Optional[str]: + """Get alias command""" + with self.get_connection() as conn: + row = conn.execute(""" + SELECT command_text FROM command_aliases + WHERE user_id = ? AND alias_name = ? + """, (user_id, alias_name)).fetchone() + + if row: + conn.execute(""" + UPDATE command_aliases + SET usage_count = usage_count + 1, last_used = ? + WHERE user_id = ? AND alias_name = ? + """, (time.time(), user_id, alias_name)) + return row['command_text'] + return None + + def list_aliases(self, user_id: str) -> List[Dict]: + """List all aliases for user""" + with self.get_connection() as conn: + rows = conn.execute(""" + SELECT * FROM command_aliases + WHERE user_id = ? + ORDER BY alias_name + """, (user_id,)).fetchall() + return [dict(row) for row in rows] + + def remove_alias(self, user_id: str, alias_name: str) -> bool: + """Remove alias""" + with self.get_connection() as conn: + conn.execute(""" + DELETE FROM command_aliases + WHERE user_id = ? AND alias_name = ? + """, (user_id, alias_name)) + return True + + # ========== ANNOUNCEMENT METHODS ========== + + def set_announcement(self, room: str, content: str, creator: str, expires_hours: Optional[int] = None) -> None: + """Set room announcement""" + expires_at = time.time() + (expires_hours * 3600) if expires_hours else None + + with self.get_connection() as conn: + conn.execute(""" + INSERT OR REPLACE INTO room_announcements + (room, content, created_by, created_at, expires_at) + VALUES (?, ?, ?, ?, ?) + """, (room, content, creator, time.time(), expires_at)) + + def get_announcement(self, room: str) -> Optional[Dict]: + """Get room announcement""" + with self.get_connection() as conn: + row = conn.execute(""" + SELECT * FROM room_announcements WHERE room = ? + """, (room,)).fetchone() + + if row: + if row['expires_at'] and time.time() > row['expires_at']: + conn.execute( + "DELETE FROM room_announcements WHERE room = ?", + (room,) + ) + return None + return dict(row) + return None + + def clear_announcement(self, room: str) -> None: + """Clear room announcement""" + with self.get_connection() as conn: + conn.execute( + "DELETE FROM room_announcements WHERE room = ?", + (room,) + ) + + # ========== SCHEDULED MESSAGE METHODS ========== + + def schedule_message(self, room: str, user_id: str, nickname: str, + content: str, scheduled_time: float) -> int: + """Schedule message for future delivery""" + with self.get_connection() as conn: + cursor = conn.execute(""" + INSERT INTO scheduled_messages + (room, user_id, nickname, content, scheduled_time, created_at) + VALUES (?, ?, ?, ?, ?, ?) + """, (room, user_id, nickname, content, scheduled_time, time.time())) + return cast(int, cursor.lastrowid) if cursor.lastrowid is not None else 0 + + def get_due_messages(self) -> List[Dict]: + """Get messages due for delivery""" + with self.get_connection() as conn: + rows = conn.execute(""" + SELECT * FROM scheduled_messages + WHERE scheduled_time <= ? + AND is_cancelled = 0 + AND is_sent = 0 + ORDER BY scheduled_time ASC + """, (time.time(),)).fetchall() + return [dict(row) for row in rows] + + def deliver_scheduled_message(self, room: str, nickname: str, user_id: str, content: str, msg_id: int) -> int: + """ATOMIC: Mark scheduled message as sent and save to history in one transaction.""" + with self.get_connection() as conn: + # 1. Mark sent + conn.execute("UPDATE scheduled_messages SET is_sent = 1 WHERE id = ?", (msg_id,)) + + # 2. Save to history + msg_id_str = f"sch_{secrets.token_hex(8)}" + cursor = conn.execute(""" + INSERT INTO messages + (message_id, room, nickname, user_id, content, timestamp, message_type) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, (msg_id_str, room, nickname, user_id, content, time.time(), 'text')) + return cast(int, cursor.lastrowid) if cursor.lastrowid is not None else 0 + + def cancel_scheduled_message(self, user_id: str, msg_id: int) -> bool: + """Cancel scheduled message""" + with self.get_connection() as conn: + result = conn.execute(""" + UPDATE scheduled_messages + SET is_cancelled = 1 + WHERE id = ? AND user_id = ? AND is_sent = 0 + """, (msg_id, user_id)) + return result.rowcount > 0 + + def list_scheduled_messages(self, user_id: str) -> List[Dict]: + """List user's scheduled messages""" + with self.get_connection() as conn: + rows = conn.execute(""" + SELECT * FROM scheduled_messages + WHERE user_id = ? AND is_cancelled = 0 AND is_sent = 0 + ORDER BY scheduled_time ASC + """, (user_id,)).fetchall() + return [dict(row) for row in rows] + + # ========== BOOKMARK METHODS ========== + + def add_bookmark(self, user_id: str, name: str, command: str, description: Optional[str] = None) -> tuple: + """Add room bookmark""" + if not re.match(r'^[a-zA-Z0-9_-]{1,30}$', name): + return False, "Bookmark name must be alphanumeric, underscore, hyphen" + + with self.get_connection() as conn: + conn.execute(""" + INSERT OR REPLACE INTO user_bookmarks + (user_id, name, command, description, created_at) + VALUES (?, ?, ?, ?, ?) + """, (user_id, name, command, description, time.time())) + return True, f"Bookmark '{name}' added" + + def get_bookmark(self, user_id: str, name: str) -> Optional[Dict]: + """Get bookmark""" + with self.get_connection() as conn: + row = conn.execute(""" + SELECT * FROM user_bookmarks + WHERE user_id = ? AND name = ? + """, (user_id, name)).fetchone() + + if row: + conn.execute(""" + UPDATE user_bookmarks + SET usage_count = usage_count + 1, last_used = ? + WHERE user_id = ? AND name = ? + """, (time.time(), user_id, name)) + return dict(row) + return None + + def list_bookmarks(self, user_id: str) -> List[Dict]: + """List all bookmarks for user""" + with self.get_connection() as conn: + rows = conn.execute(""" + SELECT * FROM user_bookmarks + WHERE user_id = ? + ORDER BY usage_count DESC, name ASC + """, (user_id,)).fetchall() + return [dict(row) for row in rows] + + def remove_bookmark(self, user_id: str, name: str) -> bool: + """Remove bookmark""" + with self.get_connection() as conn: + conn.execute(""" + DELETE FROM user_bookmarks + WHERE user_id = ? AND name = ? + """, (user_id, name)) + return True + + # ========== NOTIFICATION RULE METHODS ========== + + def add_notification_rule(self, user_id: str, rule_type: str, value: str) -> int: + """Add notification rule (Safe: ignores duplicates)""" + with self.get_connection() as conn: + cursor = conn.execute(""" + INSERT OR IGNORE INTO notification_rules + (user_id, rule_type, value, created_at) + VALUES (?, ?, ?, ?) + """, (user_id, rule_type, value, time.time())) + return cast(int, cursor.lastrowid) if cursor.lastrowid is not None else 0 + + def get_notification_rules(self, user_id: str) -> List[Dict]: + """Get all notification rules for user""" + with self.get_connection() as conn: + rows = conn.execute(""" + SELECT * FROM notification_rules + WHERE user_id = ? + ORDER BY created_at DESC + """, (user_id,)).fetchall() + return [dict(row) for row in rows] + + def get_room_notification_rules(self, uids: List[str]) -> Dict[str, List[Dict]]: + """Batch fetch notification rules for multiple users. + SQL-SAFE: Chunks UIDs to prevent 'too many SQL variables' error in SQLite. + """ + if not uids: + return {} + + results = defaultdict(list) + # SQLite typically defaults to 999 or 32766, but we chunk at 500 for safety/perf + CHUNK_SIZE = 500 + + with self.get_connection() as conn: + for i in range(0, len(uids), CHUNK_SIZE): + chunk = cast(Any, uids)[i:i + CHUNK_SIZE] + placeholders = ','.join(['?'] * len(chunk)) + rows = conn.execute(f""" + SELECT * FROM notification_rules + WHERE user_id IN ({placeholders}) + """, chunk).fetchall() + + for row in rows: + results[row['user_id']].append(dict(row)) + return results + + def check_notification_match(self, recipient_id: str, rules: List[Dict], + message: str, room: str, sender_id: str, + sender_nickname: str, recipient_nickname: str) -> Optional[Dict]: + """Pure logic to check if rules match a message (No DB calls)""" + if recipient_id == sender_id: + return None + + for rule in rules: + if rule['rule_type'] == 'keyword': + if rule['value'].lower() in message.lower(): + return rule + + elif rule['rule_type'] == 'user': + if rule['value'] == sender_id: + return rule + + elif rule['rule_type'] == 'mention': + # Fix: Check if RECIPIENT is mentioned, not sender + if recipient_nickname and f"@{recipient_nickname.lower()}" in message.lower(): + return rule + + return None + + def remove_notification_rule(self, user_id: str, rule_id: int) -> bool: + """Remove notification rule""" + with self.get_connection() as conn: + result = conn.execute(""" + DELETE FROM notification_rules + WHERE id = ? AND user_id = ? + """, (rule_id, user_id)) + return result.rowcount > 0 + + # ========== OFFLINE MESSAGE METHODS ========== + # These methods are replaced by save_offline_message and get_offline_messages above + # def queue_offline_message(self, user_id: str, room: str, message_data: dict, + # ttl_hours: int = 72) -> None: + # """Queue message for offline user""" + # expires_at = time.time() + (ttl_hours * 3600) + + # with self.get_connection() as conn: + # conn.execute(""" + # INSERT INTO offline_messages + # (user_id, room, message, timestamp, expires_at) + # VALUES (?, ?, ?, ?, ?) + # """, (user_id, room, json.dumps(message_data), time.time(), expires_at)) + + # def get_offline_messages(self, user_id: str) -> List[Dict]: + # """Get undelivered offline messages""" + # with self.get_connection() as conn: + # rows = conn.execute(""" + # SELECT * FROM offline_messages + # WHERE user_id = ? AND delivered = 0 + # AND (expires_at IS NULL OR expires_at > ?) + # ORDER BY timestamp ASC + # """, (user_id, time.time())).fetchall() + # return [dict(row) for row in rows] + + # def mark_offline_delivered(self, msg_id: int) -> None: + # """Mark offline message as delivered""" + # with self.get_connection() as conn: + # conn.execute(""" + # UPDATE offline_messages + # SET delivered = 1, delivered_at = ? + # WHERE id = ? + # """, (time.time(), msg_id)) + + # def get_offline_count(self, user_id: str) -> int: + # """Get count of offline messages""" + # with self.get_connection() as conn: + # row = conn.execute(""" + # SELECT COUNT(*) as count FROM offline_messages + # WHERE user_id = ? AND delivered = 0 + # AND (expires_at IS NULL OR expires_at > ?) + # """, (user_id, time.time())).fetchone() + # return row['count'] if row else 0 + + # def clear_offline_messages(self, user_id: str) -> int: + # """Clear all offline messages for user""" + # with self.get_connection() as conn: + # cursor = conn.execute( + # "DELETE FROM offline_messages WHERE user_id = ? AND delivered = 0", + # (user_id,) + # ) + # return cursor.rowcount + + # ========== ROOM BACKUP METHODS ========== + + def create_room_backup(self, room: str, creator_id: str, password: Optional[str] = None) -> tuple: + """Create password-protected room backup""" + # Security: sanitize room name for filename even if validated at join + safe_room = "".join(c for c in room if c.isalnum() or c in ('-', '_')) + backup_id = f"backup_{int(time.time())}_{safe_room}" + # Use configured backup directory (Snyk fix) + backup_dir = self.backup_dir + os.makedirs(backup_dir, exist_ok=True) + + with self.get_connection() as conn: + room_info = conn.execute( + "SELECT * FROM rooms WHERE name = ?", + (room,) + ).fetchone() + + if not room_info: + return None, "Room not found" + + messages = conn.execute(""" + SELECT * FROM messages + WHERE room = ? AND deleted = 0 + ORDER BY timestamp ASC + """, (room,)).fetchall() + + announcement = conn.execute( + "SELECT * FROM room_announcements WHERE room = ?", + (room,) + ).fetchone() + + backup_data = { + 'backup_id': backup_id, + 'created_at': time.time(), + 'created_by': creator_id, + 'room': dict(room_info), + 'messages': [dict(m) for m in messages], + 'announcement': dict(announcement) if announcement else None, + 'message_count': len(messages) + } + + password_hash = None + if password: + salt = secrets.token_hex(16) + hash_obj = hashlib.pbkdf2_hmac( + 'sha256', + password.encode(), + salt.encode(), + 100000 + ) + password_hash = f"{salt}${hash_obj.hex()}" + backup_data['encrypted'] = True + + backup_file = os.path.join(backup_dir, f"{backup_id}.json") + with open(backup_file, 'w', encoding='utf-8') as f: + json.dump(backup_data, f, indent=2) + + conn.execute(""" + INSERT INTO room_backups + (backup_id, room, created_by, created_at, file_path, message_count, password_hash) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, (backup_id, room, creator_id, time.time(), backup_file, len(messages), password_hash)) + + return backup_id, f"Backup created: {backup_id}" + + def restore_room_backup(self, backup_id: str, password: Optional[str] = None) -> tuple: + """Restore room from password-protected backup. + + ATOMIC: Password verification and all data writes occur in a single + transaction to prevent TOCTOU races and partial-restore on crash. + """ + # Security: Redundant sanitization to ensure file stays within backup_dir + safe_id = Path(backup_id).name + backup_file = os.path.join(self.backup_dir, f"{safe_id}.json") + + if not os.path.exists(backup_file): + return False, "Backup file not found" + + # Parse JSON before opening DB to surface corruption early + try: + with open(backup_file, 'r', encoding='utf-8') as f: + backup_data = json.load(f) + except json.JSONDecodeError as e: + return False, f"Corrupt backup file: {e}" + + # Single atomic transaction: verify password + restore data in one commit + with self.get_connection() as conn: + try: + # Step 1: Password check (inside same transaction) + backup_record = conn.execute( + "SELECT password_hash FROM room_backups WHERE backup_id = ?", + (backup_id,) + ).fetchone() + + if backup_record and backup_record['password_hash']: + if not password: + return False, "Password required for this backup" + + stored = backup_record['password_hash'] + if '$' not in stored: + return False, "Malformed backup password record. Contact admin." + + salt, hash_value = stored.split('$', 1) + test_hash = hashlib.pbkdf2_hmac( + 'sha256', password.encode(), salt.encode(), 100000 + ).hex() + + if not secrets.compare_digest(test_hash.encode(), hash_value.encode()): + return False, "Invalid backup password" + + # Step 2: Restore room row + room_name = backup_data['room']['name'] + room = backup_data['room'] + conn.execute(""" + INSERT OR REPLACE INTO rooms + (name, topic, is_private, password_hash, created_by, created_at, max_users, settings) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, (room['name'], room['topic'], room['is_private'], + room['password_hash'], room['created_by'], room['created_at'], + room['max_users'], room['settings'])) + + # Step 3: Restore messages (bulk insert, high performance) + msg_params = [ + ( + msg['message_id'], msg['room'], msg['nickname'], + msg['user_id'], msg['content'], msg['timestamp'], + msg['message_type'], msg.get('parent_id'), + msg.get('encrypted', 0), msg.get('deleted', 0) + ) + for msg in backup_data['messages'] + ] + cursor = conn.executemany(""" + INSERT OR IGNORE INTO messages + (message_id, room, nickname, user_id, content, timestamp, + message_type, parent_id, encrypted, deleted) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, msg_params) + restored = cursor.rowcount + + # Step 4: Restore announcement if present + if backup_data.get('announcement'): + ann = backup_data['announcement'] + conn.execute(""" + INSERT OR REPLACE INTO room_announcements + (room, content, created_by, created_at, expires_at, color) + VALUES (?, ?, ?, ?, ?, ?) + """, (ann['room'], ann['content'], ann['created_by'], + ann['created_at'], ann['expires_at'], ann.get('color', 'yellow'))) + + # Step 5: Mark backup as restored (same atomic transaction) + conn.execute(""" + UPDATE room_backups SET restored_at = ? WHERE backup_id = ? + """, (time.time(), backup_id)) + + return True, f"Restored {restored} messages to {room_name}" + + except KeyError as e: + # Explicitly rollback before returning False to prevent partial commit by get_connection + conn.rollback() + return False, f"Backup file is missing required field: {e}" + except Exception as e: + # Catch all other errors during restore to ensure rollback and clean error return + conn.rollback() + logger.error(f"FATAL Restore Error: {e}") + return False, f"Restore failed: {str(e)}" + + + def list_backups(self, room: Optional[str] = None) -> List[Dict]: + """List available backups""" + with self.get_connection() as conn: + query = "SELECT * FROM room_backups" + params = [] + + if room: + query += " WHERE room = ?" + params.append(room) + + query += " ORDER BY created_at DESC" + + rows = conn.execute(query, params).fetchall() + backups = [] + + for row in rows: + backup = cast(Dict[str, Any], dict(row)) + backup['created_str'] = datetime.fromtimestamp( + backup['created_at'] + ).strftime('%Y-%m-%d %H:%M:%S') + backup['has_password'] = backup['password_hash'] is not None + backups.append(backup) + + return backups + + # ========== RBAC METHODS ========== + + def set_room_role(self, room: str, user_id: str, role: str, assigned_by: str = "system") -> bool: + """Set a specific role for a user in a room""" + with self.get_connection() as conn: + conn.execute(""" + INSERT OR REPLACE INTO room_roles (room, user_id, role, assigned_at, assigned_by) + VALUES (?, ?, ?, ?, ?) + """, (room, user_id, role, time.time(), assigned_by)) + return True + + def get_user_room_role(self, room: str, user_id: str) -> Optional[str]: + """Get a user's role in a specific room""" + with self.get_connection() as conn: + row = conn.execute( + "SELECT role FROM room_roles WHERE room = ? AND user_id = ?", + (room, user_id) + ).fetchone() + return row['role'] if row else None + + def remove_room_role(self, room: str, user_id: str) -> bool: + """Remove a user's role from a room""" + with self.get_connection() as conn: + result = conn.execute( + "DELETE FROM room_roles WHERE room = ? AND user_id = ?", + (room, user_id) + ) + return result.rowcount > 0 + + def is_room_owner(self, room: str, user_id: str) -> bool: + """Check if user is the owner of the room""" + # Check database rooms table first (creator_id) + with self.get_connection() as conn: + row = conn.execute( + "SELECT created_by_id FROM rooms WHERE name = ?", + (room,) + ).fetchone() + if row and row['created_by_id'] == user_id: + return True + + # Then check room_roles table + return self.get_user_room_role(room, user_id) == 'owner' + + def is_room_moderator(self, room: str, user_id: str) -> bool: + """Check if user is a moderator in the room""" + if self.is_room_owner(room, user_id): + return True + return self.get_user_room_role(room, user_id) == 'moderator' + + # ========== SYSTEM STATS ========== + + def get_server_stats(self) -> Dict: + """Collect server-wide statistics from the database""" + stats = {} + with self.get_connection() as conn: + stats['total_users'] = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0] + stats['registered_users'] = conn.execute("SELECT COUNT(*) FROM users WHERE registered = 1").fetchone()[0] + stats['total_messages'] = conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0] + stats['total_rooms'] = conn.execute("SELECT COUNT(*) FROM rooms").fetchone()[0] + + # Database size + try: + stats['db_size_kb'] = os.path.getsize(self.db_path) // 1024 + except Exception: + stats['db_size_kb'] = 0 + + # Recent activity (last 24h) + day_ago = time.time() - 86400 + stats['recent_messages'] = conn.execute( + "SELECT COUNT(*) FROM messages WHERE timestamp > ?", (day_ago,) + ).fetchone()[0] + + return stats + + # ========== POUNCE METHODS ========== + + def add_pounce(self, pouncer_uid: str, target_uid: str) -> bool: + """Register interest in a user's return""" + with self.get_connection() as conn: + conn.execute(""" + INSERT OR IGNORE INTO user_pounces (pouncer_uid, target_uid, created_at) + VALUES (?, ?, ?) + """, (pouncer_uid, target_uid, time.time())) + return True + + def get_pounces_for_target(self, target_uid: str) -> List[str]: + """Get list of user IDs who are waiting for this user""" + with self.get_connection() as conn: + rows = conn.execute("SELECT pouncer_uid FROM user_pounces WHERE target_uid = ?", (target_uid,)).fetchall() + return [row['pouncer_uid'] for row in rows] + + def delete_pounce(self, pouncer_uid: str, target_uid: str) -> bool: + """Remove a pounce registration""" + with self.get_connection() as conn: + conn.execute("DELETE FROM user_pounces WHERE pouncer_uid = ? AND target_uid = ?", (pouncer_uid, target_uid)) + return True + + # ========== ROOM SETTINGS ========== + + def update_room_settings(self, room: str, settings: dict) -> bool: + """Persist room-specific configuration""" + settings_json = json.dumps(settings) + with self.get_connection() as conn: + conn.execute("UPDATE rooms SET settings = ? WHERE name = ?", (settings_json, room)) + return True + + def get_all_room_settings(self) -> Dict[str, dict]: + """Load all room settings into memory cache""" + results = {} + with self.get_connection() as conn: + rows = conn.execute("SELECT name, settings FROM rooms").fetchall() + for row in rows: + if row['settings']: + try: + results[row['name']] = json.loads(row['settings']) + except Exception: + results[row['name']] = {} + else: + results[row['name']] = {} + return results + + def list_rooms(self) -> List[Dict]: + """List all available rooms""" + with self.get_connection() as conn: + rows = conn.execute("SELECT * FROM rooms ORDER BY name ASC").fetchall() + return [dict(row) for row in rows] + + def update_room_topic(self, room: str, topic: str): + """Update room topic""" + with self.get_connection() as conn: + conn.execute("UPDATE rooms SET topic = ? WHERE name = ?", (topic, room)) + + def delete_room(self, room: str): + """Permanently delete a room""" + with self.get_connection() as conn: + conn.execute("DELETE FROM rooms WHERE name = ?", (room,)) + conn.execute("DELETE FROM messages WHERE room = ?", (room,)) + + +# ============================================================================ +# CHAT SERVER +# ============================================================================ + +class ChatServer: + """Main Chat Server Class""" + + def __init__(self, host: str = "0.0.0.0", port: int = 8765, + max_history: int = 100, use_ssl: bool = False, + ssl_cert: Optional[str] = None, ssl_key: Optional[str] = None, + admin_password: str = "", + max_message_length: int = 4096, + max_nickname_length: int = 32, + rate_limit_messages: int = 120, + rate_limit_window: int = 60): + self.host = host + self.port = port + self.max_history = max_history + self.use_ssl = use_ssl + self.ssl_cert = ssl_cert + self.ssl_key = ssl_key + self.admin_password = admin_password + self.max_message_length = max_message_length + self.max_nickname_length = max_nickname_length + self.rate_limit_messages = rate_limit_messages + self.rate_limit_window = rate_limit_window + + # Database + db_path = Config.DB_PATH if USE_CONFIG else "data/chat.db" + backup_dir = Config.BACKUP_DIR if USE_CONFIG else "data/backups" + self.db = DatabaseManager(db_path=db_path, backup_dir=backup_dir) + + # Rate limiter (Token Bucket: Burst capacity = 10, refill based on config) + refill_rate = rate_limit_messages / rate_limit_window if rate_limit_window > 0 else 2.0 + self.rate_limiter = RateLimiter(capacity=10, refill_rate=refill_rate) + + # Active connections + self.connections: Dict[str, Any] = {} + self.user_rooms: Dict[str, str] = {} + self.user_nicknames: Dict[str, str] = {} + self.nickname_to_uid: Dict[str, str] = {} # Rapid O(1) lookup + self.clients: Dict[str, Any] = {} # Store client objects + + # Security & Feature Caches + # NOTE: room_settings_cache is populated in start() via asyncio.to_thread + # after init_database() completes. Empty here to prevent blocking __init__. + self.room_settings_cache: Dict[str, dict] = {} + self.user_blocks_cache: Dict[str, Set[str]] = defaultdict(set) # Blocklist for fast message filtering + + # Room tracking + self.room_users: Dict[str, Set[str]] = defaultdict(set) + + # Audit & Connection Management + self.pending_nicks: Set[str] = set() # Prevent race conditions during handshake + self.reserved_nicks = {'admin', 'system', 'moderator', 'administrator', 'server', 'root'} + self.ip_connections: Dict[str, int] = defaultdict(int) + self.max_connections_per_ip = 20 + self.max_total_connections = 2500 # Feature 1: Total server capacity + # List of shared/school IPs that bypass the "Too many connections" IP block + self.whitelisted_ips: Set[str] = set() + if USE_CONFIG and hasattr(Config, 'IP_WHITELIST'): + whitelist_src = getattr(Config, 'IP_WHITELIST') + if whitelist_src: + self.whitelisted_ips.update(cast(Any, whitelist_src)) + + # Command locks to prevent race conditions during state transition + # WeakValueDictionary prunes locks when nobody is using them + self.session_locks: weakref.WeakValueDictionary[str, asyncio.Lock] = weakref.WeakValueDictionary() + self._lock_creation_mutex: asyncio.Lock = asyncio.Lock() # Prevents lock creation races + + # Performance/Stress Caches + self.notification_rules_cache: Dict[str, List[Dict]] = {} # uid -> rules + self.history_cache: Dict[str, deque] = {} # room -> deque(max_history) + self.member_update_requests: Set[str] = set() # rooms needing member list broadcast + + # Premium/Moderation State + self.last_activity: Dict[str, float] = {} # user_id -> timestamp + self.vanished_uids: Set[str] = set() # Admins hidden from user lists + self.shadow_banned_uids: Set[str] = set() # Users whose messages are dropped silently + self.session_admins: Set[str] = set() # Temporary, non-persistent admins (guests) + + # Join Rate Limiter (Feature 2 - Protects DB/Hashing from flood) + # 5 joins per 60 seconds per IP, burst of 10 + self.join_rate_limiter = RateLimiter(capacity=10, refill_rate=5/60) + + # Command Registry + self.commands: Dict[str, Any] = {} + self._register_commands() + + # System status + self.start_time = time.time() + + # SSL context + self.ssl_context: Optional[ssl.SSLContext] = None + if self.use_ssl and self.ssl_cert and self.ssl_key: + try: + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + ctx.load_cert_chain(certfile=cast(str, self.ssl_cert), keyfile=cast(str, self.ssl_key)) + self.ssl_context = ctx + logger.info(f"SSL enabled using cert: {self.ssl_cert}") + except Exception as e: + logger.error(f"Failed to load SSL certificates: {e}") + self.ssl_context = None + + async def get_session_lock(self, user_id: str) -> asyncio.Lock: + """Atomic fetch/create for session locks using WeakValueDictionary""" + async with self._lock_creation_mutex: + lock = self.session_locks.get(user_id) + if lock is not None: + return lock + new_lock = asyncio.Lock() + self.session_locks[user_id] = new_lock + return new_lock + return asyncio.Lock() # Fallback return to satisfy Pyre + + def _register_commands(self): + """Map command names to handler functions""" + # Account Management + self.commands['help'] = self.cmd_help + self.commands['register'] = self.cmd_register + self.commands['login'] = self.cmd_login + self.commands['identify'] = self.cmd_identify + self.commands['password'] = self.cmd_password + self.commands['nick'] = self.cmd_nick + self.commands['whoami'] = self.cmd_whoami + + # Room Management + self.commands['join'] = self.cmd_join + self.commands['leave'] = self.cmd_leave + self.commands['list'] = self.cmd_list + self.commands['users'] = self.cmd_users + self.commands['topic'] = self.cmd_topic + self.commands['deleteroom'] = self.cmd_deleteroom + self.commands['delroom'] = self.cmd_deleteroom + self.commands['delete-room'] = self.cmd_deleteroom + + # Invite System + self.commands['invite'] = self.cmd_invite + + # Messaging Features + self.commands['msg'] = self.cmd_msg + self.commands['whisper'] = self.cmd_msg + self.commands['w'] = self.cmd_msg + self.commands['reply'] = self.cmd_reply + self.commands['r'] = self.cmd_reply + self.commands['search'] = self.cmd_search + self.commands['find'] = self.cmd_search + + # Blocking + self.commands['ignore'] = self.cmd_ignore + self.commands['block'] = self.cmd_ignore + self.commands['unignore'] = self.cmd_unignore + self.commands['unblock'] = self.cmd_unignore + self.commands['ignorelist'] = self.cmd_ignorelist + self.commands['blocklist'] = self.cmd_ignorelist + self.commands['sync'] = self.cmd_sync # Client-side cache sync + self.commands['settings'] = self.cmd_settings # User preferences + + # Advanced Admin/Mods + self.commands['admin'] = self.cmd_admin + self.commands['user'] = self.cmd_user # Admin-only user management + self.commands['ban'] = self.cmd_ban + self.commands['unban'] = self.cmd_unban + self.commands['banlist'] = self.cmd_banlist + self.commands['mute'] = self.cmd_mute + self.commands['unmute'] = self.cmd_unmute + self.commands['whois'] = self.cmd_whois + self.commands['edit'] = self.cmd_edit + self.commands['delete'] = self.cmd_delete + self.commands['room'] = self.cmd_room_info + self.commands['history'] = self.cmd_history + self.commands['ping'] = self.cmd_ping + self.commands['clearhistory'] = self.cmd_clearhistory + self.commands['clear-all'] = self.cmd_clearhistory + self.commands['role'] = self.cmd_role # New RBAC command + self.commands['mod'] = self.cmd_mod # Room moderation toolkit (RBAC) + self.commands['mode'] = self.cmd_mode # Security mode + self.commands['onair'] = self.cmd_onair # Cross-room broadcast + self.commands['vanish'] = self.cmd_vanish # Admin visibility toggle + self.commands['shadowban'] = self.cmd_shadowban # Silent message suppression + self.commands['shad'] = self.cmd_shadowban + + # Advanced Utilities (Restored) + self.commands['alias'] = self.cmd_alias + self.commands['announce'] = self.cmd_announce + self.commands['schedule'] = self.cmd_schedule + self.commands['schedules'] = self.cmd_schedules + self.commands['cancel'] = self.cmd_cancel + self.commands['bookmark'] = self.cmd_bookmark + self.commands['export'] = self.cmd_export + self.commands['backup'] = self.cmd_backup + self.commands['notify'] = self.cmd_notify + + # Others + self.commands['quit'] = self.cmd_quit + self.commands['exit'] = self.cmd_quit + self.commands['clear'] = self.cmd_pass # Handled client-side + + async def cmd_pass(self, user_id, nickname, room, parts): + """No-op for commands handled elsewhere""" + pass + + async def cmd_help(self, user_id, nickname, room, parts): + """Display help information""" + help_text = """ +\u2554\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2557 +\u2551 CHAT COMMANDS \u2551 +\u2551 v2.1 Master \u2551 +\u2560\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2563 +\u2551 [ 1. ACCOUNT & PROFILE ] \u2551 +\u2551 /register [email] - Register current nickname \u2551 +\u2551 /login - Login to a different account \u2551 +\u2551 /identify - Authenticate as current nickname \u2551 +\u2551 /password - Change your account password \u2551 +\u2551 /nick - Change nickname (temporary) \u2551 +\u2551 /whoami - Show your current profile info \u2551 +\u2551 /settings - View/edit your settings \u2551 +\u2551 \u2551 +\u2551 [ 2. ROOMS & NAVIGATION ] \u2551 +\u2551 /join [pass] [--e2ee] - Join/Create a room (--e2ee ruins plaintext) \u2551 +\u2551 /leave - Return to lobby \u2551 +\u2551 /list [filter] - List all public rooms \u2551 +\u2551 /users - Show users in current room \u2551 +\u2551 /topic - Update room topic \u2551 +\u2551 /deleteroom - Permanently delete current room \u2551 +\u2551 \u2551 +\u2551 [ 3. MESSAGING & PRIVACY ] \u2551 +\u2551 /msg - Private Msg (supports comma-separated nicks) \u2551 +\u2551 /reply - Reply to last whisper received \u2551 +\u2551 /ignore - Block a user (/block) \u2551 +\u2551 /unignore - Unblock a user (/unblock) \u2551 +\u2551 /ignorelist - List blocked users \u2551 +\u2551 /search - Search room history \u2551 +\u2551 /security - Quick security toggle (high|stealth|locked|open) \u2551 +\u2551 /crypt - Set the group/room password for SHARED mode \u2551 +\u2551 /sync - Sync history from a specific ID \u2551 +\u2551 \u2551 +\u2551 [ 4. SECURITY PROFILES ] \u2551 +\u2551 * high : E2EE (Private) + SHARED (Room) - Maximum Privacy \u2551 +\u2551 * stealth : E2EE (Private) + PLAIN (Room) - Hidden Private Chats \u2551 +\u2551 * locked : SHARED (Everything) - Forced Room Encryption \u2551 +\u2551 * open : PLAIN (Everything) - No Encryption \u2551 +\u2551 \u2551 +\u2551 [ 5. ADVANCED & ADMIN ] \u2551 +\u2551 /admin status - View server health \u2551 +\u2551 /admin list - List all registered users \u2551 +\u2551 /admin broadcast - Message EVERYONE \u2551 +\u2551 /admin kick - Disconnect a user \u2551 +\u2551 /admin grant - Grant administrator status \u2551 +\u2551 /admin clearhistory global - Wipe ALL rooms history \u2551 +\u2551 /clearhistory - Wipe current room history \u2551 +\u2551 /role - Assign moderator/owner/member \u2551 +\u2551 /quit - Disconnect and exit \u2551 +\u2551 \u2551 +\u2560\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2563 +\u2551 TIPS: \u2551 +\u2551 * Use TAB for command completion (on supported clients) \u2551 +\u2551 * Register your nickname to reserve it and gain persistent roles \u2551 +\u2551 * Server Admins are marked with """ + f"{E('ADM')}" + """ | Room Moderators are marked with """ + f"{E('MOD')}" + (""" \u2551 +""" if USE_EMOJIS else """ \u2551 +""") + """\u2551 * Use /mode [plain|shared|e2ee] to set your desired security levels \u2551 +\u2551 \u2551 +\u255a\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u2550\u255d +""" + await self.send_to_user(user_id, { + 'type': 'command_response', + 'message': help_text + }) + return user_id + async def cmd_register(self, user_id, nickname, room, parts): + """Register current nickname""" + if len(parts) < 2: + await self.send_to_user(user_id, {'type': 'error', 'message': 'Usage: /register [email]'}) + return user_id + + password = parts[1] + email = parts[2] if len(parts) > 2 else None + + # Input Validation: Cap lengths to prevent oversized data insertion + if len(password) > 128: + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Password too long (max 128 chars).'}) + return user_id + if email and len(email) > 255: + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Email too long (max 255 chars).'}) + return user_id + + new_user_id, message = await asyncio.to_thread(cast(Any, self.db.register_user), nickname, password, email) + if new_user_id: + new_uid_str = cast(str, new_user_id) + # Sync session to new ID + await self._swap_user_session(user_id, new_uid_str) + + # Persist existing temporary rooms to this account (off-thread) + # MUST be a plain def, not async def, when used with asyncio.to_thread + def _claim_rooms(): + with self.db.get_connection() as conn: + conn.execute( + "UPDATE rooms SET expires_at = NULL, created_by_id = ? WHERE created_by_id = ?", + (new_uid_str, user_id) + ) + await asyncio.to_thread(cast(Any, _claim_rooms)) + + # Warm up cached data for this new user + asyncio.create_task(self._update_notification_cache(new_uid_str)) + + await self.send_to_user(new_uid_str, { + 'type': 'command_response', + 'message': f'{E("OK")} Registration successful! Your temporary rooms are now permanent.' + }) + return new_uid_str + else: + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Registration failed: {message}'}) + return user_id + + async def cmd_identify(self, user_id: str, nickname: str, room: str, parts: List[str]) -> str: + """Authenticate as registered user""" + if len(parts) < 2: + await self.send_to_user(user_id, {'type': 'error', 'message': 'Usage: /identify '}) + return user_id + + password = parts[1] + new_user_id, message = await asyncio.to_thread(cast(Any, self.db.authenticate_user), nickname, password) + + if new_user_id: + if user_id != new_user_id: + await self._swap_user_session(user_id, new_user_id) + + # Warm up caches + asyncio.create_task(self._update_notification_cache(new_user_id)) + + await self.send_to_user(new_user_id, { + 'message': f'{E("OK")} Authentication successful! You are now verified.' + }) + return new_user_id + else: + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Authentication failed: {message}'}) + return user_id + + async def cmd_login(self, user_id: str, nickname: str, room: str, parts: List[str]) -> str: + """Register or login to an account""" + if len(parts) < 3: + await self.send_to_user(user_id, {'type': 'error', 'message': 'Usage: /login '}) + return user_id + + target_username = parts[1] + password = parts[2] + + new_user_id, message = await asyncio.to_thread(cast(Any, self.db.authenticate_user), target_username, password) + if new_user_id: + old_nick = nickname + await self._swap_user_session(user_id, new_user_id, new_nickname=target_username) + + await self.send_to_user(new_user_id, { + 'type': 'command_response', + 'message': f'{E("OK")} Login successful! You are now logged in as {target_username}.' + }) + + if old_nick != target_username: + asyncio.create_task(self.broadcast_to_room(room, { + 'type': 'system', + 'timestamp': self.format_timestamp(), + 'message': f'{E("ADM")} {old_nick} is now known as {target_username} (logged in)' + })) + return new_user_id + else: + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Login failed: {message}'}) + return user_id + + async def _swap_user_session(self, old_uid: str, new_uid: str, new_nickname: Optional[str] = None) -> None: + """Migrate all session data from old_uid to new_uid.""" + if old_uid == new_uid: + return + + target_nick = new_nickname or self.user_nicknames.get(old_uid) or "Anonymous" + if target_nick: + self.nickname_to_uid[target_nick.lower()] = new_uid + + if old_uid in self.connections: + self.connections[new_uid] = self.connections.pop(old_uid) + + self.user_nicknames.pop(old_uid, None) + self.user_nicknames[new_uid] = target_nick + + if old_uid in self.user_rooms: + current_room = self.user_rooms.pop(old_uid) + self.user_rooms[new_uid] = current_room + if current_room in self.room_users: + self.room_users[current_room].discard(old_uid) + self.room_users[current_room].add(new_uid) + + if old_uid in self.clients: + client_data = self.clients.pop(old_uid) + client_data['user_id'] = new_uid + client_data['nickname'] = target_nick + self.clients[new_uid] = client_data + + # Repopulate settings + settings_data = await asyncio.to_thread(cast(Any, self.db.get_user_settings), new_uid) + if settings_data and new_uid in self.clients: + merged = {'show_ids': False, 'theme': 'standard'} + merged.update(settings_data) + self.clients[new_uid]['settings'] = merged + # pyre-ignore[6] + await self.send_to_user(new_uid, {'type': 'data_update', 'settings': {'theme': cast(str, merged.get('theme', 'standard'))}}) + + # 2. Inform client of their new official nickname (Suggestion 1) + # This ensures they see themselves as 'Sharama2' in local echo if they were 'User745' + await self.send_to_user(new_uid, {'type': 'nick_update', 'nickname': target_nick}) + + self.user_blocks_cache.pop(old_uid, None) + await self._update_block_cache(new_uid) + + async def cmd_password(self, user_id: str, nickname: str, room: str, parts: List[str]) -> str: + """Update your account password""" + if len(parts) < 3: + await self.send_to_user(user_id, {'type': 'error', 'message': 'Usage: /password '}) + return user_id + + success, message = await asyncio.to_thread(cast(Any, self.db.change_password), user_id, parts[1], parts[2]) + status = "[OK]" if success else "[X]" + await self.send_to_user(user_id, {'type': 'command_response', 'message': f'{status} {message}'}) + return user_id + + async def cmd_nick(self, user_id: str, nickname: str, room: str, parts: List[str]) -> str: + """Change current session nickname""" + if len(parts) < 2: + await self.send_to_user(user_id, {'type': 'error', 'message': 'Usage: /nick '}) + return user_id + + new_nick = parts[1] + + # Validation + if len(new_nick) > self.max_nickname_length: + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Nickname too long (max {self.max_nickname_length})'}) + return user_id + + if not re.match(r'^[a-zA-Z0-9_-]+$', new_nick): + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Invalid nickname. Use only letters, numbers, underscore, and hyphen.'}) + return user_id + + if new_nick.lower() in self.nickname_to_uid or new_nick.lower() in self.pending_nicks: + if self.nickname_to_uid.get(new_nick.lower()) != user_id: + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Nickname "{new_nick}" is already reserved or in use.'}) + return user_id + + # Security: Reserved Name Protection + if new_nick.lower() in self.reserved_nicks: + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} This nickname is reserved for system use.'}) + return user_id + + # Atomic reservation -- holds the slot during the await below + self.pending_nicks.add(new_nick.lower()) + try: + # Security: Prevent taking a registered user's name + is_reg = await asyncio.to_thread(cast(Any, self.db.is_nickname_registered), new_nick) + if is_reg: + owner_id = await asyncio.to_thread(cast(Any, self.db.get_user_id_by_nickname), new_nick) + if owner_id != user_id: + self.pending_nicks.discard(new_nick.lower()) # FIX: Clear pending on early return + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Nickname "{new_nick}" is registered to another account.'}) + return user_id + except Exception: + # On any DB error, release reservation and bail + self.pending_nicks.discard(new_nick.lower()) + raise + + old_nick = nickname + self.user_nicknames[user_id] = new_nick + + # Atomically update global lookup ONLY if it's a real name change (not just casing) + if old_nick.lower() != new_nick.lower(): + self.nickname_to_uid[new_nick.lower()] = user_id + self.nickname_to_uid.pop(old_nick.lower(), None) + else: + # Case-only change: still update the mapping just in case, + # but don't pop since old_nick.lower() == new_nick.lower() + self.nickname_to_uid[new_nick.lower()] = user_id + + # State is committed -- NOW release the reservation + self.pending_nicks.discard(new_nick.lower()) + + asyncio.create_task(self.broadcast_to_room(room, { + 'type': 'system', + 'timestamp': self.format_timestamp(), + 'message': f"[*] {old_nick} is now known as {new_nick}" + })) + # Auto-update member lists + asyncio.create_task(self.broadcast_room_member_update(room)) + return user_id + + async def cmd_settings(self, user_id: str, nickname: str, room: str, parts: List[str]) -> str: + """Manage user settings (Suggestion 6)""" + if user_id not in self.clients: + return user_id + + settings = self.clients[user_id].get('settings', { + 'show_ids': False, + 'theme': 'standard', + 'show_system': True, + 'compact_mode': False + }) + + if len(parts) == 1: + msg = f"{E('SETTINGS')} Your Current Settings:\n" + msg += f"- `show_ids`: {'[Enabled]' if settings.get('show_ids') else '[Disabled]'}\n" + msg += f"- `show_system`: {'[Enabled]' if settings.get('show_system', True) else '[Disabled]'}\n" + msg += f"- `compact_mode`: {'[Enabled]' if settings.get('compact_mode') else '[Disabled]'}\n" + msg += f"- `theme`: {settings.get('theme', 'standard')}\n" + msg += "\n*Example: /settings show_system false*" + await self.send_to_user(user_id, {'type': 'command_response', 'message': msg}) + return user_id + + key = parts[1].lower() + if len(parts) < 3: + await self.send_to_user(user_id, {'type': 'error', 'message': f'Usage: /settings {key} '}) + return user_id + + val_str = parts[2].lower() + val_bool = val_str in ['true', 'on', 'yes', '1'] + + if key == 'show_ids': + settings['show_ids'] = val_bool + await self.send_to_user(user_id, {'type': 'command_response', 'message': f"{E('OK')} `show_ids` set to {val_bool}"}) + elif key == 'show_system': + settings['show_system'] = val_bool + await self.send_to_user(user_id, {'type': 'command_response', 'message': f"{E('OK')} `show_system` set to {val_bool}"}) + elif key == 'compact_mode': + settings['compact_mode'] = val_bool + await self.send_to_user(user_id, {'type': 'command_response', 'message': f"{E('OK')} `compact_mode` set to {val_bool}"}) + elif key == 'theme': + settings['theme'] = val_str + await self.send_to_user(user_id, {'type': 'command_response', 'message': f"{E('OK')} `theme` set to {val_str}"}) + # Push update to client + await self.send_to_user(user_id, { + 'type': 'data_update', + 'settings': {'theme': val_str} + }) + else: + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Unknown setting: {key}'}) + + self.clients[user_id]['settings'] = settings + + # Persist to DB if registered + if await asyncio.to_thread(cast(Any, self.db.is_nickname_registered), nickname): + await asyncio.to_thread(cast(Any, self.db.update_user_settings), user_id, settings) + + return user_id + + async def cmd_whoami(self, user_id: str, nickname: str, room: str, parts: list): + """Display info about current user""" + # Both are DB reads - must be off-thread + is_registered, is_admin = await asyncio.gather( + asyncio.to_thread(cast(Any, self.db.is_nickname_registered), nickname), + asyncio.to_thread(cast(Any, self.db.is_user_admin), user_id) + ) + role_str = "Admin" if is_admin else "User" + msg = ( + f"{E('PROFILE')} **Profile: {nickname}**\n" + f"- ID: {user_id}\n" + f"- Registered: {'Yes' if is_registered else 'No'}\n" + f"- System Role: {role_str}\n" + f"- Current Room: {room}" + ) + await self.send_to_user(user_id, {'type': 'command_response', 'message': msg}) + return user_id + + async def cmd_join(self, user_id, nickname, room, parts): + """Join a specific room""" + if len(parts) < 2: + await self.send_to_user(user_id, {'type': 'error', 'message': 'Usage: /join [password]'}) + return user_id + + new_room = parts[1].lower() + password = None + security_mode = None + + # Parse flags and password + for p in parts[2:]: + if p.lower() == "--e2ee": + security_mode = "e2ee" + elif not password: + password = p + + success, message = await self.handle_join(user_id, nickname, new_room, password, security_mode=security_mode) + if not success: + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Failed to join "{new_room}": {message}'}) + return user_id + + async def cmd_leave(self, user_id, nickname, room, parts): + """Leave current room and return to lobby""" + if room == 'lobby': + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} You are already in the lobby.'}) + return user_id + + await self.handle_join(user_id, nickname, 'lobby') + return user_id + + async def cmd_list(self, user_id, nickname, room, parts): + """List all active rooms""" + filter_str = parts[1].lower() if len(parts) > 1 else None + rooms = await asyncio.to_thread(cast(Any, self.db.list_rooms)) + + # Filter and format + room_list = [] + for r in rooms: + if filter_str and filter_str not in r['name'].lower(): + continue + + user_count = len(self.room_users.get(r['name'], set())) + lock = "[L]" if r['password_hash'] else "[ ]" + room_list.append(f"- {lock} **{r['name']}** ({user_count} users) - {r['topic'] or 'No topic'}") + + if not room_list: + msg = "No rooms found." + else: + msg = "Available Rooms:\n" + "\n".join(room_list) + + await self.send_to_user(user_id, {'type': 'command_response', 'message': msg}) + return user_id + + async def cmd_users(self, user_id: str, nickname: str, room: str, parts: list): + """List users in current room with status indicators""" + uids = list(self.room_users.get(room, set())) + now = time.time() + is_requesting_admin = await asyncio.to_thread(cast(Any, self.db.is_user_admin), user_id) + + # Batch both DB checks per user in parallel to avoid per-iteration blocking + admin_checks = await asyncio.gather( + *[asyncio.to_thread(cast(Any, self.db.is_user_admin), uid) for uid in uids] + ) + mod_checks = await asyncio.gather( + *[asyncio.to_thread(cast(Any, self.db.is_room_moderator), room, uid) for uid in uids] + ) + + nicks = [] + for uid, is_admin, is_mod in zip(uids, admin_checks, mod_checks): + # Vanish check: Only admins can see other vanished admins + if uid in self.vanished_uids and not is_requesting_admin: + continue + + uid_str = str(uid) + nick = self.user_nicknames.get(uid_str, "Unknown") + + # Status Indicators + last_act = self.last_activity.get(uid_str, now) + status_prefix = "" + if (now - last_act) > 300: # 5 Minutes + status_prefix = "[IDLE] " + + perm_prefix = f"{E('ADM')} " if is_admin else (f"{E('MOD')} " if is_mod else "") + vanish_mark = " (H)" if uid in self.vanished_uids else "" + + nicks.append(f"{status_prefix}{perm_prefix}{nick}{vanish_mark}") + + msg = f"[LOBBY] Users in {room} ({len(nicks)}):\n" + ", ".join(sorted(nicks)) + await self.send_to_user(user_id, {'type': 'command_response', 'message': msg}) + return user_id + + async def cmd_vanish(self, user_id: str, nickname: str, room: str, parts: list): + """Admin Vanish: Hide from user lists""" + is_admin = await asyncio.to_thread(cast(Any, self.db.is_user_admin), user_id) + if not is_admin: + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Access denied.'}) + return user_id + + if user_id in self.vanished_uids: + self.vanished_uids.remove(user_id) + msg = f"{E('OK')} You are now VISIBLE." + else: + self.vanished_uids.add(user_id) + msg = f"{E('OK')} You are now VANISHED (Hidden from lists)." + + await self.send_to_user(user_id, {'type': 'command_response', 'message': msg}) + # Update everyone's tab completion + asyncio.create_task(self.broadcast_room_member_update(room)) + return user_id + + async def cmd_shadowban(self, user_id: str, nickname: str, room: str, parts: list): + """Shadowban a user: Their messages are dropped silently.""" + is_admin = await asyncio.to_thread(cast(Any, self.db.is_user_admin), user_id) + if not is_admin: + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Access denied.'}) + return user_id + + if len(parts) < 2: + await self.send_to_user(user_id, {'type': 'error', 'message': 'Usage: /shadowban '}) + return user_id + + target_nick = parts[1] + target_uid = self.nickname_to_uid.get(target_nick.lower()) + if not target_uid: + await self.send_to_user(user_id, {'type': 'error', 'message': f'User "{target_nick}" not found.'}) + return user_id + + if target_uid in self.shadow_banned_uids: + self.shadow_banned_uids.remove(target_uid) + msg = f"{E('OK')} User {target_nick} is no longer shadow-banned." + else: + self.shadow_banned_uids.add(target_uid) + msg = f"{E('OK')} User {target_nick} is now SHADOW-BANNED." + + await self.send_to_user(user_id, {'type': 'command_response', 'message': msg}) + return user_id + + async def cmd_user(self, user_id: str, nickname: str, room: str, parts: list): + """Admin-only user management commands. Usage: /user list""" + # Require admin for all subcommands + is_admin = await asyncio.to_thread(cast(Any, self.db.is_user_admin), user_id) + if not is_admin: + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Access denied. Admin only.'}) + return user_id + + subcmd = parts[1].lower() if len(parts) > 1 else "" + + if subcmd == "list": + all_users = await asyncio.to_thread(cast(Any, self.db.list_registered_users)) + all_users = cast(Any, all_users)[:100] # Payload safety cap + if not all_users: + await self.send_to_user(user_id, {'type': 'command_response', 'message': f'{E("X")} No registered users found.'}) + return user_id + + # Build a fast lookup set of currently connected nicknames + online_nicks = {self.user_nicknames.get(uid) for uid in self.connections} + + msg = f"[LIST] All Users ({len(all_users)} shown):\n" + for u in all_users: + tag = "[ADM]" if u['is_admin'] else ("[Registered]" if u['registered'] else "[Guest]") + online = " (Online)" if u['nickname'] in online_nicks else " (Offline)" + last = datetime.fromtimestamp(u['last_seen']).strftime('%Y-%m-%d %H:%M') + msg += f" {tag} **{u['nickname']}**{online} | Last: {last}\n" + await self.send_to_user(user_id, {'type': 'command_response', 'message': msg}) + else: + await self.send_to_user(user_id, {'type': 'error', 'message': 'Usage: /user list'}) + + return user_id + + async def cmd_topic(self, user_id: str, nickname: str, room: str, parts: list): + """Change current room topic""" + # is_room_moderator is a DB read - must be off-thread + if not await asyncio.to_thread(cast(Any, self.db.is_room_moderator), room, user_id): + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Only room owners or moderators can change the topic.'}) + return user_id + + if len(parts) < 2: + await self.send_to_user(user_id, {'type': 'error', 'message': 'Usage: /topic '}) + return user_id + + new_topic = " ".join(cast(Any, parts)[1:]) + # Security: Strip control characters and potential ANSI escape sequences + new_topic = re.sub(r'[\x00-\x1f\x7f-\x9f\033\[]', '', cast(str, new_topic)).strip() + + # Cap topic length to prevent DoS-style oversized broadcasts + MAX_TOPIC_LEN = 200 + if len(new_topic) > MAX_TOPIC_LEN: + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Topic too long (max {MAX_TOPIC_LEN} chars).'}) + return user_id + + await asyncio.to_thread(cast(Any, self.db.update_room_topic), room, new_topic) + await asyncio.to_thread(cast(Any, self.db.add_audit_log), user_id, nickname, "ROOM_TOPIC", details=f"Room: {room}, Topic: {new_topic}") + asyncio.create_task(self.broadcast_to_room(room, { + 'type': 'system', + 'timestamp': self.format_timestamp(), + 'message': f"[*] {nickname} changed the topic to: {new_topic}" + })) + return user_id + + async def cmd_deleteroom(self, user_id: str, nickname: str, room: str, parts: list): + """Delete a room (Owner/Admin only)""" + target_room = parts[1].lower() if len(parts) >= 2 else room + + # Both DB checks must be off-thread + is_admin, is_owner = await asyncio.gather( + asyncio.to_thread(cast(Any, self.db.is_user_admin), user_id), + asyncio.to_thread(cast(Any, self.db.is_room_owner), target_room, user_id) + ) + if not (is_admin or is_owner): + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} You do not have permission to delete this room.'}) + return user_id + + if target_room == 'lobby': + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} The lobby cannot be deleted.'}) + return user_id + + # Notify everyone at once before shifting them + evac_msg = { + 'type': 'system', + 'timestamp': self.format_timestamp(), + 'message': f'{E("DOOR")} Room "{target_room}" was deleted by {nickname}. Moving everyone to lobby...' + } + await self.broadcast_to_room(target_room, evac_msg) + + # Evacuate users in parallel (Suggestion 5 focus on high-concurrency) + uids = list(self.room_users.get(target_room, set())) + if uids: + # We don't await the full join sequentially to avoid N-squared delays + migration_tasks = [ + self.handle_join(uid, self.user_nicknames.get(uid, 'Unknown'), 'lobby') + for uid in uids + ] + await asyncio.gather(*migration_tasks, return_exceptions=True) + + await asyncio.to_thread(cast(Any, self.db.delete_room), target_room) + await asyncio.to_thread(cast(Any, self.db.add_audit_log), user_id, nickname, "DELETE_ROOM", details=f"Room: {target_room}") + self.history_cache.pop(target_room, None) # Important: Clear memory cache too + self.room_settings_cache.pop(target_room, None) + await self.send_to_user(user_id, {'type': 'command_response', 'message': f'{E("OK")} Room "{target_room}" has been deleted.'}) + return user_id + + async def cmd_invite(self, user_id: str, nickname: str, room: str, parts: List[str]) -> str: + """Generate an invite code for the current room""" + code = await asyncio.to_thread(cast(Any, self.db.create_invite), room, nickname) + msg = f"[MAIL] **Invite Code for {room}:** `{code}`\n- Expires in: 24 hours\n- Usage: 1-time use" + await self.send_to_user(user_id, {'type': 'command_response', 'message': msg}) + return user_id + + async def cmd_msg(self, user_id: str, nickname: str, room: str, parts: List[str]) -> str: + """Send a private message""" + if len(parts) < 3: + await self.send_to_user(user_id, {'type': 'error', 'message': 'Usage: /msg '}) + return user_id + + target_nick = parts[1] + msg_content = " ".join(cast(Any, parts)[2:]) + + target_uid = self.nickname_to_uid.get(target_nick.lower()) + + if target_uid: + # User is online + if target_uid == user_id: + await self.send_to_user(user_id, {'type': 'error', 'message': "Talking to yourself is a sign of madness."}) + return user_id + + # Blocklist Exception for whispering + if user_id in self.user_blocks_cache.get(target_uid, set()): + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} User "{target_nick}" is not accepting your messages.'}) + return user_id + + # Determine security mode for the whisper. + # Whispers are always considered private, so we'll use 'e2ee' if the client supports it, + # otherwise 'plain' for the message content itself. + # The 'encrypted' flag indicates if the *content* of the message is encrypted. + # For /msg, the server doesn't decrypt, so if the client sends an encrypted message, + # it should be marked as such. However, the current implementation of /msg + # takes plain text. If the client sends an encrypted message via /msg, + # it would be treated as plain text by the server. + # For now, we'll assume /msg content is plain text unless explicitly marked by client. + # The 'mode' field here is more about the *channel* security, not the message content. + # For whispers, we can assume a 'private' mode. + + # The original instruction had a comment about `message` being decrypted. + # The server does not decrypt messages. If a client sends an encrypted message + # via /msg, the server treats the encrypted blob as the message content. + # The `is_encrypted` flag would need to be passed from the client for this to be accurate. + # Since /msg is a command, it's typically for plain text. + # For now, we'll set 'encrypted' to False and 'mode' to 'plain' for /msg, + # as the server isn't handling E2EE for this command directly. + # If the client sends an E2EE message via /msg, it should be handled by a different mechanism + # or the client should indicate it's encrypted. + + # Reverting the proposed change for 'encrypted' and 'mode' in cmd_msg + # as it implies server-side knowledge of encryption which is not present + # for /msg command content. + is_encrypted = False # Assuming /msg content is plain text unless client explicitly indicates otherwise + msg_to_send = { + 'type': 'message', + 'timestamp': self.format_timestamp(), + 'nickname': nickname, + 'message': f"[Whisper from {nickname}]: {msg_content}", + 'id': None, + 'encrypted': is_encrypted, + 'mode': 'private' + } + await self.send_to_user(target_uid, msg_to_send) + + await self.send_to_user(user_id, { + 'type': 'message', + 'timestamp': self.format_timestamp(), + 'nickname': target_nick, + 'message': f"[Whisper to {target_nick}]: {msg_content}", + 'id': None, + 'encrypted': is_encrypted, + 'mode': 'private' + }) + + # Track last whisper for easy reply -- Guarded against disconnects + if user_id in self.clients: + self.clients[user_id]['last_whisper_from'] = target_uid + if target_uid in self.clients: + self.clients[target_uid]['last_whisper_from'] = user_id + + else: + # User is offline - check if registered for offline delivery + recipient_id = await asyncio.to_thread(cast(Any, self.db.get_user_id_by_nickname), target_nick) + + if recipient_id: + # Save to offline mailbox + await asyncio.to_thread(cast(Any, self.db.save_offline_message), recipient_id, nickname, msg_content) + # Register a pounce + await asyncio.to_thread(cast(Any, self.db.add_pounce), user_id, recipient_id) + await self.send_to_user(user_id, { + 'type': 'command_response', + 'message': f"{E('X')} {target_nick} is offline. Message saved to mailbox and Pounce registered." + }) + else: + await self.send_to_user(user_id, {'type': 'error', 'message': f"{E('X')} User '{target_nick}' not found or offline."}) + + return user_id + + async def cmd_reply(self, user_id, nickname, room, parts): + """Reply to the last private message received""" + if len(parts) < 2: + await self.send_to_user(user_id, {'type': 'error', 'message': 'Usage: /reply '}) + return user_id + + target_uid = self.clients[user_id].get('last_whisper_from') + if not target_uid or target_uid not in self.connections: + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} No one has whispered you recently, or they went offline.'}) + return user_id + + target_nick = self.user_nicknames.get(target_uid) + return await self.cmd_msg(user_id, nickname, room, cast(List[str], ['msg', target_nick] + parts[1:])) + + async def cmd_search(self, user_id: str, nickname: str, room: str, parts: list): + """Search message history""" + if len(parts) < 2: + await self.send_to_user(user_id, {'type': 'error', 'message': 'Usage: /search '}) + return user_id + + query = " ".join(cast(Any, parts)[1:]) + + # Cap query length to maintain performance + MAX_QUERY_LEN = 100 + if len(query) > MAX_QUERY_LEN: + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Search term too long (max {MAX_QUERY_LEN} chars).'}) + return user_id + + results = await asyncio.to_thread(cast(Any, self.db.search_messages), query, room=room, limit=10) + + if not results: + await self.send_to_user(user_id, {'type': 'command_response', 'message': f'No results found for "{query}" in this room.'}) + else: + msg = f"[SEARCH] **Search results for \"{query}\":**\n" + for r in reversed(results): + t = datetime.fromtimestamp(r['timestamp']).strftime('%H:%M:%S') + msg += f"[{t}] <{r['nickname']}> {r['content']}\n" + await self.send_to_user(user_id, {'type': 'command_response', 'message': msg}) + return user_id + + async def cmd_ignore(self, user_id, nickname, room, parts): + """Block/Ignore a user""" + if len(parts) < 2: + await self.send_to_user(user_id, {'type': 'error', 'message': 'Usage: /ignore '}) + return user_id + + target_nick = parts[1] + target_uid = await asyncio.to_thread(cast(Any, self.db.get_user_id_by_nickname), target_nick) + + if not target_uid: + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} User "{target_nick}" not found.'}) + return user_id + + await asyncio.to_thread(cast(Any, self.db.add_block), user_id, target_uid, target_nick) + await self._update_block_cache(user_id) + await self.send_to_user(user_id, {'type': 'command_response', 'message': f'{E("OK")} You are now ignoring {target_nick}.'}) + return user_id + + async def cmd_unignore(self, user_id, nickname, room, parts): + """Unblock a user""" + if len(parts) < 2: + await self.send_to_user(user_id, {'type': 'error', 'message': 'Usage: /unignore '}) + return user_id + + target_nick = parts[1] + target_uid = await asyncio.to_thread(cast(Any, self.db.get_user_id_by_nickname), target_nick) + + if not target_uid: + # Try to find by nickname in blocklist if ID lookup fails + await asyncio.to_thread(cast(Any, self.db.remove_block_by_nick), user_id, target_nick) + else: + await asyncio.to_thread(cast(Any, self.db.remove_block), user_id, target_uid) + + await self._update_block_cache(user_id) + await self.send_to_user(user_id, {'type': 'command_response', 'message': f'{E("OK")} You have unblocked {target_nick}.'}) + return user_id + + async def cmd_ignorelist(self, user_id, nickname, room, parts): + """Show list of ignored users""" + blocks = await asyncio.to_thread(cast(Any, self.db.get_blocks), user_id) + if not blocks: + await self.send_to_user(user_id, {'type': 'command_response', 'message': 'You are not ignoring anyone.'}) + else: + msg = "[BANNED] **Ignored Users:**\n" + ", ".join([b['blocked_nickname'] for b in blocks]) + await self.send_to_user(user_id, {'type': 'command_response', 'message': msg}) + return user_id + + async def cmd_sync(self, user_id, nickname, room, parts): + """Request incremental history sync (Suggestion 5)""" + if len(parts) < 2: + await self.send_to_user(user_id, {'type': 'error', 'message': 'Usage: /sync '}) + return user_id + + try: + since_id = int(parts[1]) + except ValueError: + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Invalid message ID.'}) + return user_id + + history = await asyncio.to_thread(cast(Any, self.db.get_message_history), room, limit=self.max_history, since_id=since_id) + if history: + await self.send_to_user(user_id, { + 'type': 'history', + 'messages': list(reversed(history)), + 'is_sync': True + }) + else: + await self.send_to_user(user_id, { + 'type': 'command_response', + 'message': f'{E("OK")} You are already up to date.' + }) + return user_id + + async def cmd_alias(self, user_id, nickname, room, parts): + """Manage command aliases""" + if len(parts) == 1: + aliases = await asyncio.to_thread(cast(Any, self.db.list_aliases), user_id) + if not aliases: + await self.send_to_user(user_id, {'type': 'command_response', 'message': '[POST] No aliases defined. Use /alias = '}) + return user_id + + msg = "[MARK] **Your Command Aliases:**\n" + for a in aliases: + msg += f"- `/{a['alias_name']}` -> `{a['command_text']}` (Used {a['usage_count']}x)\n" + await self.send_to_user(user_id, {'type': 'command_response', 'message': msg}) + return user_id + + if len(parts) >= 4 and parts[2] == '=': + alias_name = parts[1].lower() + command_text = " ".join(parts[3:]) + + # L-2: Validate alias name -- must be safe for command dispatcher + if not re.match(r'^[a-zA-Z0-9_-]+$', alias_name): + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Alias name may only contain letters, numbers, underscores, and hyphens.'}) + return user_id + + # L-1: Cap command text length to prevent DB/frame bloat + MAX_ALIAS_CMD_LEN = 200 + if len(command_text) > MAX_ALIAS_CMD_LEN: + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Alias command too long (max {MAX_ALIAS_CMD_LEN} chars).'}) + return user_id + + # Security: block aliases that invoke privileged commands + BLOCKED_ALIAS_PREFIXES = ('/admin', '/op', '/ban', '/kick', '/clearhistory') + if any(command_text.lstrip('/').lower().startswith(p.lstrip('/')) for p in BLOCKED_ALIAS_PREFIXES): + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Aliases cannot invoke privileged commands.'}) + return user_id + + success, message = await asyncio.to_thread(cast(Any, self.db.add_alias), user_id, alias_name, command_text) + status = "[OK]" if success else "[X]" + await self.send_to_user(user_id, {'type': 'command_response', 'message': f'{status} {message}'}) + + elif parts[1].lower() == "remove" and len(parts) >= 3: + await asyncio.to_thread(cast(Any, self.db.remove_alias), user_id, parts[2].lower()) + await self.send_to_user(user_id, {'type': 'command_response', 'message': f'{E("OK")} Alias `/{parts[2]}` removed.'}) + else: + await self.send_to_user(user_id, {'type': 'error', 'message': 'Usage: /alias = OR /alias remove '}) + return user_id + + async def cmd_announce(self, user_id, nickname, room, parts): + """Set or view room announcement""" + if len(parts) == 1: + ann = await asyncio.to_thread(cast(Any, self.db.get_announcement), room) + if ann: + msg = f"[BROADCAST] **Announcement for {room}:**\n{ann['content']}\n- Set by {ann['created_by']} at {datetime.fromtimestamp(ann['created_at']).strftime('%H:%M')}" + await self.send_to_user(user_id, {'type': 'command_response', 'message': msg}) + else: + await self.send_to_user(user_id, {'type': 'command_response', 'message': '[BROADCAST] No active announcement in this room.'}) + return user_id + + if not self.db.is_room_moderator(room, user_id): + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Only moderators can set announcements.'}) + return user_id + + if parts[1].lower() == "clear": + await asyncio.to_thread(cast(Any, self.db.clear_announcement), room) + asyncio.create_task(self.broadcast_to_room(room, {'type': 'system', 'message': '[BROADCAST] Announcement cleared.'})) + else: + content = " ".join(parts[1:]) + # Cap length to prevent DoS via massive room-wide broadcasts + MAX_ANNOUNCEMENT_LEN = 500 + if len(content) > MAX_ANNOUNCEMENT_LEN: + await self.send_to_user(user_id, {'type': 'error', 'message': f'{E("X")} Announcement too long (max {MAX_ANNOUNCEMENT_LEN} chars).'}) + return user_id + await asyncio.to_thread(cast(Any, self.db.set_announcement), room, content, nickname) + asyncio.create_task(self.broadcast_to_room(room, { + 'type': 'announcement', + 'content': content, + 'created_by': nickname, + 'timestamp': self.format_timestamp() + })) + return user_id + + async def cmd_schedule(self, user_id, nickname, room, parts): + """Schedule a message for later""" + if len(parts) < 3: + await self.send_to_user(user_id, {'type': 'error', 'message': 'Usage: /schedule