feat(scan): batch file existence checks

arkohut 2024-07-26 18:35:39 +08:00
parent badbfd70bc
commit 873034c76b

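Before this change, the scanner issued one GET to /entities/by-filepath for every file it walked; this commit collects candidate paths per directory and checks them 200 at a time with a single POST to /entities/by-filepaths. A minimal sketch of the batched lookup, assuming httpx and the endpoint shape shown in the diff (BASE_URL and the response schema are assumptions):

import httpx

BASE_URL = "http://localhost:8080/api"  # assumed server address, not from the diff


async def find_existing_entities(
    client: httpx.AsyncClient, library_id: int, paths: list[str]
) -> dict:
    """Return {filepath: entity} for already-indexed paths, one POST per batch."""
    existing = {}
    batching = 200  # batch size used by this commit
    for i in range(0, len(paths), batching):
        batch = paths[i : i + batching]
        # One request covers the whole batch instead of one GET per file
        response = await client.post(
            f"{BASE_URL}/libraries/{library_id}/entities/by-filepaths",
            json=batch,
        )
        if response.status_code != 200:
            continue  # the diff logs the failure and skips the batch
        for entity in response.json():
            existing[entity["filepath"]] = entity
    return existing

Paths missing from the returned mapping are treated as new entities; the rest are compared by timestamp to decide whether an update is needed.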

@@ -31,13 +31,14 @@ ignore_files = [".DS_Store"]
 # Configure logging
 logging.basicConfig(
     level=logging.WARNING,  # Set the logging level to WARNING or higher
-    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
 )
 
 # Optionally, you can set the logging level for specific libraries
 logging.getLogger("httpx").setLevel(logging.ERROR)
 logging.getLogger("typer").setLevel(logging.ERROR)
 
+
 class FileStatus(Enum):
     UPDATED = "updated"
     ADDED = "added"
@@ -137,40 +138,68 @@ async def loop_files(library_id, folder, folder_path, force, plugins):
     async with httpx.AsyncClient() as client:
         tasks = []
         for root, _, files in os.walk(folder_path):
-            with tqdm(total=len(files), desc=f"Scanning {folder_path}", leave=False) as pbar:
+            with tqdm(
+                total=len(files), desc=f"Scanning {folder_path}", leave=False
+            ) as pbar:
+                condidate_files = []
                 for file in files:
-                    pbar.update(1)
                     file_path = Path(root) / file
                     absolute_file_path = file_path.resolve()  # Get absolute path
                     if file in ignore_files:
                         continue
                     scanned_files.add(str(absolute_file_path))
-                    file_stat = file_path.stat()
+                    condidate_files.append(str(absolute_file_path))
+
+                batching = 200
+                for i in range(0, len(condidate_files), batching):
+                    batch = condidate_files[i : i + batching]
+
+                    # Get batch of entities
+                    get_response = await client.post(
+                        f"{BASE_URL}/libraries/{library_id}/entities/by-filepaths",
+                        json=batch,
+                    )
+                    if get_response.status_code == 200:
+                        existing_entities = get_response.json()
+                    else:
+                        print(
+                            f"Failed to get entities: {get_response.status_code} - {get_response.text}"
+                        )
+                        continue
+
+                    existing_entities_dict = {
+                        entity["filepath"]: entity for entity in existing_entities
+                    }
+
+                    for file_path in batch:
+                        absolute_file_path = Path(file_path).resolve()
+                        file_stat = absolute_file_path.stat()
                         file_type, file_type_group = get_file_type(absolute_file_path)
                         new_entity = {
-                            "filename": file_path.name,
-                            "filepath": str(absolute_file_path),  # Save absolute path
+                            "filename": absolute_file_path.name,
+                            "filepath": str(absolute_file_path),
                             "size": file_stat.st_size,
                             "file_created_at": format_timestamp(file_stat.st_ctime),
-                            "file_last_modified_at": format_timestamp(file_stat.st_mtime),
+                            "file_last_modified_at": format_timestamp(
+                                file_stat.st_mtime
+                            ),
                             "file_type": file_type,
                             "file_type_group": file_type_group,
                             "folder_id": folder["id"],
                         }
-                        # Check if the entity already exists
-                        get_response = await client.get(
-                            f"{BASE_URL}/libraries/{library_id}/entities/by-filepath",
-                            params={
-                                "filepath": str(absolute_file_path)
-                            },  # Use relative path
-                        )
-                        if get_response.status_code == 200:
-                            existing_entity = get_response.json()
+                        existing_entity = existing_entities_dict.get(
+                            str(absolute_file_path)
+                        )
+                        if existing_entity:
                             existing_created_at = format_timestamp(
                                 existing_entity["file_created_at"]
                             )
-                            new_created_at = format_timestamp(new_entity["file_created_at"])
+                            new_created_at = format_timestamp(
+                                new_entity["file_created_at"]
+                            )
                             existing_modified_at = format_timestamp(
                                 existing_entity["file_last_modified_at"]
                             )
@@ -200,6 +229,7 @@ async def loop_files(library_id, folder, folder_path, force, plugins):
                                     client, semaphore, library_id, plugins, new_entity
                                 )
                             )
+                    pbar.update(len(batch))
 
         for future in tqdm(
             asyncio.as_completed(tasks),
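The tasks queued above are drained with asyncio.as_completed, with a semaphore capping how many index requests run at once. A generic sketch of that pattern (the body of index_file and the concurrency limit are hypothetical):

import asyncio


async def index_file(semaphore: asyncio.Semaphore, entity: dict) -> None:
    async with semaphore:  # at most N requests in flight
        await asyncio.sleep(0.01)  # stand-in for the real indexing request


async def main() -> None:
    semaphore = asyncio.Semaphore(8)  # assumed concurrency limit
    entities = [{"filepath": f"/tmp/file{i}"} for i in range(20)]
    tasks = [index_file(semaphore, entity) for entity in entities]
    for future in asyncio.as_completed(tasks):
        await future  # completions arrive as they finish, not in submission order


asyncio.run(main())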
@@ -267,10 +297,10 @@ def scan(
         total_files_updated += updated_file_count
 
         # Check for deleted files
-        limit = 200
+        limit = 100
         offset = 0
-        total_files = float('inf')  # We'll update this after the first request
-        with tqdm(total=total_files, desc="Checking for deleted files", unit="file") as pbar:
+        total_entities = 0  # We'll update this after the first request
+        with tqdm(total=total_entities, desc="Checking for deleted files", leave=True) as pbar2:
             while True:
                 existing_files_response = httpx.get(
                     f"{BASE_URL}/libraries/{library_id}/folders/{folder['id']}/entities",
@@ -278,7 +308,7 @@ def scan(
                     timeout=60,
                 )
                 if existing_files_response.status_code != 200:
-                    tqdm.write(
+                    pbar2.write(
                         f"Failed to retrieve existing files: {existing_files_response.status_code} - {existing_files_response.text}"
                     )
                     break
@@ -289,9 +319,13 @@ def scan(
 
                 # Update total if this is the first request
                 if offset == 0:
-                    total_files = int(existing_files_response.headers.get('X-Total-Count', total_files))
-                    pbar.total = total_files
-                    pbar.refresh()
+                    total_entities = int(
+                        existing_files_response.headers.get(
+                            "X-Total-Count", total_entities
+                        )
+                    )
+                    pbar2.total = total_entities
+                    pbar2.refresh()
 
                 for existing_file in existing_files:
                     if existing_file["filepath"] not in scanned_files:
@@ -300,16 +334,15 @@ def scan(
                             f"{BASE_URL}/libraries/{library_id}/entities/{existing_file['id']}"
                         )
                         if 200 <= delete_response.status_code < 300:
-                            tqdm.write(
+                            pbar2.write(
                                 f"Deleted file from library: {existing_file['filepath']}"
                             )
                             total_files_deleted += 1
                         else:
-                            tqdm.write(
+                            pbar2.write(
                                 f"Failed to delete file: {delete_response.status_code} - {delete_response.text}"
                             )
-                    pbar.update(1)
-                    pbar.refresh()
+                    pbar2.update(1)
 
                 offset += limit
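The deleted-file sweep pages through a folder's entities with limit/offset and sizes its progress bar from the X-Total-Count header of the first page. A sketch of that pagination loop under the same endpoint shape (the limit/offset query parameter names and stopping on a short page are assumptions; the diff does not show the loop's exit condition):

import httpx
from tqdm import tqdm


def iter_entities(base_url: str, library_id: int, folder_id: int, limit: int = 100):
    """Yield every entity in a folder, paging with limit/offset."""
    offset = 0
    with tqdm(total=0, desc="Checking for deleted files", leave=True) as pbar:
        while True:
            response = httpx.get(
                f"{base_url}/libraries/{library_id}/folders/{folder_id}/entities",
                params={"limit": limit, "offset": offset},  # assumed parameter names
                timeout=60,
            )
            response.raise_for_status()
            page = response.json()
            if offset == 0:
                # First response carries the total for the progress bar
                pbar.total = int(response.headers.get("X-Total-Count", 0))
                pbar.refresh()
            for entity in page:
                pbar.update(1)
                yield entity
            if len(page) < limit:
                break  # short page means we've reached the end
            offset += limit

Entities whose filepath was not seen during the scan are then deleted one by one, as the last hunk shows.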