Merge pull request #1415 from Mehrab-Shahbazi/master

video path is enabled in stream
Sefik Ilkin Serengil 2025-01-06 09:26:35 +00:00 committed by GitHub
commit c465234788
2 changed files with 110 additions and 76 deletions
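
Once this lands, a recorded clip can be analyzed and saved from a single call. A minimal usage sketch derived from the signature change below; the database and video paths are placeholders:

```python
from deepface import DeepFace

# Placeholders: point db_path at a folder of reference images and
# source at any video file; output_path is the parameter added in this PR.
DeepFace.stream(
    db_path="my_db",
    source="input/clip.mp4",
    output_path="output/annotated.mp4",
)
```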

deepface/DeepFace.py

@@ -68,18 +68,18 @@ def build_model(model_name: str, task: str = "facial_recognition") -> Any:
 def verify(
     img1_path: Union[str, np.ndarray, List[float]],
     img2_path: Union[str, np.ndarray, List[float]],
     model_name: str = "VGG-Face",
     detector_backend: str = "opencv",
     distance_metric: str = "cosine",
     enforce_detection: bool = True,
     align: bool = True,
     expand_percentage: int = 0,
     normalization: str = "base",
     silent: bool = False,
     threshold: Optional[float] = None,
     anti_spoofing: bool = False,
 ) -> Dict[str, Any]:
     """
     Verify if an image pair represents the same person or different persons.
@@ -164,14 +164,14 @@ def verify(
 def analyze(
     img_path: Union[str, np.ndarray],
     actions: Union[tuple, list] = ("emotion", "age", "gender", "race"),
     enforce_detection: bool = True,
     detector_backend: str = "opencv",
     align: bool = True,
     expand_percentage: int = 0,
     silent: bool = False,
     anti_spoofing: bool = False,
 ) -> List[Dict[str, Any]]:
     """
     Analyze facial attributes such as age, gender, emotion, and race in the provided image.
@@ -263,20 +263,20 @@ def analyze(
 def find(
     img_path: Union[str, np.ndarray],
     db_path: str,
     model_name: str = "VGG-Face",
     distance_metric: str = "cosine",
     enforce_detection: bool = True,
     detector_backend: str = "opencv",
     align: bool = True,
     expand_percentage: int = 0,
     threshold: Optional[float] = None,
     normalization: str = "base",
     silent: bool = False,
     refresh_database: bool = True,
     anti_spoofing: bool = False,
     batched: bool = False,
 ) -> Union[List[pd.DataFrame], List[List[Dict[str, Any]]]]:
     """
     Identify individuals in a database
@@ -369,15 +369,15 @@ def find(
 def represent(
     img_path: Union[str, np.ndarray],
     model_name: str = "VGG-Face",
     enforce_detection: bool = True,
     detector_backend: str = "opencv",
     align: bool = True,
     expand_percentage: int = 0,
     normalization: str = "base",
     anti_spoofing: bool = False,
     max_faces: Optional[int] = None,
 ) -> List[Dict[str, Any]]:
     """
     Represent facial images as multi-dimensional vector embeddings.
@@ -441,15 +441,16 @@ def represent(
 def stream(
     db_path: str = "",
     model_name: str = "VGG-Face",
     detector_backend: str = "opencv",
     distance_metric: str = "cosine",
     enable_face_analysis: bool = True,
     source: Any = 0,
     time_threshold: int = 5,
     frame_threshold: int = 5,
     anti_spoofing: bool = False,
+    output_path: Optional[str] = None,
 ) -> None:
     """
     Run real time face recognition and facial attribute analysis
@@ -478,6 +479,10 @@ def stream(
         frame_threshold (int): The frame threshold for face recognition (default is 5).
         anti_spoofing (boolean): Flag to enable anti spoofing (default is False).
+        output_path (str): Path to save the output video. (default is None
+            If None, no video is saved).
 
     Returns:
         None
     """
@@ -495,19 +500,20 @@ def stream(
         time_threshold=time_threshold,
         frame_threshold=frame_threshold,
         anti_spoofing=anti_spoofing,
+        output_path=output_path,
     )
 
 
 def extract_faces(
     img_path: Union[str, np.ndarray],
     detector_backend: str = "opencv",
     enforce_detection: bool = True,
     align: bool = True,
     expand_percentage: int = 0,
     grayscale: bool = False,
     color_face: str = "rgb",
     normalize_face: bool = True,
     anti_spoofing: bool = False,
 ) -> List[Dict[str, Any]]:
     """
     Extract faces from a given image
@@ -584,11 +590,11 @@ def cli() -> None:
 def detectFace(
     img_path: Union[str, np.ndarray],
     target_size: tuple = (224, 224),
     detector_backend: str = "opencv",
     enforce_detection: bool = True,
     align: bool = True,
 ) -> Union[np.ndarray, None]:
     """
     Deprecated face detection function. Use extract_faces for same functionality.
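
Since `stream()` only forwards the new argument (see the `output_path=output_path` line above), the module-level function changed in the second file below can also be driven directly. A sketch with the same placeholder paths as before:

```python
from deepface.modules import streaming

streaming.analysis(
    db_path="my_db",
    source="input/clip.mp4",
    output_path="output/annotated.mp4",  # forwarded by DeepFace.stream()
)
```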

deepface/modules/streaming.py

@@ -22,6 +22,7 @@ os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
 IDENTIFIED_IMG_SIZE = 112
 TEXT_COLOR = (255, 255, 255)
 
 # pylint: disable=unused-variable
 def analysis(
     db_path: str,
@@ -33,6 +34,7 @@ def analysis(
     time_threshold=5,
     frame_threshold=5,
     anti_spoofing: bool = False,
+    output_path: Optional[str] = None,
 ):
     """
     Run real time face recognition and facial attribute analysis
@@ -62,6 +64,8 @@ def analysis(
         anti_spoofing (boolean): Flag to enable anti spoofing (default is False).
+        output_path (str): Path to save the output video. (default is None
+            If None, no video is saved).
 
     Returns:
         None
     """
@@ -77,12 +81,31 @@ def analysis(
         model_name=model_name,
     )
 
+    cap = cv2.VideoCapture(source if isinstance(source, str) else int(source))
+    if not cap.isOpened():
+        logger.error(f"Cannot open video source: {source}")
+        return
+
+    # Get video properties
+    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
+    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
+    fps = cap.get(cv2.CAP_PROP_FPS)
+    fourcc = cv2.VideoWriter_fourcc(*"mp4v")  # Codec for output file
+
+    # Ensure the output directory exists if output_path is provided
+    if output_path:
+        os.makedirs(os.path.dirname(output_path), exist_ok=True)
+
+    # Initialize video writer if output_path is provided
+    video_writer = (
+        cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
+        if output_path
+        else None
+    )
+
     freezed_img = None
     freeze = False
     num_frames_with_faces = 0
     tic = time.time()
 
-    cap = cv2.VideoCapture(source)  # webcam
     while True:
         has_frame, img = cap.read()
         if not has_frame:
@@ -91,9 +114,9 @@ def analysis(
         # we are adding some figures into img such as identified facial image, age, gender
         # that is why, we need raw image itself to make analysis
         raw_img = img.copy()
 
         faces_coordinates = []
-        if freeze is False:
+        if not freeze:
             faces_coordinates = grab_facial_areas(
                 img=img, detector_backend=detector_backend, anti_spoofing=anti_spoofing
             )
@@ -101,7 +124,6 @@ def analysis(
             # we will pass img to analyze modules (identity, demography) and add some illustrations
             # that is why, we will not be able to extract detected face from img clearly
             detected_faces = extract_facial_areas(img=img, faces_coordinates=faces_coordinates)
-
             img = highlight_facial_areas(img=img, faces_coordinates=faces_coordinates)
             img = countdown_to_freeze(
                 img=img,
@@ -111,8 +133,8 @@ def analysis(
             )
 
             num_frames_with_faces = num_frames_with_faces + 1 if len(faces_coordinates) else 0
 
             freeze = num_frames_with_faces > 0 and num_frames_with_faces % frame_threshold == 0
             if freeze:
                 # add analyze results into img - derive from raw_img
                 img = highlight_facial_areas(
@@ -144,22 +166,28 @@ def analysis(
                 tic = time.time()
                 logger.info("freezed")
-        elif freeze is True and time.time() - tic > time_threshold:
+        elif freeze and time.time() - tic > time_threshold:
             freeze = False
             freezed_img = None
             # reset counter for freezing
             tic = time.time()
-            logger.info("freeze released")
+            logger.info("Freeze released")
 
         freezed_img = countdown_to_release(img=freezed_img, tic=tic, time_threshold=time_threshold)
+        display_img = img if freezed_img is None else freezed_img
 
-        cv2.imshow("img", img if freezed_img is None else freezed_img)
+        # Save the frame to output video if writer is initialized
+        if video_writer:
+            video_writer.write(display_img)
 
-        if cv2.waitKey(1) & 0xFF == ord("q"):  # press q to quit
+        cv2.imshow("img", display_img)
+
+        if cv2.waitKey(1) & 0xFF == ord("q"):
             break
 
-    # kill open cv things
+    # Release resources
     cap.release()
+    if video_writer:
+        video_writer.release()
     cv2.destroyAllWindows()
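
Two review notes on the writer setup above, with a defensive sketch that is not part of this PR: `cv2.CAP_PROP_FPS` can report 0 for some webcams, which would produce an unplayable output file, and `os.path.dirname(output_path)` is an empty string for a bare filename, which makes `os.makedirs` raise.

```python
# Sketch of a more defensive setup than the committed version:
fps = cap.get(cv2.CAP_PROP_FPS)
if fps <= 0:
    fps = 30.0  # some webcams report 0; VideoWriter needs a positive rate

out_dir = os.path.dirname(output_path)
if out_dir:  # dirname is "" for plain filenames; makedirs("") raises
    os.makedirs(out_dir, exist_ok=True)
```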