Merge pull request #6 from NatLee/patch/adjustment-0103-1

New Shared Prediction Method.
This commit is contained in:
halice 2025-01-13 23:04:29 +08:00 committed by GitHub
commit 8883b212b2
15 changed files with 528 additions and 238 deletions

.github/CODE_OF_CONDUCT.md vendored Normal file
View File

@ -0,0 +1,127 @@
# Contributor Covenant Code of Conduct
## Our Pledge
We as members, contributors, and leaders pledge to make participation in our
community a harassment-free experience for everyone, regardless of age, body
size, visible or invisible disability, ethnicity, sex characteristics, gender
identity and expression, level of experience, education, socio-economic status,
nationality, personal appearance, race, religion, or sexual identity
and orientation.
We pledge to act and interact in ways that contribute to an open, welcoming,
diverse, inclusive, and healthy community.
## Our Standards
Examples of behavior that contributes to a positive environment for our
community include:
* Demonstrating empathy and kindness toward other people
* Being respectful of differing opinions, viewpoints, and experiences
* Giving and gracefully accepting constructive feedback
* Accepting responsibility and apologizing to those affected by our mistakes,
and learning from the experience
* Focusing on what is best not just for us as individuals, but for the
overall community
Examples of unacceptable behavior include:
* The use of sexualized language or imagery, and sexual attention or
advances of any kind
* Trolling, insulting or derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or email
address, without their explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Enforcement Responsibilities
Community leaders are responsible for clarifying and enforcing our standards of
acceptable behavior and will take appropriate and fair corrective action in
response to any behavior that they deem inappropriate, threatening, offensive,
or harmful.
Community leaders have the right and responsibility to remove, edit, or reject
comments, commits, code, wiki edits, issues, and other contributions that are
not aligned to this Code of Conduct, and will communicate reasons for moderation
decisions when appropriate.
## Scope
This Code of Conduct applies within all community spaces, and also applies when
an individual is officially representing the community in public spaces.
Examples of representing our community include using an official e-mail address,
posting via an official social media account, or acting as an appointed
representative at an online or offline event.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported to the community leaders responsible for enforcement at serengil@gmail.com.
All complaints will be reviewed and investigated promptly and fairly.
All community leaders are obligated to respect the privacy and security of the
reporter of any incident.
## Enforcement Guidelines
Community leaders will follow these Community Impact Guidelines in determining
the consequences for any action they deem in violation of this Code of Conduct:
### 1. Correction
**Community Impact**: Use of inappropriate language or other behavior deemed
unprofessional or unwelcome in the community.
**Consequence**: A private, written warning from community leaders, providing
clarity around the nature of the violation and an explanation of why the
behavior was inappropriate. A public apology may be requested.
### 2. Warning
**Community Impact**: A violation through a single incident or series
of actions.
**Consequence**: A warning with consequences for continued behavior. No
interaction with the people involved, including unsolicited interaction with
those enforcing the Code of Conduct, for a specified period of time. This
includes avoiding interactions in community spaces as well as external channels
like social media. Violating these terms may lead to a temporary or
permanent ban.
### 3. Temporary Ban
**Community Impact**: A serious violation of community standards, including
sustained inappropriate behavior.
**Consequence**: A temporary ban from any sort of interaction or public
communication with the community for a specified period of time. No public or
private interaction with the people involved, including unsolicited interaction
with those enforcing the Code of Conduct, is allowed during this period.
Violating these terms may lead to a permanent ban.
### 4. Permanent Ban
**Community Impact**: Demonstrating a pattern of violation of community
standards, including sustained inappropriate behavior, harassment of an
individual, or aggression toward or disparagement of classes of individuals.
**Consequence**: A permanent ban from any sort of public interaction within
the community.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage],
version 2.0, available at
https://www.contributor-covenant.org/version/2/0/code_of_conduct.html.
Community Impact Guidelines were inspired by [Mozilla's code of conduct
enforcement ladder](https://github.com/mozilla/diversity).
[homepage]: https://www.contributor-covenant.org
For answers to common questions about this code of conduct, see the FAQ at
https://www.contributor-covenant.org/faq. Translations are available at
https://www.contributor-covenant.org/translations.

View File

@ -16,6 +16,9 @@
[![GitHub Sponsors](https://img.shields.io/github/sponsors/serengil?logo=GitHub&color=lightgray)](https://github.com/sponsors/serengil)
[![Buy Me a Coffee](https://img.shields.io/badge/-buy_me_a%C2%A0coffee-gray?logo=buy-me-a-coffee)](https://buymeacoffee.com/serengil)
[![Hacker News](https://img.shields.io/badge/dynamic/json?color=orange&label=Hacker%20News&query=score&url=https%3A%2F%2Fhacker-news.firebaseio.com%2Fv0%2Fitem%2F42584896.json&logo=y-combinator)](https://news.ycombinator.com/item?id=42584896)
[![Product Hunt](https://img.shields.io/badge/Product%20Hunt-%E2%96%B2-orange?logo=producthunt)](https://www.producthunt.com/posts/deepface?embed=true&utm_source=badge-featured&utm_medium=badge&utm_souce=badge-deepface)
<!-- [![DOI](http://img.shields.io/:DOI-10.1109/ICEET53442.2021.9659697-blue.svg?style=flat)](https://doi.org/10.1109/ICEET53442.2021.9659697) -->
<!-- [![DOI](http://img.shields.io/:DOI-10.1109/ASYU50717.2020.9259802-blue.svg?style=flat)](https://doi.org/10.1109/ASYU50717.2020.9259802) -->
@ -392,7 +395,7 @@ Before creating a PR, you should run the unit tests and linting locally by runni
There are many ways to support a project - starring⭐ the GitHub repo is just one 🙏
If you do like this work, then you can support it financially on [Patreon](https://www.patreon.com/serengil?repo=deepface), [GitHub Sponsors](https://github.com/sponsors/serengil) or [Buy Me a Coffee](https://buymeacoffee.com/serengil).
If you do like this work, then you can support it financially on [Patreon](https://www.patreon.com/serengil?repo=deepface), [GitHub Sponsors](https://github.com/sponsors/serengil) or [Buy Me a Coffee](https://buymeacoffee.com/serengil). Also, your company's logo will be shown on README on GitHub and PyPI if you become a sponsor in gold, silver or bronze tiers.
<a href="https://www.patreon.com/serengil?repo=deepface">
<img src="https://raw.githubusercontent.com/serengil/deepface/master/icon/patreon.png" width="30%" height="30%">
@ -402,13 +405,25 @@ If you do like this work, then you can support it financially on [Patreon](https
<img src="https://raw.githubusercontent.com/serengil/deepface/master/icon/bmc-button.png" width="25%" height="25%">
</a>
Also, your company's logo will be shown on README on GitHub and PyPI if you become a sponsor in gold, silver or bronze tiers.
Additionally, you can help us reach a wider audience by upvoting our posts on Hacker News and Product Hunt.
<div style="display: flex; align-items: center; gap: 10px;">
<!-- Hacker News Badge -->
<a href="https://news.ycombinator.com/item?id=42584896">
<img src="https://hackerbadge.vercel.app/api?id=42584896&type=orange" style="width: 250px; height: 54px;" width="250" alt="Featured on Hacker News">
</a>
<!-- Product Hunt Badge -->
<a href="https://www.producthunt.com/posts/deepface?embed=true&utm_source=badge-featured&utm_medium=badge&utm_souce=badge-deepface" target="_blank">
<img src="https://api.producthunt.com/widgets/embed-image/v1/featured.svg?post_id=753599&theme=light" alt="DeepFace - A Lightweight Deep Face Recognition Library for Python | Product Hunt" style="width: 250px; height: 54px;" width="250" height="54" />
</a>
</div>
## Citation
Please cite deepface in your publications if it helps your research - see [`CITATIONS`](https://github.com/serengil/deepface/blob/master/CITATION.md) for more details. Here are its BibTex entries:
If you use deepface in your research for facial recogntion or face detection purposes, please cite these publications:
If you use deepface in your research for facial recognition or face detection purposes, please cite these publications:
```BibTeX
@article{serengil2024lightface,

View File

@ -2,7 +2,7 @@
import os
import warnings
import logging
from typing import Any, Dict, List, Union, Optional
from typing import Any, Dict, IO, List, Union, Optional
# this has to be set before importing tensorflow
os.environ["TF_USE_LEGACY_KERAS"] = "1"
@ -68,28 +68,30 @@ def build_model(model_name: str, task: str = "facial_recognition") -> Any:
def verify(
img1_path: Union[str, np.ndarray, List[float]],
img2_path: Union[str, np.ndarray, List[float]],
model_name: str = "VGG-Face",
detector_backend: str = "opencv",
distance_metric: str = "cosine",
enforce_detection: bool = True,
align: bool = True,
expand_percentage: int = 0,
normalization: str = "base",
silent: bool = False,
threshold: Optional[float] = None,
anti_spoofing: bool = False,
img1_path: Union[str, np.ndarray, IO[bytes], List[float]],
img2_path: Union[str, np.ndarray, IO[bytes], List[float]],
model_name: str = "VGG-Face",
detector_backend: str = "opencv",
distance_metric: str = "cosine",
enforce_detection: bool = True,
align: bool = True,
expand_percentage: int = 0,
normalization: str = "base",
silent: bool = False,
threshold: Optional[float] = None,
anti_spoofing: bool = False,
) -> Dict[str, Any]:
"""
Verify if an image pair represents the same person or different persons.
Args:
img1_path (str or np.ndarray or List[float]): Path to the first image.
Accepts exact image path as a string, numpy array (BGR), base64 encoded images
img1_path (str or np.ndarray or IO[bytes] or List[float]): Path to the first image.
Accepts exact image path as a string, numpy array (BGR), a file object that supports
at least `.read` and is opened in binary mode, base64 encoded images
or pre-calculated embeddings.
img2_path (str or np.ndarray or List[float]): Path to the second image.
Accepts exact image path as a string, numpy array (BGR), base64 encoded images
img2_path (str or np.ndarray or IO[bytes] or List[float]): Path to the second image.
Accepts exact image path as a string, numpy array (BGR), a file object that supports
at least `.read` and is opened in binary mode, base64 encoded images
or pre-calculated embeddings.
model_name (str): Model for face recognition. Options: VGG-Face, Facenet, Facenet512,
@ -164,21 +166,22 @@ def verify(
def analyze(
img_path: Union[str, np.ndarray],
actions: Union[tuple, list] = ("emotion", "age", "gender", "race"),
enforce_detection: bool = True,
detector_backend: str = "opencv",
align: bool = True,
expand_percentage: int = 0,
silent: bool = False,
anti_spoofing: bool = False,
img_path: Union[str, np.ndarray, IO[bytes]],
actions: Union[tuple, list] = ("emotion", "age", "gender", "race"),
enforce_detection: bool = True,
detector_backend: str = "opencv",
align: bool = True,
expand_percentage: int = 0,
silent: bool = False,
anti_spoofing: bool = False,
) -> List[Dict[str, Any]]:
"""
Analyze facial attributes such as age, gender, emotion, and race in the provided image.
Args:
img_path (str or np.ndarray): The exact path to the image, a numpy array in BGR format,
or a base64 encoded image. If the source image contains multiple faces, the result will
include information for each detected face.
img_path (str or np.ndarray or IO[bytes]): The exact path to the image, a numpy array
in BGR format, a file object that supports at least `.read` and is opened in binary
mode, or a base64 encoded image. If the source image contains multiple faces,
the result will include information for each detected face.
actions (tuple): Attributes to analyze. The default is ('age', 'gender', 'emotion', 'race').
You can exclude some of these attributes from the analysis if needed.
@ -263,27 +266,28 @@ def analyze(
def find(
img_path: Union[str, np.ndarray],
db_path: str,
model_name: str = "VGG-Face",
distance_metric: str = "cosine",
enforce_detection: bool = True,
detector_backend: str = "opencv",
align: bool = True,
expand_percentage: int = 0,
threshold: Optional[float] = None,
normalization: str = "base",
silent: bool = False,
refresh_database: bool = True,
anti_spoofing: bool = False,
batched: bool = False,
img_path: Union[str, np.ndarray, IO[bytes]],
db_path: str,
model_name: str = "VGG-Face",
distance_metric: str = "cosine",
enforce_detection: bool = True,
detector_backend: str = "opencv",
align: bool = True,
expand_percentage: int = 0,
threshold: Optional[float] = None,
normalization: str = "base",
silent: bool = False,
refresh_database: bool = True,
anti_spoofing: bool = False,
batched: bool = False,
) -> Union[List[pd.DataFrame], List[List[Dict[str, Any]]]]:
"""
Identify individuals in a database
Args:
img_path (str or np.ndarray): The exact path to the image, a numpy array in BGR format,
or a base64 encoded image. If the source image contains multiple faces, the result will
include information for each detected face.
img_path (str or np.ndarray or IO[bytes]): The exact path to the image, a numpy array
in BGR format, a file object that supports at least `.read` and is opened in binary
mode, or a base64 encoded image. If the source image contains multiple
faces, the result will include information for each detected face.
db_path (string): Path to the folder containing image files. All detected faces
in the database will be considered in the decision-making process.
@ -369,23 +373,24 @@ def find(
def represent(
img_path: Union[str, np.ndarray],
model_name: str = "VGG-Face",
enforce_detection: bool = True,
detector_backend: str = "opencv",
align: bool = True,
expand_percentage: int = 0,
normalization: str = "base",
anti_spoofing: bool = False,
max_faces: Optional[int] = None,
img_path: Union[str, np.ndarray, IO[bytes]],
model_name: str = "VGG-Face",
enforce_detection: bool = True,
detector_backend: str = "opencv",
align: bool = True,
expand_percentage: int = 0,
normalization: str = "base",
anti_spoofing: bool = False,
max_faces: Optional[int] = None,
) -> List[Dict[str, Any]]:
"""
Represent facial images as multi-dimensional vector embeddings.
Args:
img_path (str or np.ndarray): The exact path to the image, a numpy array in BGR format,
or a base64 encoded image. If the source image contains multiple faces, the result will
include information for each detected face.
img_path (str or np.ndarray or IO[bytes]): The exact path to the image, a numpy array
in BGR format, a file object that supports at least `.read` and is opened in binary
mode, or a base64 encoded image. If the source image contains multiple faces,
the result will include information for each detected face.
model_name (str): Model for face recognition. Options: VGG-Face, Facenet, Facenet512,
OpenFace, DeepFace, DeepID, Dlib, ArcFace, SFace and GhostFaceNet
@ -441,15 +446,16 @@ def represent(
def stream(
db_path: str = "",
model_name: str = "VGG-Face",
detector_backend: str = "opencv",
distance_metric: str = "cosine",
enable_face_analysis: bool = True,
source: Any = 0,
time_threshold: int = 5,
frame_threshold: int = 5,
anti_spoofing: bool = False,
db_path: str = "",
model_name: str = "VGG-Face",
detector_backend: str = "opencv",
distance_metric: str = "cosine",
enable_face_analysis: bool = True,
source: Any = 0,
time_threshold: int = 5,
frame_threshold: int = 5,
anti_spoofing: bool = False,
output_path: Optional[str] = None,
) -> None:
"""
Run real time face recognition and facial attribute analysis
@ -478,6 +484,10 @@ def stream(
frame_threshold (int): The frame threshold for face recognition (default is 5).
anti_spoofing (boolean): Flag to enable anti spoofing (default is False).
output_path (str): Path to save the output video. If None (default), no video is saved.
Returns:
None
"""
@ -495,26 +505,28 @@ def stream(
time_threshold=time_threshold,
frame_threshold=frame_threshold,
anti_spoofing=anti_spoofing,
output_path=output_path,
)
def extract_faces(
img_path: Union[str, np.ndarray],
detector_backend: str = "opencv",
enforce_detection: bool = True,
align: bool = True,
expand_percentage: int = 0,
grayscale: bool = False,
color_face: str = "rgb",
normalize_face: bool = True,
anti_spoofing: bool = False,
img_path: Union[str, np.ndarray, IO[bytes]],
detector_backend: str = "opencv",
enforce_detection: bool = True,
align: bool = True,
expand_percentage: int = 0,
grayscale: bool = False,
color_face: str = "rgb",
normalize_face: bool = True,
anti_spoofing: bool = False,
) -> List[Dict[str, Any]]:
"""
Extract faces from a given image
Args:
img_path (str or np.ndarray): Path to the first image. Accepts exact image path
as a string, numpy array (BGR), or base64 encoded images.
img_path (str or np.ndarray or IO[bytes]): Path to the first image. Accepts exact image path
as a string, numpy array (BGR), a file object that supports at least `.read` and is
opened in binary mode, or base64 encoded images.
detector_backend (string): face detector backend. Options: 'opencv', 'retinaface',
'mtcnn', 'ssd', 'dlib', 'mediapipe', 'yolov8', 'yolov11n', 'yolov11s', 'yolov11m',
@ -584,11 +596,11 @@ def cli() -> None:
def detectFace(
img_path: Union[str, np.ndarray],
target_size: tuple = (224, 224),
detector_backend: str = "opencv",
enforce_detection: bool = True,
align: bool = True,
img_path: Union[str, np.ndarray],
target_size: tuple = (224, 224),
detector_backend: str = "opencv",
enforce_detection: bool = True,
align: bool = True,
) -> Union[np.ndarray, None]:
"""
Deprecated face detection function. Use extract_faces for same functionality.
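
Taken together, these DeepFace.py changes mean every public entry point that accepts an image path can now also accept an open binary file handle. A minimal usage sketch under that assumption; the image paths below are placeholders, and the printed response keys are assumed from DeepFace's standard response dictionaries rather than shown in this diff:

```python
from deepface import DeepFace

# Placeholder paths; any two JPEG/PNG face photos opened in binary mode will do.
with open("img1.jpg", "rb") as f1, open("img2.jpg", "rb") as f2:
    result = DeepFace.verify(img1_path=f1, img2_path=f2)
print(result["verified"], result["distance"])

# represent accepts the same kind of file object
with open("img1.jpg", "rb") as f:
    embedding_objs = DeepFace.represent(img_path=f)
print(len(embedding_objs[0]["embedding"]))
```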

View File

@ -1,7 +1,7 @@
# built-in dependencies
import os
import io
from typing import List, Union, Tuple
from typing import Generator, IO, List, Union, Tuple
import hashlib
import base64
from pathlib import Path
@ -14,6 +14,10 @@ from PIL import Image
from werkzeug.datastructures import FileStorage
IMAGE_EXTS = {".jpg", ".jpeg", ".png"}
PIL_EXTS = {"jpeg", "png"}
def list_images(path: str) -> List[str]:
"""
List images in a given path
@ -25,19 +29,31 @@ def list_images(path: str) -> List[str]:
images = []
for r, _, f in os.walk(path):
for file in f:
exact_path = os.path.join(r, file)
ext_lower = os.path.splitext(exact_path)[-1].lower()
if ext_lower not in {".jpg", ".jpeg", ".png"}:
continue
with Image.open(exact_path) as img: # lazy
if img.format.lower() in {"jpeg", "png"}:
images.append(exact_path)
if os.path.splitext(file)[1].lower() in IMAGE_EXTS:
exact_path = os.path.join(r, file)
with Image.open(exact_path) as img: # lazy
if img.format.lower() in PIL_EXTS:
images.append(exact_path)
return images
def yield_images(path: str) -> Generator[str, None, None]:
"""
Yield images in a given path
Args:
path (str): path's location
Yields:
image (str): image path
"""
for r, _, f in os.walk(path):
for file in f:
if os.path.splitext(file)[1].lower() in IMAGE_EXTS:
exact_path = os.path.join(r, file)
with Image.open(exact_path) as img: # lazy
if img.format.lower() in PIL_EXTS:
yield exact_path
def find_image_hash(file_path: str) -> str:
"""
Find the hash of given image file with its properties
@ -61,11 +77,11 @@ def find_image_hash(file_path: str) -> str:
return hasher.hexdigest()
def load_image(img: Union[str, np.ndarray]) -> Tuple[np.ndarray, str]:
def load_image(img: Union[str, np.ndarray, IO[bytes]]) -> Tuple[np.ndarray, str]:
"""
Load image from path, url, base64 or numpy array.
Load image from path, url, file object, base64 or numpy array.
Args:
img: a path, url, base64 or numpy array.
img: a path, url, file object, base64 or numpy array.
Returns:
image (numpy array): the loaded image in BGR format
image name (str): image name itself
@ -75,6 +91,14 @@ def load_image(img: Union[str, np.ndarray]) -> Tuple[np.ndarray, str]:
if isinstance(img, np.ndarray):
return img, "numpy array"
# The image is an object that supports `.read`
if hasattr(img, 'read') and callable(img.read):
if isinstance(img, io.StringIO):
raise ValueError(
'img requires bytes and cannot be an io.StringIO object.'
)
return load_image_from_io_object(img), 'io object'
if isinstance(img, Path):
img = str(img)
@ -104,6 +128,32 @@ def load_image(img: Union[str, np.ndarray]) -> Tuple[np.ndarray, str]:
return img_obj_bgr, img
def load_image_from_io_object(obj: IO[bytes]) -> np.ndarray:
"""
Load image from an object that supports being read
Args:
obj: a file like object.
Returns:
img (np.ndarray): The decoded image as a numpy array (OpenCV format).
"""
try:
_ = obj.seek(0)
except (AttributeError, TypeError, io.UnsupportedOperation):
seekable = False
obj = io.BytesIO(obj.read())
else:
seekable = True
try:
nparr = np.frombuffer(obj.read(), np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is None:
raise ValueError("Failed to decode image")
return img
finally:
if not seekable:
obj.close()
def load_image_from_base64(uri: str) -> np.ndarray:
"""
Load image from base64 string.
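
A rough sketch of how the two additions in this file behave, assuming the module lives at `deepface.commons.image_utils` (as the existing test imports suggest), a JPEG at a placeholder path, and a hypothetical `db` folder of images:

```python
import io
from deepface.commons import image_utils

# Decode an image from an open binary handle ("img1.jpg" is a placeholder path).
with open("img1.jpg", "rb") as f:
    img = image_utils.load_image_from_io_object(f)
print(img.shape)  # decoded BGR array, e.g. (h, w, 3)

# An in-memory buffer works the same way; non-seekable objects are copied into one.
with open("img1.jpg", "rb") as f:
    img_from_buffer = image_utils.load_image_from_io_object(io.BytesIO(f.read()))

# Lazily walk a hypothetical "db" folder; only genuine JPEG/PNG files are yielded.
for path in image_utils.yield_images("db"):
    print(path)
```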

View File

@ -21,28 +21,54 @@ class Demography(ABC):
def predict(self, img: Union[np.ndarray, List[np.ndarray]]) -> Union[np.ndarray, np.float64]:
pass
def _preprocess_batch_or_single_input(self, img: Union[np.ndarray, List[np.ndarray]]) -> np.ndarray:
def _predict_internal(self, img_batch: np.ndarray) -> np.ndarray:
"""
Predict for a single image or a batch of images.
Uses the legacy single-image call when one image is received,
and switches to batch prediction for batched input.
Args:
img_batch:
Batch of images as np.ndarray (n, x, y, c),
with n >= 1, x = image width, y = image height, c = number of channels,
or a single image as np.ndarray (1, x, y, c).
The channel dimension may be omitted for grayscale input (emotion model).
"""
if not self.model_name: # Check if called from derived class
raise NotImplementedError("no model selected")
assert img_batch.ndim == 4, "expected 4-dimensional tensor input"
# Single image
if img_batch.shape[0] == 1:
# If the last dimension is not 3, the input is grayscale.
if img_batch.shape[-1] != 3:
# Remove batch dimension
img_batch = img_batch.squeeze(0)
# Predict with legacy method.
return self.model(img_batch, training=False).numpy()[0, :]
# Batch of images
# Predict with batch prediction
return self.model.predict_on_batch(img_batch)
def _preprocess_batch_or_single_input(
self,
img: Union[np.ndarray, List[np.ndarray]]
) -> np.ndarray:
"""
Preprocess single or batch of images, return as 4-D numpy array.
Args:
img: Single image as np.ndarray (224, 224, 3) or
List of images as List[np.ndarray] or
Batch of images as np.ndarray (n, 224, 224, 3)
List of images as List[np.ndarray] or
Batch of images as np.ndarray (n, 224, 224, 3)
Returns:
Four-dimensional numpy array (n, 224, 224, 3)
"""
if isinstance(img, list): # Convert from list to image batch.
image_batch = np.array(img)
else:
image_batch = img
image_batch = np.array(img)
# Remove batch dimension in advance if it exists
image_batch = image_batch.squeeze()
# Check input dimension
if len(image_batch.shape) == 3:
# Single image - add batch dimension
image_batch = np.expand_dims(image_batch, axis=0)
return image_batch
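
The shared helper above is the heart of this PR: one face keeps the legacy per-image call, while larger batches go through `predict_on_batch`. Below is a standalone sketch of the same dispatch idea with a stand-in model so it runs without downloading weights; `FakeModel` and the seven output classes are illustrative, not part of the library:

```python
import numpy as np

class FakeModel:
    """Stand-in for a Keras model so the sketch runs without weights."""
    n_classes = 7

    def __call__(self, x, training=False):
        # legacy single-image call path; mimic a tensor that exposes .numpy()
        class _Tensor:
            def __init__(self, arr):
                self._arr = arr
            def numpy(self):
                return self._arr
        return _Tensor(np.ones((1, FakeModel.n_classes)))

    def predict_on_batch(self, batch):
        return np.ones((batch.shape[0], FakeModel.n_classes))

def predict_internal(model, img_batch):
    """Mirror of the dispatch logic in Demography._predict_internal."""
    assert img_batch.ndim == 4, "expected 4-dimensional tensor input"
    if img_batch.shape[0] == 1:                  # single image
        if img_batch.shape[-1] != 3:             # grayscale -> drop batch dim
            img_batch = img_batch.squeeze(0)
        return model(img_batch, training=False).numpy()[0, :]
    return model.predict_on_batch(img_batch)     # true batch

single_gray = np.zeros((1, 48, 48, 1))           # one grayscale face -> legacy path
batch_rgb = np.zeros((4, 224, 224, 3))           # four RGB faces -> batch path
print(predict_internal(FakeModel(), single_gray).shape)  # (7,)
print(predict_internal(FakeModel(), batch_rgb).shape)    # (4, 7)
```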

View File

@ -54,9 +54,9 @@ class ApparentAgeClient(Demography):
# Preprocessing input image or image list.
imgs = self._preprocess_batch_or_single_input(img)
# Batch prediction
age_predictions = self.model.predict_on_batch(imgs)
# Prediction from 3-channel image
age_predictions = self._predict_internal(imgs)
# Calculate apparent ages
apparent_ages = np.array(
[find_apparent_age(age_prediction) for age_prediction in age_predictions]

View File

@ -72,14 +72,17 @@ class EmotionClient(Demography):
# Preprocessing input image or image list.
imgs = self._preprocess_batch_or_single_input(img)
# Preprocess each image
processed_imgs = np.array([self._preprocess_image(img) for img in imgs])
# Add channel dimension for grayscale images
processed_imgs = np.expand_dims(processed_imgs, axis=-1)
if imgs.shape[0] == 1:
# Preprocess single image and add channel dimension for grayscale images
processed_imgs = np.expand_dims(np.array([self._preprocess_image(img) for img in imgs]), axis=0)
else:
# Preprocess batch of images and add channel dimension for grayscale images
processed_imgs = np.expand_dims(np.array([self._preprocess_image(img) for img in imgs]), axis=-1)
# Reshape input for model (expected shape=(n, 48, 48, 1)), where n is the batch size
processed_imgs = processed_imgs.reshape(processed_imgs.shape[0], 48, 48, 1)
# Batch prediction
predictions = self.model.predict_on_batch(processed_imgs)
# Prediction
predictions = self._predict_internal(processed_imgs)
return predictions

View File

@ -54,8 +54,8 @@ class GenderClient(Demography):
# Preprocessing input image or image list.
imgs = self._preprocess_batch_or_single_input(img)
# Batch prediction
predictions = self.model.predict_on_batch(imgs)
# Prediction
predictions = self._predict_internal(imgs)
return predictions

View File

@ -54,8 +54,8 @@ class RaceClient(Demography):
# Preprocessing input image or image list.
imgs = self._preprocess_batch_or_single_input(img)
# Batch prediction
predictions = self.model.predict_on_batch(imgs)
# Prediction
predictions = self._predict_internal(imgs)
return predictions

View File

@ -1,5 +1,5 @@
# built-in dependencies
from typing import Any, Dict, List, Union
from typing import Any, Dict, List, Union, Optional
# 3rd party dependencies
import numpy as np
@ -117,8 +117,6 @@ def analyze(
f"Invalid action passed ({repr(action)})). "
"Valid actions are `emotion`, `age`, `gender`, `race`."
)
# ---------------------------------
resp_objects = []
img_objs = detection.extract_faces(
img_path=img_path,
@ -130,109 +128,105 @@ def analyze(
anti_spoofing=anti_spoofing,
)
# Anti-spoofing check
if anti_spoofing and any(img_obj.get("is_real", True) is False for img_obj in img_objs):
raise ValueError("Spoof detected in the given image.")
# Prepare the input for the model
valid_faces = []
face_regions = []
face_confidences = []
for img_obj in img_objs:
# Extract the face content
def preprocess_face(img_obj: Dict[str, Any]) -> Optional[np.ndarray]:
"""
Preprocess the face image for analysis.
"""
img_content = img_obj["face"]
# Check if the face content is empty
if img_content.shape[0] == 0 or img_content.shape[1] == 0:
continue
return None
img_content = img_content[:, :, ::-1] # BGR to RGB
return preprocessing.resize_image(img=img_content, target_size=(224, 224))
# Convert the image to RGB format from BGR
img_content = img_content[:, :, ::-1]
# Resize the image to the target size for the model
img_content = preprocessing.resize_image(img=img_content, target_size=(224, 224))
valid_faces.append(img_content)
face_regions.append(img_obj["facial_area"])
face_confidences.append(img_obj["confidence"])
# If no valid faces are found, return an empty list
if not valid_faces:
# Filter out empty faces
face_data = [(preprocess_face(img_obj), img_obj["facial_area"], img_obj["confidence"])
for img_obj in img_objs if img_obj["face"].size > 0]
if not face_data:
return []
# Convert the list of valid faces to a numpy array
# Unpack the face data
valid_faces, face_regions, face_confidences = zip(*face_data)
faces_array = np.array(valid_faces)
# Create placeholder response objects for each face
for _ in range(len(valid_faces)):
resp_objects.append({})
# Initialize the results list with face regions and confidence scores
results = [{"region": region, "face_confidence": conf}
for region, conf in zip(face_regions, face_confidences)]
# For each action, predict the corresponding attribute
# Iterate over the actions and perform analysis
pbar = tqdm(
range(0, len(actions)),
actions,
desc="Finding actions",
disable=silent if len(actions) > 1 else True,
)
for index in pbar:
action = actions[index]
for action in pbar:
pbar.set_description(f"Action: {action}")
model = modeling.build_model(task="facial_attribute", model_name=action.capitalize())
predictions = model.predict(faces_array)
# If the model returns a single prediction, reshape it to match the number of faces.
# Determine the correct shape of predictions by using number of faces and predictions shape.
# Example: For 1 face with Emotion model, predictions will be reshaped to (1, 7).
if faces_array.shape[0] == 1 and len(predictions.shape) == 1:
# For models like `Emotion`, which return a single prediction for a single face
predictions = predictions.reshape(1, -1)
# Update the results with the predictions
# ----------------------------------------
# For emotion, calculate the percentage of each emotion and find the dominant emotion
if action == "emotion":
# Build the emotion model
model = modeling.build_model(task="facial_attribute", model_name="Emotion")
emotion_predictions = model.predict(faces_array)
for idx, predictions in enumerate(emotion_predictions):
sum_of_predictions = predictions.sum()
resp_objects[idx]["emotion"] = {}
for i, emotion_label in enumerate(Emotion.labels):
emotion_prediction = 100 * predictions[i] / sum_of_predictions
resp_objects[idx]["emotion"][emotion_label] = emotion_prediction
resp_objects[idx]["dominant_emotion"] = Emotion.labels[np.argmax(predictions)]
emotion_results = [
{
"emotion": {
label: 100 * pred[i] / pred.sum()
for i, label in enumerate(Emotion.labels)
},
"dominant_emotion": Emotion.labels[np.argmax(pred)]
}
for pred in predictions
]
for result, emotion_result in zip(results, emotion_results):
result.update(emotion_result)
# ----------------------------------------
# For age, find the dominant age category (0-100)
elif action == "age":
# Build the age model
model = modeling.build_model(task="facial_attribute", model_name="Age")
age_predictions = model.predict(faces_array)
for idx, age in enumerate(age_predictions):
resp_objects[idx]["age"] = int(age)
age_results = [{"age": int(np.argmax(pred) if len(pred.shape) > 0 else pred)}
for pred in predictions]
for result, age_result in zip(results, age_results):
result.update(age_result)
# ----------------------------------------
# For gender, calculate the percentage of each gender and find the dominant gender
elif action == "gender":
# Build the gender model
model = modeling.build_model(task="facial_attribute", model_name="Gender")
gender_predictions = model.predict(faces_array)
for idx, predictions in enumerate(gender_predictions):
resp_objects[idx]["gender"] = {}
for i, gender_label in enumerate(Gender.labels):
gender_prediction = 100 * predictions[i]
resp_objects[idx]["gender"][gender_label] = gender_prediction
resp_objects[idx]["dominant_gender"] = Gender.labels[np.argmax(predictions)]
gender_results = [
{
"gender": {
label: 100 * pred[i]
for i, label in enumerate(Gender.labels)
},
"dominant_gender": Gender.labels[np.argmax(pred)]
}
for pred in predictions
]
for result, gender_result in zip(results, gender_results):
result.update(gender_result)
# ----------------------------------------
# For race, calculate the percentage of each race and find the dominant race
elif action == "race":
# Build the race model
model = modeling.build_model(task="facial_attribute", model_name="Race")
race_predictions = model.predict(faces_array)
for idx, predictions in enumerate(race_predictions):
sum_of_predictions = predictions.sum()
resp_objects[idx]["race"] = {}
for i, race_label in enumerate(Race.labels):
race_prediction = 100 * predictions[i] / sum_of_predictions
resp_objects[idx]["race"][race_label] = race_prediction
resp_objects[idx]["dominant_race"] = Race.labels[np.argmax(predictions)]
race_results = [
{
"race": {
label: 100 * pred[i] / pred.sum()
for i, label in enumerate(Race.labels)
},
"dominant_race": Race.labels[np.argmax(pred)]
}
for pred in predictions
]
for result, race_result in zip(results, race_results):
result.update(race_result)
# Add the face region and confidence to the response objects
for idx, resp_obj in enumerate(resp_objects):
resp_obj["region"] = face_regions[idx]
resp_obj["face_confidence"] = face_confidences[idx]
return resp_objects
return results
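
After the refactor, `analyze` still returns one dictionary per detected face, now assembled from the `results` list instead of the old `resp_objects`. A hedged sketch of inspecting that structure; the image path is a placeholder, and the keys come from the code above:

```python
from deepface import DeepFace

# Placeholder path; any photo with at least one detectable face will do.
results = DeepFace.analyze(img_path="img1.jpg", actions=("age", "gender", "emotion", "race"))
for face in results:
    print(face["region"], face["face_confidence"])
    print(face["age"], face["dominant_gender"], face["dominant_emotion"], face["dominant_race"])
```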

View File

@ -1,5 +1,5 @@
# built-in dependencies
from typing import Any, Dict, List, Tuple, Union, Optional
from typing import Any, Dict, IO, List, Tuple, Union, Optional
# 3rd party dependencies
from heapq import nlargest
@ -19,7 +19,7 @@ logger = Logger()
def extract_faces(
img_path: Union[str, np.ndarray],
img_path: Union[str, np.ndarray, IO[bytes]],
detector_backend: str = "opencv",
enforce_detection: bool = True,
align: bool = True,
@ -34,8 +34,9 @@ def extract_faces(
Extract faces from a given image
Args:
img_path (str or np.ndarray): Path to the first image. Accepts exact image path
as a string, numpy array (BGR), or base64 encoded images.
img_path (str or np.ndarray or IO[bytes]): Path to the first image. Accepts exact image path
as a string, numpy array (BGR), a file object that supports at least `.read` and is
opened in binary mode, or base64 encoded images.
detector_backend (string): face detector backend. Options: 'opencv', 'retinaface',
'mtcnn', 'ssd', 'dlib', 'mediapipe', 'yolov8', 'yolov11n', 'yolov11s', 'yolov11m',
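
Since `extract_faces` shares the widened `img_path` type, it too can read directly from an open binary handle. A brief usage sketch; the path is a placeholder and the printed keys follow the face objects used elsewhere in this diff:

```python
from deepface import DeepFace

# Placeholder path; any JPEG/PNG with a detectable face works.
with open("img1.jpg", "rb") as f:
    faces = DeepFace.extract_faces(img_path=f, detector_backend="opencv")
for face in faces:
    print(face["facial_area"], face["confidence"])
```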

View File

@ -136,7 +136,7 @@ def find(
representations = []
# required columns for representations
df_cols = [
df_cols = {
"identity",
"hash",
"embedding",
@ -144,12 +144,12 @@ def find(
"target_y",
"target_w",
"target_h",
]
}
# Ensure the proper pickle file exists
if not os.path.exists(datastore_path):
with open(datastore_path, "wb") as f:
pickle.dump([], f)
pickle.dump([], f, pickle.HIGHEST_PROTOCOL)
# Load the representations from the pickle file
with open(datastore_path, "rb") as f:
@ -157,18 +157,15 @@ def find(
# check each item of representations list has required keys
for i, current_representation in enumerate(representations):
missing_keys = set(df_cols) - set(current_representation.keys())
missing_keys = df_cols - set(current_representation.keys())
if len(missing_keys) > 0:
raise ValueError(
f"{i}-th item does not have some required keys - {missing_keys}."
f"Consider to delete {datastore_path}"
)
# embedded images
pickled_images = [representation["identity"] for representation in representations]
# Get the list of images on storage
storage_images = image_utils.list_images(path=db_path)
storage_images = set(image_utils.yield_images(path=db_path))
if len(storage_images) == 0 and refresh_database is True:
raise ValueError(f"No item found in {db_path}")
@ -186,8 +183,13 @@ def find(
# Enforce data consistency amongst on disk images and pickle file
if refresh_database:
new_images = set(storage_images) - set(pickled_images) # images added to storage
old_images = set(pickled_images) - set(storage_images) # images removed from storage
# embedded images
pickled_images = {
representation["identity"] for representation in representations
}
new_images = storage_images - pickled_images # images added to storage
old_images = pickled_images - storage_images # images removed from storage
# detect replaced images
for current_representation in representations:
@ -232,7 +234,7 @@ def find(
if must_save_pickle:
with open(datastore_path, "wb") as f:
pickle.dump(representations, f)
pickle.dump(representations, f, pickle.HIGHEST_PROTOCOL)
if not silent:
logger.info(f"There are now {len(representations)} representations in {file_name}")
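
With `df_cols`, `storage_images`, and `pickled_images` all held as sets, the consistency check between the pickle file and the folder on disk reduces to two set differences. A tiny standalone illustration of that bookkeeping (the paths are made up):

```python
# Images currently on disk vs. identities already embedded in the pickle file.
storage_images = {"db/alice.jpg", "db/bob.jpg", "db/carol.jpg"}
pickled_images = {"db/alice.jpg", "db/dave.jpg"}

new_images = storage_images - pickled_images  # added to storage since the last run
old_images = pickled_images - storage_images  # removed from storage since the last run
print(sorted(new_images))  # ['db/bob.jpg', 'db/carol.jpg']
print(sorted(old_images))  # ['db/dave.jpg']
```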

View File

@ -22,6 +22,7 @@ os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
IDENTIFIED_IMG_SIZE = 112
TEXT_COLOR = (255, 255, 255)
# pylint: disable=unused-variable
def analysis(
db_path: str,
@ -33,6 +34,7 @@ def analysis(
time_threshold=5,
frame_threshold=5,
anti_spoofing: bool = False,
output_path: Optional[str] = None,
):
"""
Run real time face recognition and facial attribute analysis
@ -62,6 +64,8 @@ def analysis(
anti_spoofing (boolean): Flag to enable anti spoofing (default is False).
output_path (str): Path to save the output video. If None (default), no video is saved.
Returns:
None
"""
@ -77,12 +81,31 @@ def analysis(
model_name=model_name,
)
cap = cv2.VideoCapture(source if isinstance(source, str) else int(source))
if not cap.isOpened():
logger.error(f"Cannot open video source: {source}")
return
# Get video properties
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
fourcc = cv2.VideoWriter_fourcc(*"mp4v") # Codec for output file
# Ensure the output directory exists if output_path is provided
if output_path:
os.makedirs(os.path.dirname(output_path), exist_ok=True)
# Initialize video writer if output_path is provided
video_writer = (
cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
if output_path
else None
)
freezed_img = None
freeze = False
num_frames_with_faces = 0
tic = time.time()
cap = cv2.VideoCapture(source) # webcam
while True:
has_frame, img = cap.read()
if not has_frame:
@ -91,9 +114,9 @@ def analysis(
# we are adding some figures into img such as identified facial image, age, gender
# that is why, we need raw image itself to make analysis
raw_img = img.copy()
faces_coordinates = []
if freeze is False:
if not freeze:
faces_coordinates = grab_facial_areas(
img=img, detector_backend=detector_backend, anti_spoofing=anti_spoofing
)
@ -101,7 +124,6 @@ def analysis(
# we will pass img to analyze modules (identity, demography) and add some illustrations
# that is why, we will not be able to extract detected face from img clearly
detected_faces = extract_facial_areas(img=img, faces_coordinates=faces_coordinates)
img = highlight_facial_areas(img=img, faces_coordinates=faces_coordinates)
img = countdown_to_freeze(
img=img,
@ -111,8 +133,8 @@ def analysis(
)
num_frames_with_faces = num_frames_with_faces + 1 if len(faces_coordinates) else 0
freeze = num_frames_with_faces > 0 and num_frames_with_faces % frame_threshold == 0
if freeze:
# add analyze results into img - derive from raw_img
img = highlight_facial_areas(
@ -144,22 +166,28 @@ def analysis(
tic = time.time()
logger.info("freezed")
elif freeze is True and time.time() - tic > time_threshold:
elif freeze and time.time() - tic > time_threshold:
freeze = False
freezed_img = None
# reset counter for freezing
tic = time.time()
logger.info("freeze released")
logger.info("Freeze released")
freezed_img = countdown_to_release(img=freezed_img, tic=tic, time_threshold=time_threshold)
display_img = img if freezed_img is None else freezed_img
cv2.imshow("img", img if freezed_img is None else freezed_img)
# Save the frame to output video if writer is initialized
if video_writer:
video_writer.write(display_img)
if cv2.waitKey(1) & 0xFF == ord("q"): # press q to quit
cv2.imshow("img", display_img)
if cv2.waitKey(1) & 0xFF == ord("q"):
break
# kill open cv things
# Release resources
cap.release()
if video_writer:
video_writer.release()
cv2.destroyAllWindows()
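
With the new `output_path` argument, the annotated stream can be written to disk as an MP4 alongside the live window. A hedged usage sketch; the database folder and output path are placeholders:

```python
from deepface import DeepFace

# Placeholder paths; source=0 is the default webcam.
DeepFace.stream(
    db_path="my_db",                  # folder of reference face images
    source=0,
    time_threshold=5,
    frame_threshold=5,
    output_path="output/stream.mp4",  # annotated video is saved here; None skips saving
)
```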

View File

@ -95,12 +95,23 @@ def test_filetype_for_find():
def test_filetype_for_find_bulk_embeddings():
imgs = image_utils.list_images("dataset")
# List
list_imgs = image_utils.list_images("dataset")
assert len(imgs) > 0
assert len(list_imgs) > 0
# img47 is webp even though its extension is jpg
assert "dataset/img47.jpg" not in imgs
assert "dataset/img47.jpg" not in list_imgs
# Generator
gen_imgs = list(image_utils.yield_images("dataset"))
assert len(gen_imgs) > 0
# img47 is webp even though its extension is jpg
assert "dataset/img47.jpg" not in gen_imgs
assert gen_imgs == list_imgs
def test_find_without_refresh_database():

View File

@ -1,5 +1,7 @@
# built-in dependencies
import io
import cv2
import pytest
# project dependencies
from deepface import DeepFace
@ -18,6 +20,25 @@ def test_standard_represent():
logger.info("✅ test standard represent function done")
def test_standard_represent_with_io_object():
img_path = "dataset/img1.jpg"
default_embedding_objs = DeepFace.represent(img_path)
io_embedding_objs = DeepFace.represent(open(img_path, 'rb'))
assert default_embedding_objs == io_embedding_objs
# Confirm non-seekable io objects are handled properly
io_obj = io.BytesIO(open(img_path, 'rb').read())
io_obj.seek = None
no_seek_io_embedding_objs = DeepFace.represent(io_obj)
assert default_embedding_objs == no_seek_io_embedding_objs
# Confirm non-image io objects raise exceptions
with pytest.raises(ValueError, match='Failed to decode image'):
DeepFace.represent(io.BytesIO(open(r'../requirements.txt', 'rb').read()))
logger.info("✅ test standard represent with io object function done")
def test_represent_for_skipped_detector_backend_with_image_path():
face_img = "dataset/img5.jpg"
img_objs = DeepFace.represent(img_path=face_img, detector_backend="skip")