#!/usr/bin/env python3
"""
chatter.py — Minimalist Flask chat with secure login, file uploads, previews
(including ZIP contents), and auto-clear after 8 minutes of inactivity.

Features implemented:
- Authentication (register/login) with werkzeug password hashing
- Text messages + attachments stored in SQLite (messages.attachment holds filename)
- All file types allowed for upload; max upload size ~9 MB (slightly > Discord free)
- Attachments saved into uploads/ with timestamp-prefixed safe filenames
- "Attachment: <filename>, preview" UI (preview opens in new tab) and download link
- Preview supports images, video, gif, html (served directly), and zip (lists zip
  contents; clicking a file inside zip streams/downloads that member file)
- Auto-migration to add `attachment` column if missing
- Auto-clear: when there's no activity for INACTIVITY_TIMEOUT (8 minutes), all
  messages and attachment files are deleted
- Poll-based client updates (1.5s)
- Minimalist grey/monospace styling (keeps bold/italic/size emphasis)
"""

import io
import logging
import mimetypes
import os
import sqlite3
import threading
import time
import urllib.parse
import zipfile
from datetime import datetime, timedelta, timezone
from functools import wraps

from flask import (
    Flask,
    g,
    render_template_string,
    request,
    redirect,
    url_for,
    session,
    jsonify,
    send_from_directory,
    abort,
    Response,
)
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename

logger = logging.getLogger(__name__)

# -------------------------
# Config
# -------------------------
DB_PATH = "chatter.db"
UPLOAD_FOLDER = "uploads"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# a little more than Discord free limit (8 MB) — choose 9 MB
MAX_UPLOAD_MB = int(os.environ.get("MAX_UPLOAD_MB", "9"))
MAX_CONTENT_LENGTH = MAX_UPLOAD_MB * 1024 * 1024

# inactivity timeout (seconds) — 8 minutes
INACTIVITY_TIMEOUT = 8 * 60

# preview extensions for direct preview UI (opens in new tab)
PREVIEW_EXTS = {"png", "jpg", "jpeg", "gif", "mp4", "webm", "html"}
ZIP_EXT = "zip"

app = Flask(__name__)
# NOTE(review): the fallback key is public knowledge; anyone can forge session
# cookies unless SECRET_KEY is set in the environment for real deployments.
app.secret_key = os.environ.get("SECRET_KEY", "dev-secret-key-change-this")
app.config["MAX_CONTENT_LENGTH"] = MAX_CONTENT_LENGTH
app.config["SESSION_COOKIE_HTTPONLY"] = True
app.config["SESSION_COOKIE_SAMESITE"] = "Lax"
# app.config['SESSION_COOKIE_SECURE'] = True  # enable on TLS

# global last activity timestamp (epoch seconds), guarded by the lock below
_last_activity = time.time()
_last_activity_lock = threading.Lock()


# -------------------------
# Database helpers & migration
# -------------------------
def get_db():
    """Return the SQLite connection cached on flask.g, opening it on first use."""
    db = getattr(g, "_database", None)
    if db is None:
        db = g._database = sqlite3.connect(
            DB_PATH, detect_types=sqlite3.PARSE_DECLTYPES
        )
        db.row_factory = sqlite3.Row
    return db


@app.teardown_appcontext
def close_connection(exc):
    """Close the connection cached on flask.g (if any) when the app context ends."""
    db = getattr(g, "_database", None)
    if db is not None:
        db.close()


def init_db():
    """Create the users and messages tables if missing, then run ensure_schema()."""
    db = get_db()
    cur = db.cursor()
    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """
    )
    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS messages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            username TEXT NOT NULL,
            text TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """
    )
    db.commit()
    ensure_schema()


def ensure_schema():
    """
    Auto-migrate: if messages table missing 'attachment' column, add it.
    Called after init_db() and safe to run multiple times.
    """
    db = get_db()
    cur = db.cursor()
    cur.execute("PRAGMA table_info(messages)")
    cols = [r["name"] for r in cur.fetchall()]
    if "attachment" not in cols:
        cur.execute("ALTER TABLE messages ADD COLUMN attachment TEXT")
        db.commit()


