Files
Operation-Shadow-Watcher/Operation Shadow Watcher/batch.py
P7MJ 5ac587ddf1 Upload files to "Operation Shadow Watcher"
New version applying error fixes (old version could not work).
2026-02-02 08:13:05 -05:00

158 lines
4.6 KiB
Python

import time
import os
import cv2
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from plyer import notification
from datetime import datetime
from threading import Thread, Lock
# ---------------- CONFIG ----------------
# Root of the directory tree that is watched (recursively) for changes.
WATCH_DIR = r"C:\Program Files\Renato Software\Senso.Cloud.Client"
# Plain-text audit log; a relative path, so it is resolved against the
# current working directory of the process.
LOG_FILE = "file_changes_log.txt"
# Seconds the flusher sleeps between notification batches.
BATCH_WINDOW = 10 # Seconds between notification flushes
# Number of most recent entries shown when a batch has no priority entry.
LATEST_COUNT = 3 # How many recent changes to show if no priority items
# Upper bound on priority entries listed in a single notification.
MAX_PRIORITY_SHOWN = 10 # Max priority items shown per notification
# Notification text longer than this is cut and suffixed with "...".
# NOTE(review): 256 is taken to be the Windows balloon-tip limit - confirm
# against the notification backend in use.
MAX_NOTIFY_LENGTH = 256 # Windows balloon tip max length
# 🔥 Priority keywords: priority_score() looks for these as substrings,
# ignoring the case of the entry text. Keep them upper-case.
PRIORITY_WORDS = ["MOD", "REMOTECONTROL", "PEER2PEER", "REMOTESCREEN"]
# ---------------- STATE ----------------
# Pending "Action: filename" summaries; appended by ChangeHandler and
# drained by flush_notifications().
change_buffer = []
# Guards every access to change_buffer (watcher thread vs. flusher thread).
buffer_lock = Lock()
# Held while a camera blink is in progress (see async_camera_blink()).
camera_lock = Lock()
# ---------------- CAMERA ----------------
def blink_camera_light(blink_time=0.7):
    """Briefly open the default webcam and then release it.

    Opens capture device 0 through the DirectShow backend, reads a single
    frame and, if the read succeeds, keeps the device open for
    ``blink_time`` seconds.  Judging by the function name, the purpose is to
    make the camera's activity light flash.

    Every failure is printed to stdout and swallowed, so this function never
    raises to its caller.

    Args:
        blink_time: Seconds to keep the device open after a successful read.
    """
    try:
        cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
        try:
            if cap.isOpened():
                ret, _ = cap.read()
                if ret:
                    time.sleep(blink_time)
        finally:
            # Always give the device back, even when read()/sleep() raised;
            # otherwise a failed blink would leave the capture handle open.
            cap.release()
            cv2.destroyAllWindows()
    except Exception as e:
        # Best-effort feature: report and carry on rather than crash the
        # worker thread.
        print(f"[❌] Camera blink failed: {e}")
def async_camera_blink():
    """Start a camera blink on a background thread unless one is running.

    The call returns immediately.  If ``camera_lock`` is already held, a
    blink is in progress and this request is dropped.  Waiting for the lock
    instead would let a burst of file events queue up one thread per event
    and keep the camera blinking long after the burst has ended.
    """
    # Non-blocking acquire: taken here (not in the worker) so that no thread
    # is created at all when a blink is already under way.
    if not camera_lock.acquire(blocking=False):
        return

    def _blink():
        try:
            blink_camera_light()
        finally:
            # threading.Lock may be released from a thread other than the
            # one that acquired it.
            camera_lock.release()

    try:
        Thread(target=_blink, daemon=True).start()
    except RuntimeError:
        # The worker never started, so it will never release the lock.
        camera_lock.release()
        raise
# ---------------- LOGGING ----------------
def log_change(action, filepath):
    """Append one timestamped record to ``LOG_FILE``.

    The record has the form ``[YYYY-MM-DD HH:MM:SS] ACTION: path`` followed
    by a newline; the action is upper-cased and the timestamp is local time.

    Args:
        action: Event label, e.g. ``"Modified"``.
        filepath: Path the event refers to, written as given.
    """
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    record = "[{}] {}: {}\n".format(stamp, action.upper(), filepath)
    # Append mode: the file is created on first use and never truncated.
    with open(LOG_FILE, "a", encoding="utf-8") as log:
        log.write(record)
# ---------------- PRIORITY ----------------
def priority_score(entry: str, words=None) -> int:
    """Count the priority keywords that occur in the file name of *entry*.

    Entries are expected in the form ``"Action: filename"``.  Only the part
    after the first ``": "`` is inspected, so that a keyword such as
    ``"MOD"`` does not match the action label ``"Modified"`` and turn every
    modify event into a priority item.  An entry without that separator is
    scored as a whole.

    Matching is a case-insensitive substring test; each keyword counts at
    most once.

    Args:
        entry: Change summary, normally ``"Action: filename"``.
        words: Keywords to look for.  Defaults to ``PRIORITY_WORDS``.

    Returns:
        The number of distinct keywords found (0 means "not a priority").
    """
    if words is None:
        words = PRIORITY_WORDS
    _action, sep, name = entry.partition(": ")
    haystack = (name if sep else entry).upper()
    # Upper-case the keyword too, so a lower-case keyword still matches.
    return sum(1 for word in words if word.upper() in haystack)
