From 63c302e017fee9db645547d7e7f0bb01389f1410 Mon Sep 17 00:00:00 2001 From: arkohut <39525455+arkohut@users.noreply.github.com> Date: Mon, 2 Sep 2024 00:03:20 +0800 Subject: [PATCH] feat(screen-recorder): refactor --- pyproject.toml | 8 +- screen_recorder/common.py | 193 +++++++++++++++++++++++++++++ screen_recorder/record-for-win.bat | 2 +- screen_recorder/record-for-win.py | 159 ------------------------ screen_recorder/record.py | 177 ++------------------------ 5 files changed, 210 insertions(+), 329 deletions(-) create mode 100644 screen_recorder/common.py delete mode 100644 screen_recorder/record-for-win.py diff --git a/pyproject.toml b/pyproject.toml index 602fc8d..78e85af 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,12 @@ dependencies = [ "opencv-python", "pillow", "piexif", + "imagehash", + "screeninfo", + "pywin32; sys_platform == 'win32'", + "psutil; sys_platform == 'win32'", + "pyobjc-framework-Quartz; sys_platform == 'darwin'", + "pyobjc-framework-AppKit; sys_platform == 'darwin'", ] [project.urls] @@ -37,7 +43,7 @@ Homepage = "https://github.com/arkohut/memos" memos = "memos.commands:app" [tool.setuptools.packages.find] -include = ["memos*"] +include = ["memos*", "screen_recorder*"] [tool.setuptools.package-data] "*" = ["static/**/*"] diff --git a/screen_recorder/common.py b/screen_recorder/common.py new file mode 100644 index 0000000..8326cd6 --- /dev/null +++ b/screen_recorder/common.py @@ -0,0 +1,193 @@ +import json +import os +import time +import logging +import platform +import subprocess +from PIL import Image, ImageGrab +import imagehash +from memos.utils import write_image_metadata +from screeninfo import get_monitors +import ctypes + +if platform.system() == "Windows": + import win32gui + import win32process + import psutil +elif platform.system() == "Darwin": + from AppKit import NSWorkspace + from Quartz import ( + CGWindowListCopyWindowInfo, + kCGWindowListOptionOnScreenOnly, + kCGNullWindowID, + CGSessionCopyCurrentDictionary, + ) + +def load_screen_sequences(base_dir, date): + try: + with open(os.path.join(base_dir, date, ".screen_sequences"), "r") as f: + return json.load(f) + except FileNotFoundError: + return {} + +def save_screen_sequences(base_dir, screen_sequences, date): + with open(os.path.join(base_dir, date, ".screen_sequences"), "w") as f: + json.dump(screen_sequences, f) + f.flush() + os.fsync(f.fileno()) + +def load_previous_hashes(base_dir): + date = time.strftime("%Y%m%d") + hash_file = os.path.join(base_dir, date, ".previous_hashes") + try: + with open(hash_file, "r") as f: + return json.load(f) + except FileNotFoundError: + return {} + +def save_previous_hashes(base_dir, previous_hashes): + date = time.strftime("%Y%m%d") + hash_file = os.path.join(base_dir, date, ".previous_hashes") + os.makedirs(os.path.dirname(hash_file), exist_ok=True) + with open(hash_file, "w") as f: + json.dump(previous_hashes, f) + +def get_active_window_info(): + if platform.system() == "Darwin": + active_app = NSWorkspace.sharedWorkspace().activeApplication() + app_name = active_app["NSApplicationName"] + app_pid = active_app["NSApplicationProcessIdentifier"] + + windows = CGWindowListCopyWindowInfo( + kCGWindowListOptionOnScreenOnly, kCGNullWindowID + ) + for window in windows: + if window["kCGWindowOwnerPID"] == app_pid: + window_title = window.get("kCGWindowName", "") + if window_title: + return app_name, window_title + + return app_name, "" # 如果没有找到窗口标题,则返回空字符串作为标题 + elif platform.system() == "Windows": + try: + window = win32gui.GetForegroundWindow() + _, pid = win32process.GetWindowThreadProcessId(window) + app_name = psutil.Process(pid).name() + window_title = win32gui.GetWindowText(window) + return app_name, window_title + except: + return "", "" + +def take_screenshot_macos(base_dir, previous_hashes, threshold, screen_sequences, date, timestamp, app_name, window_title): + screenshots = [] + result = subprocess.check_output(["system_profiler", "SPDisplaysDataType", "-json"]) + displays_info = json.loads(result)["SPDisplaysDataType"][0]["spdisplays_ndrvs"] + screen_names = {} + + for display_index, display_info in enumerate(displays_info): + base_screen_name = display_info["_name"].replace(" ", "_").lower() + if base_screen_name in screen_names: + screen_names[base_screen_name] += 1 + screen_name = f"{base_screen_name}_{screen_names[base_screen_name]}" + else: + screen_names[base_screen_name] = 1 + screen_name = base_screen_name + + temp_filename = os.path.join(base_dir, date, f"temp_screenshot-{timestamp}-of-{screen_name}.png") + subprocess.run(["screencapture", "-C", "-x", "-D", str(display_index + 1), temp_filename]) + + with Image.open(temp_filename) as img: + img = img.convert("RGB") + webp_filename = os.path.join(base_dir, date, f"screenshot-{timestamp}-of-{screen_name}.webp") + current_hash = str(imagehash.phash(img)) + + if (screen_name in previous_hashes and + imagehash.hex_to_hash(current_hash) - imagehash.hex_to_hash(previous_hashes[screen_name]) < threshold): + logging.info(f"Screenshot for {screen_name} is similar to the previous one. Skipping.") + os.remove(temp_filename) + yield screen_name, None, "Skipped (similar to previous)" + continue + + previous_hashes[screen_name] = current_hash + screen_sequences[screen_name] = screen_sequences.get(screen_name, 0) + 1 + + metadata = { + "timestamp": timestamp, + "active_app": app_name, + "active_window": window_title, + "screen_name": screen_name, + "sequence": screen_sequences[screen_name], + } + + img.save(webp_filename, format="WebP", quality=85) + write_image_metadata(webp_filename, metadata) + save_screen_sequences(base_dir, screen_sequences, date) + + os.remove(temp_filename) + screenshots.append(webp_filename) + yield screen_name, webp_filename, "Saved" + +def take_screenshot_windows(base_dir, previous_hashes, threshold, screen_sequences, date, timestamp, app_name, window_title): + for monitor in get_monitors(): + safe_monitor_name = ''.join(c for c in monitor.name if c.isalnum() or c in ('_', '-')) + logging.info(f"Processing monitor: {safe_monitor_name}") + + webp_filename = os.path.join(base_dir, date, f"screenshot-{timestamp}-of-{safe_monitor_name}.webp") + + img = ImageGrab.grab(bbox=(monitor.x, monitor.y, monitor.x + monitor.width, monitor.y + monitor.height)) + img = img.convert("RGB") + current_hash = str(imagehash.phash(img)) + + if safe_monitor_name in previous_hashes and imagehash.hex_to_hash(current_hash) - imagehash.hex_to_hash(previous_hashes[safe_monitor_name]) < threshold: + logging.info(f"Screenshot for {safe_monitor_name} is similar to the previous one. Skipping.") + yield safe_monitor_name, None, "Skipped (similar to previous)" + continue + + previous_hashes[safe_monitor_name] = current_hash + screen_sequences[safe_monitor_name] = screen_sequences.get(safe_monitor_name, 0) + 1 + + metadata = { + "timestamp": timestamp, + "active_app": app_name, + "active_window": window_title, + "screen_name": safe_monitor_name, + "sequence": screen_sequences[safe_monitor_name], + } + + img.save(webp_filename, format="WebP", quality=85) + write_image_metadata(webp_filename, metadata) + save_screen_sequences(base_dir, screen_sequences, date) + + yield safe_monitor_name, webp_filename, "Saved" + +def take_screenshot(base_dir, previous_hashes, threshold, screen_sequences, date, timestamp): + app_name, window_title = get_active_window_info() + os.makedirs(os.path.join(base_dir, date), exist_ok=True) + worklog_path = os.path.join(base_dir, date, "worklog") + + with open(worklog_path, "a") as worklog: + if platform.system() == "Darwin": + screenshot_generator = take_screenshot_macos(base_dir, previous_hashes, threshold, screen_sequences, date, timestamp, app_name, window_title) + elif platform.system() == "Windows": + screenshot_generator = take_screenshot_windows(base_dir, previous_hashes, threshold, screen_sequences, date, timestamp, app_name, window_title) + else: + raise NotImplementedError(f"Unsupported operating system: {platform.system()}") + + screenshots = [] + for screen_name, screenshot_file, status in screenshot_generator: + worklog.write(f"{timestamp} - {screen_name} - {status}\n") + if screenshot_file: + screenshots.append(screenshot_file) + + return screenshots + +def is_screen_locked(): + if platform.system() == "Darwin": + session_dict = CGSessionCopyCurrentDictionary() + if session_dict: + screen_locked = session_dict.get("CGSSessionScreenIsLocked", 0) + return bool(screen_locked) + return False + elif platform.system() == "Windows": + user32 = ctypes.windll.User32 + return user32.GetForegroundWindow() == 0 \ No newline at end of file diff --git a/screen_recorder/record-for-win.bat b/screen_recorder/record-for-win.bat index ca93ae2..2f58624 100644 --- a/screen_recorder/record-for-win.bat +++ b/screen_recorder/record-for-win.bat @@ -1,6 +1,6 @@ @echo off call C:\Users\arkoh\miniconda3\Scripts\activate.bat memos :loop -python -m screen_recorder.record-for-win --once +python -m screen_recorder.record --once timeout /t 5 /nobreak >nul goto loop \ No newline at end of file diff --git a/screen_recorder/record-for-win.py b/screen_recorder/record-for-win.py deleted file mode 100644 index 473aa19..0000000 --- a/screen_recorder/record-for-win.py +++ /dev/null @@ -1,159 +0,0 @@ -import logging -import time -import os -import json -import argparse -import imagehash -from PIL import ImageGrab, Image -import win32gui -import win32process -import psutil -from memos.utils import write_image_metadata -import ctypes -from screeninfo import get_monitors - -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - -def get_active_window_info(): - try: - window = win32gui.GetForegroundWindow() - _, pid = win32process.GetWindowThreadProcessId(window) - app_name = psutil.Process(pid).name() - window_title = win32gui.GetWindowText(window) - return app_name, window_title - except: - return "", "" - -def load_screen_sequences(base_dir, date): - try: - with open(os.path.join(base_dir, date, ".screen_sequences"), "r") as f: - return json.load(f) - except FileNotFoundError: - return {} - -def save_screen_sequences(base_dir, screen_sequences, date): - with open(os.path.join(base_dir, date, ".screen_sequences"), "w") as f: - json.dump(screen_sequences, f) - f.flush() - os.fsync(f.fileno()) - -def take_screenshot(base_dir, previous_hashes, threshold, screen_sequences, date, timestamp): - screenshots = [] - app_name, window_title = get_active_window_info() - - os.makedirs(os.path.join(base_dir, date), exist_ok=True) - worklog_path = os.path.join(base_dir, date, "worklog") - - with open(worklog_path, "a") as worklog: - for monitor in get_monitors(): - safe_monitor_name = ''.join(c for c in monitor.name if c.isalnum() or c in ('_', '-')) - logging.info(f"Processing monitor: {safe_monitor_name}") - - webp_filename = os.path.join(base_dir, date, f"screenshot-{timestamp}-of-{safe_monitor_name}.webp") - - img = ImageGrab.grab(bbox=(monitor.x, monitor.y, monitor.x + monitor.width, monitor.y + monitor.height)) - img = img.convert("RGB") - current_hash = str(imagehash.phash(img)) - - if safe_monitor_name in previous_hashes and imagehash.hex_to_hash(current_hash) - imagehash.hex_to_hash(previous_hashes[safe_monitor_name]) < threshold: - logging.info(f"Screenshot for {safe_monitor_name} is similar to the previous one. Skipping.") - worklog.write(f"{timestamp} - {safe_monitor_name} - Skipped (similar to previous)\n") - continue - - previous_hashes[safe_monitor_name] = current_hash - screen_sequences[safe_monitor_name] = screen_sequences.get(safe_monitor_name, 0) + 1 - - metadata = { - "timestamp": timestamp, - "active_app": app_name, - "active_window": window_title, - "screen_name": safe_monitor_name, - "sequence": screen_sequences[safe_monitor_name], - } - - img.save(webp_filename, format="WebP", quality=85) - write_image_metadata(webp_filename, metadata) - save_screen_sequences(base_dir, screen_sequences, date) - - screenshots.append(webp_filename) - worklog.write(f"{timestamp} - {safe_monitor_name} - Saved\n") - - return screenshots - -def is_screen_locked(): - user32 = ctypes.windll.User32 - return user32.GetForegroundWindow() == 0 - -def load_previous_hashes(base_dir): - date = time.strftime("%Y%m%d") - hash_file = os.path.join(base_dir, date, ".previous_hashes") - try: - with open(hash_file, "r") as f: - return json.load(f) - except FileNotFoundError: - return {} - -def save_previous_hashes(base_dir, previous_hashes): - date = time.strftime("%Y%m%d") - hash_file = os.path.join(base_dir, date, ".previous_hashes") - os.makedirs(os.path.dirname(hash_file), exist_ok=True) - with open(hash_file, "w") as f: - json.dump(previous_hashes, f) - -def run_screen_recorder_once(args, base_dir, previous_hashes): - if not is_screen_locked(): - date = time.strftime("%Y%m%d") - timestamp = time.strftime("%Y%m%d-%H%M%S") - screen_sequences = load_screen_sequences(base_dir, date) - screenshot_files = take_screenshot( - base_dir, previous_hashes, args.threshold, screen_sequences, date, timestamp - ) - for screenshot_file in screenshot_files: - logging.info(f"Screenshot saved: {screenshot_file}") - save_previous_hashes(base_dir, previous_hashes) - else: - logging.info("Screen is locked. Skipping screenshot.") - -def run_screen_recorder(args, base_dir, previous_hashes): - while True: - try: - if not is_screen_locked(): - date = time.strftime("%Y%m%d") - timestamp = time.strftime("%Y%m%d-%H%M%S") - screen_sequences = load_screen_sequences(base_dir, date) - screenshot_files = take_screenshot( - base_dir, previous_hashes, args.threshold, screen_sequences, date, timestamp - ) - for screenshot_file in screenshot_files: - logging.info(f"Screenshot saved: {screenshot_file}") - else: - logging.info("Screen is locked. Skipping screenshot.") - except Exception as e: - logging.error(f"An error occurred: {str(e)}. Skipping this iteration.") - - time.sleep(5) - -def main(): - parser = argparse.ArgumentParser(description="Screen Recorder for Windows") - parser.add_argument("--threshold", type=int, default=4, help="Threshold for image similarity") - parser.add_argument("--base-dir", type=str, default="~/tmp", help="Base directory for screenshots") - parser.add_argument("--once", action="store_true", help="Run once and exit") - args = parser.parse_args() - - base_dir = os.path.expanduser(args.base_dir) - previous_hashes = load_previous_hashes(base_dir) - - if args.once: - run_screen_recorder_once(args, base_dir, previous_hashes) - else: - while True: - try: - run_screen_recorder(args, base_dir, previous_hashes) - except Exception as e: - logging.error(f"Critical error occurred, program will restart in 10 seconds: {str(e)}") - time.sleep(10) - - - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/screen_recorder/record.py b/screen_recorder/record.py index 2f4bc49..3666ca1 100644 --- a/screen_recorder/record.py +++ b/screen_recorder/record.py @@ -1,177 +1,21 @@ import logging import time -import subprocess -from AppKit import NSWorkspace -from PIL import Image import os -from Quartz import ( - CGWindowListCopyWindowInfo, - kCGWindowListOptionOnScreenOnly, - kCGNullWindowID, - CGSessionCopyCurrentDictionary, -) import json -import imagehash import argparse -from memos.utils import write_image_metadata +import platform +from screen_recorder.common import ( + load_screen_sequences, + save_screen_sequences, + load_previous_hashes, + save_previous_hashes, + take_screenshot, + is_screen_locked, +) # 在文件开头添加日志配置 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') -def get_active_window_info(): - active_app = NSWorkspace.sharedWorkspace().activeApplication() - app_name = active_app["NSApplicationName"] - app_pid = active_app["NSApplicationProcessIdentifier"] - - windows = CGWindowListCopyWindowInfo( - kCGWindowListOptionOnScreenOnly, kCGNullWindowID - ) - for window in windows: - if window["kCGWindowOwnerPID"] == app_pid: - window_title = window.get("kCGWindowName", "") - if window_title: - return app_name, window_title - - return app_name, "" # 如果没有找到窗口标题,则返回空字符串作为标题 - - -def load_screen_sequences(base_dir, date): - try: - with open(os.path.join(base_dir, date, ".screen_sequences"), "r") as f: - return json.load(f) - except FileNotFoundError: - return {} - - -def save_screen_sequences(base_dir, screen_sequences, date): - with open(os.path.join(base_dir, date, ".screen_sequences"), "w") as f: - json.dump(screen_sequences, f) - f.flush() - os.fsync(f.fileno()) - - -def take_screenshot( - base_dir, previous_hashes, threshold, screen_sequences, date, timestamp -): - screenshots = [] - - # 获取连接的显示器数量 - result = subprocess.check_output(["system_profiler", "SPDisplaysDataType", "-json"]) - displays_info = json.loads(result)["SPDisplaysDataType"][0]["spdisplays_ndrvs"] - - app_name, window_title = get_active_window_info() - - # 创建日期目录 - os.makedirs(os.path.join(base_dir, date), exist_ok=True) - - # 打开 worklog 文件 - worklog_path = os.path.join(base_dir, date, "worklog") - with open(worklog_path, "a") as worklog: - screen_names = {} - for display_index, display_info in enumerate(displays_info): - # 获取显示器基础名称 - base_screen_name = display_info["_name"].replace(" ", "_").lower() - - # 检查是否存在重复名称 - if base_screen_name in screen_names: - screen_names[base_screen_name] += 1 - screen_name = f"{base_screen_name}_{screen_names[base_screen_name]}" - else: - screen_names[base_screen_name] = 1 - screen_name = base_screen_name - - # 生成临时 PNG 文件名 - temp_filename = os.path.join( - os.path.join(base_dir, date), - f"temp_screenshot-{timestamp}-of-{screen_name}.png", - ) - - # 使用 screencapture 命令进行截图,-D 选项指定显示器 - subprocess.run( - ["screencapture", "-C", "-x", "-D", str(display_index + 1), temp_filename] - ) - - # 压缩图像为 WebP 并添加元数据 - with Image.open(temp_filename) as img: - img = img.convert("RGB") - webp_filename = os.path.join( - os.path.join(base_dir, date), - f"screenshot-{timestamp}-of-{screen_name}.webp", - ) - - # 计算当前截图的哈希值 - current_hash = str(imagehash.phash(img)) - - # 检查当前截图与前一次截图的哈希值是否相似 - if ( - screen_name in previous_hashes - and imagehash.hex_to_hash(current_hash) - imagehash.hex_to_hash(previous_hashes[screen_name]) < threshold - ): - logging.info(f"Screenshot for {screen_name} is similar to the previous one. Skipping.") - os.remove(temp_filename) - # 记录跳过的截图 - worklog.write( - f"{timestamp} - {screen_name} - Skipped (similar to previous)\n" - ) - continue - - # 更新前一次截图的哈希值 - previous_hashes[screen_name] = current_hash - - # 更新序列号 - screen_sequences[screen_name] = screen_sequences.get(screen_name, 0) + 1 - - # 准备元数据 - metadata = { - "timestamp": timestamp, - "active_app": app_name, - "active_window": window_title, - "screen_name": screen_name, - "sequence": screen_sequences[screen_name], # 添加序列号到元数据 - } - - # 使用 write_image_metadata 函数写入元数据 - img.save(webp_filename, format="WebP", quality=85) - write_image_metadata(webp_filename, metadata) - save_screen_sequences(base_dir, screen_sequences, date) - - # 删除临时 PNG 文件 - os.remove(temp_filename) - - # 添加 WebP 文件到截图列表 - screenshots.append(webp_filename) - # 记录成功的截图 - worklog.write(f"{timestamp} - {screen_name} - Saved\n") - - return screenshots - - -def is_screen_locked(): - session_dict = CGSessionCopyCurrentDictionary() - if session_dict: - screen_locked = session_dict.get("CGSSessionScreenIsLocked", 0) - return bool(screen_locked) - return False - - -def load_previous_hashes(base_dir): - date = time.strftime("%Y%m%d") - hash_file = os.path.join(base_dir, date, ".previous_hashes") - try: - with open(hash_file, "r") as f: - return json.load(f) - except FileNotFoundError: - return {} - - -def save_previous_hashes(base_dir, previous_hashes): - date = time.strftime("%Y%m%d") - hash_file = os.path.join(base_dir, date, ".previous_hashes") - os.makedirs(os.path.dirname(hash_file), exist_ok=True) - with open(hash_file, "w") as f: - json.dump(previous_hashes, f) - - def run_screen_recorder_once(args, base_dir, previous_hashes): if not is_screen_locked(): date = time.strftime("%Y%m%d") @@ -186,7 +30,6 @@ def run_screen_recorder_once(args, base_dir, previous_hashes): else: logging.info("Screen is locked. Skipping screenshot.") - def run_screen_recorder(args, base_dir, previous_hashes): while True: try: @@ -206,7 +49,6 @@ def run_screen_recorder(args, base_dir, previous_hashes): time.sleep(5) - def main(): parser = argparse.ArgumentParser(description="Screen Recorder") parser.add_argument( @@ -231,6 +73,5 @@ def main(): logging.error(f"Critical error occurred, program will restart in 10 seconds: {str(e)}") time.sleep(10) - if __name__ == "__main__": main() \ No newline at end of file