Merge pull request #1359 from serengil/feat-task-0610-combined-distance-functions

single and batch distance functions are stored in verify module
This commit is contained in:
Sefik Ilkin Serengil 2024-10-06 21:59:20 +01:00 committed by GitHub
commit cf7d428477
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 126 additions and 139 deletions

View File

@ -323,18 +323,18 @@ def find(
anti_spoofing (boolean): Flag to enable anti spoofing (default is False). anti_spoofing (boolean): Flag to enable anti spoofing (default is False).
Returns: Returns:
results (List[pd.DataFrame] or List[List[Dict[str, Any]]]): results (List[pd.DataFrame] or List[List[Dict[str, Any]]]):
A list of pandas dataframes (if `batched=False`) or A list of pandas dataframes (if `batched=False`) or
a list of dicts (if `batched=True`). a list of dicts (if `batched=True`).
Each dataframe or dict corresponds to the identity information for Each dataframe or dict corresponds to the identity information for
an individual detected in the source image. an individual detected in the source image.
Note: If you have a large database and/or a source photo with many faces, Note: If you have a large database and/or a source photo with many faces,
use `batched=True`, as it is optimized for large batch processing. use `batched=True`, as it is optimized for large batch processing.
Please pay attention that when using `batched=True`, the function returns Please pay attention that when using `batched=True`, the function returns
a list of dicts (not a list of DataFrames), a list of dicts (not a list of DataFrames),
but with the same keys as the columns in the DataFrame. but with the same keys as the columns in the DataFrame.
The DataFrame columns or dict keys include: The DataFrame columns or dict keys include:
- 'identity': Identity label of the detected individual. - 'identity': Identity label of the detected individual.
@ -364,7 +364,7 @@ def find(
silent=silent, silent=silent,
refresh_database=refresh_database, refresh_database=refresh_database,
anti_spoofing=anti_spoofing, anti_spoofing=anti_spoofing,
batched=batched batched=batched,
) )

View File

@ -105,7 +105,7 @@ def download_all_models_in_one_shot() -> None:
Download all model weights in one shot Download all model weights in one shot
""" """
# weight urls as variables # import model weights from module here to avoid circular import issue
from deepface.models.facial_recognition.VGGFace import WEIGHTS_URL as VGGFACE_WEIGHTS from deepface.models.facial_recognition.VGGFace import WEIGHTS_URL as VGGFACE_WEIGHTS
from deepface.models.facial_recognition.Facenet import FACENET128_WEIGHTS, FACENET512_WEIGHTS from deepface.models.facial_recognition.Facenet import FACENET128_WEIGHTS, FACENET512_WEIGHTS
from deepface.models.facial_recognition.OpenFace import WEIGHTS_URL as OPENFACE_WEIGHTS from deepface.models.facial_recognition.OpenFace import WEIGHTS_URL as OPENFACE_WEIGHTS

View File

