mirror of
https://github.com/tcsenpai/pensieve.git
synced 2025-06-06 19:25:24 +00:00
feat: support extract raw image from video
This commit is contained in:
parent
615a938e5c
commit
016264f13f
@ -3,34 +3,42 @@ import json
|
||||
import argparse
|
||||
from PIL import Image, PngImagePlugin
|
||||
|
||||
|
||||
def read_metadata(image_path):
|
||||
try:
|
||||
img = Image.open(image_path)
|
||||
exif_data = img.info.get('exif')
|
||||
exif_data = img.info.get("exif")
|
||||
png_info = img.info if isinstance(img, PngImagePlugin.PngImageFile) else None
|
||||
|
||||
if not exif_data and not png_info:
|
||||
print("No EXIF or PNG metadata found.")
|
||||
return
|
||||
return None
|
||||
|
||||
metadata = {}
|
||||
|
||||
if exif_data:
|
||||
exif_dict = piexif.load(exif_data)
|
||||
metadata_json = exif_dict["0th"].get(piexif.ImageIFD.ImageDescription)
|
||||
if metadata_json:
|
||||
metadata = json.loads(metadata_json.decode())
|
||||
print("EXIF Metadata:", json.dumps(metadata, indent=4))
|
||||
metadata["exif"] = json.loads(metadata_json.decode())
|
||||
print("EXIF Metadata:", json.dumps(metadata["exif"], indent=4))
|
||||
else:
|
||||
print("No metadata found in the ImageDescription field of EXIF.")
|
||||
|
||||
if png_info:
|
||||
metadata_json = png_info.get("Description")
|
||||
if metadata_json:
|
||||
metadata = json.loads(metadata_json)
|
||||
print("PNG Metadata:", json.dumps(metadata, indent=4))
|
||||
metadata["png"] = json.loads(metadata_json)
|
||||
print("PNG Metadata:", json.dumps(metadata["png"], indent=4))
|
||||
else:
|
||||
print("No metadata found in the Description field of PNG.")
|
||||
|
||||
return metadata if metadata else None
|
||||
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {str(e)}")
|
||||
return None
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Read metadata from a screenshot")
|
||||
@ -39,5 +47,6 @@ def main():
|
||||
|
||||
read_metadata(args.image_path)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
@ -13,6 +13,9 @@ from typing import List, Annotated
|
||||
from pathlib import Path
|
||||
import asyncio
|
||||
import logging # Import logging module
|
||||
import cv2
|
||||
from PIL import Image
|
||||
from .read_metadata import read_metadata
|
||||
|
||||
import typesense
|
||||
|
||||
@ -454,7 +457,9 @@ async def search_entities(
|
||||
end: int = None,
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
library_ids = [int(id) for id in library_ids.split(",") if id] if library_ids else None
|
||||
library_ids = (
|
||||
[int(id) for id in library_ids.split(",") if id] if library_ids else None
|
||||
)
|
||||
folder_ids = [int(id) for id in folder_ids.split(",") if id] if folder_ids else None
|
||||
try:
|
||||
return indexing.search_entities(
|
||||
@ -567,6 +572,72 @@ def add_library_plugin(
|
||||
crud.add_plugin_to_library(library_id, new_plugin.plugin_id, db)
|
||||
|
||||
|
||||
def is_image(file_path: Path) -> bool:
|
||||
return file_path.suffix.lower() in [".png", ".jpg", ".jpeg"]
|
||||
|
||||
|
||||
def get_thumbnail_info(metadata: dict) -> tuple:
|
||||
if not metadata:
|
||||
return None, None, None
|
||||
|
||||
meta = metadata.get("exif", {}) or metadata.get("png", {})
|
||||
if not meta.get("sequence"):
|
||||
return None, None, False
|
||||
|
||||
return meta.get("screen_name"), meta.get("sequence"), True
|
||||
|
||||
|
||||
def extract_video_frame(video_path: Path, frame_number: int) -> Image.Image:
|
||||
cap = cv2.VideoCapture(str(video_path))
|
||||
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
|
||||
ret, frame = cap.read()
|
||||
cap.release()
|
||||
|
||||
if not ret:
|
||||
return None
|
||||
|
||||
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||||
return Image.fromarray(frame_rgb)
|
||||
|
||||
|
||||
@app.get("/files/video/{file_path:path}", tags=["files"])
|
||||
async def get_video_frame(file_path: str):
|
||||
|
||||
full_path = Path("/") / file_path.strip("/")
|
||||
|
||||
if not full_path.is_file():
|
||||
raise HTTPException(status_code=404, detail="File not found")
|
||||
|
||||
if not is_image(full_path):
|
||||
return FileResponse(full_path)
|
||||
|
||||
metadata = read_metadata(str(full_path))
|
||||
screen, sequence, is_thumbnail = get_thumbnail_info(metadata)
|
||||
|
||||
print(screen, sequence, is_thumbnail)
|
||||
|
||||
if not all([screen, sequence, is_thumbnail]):
|
||||
return FileResponse(full_path)
|
||||
|
||||
video_path = full_path.parent / f"{screen}.mp4"
|
||||
print(video_path)
|
||||
if not video_path.is_file():
|
||||
return FileResponse(full_path)
|
||||
|
||||
frame_image = extract_video_frame(video_path, sequence)
|
||||
if frame_image is None:
|
||||
return FileResponse(full_path)
|
||||
|
||||
temp_dir = Path("/tmp")
|
||||
temp_dir.mkdir(parents=True, exist_ok=True)
|
||||
temp_path = temp_dir / f"temp_{full_path.name}"
|
||||
frame_image.save(temp_path)
|
||||
|
||||
return FileResponse(
|
||||
temp_path, headers={"Content-Disposition": f"inline; filename={full_path.name}"}
|
||||
)
|
||||
|
||||
|
||||
@app.get("/files/{file_path:path}", tags=["files"])
|
||||
async def get_file(file_path: str):
|
||||
full_path = Path("/") / file_path.strip("/")
|
||||
|
@ -10,33 +10,35 @@ from PIL.PngImagePlugin import PngInfo
|
||||
from multiprocessing import Pool, Manager
|
||||
from tqdm import tqdm
|
||||
|
||||
parser = argparse.ArgumentParser(description='Compress and save image(s) with metadata')
|
||||
parser.add_argument('path', type=str, help='path to the directory or image file')
|
||||
parser = argparse.ArgumentParser(description="Compress and save image(s) with metadata")
|
||||
parser.add_argument("path", type=str, help="path to the directory or image file")
|
||||
args = parser.parse_args()
|
||||
input_path = args.path.rstrip('/')
|
||||
input_path = args.path.rstrip("/")
|
||||
|
||||
|
||||
def compress_and_save_image(image_path, order):
|
||||
# Open the image
|
||||
img = Image.open(image_path)
|
||||
|
||||
if image_path.endswith(('.jpg', '.jpeg', '.tiff')):
|
||||
if image_path.endswith((".jpg", ".jpeg", ".tiff")):
|
||||
# Add order to the image metadata for JPEG/TIFF
|
||||
exif_dict = piexif.load(image_path)
|
||||
existing_description = exif_dict["0th"].get(piexif.ImageIFD.ImageDescription, b'{}')
|
||||
existing_description = exif_dict["0th"].get(
|
||||
piexif.ImageIFD.ImageDescription, b"{}"
|
||||
)
|
||||
try:
|
||||
existing_data = json.loads(existing_description.decode('utf-8'))
|
||||
existing_data = json.loads(existing_description.decode("utf-8"))
|
||||
except json.JSONDecodeError:
|
||||
existing_data = {}
|
||||
existing_data["sequence"] = order
|
||||
existing_data["is_thumbnail"] = True
|
||||
updated_description = json.dumps(existing_data).encode('utf-8')
|
||||
updated_description = json.dumps(existing_data).encode("utf-8")
|
||||
exif_dict["0th"][piexif.ImageIFD.ImageDescription] = updated_description
|
||||
exif_bytes = piexif.dump(exif_dict)
|
||||
elif image_path.endswith('.png'):
|
||||
elif image_path.endswith(".png"):
|
||||
# Add order to the image metadata for PNG
|
||||
metadata = PngInfo()
|
||||
existing_description = img.info.get("Description", '{}')
|
||||
existing_description = img.info.get("Description", "{}")
|
||||
try:
|
||||
existing_data = json.loads(existing_description)
|
||||
except json.JSONDecodeError:
|
||||
@ -51,7 +53,7 @@ def compress_and_save_image(image_path, order):
|
||||
|
||||
# Compress the image
|
||||
img = img.convert("RGB")
|
||||
if image_path.endswith('.png'):
|
||||
if image_path.endswith(".png"):
|
||||
img.save(image_path, "PNG", optimize=True, pnginfo=metadata)
|
||||
else:
|
||||
img.save(image_path, "JPEG", quality=30) # Lower quality for higher compression
|
||||
@ -59,12 +61,12 @@ def compress_and_save_image(image_path, order):
|
||||
# Resize the image proportionally
|
||||
max_size = (960, 960) # Define the maximum size for the thumbnail
|
||||
img.thumbnail(max_size)
|
||||
if image_path.endswith('.png'):
|
||||
if image_path.endswith(".png"):
|
||||
img.save(image_path, "PNG", optimize=True, pnginfo=metadata)
|
||||
else:
|
||||
img.save(image_path, "JPEG", quality=30) # Lower quality for higher compression
|
||||
|
||||
if image_path.endswith(('.jpg', '.jpeg', '.tiff')):
|
||||
if image_path.endswith((".jpg", ".jpeg", ".tiff")):
|
||||
# Insert updated EXIF data for JPEG/TIFF
|
||||
piexif.insert(exif_bytes, image_path)
|
||||
|
||||
@ -73,9 +75,13 @@ def compress_and_save_image(image_path, order):
|
||||
|
||||
def process_image(args):
|
||||
filename, screens = args
|
||||
if filename.endswith(('.jpg', '.png')): # consider files with .jpg or .png extension
|
||||
parts = filename.split('-of-') # split the file name at the "-of-" string
|
||||
display_name = parts[-1].rsplit('.', 1)[0] # get the last part and remove the extension
|
||||
if filename.endswith(
|
||||
(".jpg", ".png")
|
||||
): # consider files with .jpg or .png extension
|
||||
parts = filename.split("-of-") # split the file name at the "-of-" string
|
||||
display_name = parts[-1].rsplit(".", 1)[
|
||||
0
|
||||
] # get the last part and remove the extension
|
||||
screens.append(display_name) # add the display name to the set of screens
|
||||
|
||||
# call the function with the filename of the image
|
||||
@ -87,15 +93,27 @@ def process_directory(directory):
|
||||
with Manager() as manager:
|
||||
screens = manager.list()
|
||||
with Pool(min(8, os.cpu_count())) as p:
|
||||
list(tqdm(p.imap(process_image, [(filename, screens) for filename in os.listdir(directory)]), total=len(os.listdir(directory))))
|
||||
list(
|
||||
tqdm(
|
||||
p.imap(
|
||||
process_image,
|
||||
[(filename, screens) for filename in os.listdir(directory)],
|
||||
),
|
||||
total=len(os.listdir(directory)),
|
||||
)
|
||||
)
|
||||
|
||||
screens = set(screens)
|
||||
print(screens)
|
||||
|
||||
for screen in screens:
|
||||
# Check if there are jpg or png files for the screen
|
||||
jpg_files = [f for f in os.listdir(directory) if f.endswith('.jpg') and screen in f]
|
||||
png_files = [f for f in os.listdir(directory) if f.endswith('.png') and screen in f]
|
||||
jpg_files = [
|
||||
f for f in os.listdir(directory) if f.endswith(".jpg") and screen in f
|
||||
]
|
||||
png_files = [
|
||||
f for f in os.listdir(directory) if f.endswith(".png") and screen in f
|
||||
]
|
||||
|
||||
if jpg_files:
|
||||
input_pattern = f"{directory}/*{screen}*.jpg"
|
||||
@ -107,7 +125,7 @@ def process_directory(directory):
|
||||
continue # Skip if no matching files are found
|
||||
|
||||
# Create the frames.txt file
|
||||
with open(f"{directory}/{screen}.frames.txt", 'w') as f:
|
||||
with open(f"{directory}/{screen}.frames.txt", "w") as f:
|
||||
for frame, filename in enumerate(sorted(files)):
|
||||
f.write(f"{frame},{filename}\n")
|
||||
|
||||
@ -115,21 +133,39 @@ def process_directory(directory):
|
||||
command = f"ffmpeg -y -framerate 15 -pattern_type glob -i '{input_pattern}' -c:v libx264 -pix_fmt yuv420p {directory}/{screen}.mp4"
|
||||
|
||||
# Start the process
|
||||
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
|
||||
process = subprocess.Popen(
|
||||
command,
|
||||
shell=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
universal_newlines=True,
|
||||
)
|
||||
|
||||
# Print the output in real-time
|
||||
for line in process.stdout:
|
||||
print(line, end='')
|
||||
print(line, end="")
|
||||
|
||||
# Compress and save all images after video generation
|
||||
for screen in screens:
|
||||
jpg_pattern = f"{directory}/*{screen}*.jpg"
|
||||
png_pattern = f"{directory}/*{screen}*.png"
|
||||
# Check if there are jpg or png files for the screen
|
||||
jpg_files = [
|
||||
f for f in os.listdir(directory) if f.endswith(".jpg") and screen in f
|
||||
]
|
||||
png_files = [
|
||||
f for f in os.listdir(directory) if f.endswith(".png") and screen in f
|
||||
]
|
||||
|
||||
for pattern in [jpg_pattern, png_pattern]:
|
||||
files = glob.glob(pattern)
|
||||
for order, input_path in enumerate(tqdm(files, desc=f"Compressing {screen} images", unit="file")):
|
||||
compress_and_save_image(input_path, order)
|
||||
if jpg_files:
|
||||
files = jpg_files
|
||||
elif png_files:
|
||||
files = png_files
|
||||
else:
|
||||
continue # Skip if no matching files are found
|
||||
|
||||
for frame, filename in enumerate(
|
||||
tqdm(sorted(files), desc=f"Compressing {screen} images", unit="file")
|
||||
):
|
||||
compress_and_save_image(os.path.join(directory, filename), frame)
|
||||
|
||||
# for filename in os.listdir(directory):
|
||||
# if filename.endswith(('.jpg', '.png')):
|
||||
@ -144,5 +180,6 @@ def main():
|
||||
else:
|
||||
print("Invalid path. Please provide a valid directory or file path.")
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
2
setup.py
2
setup.py
@ -25,6 +25,8 @@ setup(
|
||||
'magika',
|
||||
'pydantic-settings',
|
||||
'typesense',
|
||||
'opencv-python',
|
||||
'pillow',
|
||||
],
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
|
@ -25,6 +25,10 @@
|
||||
* @type {any}
|
||||
*/
|
||||
export let image;
|
||||
/**
|
||||
* @type {string}
|
||||
*/
|
||||
export let video;
|
||||
/**
|
||||
* @type {string}
|
||||
*/
|
||||
@ -107,7 +111,7 @@
|
||||
<div class="flex flex-col md:flex-row h-full">
|
||||
<!-- Image container -->
|
||||
<div class="flex-none w-full md:w-1/2 h-full">
|
||||
<a href={image} target="_blank" rel="noopener noreferrer">
|
||||
<a href={video} target="_blank" rel="noopener noreferrer">
|
||||
<img class="w-full h-full object-contain" src={image} alt={title} />
|
||||
</a>
|
||||
</div>
|
||||
|
@ -180,6 +180,7 @@
|
||||
library_id={searchResults[selectedImage].library_id}
|
||||
folder_id={searchResults[selectedImage].folder_id}
|
||||
image={`${apiEndpoint}/files/${searchResults[selectedImage].filepath}`}
|
||||
video={`${apiEndpoint}/files/video/${searchResults[selectedImage].filepath}`}
|
||||
created_at={searchResults[selectedImage].file_created_at}
|
||||
filepath={searchResults[selectedImage].filepath}
|
||||
title={filename(searchResults[selectedImage].filepath)}
|
||||
|
Loading…
x
Reference in New Issue
Block a user