Files
Web-Line-Radio/client.py
2026-03-02 09:48:34 -05:00

1579 lines
70 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Ultimate Chat Client - Merged & Enhanced
- Secure Encryption (Fernet: AES-128-CBC + HMAC-SHA256; keys from a shared passphrase via PBKDF2 or from an X25519 handshake)
- Chunked File Transfer via WebSocket
- Local History Persistence (SQLite)
- Server-synced Themes (matrix, cyberpunk, noir, standard)
- Offline Mailbox display (📬)
- ANSI colors on Windows
- Random nickname on empty input
- ASCII art from images (Pillow)
- Full server command support (/help, /join, /msg, /settings, etc.)
- Auto-reconnect with nickname conflict handling
- Full auth handshake for registered users
"""
import asyncio
import json
import time
import sys
import os
import random
import string
import zlib
import base64
import secrets
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional, Union, cast
# ===== Enable ANSI colors on Windows =====
# Best-effort: put the console behind stdout into a mode that interprets
# ANSI escape sequences. Every failure is ignored on purpose, so a missing
# console never prevents the client from starting.
if os.name == 'nt':
    try:
        import ctypes
        kernel32 = cast(Any, ctypes).windll.kernel32
        # -11 is STD_OUTPUT_HANDLE; mode 7 includes the 0x0004 bit
        # (ENABLE_VIRTUAL_TERMINAL_PROCESSING) on top of bits 0x0001 and 0x0002.
        _stdout_handle = kernel32.GetStdHandle(-11)
        kernel32.SetConsoleMode(_stdout_handle, 7)
    except Exception:
        pass
import websockets # type: ignore
from websockets.exceptions import ConnectionClosed, WebSocketException # type: ignore
# Optional imports - features are gracefully disabled if a package is not installed
try:
from cryptography.fernet import Fernet, InvalidToken # type: ignore
from cryptography.hazmat.primitives import hashes, serialization # type: ignore
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC # type: ignore
from cryptography.hazmat.primitives.asymmetric import x25519 # type: ignore
from cryptography.hazmat.primitives.kdf.hkdf import HKDF # type: ignore
from cryptography.hazmat.backends import default_backend # type: ignore
CRYPTOGRAPHY_AVAILABLE = True
except ImportError:
CRYPTOGRAPHY_AVAILABLE = False
try:
from PIL import Image # type: ignore
PILLOW_AVAILABLE = True
except ImportError:
PILLOW_AVAILABLE = False
try:
from prompt_toolkit import PromptSession, print_formatted_text, ANSI # type: ignore
from prompt_toolkit.patch_stdout import patch_stdout # type: ignore
from prompt_toolkit.completion import WordCompleter # type: ignore
from prompt_toolkit.key_binding import KeyBindings # type: ignore
PROMPT_TOOLKIT_AVAILABLE = True
except ImportError:
PROMPT_TOOLKIT_AVAILABLE = False
try:
from plyer import notification # type: ignore
PLYER_AVAILABLE = True
except ImportError:
PLYER_AVAILABLE = False
# ============================================================================
# ANSI color codes (static fallback)
# ============================================================================
class Colors:
    """Fixed ANSI escape sequences, independent of the selected theme."""

    # Reset and text attributes
    RESET = '\x1b[0m'
    BOLD = '\x1b[1m'
    UNDERLINE = '\x1b[4m'

    # Bright foreground colours
    RED = '\x1b[91m'
    GREEN = '\x1b[92m'
    YELLOW = '\x1b[93m'
    BLUE = '\x1b[94m'
    MAGENTA = '\x1b[95m'
    CYAN = '\x1b[96m'

    # Headers use the same escape sequence as MAGENTA.
    HEADER = '\x1b[95m'
# ============================================================================
# Global State
# ============================================================================
RECONNECT_DELAY = 3
DEFAULT_SERVER = "wss://server.wholeworldcoding.com/radio"

# Connection identity. These are rebound at runtime by setup_wizard() and by
# display_message() when the server reports a nickname or room change.
nickname = ""
room = "lobby"
server_url = DEFAULT_SERVER

# Note: Security state is now managed by the 'security' object.
pending_file_transfers: Dict[str, Any] = {}  # transfer_id -> offer dict
file_chunks_buffer: Dict[str, Any] = {}  # transfer_id -> {chunks, total, filename, received_count}
outgoing_queue: Optional[asyncio.Queue] = None  # Initialized in chat_client()
_LAST_PASS_USED: str = ""  # Global for auth tracker

# In-memory settings. load_settings() overlays values persisted in SQLite on
# top of these defaults.
CLIENT_SETTINGS: Dict[str, Any] = {
    'interactive_prompts': True,
    'current_theme': 'standard',
    'db_path': 'chat_history.db',
    'show_ids': False,
    'last_server': DEFAULT_SERVER,
    'last_room': 'lobby',
    'last_nick': '',
    'last_pass': '',
    'save_profile': False,
}

recent_rooms: List[str] = []  # Track last 9 rooms for Alt+1-9 logic
# The second `_LAST_PASS_USED = ""` that used to follow here was a verbatim
# duplicate of the definition above and has been dropped.
_SESSION_INSTANCE = None  # Active prompt_toolkit session, set by input_handler()
def load_settings():
    """Overlay settings persisted in the local SQLite database onto CLIENT_SETTINGS.

    Every row of the ``settings`` table is copied into ``CLIENT_SETTINGS``.
    The strings ``'true'`` / ``'false'`` (any case) are converted back to
    booleans; every other value is kept as the stored string. Rows whose
    value is not a string (for example NULL) are skipped so that one bad row
    cannot stop the remaining settings from loading.

    This is best-effort: a missing database or table leaves CLIENT_SETTINGS
    untouched and no error is reported.
    """
    try:
        conn = sqlite3.connect(cast(str, CLIENT_SETTINGS['db_path']))
    except (sqlite3.Error, TypeError):
        return
    try:
        conn.row_factory = sqlite3.Row
        for row in conn.execute("SELECT key, value FROM settings"):
            key, value = row['key'], row['value']
            if not isinstance(value, str):
                continue
            lowered = value.lower()
            if lowered == 'true':
                value = True
            elif lowered == 'false':
                value = False
            CLIENT_SETTINGS[key] = value
    except sqlite3.Error:
        # Typically "no such table" on first run; defaults stay in effect.
        pass
    finally:
        # Close even when the query fails so the handle is never leaked.
        conn.close()
def save_setting(key, value):
    """Persist a single setting to the local SQLite database.

    The value is stored as ``str(value)``; an existing row with the same key
    is replaced. Only the database is written - the in-memory
    ``CLIENT_SETTINGS`` dict is not modified here.

    This is best-effort: database errors are swallowed so that a failure to
    persist a preference never interrupts the client.
    """
    try:
        conn = sqlite3.connect(cast(str, CLIENT_SETTINGS['db_path']))
    except (sqlite3.Error, TypeError):
        return
    try:
        # `with conn` commits on success and rolls back on error.
        with conn:
            conn.execute(
                "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
                (key, str(value)),
            )
    except sqlite3.Error:
        pass
    finally:
        # The connection context manager does not close the handle itself.
        conn.close()
# ============================================================================
# Theme Management
# ============================================================================
# Utilities
# ============================================================================
# NOTE: `_SESSION_INSTANCE` is already initialised to None in the global-state
# section above; the duplicate assignment that used to sit here was removed.
def pt_prompt(prompt_str, is_password=False, default=""):
    """Ask the user for one line of input.

    Uses the active prompt_toolkit session when one exists, otherwise falls
    back to ``getpass.getpass`` (for passwords) or ``input``.

    Args:
        prompt_str: Text shown before the cursor.
        is_password: Hide the typed characters.
        default: Value pre-filled by prompt_toolkit. In the fallback path it
            is returned when the user submits an empty line, so both paths
            yield the default on a bare Enter.

    Returns:
        The entered text, or None when the user aborts with Ctrl-C / Ctrl-D.
    """
    if PROMPT_TOOLKIT_AVAILABLE and _SESSION_INSTANCE:
        try:
            return cast(Any, _SESSION_INSTANCE).prompt(
                prompt_str, is_password=is_password, default=default
            )
        except (KeyboardInterrupt, EOFError):
            return None

    try:
        if is_password:
            import getpass
            answer = getpass.getpass(prompt_str)
        else:
            answer = input(prompt_str)
    except (KeyboardInterrupt, EOFError):
        # Mirror the prompt_toolkit branch instead of letting the abort escape.
        return None
    return answer if answer else default
def enable_windows_vt():
    """Turn on Virtual Terminal Processing for the Windows console.

    Returns:
        True when the console mode was updated; False on any other platform
        or when one of the console API calls reports failure.
    """
    if os.name != 'nt':
        return False

    import ctypes
    from ctypes import wintypes

    kernel32 = getattr(ctypes, 'windll').kernel32
    stdout_handle = kernel32.GetStdHandle(-11)  # STD_OUTPUT_HANDLE
    if stdout_handle == -1:
        return False

    console_mode = wintypes.DWORD()
    mode_read = kernel32.GetConsoleMode(stdout_handle, ctypes.byref(console_mode))
    if not mode_read:
        return False

    # ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
    mode_written = kernel32.SetConsoleMode(stdout_handle, console_mode.value | 0x0004)
    return bool(mode_written)
# Colour palettes keyed by theme name. Every palette defines the same nine
# roles so callers can index any of them without checking for a missing key.
THEMES = {
    'standard': {
        'HEADER': '\x1b[95m',
        'BLUE': '\x1b[94m',
        'CYAN': '\x1b[96m',
        'GREEN': '\x1b[92m',
        'YELLOW': '\x1b[93m',
        'RED': '\x1b[91m',
        'MAGENTA': '\x1b[95m',
        'RESET': '\x1b[0m',
        'BOLD': '\x1b[1m',
    },
    # Greens throughout, with a dark red for errors.
    'matrix': {
        'HEADER': '\x1b[92m',
        'BLUE': '\x1b[32m',
        'CYAN': '\x1b[92m',
        'GREEN': '\x1b[92m',
        'YELLOW': '\x1b[92m',
        'RED': '\x1b[31m',
        'MAGENTA': '\x1b[32m',
        'RESET': '\x1b[0m',
        'BOLD': '\x1b[1m',
    },
    # Same as 'standard' except for the header colour.
    'cyberpunk': {
        'HEADER': '\x1b[96m',
        'BLUE': '\x1b[94m',
        'CYAN': '\x1b[96m',
        'GREEN': '\x1b[92m',
        'YELLOW': '\x1b[93m',
        'RED': '\x1b[91m',
        'MAGENTA': '\x1b[95m',
        'RESET': '\x1b[0m',
        'BOLD': '\x1b[1m',
    },
    # Greys and whites only.
    'noir': {
        'HEADER': '\x1b[37m',
        'BLUE': '\x1b[90m',
        'CYAN': '\x1b[37m',
        'GREEN': '\x1b[90m',
        'YELLOW': '\x1b[37m',
        'RED': '\x1b[97m',
        'MAGENTA': '\x1b[90m',
        'RESET': '\x1b[0m',
        'BOLD': '\x1b[1m',
    },
}
def T() -> dict:
    """Look up the palette of the theme named in CLIENT_SETTINGS.

    An unset or unknown theme name yields the 'standard' palette.
    """
    fallback = THEMES['standard']
    theme_name = cast(str, CLIENT_SETTINGS.get('current_theme', 'standard'))
    palette = THEMES.get(theme_name, fallback)
    return cast(dict, palette)
def apply_theme(name: str):
    """Activate the theme called *name* (case-insensitive) and report the result.

    An unknown name leaves the current theme in place and prints the list of
    available themes instead.
    """
    name = name.lower()

    if name not in THEMES:
        tc = T()
        available = ', '.join(THEMES.keys())
        cprint(f"{tc['RED']}Theme '{name}' not found. Available: {available}{tc['RESET']}")
        return

    CLIENT_SETTINGS['current_theme'] = name
    # Fetch the palette after switching so the confirmation uses the new colours.
    tc = T()
    cprint(f"{tc['GREEN']}Theme switched to: {name.upper()}{tc['RESET']}")
# ----------------------------------------------------------------------------
# Security Infrastructure
# ----------------------------------------------------------------------------
# Security modes accepted by SecurityManager.set_mode().
# "plain": SecurityManager.encrypt() returns the text unchanged.
MODE_PLAIN = "plain"
# "shared": keys come from the shared passphrase (SecurityManager.derive_shared).
MODE_SHARED = "shared"
# "e2ee": per-peer keys come from an X25519 exchange (SecurityManager.derive_e2ee).
MODE_E2EE = "e2ee"
class SecurityManager:
    """Manages multi-modal encryption and per-peer session keys.

    Keys are ``Fernet`` objects stored in ``active_keys`` under either a peer
    nickname (private messages) or a room name (room-wide messages). They are
    produced by ``derive_shared`` (passphrase + salt) or ``derive_e2ee``
    (X25519 key agreement). The current room is read from the module-level
    ``room`` variable.
    """

    def __init__(self):
        """Start in PLAIN mode; create an X25519 key pair when cryptography is installed."""
        self.mode = MODE_PLAIN
        self.shared_passphrase = None
        self.active_keys = {}  # nickname or room name -> Fernet object
        self.outgoing_queue = None  # Set later
        if CRYPTOGRAPHY_AVAILABLE:
            self.priv_key = x25519.X25519PrivateKey.generate()
            self.pub_bytes = self.priv_key.public_key().public_bytes_raw()
        else:
            self.priv_key = None
            self.pub_bytes = None

    def set_mode(self, new_mode):
        """Switch the security mode.

        Returns:
            ``(ok, message)``. ``ok`` is False for an unknown mode, or for an
            encrypted mode when the cryptography package is missing.
        """
        if new_mode not in (MODE_PLAIN, MODE_SHARED, MODE_E2EE):
            return False, "Invalid mode"
        if new_mode != MODE_PLAIN and not CRYPTOGRAPHY_AVAILABLE:
            return False, "Cryptography library not installed"
        # Only the mode flag changes: every key already in `active_keys`
        # (peer and room alike) is kept, so switching modes never forces a
        # new handshake.
        self.mode = new_mode
        return True, f"Security mode set to {new_mode.upper()}"

    def derive_shared(self, salt, name):
        """Derive a passphrase-based key and store it under *name*.

        Uses PBKDF2-HMAC-SHA256 with 200000 iterations over the shared
        passphrase and *salt*. Does nothing when no passphrase is set.
        """
        if not self.shared_passphrase:
            return
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(), length=32, salt=salt, iterations=200000
        )
        key = base64.urlsafe_b64encode(kdf.derive(cast(str, self.shared_passphrase).encode()))
        self.active_keys[name] = Fernet(key)

    def derive_e2ee(self, peer_pub_bytes, name):
        """Derive a per-peer key from an X25519 exchange and store it under *name*.

        Raises:
            SecurityError: when no local private key exists (cryptography
                package not installed).
        """
        if self.priv_key is None:
            raise SecurityError("E2EE is unavailable: cryptography library not installed.")
        peer_pub = x25519.X25519PublicKey.from_public_bytes(peer_pub_bytes)
        shared_secret = self.priv_key.exchange(peer_pub)
        hkdf = HKDF(algorithm=hashes.SHA256(), length=32, salt=None, info=b"clradio-e2ee")
        key = base64.urlsafe_b64encode(hkdf.derive(shared_secret))
        self.active_keys[name] = Fernet(key)

    def encrypt(self, text, target):
        """Encrypt for room (SHARED) or peer (E2EE/SHARED_PRIVATE).

        Args:
            text: Plain text to send.
            target: Peer nickname for a private message, falsy for a room message.

        Returns:
            ``(payload, encrypted)``: the payload to transmit and whether it
            was actually encrypted.

        Raises:
            SecurityError: when an encrypted mode is active and no usable key
                exists, except for room messages in E2EE mode, which fall
                back to plain text.
        """
        if self.mode == MODE_PLAIN:
            return text, False

        # Use room key for broadcast, target key for private
        key_name = target if target else room
        key = self.active_keys.get(key_name)

        # In SHARED mode a private message without a peer key is sent under
        # the room key instead.
        if not key and self.mode == MODE_SHARED:
            key = self.active_keys.get(room)

        if not key:
            # E2EE mode without a room key: room traffic is allowed in plain.
            if self.mode == MODE_E2EE and not target:
                return text, False
            raise SecurityError(f"No active session key for {key_name}. Handshake required for private messages.")
        return key.encrypt(text.encode()).decode(), True

    def decrypt(self, token, sender):
        """Decrypt incoming payload.

        Tries the sender's key first, then the current room key. When
        decryption is impossible the token is returned unchanged, unless it
        looks like a Fernet token (``gAAAAA`` prefix), in which case a
        placeholder string is returned instead of the ciphertext.
        """
        if not CRYPTOGRAPHY_AVAILABLE:
            return token
        looks_like_fernet = isinstance(token, str) and token.startswith('gAAAAA')

        # Try per-sender key, then room key
        key = self.active_keys.get(sender) or self.active_keys.get(room)
        if not key:
            return "[🔒 Encrypted (SHARED)]" if looks_like_fernet else token

        try:
            # Handle potential string/bytes mismatch
            token_bytes = cast(str, token).encode() if isinstance(token, str) else token
            return key.decrypt(token_bytes).decode()
        except (InvalidToken, TypeError, ValueError):
            # Wrong key, malformed token, or plaintext that is not valid UTF-8.
            if looks_like_fernet:
                return "[🔒 Encrypted (SHARED) - Wrong Passphrase?]"
            return token
# Raised by SecurityManager.encrypt() when no session key is available.
class SecurityError(Exception): pass
# Module-wide SecurityManager instance shared by the rest of the client.
security = SecurityManager()
room_nicks = set() # For TAB completion
# ============================================================================
# Local SQLite History
# ============================================================================
def init_db():
    """Initialize local SQLite database for message history.

    Creates the ``messages``, ``settings`` and ``drafts`` tables when they do
    not exist yet, then calls ``load_settings()``. Any failure is reported
    through ``cprint`` rather than raised.
    """
    try:
        conn = sqlite3.connect(cast(str, CLIENT_SETTINGS['db_path']))
        try:
            # `with conn` commits the DDL on success and rolls back on error.
            with conn:
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS messages (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        timestamp REAL,
                        sender TEXT,
                        room TEXT,
                        content TEXT,
                        type TEXT DEFAULT 'message'
                    )
                ''')
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS settings (
                        key TEXT PRIMARY KEY,
                        value TEXT
                    )
                ''')
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS drafts (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        timestamp REAL,
                        room TEXT,
                        payload TEXT
                    )
                ''')
        finally:
            # Release the handle even when table creation fails.
            conn.close()
        load_settings()
    except Exception as e:
        cprint(f"{Colors.RED}[DB Error] Init failed: {e}{Colors.RESET}")
def save_message_local(sender: str, content: str, room_name: str, msg_type: str = 'message'):
    """Append one message to the local history database.

    This call blocks on SQLite I/O; run it in an executor or via
    ``asyncio.to_thread`` when calling from the event loop.

    Args:
        sender: Nickname of the author.
        content: Message text as displayed.
        room_name: Room the message belongs to (stored verbatim).
        msg_type: Value for the ``type`` column.

    Database errors are swallowed: chat must not break because history
    could not be written.
    """
    try:
        conn = sqlite3.connect(cast(str, CLIENT_SETTINGS['db_path']))
    except (sqlite3.Error, TypeError):
        return
    try:
        with conn:
            conn.execute(
                "INSERT INTO messages (timestamp, sender, room, content, type) VALUES (?, ?, ?, ?, ?)",
                (time.time(), sender, room_name, content, msg_type),
            )
    except sqlite3.Error:
        pass  # fail silently - chat must not break on DB error
    finally:
        # Close on every path so a failed insert does not leak the handle.
        conn.close()
def get_local_history(limit: int = 50, filter_room: Optional[str] = None) -> list:
    """Return the last *limit* messages from the local DB in chronological order.

    Args:
        limit: Maximum number of rows to return.
        filter_room: When given, only messages of that room are returned.
            Matching ignores ASCII case: rows are stored with the room name
            exactly as it was at save time, so an exact comparison against a
            lowercased name would miss rooms saved with capitals.

    Returns:
        A list of ``sqlite3.Row`` objects, oldest first; an empty list on any
        database error.
    """
    try:
        conn = sqlite3.connect(cast(str, CLIENT_SETTINGS['db_path']))
    except (sqlite3.Error, TypeError):
        return []
    try:
        conn.row_factory = sqlite3.Row
        if filter_room:
            wanted = str(filter_room).lower()
            rows = conn.execute(
                "SELECT * FROM messages WHERE room = ? OR lower(room) = ? "
                "ORDER BY timestamp DESC LIMIT ?",
                (wanted, wanted, limit),
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT * FROM messages ORDER BY timestamp DESC LIMIT ?", (limit,)
            ).fetchall()
    except sqlite3.Error:
        return []
    finally:
        conn.close()
    # The query returns newest first; callers expect oldest first.
    rows.reverse()
    return rows
def queue_draft_message(room_name: str, payload_dict: dict):
    """Save a fully formed JSON payload to local drafts while offline.

    Args:
        room_name: Room the message was written for.
        payload_dict: Message payload; stored as its JSON serialisation.

    The outcome (saved or failed) is reported to the user via ``cprint``;
    nothing is raised.
    """
    try:
        serialized = json.dumps(payload_dict)
        conn = sqlite3.connect(cast(str, CLIENT_SETTINGS['db_path']))
        try:
            with conn:
                conn.execute(
                    "INSERT INTO drafts (timestamp, room, payload) VALUES (?, ?, ?)",
                    (time.time(), room_name, serialized),
                )
        finally:
            # Close on every path so a failed insert does not leak the handle.
            conn.close()
    except Exception:
        cprint(f"{Colors.RED}[Offline] Failed to save draft.{Colors.RESET}")
        return
    cprint(f"{Colors.YELLOW}[Offline] Message saved to drafts.{Colors.RESET}")
def get_and_clear_drafts() -> list:
    """Retrieve all offline drafts and remove them from the queue.

    Only the rows that were actually read are deleted, so a draft inserted
    while this function runs is kept for the next call. Read and delete are
    committed together; if the delete fails nothing is returned and the
    drafts stay queued, which avoids sending the same draft twice.

    Returns:
        The stored JSON payload strings, oldest first; an empty list when
        there are no drafts or the database cannot be used.
    """
    try:
        conn = sqlite3.connect(cast(str, CLIENT_SETTINGS['db_path']))
    except (sqlite3.Error, TypeError):
        return []
    try:
        conn.row_factory = sqlite3.Row
        with conn:
            rows = conn.execute(
                "SELECT id, room, payload FROM drafts ORDER BY timestamp ASC"
            ).fetchall()
            conn.executemany(
                "DELETE FROM drafts WHERE id = ?", [(r['id'],) for r in rows]
            )
        return [r['payload'] for r in rows]
    except sqlite3.Error:
        return []
    finally:
        conn.close()
# ============================================================================
# Encryption Per-Message Random Salt (AES-256 via Fernet)
# ============================================================================
# Legacy encryption functions removed in favor of SecurityManager
# ============================================================================
# ASCII Art
# ============================================================================
def image_to_ascii(image_path: str, width: int = 60) -> str:
    """Render an image file as ASCII art.

    Args:
        image_path: Path of the image to convert.
        width: Number of characters per output line.

    Returns:
        The art as a string starting with a newline; an empty string when
        Pillow is not installed; or a red ``ASCII Error: ...`` message when
        the image cannot be processed.
    """
    if not PILLOW_AVAILABLE:
        return ""
    try:
        # The context manager closes the underlying file once the scaled
        # greyscale copy exists.
        with Image.open(image_path) as source:
            w, h = source.size
            # Maintain aspect ratio for standard terminal fonts (roughly 1:2 w/h)
            new_h = max(1, int(h / w * width * 0.5))
            grey = source.resize((width, new_h)).convert('L')

        # Standard ASCII gradient from dark to light
        chars = "@%#*+=-:. "
        last = len(chars) - 1
        # Map luminance 0-255 onto an index into `chars`; built once per call
        # instead of once per pixel.
        glyph_for = [chars[min(last, int((level / 255.0) * 9.99))] for level in range(256)]
        pixels = "".join(glyph_for[p] for p in grey.getdata())

        lines = [pixels[i:i + width] for i in range(0, len(pixels), width)]
        return "\n" + "\n".join(lines)
    except Exception as e:
        return f"{Colors.RED}ASCII Error: {e}{Colors.RESET}"
# ============================================================================
# File Transfer
# ============================================================================
async def send_file_data(offer: dict, _target_nick: str):
    """Read a file in 16 KB chunks and push them to the outgoing_queue.

    Args:
        offer: File offer; ``filepath``, ``transfer_id`` and ``target`` are read.
        _target_nick: Unused; kept so existing callers keep working.

    Each chunk is queued as a ``file_chunk`` message carrying base64 data.
    An empty file is sent as a single empty chunk so the receiver sees a
    complete transfer. Failures are reported via ``cprint``, never raised.
    """
    filepath = offer.get('filepath', '')
    tid = offer.get('transfer_id', '')
    if not filepath or not os.path.exists(filepath):
        cprint(f"{T()['RED']}File not found for transfer: {filepath}{T()['RESET']}")
        return
    queue = outgoing_queue
    if queue is None:
        cprint(f"{T()['RED']}Transfer failed: outgoing queue is not initialised{T()['RESET']}")
        return
    try:
        filesize = os.path.getsize(filepath)
        chunk_size = 16 * 1024
        # At least one chunk, so a zero-byte file still produces a message.
        total = max(1, (filesize + chunk_size - 1) // chunk_size)
        fname = os.path.basename(filepath)
        target = offer.get('target', '')
        cprint(f"{T()['GREEN']}Sending {fname} ({filesize:,} bytes, {total} chunks)...{T()['RESET']}")
        with open(filepath, 'rb') as f:
            idx = 0
            while True:
                chunk = f.read(chunk_size)
                # Stop at end of file, but let one empty chunk through when
                # nothing has been sent yet (empty file).
                if not chunk and idx > 0:
                    break
                await cast(Any, queue).put({
                    "type": "file_chunk",
                    "transfer_id": tid,
                    "chunk_idx": idx,
                    "total_chunks": total,
                    "filename": fname,
                    "target": target,
                    "data": base64.b64encode(chunk).decode()
                })
                idx += 1
                # Short pause between chunks.
                await asyncio.sleep(0.005)
                if idx % 20 == 0:
                    sys.stdout.write(f"\rSending: {idx*100//total}% ")
                    sys.stdout.flush()
                if not chunk:
                    break
        cprint(f"\n{T()['GREEN']}✅ Transfer complete: {fname}{T()['RESET']}")
    except Exception as e:
        cprint(f"{T()['RED']}Transfer failed: {e}{T()['RESET']}")
def handle_file_chunk(data: dict):
    """Buffer incoming file chunks and reassemble on completion.

    The chunk count announced by the first chunk of a transfer is
    authoritative for that transfer. Chunks with a non-integer or
    out-of-range index, or with undecodable base64 data, are rejected with a
    message instead of being counted, so they cannot trigger a premature or
    corrupt save.

    Args:
        data: ``file_chunk`` message; reads ``transfer_id``, ``chunk_idx``,
            ``total_chunks``, ``filename`` and ``data``.
    """
    tid = data.get('transfer_id', '')
    fname = data.get('filename', 'received_file')
    try:
        idx = int(data.get('chunk_idx', 0))
        total = int(data.get('total_chunks', 1))
    except (TypeError, ValueError):
        cprint(f"{T()['RED']}Ignoring malformed file chunk for {fname}.{T()['RESET']}")
        return
    if total < 1:
        cprint(f"{T()['RED']}Ignoring file chunk with invalid chunk count for {fname}.{T()['RESET']}")
        return

    buf = cast(Dict[str, Any], file_chunks_buffer.setdefault(tid, {
        'chunks': {}, 'total': total, 'filename': fname, 'received_count': 0
    }))
    expected = cast(int, buf['total'])
    chunks = cast(Dict[Any, Any], buf['chunks'])

    if not 0 <= idx < expected:
        cprint(f"{T()['RED']}Ignoring out-of-range chunk {idx} for {fname}.{T()['RESET']}")
        return

    if idx not in chunks:
        try:
            # binascii.Error is a ValueError subclass.
            chunks[idx] = base64.b64decode(data.get('data', ''))
        except (ValueError, TypeError):
            cprint(f"{T()['RED']}Ignoring undecodable chunk {idx} for {fname}.{T()['RESET']}")
            return
        buf['received_count'] = cast(int, buf['received_count']) + 1

    received_count = cast(int, buf['received_count'])
    if received_count % 20 == 0:
        sys.stdout.write(f"\rReceiving {fname}: {received_count*100//expected}% ")
        sys.stdout.flush()
    if received_count >= expected:
        cprint("")
        _save_received_file(tid)
def _save_received_file(tid: str):
    """Write the buffered chunks of transfer *tid* to disk in order.

    The file is written to ``downloads/received_<name>``, or to
    ``received_<name>`` in the current directory when the downloads folder
    cannot be created. All chunks are checked before the file is opened, so
    an incomplete transfer never leaves a truncated file behind. The buffer
    is dropped only after a successful write.
    """
    buf = cast(Any, file_chunks_buffer).get(tid)
    if not buf:
        return

    # Sanitize filename to prevent path traversal. Backslashes are normalised
    # first so Windows-style separators are also stripped on POSIX.
    raw_fname = str(buf.get('filename', 'received_file'))
    safe_fname = Path(raw_fname.replace('\\', '/')).name
    save_path = Path("downloads") / f"received_{safe_fname}"

    # Ensure downloads directory exists
    # NOTE(review): indentation was lost in this source; the current-directory
    # fallback is assumed to belong to the failure branch - confirm.
    try:
        save_path.parent.mkdir(parents=True, exist_ok=True)
    except OSError:
        # Fallback to current dir if cannot create
        save_path = Path(f"received_{safe_fname}")

    try:
        chunks = buf['chunks']
        ordered = []
        for i in range(buf['total']):
            chunk = chunks.get(i)
            if chunk is None:
                cprint(f"{T()['RED']}Missing chunk {i}! File may be corrupted.{T()['RESET']}")
                return
            ordered.append(chunk)

        with open(save_path, 'wb') as f:
            f.writelines(ordered)
        cprint(f"{T()['GREEN']}📁 Saved -> {os.path.abspath(save_path)}{T()['RESET']}")
        file_chunks_buffer.pop(tid, None)
    except Exception as e:
        cprint(f"{T()['RED']}Failed to save file: {e}{T()['RESET']}")
# ============================================================================
# Setup Wizard
# ============================================================================
def _ask_with_default(prompt: str, default: Any = "") -> Any:
    """Read one stripped line from stdin; return *default* when it is empty."""
    answer = input(prompt).strip()
    return answer if answer else default


def setup_wizard():
    """Interactively collect server, nickname, room, security mode and profile options.

    Sets the module-level ``nickname``, ``server_url`` and ``room``,
    configures the global ``security`` object, and persists the chosen
    values with ``save_setting``. A server URL passed as the first
    command-line argument skips the server prompt.
    """
    global nickname, server_url, room
    init_db()
    load_settings()
    tc = T()

    # NOTE(review): the border strings below are empty in this source; the
    # original box-drawing glyphs were probably lost in transit - confirm.
    cprint(f"{tc['HEADER']}" + ""*48 + "" + f"{tc['RESET']}")
    cprint(f"{tc['HEADER']}" + f"{tc['BOLD']} ULTIMATE RADIO CLIENT v2.5 MASTER {tc['RESET']}{tc['HEADER']}{tc['RESET']}")
    cprint(f"{tc['HEADER']}" + ""*48 + "" + f"{tc['RESET']}")
    cprint(f"{tc['HEADER']}{tc['RESET']} Premium P2P & WebSocket Encrypted Bridge {tc['HEADER']}{tc['RESET']}")
    cprint(f"{tc['HEADER']}" + ""*48 + "" + f"{tc['RESET']}\n")

    if len(sys.argv) > 1:
        server_url = sys.argv[1]
    else:
        default_srv = CLIENT_SETTINGS.get('last_server', DEFAULT_SERVER)
        server_url = _ask_with_default(f"Server URL (Default: {default_srv}): ", default_srv)
        # NOTE(review): assumed to run only when the URL was prompted for
        # (indentation was lost in this source) - confirm.
        save_setting('last_server', server_url)

    # Nickname logic with saved profile support
    last_nick = CLIENT_SETTINGS.get('last_nick', '')
    if last_nick:
        nickname = _ask_with_default(f"Nickname (Default: {last_nick}): ", last_nick)
    else:
        nickname = _ask_with_default("Nickname (Enter for random): ")
        if not nickname:
            nickname = "User" + ''.join(random.choices(string.ascii_uppercase + string.digits, k=4))

    default_rm = CLIENT_SETTINGS.get('last_room', 'lobby')
    room = _ask_with_default(f"Room (Default: {default_rm}): ", default_rm)
    save_setting('last_room', room)

    if CRYPTOGRAPHY_AVAILABLE:
        cprint(f"\n{tc['CYAN']}Select Security Mode:{tc['RESET']}")
        cprint(f" [1] {tc['BOLD']}PLAIN{tc['RESET']} - No encryption")
        cprint(f" [2] {tc['BOLD']}SHARED{tc['RESET']} - Symmetric (Group Passphrase)")
        cprint(f" [3] {tc['BOLD']}E2EE{tc['RESET']} - Point-to-Point (X25519 Handshake)")
        mode_in = input("Choice [1]: ").strip()
        if mode_in == '2':
            import getpass
            pwd = getpass.getpass("Shared passphrase: ")
            if pwd:
                cast(Any, security).shared_passphrase = pwd
                security.set_mode(MODE_SHARED)
                cprint(f"{tc['GREEN']}✅ Mode: SHARED (AES-256 enabled){tc['RESET']}")
            else:
                # Previously this case was silent, so the user could believe
                # SHARED mode was active while the mode had not changed.
                cprint(f"{tc['YELLOW']}No passphrase entered - SHARED mode not enabled.{tc['RESET']}")
        elif mode_in == '3':
            security.set_mode(MODE_E2EE)
            cprint(f"{tc['GREEN']}✅ Mode: E2EE (X25519 sessions active){tc['RESET']}")
        else:
            security.set_mode(MODE_PLAIN)
            cprint(f"{tc['YELLOW']}✅ Mode: PLAIN{tc['RESET']}")
    else:
        cprint(f"{tc['YELLOW']}Tip: pip install cryptography to enable E2EE/SHARED modes.{tc['RESET']}")

    # Remember Me / Account Auto-login
    save_pref = CLIENT_SETTINGS.get('save_profile', False)
    save_input = input(f"Save credentials for fast login? (y/N) [Default: {'Yes' if save_pref else 'No'}]: ").strip().lower()
    # Any answer other than y/n keeps the stored preference.
    do_save = save_pref
    if save_input == 'y':
        do_save = True
    elif save_input == 'n':
        do_save = False

    if do_save:
        save_setting('save_profile', True)
        save_setting('last_nick', nickname)
        # We don't save the password yet here because we haven't asked for it.
        # But we'll mark it to be saved after a successful login in the command handler.
    else:
        save_setting('save_profile', False)
        save_setting('last_nick', '')
        save_setting('last_pass', '')

    cprint(f"\n{tc['GREEN']}Connecting to {server_url} as '{nickname}' in '{room}'...{tc['RESET']}")
    cprint(f"{tc['CYAN']}Type /help for server commands. Local: /history, /theme, /mode, /crypt, /security, /clear{tc['RESET']}")
    cprint("-" * 50)
# ============================================================================
# Utilities
# ============================================================================
def cprint(text: str):
    """Print *text*, which may contain ANSI colour codes.

    With prompt_toolkit installed the text goes through
    ``print_formatted_text`` under ``patch_stdout``; if that fails, or
    prompt_toolkit is absent, it is written with plain ``print``.
    """
    # Accept bytes as well, decoding them leniently.
    if isinstance(text, bytes):
        text = text.decode('utf-8', errors='replace')

    if not PROMPT_TOOLKIT_AVAILABLE:
        print(text)
        return

    try:
        with patch_stdout():
            print_formatted_text(ANSI(text))
    except Exception:
        # prompt_toolkit could not render the text; emit it unprocessed.
        print(text)
def _save_history_in_background(sender, content, room_name, msg_type='message'):
    """Store one message in local history without blocking a running event loop."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # Called from synchronous code: no loop to offload to, write inline.
        save_message_local(sender, content, room_name, msg_type)
        return
    loop.run_in_executor(None, save_message_local, sender, content, room_name, msg_type)


