Files
Command-Line-Radio/CLRadio-Hybrid-A-1-iii-Integra.py

255 lines
8.9 KiB
Python

import os
import socket
import threading
import time
import sys
from pathlib import Path
from rich.console import Console
from rich.layout import Layout
from rich.panel import Panel
from prompt_toolkit import Application
from prompt_toolkit.layout import Layout as PTLayout, Window
from prompt_toolkit.layout.controls import FormattedTextControl
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.formatted_text import ANSI
from prompt_toolkit.layout.dimension import LayoutDimension
from prompt_toolkit.layout.containers import Window
# ----- APP STATE & LOGIC ----- #
class AppState:
def __init__(self):
self.left_input = ""
self.right_input = ""
self.active_side = "left"
self.logs = ["[bold blue]P7-InfraNET CLRadio-Hybrid A-1-iv Integra[/]"]
self.running = True
self.target_socket = None
self.target_ip = None
self.is_sudo = os.getuid() == 0 if os.name != 'nt' else True
self.cursor_blink = True
def add_log(self, msg):
self.logs.append(msg)
if len(self.logs) > 100: self.logs.pop(0)
state = AppState()
console = Console(force_terminal=True, color_system="truecolor", soft_wrap=True)
FILE_PATH = Path("known_connections.txt")
# ----- FILE LOGIC ----- #
def get_profiles():
if not FILE_PATH.exists(): return []
profiles = []
with open(FILE_PATH, 'r') as f:
for line in f:
if ": " in line:
profiles.append(line.strip())
return profiles
def write_profile(nick, ip):
with open(FILE_PATH, 'a') as f:
f.write(f"{nick}: {ip}\n")
# ----- NETWORK THREADS ----- #
def background_receiver(app):
host = '0.0.0.0'
port = 443
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
s.bind((host, port))
s.listen(5)
state.add_log("[system] Listening on port 443...")
except PermissionError:
state.add_log("[bold red][!] ERROR: Process Needs Elevation to listen on 443.[/]")
return
while state.running:
try:
s.settimeout(1.0)
c, addr = s.accept()
state.add_log(f"[bold green][+] Connection from {addr[0]}[/]")
while state.running:
data = c.recv(1024).decode()
if not data: break
state.add_log(f"[bold yellow][R] {addr[0]}:[/] {data}")
app.invalidate()
c.close()
except socket.timeout:
continue
except Exception as e:
state.add_log(f"[red]Receiver Error: {e}[/]")
# ----- COMMANDS ----- #
def handle_command(cmd_str):
parts = cmd_str.split()
if not parts: return
cmd = parts[0].lower()
if cmd == "/help":
state.add_log("[bold cyan]COMMANDS:[/]\n/connect <n> - Connect to nth profile\n/profiles add <nick> <ip>\n/profiles clear\n/listen - Status check\n/help - This menu")
elif cmd == "/listen":
state.add_log("[system] Radio is already actively listening on 443.")
elif cmd == "/profiles":
if len(parts) > 1:
mode = parts[1].lower()
if mode == "add" and len(parts) == 4:
write_profile(parts[2], parts[3])
state.add_log(f"[system] Added {parts[2]}")
elif mode == "clear":
with open(FILE_PATH, 'w') as f: f.write("")
state.add_log("[system] Profiles cleared.")
else:
state.add_log("[system] Usage: /profiles add <nick> <ip> or /profiles clear")
elif cmd == "/connect":
profiles = get_profiles()
if len(parts) == 1:
state.add_log("[bold cyan]PROFILES:[/]")
for i, p in enumerate(profiles, 1):
state.add_log(f" [{i}] {p}")
else:
try:
idx = int(parts[1]) - 1
target = profiles[idx].split(": ")[1]
state.target_ip = target
# Attempt connection
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(3)
s.connect((target, 443))
state.target_socket = s
state.add_log(f"[bold green][!] Connected to {target}.[/]")
except Exception as e:
state.add_log(f"[bold red][!] Connection failed: {e}[/]")
# ----- UI RENDERING ----- #
def generate_rich_layout():
# Use a safe fallback if size can't be determined
try:
size = os.get_terminal_size()
# We subtract 2 lines: one for the potential status line and one for a safety buffer
term_width = size.columns
term_height = size.lines - 2
except OSError:
term_width, term_height = 80, 24
# Calculate logs based on the actual height of the panels
# The 'size=3' panels and borders take up about 7-8 lines
visible_log_lines = max(1, term_height - 8)
layout = Layout()
layout.split_row(Layout(name="left", ratio=3), Layout(name="right", ratio=1))
layout["left"].split_column(Layout(name="left_output", ratio=9), Layout(name="left_input", size=3))
layout["right"].split_column(Layout(name="right_output", ratio=9), Layout(name="right_input", size=3))
# Input Logic
cursor = "" if state.cursor_blink else " "
l_input = state.left_input + (cursor if state.active_side == "left" else "")
r_input = state.right_input + (cursor if state.active_side == "right" else "")
# Content
log_content = "\n".join(state.logs[-visible_log_lines:])
layout["left_output"].update(Panel(log_content, title="Live Console", border_style="blue"))
status_text = f"Focus: [bold yellow]{state.active_side.upper()}[/]\n\n"
if not state.is_sudo:
status_text += "[bold red]NON-SUDO MODE[/]\n"
status_text += "\n[bold cyan]Quick Commands:[/]\n/connect\n/profiles\n/help"
layout["right_output"].update(Panel(status_text, title="Status/Help", border_style="green"))
layout["left_input"].update(Panel(l_input, title="Chat", border_style="cyan" if state.active_side == "left" else "white"))
layout["right_input"].update(Panel(r_input, title="Cmd", border_style="cyan" if state.active_side == "right" else "white"))
with console.capture() as capture:
# Render slightly shorter than the terminal to prevent the 'jump'
console.print(layout, width=term_width, height=term_height)
# The \033[H tells the terminal "start drawing at 0,0"
return ANSI("\033[H" + capture.get().strip())
# ----- INPUT HANDLING ----- #
kb = KeyBindings()
@kb.add("tab")
def _(event):
state.active_side = "right" if state.active_side == "left" else "left"
@kb.add("c-q")
def _(event):
state.running = False
if state.target_socket: state.target_socket.close()
event.app.exit()
@kb.add("enter")
def _(event):
if state.active_side == "left":
if state.left_input and state.target_socket:
try:
state.target_socket.send(state.left_input.encode())
state.add_log(f"[bold white][S]:[/] {state.left_input}")
except:
state.add_log("[bold red][!] FATAL: Connection lost![/]")
state.target_socket = None
elif not state.target_socket:
state.add_log("[red]No active connection. Use /connect in Cmd panel.[/]")
state.left_input = ""
else:
if state.right_input.startswith("/"):
handle_command(state.right_input)
state.right_input = ""
@kb.add("backspace")
def _(event):
if state.active_side == "left": state.left_input = state.left_input[:-1]
else: state.right_input = state.right_input[:-1]
@kb.add("<any>")
def _(event):
if state.active_side == "left": state.left_input += event.data
else: state.right_input += event.data
# ----- MAIN LOOP ----- #
def blink_thread(app):
while state.running:
state.cursor_blink = not state.cursor_blink
app.invalidate()
time.sleep(0.5)
if __name__ == "__main__":
# 1. Create the window with specific flags to stop the "glitch"
# we tell it NOT to extend height and to hide the system cursor
content_window = Window(
content=FormattedTextControl(text=generate_rich_layout),
# This is the "Magic" setting to stop the jumping
allow_scroll_beyond_bottom=False,
always_hide_cursor=True,
wrap_lines=False
)
# 2. Setup the application with full_screen and NO mouse support
# (Mouse support is often what triggers the "scroll glitch" in Konsole/KDE)
app = Application(
layout=PTLayout(content_window),
key_bindings=kb,
full_screen=True,
mouse_support=False
)
# 3. Start your background threads
threading.Thread(target=background_receiver, args=(app,), daemon=True).start()
threading.Thread(target=blink_thread, args=(app,), daemon=True).start()
if not state.is_sudo:
state.add_log("[bold red][!] WARNING: Not running as sudo. Receiver will fail to bind port 443.[/]")
# 4. Run it!
app.run()