feat: add http timeout

This commit is contained in:
arkohut 2024-08-08 20:05:14 +08:00
parent 75222d990d
commit 0fff358e4d
2 changed files with 24 additions and 19 deletions

View File

@ -131,13 +131,13 @@ async def loop_files(library_id, folder, folder_path, force, plugins):
added_file_count = 0 added_file_count = 0
scanned_files = set() scanned_files = set()
semaphore = asyncio.Semaphore(8) semaphore = asyncio.Semaphore(8)
async with httpx.AsyncClient() as client: async with httpx.AsyncClient(timeout=60) as client:
tasks = [] tasks = []
for root, _, files in os.walk(folder_path): for root, _, files in os.walk(folder_path):
with tqdm( with tqdm(
total=len(files), desc=f"Scanning {folder_path}", leave=False total=len(files), desc=f"Scanning {folder_path}", leave=False
) as pbar: ) as pbar:
condidate_files = [] candidate_files = []
for file in files: for file in files:
file_path = Path(root) / file file_path = Path(root) / file
absolute_file_path = file_path.resolve() # Get absolute path absolute_file_path = file_path.resolve() # Get absolute path
@ -145,11 +145,11 @@ async def loop_files(library_id, folder, folder_path, force, plugins):
continue continue
scanned_files.add(str(absolute_file_path)) scanned_files.add(str(absolute_file_path))
condidate_files.append(str(absolute_file_path)) candidate_files.append(str(absolute_file_path))
batching = 200 batching = 100
for i in range(0, len(condidate_files), batching): for i in range(0, len(candidate_files), batching):
batch = condidate_files[i : i + batching] batch = candidate_files[i : i + batching]
# Get batch of entities # Get batch of entities
get_response = await client.post( get_response = await client.post(
@ -208,7 +208,6 @@ async def loop_files(library_id, folder, folder_path, force, plugins):
or existing_created_at != new_created_at or existing_created_at != new_created_at
or existing_modified_at != new_modified_at or existing_modified_at != new_modified_at
): ):
# Update the existing entity
tasks.append( tasks.append(
update_entity( update_entity(
client, client,
@ -219,7 +218,6 @@ async def loop_files(library_id, folder, folder_path, force, plugins):
) )
) )
else: else:
# Add the new entity
tasks.append( tasks.append(
add_entity( add_entity(
client, semaphore, library_id, plugins, new_entity client, semaphore, library_id, plugins, new_entity
@ -227,6 +225,7 @@ async def loop_files(library_id, folder, folder_path, force, plugins):
) )
pbar.update(len(batch)) pbar.update(len(batch))
# Process all tasks after they've been created
for future in tqdm( for future in tqdm(
asyncio.as_completed(tasks), asyncio.as_completed(tasks),
desc=f"Processing {folder_path}", desc=f"Processing {folder_path}",
@ -248,9 +247,9 @@ async def loop_files(library_id, folder, folder_path, force, plugins):
tqdm.write(f"Updated file in library: {file_path}") tqdm.write(f"Updated file in library: {file_path}")
else: else:
error_message = "Failed to update file" error_message = "Failed to update file"
if hasattr(response, 'status_code'): if hasattr(response, "status_code"):
error_message += f": {response.status_code}" error_message += f": {response.status_code}"
elif hasattr(response, 'text'): elif hasattr(response, "text"):
error_message += f" - {response.text}" error_message += f" - {response.text}"
else: else:
error_message += f" - Unknown error occurred" error_message += f" - Unknown error occurred"
@ -499,7 +498,9 @@ def index(
batch = entities[i : i + batch_size] batch = entities[i : i + batch_size]
entity_ids = [entity["id"] for entity in batch] entity_ids = [entity["id"] for entity in batch]
index_response = httpx.post( index_response = httpx.post(
f"{BASE_URL}/entities/batch-index", json=entity_ids f"{BASE_URL}/entities/batch-index",
json=entity_ids,
timeout=60,
) )
if index_response.status_code == 204: if index_response.status_code == 204:
pbar.write(f"Indexed batch of {len(batch)} entities") pbar.write(f"Indexed batch of {len(batch)} entities")

View File

@ -60,7 +60,9 @@ class EntityModel(Base):
file_last_modified_at: Mapped[datetime] = mapped_column(DateTime, nullable=False) file_last_modified_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
file_type: Mapped[str] = mapped_column(String, nullable=False) file_type: Mapped[str] = mapped_column(String, nullable=False)
file_type_group: Mapped[str] = mapped_column(String, nullable=False) file_type_group: Mapped[str] = mapped_column(String, nullable=False)
last_scan_at: Mapped[datetime | None] = mapped_column(DateTime, onupdate=func.now(), nullable=True) last_scan_at: Mapped[datetime | None] = mapped_column(
DateTime, server_default=func.now(), onupdate=func.now(), nullable=True
)
library_id: Mapped[int] = mapped_column( library_id: Mapped[int] = mapped_column(
Integer, ForeignKey("libraries.id"), nullable=False Integer, ForeignKey("libraries.id"), nullable=False
) )
@ -73,15 +75,17 @@ class EntityModel(Base):
metadata_entries: Mapped[List["EntityMetadataModel"]] = relationship( metadata_entries: Mapped[List["EntityMetadataModel"]] = relationship(
"EntityMetadataModel", lazy="joined" "EntityMetadataModel", lazy="joined"
) )
tags: Mapped[List["TagModel"]] = relationship("TagModel", secondary="entity_tags", lazy="joined") tags: Mapped[List["TagModel"]] = relationship(
"TagModel", secondary="entity_tags", lazy="joined"
)
# 添加索引 # 添加索引
__table_args__ = ( __table_args__ = (
Index('idx_filepath', 'filepath'), Index("idx_filepath", "filepath"),
Index('idx_filename', 'filename'), Index("idx_filename", "filename"),
Index('idx_file_type', 'file_type'), Index("idx_file_type", "file_type"),
Index('idx_library_id', 'library_id'), Index("idx_library_id", "library_id"),
Index('idx_folder_id', 'folder_id'), Index("idx_folder_id", "folder_id"),
) )
@ -136,4 +140,4 @@ class LibraryPluginModel(Base):
# Create the database engine with the path from config # Create the database engine with the path from config
engine = create_engine(f"sqlite:///{get_database_path()}") engine = create_engine(f"sqlite:///{get_database_path()}")
Base.metadata.create_all(engine) Base.metadata.create_all(engine)