@ -78,18 +78,18 @@ def find(
Returns: Returns:
results (List[pd.DataFrame] or List[List[Dict[str, Any]]]): results (List[pd.DataFrame] or List[List[Dict[str, Any]]]):
A list of pandas dataframes (if `batched=False`) or A list of pandas dataframes (if `batched=False`) or
a list of dicts (if `batched=True`). a list of dicts (if `batched=True`).
Each dataframe or dict corresponds to the identity information for Each dataframe or dict corresponds to the identity information for
an individual detected in the source image. an individual detected in the source image.
Note: If you have a large database and/or a source photo with many faces, Note: If you have a large database and/or a source photo with many faces,
use `batched=True`, as it is optimized for large batch processing. use `batched=True`, as it is optimized for large batch processing.
Please pay attention that when using `batched=True`, the function returns Please pay attention that when using `batched=True`, the function returns
a list of dicts (not a list of DataFrames), a list of dicts (not a list of DataFrames),
but with the same keys as the columns in the DataFrame. but with the same keys as the columns in the DataFrame.
The DataFrame columns or dict keys include: The DataFrame columns or dict keys include:
- 'identity': Identity label of the detected individual. - 'identity': Identity label of the detected individual.
@ -266,7 +266,7 @@ def find(
align, align,
threshold, threshold,
normalization, normalization,
anti_spoofing anti_spoofing,
) )
df = pd.DataFrame(representations) df = pd.DataFrame(representations)
@ -441,6 +441,7 @@ def __find_bulk_embeddings(
return representations return representations
def find_batched( def find_batched(
representations: List[Dict[str, Any]], representations: List[Dict[str, Any]],
source_objs: List[Dict[str, Any]], source_objs: List[Dict[str, Any]],
@ -459,11 +460,11 @@ def find_batched(
The function uses batch processing for efficient computation of distances. The function uses batch processing for efficient computation of distances.
Args: Args:
representations (List[Dict[str, Any]]): representations (List[Dict[str, Any]]):
A list of dictionaries containing precomputed target embeddings and associated metadata. A list of dictionaries containing precomputed target embeddings and associated metadata.
Each dictionary should have at least the key `embedding`. Each dictionary should have at least the key `embedding`.
source_objs (List[Dict[str, Any]]): source_objs (List[Dict[str, Any]]):
A list of dictionaries representing the source images to compare against A list of dictionaries representing the source images to compare against
the target embeddings. Each dictionary should contain: the target embeddings. Each dictionary should contain:
- `face`: The image data or path to the source face image. - `face`: The image data or path to the source face image.
@ -471,7 +472,7 @@ def find_batched(
indicating the facial region. indicating the facial region.
- Optionally, `is_real`: A boolean indicating if the face is real - Optionally, `is_real`: A boolean indicating if the face is real
(used for anti-spoofing). (used for anti-spoofing).
model_name (str): Model for face recognition. Options: VGG-Face, Facenet, Facenet512, model_name (str): Model for face recognition. Options: VGG-Face, Facenet, Facenet512,
OpenFace, DeepFace, DeepID, Dlib, ArcFace, SFace and GhostFaceNet (default is VGG-Face). OpenFace, DeepFace, DeepID, Dlib, ArcFace, SFace and GhostFaceNet (default is VGG-Face).
@ -499,7 +500,7 @@ def find_batched(
anti_spoofing (boolean): Flag to enable anti spoofing (default is False). anti_spoofing (boolean): Flag to enable anti spoofing (default is False).
Returns: Returns:
List[List[Dict[str, Any]]]: List[List[Dict[str, Any]]]:
A list where each element corresponds to a source face and A list where each element corresponds to a source face and
contains a list of dictionaries with matching faces. contains a list of dictionaries with matching faces.
""" """
@ -508,27 +509,24 @@ def find_batched(
metadata = set() metadata = set()
for item in representations: for item in representations:
emb = item.get('embedding') emb = item.get("embedding")
if emb is not None: if emb is not None:
embeddings_list.append(emb) embeddings_list.append(emb)
valid_mask.append(True) valid_mask.append(True)
else: else:
embeddings_list.append(np.zeros_like(representations[0]['embedding'])) embeddings_list.append(np.zeros_like(representations[0]["embedding"]))
valid_mask.append(False) valid_mask.append(False)
metadata.update(item.keys()) metadata.update(item.keys())
# remove embedding key from other keys # remove embedding key from other keys
metadata.discard('embedding') metadata.discard("embedding")
metadata = list(metadata) metadata = list(metadata)
embeddings = np.array(embeddings_list) # (N, D) embeddings = np.array(embeddings_list) # (N, D)
valid_mask = np.array(valid_mask) # (N,) valid_mask = np.array(valid_mask) # (N,)
data = { data = {key: np.array([item.get(key, None) for item in representations]) for key in metadata}
key: np.array([item.get(key, None) for item in representations])
for key in metadata
}
target_embeddings = [] target_embeddings = []
source_regions = [] source_regions = []
@ -558,101 +556,46 @@ def find_batched(
target_threshold = threshold or verification.find_threshold(model_name, distance_metric) target_threshold = threshold or verification.find_threshold(model_name, distance_metric)
target_thresholds.append(target_threshold) target_thresholds.append(target_threshold)
target_embeddings = np.array(target_embeddings) # (M, D) target_embeddings = np.array(target_embeddings) # (M, D)
target_thresholds = np.array(target_thresholds) # (M,) target_thresholds = np.array(target_thresholds) # (M,)
source_regions_arr = { source_regions_arr = {
'source_x': np.array([region['x'] for region in source_regions]), "source_x": np.array([region["x"] for region in source_regions]),
'source_y': np.array([region['y'] for region in source_regions]), "source_y": np.array([region["y"] for region in source_regions]),
'source_w': np.array([region['w'] for region in source_regions]), "source_w": np.array([region["w"] for region in source_regions]),
'source_h': np.array([region['h'] for region in source_regions]), "source_h": np.array([region["h"] for region in source_regions]),
} }
def find_cosine_distance_batch( distances = verification.find_distance(embeddings, target_embeddings, distance_metric) # (M, N)
embeddings: np.ndarray, target_embeddings: np.ndarray
) -> np.ndarray:
"""
Find the cosine distances between batches of embeddings
Args:
embeddings (np.ndarray): array of shape (N, D)
target_embeddings (np.ndarray): array of shape (M, D)
Returns:
np.ndarray: distance matrix of shape (M, N)
"""
embeddings_norm = verification.l2_normalize(embeddings, axis=1)
target_embeddings_norm = verification.l2_normalize(target_embeddings, axis=1)
cosine_similarities = np.dot(target_embeddings_norm, embeddings_norm.T)
cosine_distances = 1 - cosine_similarities
return cosine_distances
def find_euclidean_distance_batch(
embeddings: np.ndarray, target_embeddings: np.ndarray
) -> np.ndarray:
"""
Find the Euclidean distances between batches of embeddings
Args:
embeddings (np.ndarray): array of shape (N, D)
target_embeddings (np.ndarray): array of shape (M, D)
Returns:
np.ndarray: distance matrix of shape (M, N)
"""
diff = embeddings[None, :, :] - target_embeddings[:, None, :] # (M, N, D)
distances = np.linalg.norm(diff, axis=2) # (M, N)
return distances
def find_distance_batch(
embeddings: np.ndarray, target_embeddings: np.ndarray, distance_metric: str,
) -> np.ndarray:
"""
Find pairwise distances between batches of embeddings using the specified distance metric
Args:
embeddings (np.ndarray): array of shape (N, D)
target_embeddings (np.ndarray): array of shape (M, D)
distance_metric (str): distance metric ('cosine', 'euclidean', 'euclidean_l2')
Returns:
np.ndarray: distance matrix of shape (M, N)
"""
if distance_metric == "cosine":
distances = find_cosine_distance_batch(embeddings, target_embeddings)
elif distance_metric == "euclidean":
distances = find_euclidean_distance_batch(embeddings, target_embeddings)
elif distance_metric == "euclidean_l2":
embeddings_norm = verification.l2_normalize(embeddings, axis=1)
target_embeddings_norm = verification.l2_normalize(target_embeddings, axis=1)
distances = find_euclidean_distance_batch(embeddings_norm, target_embeddings_norm)
else:
raise ValueError("Invalid distance_metric passed - ", distance_metric)
return np.round(distances, 6)
distances = find_distance_batch(embeddings, target_embeddings, distance_metric) # (M, N)
distances[:, ~valid_mask] = np.inf distances[:, ~valid_mask] = np.inf
resp_obj = [] resp_obj = []
for i in range(len(target_embeddings)): for i in range(len(target_embeddings)):
target_distances = distances[i] # (N,) target_distances = distances[i] # (N,)
target_threshold = target_thresholds[i] target_threshold = target_thresholds[i]
N = embeddings.shape[0] N = embeddings.shape[0]
result_data = dict(data) result_data = dict(data)
result_data.update({ result_data.update(
'source_x': np.full(N, source_regions_arr['source_x'][i]), {
'source_y': np.full(N, source_regions_arr['source_y'][i]), "source_x": np.full(N, source_regions_arr["source_x"][i]),
'source_w': np.full(N, source_regions_arr['source_w'][i]), "source_y": np.full(N, source_regions_arr["source_y"][i]),
'source_h': np.full(N, source_regions_arr['source_h'][i]), "source_w": np.full(N, source_regions_arr["source_w"][i]),
'threshold': np.full(N, target_threshold), "source_h": np.full(N, source_regions_arr["source_h"][i]),
'distance': target_distances, "threshold": np.full(N, target_threshold),
}) "distance": target_distances,
}
)
mask = target_distances <= target_threshold mask = target_distances <= target_threshold
filtered_data = {key: value[mask] for key, value in result_data.items()} filtered_data = {key: value[mask] for key, value in result_data.items()}
sorted_indices = np.argsort(filtered_data['distance']) sorted_indices = np.argsort(filtered_data["distance"])
sorted_data = {key: value[sorted_indices] for key, value in filtered_data.items()} sorted_data = {key: value[sorted_indices] for key, value in filtered_data.items()}
num_results = len(sorted_data['distance']) num_results = len(sorted_data["distance"])
result_dicts = [ result_dicts = [
{key: sorted_data[key][i] for key in sorted_data} {key: sorted_data[key][i] for key in sorted_data} for i in range(num_results)
for i in range(num_results)
] ]
resp_obj.append(result_dicts) resp_obj.append(result_dicts)
return resp_obj return resp_obj

View File

@ -263,45 +263,73 @@ def __extract_faces_and_embeddings(
def find_cosine_distance( def find_cosine_distance(
source_representation: Union[np.ndarray, list], test_representation: Union[np.ndarray, list] source_representation: Union[np.ndarray, list], test_representation: Union[np.ndarray, list]
) -> np.float64: ) -> Union[np.float64, np.ndarray]:
""" """
Find cosine distance between two given vectors Find cosine distance between two given vectors or batches of vectors.
Args: Args:
source_representation (np.ndarray or list): 1st vector source_representation (np.ndarray or list): 1st vector or batch of vectors.
test_representation (np.ndarray or list): 2nd vector test_representation (np.ndarray or list): 2nd vector or batch of vectors.
Returns Returns
distance (np.float64): calculated cosine distance np.float64 or np.ndarray: Calculated cosine distance(s).
It returns a np.float64 for single embeddings and np.ndarray for batch embeddings.
""" """
if isinstance(source_representation, list): # Convert inputs to numpy arrays if necessary
source_representation = np.array(source_representation) source_representation = np.asarray(source_representation)
test_representation = np.asarray(test_representation)
if isinstance(test_representation, list): if source_representation.ndim == 1 and test_representation.ndim == 1:
test_representation = np.array(test_representation) # single embedding
dot_product = np.dot(source_representation, test_representation)
a = np.dot(source_representation, test_representation) source_norm = np.linalg.norm(source_representation)
b = np.linalg.norm(source_representation) test_norm = np.linalg.norm(test_representation)
c = np.linalg.norm(test_representation) distances = 1 - dot_product / (source_norm * test_norm)
return 1 - a / (b * c) elif source_representation.ndim == 2 and test_representation.ndim == 2:
# list of embeddings (batch)
source_normed = l2_normalize(source_representation, axis=1) # (N, D)
test_normed = l2_normalize(test_representation, axis=1) # (M, D)
cosine_similarities = np.dot(test_normed, source_normed.T) # (M, N)
distances = 1 - cosine_similarities
else:
raise ValueError(
f"Embeddings must be 1D or 2D, but received "
f"source shape: {source_representation.shape}, test shape: {test_representation.shape}"
)
return distances
def find_euclidean_distance( def find_euclidean_distance(
source_representation: Union[np.ndarray, list], test_representation: Union[np.ndarray, list] source_representation: Union[np.ndarray, list], test_representation: Union[np.ndarray, list]
) -> np.float64: ) -> Union[np.float64, np.ndarray]:
""" """
Find euclidean distance between two given vectors Find Euclidean distance between two vectors or batches of vectors.
Args: Args:
source_representation (np.ndarray or list): 1st vector source_representation (np.ndarray or list): 1st vector or batch of vectors.
test_representation (np.ndarray or list): 2nd vector test_representation (np.ndarray or list): 2nd vector or batch of vectors.
Returns
distance (np.float64): calculated euclidean distance Returns:
np.float64 or np.ndarray: Euclidean distance(s).
Returns a np.float64 for single embeddings and np.ndarray for batch embeddings.
""" """
if isinstance(source_representation, list): # Convert inputs to numpy arrays if necessary
source_representation = np.array(source_representation) source_representation = np.asarray(source_representation)
test_representation = np.asarray(test_representation)
if isinstance(test_representation, list): # Single embedding case (1D arrays)
test_representation = np.array(test_representation) if source_representation.ndim == 1 and test_representation.ndim == 1:
distances = np.linalg.norm(source_representation - test_representation)
return np.linalg.norm(source_representation - test_representation) # Batch embeddings case (2D arrays)
elif source_representation.ndim == 2 and test_representation.ndim == 2:
diff = (
source_representation[None, :, :] - test_representation[:, None, :]
) # (N, D) - (M, D) = (M, N, D)
distances = np.linalg.norm(diff, axis=2) # (M, N)
else:
raise ValueError(
f"Embeddings must be 1D or 2D, but received "
f"source shape: {source_representation.shape}, test shape: {test_representation.shape}"
)
return distances
def l2_normalize( def l2_normalize(
@ -315,8 +343,8 @@ def l2_normalize(
Returns: Returns:
np.ndarray: l2 normalized vector np.ndarray: l2 normalized vector
""" """
if isinstance(x, list): # Convert inputs to numpy arrays if necessary
x = np.array(x) x = np.asarray(x)
norm = np.linalg.norm(x, axis=axis, keepdims=True) norm = np.linalg.norm(x, axis=axis, keepdims=True)
return x / (norm + epsilon) return x / (norm + epsilon)
@ -325,23 +353,39 @@ def find_distance(
alpha_embedding: Union[np.ndarray, list], alpha_embedding: Union[np.ndarray, list],
beta_embedding: Union[np.ndarray, list], beta_embedding: Union[np.ndarray, list],
distance_metric: str, distance_metric: str,
) -> np.float64: ) -> Union[np.float64, np.ndarray]:
""" """
Wrapper to find distance between vectors according to the given distance metric Wrapper to find the distance between vectors based on the specified distance metric.
Args: Args:
source_representation (np.ndarray or list): 1st vector alpha_embedding (np.ndarray or list): 1st vector or batch of vectors.
test_representation (np.ndarray or list): 2nd vector beta_embedding (np.ndarray or list): 2nd vector or batch of vectors.
Returns distance_metric (str): The type of distance to compute
distance (np.float64): calculated cosine distance ('cosine', 'euclidean', or 'euclidean_l2').
Returns:
np.float64 or np.ndarray: The calculated distance(s).
""" """
# Convert inputs to numpy arrays if necessary
alpha_embedding = np.asarray(alpha_embedding)
beta_embedding = np.asarray(beta_embedding)
# Ensure that both embeddings are either 1D or 2D
if alpha_embedding.ndim != beta_embedding.ndim or alpha_embedding.ndim not in (1, 2):
raise ValueError(
f"Both embeddings must be either 1D or 2D, but received "
f"alpha shape: {alpha_embedding.shape}, beta shape: {beta_embedding.shape}"
)
if distance_metric == "cosine": if distance_metric == "cosine":
distance = find_cosine_distance(alpha_embedding, beta_embedding) distance = find_cosine_distance(alpha_embedding, beta_embedding)
elif distance_metric == "euclidean": elif distance_metric == "euclidean":
distance = find_euclidean_distance(alpha_embedding, beta_embedding) distance = find_euclidean_distance(alpha_embedding, beta_embedding)
elif distance_metric == "euclidean_l2": elif distance_metric == "euclidean_l2":
distance = find_euclidean_distance( axis = None if alpha_embedding.ndim == 1 else 1
l2_normalize(alpha_embedding), l2_normalize(beta_embedding) normalized_alpha = l2_normalize(alpha_embedding, axis=axis)
) normalized_beta = l2_normalize(beta_embedding, axis=axis)
distance = find_euclidean_distance(normalized_alpha, normalized_beta)
else: else:
raise ValueError("Invalid distance_metric passed - ", distance_metric) raise ValueError("Invalid distance_metric passed - ", distance_metric)
return np.round(distance, 6) return np.round(distance, 6)