Uploaded to ext
This commit is contained in:
1
ext/cat1_enc.p7c_enc
Normal file
1
ext/cat1_enc.p7c_enc
Normal file
File diff suppressed because one or more lines are too long
268
ext/fdea-1.py
Normal file
268
ext/fdea-1.py
Normal file
@@ -0,0 +1,268 @@
|
|||||||
|
# FDEA-1, MKOSI ISO Edition
|
||||||
|
# Full Disk Encryption Application for BUGPy-mOS.
|
||||||
|
|
||||||
|
import shutil
|
||||||
|
import os
|
||||||
|
import signal
|
||||||
|
import getpass
|
||||||
|
import sys
|
||||||
|
from cryptography.fernet import Fernet
|
||||||
|
from cryptography.hazmat.primitives import hashes
|
||||||
|
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||||
|
from pathlib import Path
|
||||||
|
import base64
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
drive = "cat1"
|
||||||
|
swap_file = "used.swp"
|
||||||
|
|
||||||
|
_session_password = None
|
||||||
|
_is_securing = False # Global lock flag to block signal re-entry re-runs
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Output helper
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def print_green(*args, sep=" ", end="\n", flush=False):
|
||||||
|
message = sep.join(str(a) for a in args)
|
||||||
|
print(f"\033[92m{message}\033[0m", end=end, flush=flush)
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# SIGTERM / SIGINT handler with Re-entry Isolation
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _emergency_encrypt(signum, frame):
|
||||||
|
global _session_password, _is_securing
|
||||||
|
|
||||||
|
# Critical Block: If we are already zipping/encrypting, ignore duplicate signals
|
||||||
|
if _is_securing:
|
||||||
|
return
|
||||||
|
_is_securing = True
|
||||||
|
|
||||||
|
sig_name = "SIGTERM" if signum == signal.SIGTERM else "SIGINT"
|
||||||
|
print(f"\nfdea: {sig_name} received — securing data before exit...")
|
||||||
|
|
||||||
|
if _session_password and os.path.isdir(drive):
|
||||||
|
try:
|
||||||
|
# Mask signals during emergency mitigation to ensure absolute atomicity
|
||||||
|
signal.signal(signal.SIGTERM, signal.SIG_IGN)
|
||||||
|
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||||
|
|
||||||
|
zip_dir(f"{drive}_enc", "zip", drive)
|
||||||
|
encrypt_data(f"{drive}_enc.zip", f"{drive}_enc", _session_password, if_delete_original=True)
|
||||||
|
|
||||||
|
if os.path.exists(swap_file):
|
||||||
|
os.remove(swap_file)
|
||||||
|
print("fdea: data secured.")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"fdea: CRITICAL WARNING — could not re-encrypt: {e}")
|
||||||
|
print("fdea: cat1/ may still be on disk unencrypted.")
|
||||||
|
elif os.path.isdir(drive):
|
||||||
|
print("fdea: WARNING — cat1/ is on disk unencrypted and no password is stored.")
|
||||||
|
print("fdea: Run FDEA manually and choose (e)ncrypt to secure it.")
|
||||||
|
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
# Register Emergency hooks
|
||||||
|
signal.signal(signal.SIGTERM, _emergency_encrypt)
|
||||||
|
signal.signal(signal.SIGINT, _emergency_encrypt)
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Encryption / decryption
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def encrypt_data(zip_filename_w_extension, encrypted_name_without_extension, password_string, if_delete_original=True):
|
||||||
|
file_name = Path(zip_filename_w_extension)
|
||||||
|
if not file_name.is_file():
|
||||||
|
print("fdea: zip file does not exist.")
|
||||||
|
return
|
||||||
|
|
||||||
|
password = password_string.encode()
|
||||||
|
salt = os.urandom(16)
|
||||||
|
|
||||||
|
kdf = PBKDF2HMAC(
|
||||||
|
algorithm=hashes.SHA256(),
|
||||||
|
length=32,
|
||||||
|
salt=salt,
|
||||||
|
iterations=480000,
|
||||||
|
)
|
||||||
|
key = base64.urlsafe_b64encode(kdf.derive(password))
|
||||||
|
f = Fernet(key)
|
||||||
|
|
||||||
|
try:
|
||||||
|
original_data = file_name.read_bytes()
|
||||||
|
encrypted_data = f.encrypt(original_data)
|
||||||
|
|
||||||
|
target_file = Path(f"{encrypted_name_without_extension}.p7c_enc")
|
||||||
|
|
||||||
|
# Write atomically using a temporary file step to prevent partial corruptions
|
||||||
|
tmp_target = target_file.with_suffix('.tmp')
|
||||||
|
with open(tmp_target, 'wb') as fh:
|
||||||
|
fh.write(salt)
|
||||||
|
fh.write(encrypted_data)
|
||||||
|
tmp_target.rename(target_file)
|
||||||
|
|
||||||
|
if if_delete_original:
|
||||||
|
file_name.unlink(missing_ok=True)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"fdea: error during encryption steps: {e}")
|
||||||
|
raise e
|
||||||
|
|
||||||
|
|
||||||
|
def decrypt_data(p7c_enc_filename, password_string, output_zip_name, if_remove_file=True):
|
||||||
|
p7c_path = Path(p7c_enc_filename)
|
||||||
|
if not p7c_path.is_file():
|
||||||
|
return 1 # File not found
|
||||||
|
|
||||||
|
password = password_string.encode()
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(p7c_path, 'rb') as fh:
|
||||||
|
file_salt = fh.read(16)
|
||||||
|
encrypted_data = fh.read()
|
||||||
|
|
||||||
|
kdf = PBKDF2HMAC(
|
||||||
|
algorithm=hashes.SHA256(),
|
||||||
|
length=32,
|
||||||
|
salt=file_salt,
|
||||||
|
iterations=480000,
|
||||||
|
)
|
||||||
|
key = base64.urlsafe_b64encode(kdf.derive(password))
|
||||||
|
f = Fernet(key)
|
||||||
|
|
||||||
|
decrypted_data = f.decrypt(encrypted_data)
|
||||||
|
|
||||||
|
Path(output_zip_name).write_bytes(decrypted_data)
|
||||||
|
|
||||||
|
if if_remove_file:
|
||||||
|
p7c_path.unlink(missing_ok=True)
|
||||||
|
return 0
|
||||||
|
except Exception:
|
||||||
|
return 2 # Bad password or internal corruption handler
|
||||||
|
|
||||||
|
|
||||||
|
def zip_dir(output_file_no_extension, extension, dir_name):
|
||||||
|
if not os.path.isdir(dir_name):
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
shutil.make_archive(
|
||||||
|
base_name=output_file_no_extension,
|
||||||
|
format=extension,
|
||||||
|
root_dir=dir_name,
|
||||||
|
)
|
||||||
|
# Only wipe the directory out if the zip completed without an error
|
||||||
|
shutil.rmtree(dir_name)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"fdea: Archiving safety failure: {e}")
|
||||||
|
raise e
|
||||||
|
|
||||||
|
|
||||||
|
def unzip_dir(filename_with_extension, output_folder, zipped_file_extension):
|
||||||
|
shutil.unpack_archive(
|
||||||
|
filename=filename_with_extension,
|
||||||
|
extract_dir=output_folder,
|
||||||
|
format=zipped_file_extension,
|
||||||
|
)
|
||||||
|
Path(filename_with_extension).unlink(missing_ok=True)
|
||||||
|
|
||||||
|
|
||||||
|
def launch_bugpy():
|
||||||
|
target = os.path.join(drive, "bugpy-mos-1.py")
|
||||||
|
if os.path.exists(target):
|
||||||
|
print("fdea: launching bugpy...")
|
||||||
|
try:
|
||||||
|
subprocess.run([sys.executable, "bugpy-mos-1.py"], cwd=drive, check=True)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"fdea: session ended: {e}")
|
||||||
|
else:
|
||||||
|
print("fdea: critical error: bugpy-mos-1.py not found.")
|
||||||
|
|
||||||
|
|
||||||
|
def finalize_session(password):
|
||||||
|
global _is_securing
|
||||||
|
_is_securing = True # Inform signals to stay clear during routine execution cleanup
|
||||||
|
|
||||||
|
print("fdea: re-encrypting data...")
|
||||||
|
try:
|
||||||
|
zip_dir(f"{drive}_enc", "zip", drive)
|
||||||
|
encrypt_data(f"{drive}_enc.zip", f"{drive}_enc", password, if_delete_original=True)
|
||||||
|
if os.path.exists(swap_file):
|
||||||
|
os.remove(swap_file)
|
||||||
|
print("fdea: session secured. Done.")
|
||||||
|
finally:
|
||||||
|
_is_securing = False
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Main loop
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
while True:
|
||||||
|
print_green(f"FDEA-1 for use with BUGPy-mOS-1 \"Expansion\", adapted for MKOSI use")
|
||||||
|
|
||||||
|
# Recovery Checkpoint
|
||||||
|
if os.path.exists(swap_file):
|
||||||
|
print_green("\nfdea: WARNING: previous session was not closed correctly.")
|
||||||
|
print_green("(e)ncrypt / (r)estore > ", end="", flush=True)
|
||||||
|
choice = input("").lower()
|
||||||
|
|
||||||
|
if choice == 'e':
|
||||||
|
print_green("enter encryption password > ", end="", flush=True)
|
||||||
|
pwd = getpass.getpass("")
|
||||||
|
finalize_session(pwd)
|
||||||
|
sys.exit(0)
|
||||||
|
elif choice == 'r':
|
||||||
|
launch_bugpy()
|
||||||
|
print_green("enter encryption password > ", end="", flush=True)
|
||||||
|
new_pwd = getpass.getpass("")
|
||||||
|
finalize_session(new_pwd)
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
# Standard Entry Menu
|
||||||
|
print_green("[1] Launch BUGPy\n[x] Shut down system\n[e] Exit FDEA\n > ", end="", flush=True)
|
||||||
|
choice = input("")
|
||||||
|
|
||||||
|
if choice == "1":
|
||||||
|
print_green("password > ", end="", flush=True)
|
||||||
|
old_password = getpass.getpass("")
|
||||||
|
|
||||||
|
res = decrypt_data(f"{drive}_enc.p7c_enc", old_password, f"{drive}_deenc.zip", if_remove_file=True)
|
||||||
|
|
||||||
|
if res == 1:
|
||||||
|
print_green("fdea: encrypted file not found.")
|
||||||
|
sys.exit(1)
|
||||||
|
elif res == 2:
|
||||||
|
print_green("fdea: incorrect password.")
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
unzip_dir(f"{drive}_deenc.zip", drive, "zip")
|
||||||
|
Path(swap_file).touch()
|
||||||
|
except Exception as e:
|
||||||
|
print_green(f"fdea: failed to open drive: {e}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Retain validation state
|
||||||
|
_session_password = old_password
|
||||||
|
|
||||||
|
launch_bugpy()
|
||||||
|
|
||||||
|
# Exit cycle transitions
|
||||||
|
print_green("enter new password (ENTER to keep current) > ", end="", flush=True)
|
||||||
|
new_password = getpass.getpass("")
|
||||||
|
if not new_password:
|
||||||
|
new_password = old_password
|
||||||
|
print_green("fdea: keeping current password.")
|
||||||
|
|
||||||
|
# Wipe memory space cleanly before committing compilation write blocks
|
||||||
|
_session_password = None
|
||||||
|
finalize_session(new_password)
|
||||||
|
|
||||||
|
elif choice == "x":
|
||||||
|
os.system("systemctl poweroff")
|
||||||
|
|
||||||
|
elif choice == "e":
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
else:
|
||||||
|
print_green("Invalid choice.")
|
||||||
Reference in New Issue
Block a user