feat(watch): add watch method with sparsity window

This commit is contained in:
arkohut 2024-10-10 15:45:05 +08:00
parent 6308b95784
commit f405d6e0a1
2 changed files with 300 additions and 12 deletions

View File

@ -1,7 +1,10 @@
import math
import typer
import httpx
import asyncio
import logging
import threading
from tqdm import tqdm
from pathlib import Path
from tabulate import tabulate
@ -10,11 +13,23 @@ from magika import Magika
from datetime import datetime
from enum import Enum
from typing import List, Tuple
import re
import os
import time
from collections import defaultdict, deque
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from concurrent.futures import ThreadPoolExecutor
from memos.read_metadata import read_metadata
from memos.schemas import MetadataSource
from memos.logging_config import LOGGING_CONFIG
import logging.config
# 配置日志
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger(__name__)
lib_app = typer.Typer()
@ -132,6 +147,14 @@ def show(library_id: int):
print(f"Failed to retrieve library: {response.status_code} - {response.text}")
def is_temp_file(filename):
return (
filename.startswith(".")
or filename.startswith("tmp")
or filename.startswith("temp")
)
async def loop_files(library_id, folder, folder_path, force, plugins):
updated_file_count = 0
added_file_count = 0
@ -147,8 +170,10 @@ async def loop_files(library_id, folder, folder_path, force, plugins):
absolute_file_path = file_path.resolve() # Get absolute path
relative_path = absolute_file_path.relative_to(folder_path)
# Check if the file extension is in the include_files list
if file_path.suffix.lower() in include_files:
# Check if the file extension is in the include_files list and not a temp file
if file_path.suffix.lower() in include_files and not is_temp_file(
file
):
scanned_files.add(str(absolute_file_path))
candidate_files.append(str(absolute_file_path))
@ -793,3 +818,225 @@ def sync(
f"Error: File {file_path} does not belong to any folder in the library."
)
raise typer.Exit(code=1)
class LibraryFileHandler(FileSystemEventHandler):
def __init__(
self,
library_id,
include_files,
max_workers=2,
sparsity_factor=2,
window_size=20,
):
self.library_id = library_id
self.include_files = include_files
self.inode_pattern = re.compile(r"\._.+")
self.pending_files = defaultdict(lambda: {"timestamp": 0, "last_size": 0})
self.buffer_time = 2
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.lock = threading.Lock()
self.sparsity_window = 3
self.sparsity_factor = sparsity_factor
self.window_size = window_size
self.pending_times = deque(maxlen=window_size)
self.sync_times = deque(maxlen=window_size)
self.file_count = 0
self.file_submitted = 0
self.file_synced = 0
self.file_skipped = 0
self.logger = logger
def handle_event(self, event):
if not event.is_directory and self.is_valid_file(event.src_path):
current_time = time.time()
with self.lock:
file_info = self.pending_files[event.src_path]
if current_time - file_info["timestamp"] > self.buffer_time:
file_info["timestamp"] = current_time
self.pending_times.append(current_time)
file_info["last_size"] = os.path.getsize(event.src_path)
return True
return False
def process_pending_files(self):
current_time = time.time()
files_to_process = []
processed_in_current_loop = 0
with self.lock:
for path, file_info in list(self.pending_files.items()):
if current_time - file_info["timestamp"] > self.buffer_time:
processed_in_current_loop += 1
if os.path.exists(path) and os.path.getsize(path) > 0:
self.file_count += 1
if self.file_count % self.sparsity_window == 0:
files_to_process.append(path)
print(
f"file_count % sparsity_window: {self.file_count} % {self.sparsity_window} == 0"
)
print(f"Picked file for processing: {path}")
else:
self.file_skipped += 1
del self.pending_files[path]
elif not os.path.exists(path):
del self.pending_files[path]
for path in files_to_process:
self.executor.submit(self.process_file, path)
self.file_submitted += 1
if processed_in_current_loop > 0:
self.logger.info(
f"File count: {self.file_count}, Files submitted: {self.file_submitted}, Files synced: {self.file_synced}, Files skipped: {self.file_skipped}"
)
self.update_sparsity_window()
def process_file(self, path):
self.logger.debug(f"Processing file: {path}")
start_time = time.time()
sync(self.library_id, path)
end_time = time.time()
with self.lock:
self.sync_times.append(end_time - start_time)
self.file_synced += 1
def update_sparsity_window(self):
min_samples = max(3, self.window_size // 3)
max_interval = 60 # Maximum allowed interval between events in seconds
if len(self.pending_times) >= min_samples and len(self.sync_times) >= min_samples:
# Filter out large time gaps
filtered_intervals = [
self.pending_times[i] - self.pending_times[i-1]
for i in range(1, len(self.pending_times))
if self.pending_times[i] - self.pending_times[i-1] <= max_interval
]
if filtered_intervals:
avg_interval = sum(filtered_intervals) / len(filtered_intervals)
pending_files_per_second = 1 / avg_interval if avg_interval > 0 else 0
else:
pending_files_per_second = 0
sync_time_total = sum(self.sync_times)
sync_files_per_second = len(self.sync_times) / sync_time_total if sync_time_total > 0 else 0
if pending_files_per_second > 0 and sync_files_per_second > 0:
rate = pending_files_per_second / sync_files_per_second
new_sparsity_window = max(1, math.ceil(self.sparsity_factor * rate))
if new_sparsity_window != self.sparsity_window:
old_sparsity_window = self.sparsity_window
self.sparsity_window = new_sparsity_window
self.logger.info(f"Updated sparsity window: {old_sparsity_window} -> {self.sparsity_window}")
self.logger.debug(f"Pending files per second: {pending_files_per_second:.2f}")
self.logger.debug(f"Sync files per second: {sync_files_per_second:.2f}")
self.logger.debug(f"Rate (pending/sync): {rate:.2f}")
def is_valid_file(self, path):
filename = os.path.basename(path)
return (
any(path.lower().endswith(ext) for ext in self.include_files)
and not is_temp_file(filename)
and not self.inode_pattern.match(filename)
)
def on_created(self, event):
self.handle_event(event)
def on_modified(self, event):
self.handle_event(event)
def on_moved(self, event):
if self.handle_event(event):
# For moved events, we need to update the key in pending_files
with self.lock:
self.pending_files[event.dest_path] = self.pending_files.pop(
event.src_path, {"timestamp": time.time(), "last_size": 0}
)
def on_deleted(self, event):
if self.is_valid_file(event.src_path):
self.logger.info(f"File deleted: {event.src_path}")
# Remove from pending files if it was there
with self.lock:
self.pending_files.pop(event.src_path, None)
# Add logic for handling deleted files if needed
@lib_app.command("watch")
def watch(
library_id: int,
folders: List[int] = typer.Option(
None, "--folder", "-f", help="Specify folders to watch"
),
sparsity_factor: float = typer.Option(
2.0, "--sparsity-factor", "-sf", help="Sparsity factor for file processing"
),
window_size: int = typer.Option(
20, "--window-size", "-ws", help="Window size for rate calculation"
),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose logging")
):
"""
Watch for file changes in the library folders and sync automatically.
"""
# Set the logging level based on the verbose flag
log_level = "DEBUG" if verbose else "INFO"
logger.setLevel(log_level)
logger.info(f"Watching library {library_id} for changes...")
# Get the library
response = httpx.get(f"{BASE_URL}/libraries/{library_id}")
if response.status_code != 200:
print(f"Error: Library with id {library_id} not found.")
raise typer.Exit(code=1)
library = response.json()
# Filter folders if the folders parameter is provided
if folders:
library_folders = [
folder for folder in library["folders"] if folder["id"] in folders
]
else:
library_folders = library["folders"]
if not library_folders:
print("No folders to watch.")
return
# Create an observer and handler for each folder in the library
observer = Observer()
handlers = []
for folder in library_folders:
folder_path = Path(folder["path"])
event_handler = LibraryFileHandler(
library_id,
include_files,
sparsity_factor=sparsity_factor,
window_size=window_size,
)
handlers.append(event_handler)
observer.schedule(event_handler, str(folder_path), recursive=True)
print(f"Watching folder: {folder_path}")
observer.start()
try:
while True:
time.sleep(5)
for handler in handlers:
handler.process_pending_files()
except KeyboardInterrupt:
observer.stop()
for handler in handlers:
handler.executor.shutdown(wait=True)
observer.join()

View File

@ -18,7 +18,8 @@ import sys
import subprocess
import platform
from .cmds.plugin import plugin_app, bind
from .cmds.library import lib_app, scan, index
from .cmds.library import lib_app, scan, index, watch
app = typer.Typer(context_settings={"help_option_names": ["-h", "--help"]})
@ -47,6 +48,7 @@ def serve():
ts_success = init_typesense()
if db_success and (ts_success or not settings.typesense.enabled):
from .server import run_server
run_server()
else:
print("Server initialization failed. Unable to start the server.")
@ -65,16 +67,15 @@ def init():
print("Initialization failed. Please check the error messages above.")
@app.command("scan")
def scan_default_library(force: bool = False):
def get_or_create_default_library():
"""
Scan the screenshots directory and add it to the library if empty.
Get the default library or create it if it doesn't exist.
Ensure the library has at least one folder.
"""
# Get the default library
response = httpx.get(f"{BASE_URL}/libraries")
if response.status_code != 200:
print(f"Failed to retrieve libraries: {response.status_code} - {response.text}")
return
return None
libraries = response.json()
default_library = next(
@ -91,7 +92,7 @@ def scan_default_library(force: bool = False):
print(
f"Failed to create default library: {response.status_code} - {response.text}"
)
return
return None
default_library = response.json()
for plugin in settings.default_plugins:
@ -103,7 +104,9 @@ def scan_default_library(force: bool = False):
screenshots_dir = Path(settings.screenshots_dir).resolve()
folder = {
"path": str(screenshots_dir),
"last_modified_at": datetime.fromtimestamp(screenshots_dir.stat().st_mtime).isoformat(),
"last_modified_at": datetime.fromtimestamp(
screenshots_dir.stat().st_mtime
).isoformat(),
}
response = httpx.post(
f"{BASE_URL}/libraries/{default_library['id']}/folders",
@ -113,9 +116,21 @@ def scan_default_library(force: bool = False):
print(
f"Failed to add screenshots directory: {response.status_code} - {response.text}"
)
return
return None
print(f"Added screenshots directory: {screenshots_dir}")
return default_library
@app.command("scan")
def scan_default_library(force: bool = False):
"""
Scan the screenshots directory and add it to the library if empty.
"""
default_library = get_or_create_default_library()
if not default_library:
return
# Scan the library
print(f"Scanning library: {default_library['name']}")
scan(default_library["id"], plugins=None, folders=None, force=force)
@ -174,6 +189,32 @@ def record(
time.sleep(10)
@app.command("watch")
def watch_default_library(
window_size: int = typer.Option(
20, "--window-size", "-ws", help="Window size for rate calculation"
),
sparsity_factor: float = typer.Option(
2.0, "--sparsity-factor", "-sf", help="Sparsity factor for file processing"
),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose logging")
):
"""
Watch the default library for file changes and sync automatically.
"""
default_library = get_or_create_default_library()
if not default_library:
return
watch(
default_library["id"],
folders=None,
window_size=window_size,
sparsity_factor=sparsity_factor,
verbose=verbose
)
def get_python_path():
return sys.executable
@ -342,4 +383,4 @@ def disable():
if __name__ == "__main__":
app()
app()