"""File-drop request server.

Clients place ``<name>.txt`` files containing ``COMMAND:user:data`` in the
``requests`` folder next to this script. The server polls that folder,
handles each request, and reports the outcome by moving the request file to
``requests/status`` as ``success_<name>.txt`` or ``fail_<name>.txt``.

Supported commands:
    LOGIN       check ``data`` against the password hash stored for ``user``.
    CREATEUSER  create ``users/<user>/`` with ``pass.txt`` and ``info.txt``.
"""
import hashlib
import hmac
import os
import shutil
import time
from pathlib import Path

# Setup paths
base_path = Path(__file__).parent
req_path = base_path / "requests"
user_path = base_path / "users"
chat_path = base_path / "single_chats"
status_path = base_path / "requests" / "status"

# scrypt cost parameters: n = CPU/memory cost, r = block size,
# p = parallelization. Hashing and verification must use identical values,
# so they live in one place.
_SCRYPT_PARAMS = {"n": 16384, "r": 8, "p": 1}


def hash_password(password: str) -> str:
    """Return ``"<salt hex>:<scrypt hash hex>"`` for *password*.

    A fresh random 16-byte salt is generated on every call, so hashing the
    same password twice yields different strings.
    """
    salt = os.urandom(16)
    password_hash = hashlib.scrypt(
        password.encode(), salt=salt, **_SCRYPT_PARAMS
    )
    # Salt and hash are stored together as hex so they fit on one text line.
    return f"{salt.hex()}:{password_hash.hex()}"


def verify_password(stored_full_hash: str, provided_password: str) -> bool:
    """Return True if *provided_password* matches *stored_full_hash*.

    *stored_full_hash* must be in the format produced by ``hash_password``.
    Any malformed record (wrong number of fields, non-hex data, ...) counts
    as a failed verification; this function never raises.
    """
    try:
        salt_hex, original_hash_hex = stored_full_hash.split(":")
        salt = bytes.fromhex(salt_hex)
        original_hash = bytes.fromhex(original_hash_hex)
        new_hash = hashlib.scrypt(
            provided_password.encode(), salt=salt, **_SCRYPT_PARAMS
        )
        # Constant-time comparison to avoid leaking information via timing.
        return hmac.compare_digest(new_hash, original_hash)
    except Exception:
        # Deliberately broad: an unreadable record must mean "no match",
        # never a crash of the request loop.
        return False


def _move_to_status(file_path: Path, outcome: str, status_dir: Path) -> None:
    """Move a handled request into *status_dir* as ``<outcome>_<name>``."""
    # The status folder was never created by the original script, which made
    # every move fail; create it on demand.
    status_dir.mkdir(parents=True, exist_ok=True)
    # replace() overwrites a stale status file with the same name on every
    # platform, whereas rename() raises on Windows if the target exists.
    file_path.replace(status_dir / f"{outcome}_{file_path.name}")


def _handle_login(
    file_path: Path, user: str, data: str, users_dir: Path, status_dir: Path
) -> None:
    """Answer a LOGIN request by checking *data* against the stored hash."""
    username_path = users_dir / user
    if not username_path.is_dir():
        _move_to_status(file_path, "fail", status_dir)
        print(f"{user} does not exist.")
        return

    try:
        with open(username_path / "pass.txt", "r") as f:
            stored_hash = f.readline().strip()
    except OSError as e:
        # A user folder without a readable pass.txt used to raise into the
        # catch-all, leaving the request unanswered and retried forever.
        _move_to_status(file_path, "fail", status_dir)
        print(f"{user} has no readable password record: {e}")
        return

    if verify_password(stored_hash, data):
        _move_to_status(file_path, "success", status_dir)
        print(f"{user} login success.")
    else:
        _move_to_status(file_path, "fail", status_dir)
        print(f"{user} hash did not match.")


def _handle_create_user(
    file_path: Path, user: str, data: str, users_dir: Path, status_dir: Path
) -> None:
    """Answer a CREATEUSER request by creating the user's folder and files."""
    new_user_path = users_dir / user
    if new_user_path.exists():
        _move_to_status(file_path, "fail", status_dir)
        print(f"{user} already exists!")
        return

    created_dir = False
    try:
        new_user_path.mkdir(parents=True)
        created_dir = True
        with open(new_user_path / "pass.txt", "w") as password_file:
            password_file.write(hash_password(data))
        with open(new_user_path / "info.txt", "w") as info_file:
            info_file.write(f"{user}\n\n\nA new user")
    except Exception as e:
        print(f"Something went wrong while trying to create {user}: {e}")
        if created_dir:
            # Remove the half-built account so the name is not blocked by a
            # folder that has no usable password file.
            shutil.rmtree(new_user_path, ignore_errors=True)
        _move_to_status(file_path, "fail", status_dir)
        return

    print(f"{user} created!")
    _move_to_status(file_path, "success", status_dir)


def process_request(
    file_path: Path,
    users_dir: Path = user_path,
    status_dir: Path = status_path,
) -> None:
    """Handle a single request file and report the outcome.

    Args:
        file_path: Request file whose content is ``COMMAND:user:data``.
        users_dir: Folder holding one sub-folder per user.
        status_dir: Folder that receives ``success_*`` / ``fail_*`` files.

    Empty files and files without any colon are deleted. Requests with too
    few fields or non-alphanumeric fields are moved to *status_dir* as
    failures. Requests with an unknown command are deleted.

    Raises:
        OSError: if the request file cannot be read, deleted or moved.
    """
    with open(file_path, "r") as f:
        content = f.read().strip()

    # FAILSAFE 2: empty file or no separator at all.
    if not content or ":" not in content:
        print(f"Invalid or empty data in {file_path.name}, Deleting.")
        file_path.unlink()
        return

    # Split at most twice so that everything after the second colon stays in
    # the data field.
    parts = content.split(":", 2)

    # FAILSAFE 3: need exactly Command:User:Pass.
    if len(parts) != 3:
        print(f"Corrupted data in {file_path.name}, Deleting.")
        _move_to_status(file_path, "fail", status_dir)
        return

    command, user, data = parts

    # Alphanumeric-only fields keep path separators and dots out of `user`,
    # which is used directly as a folder name below.
    # NOTE(review): this also rejects passwords containing ':' or any other
    # punctuation, which makes the split limit above moot - confirm intended.
    if not command.isalnum() or not user.isalnum() or not data.isalnum():
        print(f"Invalid parameters in {file_path.name}, Deleting.")
        _move_to_status(file_path, "fail", status_dir)
        return

    print(f"Processing {command} for {user}...")

    if command == "LOGIN":
        _handle_login(file_path, user, data, users_dir, status_dir)
    elif command == "CREATEUSER":
        _handle_create_user(file_path, user, data, users_dir, status_dir)
    elif file_path.is_file():
        # Unknown command: drop the request.
        file_path.unlink()


def main() -> None:
    """Poll the requests folder forever, handling every ``.txt`` file."""
    req_path.mkdir(exist_ok=True)
    status_path.mkdir(exist_ok=True)
    print("Server online. Monitoring requests...")

    while True:
        for file_path in req_path.iterdir():
            # FAILSAFE 1: ignore folders and non-txt files
            # (like .DS_Store or .tmp).
            if not file_path.is_file() or file_path.suffix != ".txt":
                continue
            try:
                process_request(file_path)
            except Exception as e:
                # FAILSAFE 4: catch-all for weird errors (like the file being
                # locked). The file is left in place and retried next pass.
                print(f"Error processing {file_path.name}: {e}")

        # Heartbeat
        time.sleep(0.5)


if __name__ == "__main__":
    main()