def display_message(data: dict):
    """Format and print a server message, respecting the active theme.

    Dispatches on ``data['type']``. Besides printing, several message types
    update module state: ``nickname``, ``room``, ``room_nicks``,
    ``recent_rooms``, ``CLIENT_SETTINGS['current_theme']``,
    ``pending_file_transfers`` and the keys held by ``security``. A dict
    without a ``type`` key is treated as a raw history row and shown as a
    chat message.

    Malformed key-exchange or salt data is reported as an error line instead
    of raising.
    """
    global nickname, room, room_nicks

    # Handle raw history rows (no 'type' key)
    if 'type' not in data:
        raw_ts = data.get('timestamp', time.time())
        try:
            # Handle mixed timestamp types from database/JSON
            if isinstance(raw_ts, str) and ':' in raw_ts:
                ts = raw_ts  # Already formatted HH:MM:SS
            else:
                ts = datetime.fromtimestamp(float(raw_ts)).strftime('%H:%M:%S')
        except (TypeError, ValueError, OverflowError, OSError):
            ts = datetime.now().strftime('%H:%M:%S')
        data = {
            'type': 'message',
            'timestamp': ts,
            'nickname': data.get('nickname') or data.get('sender') or '?',
            # Robust content extraction: try every possible key used by server or legacy DB
            'message': data.get('message') or data.get('content') or data.get('text') or '',
            'id': data.get('id'),
            'encrypted': bool(data.get('encrypted', 0)),
            'mode': data.get('mode', 'plain')
        }

    tc = T()
    t = data.get('type')

    # -- System --------------------------------------------------------------
    if t == 'system':
        text = str(data.get('message') or '')
        if 'joined' in text:
            color = tc['GREEN']
        elif 'left' in text or 'disconnect' in text.lower():
            color = tc['RED']
        else:
            color = tc['YELLOW']
        cprint(f"{color}{text}{tc['RESET']}")
        # Track local room state
        if 'You are now known as' in text:
            # Keep the global nickname in sync with the server
            nickname = text.split('as ')[-1].strip('.')
        elif 'Joined room:' in text:
            new_room = text.split(': ')[-1].strip()
            # Update recent rooms for Alt+1-9 hotkeys
            if new_room != room:
                if room and room not in recent_rooms:
                    recent_rooms.insert(0, room)
                    if len(recent_rooms) > 9:
                        recent_rooms.pop()
                room = new_room
        return

    # -- Chat Message ---------------------------------------------------------
    elif t == 'message':
        content = str(data.get('message') or '')
        timestamp = data.get('timestamp', datetime.now().strftime('%H:%M:%S'))
        # Coerce to str: a null nickname would otherwise break the colour
        # selection below, which iterates over the characters.
        sender = str(data.get('nickname') or '?')

        # Skip own echo (we have local echo)
        # We use a robust comparison to handle any normalization the server might do
        my_nick = (nickname or "").lower().strip()
        sender_nick = sender.lower().strip()
        if my_nick and sender_nick == my_nick and not content.startswith('/me'):
            return

        # Decrypt if needed
        is_encrypted = data.get('encrypted', False)
        # Auto-detect encrypted content even if not explicitly tagged (legacy support)
        if not is_encrypted and content.startswith('gAAAAA'):
            is_encrypted = True
        sec_mode = data.get('mode', 'plain')
        if is_encrypted:
            content = security.decrypt(content, sender)

        # Security Fallback: If content somehow became empty, show the raw message or a placeholder
        # NOTE(review): indentation was lost in this source; the fallback is
        # assumed to apply to every message, not only encrypted ones - confirm.
        if not content and data.get('message'):
            content = str(data.get('message') or '')
        if not content:
            msg_id = data.get('id', '??')
            content = f"{tc['YELLOW']}[No content for msg #{msg_id}]{tc['RESET']}"
        content_str = cast(str, content)

        # Whisper styling
        if content_str.startswith('[Whisper from') or content_str.startswith('[Whisper to'):
            prefix = f"{tc['MAGENTA']}[E2EE] " if sec_mode == MODE_E2EE else ""
            cprint(f"{tc['MAGENTA']}{tc['BOLD']}[{timestamp}] {prefix}{content_str}{tc['RESET']}")
            _save_history_in_background(sender, content_str, room, 'whisper')
            return

        # Mention alert
        mention_color = ""
        if f"@{nickname}" in content_str:
            mention_color = tc['MAGENTA']

        # Per-user color (skip in mono themes)
        if CLIENT_SETTINGS.get('current_theme') in ('matrix', 'noir'):
            sender_color = tc['CYAN']
        else:
            palette = [tc['CYAN'], tc['BLUE'], tc['YELLOW'], tc['GREEN'], tc['MAGENTA'], tc['HEADER']]
            sender_color = palette[sum(ord(c) for c in sender) % len(palette)]

        # Optional message ID prefix
        msg_id = data.get('id')
        id_str = f"{tc['HEADER']}[{msg_id}]{tc['RESET']} " if msg_id and CLIENT_SETTINGS.get('show_ids') else ""

        # Use 🌐 icon to indicate server-confirmed message
        line = f"{tc['YELLOW']}[{timestamp}]{tc['RESET']} 🌐 {id_str}{sender_color}[{sender}]{tc['RESET']}: {mention_color}{content_str}{tc['RESET'] if mention_color else ''}"
        cprint(line)

        # Save to local history asynchronously
        _save_history_in_background(sender, content_str, room)

        # Feature 6: Desktop Notifications (Suggestion 6)
        if PLYER_AVAILABLE and sender != nickname:
            should_notify = False
            if f"@{nickname}" in content_str:
                should_notify = True
            if should_notify:
                try:
                    notification.notify(
                        title=f"Radio - {sender}",
                        message=content_str[:150],  # type: ignore
                        app_name="Radio",
                        timeout=5
                    )
                except Exception:
                    # Desktop notifications are optional; never fail a message over one.
                    pass
        return

    # -- Command Response ------------------------------------------------------
    elif t == 'command_response':
        cprint(f"{tc['CYAN']}{data.get('message', '')}{tc['RESET']}")
        return

    # -- Error -----------------------------------------------------------------
    elif t == 'error':
        cprint(f"{tc['RED']}{data.get('message', '')}{tc['RESET']}")
        return

    # -- Offline Mailbox -------------------------------------------------------
    elif t == 'offline':
        ts = data.get('timestamp', '??:??')
        sender = data.get('nickname', '?')
        content = data.get('message', '')
        cprint(f"{tc['MAGENTA']}📬 [OFFLINE {ts}] {tc['BOLD']}{sender}{tc['RESET']}{tc['MAGENTA']}: {content}{tc['RESET']}")
        return

    # -- Announcement ----------------------------------------------------------
    elif t == 'announcement':
        cprint(f"{tc['YELLOW']}📢 {data.get('content', '')} (by {data.get('created_by', 'Admin')}){tc['RESET']}")
        return

    # -- Notification ----------------------------------------------------------
    elif t == 'notification':
        msg = data.get('message', '')
        sender = data.get('from', 'System')
        room_name = data.get('room', '??')
        cprint(f"{tc['HEADER']}🔔 NOTIFICATION: {tc['BOLD']}{sender}{tc['RESET']}{tc['HEADER']} in #{room_name}: {msg}{tc['RESET']}")
        return

    # -- History Bundle --------------------------------------------------------
    elif t == 'history':
        msgs = data.get('messages', [])
        cprint(f"\n{tc['HEADER']}-- Room History (last {len(msgs)}) --{tc['RESET']}")
        for msg in msgs:
            display_message(msg)
        cprint(f"{tc['HEADER']}--------------------------------------{tc['RESET']}\n")
        return

    # -- Edit/Delete Notifications ---------------------------------------------
    elif t == 'message_edit':
        msg_id = data.get('id')
        new_text = data.get('message')
        editor = data.get('edited_by')
        cprint(f"{tc['CYAN']}📝 Message #{msg_id} was edited by {editor}: {new_text}{tc['RESET']}")
        return
    elif t == 'message_delete':
        msg_id = data.get('id')
        remover = data.get('deleted_by')
        cprint(f"{tc['RED']}🗑️ Message #{msg_id} was deleted by {remover}{tc['RESET']}")
        return

    # -- File Events -----------------------------------------------------------
    elif t == 'file_offer':
        if data.get('sub') == 'chunk':
            handle_file_chunk(data)
        else:
            tid = data.get('transfer_id', '')
            cprint(f"{tc['HEADER']}📁 File offer from {data.get('from')}: {data.get('filename')} ({data.get('size', 0):,} bytes){tc['RESET']}")
            cprint(f"{tc['HEADER']} Accept with: /accept {tid}{tc['RESET']}")
            pending_file_transfers[tid] = data
        return
    elif t == 'file_chunk':
        handle_file_chunk(data)
        return
    elif t == 'file_accept':
        tid = data.get('transfer_id', '')
        if tid in pending_file_transfers:
            asyncio.ensure_future(send_file_data(pending_file_transfers[tid], data.get('target', '')))
        return

    # -- Clear History (admin action) ------------------------------------------
    elif t == 'clear_history':
        os.system('cls' if os.name == 'nt' else 'clear')
        cprint(f"{tc['YELLOW']}🗑️ Chat history cleared by {data.get('cleared_by', 'admin')}{tc['RESET']}")
        return

    # -- Room Change -----------------------------------------------------------
    elif t == 'room_change':
        room = data.get('room', room)
        settings = data.get('settings', {})
        # Auto-initialize SHARED security if a persistent salt exists (Suggestion 1)
        if security.mode == MODE_SHARED and settings.get('shared_salt') and security.shared_passphrase:
            try:
                security.derive_shared(bytes.fromhex(settings['shared_salt']), room)
            except (TypeError, ValueError) as e:
                # The salt comes from the server; a malformed value must not
                # abort the room switch.
                cprint(f"{tc['RED']}Could not derive room key from server salt: {e}{tc['RESET']}")
            else:
                cprint(f"{tc['GREEN']}🔒 Room security synchronized with persistent salt.{tc['RESET']}")
        if settings.get('security') == 'e2ee' and security.mode == MODE_PLAIN:
            cprint(f"{tc['YELLOW']}🔒 This is a SECURE room. Enable /mode e2ee or /mode shared to participate.{tc['RESET']}")
        return

    # -- Server-Pushed Settings (theme sync) ----------------------------------
    elif t == 'data_update':
        settings = data.get('settings', {})
        if 'theme' in settings:
            new_theme = str(settings['theme']).lower()
            if new_theme in THEMES:
                CLIENT_SETTINGS['current_theme'] = new_theme
        if 'nicks' in data:
            room_nicks = set(data['nicks'])
        return

    # -- E2EE Handshakes ------------------------------------------------------
    elif t == 'e2ee_handshake':
        peer_name = data.get('from', '?')
        try:
            pub_raw = base64.b64decode(cast(str, data.get('pub', '')))
            security.derive_e2ee(pub_raw, peer_name)
        except Exception as e:
            # The key material comes from the peer; report bad data instead of
            # raising out of the message handler.
            cprint(f"{T()['RED']}E2EE handshake with {peer_name} failed: {e}{T()['RESET']}")
            return
        cprint(f"{T()['GREEN']}🔒 Secure E2EE channel established with {peer_name}{T()['RESET']}")
        return
    elif t == 'shared_init':
        peer_name = data.get('from', '?')
        if not security.shared_passphrase:
            # derive_shared() does nothing without a passphrase, so do not
            # claim the room key was updated.
            cprint(f"{T()['YELLOW']}SHARED-INIT from {peer_name} ignored: no shared passphrase is set.{T()['RESET']}")
            return
        try:
            salt = base64.b64decode(data.get('salt', ''))
            security.derive_shared(salt, room)  # Update room key
        except Exception as e:
            cprint(f"{T()['RED']}SHARED-INIT from {peer_name} failed: {e}{T()['RESET']}")
            return
        cprint(f"{T()['GREEN']}🔒 Room security updated (SHARED-INIT by {peer_name}){T()['RESET']}")
        return

    # -- Identity Synchronization (Suggestion 1) ------------------------------
    elif t == 'nick_update':
        new_nick = data.get('nickname')
        if new_nick:
            nickname = str(new_nick)
            cprint(f"{T()['CYAN']}👤 Identity updated: {T()['BOLD']}{nickname}{T()['RESET']}")
        return

    # -- Message ACK (Read Receipt) --------------------------------------------
    elif t == 'msg_ack':
        return

    # -- Silently ignore internals ---------------------------------------------
    else:
        if t not in ('ping', 'pong', 'auth_response'):
            pass  # Uncomment for debug: cprint(f"[DEBUG] {data}")
        return
