refactor: split win and macos method in screen record

This commit is contained in:
arkohut 2024-09-02 11:35:16 +08:00
parent 7479398770
commit bc205eca11

View File

@ -23,6 +23,7 @@ elif platform.system() == "Darwin":
CGSessionCopyCurrentDictionary, CGSessionCopyCurrentDictionary,
) )
def load_screen_sequences(base_dir, date): def load_screen_sequences(base_dir, date):
try: try:
with open(os.path.join(base_dir, date, ".screen_sequences"), "r") as f: with open(os.path.join(base_dir, date, ".screen_sequences"), "r") as f:
@ -30,12 +31,14 @@ def load_screen_sequences(base_dir, date):
except FileNotFoundError: except FileNotFoundError:
return {} return {}
def save_screen_sequences(base_dir, screen_sequences, date): def save_screen_sequences(base_dir, screen_sequences, date):
with open(os.path.join(base_dir, date, ".screen_sequences"), "w") as f: with open(os.path.join(base_dir, date, ".screen_sequences"), "w") as f:
json.dump(screen_sequences, f) json.dump(screen_sequences, f)
f.flush() f.flush()
os.fsync(f.fileno()) os.fsync(f.fileno())
def load_previous_hashes(base_dir): def load_previous_hashes(base_dir):
date = time.strftime("%Y%m%d") date = time.strftime("%Y%m%d")
hash_file = os.path.join(base_dir, date, ".previous_hashes") hash_file = os.path.join(base_dir, date, ".previous_hashes")
@ -45,6 +48,7 @@ def load_previous_hashes(base_dir):
except FileNotFoundError: except FileNotFoundError:
return {} return {}
def save_previous_hashes(base_dir, previous_hashes): def save_previous_hashes(base_dir, previous_hashes):
date = time.strftime("%Y%m%d") date = time.strftime("%Y%m%d")
hash_file = os.path.join(base_dir, date, ".previous_hashes") hash_file = os.path.join(base_dir, date, ".previous_hashes")
@ -52,38 +56,57 @@ def save_previous_hashes(base_dir, previous_hashes):
with open(hash_file, "w") as f: with open(hash_file, "w") as f:
json.dump(previous_hashes, f) json.dump(previous_hashes, f)
def get_active_window_info_darwin():
active_app = NSWorkspace.sharedWorkspace().activeApplication()
app_name = active_app["NSApplicationName"]
app_pid = active_app["NSApplicationProcessIdentifier"]
windows = CGWindowListCopyWindowInfo(
kCGWindowListOptionOnScreenOnly, kCGNullWindowID
)
for window in windows:
if window["kCGWindowOwnerPID"] == app_pid:
window_title = window.get("kCGWindowName", "")
if window_title:
return app_name, window_title
return app_name, "" # 如果没有找到窗口标题,则返回空字符串作为标题
def get_active_window_info_windows():
try:
window = win32gui.GetForegroundWindow()
_, pid = win32process.GetWindowThreadProcessId(window)
app_name = psutil.Process(pid).name()
window_title = win32gui.GetWindowText(window)
return app_name, window_title
except:
return "", ""
def get_active_window_info(): def get_active_window_info():
if platform.system() == "Darwin": if platform.system() == "Darwin":
active_app = NSWorkspace.sharedWorkspace().activeApplication() return get_active_window_info_darwin()
app_name = active_app["NSApplicationName"]
app_pid = active_app["NSApplicationProcessIdentifier"]
windows = CGWindowListCopyWindowInfo(
kCGWindowListOptionOnScreenOnly, kCGNullWindowID
)
for window in windows:
if window["kCGWindowOwnerPID"] == app_pid:
window_title = window.get("kCGWindowName", "")
if window_title:
return app_name, window_title
return app_name, "" # 如果没有找到窗口标题,则返回空字符串作为标题
elif platform.system() == "Windows": elif platform.system() == "Windows":
try: return get_active_window_info_windows()
window = win32gui.GetForegroundWindow()
_, pid = win32process.GetWindowThreadProcessId(window)
app_name = psutil.Process(pid).name()
window_title = win32gui.GetWindowText(window)
return app_name, window_title
except:
return "", ""
def take_screenshot_macos(base_dir, previous_hashes, threshold, screen_sequences, date, timestamp, app_name, window_title):
def take_screenshot_macos(
base_dir,
previous_hashes,
threshold,
screen_sequences,
date,
timestamp,
app_name,
window_title,
):
screenshots = [] screenshots = []
result = subprocess.check_output(["system_profiler", "SPDisplaysDataType", "-json"]) result = subprocess.check_output(["system_profiler", "SPDisplaysDataType", "-json"])
displays_info = json.loads(result)["SPDisplaysDataType"][0]["spdisplays_ndrvs"] displays_info = json.loads(result)["SPDisplaysDataType"][0]["spdisplays_ndrvs"]
screen_names = {} screen_names = {}
for display_index, display_info in enumerate(displays_info): for display_index, display_info in enumerate(displays_info):
base_screen_name = display_info["_name"].replace(" ", "_").lower() base_screen_name = display_info["_name"].replace(" ", "_").lower()
if base_screen_name in screen_names: if base_screen_name in screen_names:
@ -93,17 +116,29 @@ def take_screenshot_macos(base_dir, previous_hashes, threshold, screen_sequences
screen_names[base_screen_name] = 1 screen_names[base_screen_name] = 1
screen_name = base_screen_name screen_name = base_screen_name
temp_filename = os.path.join(base_dir, date, f"temp_screenshot-{timestamp}-of-{screen_name}.png") temp_filename = os.path.join(
subprocess.run(["screencapture", "-C", "-x", "-D", str(display_index + 1), temp_filename]) base_dir, date, f"temp_screenshot-{timestamp}-of-{screen_name}.png"
)
subprocess.run(
["screencapture", "-C", "-x", "-D", str(display_index + 1), temp_filename]
)
with Image.open(temp_filename) as img: with Image.open(temp_filename) as img:
img = img.convert("RGB") img = img.convert("RGB")
webp_filename = os.path.join(base_dir, date, f"screenshot-{timestamp}-of-{screen_name}.webp") webp_filename = os.path.join(
base_dir, date, f"screenshot-{timestamp}-of-{screen_name}.webp"
)
current_hash = str(imagehash.phash(img)) current_hash = str(imagehash.phash(img))
if (screen_name in previous_hashes and if (
imagehash.hex_to_hash(current_hash) - imagehash.hex_to_hash(previous_hashes[screen_name]) < threshold): screen_name in previous_hashes
logging.info(f"Screenshot for {screen_name} is similar to the previous one. Skipping.") and imagehash.hex_to_hash(current_hash)
- imagehash.hex_to_hash(previous_hashes[screen_name])
< threshold
):
logging.info(
f"Screenshot for {screen_name} is similar to the previous one. Skipping."
)
os.remove(temp_filename) os.remove(temp_filename)
yield screen_name, None, "Skipped (similar to previous)" yield screen_name, None, "Skipped (similar to previous)"
continue continue
@ -127,24 +162,54 @@ def take_screenshot_macos(base_dir, previous_hashes, threshold, screen_sequences
screenshots.append(webp_filename) screenshots.append(webp_filename)
yield screen_name, webp_filename, "Saved" yield screen_name, webp_filename, "Saved"
def take_screenshot_windows(base_dir, previous_hashes, threshold, screen_sequences, date, timestamp, app_name, window_title):
def take_screenshot_windows(
base_dir,
previous_hashes,
threshold,
screen_sequences,
date,
timestamp,
app_name,
window_title,
):
for monitor in get_monitors(): for monitor in get_monitors():
safe_monitor_name = ''.join(c for c in monitor.name if c.isalnum() or c in ('_', '-')) safe_monitor_name = "".join(
c for c in monitor.name if c.isalnum() or c in ("_", "-")
)
logging.info(f"Processing monitor: {safe_monitor_name}") logging.info(f"Processing monitor: {safe_monitor_name}")
webp_filename = os.path.join(base_dir, date, f"screenshot-{timestamp}-of-{safe_monitor_name}.webp") webp_filename = os.path.join(
base_dir, date, f"screenshot-{timestamp}-of-{safe_monitor_name}.webp"
)
img = ImageGrab.grab(bbox=(monitor.x, monitor.y, monitor.x + monitor.width, monitor.y + monitor.height)) img = ImageGrab.grab(
bbox=(
monitor.x,
monitor.y,
monitor.x + monitor.width,
monitor.y + monitor.height,
)
)
img = img.convert("RGB") img = img.convert("RGB")
current_hash = str(imagehash.phash(img)) current_hash = str(imagehash.phash(img))
if safe_monitor_name in previous_hashes and imagehash.hex_to_hash(current_hash) - imagehash.hex_to_hash(previous_hashes[safe_monitor_name]) < threshold: if (
logging.info(f"Screenshot for {safe_monitor_name} is similar to the previous one. Skipping.") safe_monitor_name in previous_hashes
and imagehash.hex_to_hash(current_hash)
- imagehash.hex_to_hash(previous_hashes[safe_monitor_name])
< threshold
):
logging.info(
f"Screenshot for {safe_monitor_name} is similar to the previous one. Skipping."
)
yield safe_monitor_name, None, "Skipped (similar to previous)" yield safe_monitor_name, None, "Skipped (similar to previous)"
continue continue
previous_hashes[safe_monitor_name] = current_hash previous_hashes[safe_monitor_name] = current_hash
screen_sequences[safe_monitor_name] = screen_sequences.get(safe_monitor_name, 0) + 1 screen_sequences[safe_monitor_name] = (
screen_sequences.get(safe_monitor_name, 0) + 1
)
metadata = { metadata = {
"timestamp": timestamp, "timestamp": timestamp,
@ -160,18 +225,41 @@ def take_screenshot_windows(base_dir, previous_hashes, threshold, screen_sequenc
yield safe_monitor_name, webp_filename, "Saved" yield safe_monitor_name, webp_filename, "Saved"
def take_screenshot(base_dir, previous_hashes, threshold, screen_sequences, date, timestamp):
def take_screenshot(
base_dir, previous_hashes, threshold, screen_sequences, date, timestamp
):
app_name, window_title = get_active_window_info() app_name, window_title = get_active_window_info()
os.makedirs(os.path.join(base_dir, date), exist_ok=True) os.makedirs(os.path.join(base_dir, date), exist_ok=True)
worklog_path = os.path.join(base_dir, date, "worklog") worklog_path = os.path.join(base_dir, date, "worklog")
with open(worklog_path, "a") as worklog: with open(worklog_path, "a") as worklog:
if platform.system() == "Darwin": if platform.system() == "Darwin":
screenshot_generator = take_screenshot_macos(base_dir, previous_hashes, threshold, screen_sequences, date, timestamp, app_name, window_title) screenshot_generator = take_screenshot_macos(
base_dir,
previous_hashes,
threshold,
screen_sequences,
date,
timestamp,
app_name,
window_title,
)
elif platform.system() == "Windows": elif platform.system() == "Windows":
screenshot_generator = take_screenshot_windows(base_dir, previous_hashes, threshold, screen_sequences, date, timestamp, app_name, window_title) screenshot_generator = take_screenshot_windows(
base_dir,
previous_hashes,
threshold,
screen_sequences,
date,
timestamp,
app_name,
window_title,
)
else: else:
raise NotImplementedError(f"Unsupported operating system: {platform.system()}") raise NotImplementedError(
f"Unsupported operating system: {platform.system()}"
)
screenshots = [] screenshots = []
for screen_name, screenshot_file, status in screenshot_generator: for screen_name, screenshot_file, status in screenshot_generator:
@ -181,6 +269,7 @@ def take_screenshot(base_dir, previous_hashes, threshold, screen_sequences, date
return screenshots return screenshots
def is_screen_locked(): def is_screen_locked():
if platform.system() == "Darwin": if platform.system() == "Darwin":
session_dict = CGSessionCopyCurrentDictionary() session_dict = CGSessionCopyCurrentDictionary()
@ -190,4 +279,4 @@ def is_screen_locked():
return False return False
elif platform.system() == "Windows": elif platform.system() == "Windows":
user32 = ctypes.windll.User32 user32 = ctypes.windll.User32
return user32.GetForegroundWindow() == 0 return user32.GetForegroundWindow() == 0