feat: merge memos-record into memos

This commit is contained in:
arkohut 2024-09-28 23:28:43 +08:00
parent 228cdee66f
commit 04621f74ff
4 changed files with 117 additions and 79 deletions

View File

@ -17,6 +17,13 @@ from .config import settings
from .models import init_database
from .initialize_typesense import init_typesense
import pathspec
from .record import (
run_screen_recorder_once,
run_screen_recorder,
load_previous_hashes,
save_previous_hashes,
)
import time # Add this import at the top of the file
IS_THUMBNAIL = "is_thumbnail"
@ -859,5 +866,30 @@ def index_default_library(
index(default_library["id"], force=force, folders=None, batchsize=batchsize)
@app.command("record")
def record(
threshold: int = typer.Option(4, help="Threshold for image similarity"),
base_dir: str = typer.Option(None, help="Base directory for screenshots"),
once: bool = typer.Option(False, help="Run once and exit"),
):
"""
Record screenshots of the screen.
"""
base_dir = os.path.expanduser(base_dir) if base_dir else settings.screenshots_dir
previous_hashes = load_previous_hashes(base_dir)
if once:
run_screen_recorder_once(threshold, base_dir, previous_hashes)
else:
while True:
try:
run_screen_recorder(threshold, base_dir, previous_hashes)
except Exception as e:
logging.error(
f"Critical error occurred, program will restart in 10 seconds: {str(e)}"
)
time.sleep(10)
if __name__ == "__main__":
app()
app()

View File

@ -4,12 +4,16 @@ import time
import logging
import platform
import subprocess
import argparse
from PIL import Image
import imagehash
from memos.utils import write_image_metadata
import ctypes
from mss import mss
from pathlib import Path
from memos.config import settings
# Import platform-specific modules
if platform.system() == "Windows":
import win32gui
import win32process
@ -23,7 +27,12 @@ elif platform.system() == "Darwin":
CGSessionCopyCurrentDictionary,
)
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
# Functions moved from common.py
def load_screen_sequences(base_dir, date):
try:
with open(os.path.join(base_dir, date, ".screen_sequences"), "r") as f:
@ -174,7 +183,9 @@ def take_screenshot_windows(
window_title,
):
with mss() as sct:
for i, monitor in enumerate(sct.monitors[1:], 1): # Skip the first monitor (entire screen)
for i, monitor in enumerate(
sct.monitors[1:], 1
): # Skip the first monitor (entire screen)
safe_monitor_name = f"monitor_{i}"
logging.info(f"Processing monitor: {safe_monitor_name}")
@ -272,3 +283,74 @@ def is_screen_locked():
elif platform.system() == "Windows":
user32 = ctypes.windll.User32
return user32.GetForegroundWindow() == 0
def run_screen_recorder_once(threshold, base_dir, previous_hashes):
if not is_screen_locked():
date = time.strftime("%Y%m%d")
timestamp = time.strftime("%Y%m%d-%H%M%S")
screen_sequences = load_screen_sequences(base_dir, date)
screenshot_files = take_screenshot(
base_dir, previous_hashes, threshold, screen_sequences, date, timestamp
)
for screenshot_file in screenshot_files:
logging.info(f"Screenshot saved: {screenshot_file}")
save_previous_hashes(base_dir, previous_hashes)
else:
logging.info("Screen is locked. Skipping screenshot.")
def run_screen_recorder(threshold, base_dir, previous_hashes):
while True:
try:
if not is_screen_locked():
date = time.strftime("%Y%m%d")
timestamp = time.strftime("%Y%m%d-%H%M%S")
screen_sequences = load_screen_sequences(base_dir, date)
screenshot_files = take_screenshot(
base_dir,
previous_hashes,
threshold,
screen_sequences,
date,
timestamp,
)
for screenshot_file in screenshot_files:
logging.info(f"Screenshot saved: {screenshot_file}")
else:
logging.info("Screen is locked. Skipping screenshot.")
except Exception as e:
logging.error(f"An error occurred: {str(e)}. Skipping this iteration.")
time.sleep(5)
def main():
parser = argparse.ArgumentParser(description="Screen Recorder")
parser.add_argument(
"--threshold", type=int, default=4, help="Threshold for image similarity"
)
parser.add_argument("--base-dir", type=str, help="Base directory for screenshots")
parser.add_argument("--once", action="store_true", help="Run once and exit")
args = parser.parse_args()
base_dir = (
os.path.expanduser(args.base_dir) if args.base_dir else settings.screenshots_dir
)
previous_hashes = load_previous_hashes(base_dir)
if args.once:
run_screen_recorder_once(args, base_dir, previous_hashes)
else:
while True:
try:
run_screen_recorder(args, base_dir, previous_hashes)
except Exception as e:
logging.error(
f"Critical error occurred, program will restart in 10 seconds: {str(e)}"
)
time.sleep(10)
if __name__ == "__main__":
main()

View File

@ -51,10 +51,9 @@ Homepage = "https://github.com/arkohut/memos"
[project.scripts]
memos = "memos.commands:app"
memos-record = "screen_recorder.record:main"
[tool.setuptools.packages.find]
include = ["memos*", "screen_recorder*"]
include = ["memos*"]
[tool.setuptools.package-data]
"*" = ["static/**/*"]

View File

@ -1,75 +0,0 @@
import logging
import time
import os
import argparse
from screen_recorder.common import (
load_screen_sequences,
load_previous_hashes,
save_previous_hashes,
take_screenshot,
is_screen_locked,
)
from pathlib import Path
from memos.config import settings
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def run_screen_recorder_once(args, base_dir, previous_hashes):
if not is_screen_locked():
date = time.strftime("%Y%m%d")
timestamp = time.strftime("%Y%m%d-%H%M%S")
screen_sequences = load_screen_sequences(base_dir, date)
screenshot_files = take_screenshot(
base_dir, previous_hashes, args.threshold, screen_sequences, date, timestamp
)
for screenshot_file in screenshot_files:
logging.info(f"Screenshot saved: {screenshot_file}")
save_previous_hashes(base_dir, previous_hashes)
else:
logging.info("Screen is locked. Skipping screenshot.")
def run_screen_recorder(args, base_dir, previous_hashes):
while True:
try:
if not is_screen_locked():
date = time.strftime("%Y%m%d")
timestamp = time.strftime("%Y%m%d-%H%M%S")
screen_sequences = load_screen_sequences(base_dir, date)
screenshot_files = take_screenshot(
base_dir, previous_hashes, args.threshold, screen_sequences, date, timestamp
)
for screenshot_file in screenshot_files:
logging.info(f"Screenshot saved: {screenshot_file}")
else:
logging.info("Screen is locked. Skipping screenshot.")
except Exception as e:
logging.error(f"An error occurred: {str(e)}. Skipping this iteration.")
time.sleep(5)
def main():
parser = argparse.ArgumentParser(description="Screen Recorder")
parser.add_argument(
"--threshold", type=int, default=4, help="Threshold for image similarity"
)
parser.add_argument(
"--base-dir", type=str, help="Base directory for screenshots"
)
parser.add_argument("--once", action="store_true", help="Run once and exit")
args = parser.parse_args()
base_dir = os.path.expanduser(args.base_dir) if args.base_dir else settings.screenshots_dir
previous_hashes = load_previous_hashes(base_dir)
if args.once:
run_screen_recorder_once(args, base_dir, previous_hashes)
else:
while True:
try:
run_screen_recorder(args, base_dir, previous_hashes)
except Exception as e:
logging.error(f"Critical error occurred, program will restart in 10 seconds: {str(e)}")
time.sleep(10)
if __name__ == "__main__":
main()