1579 lines
70 KiB
Python
1579 lines
70 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Ultimate Chat Client – Merged & Enhanced
|
||
- Secure Encryption (Fernet: AES-128-CBC + HMAC-SHA256; PLAIN / SHARED / E2EE modes)
|
||
- Chunked File Transfer via WebSocket
|
||
- Local History Persistence (SQLite)
|
||
- Server-synced Themes (matrix, cyberpunk, noir, standard)
|
||
- Offline Mailbox display (📬)
|
||
- ANSI colors on Windows
|
||
- Random nickname on empty input
|
||
- ASCII art from images (Pillow)
|
||
- Full server command support (/help, /join, /msg, /settings, etc.)
|
||
- Auto-reconnect with nickname conflict handling
|
||
- Full auth handshake for registered users
|
||
"""
|
||
|
||
import asyncio
|
||
import json
|
||
import time
|
||
import sys
|
||
import os
|
||
import random
|
||
import string
|
||
import zlib
|
||
import base64
|
||
import secrets
|
||
import sqlite3
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import Dict, List, Any, Optional, Union, cast
|
||
|
||
# ===== Enable ANSI colors on Windows =====
|
||
if os.name == 'nt':
    # Best-effort: ask the Windows console to interpret ANSI escape sequences.
    # Any failure (no console attached, old Windows build) is ignored on
    # purpose - colours are cosmetic and must never stop the client starting.
    try:
        import ctypes
        kernel32 = cast(Any, ctypes).windll.kernel32
        # -11 is STD_OUTPUT_HANDLE. Mode 7 is the bitwise OR of
        # ENABLE_PROCESSED_OUTPUT (0x1), ENABLE_WRAP_AT_EOL_OUTPUT (0x2) and
        # ENABLE_VIRTUAL_TERMINAL_PROCESSING (0x4).
        kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7)
    except Exception:
        pass
|
||
|
||
import websockets # type: ignore
|
||
from websockets.exceptions import ConnectionClosed, WebSocketException # type: ignore
|
||
|
||
# Optional imports – features gracefully disabled if not installed
|
||
try:
|
||
from cryptography.fernet import Fernet, InvalidToken # type: ignore
|
||
from cryptography.hazmat.primitives import hashes, serialization # type: ignore
|
||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC # type: ignore
|
||
from cryptography.hazmat.primitives.asymmetric import x25519 # type: ignore
|
||
from cryptography.hazmat.primitives.kdf.hkdf import HKDF # type: ignore
|
||
from cryptography.hazmat.backends import default_backend # type: ignore
|
||
CRYPTOGRAPHY_AVAILABLE = True
|
||
except ImportError:
|
||
CRYPTOGRAPHY_AVAILABLE = False
|
||
|
||
try:
|
||
from PIL import Image # type: ignore
|
||
PILLOW_AVAILABLE = True
|
||
except ImportError:
|
||
PILLOW_AVAILABLE = False
|
||
|
||
try:
|
||
from prompt_toolkit import PromptSession, print_formatted_text, ANSI # type: ignore
|
||
from prompt_toolkit.patch_stdout import patch_stdout # type: ignore
|
||
from prompt_toolkit.completion import WordCompleter # type: ignore
|
||
from prompt_toolkit.key_binding import KeyBindings # type: ignore
|
||
PROMPT_TOOLKIT_AVAILABLE = True
|
||
except ImportError:
|
||
PROMPT_TOOLKIT_AVAILABLE = False
|
||
|
||
try:
|
||
from plyer import notification # type: ignore
|
||
PLYER_AVAILABLE = True
|
||
except ImportError:
|
||
PLYER_AVAILABLE = False
|
||
|
||
# ============================================================================
|
||
# ANSI color codes (static fallback)
|
||
# ============================================================================
|
||
class Colors:
    """Static ANSI escape sequences used before/without a theme lookup.

    HEADER and MAGENTA intentionally share the same bright-magenta code.
    """

    HEADER = '\x1b[95m'
    BLUE = '\x1b[94m'
    CYAN = '\x1b[96m'
    GREEN = '\x1b[92m'
    YELLOW = '\x1b[93m'
    RED = '\x1b[91m'
    MAGENTA = '\x1b[95m'
    RESET = '\x1b[0m'
    BOLD = '\x1b[1m'
    UNDERLINE = '\x1b[4m'
|
||
|
||
# ============================================================================
|
||
# Global State
|
||
# ============================================================================
|
||
RECONNECT_DELAY = 3
DEFAULT_SERVER = "wss://server.wholeworldcoding.com/radio"

# Connection identity; rewritten by setup_wizard() and by server messages.
nickname = ""
room = "lobby"
server_url = DEFAULT_SERVER
# Note: Security state is now managed by the 'security' object.
pending_file_transfers: Dict[str, Any] = {}  # transfer_id -> offer dict
file_chunks_buffer: Dict[str, Any] = {}  # transfer_id -> {chunks, total, filename, received_count}
outgoing_queue: Optional[asyncio.Queue] = None  # Initialized in chat_client()
_LAST_PASS_USED: str = ""  # Global for auth tracker (was declared twice; kept once)

# In-memory defaults; rows from the local `settings` table override these
# when load_settings() runs.
CLIENT_SETTINGS: Dict[str, Any] = {
    'interactive_prompts': True,
    'current_theme': 'standard',
    'db_path': 'chat_history.db',
    'show_ids': False,
    'last_server': DEFAULT_SERVER,
    'last_room': 'lobby',
    'last_nick': '',
    'last_pass': '',
    'save_profile': False,
}
recent_rooms: List[str] = []  # Track last 9 rooms for Alt+1-9 logic
_SESSION_INSTANCE = None
|
||
|
||
def load_settings():
    """Load settings from local SQLite into CLIENT_SETTINGS.

    Every row of the `settings` table overrides the matching in-memory key.
    The strings 'true'/'false' (any case) are converted to booleans; all other
    values stay strings. Rows whose value is NULL are skipped so one bad row
    cannot abort loading the rest. Database errors are ignored (best-effort):
    on a first run the table may not exist yet.
    """
    conn = None
    try:
        conn = sqlite3.connect(cast(str, CLIENT_SETTINGS['db_path']))
        conn.row_factory = sqlite3.Row
        for row in conn.execute("SELECT key, value FROM settings"):
            key, value = row['key'], row['value']
            if value is None:
                continue
            if isinstance(value, str):
                lowered = value.lower()
                if lowered == 'true':
                    value = True
                elif lowered == 'false':
                    value = False
            CLIENT_SETTINGS[key] = value
    except sqlite3.Error:
        pass
    finally:
        # Close even when the query failed so the handle is never leaked.
        if conn is not None:
            conn.close()
|
||
|
||
def save_setting(key, value):
    """Save a single setting to local SQLite.

    The value is stored as ``str(value)`` (so booleans become 'True'/'False',
    which load_settings() converts back). Only the database row is written;
    the in-memory CLIENT_SETTINGS dict is not touched. Database errors are
    ignored (best-effort), but the connection is always closed.
    """
    conn = None
    try:
        conn = sqlite3.connect(cast(str, CLIENT_SETTINGS['db_path']))
        conn.execute(
            "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
            (key, str(value)),
        )
        conn.commit()
    except sqlite3.Error:
        pass
    finally:
        if conn is not None:
            conn.close()
|
||
|
||
# ============================================================================
|
||
# Theme Management
|
||
# ============================================================================
|
||
# Utilities
|
||
# ============================================================================
|
||
_SESSION_INSTANCE = None
|
||
|
||
def pt_prompt(prompt_str, is_password=False, default=""):
    """Helper to call PromptSession.prompt with fallback to input().

    Args:
        prompt_str: Text shown to the user.
        is_password: Hide the typed characters.
        default: Value offered by prompt_toolkit; in the plain fallback it is
            returned when the user submits an empty line.

    Returns:
        The entered string, or None when the prompt_toolkit prompt is
        cancelled with Ctrl-C / Ctrl-D.
    """
    if PROMPT_TOOLKIT_AVAILABLE and _SESSION_INSTANCE:
        try:
            return cast(Any, _SESSION_INSTANCE).prompt(
                prompt_str, is_password=is_password, default=default
            )
        except (KeyboardInterrupt, EOFError):
            return None

    if is_password:
        import getpass
        answer = getpass.getpass(prompt_str)
    else:
        answer = input(prompt_str)
    # The fallback cannot pre-fill the line, so an empty answer means
    # "accept the default" (previously `default` was silently ignored here).
    return answer if answer else default
|
||
|
||
def enable_windows_vt():
    """Enable Virtual Terminal Processing on Windows for ANSI support.

    Returns:
        True when the console mode was updated; False on non-Windows
        platforms or when any console call fails.
    """
    if os.name != 'nt':
        return False

    import ctypes
    from ctypes import wintypes

    STD_OUTPUT_HANDLE = -11
    ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004

    try:
        kernel32 = getattr(ctypes, 'windll').kernel32
        h_out = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
        # -1 is INVALID_HANDLE_VALUE; 0/None means no console handle exists.
        if h_out in (-1, 0, None):
            return False
        mode = wintypes.DWORD()
        if not kernel32.GetConsoleMode(h_out, ctypes.byref(mode)):
            return False
        # OR the flag in so the console's existing mode bits are preserved.
        return bool(
            kernel32.SetConsoleMode(h_out, mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING)
        )
    except (AttributeError, OSError):
        return False
|
||
|
||
# Colour palettes keyed by theme name. Every palette defines the same keys so
# callers can index T()[...] without checking which theme is active.
THEMES = {
    'standard': {
        'HEADER': '\x1b[95m', 'BLUE': '\x1b[94m', 'CYAN': '\x1b[96m',
        'GREEN': '\x1b[92m', 'YELLOW': '\x1b[93m', 'RED': '\x1b[91m',
        'MAGENTA': '\x1b[95m', 'RESET': '\x1b[0m', 'BOLD': '\x1b[1m',
    },
    'matrix': {
        'HEADER': '\x1b[92m', 'BLUE': '\x1b[32m', 'CYAN': '\x1b[92m',
        'GREEN': '\x1b[92m', 'YELLOW': '\x1b[92m', 'RED': '\x1b[31m',
        'MAGENTA': '\x1b[32m', 'RESET': '\x1b[0m', 'BOLD': '\x1b[1m',
    },
    'cyberpunk': {
        'HEADER': '\x1b[96m', 'BLUE': '\x1b[94m', 'CYAN': '\x1b[96m',
        'GREEN': '\x1b[92m', 'YELLOW': '\x1b[93m', 'RED': '\x1b[91m',
        'MAGENTA': '\x1b[95m', 'RESET': '\x1b[0m', 'BOLD': '\x1b[1m',
    },
    'noir': {
        'HEADER': '\x1b[37m', 'BLUE': '\x1b[90m', 'CYAN': '\x1b[37m',
        'GREEN': '\x1b[90m', 'YELLOW': '\x1b[37m', 'RED': '\x1b[97m',
        'MAGENTA': '\x1b[90m', 'RESET': '\x1b[0m', 'BOLD': '\x1b[1m',
    },
}
|
||
|
||
def T() -> dict:
    """Return current theme color palette.

    Looks up CLIENT_SETTINGS['current_theme'] in THEMES and falls back to the
    'standard' palette when the setting is missing or names an unknown theme.
    """
    theme_name = CLIENT_SETTINGS.get('current_theme', 'standard')
    return cast(dict, THEMES.get(cast(str, theme_name), THEMES['standard']))
|
||
|
||
def apply_theme(name: str):
    """Switch the current UI theme.

    The name is matched case-insensitively, ignoring surrounding whitespace.
    On success CLIENT_SETTINGS['current_theme'] is updated in memory and a
    confirmation is printed in the new palette; otherwise the list of
    available themes is printed. The choice is not written to the database
    here.
    """
    name = name.strip().lower()
    if name in THEMES:
        CLIENT_SETTINGS['current_theme'] = name
        tc = T()  # fetched after the switch so the message uses the new colours
        cprint(f"{tc['GREEN']}Theme switched to: {name.upper()}{tc['RESET']}")
        return
    tc = T()
    cprint(f"{tc['RED']}Theme '{name}' not found. Available: {', '.join(THEMES)}{tc['RESET']}")
|
||
|
||
|
||
# ----------------------------------------------------------------------------
|
||
# Security Infrastructure
|
||
# ----------------------------------------------------------------------------
|
||
MODE_PLAIN = "plain"
MODE_SHARED = "shared"
MODE_E2EE = "e2ee"


class SecurityError(Exception):
    """Raised when no usable key exists for an encryption or key-derivation request."""


class SecurityManager:
    """Manages multi-modal encryption and per-peer session keys.

    Keys live in ``active_keys`` and are looked up by name: a peer nickname
    for private traffic, or the current room name (module-level ``room``) for
    broadcast traffic. Each value is a Fernet instance.
    """

    def __init__(self):
        self.mode = MODE_PLAIN
        self.shared_passphrase = None
        self.active_keys = {}  # nickname or room name -> Fernet object
        self.outgoing_queue = None  # Set later

        if CRYPTOGRAPHY_AVAILABLE:
            # Fresh X25519 key pair per process; pub_bytes is the raw public key.
            self.priv_key = x25519.X25519PrivateKey.generate()
            self.pub_bytes = self.priv_key.public_key().public_bytes_raw()
        else:
            self.priv_key = None
            self.pub_bytes = None

    def set_mode(self, new_mode):
        """Switch the security mode.

        Returns:
            (ok, message) - ok is False for an unknown mode, or for a
            non-plain mode when the cryptography package is missing.
        """
        if new_mode not in (MODE_PLAIN, MODE_SHARED, MODE_E2EE):
            return False, "Invalid mode"
        if new_mode != MODE_PLAIN and not CRYPTOGRAPHY_AVAILABLE:
            return False, "Cryptography library not installed"

        # Only the mode flag changes: every entry in active_keys (peer and
        # room keys alike) is kept, which allows hybrid usage where peer keys
        # persist while the room-level mode changes.
        self.mode = new_mode
        return True, f"Security mode set to {new_mode.upper()}"

    def derive_shared(self, salt, name):
        """Derive a Fernet key from the shared passphrase and store it under `name`.

        Does nothing when no passphrase has been set.
        """
        if not self.shared_passphrase:
            return
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(), length=32, salt=salt, iterations=200000
        )
        key = base64.urlsafe_b64encode(kdf.derive(cast(str, self.shared_passphrase).encode()))
        self.active_keys[name] = Fernet(key)

    def derive_e2ee(self, peer_pub_bytes, name):
        """Derive a per-peer Fernet key from an X25519 exchange and store it under `name`.

        Raises:
            SecurityError: if this manager has no private key (cryptography
                package not installed).
        """
        if self.priv_key is None:
            raise SecurityError("Cryptography library not installed")
        peer_pub = x25519.X25519PublicKey.from_public_bytes(peer_pub_bytes)
        shared_secret = self.priv_key.exchange(peer_pub)
        hkdf = HKDF(algorithm=hashes.SHA256(), length=32, salt=None, info=b"clradio-e2ee")
        key = base64.urlsafe_b64encode(hkdf.derive(shared_secret))
        self.active_keys[name] = Fernet(key)

    def encrypt(self, text, target):
        """Encrypt for room (SHARED) or peer (E2EE/SHARED_PRIVATE).

        Args:
            text: Plaintext message.
            target: Peer nickname for a private message; falsy for a room
                broadcast.

        Returns:
            (payload, encrypted) - `encrypted` is False when the text is
            returned unchanged.

        Raises:
            SecurityError: when no key is available and plaintext is not
                allowed for this combination of mode and target.
        """
        if self.mode == MODE_PLAIN:
            return text, False

        # Use room key for broadcast, target key for private
        key_name = target if target else room
        key = self.active_keys.get(key_name)

        # In SHARED mode a private message without a peer key falls back to
        # the room key.
        if not key and self.mode == MODE_SHARED:
            key = self.active_keys.get(room)

        if not key:
            # NOTE: E2EE room broadcasts without a room key go out in PLAIN
            # text; only private messages insist on a key.
            if self.mode == MODE_E2EE and not target:
                return text, False
            raise SecurityError(f"No active session key for {key_name}. Handshake required for private messages.")

        return key.encrypt(text.encode()).decode(), True

    def decrypt(self, token, sender):
        """Decrypt incoming payload.

        The sender's key is tried first and then the current room key, so a
        room-key message from a peer that also has an E2EE key still
        decrypts. When nothing works, a Fernet-looking token ('gAAAAA...')
        is replaced by a placeholder and anything else is returned unchanged.
        """
        if not CRYPTOGRAPHY_AVAILABLE:
            return token

        looks_encrypted = isinstance(token, str) and token.startswith('gAAAAA')

        candidates = []
        for key_name in (sender, room):
            key = self.active_keys.get(key_name)
            if key and not any(key is seen for seen in candidates):
                candidates.append(key)

        if not candidates:
            return "[🔒 Encrypted (SHARED)]" if looks_encrypted else token

        # Handle potential string/bytes mismatch
        token_bytes = cast(str, token).encode() if isinstance(token, str) else token
        for key in candidates:
            try:
                return key.decrypt(token_bytes).decode()
            except Exception:
                # Wrong key or malformed token: try the next candidate.
                continue

        # If it's a Fernet token but we failed to decrypt (wrong password), mask it
        if looks_encrypted:
            return "[🔒 Encrypted (SHARED) - Wrong Passphrase?]"
        return token
|
||
|
||
# Process-wide security state shared by the send and receive paths.
security = SecurityManager()
room_nicks = set()  # For TAB completion
|
||
# ============================================================================
|
||
# Local SQLite History
|
||
# ============================================================================
|
||
def init_db():
    """Initialize local SQLite database for message history.

    Creates the `messages`, `settings` and `drafts` tables when they do not
    exist, then calls load_settings(). A failure is reported with cprint()
    rather than raised, and settings are not loaded in that case.
    """
    schema = (
        '''
        CREATE TABLE IF NOT EXISTS messages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp REAL,
            sender TEXT,
            room TEXT,
            content TEXT,
            type TEXT DEFAULT 'message'
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS settings (
            key TEXT PRIMARY KEY,
            value TEXT
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS drafts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp REAL,
            room TEXT,
            payload TEXT
        )
        ''',
    )
    conn = None
    try:
        db = cast(str, CLIENT_SETTINGS['db_path'])
        conn = sqlite3.connect(db)
        for ddl in schema:
            conn.execute(ddl)
        conn.commit()
    except Exception as e:
        cprint(f"{Colors.RED}[DB Error] Init failed: {e}{Colors.RESET}")
        return
    finally:
        # Always release the handle, including when a CREATE TABLE failed.
        if conn is not None:
            conn.close()
    load_settings()
|
||
|
||
|
||
def save_message_local(sender: str, content: str, room_name: str, msg_type: str = 'message'):
    """Save a message to local history.

    This is a blocking SQLite write; callers run it off the event loop (for
    example through an executor). The row is stamped with the current
    time.time(). Database errors are swallowed on purpose - chat must not
    break on a DB error - but the connection is always closed.
    """
    conn = None
    try:
        conn = sqlite3.connect(cast(str, CLIENT_SETTINGS['db_path']))
        conn.execute(
            "INSERT INTO messages (timestamp, sender, room, content, type) VALUES (?, ?, ?, ?, ?)",
            (time.time(), sender, room_name, content, msg_type),
        )
        conn.commit()
    except sqlite3.Error:
        pass  # fail silently – chat must not break on DB error
    finally:
        if conn is not None:
            conn.close()
|
||
|
||
|
||
def get_local_history(limit: int = 50, filter_room: Optional[str] = None) -> list:
    """Return last N messages from local DB, chronological order.

    Args:
        limit: Maximum number of rows to return.
        filter_room: When given, only messages of that room are returned. The
            comparison ignores case: rows are stored with whatever casing
            the caller used, so the stored room is lowercased before it is
            compared with the lowercased filter.

    Returns:
        A list of sqlite3.Row objects, oldest first; an empty list on any
        database error.
    """
    conn = None
    try:
        conn = sqlite3.connect(cast(str, CLIENT_SETTINGS['db_path']))
        conn.row_factory = sqlite3.Row
        # `id DESC` breaks ties between rows that share a timestamp.
        if filter_room:
            rows = conn.execute(
                "SELECT * FROM messages WHERE lower(room) = ? "
                "ORDER BY timestamp DESC, id DESC LIMIT ?",
                (filter_room.lower(), limit),
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT * FROM messages ORDER BY timestamp DESC, id DESC LIMIT ?",
                (limit,),
            ).fetchall()
        # The query picks the newest rows; flip them for display order.
        return rows[::-1]
    except sqlite3.Error:
        return []
    finally:
        if conn is not None:
            conn.close()
|
||
|
||
def queue_draft_message(room_name: str, payload_dict: dict):
    """Save a fully formed JSON payload to local drafts while offline.

    The payload is serialised with json.dumps and stored with the current
    time. The outcome (saved / failed) is printed; nothing is raised.
    """
    try:
        payload = json.dumps(payload_dict)
        conn = sqlite3.connect(cast(str, CLIENT_SETTINGS['db_path']))
        try:
            conn.execute(
                "INSERT INTO drafts (timestamp, room, payload) VALUES (?, ?, ?)",
                (time.time(), room_name, payload),
            )
            conn.commit()
        finally:
            # Release the handle whether or not the insert succeeded.
            conn.close()
    except Exception:
        cprint(f"{Colors.RED}[Offline] Failed to save draft.{Colors.RESET}")
        return
    cprint(f"{Colors.YELLOW}[Offline] Message saved to drafts.{Colors.RESET}")
|
||
|
||
def get_and_clear_drafts() -> list:
    """Retrieve all offline drafts and empty the queue.

    Returns:
        The stored JSON payload strings, oldest first. An empty list is
        returned when there are no drafts or when the database operation
        fails; in the failure case the drafts stay queued for a later call
        instead of being handed out and then sent twice.
    """
    conn = None
    try:
        conn = sqlite3.connect(cast(str, CLIENT_SETTINGS['db_path']))
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            "SELECT id, payload FROM drafts ORDER BY timestamp ASC, id ASC"
        ).fetchall()
        if not rows:
            return []
        # Delete only what was read: ids are AUTOINCREMENT, so a draft
        # inserted after the SELECT has a larger id and survives.
        newest_id = max(row['id'] for row in rows)
        conn.execute("DELETE FROM drafts WHERE id <= ?", (newest_id,))
        conn.commit()
        return [row['payload'] for row in rows]
    except sqlite3.Error:
        return []
    finally:
        if conn is not None:
            conn.close()
|
||
|
||
# ============================================================================
|
||
# Encryption – Per-Message Random Salt (AES-256 via Fernet)
|
||
# ============================================================================
|
||
# Legacy encryption functions removed in favor of SecurityManager
|
||
|
||
# ============================================================================
|
||
# ASCII Art
|
||
# ============================================================================
|
||
def image_to_ascii(image_path: str, width: int = 60) -> str:
    """Render an image file as ASCII art.

    Args:
        image_path: Path of the image to convert.
        width: Number of characters per output line.

    Returns:
        A string starting with a newline followed by the art lines; "" when
        Pillow is not installed; a red "ASCII Error: ..." message when the
        image cannot be processed.
    """
    if not PILLOW_AVAILABLE:
        return ""

    # Standard ASCII gradient from dark to light
    chars = "@%#*+=-:. "
    last_index = len(chars) - 1

    try:
        # The context manager closes the underlying file once the resized
        # copy has been made.
        with Image.open(image_path) as source:
            w, h = source.size
            # Maintain aspect ratio for standard terminal fonts (roughly 1:2 w/h)
            new_h = max(1, int(h / w * width * 0.5))
            grey = source.resize((width, new_h)).convert('L')  # Greyscale

        # Map 0-255 luminance onto an index into `chars`; the clamp guards
        # the top of the range.
        scale = len(chars) - 0.01
        pixels = "".join(
            chars[min(last_index, int((p / 255.0) * scale))] for p in grey.getdata()
        )

        # Cut the flat pixel string into rows of `width` characters.
        lines = [pixels[i:i + width] for i in range(0, len(pixels), width)]
        return "\n" + "\n".join(lines)
    except Exception as e:
        return f"{Colors.RED}ASCII Error: {e}{Colors.RESET}"
|
||
|
||
# ============================================================================
|
||
# File Transfer
|
||
# ============================================================================
|
||
async def send_file_data(offer: dict, _target_nick: str):
    """Read file in 16 KB chunks and push to the outgoing_queue.

    Args:
        offer: The file offer; 'filepath', 'transfer_id' and 'target' are read.
        _target_nick: Unused; kept for signature compatibility.

    Each chunk is queued as a base64-encoded 'file_chunk' message. Failures
    are reported with cprint() and never raised.
    """
    filepath = offer.get('filepath', '')
    tid = offer.get('transfer_id', '')
    if not filepath or not os.path.isfile(filepath):
        cprint(f"{T()['RED']}File not found for transfer: {filepath}{T()['RESET']}")
        return
    if outgoing_queue is None:
        # The queue only exists once the client is connected.
        cprint(f"{T()['RED']}Transfer failed: not connected{T()['RESET']}")
        return
    queue = cast(Any, outgoing_queue)

    try:
        filesize = os.path.getsize(filepath)
        chunk_size = 16 * 1024
        total = (filesize + chunk_size - 1) // chunk_size  # ceiling division
        fname = os.path.basename(filepath)
        target = offer.get('target', '')
        cprint(f"{T()['GREEN']}Sending {fname} ({filesize:,} bytes, {total} chunks)...{T()['RESET']}")

        with open(filepath, 'rb') as f:
            # `sent` counts chunks from 1; the wire index is zero-based.
            for sent, chunk in enumerate(iter(lambda: f.read(chunk_size), b''), start=1):
                await queue.put({
                    "type": "file_chunk",
                    "transfer_id": tid,
                    "chunk_idx": sent - 1,
                    "total_chunks": total,
                    "filename": fname,
                    "target": target,
                    "data": base64.b64encode(chunk).decode(),
                })
                # Brief pause between chunks so other tasks get to run.
                await asyncio.sleep(0.005)
                if sent % 20 == 0:
                    sys.stdout.write(f"\rSending: {sent*100//total}% ")
                    sys.stdout.flush()
        cprint(f"\n{T()['GREEN']}✅ Transfer complete: {fname}{T()['RESET']}")
    except Exception as e:
        cprint(f"{T()['RED']}Transfer failed: {e}{T()['RESET']}")
|
||
|
||
|
||
def handle_file_chunk(data: dict):
    """Buffer incoming file chunks and reassemble on completion.

    `data` comes from the network, so it is validated before use: the
    transfer id must be a string, chunk index and chunk count must be
    integers, the index must lie inside the range announced by the first
    chunk of the transfer, and the payload must be valid base64. Invalid
    chunks are reported and dropped. Once every expected chunk has arrived
    the file is written via _save_received_file().
    """
    tid = data.get('transfer_id', '')
    idx = data.get('chunk_idx', 0)
    total = data.get('total_chunks', 1)
    fname = data.get('filename', 'received_file')
    b64_data = data.get('data', '')

    malformed = (
        not isinstance(tid, str)
        or isinstance(idx, bool) or not isinstance(idx, int)
        or isinstance(total, bool) or not isinstance(total, int)
        or total < 1
    )
    if malformed:
        cprint(f"{T()['RED']}Ignoring malformed file chunk.{T()['RESET']}")
        return

    buf = cast(Dict[str, Any], file_chunks_buffer.setdefault(tid, {
        'chunks': {}, 'total': total, 'filename': fname, 'received_count': 0
    }))
    # The first chunk fixes the expected count; later messages cannot change it.
    expected = cast(int, buf['total'])
    chunks = cast(Dict[Any, Any], buf['chunks'])

    if not 0 <= idx < expected:
        cprint(f"{T()['RED']}Ignoring out-of-range chunk {idx} for {fname}.{T()['RESET']}")
        return
    if idx in chunks:
        return  # duplicate delivery

    try:
        chunks[idx] = base64.b64decode(b64_data)
    except (ValueError, TypeError):
        # binascii.Error is a ValueError subclass.
        cprint(f"{T()['RED']}Ignoring undecodable chunk {idx} for {fname}.{T()['RESET']}")
        return

    received_count = cast(int, buf['received_count']) + 1
    buf['received_count'] = received_count
    if received_count % 20 == 0:
        sys.stdout.write(f"\rReceiving {fname}: {received_count*100//expected}% ")
        sys.stdout.flush()
    if received_count >= expected:
        cprint("")
        _save_received_file(tid)
|
||
|
||
|
||
def _save_received_file(tid: str):
    """Write buffered chunks to disk in order.

    The file is written to downloads/received_<name>, or to
    ./received_<name> when the downloads directory cannot be created.
    Nothing is written when a chunk is missing, so no truncated file is
    left behind. After a successful save the transfer buffer is dropped.
    """
    buf = cast(Any, file_chunks_buffer).get(tid)
    if not buf:
        return

    # Sanitize filename to prevent path traversal. Backslashes are treated
    # as separators too, so Windows-style paths are stripped on every OS.
    raw_fname = str(buf.get('filename') or 'received_file')
    safe_fname = Path(raw_fname.replace('\\', '/')).name
    target_name = f"received_{safe_fname}"
    save_path = Path("downloads") / target_name

    # Ensure downloads directory exists
    try:
        save_path.parent.mkdir(parents=True, exist_ok=True)
    except Exception:
        # Fallback to current dir if cannot create
        save_path = Path(target_name)

    chunks = buf['chunks']
    total = buf['total']
    # Verify completeness before touching the disk.
    for i in range(total):
        if chunks.get(i) is None:
            cprint(f"{T()['RED']}Missing chunk {i}! File not saved.{T()['RESET']}")
            return

    try:
        with open(save_path, 'wb') as f:
            f.writelines(chunks[i] for i in range(total))
    except Exception as e:
        cprint(f"{T()['RED']}Failed to save file: {e}{T()['RESET']}")
        return

    cprint(f"{T()['GREEN']}📁 Saved -> {os.path.abspath(save_path)}{T()['RESET']}")
    file_chunks_buffer.pop(tid, None)
|
||
|
||
# ============================================================================
|
||
# Setup Wizard
|
||
# ============================================================================
|
||
def setup_wizard():
    """Interactive start-up wizard.

    Initialises the local database, then asks for server URL, nickname,
    room, security mode and whether to remember the profile. The answers
    are stored in the module globals `server_url`, `nickname` and `room`,
    in the `security` manager, and (partly) in the settings table.
    A server URL given as the first command-line argument skips the URL
    prompt.
    """
    global nickname, server_url, room

    init_db()
    load_settings()

    tc = T()

    # -- Banner: text is centred so it fills the 48-column box exactly. ------
    box_width = 48
    bar = "═" * box_width
    title = "ULTIMATE RADIO CLIENT v2.5 MASTER".center(box_width)
    tagline = "Premium P2P & WebSocket Encrypted Bridge".center(box_width)
    cprint(f"{tc['HEADER']}╔{bar}╗{tc['RESET']}")
    cprint(f"{tc['HEADER']}║{tc['BOLD']}{title}{tc['RESET']}{tc['HEADER']}║{tc['RESET']}")
    cprint(f"{tc['HEADER']}╠{bar}╣{tc['RESET']}")
    cprint(f"{tc['HEADER']}║{tc['RESET']}{tagline}{tc['HEADER']}║{tc['RESET']}")
    cprint(f"{tc['HEADER']}╚{bar}╝{tc['RESET']}\n")

    # -- Server --------------------------------------------------------------
    if len(sys.argv) > 1:
        server_url = sys.argv[1]
    else:
        default_srv = CLIENT_SETTINGS.get('last_server', DEFAULT_SERVER)
        url_in = input(f"Server URL (Default: {default_srv}): ").strip()
        server_url = url_in if url_in else default_srv

    save_setting('last_server', server_url)

    # -- Nickname logic with saved profile support ---------------------------
    last_nick = CLIENT_SETTINGS.get('last_nick', '')
    if last_nick:
        nick_in = input(f"Nickname (Default: {last_nick}): ").strip()
        nickname = nick_in if nick_in else last_nick
    else:
        nick_in = input("Nickname (Enter for random): ").strip()
        if nick_in:
            nickname = nick_in
        else:
            nickname = "User" + ''.join(random.choices(string.ascii_uppercase + string.digits, k=4))

    # -- Room ----------------------------------------------------------------
    default_rm = CLIENT_SETTINGS.get('last_room', 'lobby')
    room_in = input(f"Room (Default: {default_rm}): ").strip()
    room = room_in if room_in else default_rm
    save_setting('last_room', room)

    # -- Security mode -------------------------------------------------------
    if CRYPTOGRAPHY_AVAILABLE:
        cprint(f"\n{tc['CYAN']}Select Security Mode:{tc['RESET']}")
        cprint(f" [1] {tc['BOLD']}PLAIN{tc['RESET']} - No encryption")
        cprint(f" [2] {tc['BOLD']}SHARED{tc['RESET']} - Symmetric (Group Passphrase)")
        cprint(f" [3] {tc['BOLD']}E2EE{tc['RESET']} - Point-to-Point (X25519 Handshake)")

        mode_in = input("Choice [1]: ").strip()
        if mode_in == '2':
            import getpass
            pwd = getpass.getpass("Shared passphrase: ")
            if pwd:
                cast(Any, security).shared_passphrase = pwd
                security.set_mode(MODE_SHARED)
                # Keys are Fernet keys, so the message names Fernet rather
                # than claiming a specific AES key size.
                cprint(f"{tc['GREEN']}✅ Mode: SHARED (Fernet encryption enabled){tc['RESET']}")
            else:
                # An empty passphrase used to leave the user without any
                # feedback; say explicitly that encryption stays off.
                security.set_mode(MODE_PLAIN)
                cprint(f"{tc['YELLOW']}Empty passphrase - staying in PLAIN mode.{tc['RESET']}")
        elif mode_in == '3':
            security.set_mode(MODE_E2EE)
            cprint(f"{tc['GREEN']}✅ Mode: E2EE (X25519 sessions active){tc['RESET']}")
        else:
            security.set_mode(MODE_PLAIN)
            cprint(f"{tc['YELLOW']}✅ Mode: PLAIN{tc['RESET']}")
    else:
        cprint(f"{tc['YELLOW']}Tip: pip install cryptography to enable E2EE/SHARED modes.{tc['RESET']}")

    # -- Remember Me / Account Auto-login ------------------------------------
    save_pref = CLIENT_SETTINGS.get('save_profile', False)
    save_input = input(f"Save credentials for fast login? (y/N) [Default: {'Yes' if save_pref else 'No'}]: ").strip().lower()

    # Anything other than an explicit y/n keeps the stored preference.
    do_save = save_pref
    if save_input == 'y':
        do_save = True
    elif save_input == 'n':
        do_save = False

    if do_save:
        save_setting('save_profile', True)
        save_setting('last_nick', nickname)
        # We don't save the password yet here because we haven't asked for it.
        # But we'll mark it to be saved after a successful login in the command handler.
    else:
        save_setting('save_profile', False)
        save_setting('last_nick', '')
        save_setting('last_pass', '')

    cprint(f"\n{tc['GREEN']}Connecting to {server_url} as '{nickname}' in '{room}'...{tc['RESET']}")
    cprint(f"{tc['CYAN']}Type /help for server commands. Local: /history, /theme, /mode, /crypt, /security, /clear{tc['RESET']}")
    cprint("-" * 50)
|
||
|
||
# ============================================================================
|
||
# Utilities
|
||
# ============================================================================
|
||
def cprint(text: Union[str, bytes]):
    """Prints text with ANSI colors, using prompt_toolkit if available.

    Bytes are decoded as UTF-8 (undecodable bytes are replaced) and any
    other non-string value is converted with str(). When prompt_toolkit is
    missing or its output call fails, the text goes through plain print().
    """
    # Ensure raw \x1b is used, not escaped literal \\x1b
    if isinstance(text, bytes):
        text = text.decode('utf-8', errors='replace')
    elif not isinstance(text, str):
        text = str(text)

    if PROMPT_TOOLKIT_AVAILABLE:
        try:
            with patch_stdout():
                print_formatted_text(ANSI(text))
        except Exception:
            pass  # fall through to the plain print below
        else:
            return

    # Fallback: the escape codes are printed as-is; enable_windows_vt() is
    # what makes them render on Windows consoles.
    print(text)
|
||
|
||
def display_message(data: dict):
|
||
"""Format and print a server message, respecting the active theme."""
|
||
global nickname, room, room_nicks
|
||
|
||
# Handle raw history rows (no 'type' key)
|
||
if 'type' not in data:
|
||
raw_ts = data.get('timestamp', time.time())
|
||
try:
|
||
# Handle mixed timestamp types from database/JSON
|
||
if isinstance(raw_ts, str):
|
||
if ':' in raw_ts:
|
||
ts = raw_ts # Already formatted HH:MM:SS
|
||
else:
|
||
ts = datetime.fromtimestamp(float(raw_ts)).strftime('%H:%M:%S')
|
||
else:
|
||
ts = datetime.fromtimestamp(float(raw_ts)).strftime('%H:%M:%S')
|
||
except:
|
||
ts = datetime.now().strftime('%H:%M:%S')
|
||
|
||
data = {
|
||
'type': 'message',
|
||
'timestamp': ts,
|
||
'nickname': data.get('nickname') or data.get('sender') or '?',
|
||
# Robust content extraction: try every possible key used by server or legacy DB
|
||
'message': data.get('message') or data.get('content') or data.get('text') or '',
|
||
'id': data.get('id'),
|
||
'encrypted': bool(data.get('encrypted', 0)),
|
||
'mode': data.get('mode', 'plain')
|
||
}
|
||
|
||
tc = T()
|
||
t = data.get('type')
|
||
|
||
# -- System --------------------------------------------------------------
|
||
if t == 'system':
|
||
text = str(data.get('message') or '')
|
||
if 'joined' in text:
|
||
color = tc['GREEN']
|
||
elif 'left' in text or 'disconnect' in text.lower():
|
||
color = tc['RED']
|
||
else:
|
||
color = tc['YELLOW']
|
||
cprint(f"{color}{text}{tc['RESET']}")
|
||
# Track local room state
|
||
if 'You are now known as' in text:
|
||
# Atomic update of global nickname to keep state in sync with server
|
||
nickname = text.split('as ')[-1].strip('.')
|
||
elif 'Joined room:' in text:
|
||
new_room = text.split(': ')[-1].strip()
|
||
# Update recent rooms for Alt+1-9 hotkeys
|
||
if new_room != room:
|
||
if room and room not in recent_rooms:
|
||
recent_rooms.insert(0, room)
|
||
if len(recent_rooms) > 9:
|
||
recent_rooms.pop()
|
||
room = new_room
|
||
return
|
||
|
||
# -- Chat Message ---------------------------------------------------------
|
||
elif t == 'message':
|
||
content = str(data.get('message') or '')
|
||
timestamp = data.get('timestamp', datetime.now().strftime('%H:%M:%S'))
|
||
sender = data.get('nickname', '?')
|
||
|
||
# Skip own echo (we have local echo)
|
||
# We use a robust comparison to handle any normalization the server might do
|
||
my_nick = (nickname or "").lower().strip()
|
||
sender_nick = (sender or "").lower().strip()
|
||
|
||
if my_nick and sender_nick == my_nick and not str(content or "").startswith('/me'):
|
||
return
|
||
|
||
# Decrypt if needed
|
||
is_encrypted = data.get('encrypted', False)
|
||
# Auto-detect encrypted content even if not explicitly tagged (legacy support)
|
||
if not is_encrypted and isinstance(content, str) and content.startswith('gAAAAA'):
|
||
is_encrypted = True
|
||
|
||
sec_mode = data.get('mode', 'plain')
|
||
|
||
if is_encrypted:
|
||
content = security.decrypt(content, sender)
|
||
|
||
# Security Fallback: If content somehow became empty, show the raw message or a placeholder
|
||
if not content and data.get('message'):
|
||
content = str(data.get('message') or '')
|
||
if not content:
|
||
msg_id = data.get('id', '??')
|
||
content = f"{tc['YELLOW']}[No content for msg #{msg_id}]{tc['RESET']}"
|
||
|
||
content_str = cast(str, content)
|
||
|
||
# Whisper styling
|
||
if content_str.startswith('[Whisper from') or content_str.startswith('[Whisper to'):
|
||
prefix = f"{tc['MAGENTA']}[E2EE] " if sec_mode == MODE_E2EE else ""
|
||
cprint(f"{tc['MAGENTA']}{tc['BOLD']}[{timestamp}] {prefix}{content_str}{tc['RESET']}")
|
||
asyncio.get_event_loop().run_in_executor(
|
||
None, cast(Any, save_message_local), sender, content_str, room, 'whisper'
|
||
)
|
||
return
|
||
|
||
# Mention alert
|
||
mention_color = ""
|
||
if f"@{nickname}" in content_str:
|
||
mention_color = tc['MAGENTA']
|
||
|
||
# Per-user color (skip in mono themes)
|
||
if CLIENT_SETTINGS.get('current_theme') in ('matrix', 'noir'):
|
||
sender_color = tc['CYAN']
|
||
else:
|
||
palette = [tc['CYAN'], tc['BLUE'], tc['YELLOW'], tc['GREEN'], tc['MAGENTA'], tc['HEADER']]
|
||
sender_color = palette[sum(ord(c) for c in sender) % len(palette)]
|
||
|
||
# Optional message ID prefix
|
||
msg_id = data.get('id')
|
||
id_str = f"{tc['HEADER']}[{msg_id}]{tc['RESET']} " if msg_id and CLIENT_SETTINGS.get('show_ids') else ""
|
||
|
||
# Use 🌐 icon to indicate server-confirmed message
|
||
line = f"{tc['YELLOW']}[{timestamp}]{tc['RESET']} 🌐 {id_str}{sender_color}[{sender}]{tc['RESET']}: {mention_color}{content_str}{tc['RESET'] if mention_color else ''}"
|
||
cprint(line)
|
||
|
||
# Save to local history asynchronously
|
||
asyncio.get_event_loop().run_in_executor(
|
||
None, save_message_local, sender, content_str, room
|
||
)
|
||
|
||
# Feature 6: Desktop Notifications (Suggestion 6)
|
||
if PLYER_AVAILABLE and sender != nickname:
|
||
should_notify = False
|
||
if f"@{nickname}" in content_str:
|
||
should_notify = True
|
||
|
||
if should_notify:
|
||
try:
|
||
notification.notify(
|
||
title=f"Radio - {sender}",
|
||
message=content_str[:150], # type: ignore
|
||
app_name="Radio",
|
||
timeout=5
|
||
)
|
||
except Exception: pass
|
||
return
|
||
|
||
# -- Command Response ------------------------------------------------------
|
||
elif t == 'command_response':
|
||
cprint(f"{tc['CYAN']}{data.get('message', '')}{tc['RESET']}")
|
||
return
|
||
|
||
# -- Error -----------------------------------------------------------------
|
||
elif t == 'error':
|
||
cprint(f"{tc['RED']}❌ {data.get('message', '')}{tc['RESET']}")
|
||
return
|
||
|
||
# -- Offline Mailbox -------------------------------------------------------
|
||
elif t == 'offline':
|
||
ts = data.get('timestamp', '??:??')
|
||
sender = data.get('nickname', '?')
|
||
content = data.get('message', '')
|
||
cprint(f"{tc['MAGENTA']}📬 [OFFLINE {ts}] {tc['BOLD']}{sender}{tc['RESET']}{tc['MAGENTA']}: {content}{tc['RESET']}")
|
||
return
|
||
|
||
# -- Announcement ----------------------------------------------------------
|
||
elif t == 'announcement':
|
||
cprint(f"{tc['YELLOW']}📢 {data.get('content', '')} (by {data.get('created_by', 'Admin')}){tc['RESET']}")
|
||
return
|
||
|
||
# -- Notification ----------------------------------------------------------
|
||
elif t == 'notification':
|
||
msg = data.get('message', '')
|
||
sender = data.get('from', 'System')
|
||
room_name = data.get('room', '??')
|
||
cprint(f"{tc['HEADER']}🔔 NOTIFICATION: {tc['BOLD']}{sender}{tc['RESET']}{tc['HEADER']} in #{room_name}: {msg}{tc['RESET']}")
|
||
return
|
||
|
||
# -- History Bundle --------------------------------------------------------
|
||
elif t == 'history':
|
||
msgs = data.get('messages', [])
|
||
cprint(f"\n{tc['HEADER']}-- Room History (last {len(msgs)}) --{tc['RESET']}")
|
||
for msg in msgs:
|
||
display_message(msg)
|
||
cprint(f"{tc['HEADER']}--------------------------------------{tc['RESET']}\n")
|
||
return
|
||
|
||
# -- Edit/Delete Notifications ---------------------------------------------
|
||
elif t == 'message_edit':
|
||
msg_id = data.get('id')
|
||
new_text = data.get('message')
|
||
editor = data.get('edited_by')
|
||
cprint(f"{tc['CYAN']}📝 Message #{msg_id} was edited by {editor}: {new_text}{tc['RESET']}")
|
||
return
|
||
|
||
elif t == 'message_delete':
|
||
msg_id = data.get('id')
|
||
remover = data.get('deleted_by')
|
||
cprint(f"{tc['RED']}🗑️ Message #{msg_id} was deleted by {remover}{tc['RESET']}")
|
||
return
|
||
|
||
# -- File Events -----------------------------------------------------------
|
||
elif t == 'file_offer':
|
||
if data.get('sub') == 'chunk':
|
||
handle_file_chunk(data)
|
||
else:
|
||
tid = data.get('transfer_id', '')
|
||
cprint(f"{tc['HEADER']}📁 File offer from {data.get('from')}: {data.get('filename')} ({data.get('size', 0):,} bytes){tc['RESET']}")
|
||
cprint(f"{tc['HEADER']} Accept with: /accept {tid}{tc['RESET']}")
|
||
pending_file_transfers[tid] = data
|
||
return
|
||
|
||
elif t == 'file_chunk':
|
||
handle_file_chunk(data)
|
||
return
|
||
|
||
elif t == 'file_accept':
|
||
tid = data.get('transfer_id', '')
|
||
if tid in pending_file_transfers:
|
||
asyncio.ensure_future(send_file_data(pending_file_transfers[tid], data.get('target', '')))
|
||
return
|
||
|
||
# -- Clear History (admin action) ------------------------------------------
|
||
elif t == 'clear_history':
|
||
os.system('cls' if os.name == 'nt' else 'clear')
|
||
cprint(f"{tc['YELLOW']}🗑️ Chat history cleared by {data.get('cleared_by', 'admin')}{tc['RESET']}")
|
||
return
|
||
|
||
# -- Room Change -----------------------------------------------------------
|
||
elif t == 'room_change':
|
||
room = data.get('room', room)
|
||
settings = data.get('settings', {})
|
||
|
||
# Auto-initialize SHARED security if a persistent salt exists (Suggestion 1)
|
||
if security.mode == MODE_SHARED and settings.get('shared_salt') and security.shared_passphrase:
|
||
salt_hex = settings['shared_salt']
|
||
security.derive_shared(bytes.fromhex(salt_hex), room)
|
||
cprint(f"{tc['GREEN']}🔒 Room security synchronized with persistent salt.{tc['RESET']}")
|
||
|
||
if settings.get('security') == 'e2ee' and security.mode == MODE_PLAIN:
|
||
cprint(f"{tc['YELLOW']}🔒 This is a SECURE room. Enable /mode e2ee or /mode shared to participate.{tc['RESET']}")
|
||
return
|
||
|
||
# -- Server-Pushed Settings (theme sync) ----------------------------------
|
||
elif t == 'data_update':
|
||
settings = data.get('settings', {})
|
||
if 'theme' in settings:
|
||
new_theme = settings['theme'].lower()
|
||
if new_theme in THEMES:
|
||
CLIENT_SETTINGS['current_theme'] = new_theme
|
||
if 'nicks' in data:
|
||
room_nicks = set(data['nicks'])
|
||
return
|
||
|
||
# -- E2EE Handshakes ------------------------------------------------------
|
||
elif t == 'e2ee_handshake':
|
||
peer_name = data.get('from', '?')
|
||
pub_raw = base64.b64decode(cast(str, data.get('pub', '')))
|
||
security.derive_e2ee(pub_raw, peer_name)
|
||
cprint(f"{T()['GREEN']}🔒 Secure E2EE channel established with {peer_name}{T()['RESET']}")
|
||
return
|
||
|
||
elif t == 'shared_init':
|
||
peer_name = data.get('from', '?')
|
||
salt = base64.b64decode(data.get('salt', ''))
|
||
security.derive_shared(salt, room) # Update room key
|
||
cprint(f"{T()['GREEN']}🔒 Room security updated (SHARED-INIT by {peer_name}){T()['RESET']}")
|
||
return
|
||
|
||
# -- Identity Synchronization (Suggestion 1) ------------------------------
|
||
elif t == 'nick_update':
|
||
new_nick = data.get('nickname')
|
||
if new_nick:
|
||
nickname = str(new_nick)
|
||
cprint(f"{T()['CYAN']}👤 Identity updated: {T()['BOLD']}{nickname}{T()['RESET']}")
|
||
return
|
||
|
||
# -- Message ACK (Read Receipt) --------------------------------------------
|
||
elif t == 'msg_ack':
|
||
return
|
||
|
||
# -- Silently ignore internals ---------------------------------------------
|
||
else:
|
||
if t not in ('ping', 'pong', 'auth_response'):
|
||
pass # Uncomment for debug: cprint(f"[DEBUG] {data}")
|
||
return
|
||
|
||
# ============================================================================
|
||
# Input Handler
|
||
# ============================================================================
|
||
async def input_handler(websocket):
    """Read user input, run client-side commands, and forward everything else.

    Lines are read with prompt_toolkit when it is installed and the
    ``interactive_prompts`` setting is on, otherwise with a blocking
    ``input()`` call in the default executor.  A line starting with ``/`` is
    first matched against the commands implemented below; a command that is
    not handled locally is forwarded to the server unchanged.  Plain text is
    encrypted via ``security.encrypt``, echoed locally and sent as a room
    message; if that send fails the payload is stored with
    ``queue_draft_message`` and the loop ends.

    Args:
        websocket: Open connection used for every outgoing frame.

    Returns:
        None.  The coroutine finishes on ``/quit``, on Ctrl-D / Ctrl-C at the
        prompt, when a send fails, or when an unexpected error is raised
        while processing a line.
    """
    global nickname, room, _SESSION_INSTANCE
    security.outgoing_queue = websocket  # Bind for handshake relay

    commands = [
        '/help', '/join', '/nick', '/msg', '/whoami', '/settings', '/mode', '/crypt', '/security',
        '/list', '/users', '/clear', '/quit', '/theme', '/history', '/e2ee', '/search', '/room',
        '/deleteroom', '/delroom', '/admin', '/admin logs', '/admin vacuum'
    ]

    # Key-binding callbacks are synchronous, so their sends run as tasks.
    # asyncio only keeps weak references to tasks; hold strong ones until done.
    background_sends = set()

    def send_in_background(payload):
        """Schedule ``payload`` to be sent without awaiting it."""
        task = asyncio.create_task(websocket.send(json.dumps(payload)))
        background_sends.add(task)
        task.add_done_callback(background_sends.discard)

    async def broadcast_shared_salt():
        """Derive the room key from a fresh random salt and announce the salt."""
        salt = os.urandom(16)
        security.derive_shared(salt, room)
        await websocket.send(json.dumps({
            "type": "shared_init", "salt": base64.b64encode(salt).decode()
        }))

    # Feature 3: Rich Key-Bindings (Suggestion 3)
    kb = KeyBindings() if PROMPT_TOOLKIT_AVAILABLE else None
    if kb is not None:
        @kb.add('escape', 'u')  # Alt + u
        def _(event):
            send_in_background({"type": "message", "message": "/users"})
            event.app.current_buffer.text = ''

        @kb.add('escape', 'c')  # Alt + c
        def _(event):
            os.system('cls' if os.name == 'nt' else 'clear')
            event.app.current_buffer.text = ''

        # Alt + 1-9 for quick-switch rooms
        for i in range(1, 10):
            @kb.add('escape', str(i))
            def _(event, i=i):  # i=i binds the current digit, not the last one
                if len(recent_rooms) >= i:
                    target_room = recent_rooms[i - 1]
                    send_in_background({
                        "type": "join", "nickname": nickname, "room": target_room
                    })
                    # Clear current buffer
                    event.app.current_buffer.text = ''

    session = None
    if PROMPT_TOOLKIT_AVAILABLE:
        import re
        pat = re.compile(r'[/a-zA-Z0-9_\-]+')
        # The completer sees the nick list as it is right now (a snapshot).
        completer = cast(Any, WordCompleter)(
            commands + list(room_nicks),
            pattern=pat,
            ignore_case=True,
            match_middle=False
        )
        session = cast(Any, PromptSession)(completer=completer, key_bindings=kb)

    _SESSION_INSTANCE = session

    loop = asyncio.get_running_loop()

    while True:
        try:
            # Slimmer prompt: [ROOM/NICK]>
            prompt_str = f"[{room}/{nickname}]> "

            if PROMPT_TOOLKIT_AVAILABLE and CLIENT_SETTINGS.get('interactive_prompts'):
                try:
                    with patch_stdout():
                        line = await cast(Any, session).prompt_async(prompt_str, is_password=False)
                except (EOFError, KeyboardInterrupt):
                    return  # Exit client on Ctrl-D or Ctrl-C at prompt
            else:
                try:
                    line = await loop.run_in_executor(None, input, prompt_str)
                except EOFError:
                    return  # stdin closed (Ctrl-D): leave quietly, not as an error

            if line is None:
                break

            text = cast(str, line).strip()
            if not text:
                continue

            # -- LOCAL-ONLY commands -------------------------------------------
            if text.startswith('/'):
                parts = text[1:].split()
                cmd = parts[0].lower() if parts else ''

                # /quit – tell the server (best effort) and stop reading input
                if cmd == 'quit':
                    try:
                        await websocket.send(json.dumps({"type": "message", "message": text}))
                    except ConnectionClosed:
                        pass  # already disconnected; we are leaving anyway
                    return

                # /clear – clear terminal
                elif cmd == 'clear':
                    os.system('cls' if os.name == 'nt' else 'clear')
                    continue

                # /history – show local SQLite history
                elif cmd == 'history':
                    # isdecimal() matches exactly what int() can parse.
                    limit = int(parts[1]) if len(parts) > 1 and parts[1].isdecimal() else 50
                    rows = await loop.run_in_executor(None, get_local_history, limit, room)
                    tc = T()
                    cprint(f"\n{tc['HEADER']}-- Local History (last {len(rows)}) --{tc['RESET']}")
                    for row in rows:
                        ts = datetime.fromtimestamp(row['timestamp']).strftime('%H:%M:%S')
                        cprint(f"{tc['YELLOW']}[{ts}]{tc['RESET']} [{row['sender']}]: {row['content']}")
                    print()
                    continue

                # /theme <name> – switch theme locally
                elif cmd == 'theme':
                    if len(parts) < 2:
                        cprint(f"Available themes: {', '.join(THEMES.keys())}")
                    else:
                        apply_theme(parts[1])
                    continue

                # /ascii <path> [@nick] [width] – send ascii art
                elif cmd == 'ascii':
                    if not PILLOW_AVAILABLE:
                        cprint(f"{T()['RED']}Pillow not installed.{T()['RESET']}")
                        continue
                    if len(parts) < 2:
                        print("Usage: /ascii <image_path> [@nick] [width]")
                        continue
                    target = None
                    a_width = None
                    path_parts = parts[1:]
                    # Peel the optional trailing arguments off in either order,
                    # so both "<path> @nick 80" and "<path> 80 @nick" work.
                    while path_parts:
                        tail = path_parts[-1]
                        if target is None and tail.startswith('@'):
                            target = tail[1:]
                        elif a_width is None and tail.isdecimal():
                            a_width = int(tail)
                        else:
                            break
                        path_parts = path_parts[:-1]
                    if a_width is None:
                        a_width = 60  # Default width
                    f_path = " ".join(path_parts).strip().strip('"').strip("'")

                    if not f_path:
                        cprint(f"{T()['RED']}Error: No image path provided.{T()['RESET']}")
                        continue

                    art = image_to_ascii(f_path, width=a_width)
                    if art:
                        try:
                            ciphertext, enc = security.encrypt(art, target)
                        except SecurityError as e:
                            # Same handling as /msg: report it, keep the prompt alive.
                            cprint(f"{T()['RED']}Security Error: {e}{T()['RESET']}")
                            continue
                        await websocket.send(json.dumps({
                            "type": "message",
                            "message": f"/msg {target} {ciphertext}" if target else ciphertext,
                            "encrypted": enc,
                            "mode": security.mode
                        }))
                        cprint(f"{T()['GREEN']}ASCII art sent to {target or room}:{T()['RESET']}")
                        print(art)
                    continue

                # /sendfile <nick> <path> – offer a file
                elif cmd == 'sendfile':
                    if len(parts) < 3:
                        cprint(f"{T()['RED']}Usage: /sendfile <nickname> <filepath>{T()['RESET']}")
                        continue
                    target = parts[1]
                    filepath = ' '.join(parts[2:])
                    # isfile() rather than exists(): a directory cannot be offered.
                    if not os.path.isfile(filepath):
                        cprint(f"{T()['RED']}File not found: {filepath}{T()['RESET']}")
                        continue
                    tid = f"{nickname}_{int(time.time())}"
                    filesize = os.path.getsize(filepath)
                    # Store with local path so we can read on accept
                    pending_file_transfers[tid] = {
                        'filepath': filepath, 'target': target,
                        'transfer_id': tid, 'filename': os.path.basename(filepath)
                    }
                    await websocket.send(json.dumps({
                        "type": "file_offer",
                        "target": target,
                        "filename": os.path.basename(filepath),
                        "size": filesize,
                        "transfer_id": tid
                    }))
                    cprint(f"{T()['GREEN']}📤 File offer sent to {target}.{T()['RESET']}")
                    continue

                # /accept <tid> – accept a file offer
                elif cmd == 'accept':
                    if len(parts) < 2:
                        cprint(f"{T()['RED']}Usage: /accept <transfer_id>{T()['RESET']}")
                        continue
                    tid = parts[1]
                    if tid in pending_file_transfers:
                        offer = pending_file_transfers[tid]
                        await websocket.send(json.dumps({
                            "type": "file_accept",
                            "transfer_id": tid,
                            "target": offer.get('from', offer.get('target', ''))
                        }))
                        cprint(f"{T()['GREEN']}✅ Accepted transfer {tid}.{T()['RESET']}")
                    else:
                        cprint(f"{T()['RED']}No pending transfer: {tid}{T()['RESET']}")
                    continue

                # /color – legacy colour picker (no-op, for compat)
                elif cmd == 'color':
                    cprint(f"{T()['YELLOW']}Use /theme <name> to change look. /settings theme <name> for server sync.{T()['RESET']}")
                    continue

                # /login – interactive prompt with password masking.
                # Without interactive prompts the typed command falls through
                # and is forwarded to the server as-is.
                elif cmd == 'login':
                    if PROMPT_TOOLKIT_AVAILABLE and CLIENT_SETTINGS.get('interactive_prompts'):
                        try:
                            default_user = parts[1] if len(parts) > 1 else ""
                            u = await loop.run_in_executor(None, cast(Any, lambda: pt_prompt("Username: ", default=default_user)))
                            if not u:
                                continue
                            p = await loop.run_in_executor(None, cast(Any, lambda: pt_prompt("Password: ", is_password=True)))
                            if not p:
                                continue
                            await websocket.send(json.dumps({'type': 'message', 'message': f'/login {u} {p}'}))
                            continue
                        except (EOFError, KeyboardInterrupt):
                            cprint(f"\n{T()['YELLOW']}Login cancelled.{T()['RESET']}")
                            continue

                # /password – interactive change with masking (same fall-through as /login)
                elif cmd == 'password':
                    if PROMPT_TOOLKIT_AVAILABLE and CLIENT_SETTINGS.get('interactive_prompts'):
                        try:
                            old_p = await loop.run_in_executor(None, cast(Any, lambda: pt_prompt("Current Password: ", is_password=True)))
                            if not old_p:
                                continue
                            new_p = await loop.run_in_executor(None, cast(Any, lambda: pt_prompt("New Password: ", is_password=True)))
                            if not new_p:
                                continue
                            conf_p = await loop.run_in_executor(None, cast(Any, lambda: pt_prompt("Confirm New Password: ", is_password=True)))
                            if not conf_p:
                                continue
                            if new_p != conf_p:
                                cprint(f"{T()['RED']}Passwords do not match!{T()['RESET']}")
                                continue
                            await websocket.send(json.dumps({'type': 'message', 'message': f'/password {old_p} {new_p}'}))
                            continue
                        except (EOFError, KeyboardInterrupt):
                            cprint(f"\n{T()['YELLOW']}Cancelled.{T()['RESET']}")
                            continue

                # /mode [plain|shared|e2ee]
                elif cmd == 'mode':
                    if len(parts) < 2:
                        cprint(f"{T()['YELLOW']}Current Security: {security.mode.upper()}{T()['RESET']}")
                        continue
                    success, msg = security.set_mode(parts[1].lower())
                    color = T()['GREEN'] if success else T()['RED']
                    cprint(f"{color}{msg}{T()['RESET']}")

                    # If we switched to SHARED, we need to init
                    if success and security.mode == MODE_SHARED and security.shared_passphrase:
                        await broadcast_shared_salt()
                    continue

                # /crypt <passphrase>
                elif cmd == 'crypt':
                    if len(parts) < 2:
                        cprint(f"{T()['RED']}Usage: /crypt <passphrase>{T()['RESET']}")
                    else:
                        # NOTE(review): only the first whitespace-separated word is
                        # used; a multi-word passphrase is truncated. Left unchanged
                        # because peers must derive the same key.
                        security.shared_passphrase = parts[1]
                        cprint(f"{T()['GREEN']}Room/Group Passphrase updated. (Use /sec high or /sec locked to apply){T()['RESET']}")
                    continue

                # /security [high|stealth|locked|open] (New User-Friendly Hybrid Command)
                elif cmd in ('security', 'sec'):
                    tc = T()
                    if len(parts) < 2:
                        cprint(f"\n{tc['HEADER']}-- Security Profiles --{tc['RESET']}")
                        cprint(f" {tc['BOLD']}high{tc['RESET']} - E2EE (Private) + SHARED (Room)")
                        cprint(f" {tc['BOLD']}stealth{tc['RESET']} - E2EE (Private) + PLAIN (Room)")
                        cprint(f" {tc['BOLD']}locked{tc['RESET']} - SHARED (Everything)")
                        cprint(f" {tc['BOLD']}open{tc['RESET']} - PLAIN (Everything)")
                        cprint(f"\nCurrent Mode: {tc['CYAN']}{security.mode.upper()}{tc['RESET']}")
                        continue

                    sub = parts[1].lower()
                    if sub == 'high':
                        # E2EE Mode but with SHARED room fallback
                        security.mode = MODE_E2EE
                        cprint(f"{tc['GREEN']}Profile Set: HIGH SECURITY (Hybrid E2EE + SHARED){tc['RESET']}")
                        # Trigger shared init if possible
                        if security.shared_passphrase:
                            await broadcast_shared_salt()
                    elif sub == 'stealth':
                        security.mode = MODE_E2EE
                        cprint(f"{tc['GREEN']}Profile Set: STEALTH (E2EE Private + PLAIN Room){tc['RESET']}")
                    elif sub == 'locked':
                        security.mode = MODE_SHARED
                        cprint(f"{tc['GREEN']}Profile Set: LOCKED (Full SHARED Encryption){tc['RESET']}")
                        if security.shared_passphrase:
                            await broadcast_shared_salt()
                    elif sub == 'open':
                        security.mode = MODE_PLAIN
                        cprint(f"{tc['YELLOW']}Profile Set: OPEN (No Encryption){tc['RESET']}")
                    else:
                        cprint(f"{tc['RED']}Unknown profile. Use: /security [high|stealth|locked|open]{tc['RESET']}")
                    continue

                # /msg – private message (supports multiple targets: /msg nick1,nick2 msg)
                elif cmd == 'msg':
                    if len(parts) > 2:
                        target_list = parts[1].split(',')
                        body = ' '.join(parts[2:])

                        for target in target_list:
                            target = target.strip()
                            if not target:
                                continue

                            # Auto-handshake for E2EE if no key exists
                            if security.mode == MODE_E2EE and target not in security.active_keys:
                                await websocket.send(json.dumps({
                                    "type": "e2ee_handshake", "target": target,
                                    "pub": base64.b64encode(security.pub_bytes).decode()
                                }))
                                cprint(f"{T()['YELLOW']}Handshake sent to {target}. Retrying message shortly...{T()['RESET']}")
                                # Continue to next target so we don't block
                                continue

                            try:
                                ciphertext, enc = security.encrypt(body, target)
                                await websocket.send(json.dumps({
                                    "type": "message", "message": f"/msg {target} {ciphertext}",
                                    "encrypted": enc, "mode": security.mode
                                }))
                                # Local echo
                                ts = datetime.now().strftime('%H:%M:%S')
                                lock = "🔒 " if enc else ""
                                cprint(f"{T()['MAGENTA']}[{ts}] {lock}[Whisper to {target}]: {body}{T()['RESET']}")
                            except SecurityError as e:
                                cprint(f"{T()['RED']}Error for {target}: {e}{T()['RESET']}")
                    continue

                # /client-settings – local config panel
                elif cmd == 'client-settings':
                    if len(parts) < 2:
                        tc = T()
                        cprint(f"\n{tc['HEADER']}-- Client Settings --{tc['RESET']}")
                        cprint(f" Theme: {CLIENT_SETTINGS.get('current_theme', 'standard')}")
                        cprint(f" Interactive prompts: {'ON' if CLIENT_SETTINGS['interactive_prompts'] else 'OFF'}")
                        cprint(f" Show message IDs: {'ON' if CLIENT_SETTINGS.get('show_ids') else 'OFF'}")
                        cprint(f" History DB: {CLIENT_SETTINGS['db_path']}")
                        cprint("\nUsage: /client-settings prompts on|off")
                    else:
                        sub = parts[1].lower()
                        if sub in ('prompts', 'interactive'):
                            val = parts[2].lower() if len(parts) > 2 else 'off'
                            CLIENT_SETTINGS['interactive_prompts'] = val in ('on', 'true', '1', 'enable')
                            cprint(f"{T()['GREEN']}Interactive prompts {'ON' if CLIENT_SETTINGS['interactive_prompts'] else 'OFF'}{T()['RESET']}")
                        elif sub == 'ids':
                            val = parts[2].lower() if len(parts) > 2 else 'off'
                            CLIENT_SETTINGS['show_ids'] = val in ('on', 'true', '1', 'enable')
                            cprint(f"{T()['GREEN']}show_ids {'ON' if CLIENT_SETTINGS['show_ids'] else 'OFF'}{T()['RESET']}")
                    continue

                # /me – action message sent directly
                elif cmd == 'me':
                    if len(parts) > 1:
                        action = ' '.join(parts[1:])
                        await websocket.send(json.dumps({"type": "message", "message": f"/me {action}"}))
                    continue

            # -- Forward to server ---------------------------------------------
            if not text.startswith('/'):
                try:
                    ciphertext, enc = security.encrypt(text, None)  # None uses room
                    payload = {
                        "type": "message", "message": ciphertext,
                        "encrypted": enc, "mode": security.mode
                    }

                    # 1. Local Echo (Snappy UI)
                    ts = datetime.now().strftime('%H:%M:%S')
                    tc = T()
                    prefix = ""
                    if security.mode == MODE_E2EE:
                        prefix = f"{tc['YELLOW']}[🌐 Private]{tc['RESET']} "
                    elif security.mode == MODE_SHARED:
                        prefix = f"{tc['GREEN']}[🔒 SHARED]{tc['RESET']} "

                    print(f"{tc['YELLOW']}[{ts}]{tc['RESET']} {prefix}{tc['BOLD']}[{nickname}]{tc['RESET']} {text}")
                except SecurityError as e:
                    cprint(f"{T()['RED']}Security Error: {e}{T()['RESET']}")
                    continue
            else:
                # Command not handled locally: let the server interpret it.
                payload = {"type": "message", "message": text}

            try:
                await websocket.send(json.dumps(payload))
            except ConnectionClosed:
                if not text.startswith('/'):
                    # Offline Draft Queueing!
                    queue_draft_message(room, payload)
                break
            except Exception as e:
                cprint(f"{T()['RED']}[Send error] {e}{T()['RESET']}")
                # Assume disconnected if random send error occurs
                if not text.startswith('/'):
                    queue_draft_message(room, payload)
                break

        except asyncio.CancelledError:
            break
        except ConnectionClosed:
            break
        except Exception as e:
            cprint(f"{T()['RED']}[Input error] {e}{T()['RESET']}")
            break
|
||
|
||
# ============================================================================
|
||
# Outgoing Queue Sender (file chunks and other async sends)
|
||
# ============================================================================
|
||
def _requeue_at_front(queue, item):
    """Put ``item`` back at the head of ``queue`` without reordering the rest."""
    backlog = [item]
    while not queue.empty():
        backlog.append(queue.get_nowait())
        queue.task_done()  # balance the get; the put below re-counts the entry
    for entry in backlog:
        queue.put_nowait(entry)


async def queue_sender(websocket):
    """Drain the outgoing_queue and send messages over WebSocket.

    Each queued item is JSON-encoded and sent on ``websocket``.  If the
    connection closes while an item is in flight, that item is put back at the
    front of the queue so it is not lost; entries still waiting in the queue
    already stay there for the next connection.

    Args:
        websocket: Open connection the queued messages are written to.

    Returns:
        None.  The coroutine ends when it is cancelled or the connection
        closes; any other send error is printed and the loop carries on.
    """
    queue = cast(asyncio.Queue, outgoing_queue)
    while True:
        try:
            msg = await queue.get()
        except asyncio.CancelledError:
            break

        try:
            await websocket.send(json.dumps(msg))
        except asyncio.CancelledError:
            break
        except ConnectionClosed:
            _requeue_at_front(queue, msg)
            break
        except Exception as e:
            cprint(f"{T()['RED']}[Queue sender error] {e}{T()['RESET']}")
        finally:
            # Always account for the get(), otherwise queue.join() never returns.
            queue.task_done()
|
||
|
||
# ============================================================================
|
||
# Receive Loop
|
||
# ============================================================================
|
||
async def receive_handler(websocket):
    """Continuously receive and display server messages.

    Every frame is parsed as JSON and handed to ``display_message``.  A frame
    that cannot be parsed, or that is not a JSON object, is reported and
    skipped.  An error raised while displaying one message is reported and the
    loop moves on to the next frame, so a single malformed message does not
    end the session.

    Args:
        websocket: Open connection to iterate over.

    Returns:
        None.  The coroutine ends when the connection closes or iteration
        itself fails.
    """
    try:
        async for raw in websocket:
            try:
                data = json.loads(cast(Any, raw))
            except (json.JSONDecodeError, UnicodeDecodeError):
                # UnicodeDecodeError: binary frame that is not valid UTF-8/16/32.
                cprint(f"{T()['RED']}[Invalid JSON] {raw[:120]}{T()['RESET']}")
                continue

            # display_message reads fields with data.get(), so it needs a dict.
            if not isinstance(data, dict):
                cprint(f"{T()['RED']}[Invalid JSON] {raw[:120]}{T()['RESET']}")
                continue

            try:
                display_message(data)
            except ConnectionClosed:
                raise
            except Exception as e:
                cprint(f"{T()['RED']}[Receive error] {e}{T()['RESET']}")
    except ConnectionClosed:
        pass
    except Exception as e:
        cprint(f"{T()['RED']}[Receive error] {e}{T()['RESET']}")
|
||
|
||
# ============================================================================
|
||
# Main Client with Auto-Reconnect
|
||
# ============================================================================
|
||
async def _client_heartbeat(ws):
    """Send an application-level ``ping`` every 30 seconds until a send fails."""
    while True:
        try:
            await asyncio.sleep(30)
            await ws.send(json.dumps({"type": "ping"}))
        except Exception:
            break


async def _client_auth_handshake(websocket):
    """Process the server's first reply to ``join`` and log in if it asks.

    Waits up to 5 seconds for a first message.  If it is ``auth_required`` the
    saved password (when ``save_profile`` is on) or a ``getpass`` prompt is
    used to answer; any other message is passed to ``display_message``.

    Returns:
        False when the server answered the password with an ``error`` message
        (the caller should drop this connection and retry), True in every
        other case, including timeouts and handshake errors.
    """
    global _LAST_PASS_USED
    try:
        raw = await asyncio.wait_for(websocket.recv(), timeout=5.0)
        data = json.loads(raw)

        if data.get('type') != 'auth_required':
            display_message(data)
            return True

        import getpass
        loop = asyncio.get_running_loop()

        saved_pass = CLIENT_SETTINGS.get('last_pass', '')
        use_saved = bool(saved_pass and CLIENT_SETTINGS.get('save_profile'))

        if use_saved:
            cprint(f"{Colors.GREEN}[*] Using saved credentials for '{nickname}'...{Colors.RESET}")
            pwd = saved_pass
        else:
            pwd = await loop.run_in_executor(
                None, cast(Any, getpass.getpass),
                f"{Colors.YELLOW}{data.get('message', 'Password: ')}{Colors.RESET} "
            )

        # Kept in a module global for code elsewhere in this file to read.
        _LAST_PASS_USED = cast(str, pwd)

        await websocket.send(json.dumps({'type': 'auth_response', 'password': cast(str, pwd)}))

        # Wait for result of authentication
        raw2 = await asyncio.wait_for(websocket.recv(), timeout=15.0)
        data2 = json.loads(raw2)
        if data2.get('type') == 'error':
            cprint(f"{Colors.RED}[X] {data2.get('message')}{Colors.RESET}")
            if use_saved:
                cprint(f"{Colors.YELLOW}[!] Saved password failed. Clearing it.{Colors.RESET}")
                save_setting('last_pass', '')
            await asyncio.sleep(RECONNECT_DELAY)
            return False

        # Success! Save password if "Save Profile" is checked
        # NOTE(review): this stores the password as given to save_setting;
        # confirm that helper does not persist it in plain text.
        if CLIENT_SETTINGS.get('save_profile'):
            save_setting('last_pass', pwd)
        display_message(data2)
    except asyncio.TimeoutError:
        pass  # No immediate message – proceed
    except Exception as e:
        cprint(f"{Colors.RED}Handshake error: {e}{Colors.RESET}")
    return True


async def _enqueue_offline_drafts():
    """Move drafts stored while offline onto ``outgoing_queue``."""
    drafts = await asyncio.to_thread(cast(Any, get_and_clear_drafts))
    if not drafts:
        return
    cprint(f"{T()['YELLOW']}[Offline] Sending {len(drafts)} drafted message(s)...{T()['RESET']}")
    for d_payload_str in cast(list, drafts):
        try:
            payload_evt = json.loads(d_payload_str)
        except (TypeError, ValueError) as e:
            # The draft was already removed from storage; say so instead of
            # dropping it without a trace.
            cprint(f"{T()['RED']}[Offline] Skipping unreadable draft: {e}{T()['RESET']}")
            continue
        await cast(asyncio.Queue, outgoing_queue).put(payload_evt)


async def chat_client():
    """Run the chat client: connect, authenticate, serve, and reconnect.

    Runs ``setup_wizard`` once, then loops: open the WebSocket, send ``join``,
    complete the optional password handshake, flush offline drafts, and run
    the input, receive, queue-sender and heartbeat tasks until the first one
    finishes.  If the input task finished while the receive task was still
    running, the user asked to leave and the function returns.  Otherwise the
    connection is treated as lost and retried with exponential backoff.

    Returns:
        None, after a clean disconnect or a keyboard interrupt.
    """
    global nickname, room, server_url, outgoing_queue

    setup_wizard()
    outgoing_queue = asyncio.Queue()
    retry_count: int = 0

    while True:
        try:
            session_start = time.time()

            async with websockets.connect(
                server_url,
                ping_interval=20,
                ping_timeout=60,
                max_size=10 * 1024 * 1024  # 10 MB for file chunks
            ) as websocket:

                # -- 1. Send join ----------------------------------------------
                await websocket.send(json.dumps({
                    "type": "join",
                    "nickname": nickname,
                    "room": room
                }))

                # -- 2. Auth Handshake & Auto-login ----------------------------
                if not await _client_auth_handshake(websocket):
                    continue  # password rejected: reconnect and ask again

                # -- 3. Offline Draft Drain -------------------------------------
                await _enqueue_offline_drafts()

                # -- 4. Start tasks ---------------------------------------------
                input_task = asyncio.create_task(input_handler(websocket))
                recv_task = asyncio.create_task(receive_handler(websocket))
                sender_task = asyncio.create_task(queue_sender(websocket))
                heartbeat_task = asyncio.create_task(_client_heartbeat(websocket))

                done, pending = await asyncio.wait(
                    [input_task, recv_task, sender_task, heartbeat_task],
                    return_when=asyncio.FIRST_COMPLETED
                )

                # Decide this BEFORE cancelling: a cancelled task also reports
                # done() == True, so checking recv_task.done() afterwards can
                # never detect "receive loop was still running".
                user_quit = input_task in done and recv_task not in done

                for task in pending:
                    task.cancel()
                    try:
                        await task
                    except asyncio.CancelledError:
                        pass

                # User typed /quit or Ctrl-D -> don't reconnect
                if user_quit:
                    cprint(f"{Colors.YELLOW}\nDisconnecting cleanly.{Colors.RESET}")
                    return

                # Auto-reconnect with Exponential Backoff
                session_duration = time.time() - session_start
                cprint(f"{Colors.RED}\nConnection lost.{Colors.RESET}")

                # If we failed very quickly, maybe the nickname is taken or rejected
                if session_duration < 2:
                    loop = asyncio.get_running_loop()
                    try:
                        cprint(f"{Colors.YELLOW}Connection lasted < 2s. Identity rejection?{Colors.RESET}")
                        choice = await loop.run_in_executor(
                            None, cast(Any, lambda: input("Try a different Nickname/Room? (y/N): ").strip().lower())
                        )
                        if choice == 'y':
                            new_nick = await loop.run_in_executor(
                                None, cast(Any, lambda: input(f"New Nickname (Current: {nickname}): ").strip())
                            )
                            if new_nick:
                                nickname = new_nick
                            new_room = await loop.run_in_executor(
                                None, cast(Any, lambda: input(f"New Room (Current: {room}): ").strip())
                            )
                            if new_room:
                                room = new_room
                                save_setting('last_room', room)
                    except Exception:
                        # Best effort (e.g. EOF on stdin): keep the current
                        # identity. Cancellation and Ctrl-C still propagate.
                        pass

                # Reset retry count if we were connected for a meaningful amount of time
                if session_duration > 30:
                    retry_count = 0
                else:
                    retry_count += 1

                # 2^retry_count, maxed at 60 seconds
                current_backoff = min(60, 2 ** retry_count)

                cprint(f"{Colors.GREEN}Reconnecting as '{nickname}' in {current_backoff}s...{Colors.RESET}")
                await asyncio.sleep(current_backoff)

        except (ConnectionRefusedError, OSError, WebSocketException) as e:
            current_backoff = min(30, (retry_count + 1) * 5)
            cprint(f"{Colors.RED}Cannot connect: {e}. Retrying in {current_backoff}s...{Colors.RESET}")
            await asyncio.sleep(current_backoff)
            retry_count += 1
        except KeyboardInterrupt:
            cprint(f"{Colors.YELLOW}\nInterrupted. Goodbye!{Colors.RESET}")
            return
|
||
|
||
# ============================================================================
|
||
# Entry Point
|
||
# ============================================================================
|
||
if __name__ == "__main__":
    # Windows consoles need VT processing switched on and stdout re-encoded
    # as UTF-8 before emoji, box characters and ANSI colours render.
    on_windows = os.name == 'nt'
    if on_windows:
        enable_windows_vt()
        stdout_stream = cast(Any, sys.stdout)
        if hasattr(stdout_stream, 'reconfigure'):
            stdout_stream.reconfigure(encoding='utf-8')

    # Ctrl-C during the session ends the program with a farewell line
    # instead of a traceback.
    try:
        asyncio.run(chat_client())
    except KeyboardInterrupt:
        cprint(f"\n{Colors.YELLOW}Goodbye!{Colors.RESET}")
|
||
|