Files
Command-Line-Radio/CLRadio.py
2026-03-02 09:54:01 -05:00

1333 lines
60 KiB
Python

#!/usr/bin/env python3
"""
CLRadio Unified v2.0 - Senior Cryptography Overhaul
- MODE_PLAIN: Unencrypted
- MODE_SHARED: PBKDF2-HMAC-SHA256 (200k iter) + Fernet
- MODE_E2EE: X25519 Diffie-Hellman + HKDF-SHA256 + Fernet
Security Properties:
- No static salts.
- Per-connection session keys for E2EE.
- Forward Secrecy (ephemeral keys per connection).
- Fail-closed design.
"""
import asyncio
import socket
import json
import os
import sys
import shlex
import random
import string
import zlib
import base64
import time
import secrets
import hashlib
from datetime import datetime
from pathlib import Path
from collections import defaultdict
from typing import Any, Dict, Set, List, Optional, cast, Union
# Optional dependencies - Initialize to None to avoid unbound errors
x25519 = None
hashes = None
serialization = None
PBKDF2HMAC = None
Fernet = None
HKDF = None
default_backend = None
PromptSession = None
print_formatted_text = None
patch_stdout = None
Style = None
HTML = None
ANSI = None
WordCompleter = None
CRYPTOGRAPHY_AVAILABLE = False
PROMPT_TOOLKIT_AVAILABLE = False
try:
from cryptography.fernet import Fernet, InvalidToken # type: ignore
from cryptography.hazmat.primitives import hashes, serialization # type: ignore
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC # type: ignore
from cryptography.hazmat.primitives.asymmetric import x25519 # type: ignore
from cryptography.hazmat.primitives.kdf.hkdf import HKDF # type: ignore
from cryptography.hazmat.backends import default_backend # type: ignore
CRYPTOGRAPHY_AVAILABLE = True
except ImportError:
class InvalidToken(Exception): pass
Fernet = None
hashes = None
serialization = None
PBKDF2HMAC = None
x25519 = None
HKDF = None
default_backend = None
CRYPTOGRAPHY_AVAILABLE = False
try:
from prompt_toolkit import PromptSession, print_formatted_text # type: ignore
from prompt_toolkit.patch_stdout import patch_stdout # type: ignore
from prompt_toolkit.styles import Style # type: ignore
from prompt_toolkit.formatted_text import HTML, ANSI # type: ignore
from prompt_toolkit.completion import WordCompleter # type: ignore
PROMPT_TOOLKIT_AVAILABLE = True
except ImportError:
pass
# ----------------------------------------------------------------------------
# Constants & Enums
# ----------------------------------------------------------------------------
MODE_PLAIN = "plain"
MODE_SHARED = "shared"
MODE_E2EE = "e2ee"
# ----------------------------------------------------------------------------
# ANSI Colors
# ----------------------------------------------------------------------------
class SecurityError(Exception):
"""Custom exception for security-related failures."""
pass
class Colors:
HEADER = '\033[95m'
BLUE = '\033[94m'
CYAN = '\033[96m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
RESET = '\033[0m'
BOLD = '\033[1m'
_SESSION_INSTANCE: Optional[Any] = None
def pt_prompt(prompt_str, is_password=False, default=""):
"""Helper to call PromptSession.prompt with fallback to input()."""
global _SESSION_INSTANCE
if PROMPT_TOOLKIT_AVAILABLE and _SESSION_INSTANCE:
try:
return cast(Any, _SESSION_INSTANCE).prompt(prompt_str, is_password=is_password, default=default)
except (KeyboardInterrupt, EOFError):
return ""
else:
if is_password:
import getpass
return getpass.getpass(prompt_str)
return input(prompt_str)
# ----------------------------------------------------------------------------
# Crypto Manager Class
# ----------------------------------------------------------------------------
class SecurityManager:
"""Manages encryption states, key exchanges, and secure messaging."""
def __init__(self):
self.mode = MODE_PLAIN
self.shared_passphrase = None
self.active_keys = {} # ip -> Fernet object
self.pending_salts = {} # ip -> salt
# X25519 state
if CRYPTOGRAPHY_AVAILABLE:
self.private_key = cast(Any, x25519).X25519PrivateKey.generate()
self.public_key_bytes = self.private_key.public_key().public_bytes_raw()
else:
self.private_key = None
self.public_key_bytes = None
def set_mode(self, new_mode):
if new_mode not in [MODE_PLAIN, MODE_SHARED, MODE_E2EE]:
return False, "Invalid mode"
if new_mode != MODE_PLAIN and not CRYPTOGRAPHY_AVAILABLE:
return (False, "Cryptography library missing")
self.mode = new_mode
# Clear existing keys when switching modes to prevent reuse
self.active_keys.clear()
return (True, f"Security mode set to {new_mode.upper()}")
def set_passphrase(self, pwd):
self.shared_passphrase = pwd
self.active_keys.clear() # Reset keys derived from old passphrase
def derive_shared_key(self, salt, ip):
"""Derive symmetric key from shared passphrase and random salt."""
if not self.shared_passphrase:
raise ValueError("No shared passphrase set")
kdf = cast(Any, PBKDF2HMAC)(
algorithm=cast(Any, hashes).SHA256(),
length=32,
salt=salt,
iterations=200000,
)
key_bytes = kdf.derive(cast(str, self.shared_passphrase).encode()) if self.shared_passphrase else b""
key = base64.urlsafe_b64encode(key_bytes)
self.active_keys[ip] = cast(Any, Fernet)(key)
def derive_e2ee_key(self, peer_pub_bytes, ip):
"""Compute X25519 shared secret and derive session key via HKDF."""
peer_public_key = cast(Any, x25519).X25519PublicKey.from_public_bytes(peer_pub_bytes)
shared_secret = self.private_key.exchange(peer_public_key)
hkdf = cast(Any, HKDF)(
algorithm=cast(Any, hashes).SHA256(),
length=32,
salt=None,
info=b"clradio-e2ee",
)
key = base64.urlsafe_b64encode(hkdf.derive(shared_secret))
self.active_keys[ip] = cast(Any, Fernet)(key)
def encrypt(self, data_bytes, ip):
"""Encrypt bytes using active key for target IP."""
if self.mode == MODE_PLAIN:
return data_bytes
key = self.active_keys.get(ip)
if not key:
raise ConnectionError(f"No active session key for {ip}. Handshake required.")
return key.encrypt(data_bytes)
def decrypt(self, data_bytes, ip):
"""Decrypt bytes using active key for IP."""
if self.mode == MODE_PLAIN:
return data_bytes
key = self.active_keys.get(ip)
if not key:
# We might not have a key yet if the sender just switched
return None
try:
return key.decrypt(data_bytes)
except InvalidToken:
raise SecurityError("Decryption failed: Invalid token or key mismatch.")
# Global Manager
security = SecurityManager()
# ----------------------------------------------------------------------------
# Global State
# ----------------------------------------------------------------------------
nickname = ""
local_ip = "127.0.0.1"
local_ips: List[str] = ["127.0.0.1"]
listening_port = 443
discovery_port = 8889
targets = []
active_peers: Dict[str, datetime] = {}
radar_peers: Dict[str, Any] = {}
target_ip: str = ""
target_port: int = 443
messages_sent: int = 0
messages_received: int = 0
message_log: List[str] = []
file_buffers: Dict[str, Any] = {}
# Connection Approval State
auto_accept: bool = False
approved_ips: Set[str] = set()
blocked_ips: Dict[str, str] = {} # ip -> nick
pending_reqs: Dict[str, str] = {} # ip -> nick
nicknames_cache: Dict[str, str] = {} # ip -> nick
# Extended State
user_status = "Available"
current_channel = "global"
logging_enabled = False
compression_enabled = True
show_timestamps = True
relay_enabled = False
stealth_enabled = False
vault_enabled = False
aliases: Dict[str, str] = {} # alias -> ip
config_path = Path("clradio_config.json")
log_file = None
seen_mids: Set[str] = set()
# Connectivity & History
peer_health: Dict[str, Dict[str, Any]] = {}
# ----------------------------------------------------------------------------
# Synchronous Disk Workers (for asyncio.to_thread)
# ----------------------------------------------------------------------------
def _sync_save_config(p_path: Path, p_data: Dict[str, Any]):
try:
p_path.write_text(json.dumps(p_data, indent=4))
except Exception: pass
def _sync_save_peers(p_path: Path, p_targets: List[Dict[str, Any]]):
try:
p_path.write_text(json.dumps(p_targets))
except Exception: pass
def _sync_write_file(p_path: Path, p_data: bytes):
try:
p_path.write_bytes(p_data)
except Exception: pass
def _sync_write_stream(p_path: Path, p_chunks: Dict[int, bytes], p_total: int):
try:
with open(p_path, "wb") as f:
for i in range(p_total):
if i in p_chunks:
f.write(p_chunks[i])
except Exception: pass
def _sync_log_worker(p_text: str):
global log_file
try:
if not log_file:
Path("logs").mkdir(exist_ok=True)
fname_log = f"logs/chat_{datetime.now().strftime('%Y%m%d_%H%M')}.txt"
log_file = open(fname_log, "a", encoding="utf-8")
log_file.write(str(p_text) + "\n")
log_file.flush()
except Exception: pass
async def save_config():
"""Persist settings and security context"""
config_data = {
"nickname": nickname,
"auto_accept": auto_accept,
"blocked_ips": blocked_ips,
"approved_ips": list(approved_ips),
"user_status": user_status,
"current_channel": current_channel,
"logging_enabled": logging_enabled,
"compression_enabled": compression_enabled,
"show_timestamps": show_timestamps,
"relay_enabled": relay_enabled,
"stealth_enabled": stealth_enabled,
"vault_enabled": vault_enabled,
"aliases": aliases
}
await asyncio.to_thread(cast(Any, _sync_save_config), config_path, config_data)
def load_config():
"""Restore state from disk"""
global nickname, auto_accept, blocked_ips, approved_ips, user_status, \
current_channel, logging_enabled, compression_enabled, show_timestamps, \
relay_enabled, stealth_enabled, vault_enabled, aliases
try:
if config_path.exists():
data = json.loads(config_path.read_text())
nickname = data.get("nickname", nickname)
auto_accept = data.get("auto_accept", auto_accept)
blocked_ips = data.get("blocked_ips", {})
approved_ips = set(data.get("approved_ips", []))
user_status = data.get("user_status", user_status)
current_channel = data.get("current_channel", current_channel)
logging_enabled = data.get("logging_enabled", logging_enabled)
compression_enabled = data.get("compression_enabled", compression_enabled)
show_timestamps = data.get("show_timestamps", show_timestamps)
relay_enabled = data.get("relay_enabled", False)
stealth_enabled = data.get("stealth_enabled", False)
vault_enabled = data.get("vault_enabled", False)
aliases = data.get("aliases", {})
return True
except Exception: pass
return False
async def save_peer(ip: str, port: int):
global targets
peer = {"ip": ip, "port": port}
if peer not in targets:
targets.append(peer)
await asyncio.to_thread(cast(Any, _sync_save_peers), Path("known_peers.json"), targets)
# ----------------------------------------------------------------------------
# Networking Helpers
# ----------------------------------------------------------------------------
async def send_payload(ip, port, payload):
"""Low-level async send with 4-byte length prefix."""
try:
reader, writer = await asyncio.open_connection(ip, port)
payload_bytes = json.dumps(payload).encode()
length = len(payload_bytes).to_bytes(4, 'big')
writer.write(length + payload_bytes)
await writer.drain()
writer.close()
try:
await writer.wait_closed()
except: pass
return True
except Exception:
return False
# ----------------------------------------------------------------------------
# Handshake Logic
# ----------------------------------------------------------------------------
async def initiate_handshake(ip, port):
"""Trigger appropriate handshake based on current security mode."""
# NAT Punch logic: send a tiny UDP packet to nudge firewalls
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(b'', (ip, port))
sock.close()
except: pass
# Auto-approve the person we are actively connecting to
approved_ips.add(ip)
await save_peer(ip, port)
if security.mode == MODE_PLAIN:
# We still need to send the connection request for the other side to see us
pass
elif security.mode == MODE_SHARED:
if not security.shared_passphrase:
return False, "Shared passphrase not set. Use /crypt <pass>"
# Generate random salt
salt = os.urandom(16)
security.derive_shared_key(salt, ip)
# Send salt to peer
payload = {
"type": "shared_init",
"salt": base64.b64encode(salt).decode(),
"nick": nickname
}
await send_payload(ip, port, payload)
elif security.mode == MODE_E2EE:
# Send ephemeral X25519 public key
payload = {
"type": "e2ee_handshake",
"pub": base64.b64encode(security.public_key_bytes).decode(),
"nick": nickname
}
await send_payload(ip, port, payload)
# Send a separate connection request if not already approved
# Note: We send this even if ip in approved_ips if the peer might not have approved us yet
req_payload = {
"type": "conn_request",
"nick": nickname,
"port": listening_port
}
await send_payload(ip, port, req_payload)
asyncio.create_task(save_peer(ip, port))
return True, None
# ----------------------------------------------------------------------------
# Networking - Receiver
# ----------------------------------------------------------------------------
async def handle_incoming(reader, writer):
global messages_received, active_peers, file_buffers, peer_health, radar_peers
addr_info = writer.get_extra_info('peername')
if not addr_info: return
ip: str = str(addr_info[0])
active_peers[ip] = datetime.now()
try:
# read length
len_raw = await reader.readexactly(4)
length = int.from_bytes(len_raw, 'big')
# read payload
data_raw = await reader.readexactly(length)
payload = json.loads(data_raw.decode())
m_type = payload.get("type")
m_nick = str(payload.get("nick", "Unknown"))
m_port = int(payload.get("port", 443))
m_status = str(payload.get("status", ""))
m_chan = str(payload.get("channel", "global"))
m_dst = payload.get("dst_ip")
m_id = payload.get("mid")
# --- LOOP PREVENTION ---
if m_id:
if m_id in seen_mids: return
seen_mids.add(m_id)
if len(seen_mids) > 1000:
seen_mids.clear() # Reset occasionally
# ------------------------
# --- BLIND RELAY / PRIVACY FILTER ---
# If it's a directed message not for us, relay (if enabled) and SHRED.
if m_dst and m_dst not in local_ips and m_dst != "127.0.0.1":
if relay_enabled:
target_p = radar_peers.get(m_dst, {}).get("port", 443)
asyncio.create_task(send_payload(m_dst, target_p, payload))
return # Privacy: Do not process, cache, or log packets not for us.
# Overlay Relay for Shouts (everyone sees these)
if relay_enabled and m_type == "text" and not m_dst:
for p_ip in approved_ips:
if p_ip != ip and p_ip not in local_ips:
p_port = radar_peers.get(p_ip, {}).get("port", 443)
asyncio.create_task(send_payload(p_ip, p_port, payload))
# ------------------------
if ip in blocked_ips:
return
ts = datetime.now().strftime('%H:%M:%S')
# Cache metadata and refresh health
peer_health[ip] = {"online": True, "last_seen": datetime.now()}
radar_peers[ip] = {
"nick": m_nick,
"port": m_port,
"status": m_status,
"channel": m_chan,
"last_seen": datetime.now()
}
# 0. Handle Control Messages
if m_type == "ping":
resp = {"type": "pong", "nick": nickname, "port": listening_port, "t": payload.get("t")}
asyncio.create_task(send_payload(ip, m_port, resp))
return
if m_type == "pong":
sent_time = payload.get("t", 0)
lat = (time.time() - float(sent_time)) * 1000
peer_health[ip] = {"online": True, "latency": lat, "last_seen": datetime.now()}
if payload.get("manual"):
clean_print(f"{Colors.GREEN}[PING] {m_nick}@{ip} returned in {lat:.1f}ms{Colors.RESET}")
return
if m_type == "conn_request":
if ip in approved_ips or auto_accept:
# Auto-approve if setting allows or already approved
resp = {"type": "conn_response", "accepted": True, "nick": nickname, "port": listening_port}
asyncio.create_task(send_payload(ip, m_port, resp))
approved_ips.add(ip)
return
pending_reqs[ip] = m_nick
clean_print(f"\n{Colors.YELLOW}[!] REQUEST: {m_nick}@{ip} wants to connect on port {m_port}.{Colors.RESET}")
clean_print(f"{Colors.CYAN}Type '/accept {ip}' to allow or '/reject {ip}' to deny.{Colors.RESET}")
return
if m_type == "conn_response":
accepted = bool(payload.get("accepted", False))
if accepted:
approved_ips.add(ip)
clean_print(f"{Colors.GREEN}[+] {m_nick}@{ip} accepted your connection request.{Colors.RESET}")
else:
clean_print(f"{Colors.RED}[-] {m_nick}@{ip} rejected your connection request.{Colors.RESET}")
return
# Block unauthorized traffic (except handshakes which are part of connection establishment)
if m_type in ["text", "file", "file_chunk"]:
if ip not in approved_ips and not auto_accept:
# Silently drop or notify? Let's notify once and then discard.
if ip not in pending_reqs:
pending_reqs[ip] = m_nick
clean_print(f"\n{Colors.YELLOW}[!] BLOCKED: {m_nick}@{ip} sent a message but isn't approved.{Colors.RESET}")
clean_print(f"{Colors.CYAN}Type '/accept {ip}' to see future messages.{Colors.RESET}")
return
# 1. Handle Handshakes
if m_type == "e2ee_handshake":
raw_pub = payload.get("pub")
if raw_pub:
peer_pub = base64.b64decode(str(raw_pub))
security.derive_e2ee_key(peer_pub, ip)
# If we haven't sent ours yet, reply (simplified auto-response)
if ip not in security.active_keys:
reply = {
"type": "e2ee_handshake",
"pub": base64.b64encode(security.public_key_bytes).decode(),
"nick": nickname,
"port": listening_port
}
asyncio.create_task(send_payload(ip, m_port, reply))
return
elif m_type == "shared_init":
raw_salt = payload.get("salt")
if raw_salt:
salt = base64.b64decode(str(raw_salt))
try:
security.derive_shared_key(salt, ip)
clean_print(f"{Colors.GREEN}[SEC] Secure session established with {m_nick}@{ip} (SHARED){Colors.RESET}")
except ValueError:
clean_print(f"{Colors.RED}[SEC] Peer initiated SHARED mode, but you have no passphrase set!{Colors.RESET}")
return
# 2. Handle Messages
if m_type == "text":
# Channel Filtering: Only show if message is for our channel OR it's a direct 1-on-1 target
if m_chan != current_channel and m_chan != "global":
# If we are directly connected (target_ip == ip), we allow it regardless of channel
if target_ip != ip:
return
body = str(payload.get("body", ""))
encrypted = bool(payload.get("encrypted", False))
mode = str(payload.get("mode", "plain"))
if encrypted:
if mode != security.mode:
clean_print(f"{Colors.RED}[!] Security mismatch: Peer sent {mode} while you are in {security.mode}.{Colors.RESET}")
return
try:
decrypted_bytes = security.decrypt(base64.b64decode(body), ip)
if decrypted_bytes is None:
clean_print(f"{Colors.YELLOW}[!] Received encrypted msg from {ip} before handshake.{Colors.RESET}")
return
display_text = decrypted_bytes.decode()
except Exception:
display_text = f"{Colors.RED}[Decryption Failed]{Colors.RESET}"
else:
display_text = body
lock_icon = "🔒 " if encrypted else ""
prefix = f"{Colors.CYAN}[E2EE] " if mode == MODE_E2EE else (f"{Colors.YELLOW}[SHARED] " if mode == MODE_SHARED else "")
sender_display = f"{m_nick}@{ip}" if m_nick != "Unknown" else ip
ts_str = f"{Colors.YELLOW}[{ts}]{Colors.RESET} " if show_timestamps else ""
output = f"{ts_str}{Colors.BOLD}{sender_display}{Colors.RESET}: {prefix}{lock_icon}{display_text}"
message_log.append(output)
if len(message_log) > 200: message_log.pop(0)
clean_print(output)
globals()["messages_received"] += 1
if logging_enabled:
asyncio.create_task(log_msg(f"[{ts}] {m_nick or ip}: {display_text}"))
elif m_type == "file":
filename = str(payload.get("filename", "received_file"))
file_data = base64.b64decode(str(payload.get("data", "")))
mode = str(payload.get("mode", "plain"))
encrypted = bool(payload.get("encrypted", False))
compressed = bool(payload.get("compressed", False))
received_hash = payload.get("hash")
if received_hash:
actual_hash = hashlib.sha256(file_data).hexdigest()
if actual_hash != received_hash:
clean_print(f"{Colors.RED}[!] INTEGRITY ERROR: {filename} hash mismatch!{Colors.RESET}")
return
else:
clean_print(f"{Colors.GREEN}[✓] INTEGRITY: {filename} verified (SHA-256).{Colors.RESET}")
if encrypted:
try:
file_data = security.decrypt(file_data, ip)
if file_data is None: raise SecurityError("No key")
except:
clean_print(f"{Colors.RED}[!] File Decryption failed for {filename}{Colors.RESET}")
return
if compressed:
try:
file_data = zlib.decompress(file_data)
except:
clean_print(f"{Colors.RED}[!] File Decompression failed.{Colors.RESET}")
return
# Sanitize filename to prevent path traversal
safe_filename = Path(filename).name
save_path = Path("downloads") / safe_filename
save_path.parent.mkdir(exist_ok=True)
if file_data is not None:
# VAULT Logic: If vault is on, encrypt the local file with session key or similar?
# For now, let's keep it simple: vault means we save with .vault extension and maybe base64 it
if vault_enabled:
save_path = save_path.with_suffix(save_path.suffix + ".vault")
await asyncio.to_thread(cast(Any, _sync_write_file), save_path, file_data)
msg = f"{Colors.GREEN}[💾] Saved: {filename} ({len(file_data)//1024}KB){Colors.RESET}"
message_log.append(msg)
clean_print(msg)
globals()["messages_received"] = int(globals()["messages_received"]) + 1
else:
clean_print(f"{Colors.RED}[!] File data is empty or decryption failed.{Colors.RESET}")
elif m_type == "file_chunk":
fname = str(payload.get("filename", "streamed_file"))
c_idx = int(payload.get("idx", 0))
total = int(payload.get("total", 1))
data_chunk = base64.b64decode(str(payload.get("data", "")))
mode = str(payload.get("mode", "plain"))
encrypted = bool(payload.get("encrypted", False))
# Use filename+ip as a simple transfer ID
tid = f"{ip}_{fname}"
if tid not in file_buffers:
file_buffers[tid] = {"Chunks": {}, "Count": 0, "Total": int(total), "Filename": fname}
buf_info: Dict[str, Any] = file_buffers[tid]
chunks_map: Dict[int, bytes] = buf_info["Chunks"]
if c_idx not in chunks_map:
if encrypted:
data_chunk = security.decrypt(data_chunk, ip)
if data_chunk is None:
# decryption failed or key missing
return
chunks_map[int(c_idx)] = data_chunk
buf_info["Count"] = int(buf_info["Count"]) + 1
cur_count = int(buf_info["Count"])
if cur_count % 10 == 0:
clean_print(f"{Colors.YELLOW}[📡] Receiving {fname}: {cur_count*100//total}%{Colors.RESET}")
if int(buf_info["Count"]) >= total:
# Reassemble
assembled_data = b"".join([chunks_map[i] for i in range(total)])
# Verify Integrity
received_hash = payload.get("hash")
if received_hash:
actual_hash = hashlib.sha256(assembled_data).hexdigest()
if actual_hash != received_hash:
clean_print(f"{Colors.RED}[!] STREAM INTEGRITY FAILED for {fname}!{Colors.RESET}")
file_buffers.pop(tid, None)
return
else:
clean_print(f"{Colors.GREEN}[✓] STREAM INTEGRITY VERIFIED (SHA-256).{Colors.RESET}")
# Sanitize filename
safe_fname = Path(fname).name
final_save_path = Path("downloads") / f"stream_{safe_fname}"
if vault_enabled:
final_save_path = final_save_path.with_suffix(final_save_path.suffix + ".vault")
final_save_path.parent.mkdir(exist_ok=True)
await asyncio.to_thread(cast(Any, _sync_write_file), final_save_path, assembled_data)
msg_stream = f"{Colors.GREEN}[💾] Stream Saved: {final_save_path.name}{Colors.RESET}"
message_log.append(msg_stream)
clean_print(msg_stream)
file_buffers.pop(tid, None)
except Exception:
pass
finally:
writer.close()
async def start_receiver(port):
server = await asyncio.start_server(cast(Any, handle_incoming), '127.0.0.1', port)
clean_print(f"{Colors.GREEN}[OK] Listening on port {port}{Colors.RESET}")
async with server: await server.serve_forever()
# ----------------------------------------------------------------------------
# UI Utilities
# ----------------------------------------------------------------------------
def clean_print(text):
if PROMPT_TOOLKIT_AVAILABLE:
with cast(Any, patch_stdout)():
cast(Any, print_formatted_text)(cast(Any, ANSI)(text))
else:
print(text)
def bottom_toolbar():
now = datetime.now().strftime("%H:%M")
m_info = f"<style bg='ansigreen'> {security.mode.upper()} </style>"
t_info = f"Target: <b>{target_ip}</b>" if target_ip else "No Target"
return cast(Any, HTML)(
f' <b>CLRADIO v2.0</b> | {m_info} | {t_info} | '
f' Sent: {messages_sent} | Received: {messages_received} | {now} '
)
# ----------------------------------------------------------------------------
# Discovery
# ----------------------------------------------------------------------------
async def log_msg(text_to_log: str):
global logging_enabled
if not logging_enabled: return
await asyncio.to_thread(cast(Any, _sync_log_worker), text_to_log)
async def udp_broadcast():
global stealth_enabled
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
while True:
try:
if not stealth_enabled:
# 1. Local LAN discovery
msg = json.dumps({
"nick": nickname,
"port": listening_port,
"status": user_status,
"channel": current_channel
})
sock.sendto(msg.encode(), ('<broadcast>', discovery_port))
# 2. Mesh Heartbeats: Pulse all approved IPs
for ip in list(approved_ips):
p = radar_peers.get(ip, {}).get("port", 443)
payload = {"type": "ping", "t": time.time(), "nick": nickname, "port": listening_port}
asyncio.create_task(send_payload(ip, p, payload))
except Exception: pass
await asyncio.sleep(15)
async def udp_listener():
class DiscoveryProtocol(asyncio.DatagramProtocol):
def datagram_received(self, data, addr):
try:
info = json.loads(data.decode())
peer_ip = addr[0]
peer_port = info.get("port", 443)
# Filter out ourselves (same IP and port)
if (peer_ip == local_ip or peer_ip == "127.0.0.1") and peer_port == listening_port:
return
radar_peers[peer_ip] = {
"nick": info.get("nick", "Unknown"),
"port": peer_port,
"status": info.get("status", ""),
"channel": info.get("channel", "global"),
"last_seen": datetime.now()
}
except Exception: pass
loop = asyncio.get_running_loop()
await loop.create_datagram_endpoint(DiscoveryProtocol, local_addr=('0.0.0.0', discovery_port))
while True: await asyncio.sleep(3600)
# ----------------------------------------------------------------------------
# Main Loop
# ----------------------------------------------------------------------------
async def main_loop():
global nickname, local_ip, local_ips, target_ip, target_port, messages_sent, \
listening_port, current_channel, auto_accept, logging_enabled, \
compression_enabled, show_timestamps, approved_ips, blocked_ips, \
pending_reqs, user_status, relay_enabled, stealth_enabled, \
vault_enabled, aliases
print("-" * 60)
print(" CLRADIO UNIFIED v2.4 | SECURE EDITION")
print("-" * 60)
global listening_port
nickname = input("Nickname: ").strip() or f"User{random.randint(100,999)}"
# Securely identify ALL local IPs
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
s.close()
local_ips = socket.gethostbyname_ex(socket.gethostname())[2]
if local_ip not in local_ips: local_ips.append(local_ip)
except:
local_ips = [local_ip, "127.0.0.1"]
port_in = input(f"Listening Port [443]: ").strip()
listening_port = int(port_in) if port_in else 443
load_config()
asyncio.create_task(start_receiver(listening_port))
asyncio.create_task(udp_broadcast())
asyncio.create_task(udp_listener())
completer = cast(Any, WordCompleter)([
'/mode', '/crypt', '/connect', '/radar', '/sendfile', '/exit',
'/help', '/accept', '/reject', '/settings', '/nick', '/disconnect',
'/whoami', '/block', '/unblock', '/blocked', '/shout', '/status',
'/clear', '/ping', '/panic', '/join', '/leave', '/last', '/peers',
'/alias', '/unalias', '/aliases', '/stealth', '/relay',
'plain', 'shared', 'e2ee', 'on', 'off'
])
def rprompt():
c = len(radar_peers)
return cast(Any, HTML)(f"<ansicyan>Peers: {c}</ansicyan>")
session = cast(Any, PromptSession)(bottom_toolbar=bottom_toolbar, rprompt=rprompt, completer=completer)
global _SESSION_INSTANCE
_SESSION_INSTANCE = session
while True:
try:
prompt = f"[{nickname}{' -> '+target_ip if target_ip else ''}]> "
text = await session.prompt_async(prompt, is_password=False)
text = text.strip()
if not text: continue
if text.startswith("/"):
try:
parts = shlex.split(text[1:])
except ValueError as e:
clean_print(f"{Colors.RED}Parsing error: {e}{Colors.RESET}")
continue
if not parts: continue
cmd = parts[0].lower()
if cmd == "exit": break
elif cmd == "whoami":
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
my_ip = s.getsockname()[0]
s.close()
except Exception: my_ip = "Unknown"
clean_print(f"\n{Colors.CYAN}--- Your Info ---")
clean_print(f" Nickname: {Colors.BOLD}{nickname}{Colors.RESET}")
clean_print(f" Local IP: {Colors.BOLD}{my_ip}{Colors.RESET}")
clean_print(f" Listening: {Colors.BOLD}{listening_port}{Colors.RESET}")
clean_print(f"-----------------{Colors.RESET}")
elif cmd == "disconnect":
if target_ip:
clean_print(f"{Colors.YELLOW}Disconnected from {target_ip}{Colors.RESET}")
target_ip = ""
else:
clean_print(f"{Colors.YELLOW}Not currently connected to anyone.{Colors.RESET}")
elif cmd == "shout":
if not cast(Any, parts)[1:]:
clean_print(f"{Colors.RED}Usage: /shout <message>{Colors.RESET}")
continue
msg_text = " ".join(cast(Any, parts)[1:])
count = 0
shout_id = secrets.token_hex(8)
for ip in list(approved_ips):
# Filter: Peer must be in our channel to hear a shout
peer_chan = radar_peers.get(ip, {}).get("channel", "global")
if current_channel != "global" and peer_chan != current_channel:
continue
p = radar_peers.get(ip, {}).get("port", 443)
payload = {
"type": "text", "body": msg_text, "nick": nickname,
"channel": current_channel,
"encrypted": security.mode != MODE_PLAIN,
"mode": security.mode,
"mid": shout_id
}
if security.mode != MODE_PLAIN:
try:
ciphertext = security.encrypt(msg_text.encode(), ip)
payload["body"] = base64.urlsafe_b64encode(ciphertext).decode()
except Exception: continue
asyncio.create_task(send_payload(ip, p, payload))
count += 1
clean_print(f"{Colors.GREEN}[SHOUT] Broadcasted to {count} peers in #{current_channel}.{Colors.RESET}")
globals()["messages_sent"] = int(globals()["messages_sent"]) + 1
elif cmd == "join":
if len(parts) < 2:
clean_print(f"{Colors.YELLOW}Current Channel: {current_channel}{Colors.RESET}")
else:
current_channel = parts[1].lower()
await save_config()
clean_print(f"{Colors.GREEN}Joined channel: #{current_channel}{Colors.RESET}")
elif cmd == "leave":
current_channel = "global"
await save_config()
clean_print(f"{Colors.YELLOW}Returned to #global channel.{Colors.RESET}")
elif cmd == "last":
n = int(cast(Any, parts)[1]) if len(cast(Any, parts)) > 1 else 10
try:
log_dir = Path("logs")
if not log_dir.exists():
clean_print(f"{Colors.RED}No logs found.{Colors.RESET}")
continue
files = sorted(log_dir.glob("chat_*.txt"), key=os.path.getmtime, reverse=True)
if not files:
clean_print(f"{Colors.RED}No log files found.{Colors.RESET}")
continue
lines = files[0].read_text().splitlines()
clean_print(f"\n{Colors.CYAN}--- Last {n} messages ---")
for line in cast(Any, lines)[-n:]:
clean_print(line)
clean_print(f"------------------------{Colors.RESET}")
except Exception as e:
clean_print(f"{Colors.RED}Error reading log: {e}{Colors.RESET}")
elif cmd == "peers":
print(f"\n{Colors.CYAN}--- Approved Peers Connectivity ---")
for ip in approved_ips:
health = peer_health.get(ip, {"online": False})
nick = nicknames_cache.get(ip, "Unknown")
status_icon = f"{Colors.GREEN}● ONLINE{Colors.RESET}" if health["online"] else f"{Colors.RED}○ OFFLINE{Colors.RESET}"
lat_str = f"({health.get('latency', 0):.1f}ms)" if "latency" in health else ""
chan = radar_peers.get(ip, {}).get("channel", "global")
print(f" {status_icon} {Colors.BOLD}{nick}@{ip}{Colors.RESET} | #{chan} {lat_str}")
if not approved_ips: print(" [No approved peers]")
print("-" * 40 + Colors.RESET)
elif cmd == "status":
if not cast(Any, parts)[1:]:
clean_print(f"{Colors.CYAN}Current Status: {user_status}{Colors.RESET}")
else:
user_status = " ".join(cast(Any, parts)[1:])
await save_config()
clean_print(f"{Colors.GREEN}Status updated to: {user_status}{Colors.RESET}")
elif cmd == "clear":
os.system('cls' if os.name == 'nt' else 'clear')
print("-" * 60)
print(f" CLRADIO UNIFIED v2.3 | {nickname}")
print("-" * 60)
elif cmd == "ping":
t_ip = cast(Any, parts)[1] if len(cast(Any, parts)) > 1 else target_ip
if not t_ip:
clean_print(f"{Colors.RED}Specify an IP or /connect first.{Colors.RESET}")
continue
p = radar_peers.get(t_ip, {}).get("port", 443)
payload = {"type": "ping", "t": time.time(), "nick": nickname, "port": listening_port}
await send_payload(t_ip, p, payload)
clean_print(f"{Colors.YELLOW}Pinging {t_ip}...{Colors.RESET}")
elif cmd == "panic":
target_ip = ""
approved_ips.clear()
os.system('cls' if os.name == 'nt' else 'clear')
clean_print(f"{Colors.RED}[!!!] PANIC: All targets cleared and screen wiped.{Colors.RESET}")
elif cmd == "nick":
if len(parts) < 2:
clean_print(f"{Colors.YELLOW}Usage: /nick <new_name>{Colors.RESET}")
continue
nickname = cast(Any, parts)[1]
clean_print(f"{Colors.GREEN}Nickname set to {nickname}{Colors.RESET}")
elif cmd == "help":
print(f"\n{Colors.CYAN}--- CLRadio Commands ---")
print(f" /help - Show this help menu")
print(f" /connect <ip> [port] - Connect to a peer")
print(f" /shout <msg> - Send message to peers in current channel")
print(f" /join <channel> - Switch to a virtual channel")
print(f" /leave - Return to #global channel")
print(f" /disconnect - Clear current target")
print(f" /status [msg] - Set or view your status message")
print(f" /radar - List discovered peers on local network")
print(f" /peers - Dashboard of all approved contacts (Online/Offline)")
print(f" /last [n] - Recover last N messages from log")
print(f" /settings - View and change settings")
print(f" /ping [ip] - Measure latency to a peer")
print(f" /accept <ip> - Approve a connection request")
print(f" /block <ip> - Block an IP address")
print(f" /panic - Emergency wipe of screen and targets")
print(f" /whoami - Show your info")
print(f" /exit - Quit CLRadio")
print(f"------------------------{Colors.RESET}")
elif cmd == "settings":
if len(parts) < 2:
print(f"\n{Colors.CYAN}--- Settings ---")
print(f" 1. auto_accept : {Colors.BOLD}{'on' if auto_accept else 'off'}{Colors.RESET}")
print(f" 2. logging : {Colors.BOLD}{'on' if logging_enabled else 'off'}{Colors.RESET}")
print(f" 3. compression : {Colors.BOLD}{'on' if compression_enabled else 'off'}{Colors.RESET}")
print(f" 4. timestamps : {Colors.BOLD}{'on' if show_timestamps else 'off'}{Colors.RESET}")
print(f" 5. relay : {Colors.BOLD}{'on' if relay_enabled else 'off'}{Colors.RESET}")
print(f" 6. stealth : {Colors.BOLD}{'on' if stealth_enabled else 'off'}{Colors.RESET}")
print(f" 7. vault : {Colors.BOLD}{'on' if vault_enabled else 'off'}{Colors.RESET}")
print(f"-----------------")
print(f" Usage: /settings <key> <on|off>{Colors.RESET}")
continue
if len(parts) < 3:
clean_print(f"{Colors.YELLOW}Usage: /settings <key> <on|off>{Colors.RESET}")
continue
key = parts[1].lower()
val = parts[2].lower() in ['on', 'true', 'yes', '1']
if key == "auto_accept": auto_accept = val
elif key == "logging":
logging_enabled = val
if not val and log_file:
log_file.close()
log_file = None
elif key == "compression": compression_enabled = val
elif key == "timestamps": show_timestamps = val
elif key == "relay": relay_enabled = val
elif key == "stealth": stealth_enabled = val
elif key == "vault": vault_enabled = val
else:
clean_print(f"{Colors.RED}Unknown setting: {key}{Colors.RESET}")
continue
save_config_task = asyncio.create_task(save_config())
clean_print(f"{Colors.GREEN}Setting '{key}' updated to {'on' if val else 'off'}{Colors.RESET}")
elif cmd == "accept":
if len(parts) < 2:
# Try to accept the most recent if only one pending
if len(pending_reqs) == 1:
target = list(pending_reqs.keys())[0]
else:
clean_print(f"{Colors.RED}Usage: /accept <ip>{Colors.RESET}")
continue
else:
target = parts[1]
if target in pending_reqs or auto_accept:
approved_ips.add(target)
nick_target = pending_reqs.pop(target, "Unknown")
peer_info = radar_peers.get(target, {})
target_port_actual = peer_info.get("port", listening_port)
# Automatically connect to them!
target_ip = target
target_port = target_port_actual
await save_config() # Persist approval
resp = {"type": "conn_response", "accepted": True, "nick": nickname, "port": listening_port}
await send_payload(target, target_port_actual, resp)
clean_print(f"{Colors.GREEN}[+] Approved and connected to {nick_target}@{target}{Colors.RESET}")
else:
clean_print(f"{Colors.YELLOW}No pending request from {target}{Colors.RESET}")
elif cmd == "reject":
if len(parts) < 2:
clean_print(f"{Colors.RED}Usage: /reject <ip>{Colors.RESET}")
continue
target = parts[1]
if target in pending_reqs:
pending_reqs.pop(target, None)
peer_info = radar_peers.get(target, {})
target_port_actual = peer_info.get("port", listening_port)
resp = {"type": "conn_response", "accepted": False, "nick": nickname, "port": listening_port}
await send_payload(target, target_port_actual, resp)
clean_print(f"{Colors.RED}[-] Rejected connection from {target}{Colors.RESET}")
elif cmd == "block":
if len(parts) < 2:
clean_print(f"{Colors.RED}Usage: /block <ip>{Colors.RESET}")
continue
ip_to_block = cast(List, parts)[1]
nick_to_block = nicknames_cache.get(ip_to_block, "Unknown")
blocked_ips[ip_to_block] = nick_to_block
approved_ips.discard(ip_to_block)
pending_reqs.pop(ip_to_block, None)
if target_ip == ip_to_block: target_ip = ""
await save_config()
clean_print(f"{Colors.RED}[!] Blocked {nick_to_block}@{ip_to_block}{Colors.RESET}")
elif cmd == "unblock":
if len(parts) < 2:
clean_print(f"{Colors.RED}Usage: /unblock <ip>{Colors.RESET}")
continue
ip_to_unblock = cast(List, parts)[1]
if ip_to_unblock in blocked_ips:
n = blocked_ips.pop(ip_to_unblock)
await save_config()
clean_print(f"{Colors.GREEN}[+] Unblocked {n}@{ip_to_unblock}{Colors.RESET}")
else:
clean_print(f"{Colors.YELLOW}IP {ip_to_unblock} is not blocked.{Colors.RESET}")
elif cmd == "blocked":
print(f"\n{Colors.RED}--- Blocked List ---")
for ip, nick in blocked_ips.items():
print(f" {nick} @ {ip}")
if not blocked_ips: print(" [Empty]")
print("-" * 20 + Colors.RESET)
elif cmd == "mode":
if len(parts) < 2:
clean_print(f"{Colors.YELLOW}Usage: /mode plain|shared|e2ee{Colors.RESET}")
continue
success, msg = security.set_mode(cast(List, parts)[1].lower())
color = Colors.GREEN if success else Colors.RED
clean_print(f"{color}{msg}{Colors.RESET}")
if success and security.mode != MODE_PLAIN and target_ip:
# Automatically initiate handshake with current target
await initiate_handshake(target_ip, target_port)
elif cmd == "crypt":
if len(parts) < 2:
clean_print(f"{Colors.RED}Usage: /crypt <passphrase>{Colors.RESET}")
else:
security.set_passphrase(cast(List, parts)[1])
clean_print(f"{Colors.GREEN}Shared Passphrase Set.{Colors.RESET}")
elif cmd == "connect":
if len(parts) >= 2:
target_addr = parts[1]
# Alias lookup
if target_addr in aliases:
target_addr = aliases[target_addr]
target_ip = target_addr
target_port = int(parts[2]) if len(parts) > 2 else 443
clean_print(f"{Colors.GREEN}Connecting to {target_ip}...{Colors.RESET}")
await initiate_handshake(target_ip, target_port)
else:
clean_print(f"{Colors.RED}Usage: /connect <ip>{Colors.RESET}")
elif cmd == "radar":
print(f"\n{Colors.CYAN}--- Local Radar ({len(radar_peers)} active) ---")
if stealth_enabled:
print(f" {Colors.YELLOW}[STEALTH ACTIVE] You are invisible.{Colors.RESET}")
for ip, info in radar_peers.items():
# Skip self and stale entries
if ip in local_ips: continue
delta = (datetime.now() - info['last_seen']).total_seconds()
if delta > 60: continue
nick = info['nick']
stat = f" - {Colors.YELLOW}{info['status']}{Colors.RESET}" if info['status'] else ""
chan = f" #{info['channel']}"
print(f" {Colors.BOLD}{nick}{Colors.RESET}@{ip}:{info['port']}{chan}{stat}")
print("-" * 40 + Colors.RESET)
elif cmd == "sendfile":
if not target_ip:
clean_print(f"{Colors.RED}Set target first.{Colors.RESET}")
continue
path_str = cast(Any, parts)[1]
# Sanitize path by taking only basename
safe_filename = os.path.basename(path_str)
if not safe_filename:
clean_print(f"{Colors.RED}Invalid file path.{Colors.RESET}")
continue
safe_path = Path.cwd() / safe_filename
if not safe_path.exists():
safe_path = Path("downloads") / safe_filename
if not safe_path.exists() or not safe_path.is_file():
clean_print(f"{Colors.RED}File not found or inaccessible: {path_str}{Colors.RESET}")
continue
try:
file_bytes = safe_path.read_bytes()
file_hash = hashlib.sha256(file_bytes).hexdigest()
filesize = len(file_bytes)
chunk_size = 32 * 1024
total_chunks = (filesize + chunk_size - 1) // chunk_size
idx = 0
with open(safe_path, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk: break
encrypted = security.mode != MODE_PLAIN
if encrypted:
chunk = security.encrypt(chunk, target_ip)
payload = {
"type": "file_chunk",
"filename": safe_path.name,
"nick": nickname,
"data": base64.b64encode(chunk).decode(),
"encrypted": encrypted,
"mode": security.mode,
"idx": idx,
"total": total_chunks,
"hash": file_hash,
"dst_ip": target_ip
}
# Use discovery port if available, else default to target_port
peer_info = radar_peers.get(target_ip, {})
effective_port = peer_info.get("port", target_port)
if not await send_payload(target_ip, effective_port, payload):
clean_print(f"\n{Colors.RED}[!] Transfer failed at chunk {idx}{Colors.RESET}")
break
idx += 1
if idx % 10 == 0 or idx == total_chunks:
sys.stdout.write(f"\rProgress: {idx*100//total_chunks}%")
sys.stdout.flush()
if idx == total_chunks:
clean_print(f"\n{Colors.GREEN}[OK] File stream complete.{Colors.RESET}")
globals()["messages_sent"] = int(globals()["messages_sent"]) + 1
except Exception as e:
clean_print(f"\n{Colors.RED}Streaming error: {e}{Colors.RESET}")
elif cmd == "alias":
if len(parts) < 3:
clean_print(f"{Colors.RED}Usage: /alias <name> <ip>{Colors.RESET}")
else:
aliases[parts[1]] = parts[2]
asyncio.create_task(save_config())
clean_print(f"{Colors.GREEN}Alias '{parts[1]}' assigned to {parts[2]}.{Colors.RESET}")
elif cmd == "unalias":
if len(parts) < 2:
clean_print(f"{Colors.RED}Usage: /unalias <name>{Colors.RESET}")
elif parts[1] in aliases:
aliases.pop(parts[1], None)
asyncio.create_task(save_config())
clean_print(f"{Colors.YELLOW}Alias '{parts[1]}' removed.{Colors.RESET}")
elif cmd == "aliases":
clean_print(f"\n{Colors.CYAN}--- Phonebook (Aliases) ---")
for a, i in aliases.items():
clean_print(f" {a} -> {i}")
if not aliases: clean_print(" [Empty]")
clean_print("---------------------------")
elif cmd == "stealth":
stealth_enabled = not stealth_enabled
asyncio.create_task(save_config())
clean_print(f"{Colors.CYAN}Stealth Mode: {'ON - Invisible' if stealth_enabled else 'OFF - Visible'}{Colors.RESET}")
elif cmd == "relay":
relay_enabled = not relay_enabled
asyncio.create_task(save_config())
clean_print(f"{Colors.CYAN}Relay (Mesh) Mode: {'ON' if relay_enabled else 'OFF'}{Colors.RESET}")
continue
# Regular Message
if not target_ip:
clean_print(f"{Colors.YELLOW}Use /connect <ip> first.{Colors.RESET}")
continue
try:
msg_bytes = text.encode()
encrypted = security.mode != MODE_PLAIN
if encrypted:
# This will raise an exception if no session key exists
ciphertext = security.encrypt(msg_bytes, target_ip)
body = base64.urlsafe_b64encode(ciphertext).decode()
else:
body = text
payload = {
"type": "text",
"body": body,
"nick": nickname,
"encrypted": encrypted,
"mode": security.mode,
"mid": secrets.token_hex(8),
"dst_ip": target_ip
}
if await send_payload(target_ip, target_port, payload):
globals()["messages_sent"] = int(cast(Any, globals().get("messages_sent", 0))) + 1
lock = "🔒 " if encrypted else ""
output = f"{Colors.GREEN} >> {lock}{text}{Colors.RESET}"
message_log.append(output)
clean_print(output)
else:
clean_print(f"{Colors.RED}Failed to send to {target_ip}{Colors.RESET}")
except ConnectionError as e:
clean_print(f"{Colors.RED}[ERROR] {e}{Colors.RESET}")
except Exception as e:
clean_print(f"{Colors.RED}[ERROR] {e}{Colors.RESET}")
except (EOFError, KeyboardInterrupt): break
print(f"\n{Colors.YELLOW}Bye!{Colors.RESET}")
if __name__ == "__main__":
try:
if os.name == 'nt' and hasattr(sys.stdout, 'reconfigure'):
cast(Any, sys.stdout).reconfigure(encoding='utf-8')
asyncio.run(main_loop())
except KeyboardInterrupt: pass