# -------------------------
# Activity / cleanup
# -------------------------
def update_activity():
    """Record 'now' as the time of the most recent user activity."""
    global _last_activity
    with _last_activity_lock:
        _last_activity = time.time()


def prune_all_messages_and_files():
    """
    Delete every row from messages and the attachment files those rows reference.

    Only files named in messages.attachment are removed; other files that may
    exist in uploads/ are left alone. A file that is already gone is ignored;
    any other removal failure is logged and does not stop the cleanup.
    Must be called inside an application context (it uses get_db()).
    """
    db = get_db()
    cur = db.cursor()
    cur.execute("SELECT attachment FROM messages WHERE attachment IS NOT NULL")
    for row in cur.fetchall():
        name = row["attachment"]
        if not name:
            continue
        # Remove directly instead of exists()+remove(): no race window.
        try:
            os.remove(os.path.join(UPLOAD_FOLDER, name))
        except FileNotFoundError:
            pass
        except OSError:
            logger.warning("Could not remove attachment %r", name, exc_info=True)
    cur.execute("DELETE FROM messages")
    db.commit()


def inactivity_watcher():
    """
    Background loop: every 5 seconds check how long the app has been idle and,
    once INACTIVITY_TIMEOUT is reached, clear all messages and attachments.
    Never returns; intended to run in a daemon thread.
    """
    while True:
        time.sleep(5)
        with _last_activity_lock:
            idle = time.time() - _last_activity
        if idle < INACTIVITY_TIMEOUT:
            continue
        # perform cleanup in app context (get_db() needs flask.g)
        try:
            with app.app_context():
                prune_all_messages_and_files()
        except Exception:
            # Thread boundary: keep the watcher alive, but leave a trace.
            logger.exception("Inactivity cleanup failed")
        # reset last_activity to avoid repeated work
        update_activity()


# start watcher thread (daemon)
threading.Thread(target=inactivity_watcher, daemon=True).start()


# -------------------------
# Auth decorator
# -------------------------
def login_required(f):
    """Decorator: redirect to the login page unless the session has a user_id."""

    @wraps(f)
    def wrapper(*args, **kwargs):
        if "user_id" not in session:
            return redirect(url_for("login"))
        return f(*args, **kwargs)

    return wrapper


# -------------------------
# Helpers
# -------------------------
def _file_extension(filename: str) -> str:
    """Return the lower-cased text after the last '.', or '' if there is no dot."""
    _, dot, ext = filename.rpartition(".")
    return ext.lower() if dot else ""


def allowed_preview_ext(filename: str) -> bool:
    """Return True if the filename's extension is in PREVIEW_EXTS."""
    return _file_extension(filename) in PREVIEW_EXTS


def is_zip(filename: str) -> bool:
    """Return True if the filename's extension is ZIP_EXT."""
    return _file_extension(filename) == ZIP_EXT


def safe_save_file(storage_file):
    """
    Save uploaded FileStorage to uploads/ with a timestamp prefix to keep names unique.
    Returns saved filename (basename).

    If secure_filename() strips the client-supplied name down to nothing, the
    placeholder "file" is used so the stored name is never just the prefix.
    """
    secured = secure_filename(storage_file.filename or "") or "file"
    prefix = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S%f")
    saved = f"{prefix}_{secured}"
    storage_file.save(os.path.join(UPLOAD_FOLDER, saved))
    return saved


# -------------------------
# Routes
# -------------------------
@app.route("/")
def index():
    """Send logged-in users to the chat and everyone else to the login page."""
    if "user_id" in session:
        return redirect(url_for("chat"))
    return redirect(url_for("login"))


@app.route("/register", methods=["GET", "POST"])
def register():
    """Show the registration form (GET) or create a user and redirect to login (POST)."""
    error = None
    if request.method == "POST":
        username = (request.form.get("username") or "").strip()
        password = request.form.get("password") or ""
        if not username or not password:
            error = "Provide username and password"
        else:
            db = get_db()
            cur = db.cursor()
            try:
                cur.execute(
                    "INSERT INTO users (username,password_hash) VALUES (?,?)",
                    (username, generate_password_hash(password)),
                )
                db.commit()
                return redirect(url_for("login"))
            except sqlite3.IntegrityError:
                # UNIQUE constraint on users.username
                error = "Username already taken"
    return render_template_string(REGISTER_HTML, error=error)


