feat(scan): support concurrent webhook trigger

This commit is contained in:
arkohut 2024-07-22 16:34:11 +08:00
parent 3855daf5b4
commit 506e5eedd0

View File

@ -1,14 +1,17 @@
import asyncio
import os import os
import time import time
import logging
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import List from typing import List, Tuple
import httpx import httpx
import typer import typer
from .server import run_server from .server import run_server
from tabulate import tabulate from tabulate import tabulate
from tqdm import tqdm from tqdm import tqdm
from enum import Enum
from magika import Magika from magika import Magika
app = typer.Typer(context_settings={"help_option_names": ["-h", "--help"]}) app = typer.Typer(context_settings={"help_option_names": ["-h", "--help"]})
@ -25,6 +28,20 @@ BASE_URL = "http://localhost:8080"
ignore_files = [".DS_Store"] ignore_files = [".DS_Store"]
# Root logging setup: WARNING and above, with a timestamped message format.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.WARNING,
)
# The loggers of these libraries are restricted to ERROR and above.
for _library_logger in ("httpx", "typer"):
    logging.getLogger(_library_logger).setLevel(logging.ERROR)
class FileStatus(Enum):
    """Kind of operation that was attempted for a scanned file."""

    # An entity for the file already existed and an update was sent for it.
    UPDATED = "updated"
    # No entity was found for the file, so a new one was submitted.
    ADDED = "added"
def format_timestamp(timestamp): def format_timestamp(timestamp):
if isinstance(timestamp, str): if isinstance(timestamp, str):
@ -112,6 +129,105 @@ def show(library_id: int):
print(f"Failed to retrieve library: {response.status_code} - {response.text}") print(f"Failed to retrieve library: {response.status_code} - {response.text}")
async def loop_files(
    library_id, folder, folder_path, force, plugins
) -> Tuple[int, int, set]:
    """Walk ``folder_path`` and add or update a library entity for each file.

    The directory tree is walked once. For every file that is not listed in
    ``ignore_files`` the server is asked whether an entity with the same
    absolute path already exists; depending on the answer an add or update
    request is queued. The queued requests are then awaited as they complete
    and their outcome is reported through ``tqdm.write``.

    Args:
        library_id: Identifier of the library, used to build request URLs.
        folder: Mapping describing the library folder; its ``"id"`` entry is
            stored on every entity as ``folder_id``.
        folder_path: Directory to walk.
        force: When truthy, existing entities are updated even if their
            created/modified timestamps match the file on disk.
        plugins: Forwarded unchanged to ``add_entity`` / ``update_entity``.

    Returns:
        A ``(added_file_count, updated_file_count, scanned_files)`` tuple,
        where ``scanned_files`` is the set of absolute paths (as strings) of
        every non-ignored file that was seen.
    """
    updated_file_count = 0
    added_file_count = 0
    scanned_files = set()
    # Shared with add_entity/update_entity, which acquire it around their request.
    semaphore = asyncio.Semaphore(8)

    async with httpx.AsyncClient() as client:
        tasks = []
        for root, _, files in os.walk(folder_path):
            with tqdm(
                total=len(files), desc=f"Scanning {folder_path}", leave=False
            ) as pbar:
                for file in files:
                    pbar.update(1)
                    # Skip ignored names before doing any filesystem work.
                    if file in ignore_files:
                        continue

                    file_path = Path(root) / file
                    absolute_file_path = file_path.resolve()
                    absolute_path_str = str(absolute_file_path)
                    scanned_files.add(absolute_path_str)

                    file_stat = file_path.stat()
                    file_type, file_type_group = get_file_type(absolute_file_path)
                    new_entity = {
                        "filename": file_path.name,
                        "filepath": absolute_path_str,  # Save absolute path
                        "size": file_stat.st_size,
                        "file_created_at": format_timestamp(file_stat.st_ctime),
                        "file_last_modified_at": format_timestamp(file_stat.st_mtime),
                        "file_type": file_type,
                        "file_type_group": file_type_group,
                        "folder_id": folder["id"],
                    }

                    # Check if the entity already exists (looked up by absolute path)
                    get_response = await client.get(
                        f"{BASE_URL}/libraries/{library_id}/entities/by-filepath",
                        params={"filepath": absolute_path_str},
                    )

                    if get_response.status_code == 200:
                        existing_entity = get_response.json()
                        existing_created_at = format_timestamp(
                            existing_entity["file_created_at"]
                        )
                        new_created_at = format_timestamp(new_entity["file_created_at"])
                        existing_modified_at = format_timestamp(
                            existing_entity["file_last_modified_at"]
                        )
                        new_modified_at = format_timestamp(
                            new_entity["file_last_modified_at"]
                        )

                        if (
                            force
                            or existing_created_at != new_created_at
                            or existing_modified_at != new_modified_at
                        ):
                            # Update the existing entity
                            tasks.append(
                                update_entity(
                                    client,
                                    semaphore,
                                    plugins,
                                    new_entity,
                                    existing_entity,
                                )
                            )
                        # Otherwise the entity is up to date: nothing is queued.
                    else:
                        # Any non-200 lookup result is treated as "not in the
                        # library yet": add the new entity
                        tasks.append(
                            add_entity(
                                client, semaphore, library_id, plugins, new_entity
                            )
                        )

        # Results arrive in completion order, not submission order, so every
        # message names the file it refers to.
        # NOTE: an exception raised by a task is not caught here; it
        # propagates out of this coroutine.
        for future in tqdm(
            asyncio.as_completed(tasks),
            desc=f"Processing {folder_path}",
            total=len(tasks),
            leave=False,
        ):
            file_path, file_status, succeeded, response = await future
            if file_status == FileStatus.ADDED:
                if succeeded:
                    added_file_count += 1
                    tqdm.write(f"Added file to library: {file_path}")
                else:
                    tqdm.write(
                        f"Failed to add file {file_path}: {response.status_code} - {response.text}"
                    )
            elif file_status == FileStatus.UPDATED:
                if succeeded:
                    updated_file_count += 1
                    tqdm.write(f"Updated file in library: {file_path}")
                else:
                    tqdm.write(
                        f"Failed to update file {file_path}: {response.status_code} - {response.text}"
                    )

    return added_file_count, updated_file_count, scanned_files
