feat(indexing): do not repeatedly index updated entities, based on last_scan_at

arkohut 2024-08-29 13:57:46 +08:00
parent 439adfa955
commit 9db92908a6

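This change adds a check_and_index_entity helper that compares the index's last_scan_at with the entity's last_scan_at before queuing a re-index, so entities whose index is already current are skipped. A rough sketch of that comparison follows (assuming, as the diff suggests, a Unix timestamp on the index side and an ISO-8601 string on the entity side; the function name and sample values are illustrative only, not part of the commit):

from datetime import datetime

def needs_reindex(index_last_scan_at, entity_last_scan_at):
    # No index-side timestamp yet: re-index only if the entity has actually been scanned.
    if index_last_scan_at is None:
        return entity_last_scan_at is not None
    # Index side is a Unix timestamp, entity side an ISO-8601 string (per the diff).
    index_ts = datetime.fromtimestamp(index_last_scan_at)
    entity_ts = datetime.fromisoformat(entity_last_scan_at)
    # Skip re-indexing when the index is at least as fresh as the last scan.
    return index_ts < entity_ts

# Illustrative values: an index written around 2024-08-18 is older than a scan on 2024-08-29.
print(needs_reindex(1723958400, "2024-08-29T13:57:46"))  # True -> re-index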

@@ -15,6 +15,7 @@ from tabulate import tabulate
 from tqdm import tqdm
 from enum import Enum
 from magika import Magika
+from .config import settings
 
 IS_THUMBNAIL = "is_thumbnail"
@@ -28,7 +29,7 @@ app.add_typer(lib_app, name="lib")
 file_detector = Magika()
 
-BASE_URL = "http://localhost:8080"
+BASE_URL = f"http://localhost:{settings.server_port}"
 
 ignore_files = [".DS_Store", ".screen_sequences", "worklog"]
@@ -136,7 +137,7 @@ async def loop_files(library_id, folder, folder_path, force, plugins):
     updated_file_count = 0
     added_file_count = 0
     scanned_files = set()
-    semaphore = asyncio.Semaphore(8)
+    semaphore = asyncio.Semaphore(4)
     async with httpx.AsyncClient(timeout=60) as client:
         tasks = []
         for root, _, files in os.walk(folder_path):
@@ -507,6 +508,34 @@ async def update_entity(
     return new_entity["filepath"], FileStatus.UPDATED, False, None
 
 
+async def check_and_index_entity(client, entity_id, entity_last_scan_at):
+    try:
+        index_response = await client.get(f"{BASE_URL}/entities/{entity_id}/index")
+        if index_response.status_code == 200:
+            index_data = index_response.json()
+            if index_data["last_scan_at"] is None:
+                return entity_last_scan_at is not None
+            index_last_scan_at = datetime.fromtimestamp(index_data["last_scan_at"])
+            entity_last_scan_at = datetime.fromisoformat(entity_last_scan_at)
+            if index_last_scan_at >= entity_last_scan_at:
+                return False  # Index is up to date, no need to update
+        return True  # Index doesn't exist or needs update
+    except httpx.HTTPStatusError as e:
+        if e.response.status_code == 404:
+            return True  # Index doesn't exist, need to create
+        raise  # Re-raise other HTTP errors
+
+
+async def index_batch(client, entity_ids):
+    index_response = await client.post(
+        f"{BASE_URL}/entities/batch-index",
+        json=entity_ids,
+        timeout=60,
+    )
+    return index_response
+
+
 @lib_app.command("index")
 def index(
     library_id: int,
@@ -531,96 +560,119 @@ def index(
     else:
         library_folders = library["folders"]
 
-    # Iterate through folders
-    for folder in library_folders:
-        tqdm.write(f"Processing folder: {folder['id']}")
-
-        # List all entities in the folder
-        limit = 200
-        offset = 0
-        total_entities = 0  # We'll update this after the first request
-
-        with tqdm(total=total_entities, desc="Indexing entities", leave=True) as pbar:
-            while True:
-                entities_response = httpx.get(
-                    f"{BASE_URL}/libraries/{library_id}/folders/{folder['id']}/entities",
-                    params={"limit": limit, "offset": offset},
-                    timeout=60,
-                )
-                if entities_response.status_code != 200:
-                    pbar.write(
-                        f"Failed to get entities: {entities_response.status_code} - {entities_response.text}"
-                    )
-                    break
-
-                entities = entities_response.json()
-                if not entities:
-                    break
-
-                # Update total if this is the first request
-                if offset == 0:
-                    total_entities = int(
-                        entities_response.headers.get("X-Total-Count", total_entities)
-                    )
-                    pbar.total = total_entities
-                    pbar.refresh()
-
-                # Index each entity
-                batch_size = 40
-                for i in range(0, len(entities), batch_size):
-                    batch = entities[i : i + batch_size]
-                    entity_ids = [entity["id"] for entity in batch]
-                    index_response = httpx.post(
-                        f"{BASE_URL}/entities/batch-index",
-                        json=entity_ids,
-                        timeout=60,
-                    )
-                    if index_response.status_code == 204:
-                        pbar.write(f"Indexed batch of {len(batch)} entities")
-                    else:
-                        pbar.write(
-                            f"Failed to index batch: {index_response.status_code} - {index_response.text}"
-                        )
-
-                    scanned_entities.update(str(entity["id"]) for entity in batch)
-                    pbar.update(len(batch))
-
-                offset += limit
-
-        # List all indexed entities in the folder
-        offset = 0
-        while True:
-            index_response = httpx.get(
-                f"{BASE_URL}/libraries/{library_id}/folders/{folder['id']}/index",
-                params={"limit": 200, "offset": offset},
-                timeout=60,
-            )
-            if index_response.status_code != 200:
-                tqdm.write(
-                    f"Failed to get indexed entities: {index_response.status_code} - {index_response.text}"
-                )
-                break
-
-            indexed_entities = index_response.json()
-            if not indexed_entities:
-                break
-
-            # Delete indexes for entities not in scanned_entities
-            for indexed_entity in tqdm(
-                indexed_entities, desc="Cleaning up indexes", leave=False
-            ):
-                if indexed_entity["id"] not in scanned_entities:
-                    delete_response = httpx.delete(
-                        f"{BASE_URL}/entities/{indexed_entity['id']}/index"
-                    )
-                    if delete_response.status_code == 204:
-                        tqdm.write(f"Deleted index for entity: {indexed_entity['id']}")
-                    else:
-                        tqdm.write(
-                            f"Failed to delete index for entity {indexed_entity['id']}: {delete_response.status_code} - {delete_response.text}"
-                        )
-
-            offset += 200
+    async def process_folders():
+        async with httpx.AsyncClient(timeout=60) as client:
+            # Iterate through folders
+            for folder in library_folders:
+                tqdm.write(f"Processing folder: {folder['id']}")
+
+                # List all entities in the folder
+                limit = 200
+                offset = 0
+                total_entities = 0  # We'll update this after the first request
+
+                with tqdm(
+                    total=total_entities, desc="Indexing entities", leave=True
+                ) as pbar:
+                    while True:
+                        entities_response = await client.get(
+                            f"{BASE_URL}/libraries/{library_id}/folders/{folder['id']}/entities",
+                            params={"limit": limit, "offset": offset},
+                        )
+                        if entities_response.status_code != 200:
+                            pbar.write(
+                                f"Failed to get entities: {entities_response.status_code} - {entities_response.text}"
+                            )
+                            break
+
+                        entities = entities_response.json()
+                        if not entities:
+                            break
+
+                        # Update total if this is the first request
+                        if offset == 0:
+                            total_entities = int(
+                                entities_response.headers.get(
+                                    "X-Total-Count", total_entities
+                                )
+                            )
+                            pbar.total = total_entities
+                            pbar.refresh()
+
+                        # Index each entity
+                        batch_size = 8
+                        for i in range(0, len(entities), batch_size):
+                            batch = entities[i : i + batch_size]
+                            to_index = []
+
+                            for entity in batch:
+                                needs_indexing = await check_and_index_entity(
+                                    client, entity["id"], entity["last_scan_at"]
+                                )
+
+                                if needs_indexing:
+                                    to_index.append(entity["id"])
+
+                            if to_index:
+                                index_response = await index_batch(client, to_index)
+                                if index_response.status_code == 204:
+                                    pbar.write(
+                                        f"Indexed batch of {len(to_index)} entities"
+                                    )
+                                else:
+                                    pbar.write(
+                                        f"Failed to index batch: {index_response.status_code} - {index_response.text}"
+                                    )
+
+                            scanned_entities.update(
+                                str(entity["id"]) for entity in batch
+                            )
+                            pbar.update(len(batch))
+
+                        offset += limit
+
+                # List all indexed entities in the folder
+                offset = 0
+                print(f"Starting cleanup process for folder {folder['id']}")
+                while True:
+                    index_response = await client.get(
+                        f"{BASE_URL}/libraries/{library_id}/folders/{folder['id']}/index",
+                        params={"limit": 200, "offset": offset},
+                    )
+                    if index_response.status_code != 200:
+                        tqdm.write(
+                            f"Failed to get indexed entities: {index_response.status_code} - {index_response.text}"
+                        )
+                        break
+
+                    indexed_entities = index_response.json()
+                    if not indexed_entities:
+                        print("No more indexed entities to process")
+                        break
+
+                    # Delete indexes for entities not in scanned_entities
+                    for indexed_entity in tqdm(
+                        indexed_entities, desc="Cleaning up indexes", leave=False
+                    ):
+                        if indexed_entity["id"] not in scanned_entities:
+                            tqdm.write(
+                                f"Entity {indexed_entity['id']} not in scanned entities, deleting index"
+                            )
+                            delete_response = await client.delete(
+                                f"{BASE_URL}/entities/{indexed_entity['id']}/index"
+                            )
+                            if delete_response.status_code == 204:
+                                tqdm.write(
+                                    f"Deleted index for entity: {indexed_entity['id']}"
+                                )
+                            else:
+                                tqdm.write(
+                                    f"Failed to delete index for entity {indexed_entity['id']}: {delete_response.status_code} - {delete_response.text}"
+                                )
+
+                    offset += 200
+                print(f"Finished cleanup process for folder {folder['id']}")
+
+    asyncio.run(process_folders())
 
     print("Indexing completed")