Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement adaptive thresholding #31

Merged
merged 12 commits into from
May 3, 2022
201 changes: 181 additions & 20 deletions facenet.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Facial recognition with FaceNet in Keras, TensorFlow, or TensorRT.
"""

from copy import copy
import json
import os
from timeit import default_timer as timer
import threading
import random

import cv2
import numpy as np
Expand All @@ -31,7 +33,13 @@
retrieve_embeds,
get_frozen_graph,
)
from util.common import DB_LOB, DEFAULT_MODEL, EMBED_KEY_PATH, NAME_KEY_PATH
from util.common import (
DB_LOB,
DEFAULT_MODEL,
EMBED_KEY_PATH,
NAME_KEY_PATH,
name_cleanup,
)
from util.pbar import ProgressBar
from util.visuals import Camera, GraphicsRenderer
from util.log import Logger
Expand Down Expand Up @@ -89,6 +97,8 @@ def __init__(
print(f"[DEBUG] inference backend is {self.mode}")

self._db = {}
self._db_threshold = {}
self._db_threshold_stripped = {}
self.classifier = None
self.classifier_type = classifier

Expand All @@ -103,7 +113,7 @@ def __init__(

@property
def data(self):
"""Property for static database
"""Property for static database of embeddings
:returns: self._db
"""

Expand All @@ -118,6 +128,13 @@ def metadata(self):
"img_norm": self.img_norm,
}

@property
def data_threshold(self):
"""Property for static database of thresholds
:returns: self._db_threshold
"""
return self._db_threshold

