import time begin = time.time() from datetime import datetime from deepface import DeepFace import cv2 import os import sys import numpy as np import re import pandas as pd from typing import Union import base64 from enum import Enum import requests import logging from deepface.modules.streaming import search_identity from deepface.modules import verification, detection from deepface.modules import preprocessing import pickle from fastapi import FastAPI from pydantic import BaseModel # setting Logger current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") log_dir = f"{os.path.dirname(os.path.abspath(__file__))}/log" os.makedirs(log_dir, exist_ok=True) logging.basicConfig(filename=f'{log_dir}/{current_time}.log', level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # @@@@ logger.info(f"import package use time: {(time.time()-begin):.2f} s") begin = time.time() # db_path = "C:/Users/JIALE/Desktop/bns_proj/proj_deepface/face" # detector_backend = "opencv" # distance_metric = "euclidean_l2" # model_name = "GhostFaceNet" # model: FacialRecognition = DeepFace.build_model(model_name=model_name) # target_size = model.input_shape # threshold = verification.find_threshold(model_name, distance_metric) # print("begin3 time: ", time.time() - start) # start3 = time.time() class Opt(Enum): ADD = 1 DELETE = 2 CHANGE = 3 SEARCH = 4 class Ftype(Enum): NDARRAY = 0 PATH = 1 BASE64 = 2 URL = 3 class Status(Enum): HTTP_ERROR = "http error" # http 检索报错 DELETE_FINISH = "delete finished" BASE64_EMPTY = "base64 is empty" NotImplementedError = "Not Implemented Error" ADD_HAS_EXISTED = "when adding, file has existed" DB_NO_FACE = "face_db has no face" RUNTIME_ERROR = "runtime error" IMG_NO_FACE = "no face detected. Blurry Image, Non-frontal View or Face Pixels too small?" GLOBAL_DICT = { "ADD": Opt.ADD, "DELETE":Opt.DELETE, "CHANGE":Opt.CHANGE, "SEARCH":Opt.SEARCH, "NDARRAY": Ftype.NDARRAY, "PATH":Ftype.PATH, "BASE64":Ftype.BASE64, "URL":Ftype.URL } class FaceHelper: def __init__(self, db_path: str, detector_backend: str, distance_metric: str, model_name: str): self.db_path = db_path self.detector_backend = detector_backend self.distance_metric = distance_metric self.model_name = model_name self.model = DeepFace.build_model(model_name=self.model_name) self.target_size = self.model.input_shape self.threshold = verification.find_threshold(self.model_name, self.distance_metric) file_parts = [ "ds", "model", self.model_name, "detector",self.detector_backend, "aligned", "normalization_base", "expand_0"] self.pkl_name = ("_".join(file_parts)+".pkl").replace("-", "").lower() self.dirty = True self.db_representations = None self.keepDBLive() def get_facial_areas(self, img: np.ndarray): """ Find facial area coordinates in the given image Args: img (np.ndarray): image itself detector_backend (string): face detector backend. Options: 'opencv', 'retinaface', 'mtcnn', 'ssd', 'dlib', 'mediapipe', 'yolov8' (default is opencv). target_size (tuple): input shape of the facial recognition model. Returns result (list): list of tuple with x, y, w and h coordinates detected_face descriptions """ logger.info("Detecting faces using backend: %s", self.detector_backend) # @@@@ try: face_objs = detection.extract_faces( img_path = img, # target_size = self.target_size, detector_backend = self.detector_backend, enforce_detection = True, align = True, expand_percentage = 0, grayscale = False, # human_readable = False, ) # print("detection.extract_faces. use time: ", time.time()-beg ) # beg = time.time() faces_coordinates = [ ( face_obj["facial_area"]["x"], face_obj["facial_area"]["y"], face_obj["facial_area"]["w"], face_obj["facial_area"]["h"], ) for face_obj in face_objs # if face_obj["facial_area"]["w"] > threshold ] faces = [ face_obj["face"] for face_obj in face_objs ] # faces = [ preprocessing.resize_image( face_obj["face"][:,:,::-1],(self.target_size[1],self.target_size[0]) ) for face_obj in face_objs ] descriptions = [] for x,y,w,h in faces_coordinates: if min(w,h)<40: descriptions.append(f"width={w}, height={h}. the face is too small, please upload bigger face.") else: descriptions.append("success") return faces_coordinates, faces, descriptions except ValueError as ve: # to avoid exception if no face detected return [], [], ["no face detected. Blurry Image, Non-frontal View or Face Pixels too small?"] except Exception as e: logger.error("Error detecting faces: %s", str(ve)) # @@@@ return Status.RUNTIME_ERROR @staticmethod def process_img(img, img_mode: Ftype): target_img = None try: if img_mode == Ftype.NDARRAY: # np.ndarray格式 target_img = img elif img_mode == Ftype.PATH: # img path target_img = cv2.imread(img) elif img_mode == Ftype.BASE64: # base64 encoded_data_parts = img.split(",") if len(encoded_data_parts) <= 0: # raise "base64 is empty" return Status.BASE64_EMPTY encoded_data = encoded_data_parts[-1] nparr = np.fromstring(base64.b64decode(encoded_data), np.uint8) target_img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) elif img_mode == Ftype.URL: # url try: # response = requests.get(url=img,stream=True,timeout=60) response = requests.get(url=img, stream=True, timeout=60, proxies={"http": None, "https": None}) response.raise_for_status() image_array = np.asarray(bytearray(response.raw.read()), dtype=np.uint8) target_img = cv2.imdecode(image_array, cv2.IMREAD_COLOR) except requests.HTTPError as http_err: # print(f"HTTP error occurred: {http_err}") # 输出 HTTP 错误信息 return Status.HTTP_ERROR else: return Status.NotImplementedError return target_img except Exception as e: # @@@@ logger.error("Error processing image: %s", str(e)) # @@@@ return Status.RUNTIME_ERROR # @@@@ # interface 1: update face db. add / remove /change imgs. 我们输出的都是`bgr`颜色空间 def updateFaceDB(self, img: Union[str, np.ndarray], img_name: str, img_mode: Ftype, opt_mode: Opt ): """ description: in db_path, we just store `.jpg`, other Image format will be changed to `.jpg` args: - img: str or np.ndarray - img_name: str - img_mode: 0 np.ndarray; 1 img_path 2 base64 - opt_mode: 0 add; 1 delete; 2 change ret: - retjson: dict. """ try: retjson = { "status": None, # 入库成功 1 or 失败 0 "detected_img": None, "description": "None", } if opt_mode == Opt.DELETE: #delete try: os.remove(os.path.join(self.db_path, img_name+'.jpg')) except FileNotFoundError: pass return Status.DELETE_FINISH ret = FaceHelper.process_img(img, img_mode) if isinstance(ret, Status): return ret else: target_img = ret face_coors, detected_faces, descriptions = self.get_facial_areas(target_img) detected_img = target_img.copy() for x,y,w,h in face_coors: cv2.rectangle(detected_img, (x,y),(x+w,y+h),(255,0,0),3) success, encoded_detected_img = cv2.imencode('.png', detected_img) if success: detected_img_base64 = base64.b64encode(encoded_detected_img).decode("utf-8") if len(face_coors) >= 2: retjson.update({"status": 0, "detected_img": detected_img_base64 , "description": "too many faces, cannot select which one" }) # too many face elif len(face_coors) <= 0: retjson.update({"status": 0, "detected_img": detected_img_base64 , "description": descriptions[0]}) # no face else: # only one face is detected retjson.update({"status": 1 if descriptions[0] =="success" else 0 , "detected_img": detected_img_base64 , "description": descriptions[0]}) # success if retjson['status'] == 0: return retjson db_imgs = [db_img.split(".")[0] for db_img in os.listdir(self.db_path) if db_img.endswith(".jpg")] if opt_mode == Opt.ADD: # add if img_name in db_imgs: return Status.ADD_HAS_EXISTED # raise "operate is add, but db has existed the img_name" cv2.imwrite(os.path.join(self.db_path, img_name+'.jpg'), target_img ) elif opt_mode == Opt.CHANGE: # change cv2.imwrite(os.path.join(self.db_path, img_name+'.jpg'), target_img ) # whether exist or not in db, save it anymore. else: return Status.NotImplementedError # update the face_db to keep latest self.dirty = True self.keepDBLive() return retjson except Exception as e: # @@@@ logger.error("Error update FaceDB: %s", str(e)) # @@@@ return Status.RUNTIME_ERROR # @@@@ #interface 2: face recognition using embeddings on face_db def faceRecog(self, img: Union[str,np.ndarray], img_mode: Ftype): try: begin = time.time() # keep the database is the newest if self.dirty: self.keepDBLive() if len(self.db_representations) <= 0: return Status.DB_NO_FACE # process the img ret = FaceHelper.process_img(img, img_mode) if isinstance(ret, Status): return ret target_img = ret # begin face recognition df = pd.DataFrame(self.db_representations) labels = [] logger.info(f"in faceRecog. before get_facial_area use time: {time.time()-begin:.2f} s") begin = time.time() face_coors, detected_faces, descriptions = self.get_facial_areas(target_img) if face_coors == [] or detected_faces == []: return Status.IMG_NO_FACE logger.info(f"in faceRecog. get_facial_area use time: {time.time()-begin:.2f} s") begin = time.time() for idx, (x,y,w,h) in enumerate(face_coors): detected_face = detected_faces[idx] detected_face = preprocessing.resize_image(img = detected_face, target_size=(self.target_size[1],self.target_size[0])) unknown_representation = self.model.forward( preprocessing.normalize_input(img=detected_face,normalization='base') ) distances = [] result_df = df.copy() for _, instance in df.iterrows(): source_representation = instance['embedding'] if source_representation is None : distances.append(float('inf')) continue assert len(unknown_representation) == len(source_representation), 'wrong len of embedding ' distance = verification.find_distance(source_representation, unknown_representation, self.distance_metric) distances.append(distance) result_df['distance'] = distances result_df = result_df.drop(columns=['embedding']) result_df = result_df[result_df['distance'] < self.threshold] if result_df.shape[0] <= 0: labels.append("unknown") else: result_df = result_df.sort_values(by=["distance"], ascending=True).reset_index(drop=True) target_label, min_dist = result_df.loc[0, "identity"], result_df.loc[0, "distance"] target_label = re.split(r'[\\\/]', target_label)[-1].split('.')[0] + f" {min_dist:.4f}" labels.append(target_label) logger.info(f"in faceRecog. face forward and facereg use time: {time.time()-begin:.2f} s") begin = time.time() # draw the face frame for easy to view for idx, (label, (x, y, w, h)) in enumerate(zip(labels, face_coors)): color = (0,0,255) if label == 'unknown' else (255,0,0) cv2.rectangle(target_img,(x,y),(x+w,y+h), color, 3) cv2.putText(target_img, f"{idx}: " + label, (x,y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.75, color, 1) success, encoded_target_img = cv2.imencode('.png', target_img) if success: target_img_base64 = base64.b64encode(encoded_target_img).decode("utf-8") logger.info(f"in faceRecog. draw and trans img use time: {time.time()-begin:.2f} s") begin = time.time() return labels, face_coors, target_img_base64 except Exception as e: # @@@@ logger.error("Error update FaceDB: %s", str(e)) # @@@@ return Status.RUNTIME_ERROR # @@@@ def keepDBLive(self): # keep the database is the newest search_identity( detected_face=np.zeros([224, 224, 3]), db_path=self.db_path, detector_backend=self.detector_backend, distance_metric=self.distance_metric, model_name=self.model_name, ) self.dirty = False # load the representations from face_db datestore_path = os.path.join(self.db_path, self.pkl_name) with open(datestore_path, "rb") as f: self.db_representations = pickle.load(f) return len(self.db_representations) class config(BaseModel): db_path: str # default: "C:/Users/JIALE/Desktop/bns_proj/proj_deepface/face" detector_backend: str #default: distance_metric: str model_name: str class item(BaseModel): TYPE: str img: str img_name: str default_db_path = "./face" default_detector_backend = "retinaface" default_distance_metric = "euclidean_l2" default_model_name = "GhostFaceNet" facehelper = FaceHelper( db_path = default_db_path,# "C:/Users/JIALE/Desktop/bns_proj/proj_deepface/face", detector_backend = default_detector_backend, # "opencv", distance_metric = default_distance_metric, # "euclidean_l2", model_name = default_model_name, # "GhostFaceNet" , ) app = FastAPI() @app.post("/init/") def init(input: config): begin = time.time() global default_db_path, default_detector_backend, default_distance_metric, default_model_name global facehelper db_path = default_db_path if input.db_path.lower() in ["string","None",""] else input.db_path detector_backend = default_detector_backend if input.detector_backend.lower() in ["string","None",""] else input.detector_backend distance_metric = default_distance_metric if input.distance_metric.lower() in ["string","None",""] else input.distance_metric model_name = default_model_name if input.model_name.lower() in ["string","None",""] else input.model_name facehelper = FaceHelper( db_path = db_path, detector_backend = detector_backend, distance_metric = distance_metric, model_name = model_name, ) logger.info(f"init use time: {time.time()-begin:.2f} s") return {"status": 1, "description": f"success init. {detector_backend}@{distance_metric}@{model_name}"} @app.post("/keepDBLatest") def keepDBLastest(): begin = time.time() global facehelper ret = facehelper.keepDBLive() logger.info(f"keepDBLatest use time: {time.time()-begin:.2f} s") return {"status":1, "description": f"the faceDB is latest. db has {ret} faces"} @app.post("/interface/") def main(input: item): begin = time.time() global facehelper vars = input.TYPE.split("_") if len(vars) != 3: return {"status": 0, "description":"wrong TYPE"} interface_type, img_mode, opt_mode = vars try: img_mode = GLOBAL_DICT[img_mode] except KeyError: return {"status": 0, "description": "wrong img_mode"} try: opt_mode = GLOBAL_DICT[opt_mode] except KeyError: return {"status": 0, "description": "wrong opt_mode"} if interface_type == "UPDATEDB": # call updateFaceDB for handling the picture ret = facehelper.updateFaceDB(img = input.img, img_name = input.img_name, img_mode = img_mode, opt_mode = opt_mode, ) logger.info(f"update dbface time: {time.time()-begin:.2f} s") if isinstance(ret, Status): # means error happened return {"status": 0, "description":ret.value} else: return {"status": ret["status"], "description": ret["description"] } elif interface_type == "FACEID": ret = facehelper.faceRecog( img = input.img, img_mode = img_mode, ) logger.info(f"face reg use time: {time.time()-begin:.2f} s") if isinstance(ret, Status): # means error happened return {"status": 0, "description":ret.value} else: return {"status": 1, "name": ret[0], "position": ret[1], "resImg": ret[2]} else: return {"status": 0, "description": "wrong inferface_type, which just includes {'UPDATEDB','FACEID'}" } if __name__ == "__main__": detector_backend = sys.argv[1] dfs = DeepFace.find(img_path="C:/Users/JIALE/Desktop/zly.jpg", model_name="GhostFaceNet", detector_backend=detector_backend, db_path=r"C:\Users\JIALE\Desktop\bns_proj\proj_deepface\face") pass