# ============================================================================
# Input Handler
# ============================================================================
async def input_handler(websocket):
"""Read user input lines, run client-side commands, and forward the rest.

Loops until the user ends input, the task is cancelled, or a send fails.
Each non-empty line is handled in one of three ways:

* a recognised local slash command (/clear, /history, /theme, /ascii,
  /sendfile, /accept, /color, /login, /password, /mode, /crypt,
  /security, /sec, /msg, /client-settings, /me) is processed here;
* plain text is passed through ``security.encrypt`` for the room, echoed
  locally, and sent as a ``message`` payload;
* any other slash command is sent to the server unmodified.

When sending a payload fails, plain-text payloads are stored with
``queue_draft_message`` and the loop exits.

Args:
    websocket: Connected WebSocket used for every outgoing frame; it is
        also assigned to ``security.outgoing_queue``.
"""
global nickname, room
security.outgoing_queue = websocket # Bind for handshake relay
commands = [
'/help', '/join', '/nick', '/msg', '/whoami', '/settings', '/mode', '/crypt', '/security',
'/list', '/users', '/clear', '/quit', '/theme', '/history', '/e2ee', '/search', '/room',
'/deleteroom', '/delroom', '/admin', '/admin logs', '/admin vacuum'
]
# Feature 3: Rich Key-Bindings (Suggestion 3)
# Bindings are only created when prompt_toolkit was importable.
kb = KeyBindings() if PROMPT_TOOLKIT_AVAILABLE else None
if kb:
@kb.add('escape', 'u') # Alt + u
def _(event):
# Sends the /users command as a fire-and-forget task and clears the input buffer.
asyncio.create_task(websocket.send(json.dumps({"type": "message", "message": "/users"})))
event.app.current_buffer.text = ''
@kb.add('escape', 'c') # Alt + c
def _(event):
os.system('cls' if os.name == 'nt' else 'clear')
event.app.current_buffer.text = ''
# Alt + 1-9 for quick-switch rooms
for i in range(1, 10):
@kb.add('escape', str(i))
# i=i freezes the loop value per handler; without it every binding would see the last i.
def _(event, i=i):
if len(recent_rooms) >= i:
target_room = recent_rooms[i-1]
# We send a join command
asyncio.create_task(websocket.send(json.dumps({
"type": "join", "nickname": nickname, "room": target_room
})))
# Clear current buffer
event.app.current_buffer.text = ''
session = None
if PROMPT_TOOLKIT_AVAILABLE:
import re
pat = re.compile(r'[/a-zA-Z0-9_\-]+')
# NOTE(review): the word list is built once from the current room_nicks;
# nothing in this function refreshes it when the nick set changes later.
completer = cast(Any, WordCompleter)(
commands + list(room_nicks),
pattern=pat,
ignore_case=True,
match_middle=False
)
session = cast(Any, PromptSession)(completer=completer, key_bindings=kb)
global _SESSION_INSTANCE
_SESSION_INSTANCE = session
loop = asyncio.get_running_loop()
while True:
try:
# Slimmer prompt: [ROOM/NICK]>
# NOTE(review): mode_sym is assigned but never used in this function.
mode_sym = "🌐" if server_url.startswith('ws') else "📡"
prompt_str = f"[{room}/{nickname}]> "
if PROMPT_TOOLKIT_AVAILABLE and CLIENT_SETTINGS.get('interactive_prompts'):
try:
with patch_stdout():
line = await cast(Any, session).prompt_async(prompt_str, is_password=False)
except (EOFError, KeyboardInterrupt):
return # Exit client on Ctrl-D or Ctrl-C at prompt
else:
# Plain input() blocks, so it runs in the default executor.
line = await loop.run_in_executor(None, input, prompt_str)
if line is None:
break
text = cast(str, line).strip()
if not text:
continue
# -- LOCAL-ONLY commands -------------------------------------------
if text.startswith('/'):
# Split on whitespace after the leading slash; parts[0] is the command name.
parts = cast(Any, text)[1:].split()
cmd = parts[0].lower() if parts else ''
# /clear - clear terminal
if cmd == 'clear':
os.system('cls' if os.name == 'nt' else 'clear')
continue
# /history [n] - show local SQLite history (n defaults to 50 when absent or non-numeric)
elif cmd == 'history':
limit = int(cast(List[str], parts)[1]) if len(cast(List[str], parts)) > 1 and cast(List[str], parts)[1].isdigit() else 50
rows = await loop.run_in_executor(None, get_local_history, limit, room)
tc = T()
cprint(f"\n{tc['HEADER']}-- Local History (last {len(rows)}) --{tc['RESET']}")
for row in rows:
ts = datetime.fromtimestamp(row['timestamp']).strftime('%H:%M:%S')
cprint(f"{tc['YELLOW']}[{ts}]{tc['RESET']} [{row['sender']}]: {row['content']}")
print()
continue
# /theme <name> - switch theme locally; without a name, list the available themes
elif cmd == 'theme':
if len(parts) < 2:
cprint(f"Available themes: {', '.join(THEMES.keys())}")
else:
apply_theme(parts[1])
continue
# /ascii <path> [@nick] [width] - send ascii art
elif cmd == 'ascii':
if not PILLOW_AVAILABLE:
cprint(f"{T()['RED']}Pillow not installed.{T()['RESET']}")
continue
if len(parts) < 2:
print("Usage: /ascii <image_path> [@nick] [width]")
continue
target = None
a_width = 60 # Default width
path_parts = parts[1:]
# NOTE(review): trailing tokens are peeled off as "@nick" first and then
# "width", so only "<path> <width> @nick" parses cleanly. The order shown in
# the usage string ("<path> @nick <width>") leaves "@nick" inside the path.
if path_parts and path_parts[-1].startswith('@'):
target = path_parts[-1][1:]
path_parts = path_parts[:-1]
if path_parts and path_parts[-1].isdigit():
a_width = int(path_parts[-1])
path_parts = path_parts[:-1]
# Remaining tokens are re-joined so paths with spaces work; surrounding quotes are stripped.
f_path = " ".join(path_parts).strip().strip('"').strip("'")
if not f_path:
cprint(f"{T()['RED']}Error: No image path provided.{T()['RESET']}")
continue
art = image_to_ascii(f_path, width=a_width)
if art:
ciphertext, enc = security.encrypt(art, target)
await websocket.send(json.dumps({
"type": "message",
"message": f"/msg {target} {ciphertext}" if target else ciphertext,
"encrypted": enc,
"mode": security.mode
}))
cprint(f"{T()['GREEN']}ASCII art sent to {target or room}:{T()['RESET']}")
print(art)
continue
# /sendfile <nick> <path> - offer a file
elif cmd == 'sendfile':
if len(parts) < 3:
cprint(f"{T()['RED']}Usage: /sendfile <nickname> <filepath>{T()['RESET']}")
continue
target = parts[1]
filepath = ' '.join(parts[2:])
if not os.path.exists(filepath):
cprint(f"{T()['RED']}File not found: {filepath}{T()['RESET']}")
continue
# Transfer id is the nickname plus a whole-second timestamp.
tid = f"{nickname}_{int(time.time())}"
filesize = os.path.getsize(filepath)
# Store with local path so we can read on accept
pending_file_transfers[tid] = {
'filepath': filepath, 'target': target,
'transfer_id': tid, 'filename': os.path.basename(filepath)
}
await websocket.send(json.dumps({
"type": "file_offer",
"target": target,
"filename": os.path.basename(filepath),
"size": filesize,
"transfer_id": tid
}))
cprint(f"{T()['GREEN']}📤 File offer sent to {target}.{T()['RESET']}")
continue
# /accept <tid> - accept a file offer
elif cmd == 'accept':
if len(parts) < 2:
cprint(f"{T()['RED']}Usage: /accept <transfer_id>{T()['RESET']}")
continue
tid = parts[1]
if tid in pending_file_transfers:
offer = pending_file_transfers[tid]
# The reply goes to the offer's 'from' entry, falling back to 'target', then ''.
await websocket.send(json.dumps({
"type": "file_accept",
"transfer_id": tid,
"target": offer.get('from', offer.get('target', ''))
}))
cprint(f"{T()['GREEN']}✅ Accepted transfer {tid}.{T()['RESET']}")
else:
cprint(f"{T()['RED']}No pending transfer: {tid}{T()['RESET']}")
continue
# /color - legacy colour picker (no-op, for compat)
elif cmd == 'color':
cprint(f"{T()['YELLOW']}Use /theme <name> to change look. /settings theme <name> for server sync.{T()['RESET']}")
continue
# /login - interactive prompt with password masking
elif cmd == 'login':
if PROMPT_TOOLKIT_AVAILABLE and CLIENT_SETTINGS.get('interactive_prompts'):
try:
default_user = parts[1] if len(parts) > 1 else ""
# pt_prompt blocks, so each prompt runs in the default executor.
u = await loop.run_in_executor(None, cast(Any, lambda: pt_prompt("Username: ", default=default_user)))
if not u: continue
p = await loop.run_in_executor(None, cast(Any, lambda: pt_prompt("Password: ", is_password=True)))
if not p: continue
await websocket.send(json.dumps({'type': 'message', 'message': f'/login {u} {p}'}))
continue
except KeyboardInterrupt:
cprint(f"\n{T()['YELLOW']}Login cancelled.{T()['RESET']}")
continue
# /password - interactive change with masking
elif cmd == 'password':
if PROMPT_TOOLKIT_AVAILABLE and CLIENT_SETTINGS.get('interactive_prompts'):
try:
old_p = await loop.run_in_executor(None, cast(Any, lambda: pt_prompt("Current Password: ", is_password=True)))
if not old_p: continue
new_p = await loop.run_in_executor(None, cast(Any, lambda: pt_prompt("New Password: ", is_password=True)))
if not new_p: continue
conf_p = await loop.run_in_executor(None, cast(Any, lambda: pt_prompt("Confirm New Password: ", is_password=True)))
if not conf_p: continue
if new_p != conf_p:
cprint(f"{T()['RED']}Passwords do not match!{T()['RESET']}")
continue
await websocket.send(json.dumps({'type': 'message', 'message': f'/password {old_p} {new_p}'}))
continue
except KeyboardInterrupt:
cprint(f"\n{T()['YELLOW']}Cancelled.{T()['RESET']}")
continue
# /mode [plain|shared|e2ee] - without an argument, print the current mode
elif cmd == 'mode':
if len(parts) < 2:
cprint(f"{T()['YELLOW']}Current Security: {security.mode.upper()}{T()['RESET']}")
continue
success, msg = security.set_mode(parts[1].lower())
color = T()['GREEN'] if success else T()['RED']
cprint(f"{color}{msg}{T()['RESET']}")
# If we switched to SHARED, we need to init
if success and security.mode == MODE_SHARED and security.shared_passphrase:
# A fresh 16-byte salt is derived locally and sent base64-encoded as shared_init.
salt = os.urandom(16)
security.derive_shared(salt, room)
await websocket.send(json.dumps({
"type": "shared_init", "salt": base64.b64encode(salt).decode()
}))
continue
# /crypt <passphrase>
elif cmd == 'crypt':
if len(parts) < 2:
cprint(f"{T()['RED']}Usage: /crypt <passphrase>{T()['RESET']}")
else:
# NOTE(review): only parts[1] is stored, so any words after the first are ignored.
security.shared_passphrase = parts[1]
cprint(f"{T()['GREEN']}Room/Group Passphrase updated. (Use /sec high or /sec locked to apply){T()['RESET']}")
continue
# /security [high|stealth|locked|open] (New User-Friendly Hybrid Command)
elif cmd in ('security', 'sec'):
tc = T()
if len(parts) < 2:
cprint(f"\n{tc['HEADER']}-- Security Profiles --{tc['RESET']}")
cprint(f" {tc['BOLD']}high{tc['RESET']} - E2EE (Private) + SHARED (Room)")
cprint(f" {tc['BOLD']}stealth{tc['RESET']} - E2EE (Private) + PLAIN (Room)")
cprint(f" {tc['BOLD']}locked{tc['RESET']} - SHARED (Everything)")
cprint(f" {tc['BOLD']}open{tc['RESET']} - PLAIN (Everything)")
cprint(f"\nCurrent Mode: {tc['CYAN']}{security.mode.upper()}{tc['RESET']}")
continue
sub = parts[1].lower()
# Profiles assign security.mode directly rather than going through security.set_mode().
if sub == 'high':
# E2EE Mode but with SHARED room fallback
security.mode = MODE_E2EE
cprint(f"{tc['GREEN']}Profile Set: HIGH SECURITY (Hybrid E2EE + SHARED){tc['RESET']}")
# Trigger shared init if possible
if security.shared_passphrase:
salt = os.urandom(16)
security.derive_shared(salt, room)
await websocket.send(json.dumps({"type": "shared_init", "salt": base64.b64encode(salt).decode()}))
elif sub == 'stealth':
security.mode = MODE_E2EE
cprint(f"{tc['GREEN']}Profile Set: STEALTH (E2EE Private + PLAIN Room){tc['RESET']}")
elif sub == 'locked':
security.mode = MODE_SHARED
cprint(f"{tc['GREEN']}Profile Set: LOCKED (Full SHARED Encryption){tc['RESET']}")
if security.shared_passphrase:
salt = os.urandom(16)
security.derive_shared(salt, room)
await websocket.send(json.dumps({"type": "shared_init", "salt": base64.b64encode(salt).decode()}))
elif sub == 'open':
security.mode = MODE_PLAIN
cprint(f"{tc['YELLOW']}Profile Set: OPEN (No Encryption){tc['RESET']}")
else:
cprint(f"{tc['RED']}Unknown profile. Use: /security [high|stealth|locked|open]{tc['RESET']}")
continue
# /msg - private message (supports multiple targets: /msg nick1,nick2 msg)
elif cmd == 'msg':
if len(parts) > 2:
target_list = parts[1].split(',')
body = ' '.join(parts[2:])
for target in target_list:
target = target.strip()
if not target: continue
# Auto-handshake for E2EE if no key exists
if security.mode == MODE_E2EE and target not in security.active_keys:
await websocket.send(json.dumps({
"type": "e2ee_handshake", "target": target,
"pub": base64.b64encode(security.pub_bytes).decode()
}))
# NOTE(review): the text promises a retry, but nothing in this function
# re-sends the body for this target after the handshake.
cprint(f"{T()['YELLOW']}Handshake sent to {target}. Retrying message shortly...{T()['RESET']}")
# Continue to next target so we don't block
continue
try:
ciphertext, enc = security.encrypt(body, target)
await websocket.send(json.dumps({
"type": "message", "message": f"/msg {target} {ciphertext}",
"encrypted": enc, "mode": security.mode
}))
# Local echo
ts = datetime.now().strftime('%H:%M:%S')
lock = "🔒 " if enc else ""
cprint(f"{T()['MAGENTA']}[{ts}] {lock}[Whisper to {target}]: {body}{T()['RESET']}")
except SecurityError as e:
cprint(f"{T()['RED']}Error for {target}: {e}{T()['RESET']}")
continue
# /client-settings - local config panel
elif cmd == 'client-settings':
if len(parts) < 2:
tc = T()
cprint(f"\n{tc['HEADER']}-- Client Settings --{tc['RESET']}")
cprint(f" Theme: {CLIENT_SETTINGS.get('current_theme', 'standard')}")
cprint(f" Interactive prompts: {'ON' if CLIENT_SETTINGS['interactive_prompts'] else 'OFF'}")
cprint(f" Show message IDs: {'ON' if CLIENT_SETTINGS.get('show_ids') else 'OFF'}")
cprint(f" History DB: {CLIENT_SETTINGS['db_path']}")
cprint(f"\nUsage: /client-settings prompts on|off")
else:
sub = parts[1].lower()
# A missing value is read as 'off'; on/true/1/enable switch the flag on.
if sub in ('prompts', 'interactive'):
val = parts[2].lower() if len(parts) > 2 else 'off'
CLIENT_SETTINGS['interactive_prompts'] = val in ('on', 'true', '1', 'enable')
cprint(f"{T()['GREEN']}Interactive prompts {'ON' if CLIENT_SETTINGS['interactive_prompts'] else 'OFF'}{T()['RESET']}")
elif sub == 'ids':
val = parts[2].lower() if len(parts) > 2 else 'off'
CLIENT_SETTINGS['show_ids'] = val in ('on', 'true', '1', 'enable')
cprint(f"{T()['GREEN']}show_ids {'ON' if CLIENT_SETTINGS['show_ids'] else 'OFF'}{T()['RESET']}")
continue
# /me - action message sent directly
elif cmd == 'me':
if len(parts) > 1:
action = ' '.join(parts[1:])
await websocket.send(json.dumps({"type": "message", "message": f"/me {action}"}))
continue
# -- Forward to server ---------------------------------------------
payload = None
if not text.startswith('/'):
try:
ciphertext, enc = security.encrypt(text, None) # None uses room
payload = {
"type": "message", "message": ciphertext,
"encrypted": enc, "mode": security.mode
}
# 1. Local Echo (Snappy UI)
ts = datetime.now().strftime('%H:%M:%S')
tc = T()
prefix = ""
if security.mode == MODE_E2EE:
prefix = f"{tc['YELLOW']}[🌐 Private]{tc['RESET']} "
elif security.mode == MODE_SHARED:
prefix = f"{tc['GREEN']}[🔒 SHARED]{tc['RESET']} "
print(f"{tc['YELLOW']}[{ts}]{tc['RESET']} {prefix}{tc['BOLD']}[{nickname}]{tc['RESET']} {text}")
except SecurityError as e:
cprint(f"{T()['RED']}Security Error: {e}{T()['RESET']}")
continue
else:
# Slash commands not handled above are sent to the server as typed.
final = text
payload = {"type": "message", "message": final}
if payload:
try:
await websocket.send(json.dumps(payload))
except ConnectionClosed:
if not text.startswith('/'):
# Offline Draft Queueing!
queue_draft_message(room, payload)
break
except Exception as e:
cprint(f"{T()['RED']}[Send error] {e}{T()['RESET']}")
# Assume disconnected if random send error occurs
if not text.startswith('/'):
queue_draft_message(room, payload)
break
except asyncio.CancelledError:
break
except ConnectionClosed:
break
except Exception as e:
# Any other failure (including EOFError from plain input()) ends the input loop.
cprint(f"{T()['RED']}[Input error] {e}{T()['RESET']}")
break
# ============================================================================
# Outgoing Queue Sender (file chunks and other async sends)
# ============================================================================
async def queue_sender(websocket):
    """Drain the global ``outgoing_queue`` and send each item over the WebSocket.

    Every queued item is serialised with ``json.dumps`` and sent as one
    frame. The loop stops when the task is cancelled or the connection
    closes; any other send error is reported and the loop carries on with
    the next item.

    An item whose send fails with ``ConnectionClosed`` is put back on the
    queue (at the tail) instead of being dropped, so it is still queued
    for whichever sender task drains the queue next.

    Args:
        websocket: Connected WebSocket; only its ``send`` coroutine is used.
    """
    queue = cast(asyncio.Queue, outgoing_queue)
    while True:
        try:
            msg = await queue.get()
        except asyncio.CancelledError:
            break
        try:
            await websocket.send(json.dumps(msg))
        except asyncio.CancelledError:
            # Cancelled mid-send: delivery state is unknown, so do not requeue
            # (that could duplicate the frame); just stop.
            break
        except ConnectionClosed:
            # The frame was definitely not delivered; keep it for the next connection.
            queue.put_nowait(msg)
            break
        except Exception as e:
            cprint(f"{T()['RED']}[Queue sender error] {e}{T()['RESET']}")
        finally:
            # Balance every successful get(), whatever happened to the send,
            # so Queue.join() accounting stays correct.
            queue.task_done()