@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or verify credentials and start a session (POST)."""
    error = None
    if request.method == "POST":
        username = (request.form.get("username") or "").strip()
        password = request.form.get("password") or ""
        db = get_db()
        cur = db.cursor()
        cur.execute(
            "SELECT id, username, password_hash FROM users WHERE username=?",
            (username,),
        )
        row = cur.fetchone()
        if row and check_password_hash(row["password_hash"], password):
            # Drop any pre-login session data before storing the identity.
            session.clear()
            session["user_id"] = row["id"]
            session["username"] = row["username"]
            update_activity()
            return redirect(url_for("chat"))
        else:
            error = "Invalid username or password"
    return render_template_string(LOGIN_HTML, error=error)


@app.route("/logout")
def logout():
    """Clear the session and return to the login page."""
    session.clear()
    return redirect(url_for("login"))


@app.route("/chat")
@login_required
def chat():
    """Render the chat page for the logged-in user."""
    update_activity()
    return render_template_string(
        CHAT_HTML, username=session.get("username"), max_mb=MAX_UPLOAD_MB
    )
# Upper bound on the uncompressed size of a single ZIP member served by
# preview_zip_member(); protects the process against decompression bombs.
MAX_ZIP_MEMBER_BYTES = 64 * 1024 * 1024


def _upload_path_or_404(name):
    """Return the path of stored upload `name`, aborting with 404 if it is not one.

    Stored uploads are flat basenames, so anything containing a path separator
    or a dot-directory is rejected before it is joined onto UPLOAD_FOLDER.
    """
    if not name or name in {".", ".."} or "/" in name or "\\" in name:
        abort(404)
    path = os.path.join(UPLOAD_FOLDER, name)
    if not os.path.isfile(path):
        abort(404)
    return path


@app.route("/api/messages", methods=["GET", "POST"])
@login_required
def api_messages():
    """
    GET  -> JSON list of the last 100 messages, oldest first.
    POST -> store a message sent as multipart/form-data (fields `text`, `file`)
            or as JSON ({"text": ...}); responds 201 on success and 400 when
            there is neither text nor an attachment.
    """
    update_activity()
    db = get_db()
    cur = db.cursor()

    if request.method == "POST":
        # Accept multipart/form-data (file) or JSON
        text = ""
        attachment_saved = None
        content_type = request.content_type or ""
        if content_type.startswith("multipart/form-data"):
            text = (request.form.get("text") or "").strip()
            file = request.files.get("file")
            if file and file.filename:
                attachment_saved = safe_save_file(file)
        else:
            # The body may be any JSON value; only an object with a string
            # "text" counts, everything else is treated as an empty message.
            payload = request.get_json(silent=True)
            raw = payload.get("text") if isinstance(payload, dict) else None
            text = raw.strip() if isinstance(raw, str) else ""

        if not text and not attachment_saved:
            return jsonify({"error": "Empty message"}), 400

        cur.execute(
            "INSERT INTO messages (user_id, username, text, attachment, created_at) VALUES (?,?,?,?,?)",
            (
                session["user_id"],
                session["username"],
                text,
                attachment_saved,
                datetime.utcnow(),
            ),
        )
        db.commit()
        return jsonify({"ok": True}), 201

    # GET -> return last 100 messages (ascending)
    cur.execute(
        "SELECT id, username, text, attachment, created_at FROM messages ORDER BY id DESC LIMIT 100"
    )
    rows = cur.fetchall()
    msgs = [
        {
            "id": r["id"],
            "username": r["username"],
            "text": r["text"],
            "attachment": r["attachment"],
            "created_at": r["created_at"],
        }
        for r in reversed(rows)
    ]
    return jsonify(msgs)


@app.route("/uploads/<path:filename>")
@login_required
def uploaded_file(filename):
    """Download a stored upload as an attachment.

    filename is the saved basename (timestamp_prefix_originalname.ext).
    """
    return send_from_directory(UPLOAD_FOLDER, filename, as_attachment=True)


@app.route("/preview/<path:filename>")
@login_required
def preview(filename):
    """
    Preview endpoint:
    - For images/videos/gifs -> serve file directly (Content-Type auto)
    - For html -> serve so the browser renders it, but sandboxed via CSP
    - For zip -> return an HTML listing of members with links to stream/download:
        /preview/zipfile/<zipname>/<member>
    - For other files -> redirect to download
    """
    # Local import: the module-level import block is outside this section.
    from html import escape as html_escape

    fpath = _upload_path_or_404(filename)
    _, dot, ext = filename.rpartition(".")
    ext = ext.lower() if dot else ""

    if ext == "html":
        # Uploaded HTML is untrusted: render it, but in a sandbox so its
        # scripts cannot act with the viewer's session on this origin.
        resp = send_from_directory(UPLOAD_FOLDER, filename)
        resp.headers["Content-Security-Policy"] = "sandbox"
        return resp

    if ext in PREVIEW_EXTS:
        # images/videos/gif/mp4/webm -> serve raw file (browser will open in new tab)
        return send_from_directory(UPLOAD_FOLDER, filename)

    if ext == ZIP_EXT:
        # list zip contents and provide links to preview/download each member
        try:
            with zipfile.ZipFile(fpath, "r") as zf:
                members = zf.namelist()
        except zipfile.BadZipFile:
            return "<h3>Bad ZIP file</h3>", 400

        items = []
        for m in members:
            label = html_escape(m)
            if m.endswith("/"):
                # directory entry: nothing to stream
                items.append(f"<li>{label}</li>")
                continue
            href = html_escape(
                url_for("preview_zip_member", zipname=filename, member=m),
                quote=True,
            )
            items.append(f'<li><a href="{href}">{label}</a></li>')

        title = html_escape(filename)
        page = (
            "<!doctype html>\n"
            '<html><head><meta charset="utf-8">'
            f"<title>Contents of {title}</title></head>\n"
            "<body>\n"
            f"<h3>Contents of {title}</h3>\n"
            f"<ul>{''.join(items)}</ul>\n"
            "</body></html>"
        )
        return page

    # fallback -> direct download
    return redirect(url_for("uploaded_file", filename=filename))


@app.route("/preview/zipfile/<zipname>/<path:member>")
@login_required
def preview_zip_member(zipname, member):
    """
    Stream a file inside a zip directly without extracting to disk.
    Sets Content-Disposition to attachment so the browser offers a download.

    Responds 404 for an unknown archive or member, 400 for a corrupt archive or
    a member that cannot be read (e.g. encrypted), and 413 when the member's
    uncompressed size exceeds MAX_ZIP_MEMBER_BYTES.
    """
    zip_path = _upload_path_or_404(zipname)

    # URL-decoding of member path already performed by Flask
    wanted = member.replace("\\", "/")
    try:
        with zipfile.ZipFile(zip_path, "r") as zf:
            names = zf.namelist()
            if member in names:
                member_name = member
            else:
                # try some normalization (zip can have backslashes on Windows)
                member_name = next(
                    (nm for nm in names if nm.replace("\\", "/") == wanted), None
                )
                if member_name is None:
                    abort(404)
            if zf.getinfo(member_name).file_size > MAX_ZIP_MEMBER_BYTES:
                abort(413)
            data = zf.read(member_name)
    except zipfile.BadZipFile:
        abort(400, "Bad ZIP file")
    except KeyError:
        abort(404)
    except (RuntimeError, NotImplementedError):
        # encrypted member or unsupported compression method
        abort(400, "Unsupported ZIP member")

    # Guess mime type for the inner file
    guessed_type, _ = mimetypes.guess_type(member_name)
    if not guessed_type:
        guessed_type = "application/octet-stream"

    rv = Response(data, mimetype=guessed_type)
    rv.headers["X-Content-Type-Options"] = "nosniff"

    # Present as attachment with the inner filename. The name comes from the
    # archive (untrusted), so send a sanitised ASCII fallback plus an RFC 5987
    # percent-encoded form instead of interpolating it raw into the header.
    inner_basename = member_name.replace("\\", "/").rsplit("/", 1)[-1]
    ascii_name = secure_filename(inner_basename) or "download"
    encoded_name = urllib.parse.quote(inner_basename, safe="")
    rv.headers["Content-Disposition"] = (
        f"attachment; filename=\"{ascii_name}\"; filename*=UTF-8''{encoded_name}"
    )
    return rv


# -------------------------
# Minimal HTML templates
# -------------------------
# NOTE(review): the markup of the three page templates below was rebuilt from
# the text, CSS class names and API contract that survived in the source (the
# tags themselves had been stripped). Verify against the intended design.
BASE_CSS = """
:root{--bg:#ececec;--card:#f8f8f8;--muted:#666}
html,body{height:100%}
body{margin:0;background:var(--bg);font-family:"Courier New",Courier,monospace;color:#111}
.container{max-width:900px;margin:24px auto;padding:18px;background:var(--card);border-radius:8px;box-shadow:0 6px 18px rgba(0,0,0,0.04)}
header{margin-bottom:18px}
h1{font-size:28px;margin:0}
small{color:var(--muted)}
.chat{height:60vh;border:1px dashed #ddd;padding:12px;overflow:auto;background:white}
.message{padding:6px 8px;border-bottom:1px dashed #efefef}
.username{font-weight:700}
.time{font-style:italic;color:var(--muted);font-size:12px}
.attachment{font-style:italic;color:var(--muted);font-size:13px;margin-top:6px}
.form-row{display:flex;gap:8px;margin-top:12px}
input[type=text],input[type=password]{font-family:inherit;padding:8px;border:1px solid #ddd;border-radius:6px;flex:1}
button{padding:8px 12px;border-radius:6px;border:0;background:#222;color:white;font-weight:700}
.error{color:#a00;margin-top:8px}
.note{font-size:14px;color:var(--muted)}
.file-input{border:1px solid #ddd;padding:6px;border-radius:6px}
@media(max-width:600px){.container{margin:8px;padding:12px}}
"""

LOGIN_HTML = """
<!doctype html>
<html lang="en">
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width,initial-scale=1">
  <title>chatter — login</title>
  <style>{{ base_css|safe }}</style>
</head>
<body>
  <div class="container">
    <header>
      <h1>chatter</h1>
      <small>Please login to continue.</small>
    </header>
    <form method="post">
      <div class="form-row">
        <input type="text" name="username" placeholder="username" autocomplete="username" required>
        <input type="password" name="password" placeholder="password" autocomplete="current-password" required>
        <button type="submit">Login</button>
      </div>
    </form>
    {% if error %}
    <div class="error">{{ error }}</div>
    {% endif %}
    <p class="note"><a href="{{ url_for('register') }}">Create an account</a></p>
  </div>
</body>
</html>
"""

REGISTER_HTML = """
<!doctype html>
<html lang="en">
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width,initial-scale=1">
  <title>chatter — register</title>
  <style>{{ base_css|safe }}</style>
</head>
<body>
  <div class="container">
    <header>
      <h1>chatter</h1>
      <small>Please create an account to continue.</small>
    </header>
    <form method="post">
      <div class="form-row">
        <input type="text" name="username" placeholder="username" autocomplete="username" required>
        <input type="password" name="password" placeholder="password" autocomplete="new-password" required>
        <button type="submit">Register</button>
      </div>
    </form>
    {% if error %}
    <div class="error">{{ error }}</div>
    {% endif %}
    <p class="note"><a href="{{ url_for('login') }}">Back to login</a></p>
  </div>
</body>
</html>
"""

CHAT_HTML = """
<!doctype html>
<html lang="en">
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width,initial-scale=1">
  <title>chatter</title>
  <style>{{ base_css|safe }}</style>
</head>
<body>
  <div class="container">
    <header>
      <h1>chatter — chat</h1>
      <small>Logged in as <strong>{{ username }}</strong> · <a href="{{ url_for('logout') }}">logout</a></small>
    </header>
    <div id="chat" class="chat"></div>
    <form id="send-form" class="form-row">
      <input type="text" id="text" name="text" placeholder="message" autocomplete="off">
      <input type="file" id="file" name="file" class="file-input">
      <button type="submit">Send</button>
    </form>
    <div id="send-error" class="error"></div>
    <p class="note">Files allowed (all types). Size limit: {{ max_mb }} MB. Attachments show: <em>Attachment: filename, preview</em></p>
  </div>
  <script>
  (function () {
    var chatBox = document.getElementById("chat");
    var form = document.getElementById("send-form");
    var textInput = document.getElementById("text");
    var fileInput = document.getElementById("file");
    var errorBox = document.getElementById("send-error");
    var lastSignature = null;

    // Build nodes with textContent only, so message data is never parsed as HTML.
    function el(tag, className, text) {
      var node = document.createElement(tag);
      if (className) { node.className = className; }
      if (text !== undefined && text !== null) { node.textContent = text; }
      return node;
    }

    function link(href, label) {
      var a = el("a", "", label);
      a.href = href;
      a.target = "_blank";
      a.rel = "noopener";
      return a;
    }

    function render(messages) {
      var signature = messages.map(function (m) { return m.id; }).join(",");
      if (signature === lastSignature) { return; }
      lastSignature = signature;
      var atBottom = chatBox.scrollHeight - chatBox.scrollTop - chatBox.clientHeight < 40;
      chatBox.textContent = "";
      messages.forEach(function (m) {
        var row = el("div", "message");
        row.appendChild(el("span", "username", m.username));
        row.appendChild(document.createTextNode(" "));
        row.appendChild(el("span", "time", m.created_at));
        if (m.text) { row.appendChild(el("div", "", m.text)); }
        if (m.attachment) {
          var name = encodeURIComponent(m.attachment);
          var att = el("div", "attachment", "Attachment: ");
          att.appendChild(link("/uploads/" + name, m.attachment));
          att.appendChild(document.createTextNode(", "));
          att.appendChild(link("/preview/" + name, "preview"));
          row.appendChild(att);
        }
        chatBox.appendChild(row);
      });
      if (atBottom) { chatBox.scrollTop = chatBox.scrollHeight; }
    }

    function poll() {
      fetch("/api/messages", { credentials: "same-origin" })
        .then(function (resp) {
          // A redirect means the session is gone: go to the login page.
          if (resp.redirected) { window.location.href = resp.url; return null; }
          return resp.ok ? resp.json() : null;
        })
        .then(function (messages) { if (messages) { render(messages); } })
        .catch(function () { /* transient network error: retry on next tick */ });
    }

    form.addEventListener("submit", function (ev) {
      ev.preventDefault();
      errorBox.textContent = "";
      var body = new FormData();
      body.append("text", textInput.value);
      if (fileInput.files.length) { body.append("file", fileInput.files[0]); }
      fetch("/api/messages", { method: "POST", body: body, credentials: "same-origin" })
        .then(function (resp) {
          if (resp.status === 413) { errorBox.textContent = "File too large"; return; }
          if (!resp.ok) { errorBox.textContent = "Could not send message"; return; }
          form.reset();
          poll();
        })
        .catch(function () { errorBox.textContent = "Network error"; });
    });

    poll();
    setInterval(poll, 1500);
  })();
  </script>
</body>
</html>
"""


@app.context_processor
def inject_globals():
    """Expose base_css and max_mb to every rendered template."""
    return dict(base_css=BASE_CSS, max_mb=MAX_UPLOAD_MB)


# -------------------------
# Startup
# -------------------------
if __name__ == "__main__":
    # Initialize DB; init_db() also runs ensure_schema()
    with app.app_context():
        init_db()
    # The Werkzeug debugger allows arbitrary code execution, and this server
    # binds to every interface, so debug mode must be an explicit opt-in.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in {"1", "true", "yes"}
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)