feat(watch): battery aware

arkohut 2024-10-15 15:13:57 +08:00
parent 93dc5f2fff
commit d823d834b9
2 changed files with 78 additions and 24 deletions


@@ -13,9 +13,12 @@ from magika import Magika
from datetime import datetime
from enum import Enum
from typing import List, Tuple
from functools import lru_cache
import re
import os
import time
import psutil
from collections import defaultdict, deque
from watchdog.observers import Observer
@@ -28,7 +31,7 @@ from memos.schemas import MetadataSource
from memos.logging_config import LOGGING_CONFIG
import logging.config
# Configure logging
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger(__name__)
@@ -537,7 +540,9 @@ async def update_entity(
def reindex(
library_id: int,
folders: List[int] = typer.Option(None, "--folder", "-f"),
force: bool = typer.Option(False, "--force", help="Force recreate FTS and vector tables before reindexing"),
force: bool = typer.Option(
False, "--force", help="Force recreate FTS and vector tables before reindexing"
),
):
print(f"Reindexing library {library_id}")
@@ -565,7 +570,7 @@ def reindex(
with httpx.Client(timeout=60) as client:
total_entities = 0
# Get total entity count for all folders
for folder in library_folders:
response = client.get(
@@ -575,7 +580,9 @@
if response.status_code == 200:
total_entities += int(response.headers.get("X-Total-Count", 0))
else:
print(f"Failed to get entity count for folder {folder['id']}: {response.status_code} - {response.text}")
print(
f"Failed to get entity count for folder {folder['id']}: {response.status_code} - {response.text}"
)
# Now process entities with a progress bar
with tqdm(total=total_entities, desc="Reindexing entities") as pbar:
@@ -591,7 +598,9 @@ def reindex(
params={"limit": limit, "offset": offset},
)
if entities_response.status_code != 200:
print(f"Failed to get entities: {entities_response.status_code} - {entities_response.text}")
print(
f"Failed to get entities: {entities_response.status_code} - {entities_response.text}"
)
break
entities = entities_response.json()
@@ -603,12 +612,16 @@ def reindex(
if entity["id"] in scanned_entities:
continue
update_response = client.post(f"{BASE_URL}/entities/{entity['id']}/last-scan-at")
update_response = client.post(
f"{BASE_URL}/entities/{entity['id']}/last-scan-at"
)
if update_response.status_code != 204:
print(f"Failed to update last_scan_at for entity {entity['id']}: {update_response.status_code} - {update_response.text}")
print(
f"Failed to update last_scan_at for entity {entity['id']}: {update_response.status_code} - {update_response.text}"
)
else:
scanned_entities.add(entity["id"])
pbar.update(1)
offset += limit
@@ -844,7 +857,9 @@ def sync(
"key": key,
"value": str(value),
"source": MetadataSource.SYSTEM_GENERATED.value,
"data_type": "number" if isinstance(value, (int, float)) else "text",
"data_type": (
"number" if isinstance(value, (int, float)) else "text"
),
}
for key, value in metadata.items()
if key != IS_THUMBNAIL
@@ -860,14 +875,18 @@ def sync(
if is_thumbnail:
new_entity["file_created_at"] = existing_entity["file_created_at"]
new_entity["file_last_modified_at"] = existing_entity["file_last_modified_at"]
new_entity["file_last_modified_at"] = existing_entity[
"file_last_modified_at"
]
new_entity["file_type"] = existing_entity["file_type"]
new_entity["file_type_group"] = existing_entity["file_type_group"]
new_entity["size"] = existing_entity["size"]
# Merge existing metadata with new metadata
if new_entity.get("metadata_entries"):
new_metadata_keys = {entry["key"] for entry in new_entity["metadata_entries"]}
new_metadata_keys = {
entry["key"] for entry in new_entity["metadata_entries"]
}
for existing_entry in existing_entity["metadata_entries"]:
if existing_entry["key"] not in new_metadata_keys:
new_entity["metadata_entries"].append(existing_entry)
@@ -928,6 +947,16 @@ def sync(
raise typer.Exit(code=1)
@lru_cache(maxsize=1)
def is_on_battery():
try:
battery = psutil.sensors_battery()
return battery is not None and not battery.power_plugged
except Exception:
return False # If unable to detect battery status, assume not on battery
# Modify the LibraryFileHandler class
class LibraryFileHandler(FileSystemEventHandler):
def __init__(
self,
@@ -945,7 +974,7 @@ class LibraryFileHandler(FileSystemEventHandler):
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.lock = threading.Lock()
self.sparsity_window = 6
self.sparsity_window = 12
self.sparsity_factor = sparsity_factor
self.window_size = window_size
@@ -958,6 +987,10 @@ class LibraryFileHandler(FileSystemEventHandler):
self.file_skipped = 0
self.logger = logger
self.base_sparsity_window = self.sparsity_window
self.last_battery_check = 0
self.battery_check_interval = 60 # Check battery status every 60 seconds
def handle_event(self, event):
if not event.is_directory and self.is_valid_file(event.src_path):
current_time = time.time()
@@ -998,7 +1031,7 @@ class LibraryFileHandler(FileSystemEventHandler):
for path in files_to_process:
self.executor.submit(self.process_file, path)
self.file_submitted += 1
if processed_in_current_loop > 0:
self.logger.info(
f"File count: {self.file_count}, Files submitted: {self.file_submitted}, Files synced: {self.file_synced}, Files skipped: {self.file_skipped}"
@@ -1019,12 +1052,15 @@ class LibraryFileHandler(FileSystemEventHandler):
min_samples = max(3, self.window_size // 3)
max_interval = 60 # Maximum allowed interval between events in seconds
if len(self.pending_times) >= min_samples and len(self.sync_times) >= min_samples:
if (
len(self.pending_times) >= min_samples
and len(self.sync_times) >= min_samples
):
# Filter out large time gaps
filtered_intervals = [
self.pending_times[i] - self.pending_times[i-1]
self.pending_times[i] - self.pending_times[i - 1]
for i in range(1, len(self.pending_times))
if self.pending_times[i] - self.pending_times[i-1] <= max_interval
if self.pending_times[i] - self.pending_times[i - 1] <= max_interval
]
if filtered_intervals:
@@ -1034,18 +1070,34 @@ class LibraryFileHandler(FileSystemEventHandler):
pending_files_per_second = 0
sync_time_total = sum(self.sync_times)
sync_files_per_second = len(self.sync_times) / sync_time_total if sync_time_total > 0 else 0
sync_files_per_second = (
len(self.sync_times) / sync_time_total if sync_time_total > 0 else 0
)
if pending_files_per_second > 0 and sync_files_per_second > 0:
rate = pending_files_per_second / sync_files_per_second
new_sparsity_window = max(1, math.ceil(self.sparsity_factor * rate))
current_time = time.time()
if current_time - self.last_battery_check > self.battery_check_interval:
self.last_battery_check = current_time
is_on_battery.cache_clear() # Clear the cache to get fresh battery status
new_sparsity_window = (
new_sparsity_window * 2 if is_on_battery() else new_sparsity_window
)
if new_sparsity_window != self.sparsity_window:
old_sparsity_window = self.sparsity_window
self.sparsity_window = new_sparsity_window
self.logger.info(f"Updated sparsity window: {old_sparsity_window} -> {self.sparsity_window}")
self.logger.debug(f"Pending files per second: {pending_files_per_second:.2f}")
self.logger.debug(f"Sync files per second: {sync_files_per_second:.2f}")
self.logger.info(
f"Updated sparsity window: {old_sparsity_window} -> {self.sparsity_window}"
)
self.logger.debug(
f"Pending files per second: {pending_files_per_second:.2f}"
)
self.logger.debug(
f"Sync files per second: {sync_files_per_second:.2f}"
)
self.logger.debug(f"Rate (pending/sync): {rate:.2f}")
def is_valid_file(self, path):
@@ -1091,7 +1143,9 @@ def watch(
window_size: int = typer.Option(
10, "--window-size", "-ws", help="Window size for rate calculation"
),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose logging")
verbose: bool = typer.Option(
False, "--verbose", "-v", help="Enable verbose logging"
),
):
"""
Watch for file changes in the library folders and sync automatically.
@@ -1099,7 +1153,7 @@ def watch(
# Set the logging level based on the verbose flag
log_level = "DEBUG" if verbose else "INFO"
logger.setLevel(log_level)
logger.info(f"Watching library {library_id} for changes...")
# Get the library
@@ -1147,4 +1201,4 @@ def watch(
observer.stop()
for handler in handlers:
handler.executor.shutdown(wait=True)
observer.join()
observer.join()
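
The substance of this commit is the battery check plus the periodic cache reset shown in the hunks above: a cached is_on_battery() helper built on psutil, cleared at most once every 60 seconds, which doubles the adaptive sparsity window whenever the machine is running on battery so the watcher submits fewer files for processing. Below is a minimal, self-contained sketch of that behaviour, assuming psutil is installed; is_on_battery mirrors the helper added in this commit, while adjust_sparsity_window is a hypothetical standalone wrapper, written only to illustrate the inline logic in LibraryFileHandler.

import time
from functools import lru_cache

import psutil


@lru_cache(maxsize=1)
def is_on_battery() -> bool:
    try:
        battery = psutil.sensors_battery()
        # sensors_battery() returns None on machines without a battery
        return battery is not None and not battery.power_plugged
    except Exception:
        return False  # if battery status cannot be read, assume AC power


def adjust_sparsity_window(base_window, last_check, interval=60):
    """Double the sparsity window while on battery, re-checking at most once per interval."""
    now = time.time()
    if now - last_check > interval:
        last_check = now
        is_on_battery.cache_clear()  # drop the cached reading so the next call re-queries psutil
    window = base_window * 2 if is_on_battery() else base_window
    return window, last_check

With the new default base window of 12, a laptop that switches to battery power ends up with a window of 24, i.e. the watcher thins out file processing roughly twice as aggressively while unplugged.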


@@ -276,7 +276,7 @@
type="text"
class={inputClasses}
bind:value={searchString}
placeholder="Type to search..."
placeholder="Input keyword to search or press Enter to show latest records"
on:keydown={handleEnterPress}
autofocus
/>