# ============================================================================
# Receive Loop
# ============================================================================
async def receive_handler(websocket):
    """Receive server frames until the connection ends and display each one.

    Every frame is parsed as JSON and handed to ``display_message``. A
    frame that cannot be decoded or parsed is reported and skipped. A
    failure while displaying one message (for example malformed base64 in
    a peer-supplied handshake) is reported and skipped as well, so a
    single bad message does not end the receive loop and force a
    reconnect.

    Args:
        websocket: Connected WebSocket, iterated with ``async for``.
    """
    try:
        async for raw in websocket:
            try:
                data = json.loads(cast(Any, raw))
            except (json.JSONDecodeError, UnicodeDecodeError):
                # UnicodeDecodeError covers binary frames that are not valid UTF-8.
                cprint(f"{T()['RED']}[Invalid JSON] {raw[:120]}{T()['RESET']}")
                continue
            try:
                display_message(data)
            except Exception as e:
                # Keep the session alive; only this message is lost.
                cprint(f"{T()['RED']}[Display error] {e}{T()['RESET']}")
    except ConnectionClosed:
        pass
    except Exception as e:
        cprint(f"{T()['RED']}[Receive error] {e}{T()['RESET']}")
# ============================================================================
# Main Client with Auto-Reconnect
# ============================================================================
async def chat_client():
    """Connect to the chat server and keep the session running until the user leaves.

    Each pass of the outer loop opens one WebSocket connection and then:

    1. sends a ``join`` for the current global ``nickname`` / ``room``;
    2. waits up to 5 s for a first server message and, if it is
       ``auth_required``, answers with the saved or prompted password;
    3. moves offline drafts returned by ``get_and_clear_drafts`` onto
       ``outgoing_queue``;
    4. runs the input, receive, queue-sender and heartbeat tasks until the
       first of them finishes, then cancels the others.

    If the input task finished while the receive task was still running,
    this is treated as a user-initiated exit and the function returns.
    Otherwise the connection is considered lost and a reconnect follows
    after an exponential backoff capped at 60 s. A session shorter than
    2 s additionally offers to change nickname/room first. Failures to
    connect at all are retried with a linear backoff capped at 30 s.
    """
    global nickname, room, server_url, outgoing_queue, _LAST_PASS_USED

    async def heartbeat_worker(ws):
        """Send a ``ping`` message every 30 s until a send fails."""
        while True:
            try:
                await asyncio.sleep(30)
                await ws.send(json.dumps({"type": "ping"}))
            except Exception:
                break

    setup_wizard()
    # Created once, outside the reconnect loop, so queued items survive reconnects.
    outgoing_queue = asyncio.Queue()
    retry_count: int = 0
    while True:
        try:
            session_start = time.time()
            async with websockets.connect(
                server_url,
                ping_interval=20,
                ping_timeout=60,
                max_size=10 * 1024 * 1024  # 10 MB for file chunks
            ) as websocket:
                # -- 1. Send join ----------------------------------------------
                await websocket.send(json.dumps({
                    "type": "join",
                    "nickname": nickname,
                    "room": room
                }))

                # -- 2. Auth Handshake & Auto-login ----------------------------
                try:
                    raw = await asyncio.wait_for(websocket.recv(), timeout=5.0)
                    data = json.loads(raw)
                    if data.get('type') == 'auth_required':
                        import getpass
                        loop = asyncio.get_running_loop()
                        saved_pass = CLIENT_SETTINGS.get('last_pass', '')
                        use_saved = False
                        if saved_pass and CLIENT_SETTINGS.get('save_profile'):
                            cprint(f"{Colors.GREEN}[*] Using saved credentials for '{nickname}'...{Colors.RESET}")
                            pwd = saved_pass
                            use_saved = True
                        else:
                            # getpass blocks, so it runs in the default executor.
                            pwd = await loop.run_in_executor(
                                None, cast(Any, getpass.getpass),
                                f"{Colors.YELLOW}{data.get('message', 'Password: ')}{Colors.RESET} "
                            )
                        # Kept in a module global so code outside this coroutine can read it.
                        _LAST_PASS_USED = cast(str, pwd)
                        await websocket.send(json.dumps({'type': 'auth_response', 'password': cast(str, pwd)}))
                        # Wait for result of authentication
                        raw2 = await asyncio.wait_for(websocket.recv(), timeout=15.0)
                        data2 = json.loads(raw2)
                        if data2.get('type') == 'error':
                            cprint(f"{Colors.RED}[X] {data2.get('message')}{Colors.RESET}")
                            if use_saved:
                                cprint(f"{Colors.YELLOW}[!] Saved password failed. Clearing it.{Colors.RESET}")
                                save_setting('last_pass', '')
                            await asyncio.sleep(RECONNECT_DELAY)
                            # Leaves the `async with` (closing the socket) and starts a new attempt.
                            continue
                        # Success! Save password if "Save Profile" is checked.
                        # NOTE(review): the password is handed to save_setting as-is (plaintext).
                        if CLIENT_SETTINGS.get('save_profile'):
                            save_setting('last_pass', pwd)
                        display_message(data2)
                    else:
                        display_message(data)
                except asyncio.TimeoutError:
                    pass  # No immediate message - proceed
                except Exception as e:
                    cprint(f"{Colors.RED}Handshake error: {e}{Colors.RESET}")

                # -- 3. Offline Draft Drain -------------------------------------
                drafts = await asyncio.to_thread(cast(Any, get_and_clear_drafts))
                if drafts:
                    cprint(f"{T()['YELLOW']}[Offline] Sending {len(drafts)} drafted message(s)...{T()['RESET']}")
                    for d_payload_str in cast(list, drafts):
                        try:
                            payload_evt = json.loads(d_payload_str)
                        except (TypeError, ValueError) as e:
                            # The draft store has already been cleared; say so instead of
                            # losing the entry silently.
                            cprint(f"{T()['RED']}[Offline] Dropped unreadable draft: {e}{T()['RESET']}")
                            continue
                        # Sent by queue_sender once the tasks below are running.
                        await outgoing_queue.put(payload_evt)

                # -- 4. Start tasks ---------------------------------------------
                input_task = asyncio.create_task(input_handler(websocket))
                recv_task = asyncio.create_task(receive_handler(websocket))
                sender_task = asyncio.create_task(queue_sender(websocket))
                heartbeat_task = asyncio.create_task(heartbeat_worker(websocket))
                done, pending = await asyncio.wait(
                    [input_task, recv_task, sender_task, heartbeat_task],
                    return_when=asyncio.FIRST_COMPLETED
                )
                # Snapshot BEFORE cancelling: a cancelled-and-awaited task also reports
                # done(), so checking afterwards could never detect a clean exit.
                recv_finished = recv_task.done()
                for task in pending:
                    task.cancel()
                    try:
                        await task
                    except asyncio.CancelledError:
                        pass
                # Input ended while the connection was still being read -> don't reconnect
                if input_task in done and not recv_finished:
                    cprint(f"{Colors.YELLOW}\nDisconnecting cleanly.{Colors.RESET}")
                    return

            # Auto-reconnect with Exponential Backoff
            session_duration = time.time() - session_start
            cprint(f"{Colors.RED}\nConnection lost.{Colors.RESET}")
            # If we failed very quickly, maybe the nickname is taken or rejected
            if session_duration < 2:
                loop = asyncio.get_running_loop()
                try:
                    cprint(f"{Colors.YELLOW}Connection lasted < 2s. Identity rejection?{Colors.RESET}")
                    choice = await loop.run_in_executor(
                        None, cast(Any, lambda: input("Try a different Nickname/Room? (y/N): ").strip().lower())
                    )
                    if choice == 'y':
                        new_nick = await loop.run_in_executor(
                            None, cast(Any, lambda: input(f"New Nickname (Current: {nickname}): ").strip())
                        )
                        if new_nick:
                            nickname = new_nick
                        new_room = await loop.run_in_executor(
                            None, cast(Any, lambda: input(f"New Room (Current: {room}): ").strip())
                        )
                        if new_room:
                            room = new_room
                            save_setting('last_room', room)
                except Exception:
                    # Best effort (e.g. stdin closed): keep the current identity. Unlike a
                    # bare `except`, this lets cancellation and Ctrl-C propagate.
                    pass
            # Reset retry count if we were connected for a meaningful amount of time
            if session_duration > 30:
                retry_count = 0
            else:
                retry_count += 1
            # 2^retry_count, maxed at 60 seconds; the exponent is bounded so the
            # power stays small no matter how many retries have accumulated.
            current_backoff = min(60, 2 ** min(retry_count, 6))
            cprint(f"{Colors.GREEN}Reconnecting as '{nickname}' in {current_backoff}s...{Colors.RESET}")
            await asyncio.sleep(current_backoff)
        except (ConnectionRefusedError, OSError, WebSocketException) as e:
            # Could not connect (or the socket failed): linear backoff, capped at 30 s.
            current_backoff = min(30, (retry_count + 1) * 5)
            cprint(f"{Colors.RED}Cannot connect: {e}. Retrying in {current_backoff}s...{Colors.RESET}")
            await asyncio.sleep(current_backoff)
            retry_count += 1
        except KeyboardInterrupt:
            cprint(f"{Colors.YELLOW}\nInterrupted. Goodbye!{Colors.RESET}")
            return
# ============================================================================
# Entry Point
# ============================================================================
# Script entry point: prepare the console, then run the client coroutine
# until it returns or the user interrupts it.
if __name__ == "__main__":
# Ensure UTF-8 and VT mode on Windows for better emoji/box/color support
if os.name == 'nt':
enable_windows_vt()
# reconfigure() is probed with hasattr, so stdout objects without it are left untouched.
if hasattr(sys.stdout, 'reconfigure'):
cast(Any, sys.stdout).reconfigure(encoding='utf-8')
try:
asyncio.run(chat_client())
except KeyboardInterrupt:
# Ctrl-C that escapes asyncio.run: print a farewell instead of a traceback.
cprint(f"\n{Colors.YELLOW}Goodbye!{Colors.RESET}")