def _keras_init(self, filepath):
"""Initializes a Keras model
:param filepath: path to model (.h5)
Expand Down Expand Up @@ -210,6 +227,7 @@ def add_entry(self, person, embeddings, train_classifier=True):

embeds = np.array(embeddings).reshape(len(embeddings), -1)
self._db[person] = embeds
self._db_threshold[person] = 0

stripped = strip_id(person)
self._stripped_names.append(stripped)
Expand Down Expand Up @@ -267,6 +285,7 @@ def set_data(self, data, metadata):
if data:
for person, embed in data.items():
self.add_entry(person, embed, train_classifier=False)
self.apply_thresholds()
self._train_classifier()

def _train_classifier(self):
Expand Down Expand Up @@ -345,8 +364,6 @@ def predict(self, img, detector, margin=10, flip=False, verbose=True):

cropped_faces, face_coords = detector.crop_face(img, margin, flip, verbose)
if cropped_faces is None:
if verbose:
print("No face detected")
return None, None

start = timer()
Expand All @@ -359,16 +376,17 @@ def predict(self, img, detector, margin=10, flip=False, verbose=True):
elapsed = round(1000.0 * (timer() - start), 2)
time = colored(f"{elapsed} ms", attrs=["bold"])
vecs = f"{len(embeds)} vector{'s' if len(embeds) > 1 else ''}"
print(f"Embedding time ({vecs}): " + time)
print(f"Embedding time ({vecs}): {time}")

return embeds, face_coords

def recognize(self, img, *args, verbose=True, **kwargs):
def recognize(self, img, *args, verbose=True, mode="default", **kwargs):
"""Facial recognition
:param img: image array in BGR mode
:param args: will be passed to self.predict
:param verbose: verbose or not (default: True)
:param kwargs: will be passed to self.predict
:param mode: ["default", "adaptive_threshold"]
:returns: face, is recognized, best match, time elapsed
"""
start = timer()
Expand All @@ -380,19 +398,36 @@ def recognize(self, img, *args, verbose=True, **kwargs):
try:
embeds, face = self.predict(img, *args, **kwargs, verbose=verbose)
if embeds is not None:
best_match = self.classifier.predict(embeds)[0]

nearest = self._stripped_db[best_match]
dists = self.dist_metric.distance(embeds, nearest, True)
dist = np.average(dists)
is_recognized = dist <= self.alpha

if verbose and dist:
info = colored(
f"{round(dist, 4)} ({best_match})",
color="green" if is_recognized else "red",
)
print(f"{self.dist_metric}: {info}")
intruder = self.is_intruder(embeds)
if not intruder:
if mode == "adaptive_threshold":
best_match = self.classifier.predict(embeds)[0]

other = np.average(self._stripped_db[best_match], axis=0)
simliarity_score = self.compute_similarity(embeds, other)
threshold = np.average(self._db_threshold_stripped[best_match])
is_recognized = simliarity_score >= threshold

if verbose and simliarity_score:
info = colored(
f"{round(simliarity_score, 4)} > {round(threshold, 4)} ({best_match})",
color="green" if is_recognized else "red",
)
print(f"adaptive thresholding: {info}")
elif mode == "default":
best_match = self.classifier.predict(embeds)[0]

nearest = self._stripped_db[best_match]
dists = self.dist_metric.distance(embeds, nearest, True)
dist = np.average(dists)
is_recognized = dist <= self.alpha

if verbose and dist:
info = colored(
f"{round(dist, 4)} ({best_match})",
color="green" if is_recognized else "red",
)
print(f"{self.dist_metric}: {info}")

except (ValueError, cv2.error) as error:
incompatible = "query data dimension"
Expand All @@ -416,6 +451,7 @@ def real_time_recognize(
graphics=True,
socket=None,
mtcnn_stride=1,
mode="default"
):
"""Real-time facial recognition
:param width: width of frame (default: 640)
Expand All @@ -426,6 +462,7 @@ def real_time_recognize(
:param graphics: whether or not to use graphics (default: True)
:param socket: socket (dev) (default: None)
:param mtcnn_stride: stride frame stride (default: 1)
:param mode: ["default", "adaptive_threshold"] (default: "default)
"""

assert self._db, "data must be provided"
Expand All @@ -448,7 +485,7 @@ def real_time_recognize(
frame = cv2.resize(frame, (0, 0), fx=resize, fy=resize)

# facial detection and recognition
info = self.recognize(frame, detector, flip=flip)
info = self.recognize(frame, detector, flip=flip, mode=mode)
face, is_recognized, best_match, elapsed = info

# logging and socket
Expand All @@ -467,3 +504,127 @@ def real_time_recognize(

cap.release()
cv2.destroyAllWindows()

def compute_similarity(self, embedding1, embedding2) -> float:
"""Calculates the similarity score.

Parameters:
embedding1 (embedding): The first embedding.
embedding2 (embedding): The second embedding to be compared with.

Returns (float):
Similarity score.

"""
return 1 - self.dist_metric.distance(embedding1, embedding2, True)[0]

def find_threshold(self, person) -> float:
"""Calculates the adaptive threshold for each person.

Parameters:
person (str): Person's name.

Returns (float):
The threshold value.

"""
embedding = self.data[person]
compares = []
people = copy(self.data)
del people[person]
people_thresholds = people.values()
for x in people_thresholds:
s = self.compute_similarity(embedding, x)
compares.append(s)

return np.max(np.std(compares))

def apply_thresholds(self) -> None:
"""Applys the threshold values to every person in the database."""
people = list(self.data.keys())
i = 0
for person in people:
thresholds = [0]
if i != 0:
thresholds = [0, self.find_threshold(people[i - 1])]

for j in range(len(people)):
person_name1 = name_cleanup(people[j])
person_name2 = name_cleanup(person)
if person_name1 != person_name2:
thresholds.append(
self.compute_similarity(self.data[people[j]], self.data[person])
)

self._db_threshold[person] = np.max(thresholds)
if name_cleanup(person) in list(self._db_threshold_stripped.keys()):
self._db_threshold_stripped[name_cleanup(person)].append(
self._db_threshold[person]
)
else:
self._db_threshold_stripped[name_cleanup(person)] = []
i += 1

def find_similar_embedding(self, embedding) -> int:
"""Returns index of similar embedding from self.data.

Parameters:
embedding (embedding): Embedding to be compared with.

"""
compares = []
for x in self.data.values():
s = self.compute_similarity(embedding, x)
compares.append(s)
return np.argmax(compares)

def is_intruder(self, embedding) -> bool:
"""Returns a boolean if the person's embedding is registered
in the database or not.

Parameters:
embedding (embedding): Embedding to be compared with.

Returns (bool):
Is intruder or not.

"""
simliar_index = self.find_similar_embedding(embedding)
other = list(self.data.values())[simliar_index]
simliarity_score = self.compute_similarity(embedding, other)
threshold = self.data_threshold[list(self.data.keys())[simliar_index]]

return simliarity_score < threshold

def adapt_evaluation(self, embedding, detected_person) -> bool:
"""This was in the article about adaptive thresholding.
https://arxiv.org/pdf/1810.11160.pdf

Parameters:
embedding (embedding): Embedding that is inputted into the recognition program
detected_person (str): The detected person's key in the database

Return (str):
Case type.

"""
simliar_index = self.find_similar_embedding(embedding)
other_key = list(self.data.keys())[simliar_index]
other_val = list(self.data.values())[simliar_index]
simliarity_score = self.compute_similarity(embedding, other_val)
threshold = self.data_threshold[list(self.data.keys())[simliar_index]]

case_type = None
if simliarity_score >= threshold and name_cleanup(other_key) == detected_person:
case_type = "true accept"

if simliarity_score < threshold and other_key in list(self.data.keys()):
case_type = "false reject"

if simliarity_score >= threshold and other_key not in list(self.data.keys()):
case_type = "false accept"

if simliarity_score < threshold and other_key not in list(self.data.keys()):
case_type = "true reject"

return case_type, simliarity_score
2 changes: 1 addition & 1 deletion scripts/facenet_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@

facenet = FaceNet()
facenet.real_time_recognize(
detector=detector, graphics=graphics, mtcnn_stride=mtcnn_stride, resize=resize
detector=detector, graphics=graphics, mtcnn_stride=mtcnn_stride, resize=resize, mode="adaptive_threshold"
)
4 changes: 4 additions & 0 deletions util/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,7 @@
ON_JETSON = platform.machine() == "aarch64"

HAS_RS = importlib.util.find_spec("pyrealsense2") is not None


def name_cleanup(name: str) -> str:
return name.split("-")[0]
3 changes: 0 additions & 3 deletions util/detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,4 @@ def crop_face(self, img_bgr, margin, flip: bool = False, verbose: bool = True):
confidence = round(face["confidence"] * 100, 2)
print(f"{confidence}% detect confidence (too low)")

elif verbose:
print("No face detected")

return resized_faces, face
2 changes: 1 addition & 1 deletion util/distance.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

class DistMetric:
def __init__(
self, metric, normalize: bool = True, mean: None | float = None
self, metric, normalize: bool = True, mean: None or float = None
) -> None:
assert metric in ("cosine", "euclidean"), f"{metric} not supported"
if metric == "cosine":
Expand Down