@lib_app.command("scan") @lib_app.command("scan")
def scan( def scan(
library_id: int, library_id: int,
@ -144,111 +260,11 @@ def scan(
tqdm.write(f"Folder does not exist or is not a directory: {folder_path}") tqdm.write(f"Folder does not exist or is not a directory: {folder_path}")
continue continue
start_time = time.time() # Start the timer added_file_count, updated_file_count, scanned_files = asyncio.run(
file_count = 0 # Initialize the file counter loop_files(library_id, folder, folder_path, force, plugins)
scanned_files = set() # To keep track of scanned files )
total_files_added += added_file_count
for root, _, files in os.walk(folder_path): total_files_updated += updated_file_count
with tqdm(
total=len(files), desc=f"Scanning {os.path.basename(root)}", leave=False
) as pbar:
for file in files:
current_time = time.time() - start_time # Calculate elapsed time
elapsed_time_str = f"{int(current_time // 60):02d}m:{int(current_time % 60):02d}s" # Format as mm:ss for brevity
pbar.set_postfix_str(
f"Files: {file_count}, Time: {elapsed_time_str}", refresh=False
)
pbar.update(1)
file_path = Path(root) / file
absolute_file_path = file_path.resolve() # Get absolute path
if file in ignore_files:
continue
scanned_files.add(
str(absolute_file_path)
) # Add to scanned files set
file_stat = file_path.stat()
file_type, file_type_group = get_file_type(absolute_file_path)
new_entity = {
"filename": file_path.name,
"filepath": str(absolute_file_path), # Save absolute path
"size": file_stat.st_size,
"file_created_at": format_timestamp(file_stat.st_ctime),
"file_last_modified_at": format_timestamp(file_stat.st_mtime),
"file_type": file_type,
"file_type_group": file_type_group,
"folder_id": folder["id"],
}
# Check if the entity already exists
get_response = httpx.get(
f"{BASE_URL}/libraries/{library_id}/entities/by-filepath",
params={
"filepath": str(absolute_file_path)
}, # Use relative path
)
if get_response.status_code == 200:
existing_entity = get_response.json()
existing_created_at = format_timestamp(
existing_entity["file_created_at"]
)
new_created_at = format_timestamp(new_entity["file_created_at"])
existing_modified_at = format_timestamp(
existing_entity["file_last_modified_at"]
)
new_modified_at = format_timestamp(
new_entity["file_last_modified_at"]
)
if (
force
or existing_created_at != new_created_at
or existing_modified_at != new_modified_at
):
# Show the difference before update
tqdm.write(f"Updating file: {file_path}")
tqdm.write(
f"Old created at: {existing_created_at}, New created at: {new_created_at}"
)
tqdm.write(
f"Old last modified at: {existing_modified_at}, New last modified at: {new_modified_at}"
)
# Update the existing entity
update_response = httpx.put(
f"{BASE_URL}/entities/{existing_entity['id']}",
json=new_entity,
params={
"trigger_webhooks_flag": "true",
**({"plugins": plugins} if plugins else {}),
},
timeout=60,
)
if 200 <= update_response.status_code < 300:
tqdm.write(f"Updated file in library: {file_path}")
total_files_updated += 1
else:
tqdm.write(
f"Failed to update file: {update_response.status_code} - {update_response.text}"
)
else:
tqdm.write(
f"File already exists in library and is up-to-date: {file_path}"
)
continue
# Add the new entity
post_response = httpx.post(
f"{BASE_URL}/libraries/{library_id}/entities",
json=new_entity,
params={"plugins": plugins} if plugins else {},
timeout=60,
)
if 200 <= post_response.status_code < 300:
tqdm.write(f"Added file to library: {file_path}")
total_files_added += 1
else:
tqdm.write(
f"Failed to add file: {post_response.status_code} - {post_response.text}"
)
file_count += 1
# Check for deleted files # Check for deleted files
limit = 200 limit = 200
@ -257,6 +273,7 @@ def scan(
existing_files_response = httpx.get( existing_files_response = httpx.get(
f"{BASE_URL}/libraries/{library_id}/folders/{folder['id']}/entities", f"{BASE_URL}/libraries/{library_id}/folders/{folder['id']}/entities",
params={"limit": limit, "offset": offset}, params={"limit": limit, "offset": offset},
timeout=60,
) )
if existing_files_response.status_code != 200: if existing_files_response.status_code != 200:
tqdm.write( tqdm.write(
@ -291,6 +308,49 @@ def scan(
print(f"Total files deleted: {total_files_deleted}") print(f"Total files deleted: {total_files_deleted}")
async def add_entity(
    client: httpx.AsyncClient,
    semaphore: asyncio.Semaphore,
    library_id,
    plugins,
    new_entity,
) -> Tuple[str, FileStatus, bool, httpx.Response]:
    """POST ``new_entity`` to the library while holding ``semaphore``.

    Args:
        client: Async HTTP client used to send the request.
        semaphore: Acquired for the duration of the request.
        library_id: Identifier of the library, used to build the request URL.
        plugins: Sent as the ``plugins`` query parameter when truthy;
            otherwise no query parameters are sent.
        new_entity: JSON body of the request; its ``"filepath"`` entry is
            echoed back in the result.

    Returns:
        ``(filepath, FileStatus.ADDED, succeeded, response)`` where
        ``succeeded`` is true when the response status is in the 2xx range.
    """
    async with semaphore:
        post_response = await client.post(
            f"{BASE_URL}/libraries/{library_id}/entities",
            json=new_entity,
            params={"plugins": plugins} if plugins else {},
            timeout=60,
        )
    succeeded = 200 <= post_response.status_code < 300
    return new_entity["filepath"], FileStatus.ADDED, succeeded, post_response
async def update_entity(
    client: httpx.AsyncClient,
    semaphore: asyncio.Semaphore,
    plugins,
    new_entity,
    existing_entity,
) -> Tuple[str, FileStatus, bool, httpx.Response]:
    """PUT ``new_entity`` over an existing entity while holding ``semaphore``.

    Args:
        client: Async HTTP client used to send the request.
        semaphore: Acquired for the duration of the request.
        plugins: Added as the ``plugins`` query parameter when truthy.
        new_entity: JSON body of the request; its ``"filepath"`` entry is
            echoed back in the result.
        existing_entity: Entity currently stored on the server; its ``"id"``
            entry selects the resource to update.

    Returns:
        ``(filepath, FileStatus.UPDATED, succeeded, response)`` where
        ``succeeded`` is true when the response status is in the 2xx range.
    """
    # trigger_webhooks_flag is always sent; plugins only when provided.
    query_params = {
        "trigger_webhooks_flag": "true",
        **({"plugins": plugins} if plugins else {}),
    }
    async with semaphore:
        update_response = await client.put(
            f"{BASE_URL}/entities/{existing_entity['id']}",
            json=new_entity,
            params=query_params,
            timeout=60,
        )
    succeeded = 200 <= update_response.status_code < 300
    return new_entity["filepath"], FileStatus.UPDATED, succeeded, update_response
@lib_app.command("index") @lib_app.command("index")
def index(library_id: int): def index(library_id: int):
print(f"Indexing library {library_id}") print(f"Indexing library {library_id}")
@ -314,6 +374,7 @@ def index(library_id: int):
entities_response = httpx.get( entities_response = httpx.get(
f"{BASE_URL}/libraries/{library_id}/folders/{folder['id']}/entities", f"{BASE_URL}/libraries/{library_id}/folders/{folder['id']}/entities",
params={"limit": 200, "offset": offset}, params={"limit": 200, "offset": offset},
timeout=60,
) )
if entities_response.status_code != 200: if entities_response.status_code != 200:
tqdm.write( tqdm.write(
@ -345,6 +406,7 @@ def index(library_id: int):
index_response = httpx.get( index_response = httpx.get(
f"{BASE_URL}/libraries/{library_id}/folders/{folder['id']}/index", f"{BASE_URL}/libraries/{library_id}/folders/{folder['id']}/index",
params={"limit": 200, "offset": offset}, params={"limit": 200, "offset": offset},
timeout=60,
) )
if index_response.status_code != 200: if index_response.status_code != 200:
tqdm.write( tqdm.write(