# ---------------- NOTIFICATION FLUSH ----------------
def _build_notification_message(changes):
    """Turn one batch of change summaries into the notification text.

    Entries with a positive ``priority_score`` are listed first-class: they
    are ordered by descending score (ties keep arrival order) and capped at
    ``MAX_PRIORITY_SHOWN``.  If the batch has no such entry, the last
    ``LATEST_COUNT`` entries are shown instead.  A trailing ``"..."`` line
    marks that candidates were left out, and the final text is cut to
    ``MAX_NOTIFY_LENGTH`` characters.

    Args:
        changes: Non-empty list of summary strings, oldest first.

    Returns:
        The message string to display.
    """
    scored = [(priority_score(c), i, c) for i, c in enumerate(changes)]
    priority_items = [item for item in scored if item[0] > 0]

    if priority_items:
        # Highest score first; the index keeps arrival order among equals.
        priority_items.sort(key=lambda x: (-x[0], x[1]))
        shown = [c for _, _, c in priority_items[:MAX_PRIORITY_SHOWN]]
        omitted = len(priority_items) > MAX_PRIORITY_SHOWN
    else:
        shown = changes[-LATEST_COUNT:]
        omitted = len(changes) > len(shown)

    message = "\n".join(shown)
    if omitted:
        message += "\n..."

    # Keep the text within the notification length limit.
    if len(message) > MAX_NOTIFY_LENGTH:
        message = message[:MAX_NOTIFY_LENGTH - 3] + "..."
    return message


def flush_notifications():
    """Emit one desktop notification per ``BATCH_WINDOW`` of activity.

    Runs forever: sleeps ``BATCH_WINDOW`` seconds, takes everything that
    accumulated in ``change_buffer``, and shows a summary notification.
    Nothing is shown for an empty window.  Meant to be the target of a
    daemon thread, since it never returns.

    A failure inside the notification backend is printed and the batch is
    dropped; the loop keeps running so later batches are still delivered.
    """
    while True:
        time.sleep(BATCH_WINDOW)

        # Hold the lock only long enough to take and clear the batch, so
        # the watcher thread is not blocked while the message is built.
        with buffer_lock:
            if not change_buffer:
                continue
            batch = list(change_buffer)
            change_buffer.clear()

        message = _build_notification_message(batch)

        try:
            notification.notify(
                title="File Changes Detected",
                message=message,
                timeout=5
            )
        except Exception as e:
            # An uncaught error here would end this thread and silently
            # stop every future notification.
            print(f"[❌] Notification failed: {e}")
            continue

        print("🔔 Notification sent:")
        print(message)
# ---------------- FILE WATCHER ----------------
class ChangeHandler(FileSystemEventHandler):
    """Event handler that records every file-system event it receives."""

    def on_any_event(self, event):
        """Log, signal and queue a single file-system event.

        In order: append the event to the log file, request an asynchronous
        camera blink, queue an ``"Action: filename"`` summary for the next
        notification batch, and echo the event to stdout.

        Args:
            event: Event object providing ``event_type`` and ``src_path``.
        """
        kind = event.event_type.capitalize()
        path = event.src_path
        # Notifications show only the base name; log and console get the
        # full path.
        summary = "{}: {}".format(kind, os.path.basename(path))

        # Persist first so the record exists even if a later step fails.
        log_change(kind, path)

        # Returns immediately; the blink itself runs on another thread.
        async_camera_blink()

        with buffer_lock:
            change_buffer.append(summary)

        print(f"[{kind}] {path}")
# ---------------- MAIN ----------------
if __name__ == "__main__":
    # Startup banner: what is watched, where the log goes, what is priority.
    print(f"👀 Monitoring: {WATCH_DIR}")
    print(f"📝 Logging to: {os.path.abspath(LOG_FILE)}")
    print(f"🔥 Priority words: {', '.join(PRIORITY_WORDS)}")

    # Seed a brand-new log with a header line; an existing log is kept.
    log_missing = not os.path.exists(LOG_FILE)
    if log_missing:
        with open(LOG_FILE, "w", encoding="utf-8") as log:
            log.write("=== File Change Log ===\n")

    # Watch WATCH_DIR and everything below it.
    observer = Observer()
    handler = ChangeHandler()
    observer.schedule(handler, WATCH_DIR, recursive=True)
    observer.start()

    # Background thread that turns buffered changes into notifications at
    # a fixed interval; daemon, so it does not keep the process alive.
    flusher = Thread(target=flush_notifications, daemon=True)
    flusher.start()

    # Idle in the main thread until Ctrl+C, then shut the observer down.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()