From bc205eca11b2e5596c55d90ab51df57f0fbd3a6c Mon Sep 17 00:00:00 2001 From: arkohut <39525455+arkohut@users.noreply.github.com> Date: Mon, 2 Sep 2024 11:35:16 +0800 Subject: [PATCH] refactor: split win and macos method in screen record --- screen_recorder/common.py | 173 +++++++++++++++++++++++++++++--------- 1 file changed, 131 insertions(+), 42 deletions(-) diff --git a/screen_recorder/common.py b/screen_recorder/common.py index 8326cd6..99e670a 100644 --- a/screen_recorder/common.py +++ b/screen_recorder/common.py @@ -23,6 +23,7 @@ elif platform.system() == "Darwin": CGSessionCopyCurrentDictionary, ) + def load_screen_sequences(base_dir, date): try: with open(os.path.join(base_dir, date, ".screen_sequences"), "r") as f: @@ -30,12 +31,14 @@ def load_screen_sequences(base_dir, date): except FileNotFoundError: return {} + def save_screen_sequences(base_dir, screen_sequences, date): with open(os.path.join(base_dir, date, ".screen_sequences"), "w") as f: json.dump(screen_sequences, f) f.flush() os.fsync(f.fileno()) + def load_previous_hashes(base_dir): date = time.strftime("%Y%m%d") hash_file = os.path.join(base_dir, date, ".previous_hashes") @@ -45,6 +48,7 @@ def load_previous_hashes(base_dir): except FileNotFoundError: return {} + def save_previous_hashes(base_dir, previous_hashes): date = time.strftime("%Y%m%d") hash_file = os.path.join(base_dir, date, ".previous_hashes") @@ -52,38 +56,57 @@ def save_previous_hashes(base_dir, previous_hashes): with open(hash_file, "w") as f: json.dump(previous_hashes, f) + +def get_active_window_info_darwin(): + active_app = NSWorkspace.sharedWorkspace().activeApplication() + app_name = active_app["NSApplicationName"] + app_pid = active_app["NSApplicationProcessIdentifier"] + + windows = CGWindowListCopyWindowInfo( + kCGWindowListOptionOnScreenOnly, kCGNullWindowID + ) + for window in windows: + if window["kCGWindowOwnerPID"] == app_pid: + window_title = window.get("kCGWindowName", "") + if window_title: + return app_name, window_title + + return app_name, "" # 如果没有找到窗口标题,则返回空字符串作为标题 + + +def get_active_window_info_windows(): + try: + window = win32gui.GetForegroundWindow() + _, pid = win32process.GetWindowThreadProcessId(window) + app_name = psutil.Process(pid).name() + window_title = win32gui.GetWindowText(window) + return app_name, window_title + except: + return "", "" + + def get_active_window_info(): if platform.system() == "Darwin": - active_app = NSWorkspace.sharedWorkspace().activeApplication() - app_name = active_app["NSApplicationName"] - app_pid = active_app["NSApplicationProcessIdentifier"] - - windows = CGWindowListCopyWindowInfo( - kCGWindowListOptionOnScreenOnly, kCGNullWindowID - ) - for window in windows: - if window["kCGWindowOwnerPID"] == app_pid: - window_title = window.get("kCGWindowName", "") - if window_title: - return app_name, window_title - - return app_name, "" # 如果没有找到窗口标题,则返回空字符串作为标题 + return get_active_window_info_darwin() elif platform.system() == "Windows": - try: - window = win32gui.GetForegroundWindow() - _, pid = win32process.GetWindowThreadProcessId(window) - app_name = psutil.Process(pid).name() - window_title = win32gui.GetWindowText(window) - return app_name, window_title - except: - return "", "" + return get_active_window_info_windows() -def take_screenshot_macos(base_dir, previous_hashes, threshold, screen_sequences, date, timestamp, app_name, window_title): + +def take_screenshot_macos( + base_dir, + previous_hashes, + threshold, + screen_sequences, + date, + timestamp, + app_name, + window_title, +): screenshots = [] result = subprocess.check_output(["system_profiler", "SPDisplaysDataType", "-json"]) displays_info = json.loads(result)["SPDisplaysDataType"][0]["spdisplays_ndrvs"] screen_names = {} - + for display_index, display_info in enumerate(displays_info): base_screen_name = display_info["_name"].replace(" ", "_").lower() if base_screen_name in screen_names: @@ -93,17 +116,29 @@ def take_screenshot_macos(base_dir, previous_hashes, threshold, screen_sequences screen_names[base_screen_name] = 1 screen_name = base_screen_name - temp_filename = os.path.join(base_dir, date, f"temp_screenshot-{timestamp}-of-{screen_name}.png") - subprocess.run(["screencapture", "-C", "-x", "-D", str(display_index + 1), temp_filename]) + temp_filename = os.path.join( + base_dir, date, f"temp_screenshot-{timestamp}-of-{screen_name}.png" + ) + subprocess.run( + ["screencapture", "-C", "-x", "-D", str(display_index + 1), temp_filename] + ) with Image.open(temp_filename) as img: img = img.convert("RGB") - webp_filename = os.path.join(base_dir, date, f"screenshot-{timestamp}-of-{screen_name}.webp") + webp_filename = os.path.join( + base_dir, date, f"screenshot-{timestamp}-of-{screen_name}.webp" + ) current_hash = str(imagehash.phash(img)) - if (screen_name in previous_hashes and - imagehash.hex_to_hash(current_hash) - imagehash.hex_to_hash(previous_hashes[screen_name]) < threshold): - logging.info(f"Screenshot for {screen_name} is similar to the previous one. Skipping.") + if ( + screen_name in previous_hashes + and imagehash.hex_to_hash(current_hash) + - imagehash.hex_to_hash(previous_hashes[screen_name]) + < threshold + ): + logging.info( + f"Screenshot for {screen_name} is similar to the previous one. Skipping." + ) os.remove(temp_filename) yield screen_name, None, "Skipped (similar to previous)" continue @@ -127,24 +162,54 @@ def take_screenshot_macos(base_dir, previous_hashes, threshold, screen_sequences screenshots.append(webp_filename) yield screen_name, webp_filename, "Saved" -def take_screenshot_windows(base_dir, previous_hashes, threshold, screen_sequences, date, timestamp, app_name, window_title): + +def take_screenshot_windows( + base_dir, + previous_hashes, + threshold, + screen_sequences, + date, + timestamp, + app_name, + window_title, +): for monitor in get_monitors(): - safe_monitor_name = ''.join(c for c in monitor.name if c.isalnum() or c in ('_', '-')) + safe_monitor_name = "".join( + c for c in monitor.name if c.isalnum() or c in ("_", "-") + ) logging.info(f"Processing monitor: {safe_monitor_name}") - webp_filename = os.path.join(base_dir, date, f"screenshot-{timestamp}-of-{safe_monitor_name}.webp") + webp_filename = os.path.join( + base_dir, date, f"screenshot-{timestamp}-of-{safe_monitor_name}.webp" + ) - img = ImageGrab.grab(bbox=(monitor.x, monitor.y, monitor.x + monitor.width, monitor.y + monitor.height)) + img = ImageGrab.grab( + bbox=( + monitor.x, + monitor.y, + monitor.x + monitor.width, + monitor.y + monitor.height, + ) + ) img = img.convert("RGB") current_hash = str(imagehash.phash(img)) - if safe_monitor_name in previous_hashes and imagehash.hex_to_hash(current_hash) - imagehash.hex_to_hash(previous_hashes[safe_monitor_name]) < threshold: - logging.info(f"Screenshot for {safe_monitor_name} is similar to the previous one. Skipping.") + if ( + safe_monitor_name in previous_hashes + and imagehash.hex_to_hash(current_hash) + - imagehash.hex_to_hash(previous_hashes[safe_monitor_name]) + < threshold + ): + logging.info( + f"Screenshot for {safe_monitor_name} is similar to the previous one. Skipping." + ) yield safe_monitor_name, None, "Skipped (similar to previous)" continue previous_hashes[safe_monitor_name] = current_hash - screen_sequences[safe_monitor_name] = screen_sequences.get(safe_monitor_name, 0) + 1 + screen_sequences[safe_monitor_name] = ( + screen_sequences.get(safe_monitor_name, 0) + 1 + ) metadata = { "timestamp": timestamp, @@ -160,18 +225,41 @@ def take_screenshot_windows(base_dir, previous_hashes, threshold, screen_sequenc yield safe_monitor_name, webp_filename, "Saved" -def take_screenshot(base_dir, previous_hashes, threshold, screen_sequences, date, timestamp): + +def take_screenshot( + base_dir, previous_hashes, threshold, screen_sequences, date, timestamp +): app_name, window_title = get_active_window_info() os.makedirs(os.path.join(base_dir, date), exist_ok=True) worklog_path = os.path.join(base_dir, date, "worklog") with open(worklog_path, "a") as worklog: if platform.system() == "Darwin": - screenshot_generator = take_screenshot_macos(base_dir, previous_hashes, threshold, screen_sequences, date, timestamp, app_name, window_title) + screenshot_generator = take_screenshot_macos( + base_dir, + previous_hashes, + threshold, + screen_sequences, + date, + timestamp, + app_name, + window_title, + ) elif platform.system() == "Windows": - screenshot_generator = take_screenshot_windows(base_dir, previous_hashes, threshold, screen_sequences, date, timestamp, app_name, window_title) + screenshot_generator = take_screenshot_windows( + base_dir, + previous_hashes, + threshold, + screen_sequences, + date, + timestamp, + app_name, + window_title, + ) else: - raise NotImplementedError(f"Unsupported operating system: {platform.system()}") + raise NotImplementedError( + f"Unsupported operating system: {platform.system()}" + ) screenshots = [] for screen_name, screenshot_file, status in screenshot_generator: @@ -181,6 +269,7 @@ def take_screenshot(base_dir, previous_hashes, threshold, screen_sequences, date return screenshots + def is_screen_locked(): if platform.system() == "Darwin": session_dict = CGSessionCopyCurrentDictionary() @@ -190,4 +279,4 @@ def is_screen_locked(): return False elif platform.system() == "Windows": user32 = ctypes.windll.User32 - return user32.GetForegroundWindow() == 0 \ No newline at end of file + return user32.GetForegroundWindow() == 0