def is_temp_file(filename):
    """Return True when *filename* looks like a hidden or temporary file.

    A name counts as temporary when it begins with ".", "tmp" or "temp".
    The check is case-sensitive and looks only at the start of the string
    it is given, so callers should pass a bare file name, not a full path.
    """
    # str.startswith accepts a tuple of prefixes: one call instead of an
    # `or` chain of three.
    temp_prefixes = (".", "tmp", "temp")
    return filename.startswith(temp_prefixes)
class LibraryFileHandler(FileSystemEventHandler):
    """Debounce file-system events for one library and sync a sample of them.

    Events for valid files are recorded in ``pending_files``. A periodic call
    to :meth:`process_pending_files` picks the entries that have been quiet
    for longer than ``buffer_time`` seconds and submits every
    ``sparsity_window``-th one to a thread pool that calls ``sync``. The
    remaining quiet entries are counted as skipped and dropped (they are not
    retried). ``sparsity_window`` is re-estimated from the ratio between the
    incoming event rate and the measured sync throughput.

    State shared between the event callbacks, the polling caller and the
    executor workers is guarded by ``self.lock``.

    Args:
        library_id: Id of the library passed through to ``sync``.
        include_files: Iterable of file-name suffixes (compared against the
            lower-cased path) that are eligible for syncing.
        max_workers: Size of the thread pool running ``sync``.
        sparsity_factor: Multiplier applied to the pending/sync rate ratio
            when computing a new ``sparsity_window``.
        window_size: Number of recent samples kept for both rate estimates.
    """

    def __init__(
        self,
        library_id,
        include_files,
        max_workers=2,
        sparsity_factor=2,
        window_size=20,
    ):
        super().__init__()
        self.library_id = library_id
        self.include_files = include_files
        self.inode_pattern = re.compile(r"\._.+")
        # path -> {"timestamp": last debounce start, "last_size": last seen size}
        self.pending_files = defaultdict(lambda: {"timestamp": 0, "last_size": 0})
        # Seconds a file must stay quiet before it is considered for syncing.
        self.buffer_time = 2
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.lock = threading.Lock()

        # Process one file out of every `sparsity_window` quiet files.
        self.sparsity_window = 3
        self.sparsity_factor = sparsity_factor
        self.window_size = window_size

        # Sliding windows: event arrival times and per-file sync durations.
        self.pending_times = deque(maxlen=window_size)
        self.sync_times = deque(maxlen=window_size)

        self.file_count = 0
        self.file_submitted = 0
        self.file_synced = 0
        self.file_skipped = 0
        self.logger = logger

    def _track_path(self, path):
        """Record an event for *path*; return False if the file has vanished."""
        try:
            size = os.path.getsize(path)
        except OSError:
            # The file was removed or renamed between the event firing and
            # this handler running. Nothing to track; a stale entry (if any)
            # is dropped by process_pending_files.
            return False

        current_time = time.time()
        with self.lock:
            file_info = self.pending_files[path]
            # Only restart the debounce window (and count a new arrival) when
            # the previous window has already elapsed.
            if current_time - file_info["timestamp"] > self.buffer_time:
                file_info["timestamp"] = current_time
                self.pending_times.append(current_time)
            file_info["last_size"] = size
        return True

    def handle_event(self, event):
        """Track ``event.src_path`` if it is a valid, existing file.

        Returns:
            True when the path was recorded in ``pending_files``; False for
            directories, filtered-out names, or files that no longer exist.
        """
        if event.is_directory or not self.is_valid_file(event.src_path):
            return False
        return self._track_path(event.src_path)

    def process_pending_files(self):
        """Submit quiet pending files for syncing and refresh the sparsity window.

        Entries whose debounce window has elapsed are removed from
        ``pending_files`` unless the file exists but is still empty, in which
        case the entry is kept for a later pass.
        """
        current_time = time.time()
        files_to_process = []
        processed_in_current_loop = 0
        with self.lock:
            for path, file_info in list(self.pending_files.items()):
                if current_time - file_info["timestamp"] <= self.buffer_time:
                    continue
                processed_in_current_loop += 1
                try:
                    # Single stat call instead of exists() + getsize(), so a
                    # file deleted in between cannot raise here.
                    size = os.path.getsize(path)
                except OSError:
                    del self.pending_files[path]
                    continue
                if size <= 0:
                    # Still empty: leave it pending.
                    continue
                self.file_count += 1
                if self.file_count % self.sparsity_window == 0:
                    files_to_process.append(path)
                    print(
                        f"file_count % sparsity_window: {self.file_count} % {self.sparsity_window} == 0"
                    )
                    print(f"Picked file for processing: {path}")
                else:
                    self.file_skipped += 1
                del self.pending_files[path]

        # Submit outside the lock so workers finishing quickly are not blocked.
        for path in files_to_process:
            self.executor.submit(self.process_file, path)
            self.file_submitted += 1

        if processed_in_current_loop > 0:
            self.logger.info(
                f"File count: {self.file_count}, Files submitted: {self.file_submitted}, Files synced: {self.file_synced}, Files skipped: {self.file_skipped}"
            )

        self.update_sparsity_window()

    def process_file(self, path):
        """Sync one file (runs on an executor worker) and record its duration."""
        self.logger.debug(f"Processing file: {path}")
        start_time = time.time()
        try:
            sync(self.library_id, path)
        except Exception:
            # An exception raised inside an executor task is stored on the
            # Future, which nobody inspects; log it here so failures are
            # visible. Failed syncs are not counted or timed.
            self.logger.exception("Failed to sync file: %s", path)
            return
        end_time = time.time()
        with self.lock:
            self.sync_times.append(end_time - start_time)
            self.file_synced += 1

    def update_sparsity_window(self):
        """Re-estimate ``sparsity_window`` from arrival rate vs. sync throughput.

        Does nothing until both sliding windows hold at least
        ``max(3, window_size // 3)`` samples. Must be called without
        ``self.lock`` held (the lock is not reentrant).
        """
        min_samples = max(3, self.window_size // 3)
        max_interval = 60  # Maximum allowed interval between events in seconds

        # Snapshot under the lock: other threads append to these deques, and
        # iterating a deque while it is mutated raises RuntimeError.
        with self.lock:
            pending_times = list(self.pending_times)
            sync_times = list(self.sync_times)

        if len(pending_times) < min_samples or len(sync_times) < min_samples:
            return

        # Filter out large time gaps (idle periods) between consecutive events.
        filtered_intervals = [
            later - earlier
            for earlier, later in zip(pending_times, pending_times[1:])
            if later - earlier <= max_interval
        ]
        if not filtered_intervals:
            return
        avg_interval = sum(filtered_intervals) / len(filtered_intervals)
        sync_time_total = sum(sync_times)
        if avg_interval <= 0 or sync_time_total <= 0:
            return

        pending_files_per_second = 1 / avg_interval
        sync_files_per_second = len(sync_times) / sync_time_total

        rate = pending_files_per_second / sync_files_per_second
        new_sparsity_window = max(1, math.ceil(self.sparsity_factor * rate))

        if new_sparsity_window != self.sparsity_window:
            old_sparsity_window = self.sparsity_window
            self.sparsity_window = new_sparsity_window
            self.logger.info(f"Updated sparsity window: {old_sparsity_window} -> {self.sparsity_window}")
            self.logger.debug(f"Pending files per second: {pending_files_per_second:.2f}")
            self.logger.debug(f"Sync files per second: {sync_files_per_second:.2f}")
            self.logger.debug(f"Rate (pending/sync): {rate:.2f}")

    def is_valid_file(self, path):
        """Return True if *path* has an included suffix and is not a temp file."""
        filename = os.path.basename(path)
        return (
            any(path.lower().endswith(ext) for ext in self.include_files)
            and not is_temp_file(filename)
            and not self.inode_pattern.match(filename)
        )

    def on_created(self, event):
        self.handle_event(event)

    def on_modified(self, event):
        self.handle_event(event)

    def on_moved(self, event):
        """Stop tracking the old path and start tracking the destination.

        The source path no longer exists once the move has happened, so it is
        only removed from ``pending_files``; validity and size are evaluated
        on ``event.dest_path``. This also picks up files written under a
        temporary name and renamed to their final name.
        """
        if event.is_directory:
            return
        with self.lock:
            self.pending_files.pop(event.src_path, None)
        if self.is_valid_file(event.dest_path):
            self._track_path(event.dest_path)

    def on_deleted(self, event):
        if self.is_valid_file(event.src_path):
            self.logger.info(f"File deleted: {event.src_path}")
            # Remove from pending files if it was there
            with self.lock:
                self.pending_files.pop(event.src_path, None)
            # Add logic for handling deleted files if needed
@lib_app.command("watch")
def watch(
    library_id: int,
    folders: List[int] = typer.Option(
        None, "--folder", "-f", help="Specify folders to watch"
    ),
    sparsity_factor: float = typer.Option(
        2.0, "--sparsity-factor", "-sf", help="Sparsity factor for file processing"
    ),
    window_size: int = typer.Option(
        20, "--window-size", "-ws", help="Window size for rate calculation"
    ),
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose logging")
):
    """
    Watch for file changes in the library folders and sync automatically.
    """
    # Flow: fetch the library from the API, pick the folders to watch
    # (all of them, or only the ids given via --folder), attach one
    # LibraryFileHandler per folder to a single Observer, then poll the
    # handlers every 5 seconds until interrupted.
    #
    # Exits with code 1 (typer.Exit) when the library cannot be fetched;
    # returns quietly when there is nothing to watch.

    # Set the logging level based on the verbose flag
    log_level = "DEBUG" if verbose else "INFO"
    logger.setLevel(log_level)

    logger.info(f"Watching library {library_id} for changes...")

    # Get the library
    response = httpx.get(f"{BASE_URL}/libraries/{library_id}")
    if response.status_code != 200:
        print(f"Error: Library with id {library_id} not found.")
        raise typer.Exit(code=1)

    library = response.json()

    # Filter folders if the folders parameter is provided
    if folders:
        library_folders = [
            folder for folder in library["folders"] if folder["id"] in folders
        ]
    else:
        library_folders = library["folders"]

    if not library_folders:
        print("No folders to watch.")
        return

    # Create an observer and handler for each folder in the library
    observer = Observer()
    handlers = []
    for folder in library_folders:
        folder_path = Path(folder["path"])
        if not folder_path.is_dir():
            # A folder registered in the library may be missing on this
            # machine; skip it instead of letting the observer fail.
            print(f"Skipping folder (not found): {folder_path}")
            continue
        event_handler = LibraryFileHandler(
            library_id,
            include_files,
            sparsity_factor=sparsity_factor,
            window_size=window_size,
        )
        handlers.append(event_handler)
        observer.schedule(event_handler, str(folder_path), recursive=True)
        print(f"Watching folder: {folder_path}")

    if not handlers:
        print("No folders to watch.")
        return

    observer.start()
    try:
        while True:
            time.sleep(5)
            for handler in handlers:
                handler.process_pending_files()
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop watching; fall through to cleanup.
        pass
    finally:
        # Always release the observer and the worker pools, including when
        # the polling loop fails with an unexpected exception.
        observer.stop()
        for handler in handlers:
            handler.executor.shutdown(wait=True)
        observer.join()
.server import run_server + run_server() else: print("Server initialization failed. Unable to start the server.") @@ -65,16 +67,15 @@ def init(): print("Initialization failed. Please check the error messages above.") -@app.command("scan") -def scan_default_library(force: bool = False): +def get_or_create_default_library(): """ - Scan the screenshots directory and add it to the library if empty. + Get the default library or create it if it doesn't exist. + Ensure the library has at least one folder. """ - # Get the default library response = httpx.get(f"{BASE_URL}/libraries") if response.status_code != 200: print(f"Failed to retrieve libraries: {response.status_code} - {response.text}") - return + return None libraries = response.json() default_library = next( @@ -91,7 +92,7 @@ def scan_default_library(force: bool = False): print( f"Failed to create default library: {response.status_code} - {response.text}" ) - return + return None default_library = response.json() for plugin in settings.default_plugins: @@ -103,7 +104,9 @@ def scan_default_library(force: bool = False): screenshots_dir = Path(settings.screenshots_dir).resolve() folder = { "path": str(screenshots_dir), - "last_modified_at": datetime.fromtimestamp(screenshots_dir.stat().st_mtime).isoformat(), + "last_modified_at": datetime.fromtimestamp( + screenshots_dir.stat().st_mtime + ).isoformat(), } response = httpx.post( f"{BASE_URL}/libraries/{default_library['id']}/folders", @@ -113,9 +116,21 @@ def scan_default_library(force: bool = False): print( f"Failed to add screenshots directory: {response.status_code} - {response.text}" ) - return + return None print(f"Added screenshots directory: {screenshots_dir}") + return default_library + + +@app.command("scan") +def scan_default_library(force: bool = False): + """ + Scan the screenshots directory and add it to the library if empty. 
@app.command("watch")
def watch_default_library(
    window_size: int = typer.Option(
        20, "--window-size", "-ws", help="Window size for rate calculation"
    ),
    sparsity_factor: float = typer.Option(
        2.0, "--sparsity-factor", "-sf", help="Sparsity factor for file processing"
    ),
    verbose: bool = typer.Option(
        False, "--verbose", "-v", help="Enable verbose logging"
    ),
):
    """
    Watch the default library for file changes and sync automatically.
    """
    # Thin CLI wrapper: resolve the default library via
    # get_or_create_default_library(), then delegate to the library-level
    # `watch` command with every folder selected. When that lookup yields
    # a falsy result there is nothing to watch, so the command just returns.
    library = get_or_create_default_library()
    if library:
        watch(
            library["id"],
            folders=None,
            window_size=window_size,
            sparsity_factor=sparsity_factor,
            verbose=verbose,
        )