diff --git a/screen-recorder/read_metadata.py b/memos/read_metadata.py similarity index 71% rename from screen-recorder/read_metadata.py rename to memos/read_metadata.py index 68ed529..cbb1386 100644 --- a/screen-recorder/read_metadata.py +++ b/memos/read_metadata.py @@ -3,34 +3,42 @@ import json import argparse from PIL import Image, PngImagePlugin + def read_metadata(image_path): try: img = Image.open(image_path) - exif_data = img.info.get('exif') + exif_data = img.info.get("exif") png_info = img.info if isinstance(img, PngImagePlugin.PngImageFile) else None if not exif_data and not png_info: print("No EXIF or PNG metadata found.") - return + return None + + metadata = {} if exif_data: exif_dict = piexif.load(exif_data) metadata_json = exif_dict["0th"].get(piexif.ImageIFD.ImageDescription) if metadata_json: - metadata = json.loads(metadata_json.decode()) - print("EXIF Metadata:", json.dumps(metadata, indent=4)) + metadata["exif"] = json.loads(metadata_json.decode()) + print("EXIF Metadata:", json.dumps(metadata["exif"], indent=4)) else: print("No metadata found in the ImageDescription field of EXIF.") if png_info: metadata_json = png_info.get("Description") if metadata_json: - metadata = json.loads(metadata_json) - print("PNG Metadata:", json.dumps(metadata, indent=4)) + metadata["png"] = json.loads(metadata_json) + print("PNG Metadata:", json.dumps(metadata["png"], indent=4)) else: print("No metadata found in the Description field of PNG.") + + return metadata if metadata else None + except Exception as e: print(f"An error occurred: {str(e)}") + return None + def main(): parser = argparse.ArgumentParser(description="Read metadata from a screenshot") @@ -39,5 +47,6 @@ def main(): read_metadata(args.image_path) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/memos/server.py b/memos/server.py index 8478b8a..6156162 100644 --- a/memos/server.py +++ b/memos/server.py @@ -13,6 +13,9 @@ from typing import List, Annotated from pathlib import Path import asyncio import logging # Import logging module +import cv2 +from PIL import Image +from .read_metadata import read_metadata import typesense @@ -454,7 +457,9 @@ async def search_entities( end: int = None, db: Session = Depends(get_db), ): - library_ids = [int(id) for id in library_ids.split(",") if id] if library_ids else None + library_ids = ( + [int(id) for id in library_ids.split(",") if id] if library_ids else None + ) folder_ids = [int(id) for id in folder_ids.split(",") if id] if folder_ids else None try: return indexing.search_entities( @@ -567,6 +572,72 @@ def add_library_plugin( crud.add_plugin_to_library(library_id, new_plugin.plugin_id, db) +def is_image(file_path: Path) -> bool: + return file_path.suffix.lower() in [".png", ".jpg", ".jpeg"] + + +def get_thumbnail_info(metadata: dict) -> tuple: + if not metadata: + return None, None, None + + meta = metadata.get("exif", {}) or metadata.get("png", {}) + if not meta.get("sequence"): + return None, None, False + + return meta.get("screen_name"), meta.get("sequence"), True + + +def extract_video_frame(video_path: Path, frame_number: int) -> Image.Image: + cap = cv2.VideoCapture(str(video_path)) + cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) + ret, frame = cap.read() + cap.release() + + if not ret: + return None + + frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + return Image.fromarray(frame_rgb) + + +@app.get("/files/video/{file_path:path}", tags=["files"]) +async def get_video_frame(file_path: str): + + full_path = Path("/") / file_path.strip("/") + + if not full_path.is_file(): + raise HTTPException(status_code=404, detail="File not found") + + if not is_image(full_path): + return FileResponse(full_path) + + metadata = read_metadata(str(full_path)) + screen, sequence, is_thumbnail = get_thumbnail_info(metadata) + + print(screen, sequence, is_thumbnail) + + if not all([screen, sequence, is_thumbnail]): + return FileResponse(full_path) + + video_path = full_path.parent / f"{screen}.mp4" + print(video_path) + if not video_path.is_file(): + return FileResponse(full_path) + + frame_image = extract_video_frame(video_path, sequence) + if frame_image is None: + return FileResponse(full_path) + + temp_dir = Path("/tmp") + temp_dir.mkdir(parents=True, exist_ok=True) + temp_path = temp_dir / f"temp_{full_path.name}" + frame_image.save(temp_path) + + return FileResponse( + temp_path, headers={"Content-Disposition": f"inline; filename={full_path.name}"} + ) + + @app.get("/files/{file_path:path}", tags=["files"]) async def get_file(file_path: str): full_path = Path("/") / file_path.strip("/") diff --git a/screen-recorder/video_generator.py b/screen-recorder/video_generator.py index 8e5108e..c678d47 100644 --- a/screen-recorder/video_generator.py +++ b/screen-recorder/video_generator.py @@ -10,33 +10,35 @@ from PIL.PngImagePlugin import PngInfo from multiprocessing import Pool, Manager from tqdm import tqdm -parser = argparse.ArgumentParser(description='Compress and save image(s) with metadata') -parser.add_argument('path', type=str, help='path to the directory or image file') +parser = argparse.ArgumentParser(description="Compress and save image(s) with metadata") +parser.add_argument("path", type=str, help="path to the directory or image file") args = parser.parse_args() -input_path = args.path.rstrip('/') +input_path = args.path.rstrip("/") def compress_and_save_image(image_path, order): # Open the image img = Image.open(image_path) - - if image_path.endswith(('.jpg', '.jpeg', '.tiff')): + + if image_path.endswith((".jpg", ".jpeg", ".tiff")): # Add order to the image metadata for JPEG/TIFF exif_dict = piexif.load(image_path) - existing_description = exif_dict["0th"].get(piexif.ImageIFD.ImageDescription, b'{}') + existing_description = exif_dict["0th"].get( + piexif.ImageIFD.ImageDescription, b"{}" + ) try: - existing_data = json.loads(existing_description.decode('utf-8')) + existing_data = json.loads(existing_description.decode("utf-8")) except json.JSONDecodeError: existing_data = {} existing_data["sequence"] = order existing_data["is_thumbnail"] = True - updated_description = json.dumps(existing_data).encode('utf-8') + updated_description = json.dumps(existing_data).encode("utf-8") exif_dict["0th"][piexif.ImageIFD.ImageDescription] = updated_description exif_bytes = piexif.dump(exif_dict) - elif image_path.endswith('.png'): + elif image_path.endswith(".png"): # Add order to the image metadata for PNG metadata = PngInfo() - existing_description = img.info.get("Description", '{}') + existing_description = img.info.get("Description", "{}") try: existing_data = json.loads(existing_description) except json.JSONDecodeError: @@ -51,31 +53,35 @@ def compress_and_save_image(image_path, order): # Compress the image img = img.convert("RGB") - if image_path.endswith('.png'): + if image_path.endswith(".png"): img.save(image_path, "PNG", optimize=True, pnginfo=metadata) else: img.save(image_path, "JPEG", quality=30) # Lower quality for higher compression - + # Resize the image proportionally max_size = (960, 960) # Define the maximum size for the thumbnail img.thumbnail(max_size) - if image_path.endswith('.png'): + if image_path.endswith(".png"): img.save(image_path, "PNG", optimize=True, pnginfo=metadata) else: img.save(image_path, "JPEG", quality=30) # Lower quality for higher compression - - if image_path.endswith(('.jpg', '.jpeg', '.tiff')): + + if image_path.endswith((".jpg", ".jpeg", ".tiff")): # Insert updated EXIF data for JPEG/TIFF piexif.insert(exif_bytes, image_path) - + return image_path def process_image(args): filename, screens = args - if filename.endswith(('.jpg', '.png')): # consider files with .jpg or .png extension - parts = filename.split('-of-') # split the file name at the "-of-" string - display_name = parts[-1].rsplit('.', 1)[0] # get the last part and remove the extension + if filename.endswith( + (".jpg", ".png") + ): # consider files with .jpg or .png extension + parts = filename.split("-of-") # split the file name at the "-of-" string + display_name = parts[-1].rsplit(".", 1)[ + 0 + ] # get the last part and remove the extension screens.append(display_name) # add the display name to the set of screens # call the function with the filename of the image @@ -87,15 +93,27 @@ def process_directory(directory): with Manager() as manager: screens = manager.list() with Pool(min(8, os.cpu_count())) as p: - list(tqdm(p.imap(process_image, [(filename, screens) for filename in os.listdir(directory)]), total=len(os.listdir(directory)))) + list( + tqdm( + p.imap( + process_image, + [(filename, screens) for filename in os.listdir(directory)], + ), + total=len(os.listdir(directory)), + ) + ) screens = set(screens) print(screens) for screen in screens: # Check if there are jpg or png files for the screen - jpg_files = [f for f in os.listdir(directory) if f.endswith('.jpg') and screen in f] - png_files = [f for f in os.listdir(directory) if f.endswith('.png') and screen in f] + jpg_files = [ + f for f in os.listdir(directory) if f.endswith(".jpg") and screen in f + ] + png_files = [ + f for f in os.listdir(directory) if f.endswith(".png") and screen in f + ] if jpg_files: input_pattern = f"{directory}/*{screen}*.jpg" @@ -107,7 +125,7 @@ def process_directory(directory): continue # Skip if no matching files are found # Create the frames.txt file - with open(f"{directory}/{screen}.frames.txt", 'w') as f: + with open(f"{directory}/{screen}.frames.txt", "w") as f: for frame, filename in enumerate(sorted(files)): f.write(f"{frame},{filename}\n") @@ -115,21 +133,39 @@ def process_directory(directory): command = f"ffmpeg -y -framerate 15 -pattern_type glob -i '{input_pattern}' -c:v libx264 -pix_fmt yuv420p {directory}/{screen}.mp4" # Start the process - process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True) + process = subprocess.Popen( + command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True, + ) # Print the output in real-time for line in process.stdout: - print(line, end='') + print(line, end="") # Compress and save all images after video generation for screen in screens: - jpg_pattern = f"{directory}/*{screen}*.jpg" - png_pattern = f"{directory}/*{screen}*.png" - - for pattern in [jpg_pattern, png_pattern]: - files = glob.glob(pattern) - for order, input_path in enumerate(tqdm(files, desc=f"Compressing {screen} images", unit="file")): - compress_and_save_image(input_path, order) + # Check if there are jpg or png files for the screen + jpg_files = [ + f for f in os.listdir(directory) if f.endswith(".jpg") and screen in f + ] + png_files = [ + f for f in os.listdir(directory) if f.endswith(".png") and screen in f + ] + + if jpg_files: + files = jpg_files + elif png_files: + files = png_files + else: + continue # Skip if no matching files are found + + for frame, filename in enumerate( + tqdm(sorted(files), desc=f"Compressing {screen} images", unit="file") + ): + compress_and_save_image(os.path.join(directory, filename), frame) # for filename in os.listdir(directory): # if filename.endswith(('.jpg', '.png')): @@ -144,5 +180,6 @@ def main(): else: print("Invalid path. Please provide a valid directory or file path.") -if __name__ == '__main__': - main() \ No newline at end of file + +if __name__ == "__main__": + main() diff --git a/setup.py b/setup.py index be44a19..9fa395c 100644 --- a/setup.py +++ b/setup.py @@ -25,6 +25,8 @@ setup( 'magika', 'pydantic-settings', 'typesense', + 'opencv-python', + 'pillow', ], entry_points={ 'console_scripts': [ @@ -32,4 +34,4 @@ setup( ], }, python_requires='>=3.10', -) +) \ No newline at end of file diff --git a/web/src/lib/Figure.svelte b/web/src/lib/Figure.svelte index 0388246..5933b83 100644 --- a/web/src/lib/Figure.svelte +++ b/web/src/lib/Figure.svelte @@ -25,6 +25,10 @@ * @type {any} */ export let image; + /** + * @type {string} + */ + export let video; /** * @type {string} */ @@ -107,7 +111,7 @@
- + {title}
diff --git a/web/src/routes/+page.svelte b/web/src/routes/+page.svelte index 23bc630..27aa558 100644 --- a/web/src/routes/+page.svelte +++ b/web/src/routes/+page.svelte @@ -180,6 +180,7 @@ library_id={searchResults[selectedImage].library_id} folder_id={searchResults[selectedImage].folder_id} image={`${apiEndpoint}/files/${searchResults[selectedImage].filepath}`} + video={`${apiEndpoint}/files/video/${searchResults[selectedImage].filepath}`} created_at={searchResults[selectedImage].file_created_at} filepath={searchResults[selectedImage].filepath} title={filename(searchResults[selectedImage].filepath)}