feat: support reindex

This commit is contained in:
arkohut 2024-10-13 00:40:58 +08:00
parent 974f056dc2
commit 9d1b155f63
4 changed files with 106 additions and 12 deletions

View File

@ -532,6 +532,71 @@ async def update_entity(
return new_entity["filepath"], FileStatus.UPDATED, False, None
@lib_app.command("reindex")
def reindex(
library_id: int,
folders: List[int] = typer.Option(None, "--folder", "-f"),
):
print(f"Reindexing library {library_id}")
# Get the library
response = httpx.get(f"{BASE_URL}/libraries/{library_id}")
if response.status_code != 200:
print(f"Failed to get library: {response.status_code} - {response.text}")
return
library = response.json()
scanned_entities = set()
# Filter folders if the folders parameter is provided
if folders:
library_folders = [
folder for folder in library["folders"] if folder["id"] in folders
]
else:
library_folders = library["folders"]
def process_folders():
with httpx.Client(timeout=60) as client:
# Iterate through folders
for folder in library_folders:
print(f"Processing folder: {folder['id']}")
# List all entities in the folder
limit = 200
offset = 0
while True:
entities_response = client.get(
f"{BASE_URL}/libraries/{library_id}/folders/{folder['id']}/entities",
params={"limit": limit, "offset": offset},
)
if entities_response.status_code != 200:
print(f"Failed to get entities: {entities_response.status_code} - {entities_response.text}")
break
entities = entities_response.json()
if not entities:
break
# Update last_scan_at for each entity
for entity in entities:
if entity["id"] in scanned_entities:
continue
update_response = client.post(f"{BASE_URL}/entities/{entity['id']}/last-scan-at")
if update_response.status_code != 200:
print(f"Failed to update last_scan_at for entity {entity['id']}: {update_response.status_code} - {update_response.text}")
else:
print(f"Updated last_scan_at for entity {entity['id']}")
scanned_entities.add(entity["id"])
offset += limit
process_folders()
print(f"Reindexing completed for library {library_id}")
async def check_and_index_entity(client, entity_id, entity_last_scan_at):
try:
index_response = await client.get(f"{BASE_URL}/entities/{entity_id}/index")
@ -560,8 +625,8 @@ async def index_batch(client, entity_ids):
return index_response
@lib_app.command("index")
def index(
@lib_app.command("typesense-index")
def typesense_index(
library_id: int,
folders: List[int] = typer.Option(None, "--folder", "-f"),
force: bool = typer.Option(False, "--force", help="Force update all indexes"),

View File

@ -290,6 +290,14 @@ def update_entity(
return Entity(**db_entity.__dict__)
def touch_entity(entity_id: int, db: Session):
db_entity = db.query(EntityModel).filter(EntityModel.id == entity_id).first()
if db_entity:
db_entity.last_scan_at = func.now()
db.commit()
db.refresh(db_entity)
def update_entity_tags(entity_id: int, tags: List[str], db: Session) -> Entity:
db_entity = get_entity_by_id(entity_id, db)
if not db_entity:

View File

@ -313,10 +313,10 @@ def update_entity_last_scan_at_for_metadata(mapper, connection, target):
session.commit()
async def update_or_insert_entities_vec(connection, target_id, embedding):
async def update_or_insert_entities_vec(session, target_id, embedding):
try:
# First, try to update the existing row
result = connection.execute(
result = session.execute(
text(
"UPDATE entities_vec SET embedding = :embedding WHERE rowid = :id"
),
@ -328,7 +328,7 @@ async def update_or_insert_entities_vec(connection, target_id, embedding):
# If no row was updated (i.e., the row doesn't exist), then insert a new row
if result.rowcount == 0:
connection.execute(
session.execute(
text(
"INSERT INTO entities_vec (rowid, embedding) VALUES (:id, :embedding)"
),
@ -337,14 +337,17 @@ async def update_or_insert_entities_vec(connection, target_id, embedding):
"embedding": serialize_float32(embedding),
},
)
session.commit()
except Exception as e:
print(f"Error updating entities_vec: {e}")
session.rollback()
def update_or_insert_entities_fts(connection, target_id, filepath, tags, metadata):
def update_or_insert_entities_fts(session, target_id, filepath, tags, metadata):
try:
# First, try to update the existing row
result = connection.execute(
result = session.execute(
text(
"""
UPDATE entities_fts
@ -362,7 +365,7 @@ def update_or_insert_entities_fts(connection, target_id, filepath, tags, metadat
# If no row was updated (i.e., the row doesn't exist), then insert a new row
if result.rowcount == 0:
connection.execute(
session.execute(
text(
"""
INSERT INTO entities_fts(id, filepath, tags, metadata)
@ -376,8 +379,11 @@ def update_or_insert_entities_fts(connection, target_id, filepath, tags, metadat
"metadata": metadata,
},
)
session.commit()
except Exception as e:
print(f"Error updating entities_fts: {e}")
session.rollback()
async def update_fts_and_vec(mapper, connection, target):
@ -411,7 +417,7 @@ async def update_fts_and_vec(mapper, connection, target):
)
# Update FTS table
update_or_insert_entities_fts(connection, target.id, target.filepath, tags, metadata)
update_or_insert_entities_fts(session, target.id, target.filepath, tags, metadata)
# Prepare vector data
metadata_text = "\n".join(
@ -431,9 +437,7 @@ async def update_fts_and_vec(mapper, connection, target):
# Update vector table
if embedding:
await update_or_insert_entities_vec(connection, target.id, embedding)
session.commit()
await update_or_insert_entities_vec(session, target.id, embedding)
def delete_fts_and_vec(mapper, connection, target):

View File

@ -372,6 +372,23 @@ async def update_entity(
return entity
@app.post("/entities/{entity_id}/last-scan-at", response_model=Entity, tags=["entity"])
def update_entity_last_scan_at(
entity_id: int,
db: Session = Depends(get_db)
):
"""
Update the last_scan_at timestamp for an entity and trigger update for fts and vec.
"""
entity = crud.touch_entity(entity_id, db)
if entity is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Entity not found",
)
return entity
def typesense_required(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):