import time

begin = time.time()

from datetime import datetime
from deepface import DeepFace
import cv2
import os
import sys
import numpy as np
import re
import pandas as pd
from typing import Union
import base64
from enum import Enum
import requests
import logging

from deepface.modules.streaming import search_identity
from deepface.modules import verification, detection
from deepface.modules import preprocessing
import pickle
from fastapi import FastAPI
from pydantic import BaseModel

# set up logging: one log file per run, named after the start time
current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_dir = f"{os.path.dirname(os.path.abspath(__file__))}/log"
os.makedirs(log_dir, exist_ok=True)
logging.basicConfig(filename=f'{log_dir}/{current_time}.log', level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

logger.info(f"import packages use time: {(time.time()-begin):.2f} s")
begin = time.time()

# previous hard-coded configuration, kept for reference:
# db_path = "C:/Users/JIALE/Desktop/bns_proj/proj_deepface/face"
# detector_backend = "opencv"
# distance_metric = "euclidean_l2"
# model_name = "GhostFaceNet"
# model: FacialRecognition = DeepFace.build_model(model_name=model_name)
# target_size = model.input_shape
# threshold = verification.find_threshold(model_name, distance_metric)

class Opt(Enum):
    ADD = 1
    DELETE = 2
    CHANGE = 3
    SEARCH = 4


class Ftype(Enum):
    NDARRAY = 0
    PATH = 1
    BASE64 = 2
    URL = 3


class Status(Enum):
    HTTP_ERROR = "http error"  # error while fetching the image over HTTP
    DELETE_FINISH = "delete finished"
    BASE64_EMPTY = "base64 is empty"
    NotImplementedError = "Not Implemented Error"
    ADD_HAS_EXISTED = "when adding, file has existed"
    DB_NO_FACE = "face_db has no face"
    RUNTIME_ERROR = "runtime error"
    IMG_NO_FACE = "no face detected. Blurry Image, Non-frontal View or Face Pixels too small?"


GLOBAL_DICT = {
    "ADD": Opt.ADD,
    "DELETE": Opt.DELETE,
    "CHANGE": Opt.CHANGE,
    "SEARCH": Opt.SEARCH,

    "NDARRAY": Ftype.NDARRAY,
    "PATH": Ftype.PATH,
    "BASE64": Ftype.BASE64,
    "URL": Ftype.URL,
}
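
# How the /interface/ endpoint below uses these tables (a sketch of the TYPE
# convention derived from main(); the concrete string is only an example):
#
#   TYPE = "UPDATEDB_BASE64_ADD"
#   interface_type, img_mode, opt_mode = TYPE.split("_")   # "UPDATEDB", "BASE64", "ADD"
#   GLOBAL_DICT["BASE64"]   # -> Ftype.BASE64
#   GLOBAL_DICT["ADD"]      # -> Opt.ADD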

class FaceHelper:

    def __init__(self, db_path: str, detector_backend: str, distance_metric: str, model_name: str):
        self.db_path = db_path
        self.detector_backend = detector_backend
        self.distance_metric = distance_metric
        self.model_name = model_name
        self.model = DeepFace.build_model(model_name=self.model_name)
        self.target_size = self.model.input_shape
        self.threshold = verification.find_threshold(self.model_name, self.distance_metric)

        # name of the representations pickle that keepDBLive() loads from db_path
        file_parts = [
            "ds", "model", self.model_name, "detector", self.detector_backend,
            "aligned",
            "normalization_base",
            "expand_0"]
        self.pkl_name = ("_".join(file_parts) + ".pkl").replace("-", "").lower()
        self.dirty = True
        self.db_representations = None
        self.keepDBLive()

    def get_facial_areas(self, img: np.ndarray):
        """
        Find facial area coordinates in the given image.

        Args:
            img (np.ndarray): the image itself (BGR)
        Returns:
            faces_coordinates (list): list of (x, y, w, h) tuples
            faces (list): detected face crops
            descriptions (list): per-face status messages
        """
        logger.info("Detecting faces using backend: %s", self.detector_backend)
        try:
            face_objs = detection.extract_faces(
                img_path=img,
                # target_size=self.target_size,
                detector_backend=self.detector_backend,
                enforce_detection=True,
                align=True,
                expand_percentage=0,
                grayscale=False,
                # human_readable=False,
            )

            faces_coordinates = [
                (
                    face_obj["facial_area"]["x"],
                    face_obj["facial_area"]["y"],
                    face_obj["facial_area"]["w"],
                    face_obj["facial_area"]["h"],
                )
                for face_obj in face_objs
                # if face_obj["facial_area"]["w"] > threshold
            ]
            faces = [face_obj["face"] for face_obj in face_objs]
            # faces = [ preprocessing.resize_image( face_obj["face"][:,:,::-1],(self.target_size[1],self.target_size[0]) ) for face_obj in face_objs ]

            descriptions = []
            for x, y, w, h in faces_coordinates:
                if min(w, h) < 40:
                    descriptions.append(f"width={w}, height={h}. the face is too small, please upload a bigger face.")
                else:
                    descriptions.append("success")
            return faces_coordinates, faces, descriptions

        except ValueError:  # extract_faces raises ValueError when no face is detected
            return [], [], ["no face detected. Blurry Image, Non-frontal View or Face Pixels too small?"]
        except Exception as e:
            logger.error("Error detecting faces: %s", str(e))
            return Status.RUNTIME_ERROR

    @staticmethod
    def process_img(img, img_mode: Ftype):
        target_img = None
        try:
            if img_mode == Ftype.NDARRAY:  # already an np.ndarray (BGR)
                target_img = img
            elif img_mode == Ftype.PATH:  # image path on disk
                target_img = cv2.imread(img)
            elif img_mode == Ftype.BASE64:  # base64 string, optionally with a data-URI prefix
                encoded_data_parts = img.split(",")
                if len(encoded_data_parts) <= 0:
                    # raise "base64 is empty"
                    return Status.BASE64_EMPTY
                encoded_data = encoded_data_parts[-1]
                nparr = np.frombuffer(base64.b64decode(encoded_data), np.uint8)
                target_img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
            elif img_mode == Ftype.URL:  # remote image URL
                try:
                    # response = requests.get(url=img, stream=True, timeout=60)
                    response = requests.get(url=img, stream=True, timeout=60, proxies={"http": None, "https": None})
                    response.raise_for_status()
                    image_array = np.asarray(bytearray(response.raw.read()), dtype=np.uint8)
                    target_img = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
                except requests.HTTPError as http_err:
                    logger.error("HTTP error occurred: %s", http_err)
                    return Status.HTTP_ERROR
            else:
                return Status.NotImplementedError

            return target_img

        except Exception as e:
            logger.error("Error processing image: %s", str(e))
            return Status.RUNTIME_ERROR
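
    # Usage sketch for process_img (illustrative only; the file path is hypothetical):
    #
    #   img = FaceHelper.process_img("/tmp/person.jpg", Ftype.PATH)
    #   if isinstance(img, Status):
    #       print("decode failed:", img.value)   # a Status member signals an error
    #   else:
    #       print(img.shape)                     # BGR ndarray from cv2, e.g. (480, 640, 3)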

    # interface 1: update the face db (add / remove / change images). Everything we store is in the BGR color space.
    def updateFaceDB(self, img: Union[str, np.ndarray], img_name: str, img_mode: Ftype, opt_mode: Opt):
        """
        description: in db_path we only store `.jpg`; other image formats will be converted to `.jpg`
        args:
            - img: str or np.ndarray
            - img_name: str
            - img_mode: Ftype (NDARRAY, PATH, BASE64 or URL)
            - opt_mode: Opt (ADD, DELETE or CHANGE)
        ret:
            - retjson: dict, or a Status member on error
        """
        try:
            retjson = {
                "status": None,  # 1 if the image was stored successfully, 0 otherwise
                "detected_img": None,
                "description": "None",
            }
            if opt_mode == Opt.DELETE:  # delete
                try:
                    os.remove(os.path.join(self.db_path, img_name + '.jpg'))
                except FileNotFoundError:
                    pass
                return Status.DELETE_FINISH

            ret = FaceHelper.process_img(img, img_mode)
            if isinstance(ret, Status):
                return ret
            else:
                target_img = ret

            face_coors, detected_faces, descriptions = self.get_facial_areas(target_img)
            detected_img = target_img.copy()
            for x, y, w, h in face_coors:
                cv2.rectangle(detected_img, (x, y), (x + w, y + h), (255, 0, 0), 3)
            success, encoded_detected_img = cv2.imencode('.png', detected_img)
            if success:
                detected_img_base64 = base64.b64encode(encoded_detected_img).decode("utf-8")
                if len(face_coors) >= 2:
                    retjson.update({"status": 0, "detected_img": detected_img_base64, "description": "too many faces, cannot select which one"})  # too many faces
                elif len(face_coors) <= 0:
                    retjson.update({"status": 0, "detected_img": detected_img_base64, "description": descriptions[0]})  # no face
                else:  # exactly one face is detected
                    retjson.update({"status": 1 if descriptions[0] == "success" else 0, "detected_img": detected_img_base64, "description": descriptions[0]})

            if retjson['status'] == 0:
                return retjson

            db_imgs = [db_img.split(".")[0] for db_img in os.listdir(self.db_path) if db_img.endswith(".jpg")]
            if opt_mode == Opt.ADD:  # add
                if img_name in db_imgs:
                    return Status.ADD_HAS_EXISTED  # operation is add, but img_name already exists in the db
                cv2.imwrite(os.path.join(self.db_path, img_name + '.jpg'), target_img)
            elif opt_mode == Opt.CHANGE:  # change
                cv2.imwrite(os.path.join(self.db_path, img_name + '.jpg'), target_img)  # whether it exists in the db or not, save it anyway
            else:
                return Status.NotImplementedError

            # refresh the embeddings so the face_db stays up to date
            self.dirty = True
            self.keepDBLive()

            return retjson
        except Exception as e:
            logger.error("Error updating FaceDB: %s", str(e))
            return Status.RUNTIME_ERROR
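
    # Usage sketch for updateFaceDB (illustrative; "alice" and img_b64 are hypothetical):
    #
    #   ret = facehelper.updateFaceDB(img=img_b64, img_name="alice",
    #                                 img_mode=Ftype.BASE64, opt_mode=Opt.ADD)
    #   if isinstance(ret, Status):
    #       print(ret.value)                         # e.g. "when adding, file has existed"
    #   else:
    #       print(ret["status"], ret["description"])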

    # interface 2: face recognition using the embeddings stored in face_db
    def faceRecog(self, img: Union[str, np.ndarray], img_mode: Ftype):
        try:
            begin = time.time()
            # make sure the database is up to date
            if self.dirty:
                self.keepDBLive()
            if len(self.db_representations) <= 0:
                return Status.DB_NO_FACE

            # load / decode the input image
            ret = FaceHelper.process_img(img, img_mode)
            if isinstance(ret, Status):
                return ret
            target_img = ret

            # begin face recognition
            df = pd.DataFrame(self.db_representations)
            labels = []

            logger.info(f"in faceRecog. before get_facial_areas use time: {time.time()-begin:.2f} s")
            begin = time.time()

            face_coors, detected_faces, descriptions = self.get_facial_areas(target_img)
            if face_coors == [] or detected_faces == []:
                return Status.IMG_NO_FACE

            logger.info(f"in faceRecog. get_facial_areas use time: {time.time()-begin:.2f} s")
            begin = time.time()

            for idx, (x, y, w, h) in enumerate(face_coors):
                detected_face = detected_faces[idx]
                detected_face = preprocessing.resize_image(img=detected_face, target_size=(self.target_size[1], self.target_size[0]))
                unknown_representation = self.model.forward(
                    preprocessing.normalize_input(img=detected_face, normalization='base')
                )
                distances = []
                result_df = df.copy()
                for _, instance in df.iterrows():
                    source_representation = instance['embedding']
                    if source_representation is None:
                        distances.append(float('inf'))
                        continue

                    assert len(unknown_representation) == len(source_representation), 'wrong embedding length'
                    distance = verification.find_distance(source_representation, unknown_representation, self.distance_metric)
                    distances.append(distance)

                result_df['distance'] = distances
                result_df = result_df.drop(columns=['embedding'])
                result_df = result_df[result_df['distance'] < self.threshold]
                if result_df.shape[0] <= 0:
                    labels.append("unknown")
                else:
                    result_df = result_df.sort_values(by=["distance"], ascending=True).reset_index(drop=True)
                    target_label, min_dist = result_df.loc[0, "identity"], result_df.loc[0, "distance"]
                    target_label = re.split(r'[\\\/]', target_label)[-1].split('.')[0] + f" {min_dist:.4f}"
                    labels.append(target_label)

            logger.info(f"in faceRecog. face forward and matching use time: {time.time()-begin:.2f} s")
            begin = time.time()

            # draw the face boxes to make the result easy to view
            for idx, (label, (x, y, w, h)) in enumerate(zip(labels, face_coors)):
                color = (0, 0, 255) if label == 'unknown' else (255, 0, 0)
                cv2.rectangle(target_img, (x, y), (x + w, y + h), color, 3)
                cv2.putText(target_img, f"{idx}: " + label, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.75, color, 1)
            target_img_base64 = None
            success, encoded_target_img = cv2.imencode('.png', target_img)
            if success:
                target_img_base64 = base64.b64encode(encoded_target_img).decode("utf-8")

            logger.info(f"in faceRecog. draw and encode img use time: {time.time()-begin:.2f} s")

            return labels, face_coors, target_img_base64

        except Exception as e:
            logger.error("Error in faceRecog: %s", str(e))
            return Status.RUNTIME_ERROR
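
    # Usage sketch for faceRecog (illustrative; the URL is hypothetical):
    #
    #   ret = facehelper.faceRecog(img="http://example.com/group.jpg", img_mode=Ftype.URL)
    #   if isinstance(ret, Status):
    #       print(ret.value)                         # e.g. "face_db has no face"
    #   else:
    #       labels, boxes, annotated_b64 = ret       # names, (x, y, w, h) boxes, annotated PNG as base64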

    def keepDBLive(self):
        # refresh the representations so the database stays up to date;
        # search_identity rebuilds the pickle inside db_path when images have changed
        search_identity(
            detected_face=np.zeros([224, 224, 3]),
            db_path=self.db_path,
            detector_backend=self.detector_backend,
            distance_metric=self.distance_metric,
            model_name=self.model_name,
        )
        self.dirty = False

        # load the representations from face_db
        datastore_path = os.path.join(self.db_path, self.pkl_name)
        with open(datastore_path, "rb") as f:
            self.db_representations = pickle.load(f)

        return len(self.db_representations)

class config(BaseModel):
    db_path: str  # default: "./face"
    detector_backend: str  # default: "retinaface"
    distance_metric: str  # default: "euclidean_l2"
    model_name: str  # default: "GhostFaceNet"


class item(BaseModel):
    TYPE: str
    img: str
    img_name: str


default_db_path = "./face"
default_detector_backend = "retinaface"
default_distance_metric = "euclidean_l2"
default_model_name = "GhostFaceNet"

facehelper = FaceHelper(
    db_path=default_db_path,
    detector_backend=default_detector_backend,
    distance_metric=default_distance_metric,
    model_name=default_model_name,
)

app = FastAPI()
@app.post("/init/")
|
|
def init(input: config):
|
|
begin = time.time()
|
|
global default_db_path, default_detector_backend, default_distance_metric, default_model_name
|
|
global facehelper
|
|
|
|
db_path = default_db_path if input.db_path.lower() in ["string","None",""] else input.db_path
|
|
detector_backend = default_detector_backend if input.detector_backend.lower() in ["string","None",""] else input.detector_backend
|
|
distance_metric = default_distance_metric if input.distance_metric.lower() in ["string","None",""] else input.distance_metric
|
|
model_name = default_model_name if input.model_name.lower() in ["string","None",""] else input.model_name
|
|
|
|
facehelper = FaceHelper(
|
|
db_path = db_path,
|
|
detector_backend = detector_backend,
|
|
distance_metric = distance_metric,
|
|
model_name = model_name,
|
|
)
|
|
logger.info(f"init use time: {time.time()-begin:.2f} s")
|
|
|
|
return {"status": 1, "description": f"success init. {detector_backend}@{distance_metric}@{model_name}"}
|
|
|
|
|
|
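
# Example /init/ request body (a sketch; any field set to "string", "none" or ""
# falls back to the defaults defined above):
#
#   {
#     "db_path": "./face",
#     "detector_backend": "retinaface",
#     "distance_metric": "euclidean_l2",
#     "model_name": "GhostFaceNet"
#   }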
@app.post("/keepDBLatest")
|
|
def keepDBLastest():
|
|
begin = time.time()
|
|
global facehelper
|
|
ret = facehelper.keepDBLive()
|
|
logger.info(f"keepDBLatest use time: {time.time()-begin:.2f} s")
|
|
return {"status":1, "description": f"the faceDB is latest. db has {ret} faces"}
|
|
|
|
|
|
@app.post("/interface/")
|
|
def main(input: item):
|
|
begin = time.time()
|
|
global facehelper
|
|
vars = input.TYPE.split("_")
|
|
if len(vars) != 3:
|
|
return {"status": 0, "description":"wrong TYPE"}
|
|
interface_type, img_mode, opt_mode = vars
|
|
try:
|
|
img_mode = GLOBAL_DICT[img_mode]
|
|
except KeyError:
|
|
return {"status": 0, "description": "wrong img_mode"}
|
|
try:
|
|
opt_mode = GLOBAL_DICT[opt_mode]
|
|
except KeyError:
|
|
return {"status": 0, "description": "wrong opt_mode"}
|
|
|
|
if interface_type == "UPDATEDB":
|
|
# call updateFaceDB for handling the picture
|
|
ret = facehelper.updateFaceDB(img = input.img,
|
|
img_name = input.img_name,
|
|
img_mode = img_mode,
|
|
opt_mode = opt_mode,
|
|
)
|
|
logger.info(f"update dbface time: {time.time()-begin:.2f} s")
|
|
if isinstance(ret, Status): # means error happened
|
|
return {"status": 0, "description":ret.value}
|
|
else:
|
|
return {"status": ret["status"], "description": ret["description"] }
|
|
elif interface_type == "FACEID":
|
|
ret = facehelper.faceRecog(
|
|
img = input.img,
|
|
img_mode = img_mode,
|
|
)
|
|
logger.info(f"face reg use time: {time.time()-begin:.2f} s")
|
|
if isinstance(ret, Status): # means error happened
|
|
return {"status": 0, "description":ret.value}
|
|
else:
|
|
return {"status": 1, "name": ret[0], "position": ret[1], "resImg": ret[2]}
|
|
else:
|
|
return {"status": 0, "description": "wrong inferface_type, which just includes {'UPDATEDB','FACEID'}" }
|
|
|
|
|
|
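
# Example call against /interface/ (a sketch; assumes the app is served locally on
# port 8000, e.g. via uvicorn -- host and port are assumptions, not part of this file):
#
#   curl -X POST http://127.0.0.1:8000/interface/ \
#        -H "Content-Type: application/json" \
#        -d '{"TYPE": "FACEID_URL_SEARCH", "img": "http://example.com/group.jpg", "img_name": ""}'
#
# A successful FACEID response looks like:
#   {"status": 1, "name": [...], "position": [...], "resImg": "<base64 png>"}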

if __name__ == "__main__":
    # quick local sanity check of a detector backend via DeepFace.find
    detector_backend = sys.argv[1]
    dfs = DeepFace.find(img_path="C:/Users/JIALE/Desktop/zly.jpg", model_name="GhostFaceNet", detector_backend=detector_backend, db_path=r"C:\Users\JIALE\Desktop\bns_proj\proj_deepface\face")

    pass
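
# To serve the API (a sketch; uvicorn and the module name "face_service" are assumptions):
#
#   uvicorn face_service:app --host 0.0.0.0 --port 8000
#
# Running this file directly only performs the DeepFace.find sanity check above.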