mirror of
https://github.com/tcsenpai/pensieve.git
synced 2025-06-07 03:35:24 +00:00
feat(screen-recorder): add for windows
This commit is contained in:
parent
1d4faf2652
commit
7d548e680e
136
screen-recorder/record-for-win.py
Normal file
136
screen-recorder/record-for-win.py
Normal file
@ -0,0 +1,136 @@
|
||||
import time
|
||||
import os
|
||||
import json
|
||||
import argparse
|
||||
import imagehash
|
||||
from PIL import Image
|
||||
import win32gui
|
||||
import win32process
|
||||
import psutil
|
||||
from mss import mss
|
||||
from .utils import write_image_metadata
|
||||
import win32api
|
||||
import pywintypes
|
||||
import ctypes
|
||||
import ctypes.wintypes
|
||||
from screeninfo import get_monitors
|
||||
|
||||
def get_monitor_info():
|
||||
return {monitor.name: monitor for monitor in get_monitors()}
|
||||
|
||||
def get_active_window_info():
|
||||
try:
|
||||
window = win32gui.GetForegroundWindow()
|
||||
_, pid = win32process.GetWindowThreadProcessId(window)
|
||||
app_name = psutil.Process(pid).name()
|
||||
window_title = win32gui.GetWindowText(window)
|
||||
return app_name, window_title
|
||||
except:
|
||||
return "", ""
|
||||
|
||||
def load_screen_sequences(base_dir, date):
|
||||
try:
|
||||
with open(os.path.join(base_dir, date, ".screen_sequences"), "r") as f:
|
||||
return json.load(f)
|
||||
except FileNotFoundError:
|
||||
return {}
|
||||
|
||||
def save_screen_sequences(base_dir, screen_sequences, date):
|
||||
with open(os.path.join(base_dir, date, ".screen_sequences"), "w") as f:
|
||||
json.dump(screen_sequences, f)
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
|
||||
def take_screenshot(base_dir, previous_hashes, threshold, screen_sequences, date, timestamp):
|
||||
screenshots = []
|
||||
app_name, window_title = get_active_window_info()
|
||||
|
||||
os.makedirs(os.path.join(base_dir, date), exist_ok=True)
|
||||
worklog_path = os.path.join(base_dir, date, "worklog")
|
||||
|
||||
monitor_infos = get_monitor_info()
|
||||
|
||||
with open(worklog_path, "a") as worklog:
|
||||
with mss() as sct:
|
||||
for i, monitor in enumerate(sct.monitors[1:], 1): # 跳过第一个全屏显示器
|
||||
monitor_name = monitor_infos.get(f"\\\\.\\DISPLAY{i}", f"screen_{i}").name
|
||||
safe_monitor_name = ''.join(c for c in monitor_name if c.isalnum() or c in ('_', '-'))
|
||||
print(f"处理显示器: {safe_monitor_name}") # 调试输出
|
||||
|
||||
jpeg_filename = os.path.join(base_dir, date, f"screenshot-{timestamp}-of-{safe_monitor_name}.jpg")
|
||||
|
||||
screen = sct.grab(monitor)
|
||||
img = Image.frombytes("RGB", screen.size, screen.bgra, "raw", "BGRX")
|
||||
|
||||
current_hash = imagehash.phash(img)
|
||||
|
||||
if safe_monitor_name in previous_hashes and current_hash - previous_hashes[safe_monitor_name] < threshold:
|
||||
print(f"截图 {safe_monitor_name} 与上一张相似。跳过。")
|
||||
worklog.write(
|
||||
f"{timestamp} - {safe_monitor_name} - Skipped (similar to previous)\n"
|
||||
)
|
||||
continue
|
||||
|
||||
previous_hashes[safe_monitor_name] = current_hash
|
||||
screen_sequences[safe_monitor_name] = screen_sequences.get(safe_monitor_name, 0) + 1
|
||||
|
||||
metadata = {
|
||||
"timestamp": timestamp,
|
||||
"active_app": app_name,
|
||||
"active_window": window_title,
|
||||
"screen_name": safe_monitor_name,
|
||||
"sequence": screen_sequences[safe_monitor_name],
|
||||
}
|
||||
|
||||
img.save(jpeg_filename, format="JPEG", quality=85)
|
||||
write_image_metadata(jpeg_filename, metadata)
|
||||
save_screen_sequences(base_dir, screen_sequences, date)
|
||||
|
||||
screenshots.append(jpeg_filename)
|
||||
worklog.write(f"{timestamp} - {safe_monitor_name} - Saved\n")
|
||||
|
||||
return screenshots
|
||||
|
||||
def is_screen_locked():
|
||||
import ctypes
|
||||
user32 = ctypes.windll.User32
|
||||
return user32.GetForegroundWindow() == 0
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Screen Recorder for Windows")
|
||||
parser.add_argument(
|
||||
"--threshold", type=int, default=4, help="Threshold for image similarity"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--base-dir", type=str, default="~/tmp", help="Base directory for screenshots"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
base_dir = os.path.expanduser(args.base_dir)
|
||||
previous_hashes = {}
|
||||
|
||||
while True:
|
||||
try:
|
||||
if not is_screen_locked():
|
||||
date = time.strftime("%Y%m%d")
|
||||
timestamp = time.strftime("%Y%m%d-%H%M%S")
|
||||
screen_sequences = load_screen_sequences(base_dir, date)
|
||||
screenshot_files = take_screenshot(
|
||||
base_dir,
|
||||
previous_hashes,
|
||||
args.threshold,
|
||||
screen_sequences,
|
||||
date,
|
||||
timestamp,
|
||||
)
|
||||
for screenshot_file in screenshot_files:
|
||||
print(f"Screenshot taken: {screenshot_file}")
|
||||
else:
|
||||
print("Screen is locked. Skipping screenshot.")
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {str(e)}. Skipping this iteration.")
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
x
Reference in New Issue
Block a user