feat(screen-recorder): use imagegrab for win

This commit is contained in:
arkohut 2024-08-23 00:00:00 +08:00
parent 0d371d759e
commit 7f8c846e51

View File

@ -1,28 +1,19 @@
import logging import logging
import datetime
import time import time
import os import os
import json import json
import argparse import argparse
import imagehash import imagehash
from PIL import Image from PIL import ImageGrab
import win32gui import win32gui
import win32process import win32process
import psutil import psutil
from mss import mss
from memos.utils import write_image_metadata from memos.utils import write_image_metadata
import win32api
import pywintypes
import ctypes import ctypes
import ctypes.wintypes
from screeninfo import get_monitors from screeninfo import get_monitors
# 在文件开头添加日志配置
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def get_monitor_info():
return {monitor.name: monitor for monitor in get_monitors()}
def get_active_window_info(): def get_active_window_info():
try: try:
window = win32gui.GetForegroundWindow() window = win32gui.GetForegroundWindow()
@ -50,75 +41,52 @@ def take_screenshot(base_dir, previous_hashes, threshold, screen_sequences, date
screenshots = [] screenshots = []
app_name, window_title = get_active_window_info() app_name, window_title = get_active_window_info()
# Create date directory
os.makedirs(os.path.join(base_dir, date), exist_ok=True) os.makedirs(os.path.join(base_dir, date), exist_ok=True)
worklog_path = os.path.join(base_dir, date, "worklog") worklog_path = os.path.join(base_dir, date, "worklog")
monitor_infos = get_monitor_info()
# Open worklog file
with open(worklog_path, "a") as worklog: with open(worklog_path, "a") as worklog:
with mss() as sct: for monitor in get_monitors():
for i, monitor in enumerate(sct.monitors[1:], 1): # Skip the first full-screen monitor safe_monitor_name = ''.join(c for c in monitor.name if c.isalnum() or c in ('_', '-'))
monitor_name = monitor_infos.get(f"\\\\.\\DISPLAY{i}", f"screen_{i}").name logging.info(f"Processing monitor: {safe_monitor_name}")
safe_monitor_name = ''.join(c for c in monitor_name if c.isalnum() or c in ('_', '-'))
logging.info(f"Processing monitor: {safe_monitor_name}") # Debug output
jpeg_filename = os.path.join(base_dir, date, f"screenshot-{timestamp}-of-{safe_monitor_name}.jpg") jpeg_filename = os.path.join(base_dir, date, f"screenshot-{timestamp}-of-{safe_monitor_name}.jpg")
screen = sct.grab(monitor) img = ImageGrab.grab(bbox=(monitor.x, monitor.y, monitor.x + monitor.width, monitor.y + monitor.height))
img = Image.frombytes("RGB", screen.size, screen.bgra, "raw", "BGRX") current_hash = imagehash.phash(img)
# Calculate hash of current screenshot if safe_monitor_name in previous_hashes and current_hash - previous_hashes[safe_monitor_name] < threshold:
current_hash = imagehash.phash(img) logging.info(f"Screenshot for {safe_monitor_name} is similar to the previous one. Skipping.")
worklog.write(f"{timestamp} - {safe_monitor_name} - Skipped (similar to previous)\n")
continue
# Check if current screenshot is similar to the previous one previous_hashes[safe_monitor_name] = current_hash
if safe_monitor_name in previous_hashes and current_hash - previous_hashes[safe_monitor_name] < threshold: screen_sequences[safe_monitor_name] = screen_sequences.get(safe_monitor_name, 0) + 1
logging.info(f"Screenshot for {safe_monitor_name} is similar to the previous one. Skipping.")
worklog.write(
f"{timestamp} - {safe_monitor_name} - Skipped (similar to previous)\n"
)
continue
# Update previous screenshot hash metadata = {
previous_hashes[safe_monitor_name] = current_hash "timestamp": timestamp,
"active_app": app_name,
"active_window": window_title,
"screen_name": safe_monitor_name,
"sequence": screen_sequences[safe_monitor_name],
}
# Update sequence number img.save(jpeg_filename, format="JPEG", quality=85)
screen_sequences[safe_monitor_name] = screen_sequences.get(safe_monitor_name, 0) + 1 write_image_metadata(jpeg_filename, metadata)
save_screen_sequences(base_dir, screen_sequences, date)
# Prepare metadata screenshots.append(jpeg_filename)
metadata = { worklog.write(f"{timestamp} - {safe_monitor_name} - Saved\n")
"timestamp": timestamp,
"active_app": app_name,
"active_window": window_title,
"screen_name": safe_monitor_name,
"sequence": screen_sequences[safe_monitor_name], # Add sequence number to metadata
}
# Save image and write metadata
img.save(jpeg_filename, format="JPEG", quality=85)
write_image_metadata(jpeg_filename, metadata)
save_screen_sequences(base_dir, screen_sequences, date)
screenshots.append(jpeg_filename)
# Record successful screenshot
worklog.write(f"{timestamp} - {safe_monitor_name} - Saved\n")
return screenshots return screenshots
def is_screen_locked(): def is_screen_locked():
import ctypes
user32 = ctypes.windll.User32 user32 = ctypes.windll.User32
return user32.GetForegroundWindow() == 0 return user32.GetForegroundWindow() == 0
def main(): def main():
parser = argparse.ArgumentParser(description="Screen Recorder for Windows") parser = argparse.ArgumentParser(description="Screen Recorder for Windows")
parser.add_argument( parser.add_argument("--threshold", type=int, default=4, help="Threshold for image similarity")
"--threshold", type=int, default=4, help="Threshold for image similarity" parser.add_argument("--base-dir", type=str, default="~/tmp", help="Base directory for screenshots")
)
parser.add_argument(
"--base-dir", type=str, default="~/tmp", help="Base directory for screenshots"
)
args = parser.parse_args() args = parser.parse_args()
base_dir = os.path.expanduser(args.base_dir) base_dir = os.path.expanduser(args.base_dir)
@ -131,12 +99,7 @@ def main():
timestamp = time.strftime("%Y%m%d-%H%M%S") timestamp = time.strftime("%Y%m%d-%H%M%S")
screen_sequences = load_screen_sequences(base_dir, date) screen_sequences = load_screen_sequences(base_dir, date)
screenshot_files = take_screenshot( screenshot_files = take_screenshot(
base_dir, base_dir, previous_hashes, args.threshold, screen_sequences, date, timestamp
previous_hashes,
args.threshold,
screen_sequences,
date,
timestamp,
) )
for screenshot_file in screenshot_files: for screenshot_file in screenshot_files:
logging.info(f"Screenshot taken: {screenshot_file}") logging.info(f"Screenshot taken: {screenshot_file}")