"""Face database maintenance and face recognition service built on deepface.

The module exposes a :class:`FaceHelper` that keeps a folder of ``.jpg`` face
images (the "face DB") in sync with deepface's pickled embeddings, a small
FastAPI application wrapping it, and a command-line entry point.
"""

import base64
import os
import pickle
import re
import sys
import time
from enum import Enum
from typing import Union

import cv2
import numpy as np
import pandas as pd
import requests
from deepface import DeepFace
from deepface.commons.logger import Logger
from deepface.models.FacialRecognition import FacialRecognition
from deepface.modules import detection, preprocessing, verification
from deepface.modules.streaming import search_identity
from fastapi import FastAPI
from pydantic import BaseModel


class Opt(Enum):
    """Operation requested on the face DB."""

    ADD = 1
    DELETE = 2
    CHANGE = 3
    SEARCH = 4


class Ftype(Enum):
    """How the ``img`` argument encodes the image."""

    NDARRAY = 0
    PATH = 1
    BASE64 = 2
    URL = 3


class Status(Enum):
    """Non-dict outcomes returned by :class:`FaceHelper` methods.

    The HTTP and CLI layers treat any ``Status`` return as "status 0" and
    report ``.value`` as the description.
    """

    SUCCESS = 1  # success
    HTTP_ERROR = "http error"  # fetching the image over HTTP failed
    DELETE_FINISH = "delete finished"
    BASE64_EMPTY = "base64 is empty"
    NotImplementedError = "Not Implemented Error"
    ADD_HAS_EXISTED = "when adding, file has existed"
    DB_NO_FACE = "face_db has no face"
    # Added: the image source was reachable but did not yield a decodable image.
    IMAGE_DECODE_ERROR = "image could not be read or decoded"


# Maps the tokens used in the TYPE string ("<INTERFACE>_<IMG_MODE>_<OPT_MODE>")
# to their enum members.
GLOBAL_DICT = {
    "ADD": Opt.ADD,
    "DELETE": Opt.DELETE,
    "CHANGE": Opt.CHANGE,
    "SEARCH": Opt.SEARCH,
    "NDARRAY": Ftype.NDARRAY,
    "PATH": Ftype.PATH,
    "BASE64": Ftype.BASE64,
    "URL": Ftype.URL,
}


class FaceHelper:
    """Maintain a folder of face images and recognise faces against it."""

    # Faces whose shorter side is below this many pixels are rejected.
    MIN_FACE_SIZE = 40

    def __init__(self, db_path: str, detector_backend: str, distance_metric: str, model_name: str):
        """Build the recognition model and load the face DB representations.

        Args:
            db_path: folder holding the ``.jpg`` face images and the pickle
                datastore.
            detector_backend: deepface face detector backend name.
            distance_metric: deepface distance metric name.
            model_name: deepface recognition model name.
        """
        # Store the arguments themselves so the attributes always agree with
        # the model and threshold built from them below.
        self.db_path = db_path
        self.detector_backend = detector_backend
        self.distance_metric = distance_metric
        self.model_name = model_name

        self.model = DeepFace.build_model(model_name=model_name)
        self.target_size = self.model.input_shape
        self.threshold = verification.find_threshold(model_name, distance_metric)

        # NOTE(review): presumably mirrors the datastore file name deepface
        # writes into db_path -- confirm against the installed deepface version.
        self.pkl_name = f"ds_{self.model_name}_{self.detector_backend}_v2.pkl".replace("-", "").lower()

        self.dirty = True
        self.db_representations = None
        self.keepDBLive()

    @staticmethod
    def _encode_png_base64(image: np.ndarray):
        """Return ``image`` PNG-encoded as a base64 string, or None on failure."""
        success, encoded = cv2.imencode('.png', image)
        if not success:
            return None
        return base64.b64encode(encoded).decode("utf-8")

    def get_facial_areas(self, img: np.ndarray):
        """Find facial areas in the given image.

        Uses ``self.detector_backend`` and ``self.target_size``.

        Args:
            img (np.ndarray): the image itself.

        Returns:
            A 3-tuple ``(faces_coordinates, faces, descriptions)``:
            a list of ``(x, y, w, h)`` tuples, the list of extracted face
            images, and one description per face (``"success"`` or a
            too-small message). When detection raises, the result is
            ``([], [], [<no face message>])``.
        """
        try:
            face_objs = detection.extract_faces(
                img_path=img,
                target_size=self.target_size,
                detector_backend=self.detector_backend,
                enforce_detection=True,
                align=True,
                expand_percentage=0,
                grayscale=False,
                human_readable=False,
            )
        except Exception:
            # Best effort on purpose: any detection failure is reported as
            # "no face" instead of crashing the request.
            return [], [], ["no face detected. Blurry Image, Non-frontal View or Face Pixels too small?"]

        faces_coordinates = [
            (
                face_obj["facial_area"]["x"],
                face_obj["facial_area"]["y"],
                face_obj["facial_area"]["w"],
                face_obj["facial_area"]["h"],
            )
            for face_obj in face_objs
        ]
        faces = [face_obj["face"] for face_obj in face_objs]
        descriptions = [
            f"width={w}, height={h}. the face is too small, please upload bigger face."
            if min(w, h) < self.MIN_FACE_SIZE
            else "success"
            for _, _, w, h in faces_coordinates
        ]
        return faces_coordinates, faces, descriptions

    @staticmethod
    def process_img(img, img_mode: Ftype):
        """Load ``img`` according to ``img_mode``.

        Args:
            img: an ndarray, a file path, a base64 string (optionally with a
                ``data:...,`` prefix) or a URL, depending on ``img_mode``.
            img_mode: how ``img`` is encoded.

        Returns:
            The image as an ``np.ndarray``, or a ``Status`` member describing
            why it could not be loaded.
        """
        if img_mode == Ftype.NDARRAY:
            target_img = img
        elif img_mode == Ftype.PATH:
            target_img = cv2.imread(img)
        elif img_mode == Ftype.BASE64:
            # Keep only the payload after an optional "data:image/...;base64," prefix.
            encoded_data = img.split(",")[-1].strip()
            if not encoded_data:
                return Status.BASE64_EMPTY
            try:
                raw = base64.b64decode(encoded_data)
            except ValueError:  # binascii.Error is a ValueError subclass
                return Status.IMAGE_DECODE_ERROR
            if not raw:
                return Status.BASE64_EMPTY
            target_img = cv2.imdecode(np.frombuffer(raw, np.uint8), cv2.IMREAD_COLOR)
        elif img_mode == Ftype.URL:
            try:
                # Proxies are explicitly disabled for both schemes.
                response = requests.get(url=img, timeout=60, proxies={"http": None, "https": None})
                response.raise_for_status()
            except requests.RequestException:
                # Covers HTTP error statuses as well as timeouts and
                # connection failures.
                return Status.HTTP_ERROR
            image_array = np.frombuffer(response.content, dtype=np.uint8)
            if image_array.size == 0:
                return Status.IMAGE_DECODE_ERROR
            target_img = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
        else:
            return Status.NotImplementedError

        if target_img is None:
            # cv2.imread / cv2.imdecode signal failure by returning None.
            return Status.IMAGE_DECODE_ERROR
        return target_img

    # interface 1: update the face DB (add / remove / change images).
    # All images we output are in the BGR colour space.
    def updateFaceDB(self,
                     img: Union[str, np.ndarray],
                     img_name: str,
                     img_mode: Ftype,
                     opt_mode: Opt):
        """Add, replace or delete one face image in the face DB.

        Only ``.jpg`` files are stored in ``self.db_path``; any input image is
        written as ``<img_name>.jpg``.

        Args:
            img: the image, encoded as described by ``img_mode``.
            img_name: file name (without extension) inside the face DB.
            img_mode: how ``img`` is encoded (see ``Ftype``).
            opt_mode: ``Opt.ADD``, ``Opt.DELETE`` or ``Opt.CHANGE``.

        Returns:
            A ``Status`` member (delete finished, load failure, name already
            exists, unsupported operation), or a dict with keys ``status``
            (1 stored, 0 rejected), ``detected_img`` (base64 PNG with the
            detected faces outlined, or None) and ``description``.
        """
        retjson = {
            "status": None,  # 1: stored successfully, 0: failed
            "detected_img": None,
            "description": "None",
        }

        # NOTE(review): img_name is joined into a filesystem path as-is; if it
        # can come from untrusted clients it should be restricted to a plain
        # file name.
        target_path = os.path.join(self.db_path, img_name + '.jpg')

        if opt_mode == Opt.DELETE:
            try:
                os.remove(target_path)
            except FileNotFoundError:
                pass  # deleting a missing image is not an error
            else:
                # The cached representations no longer match the folder.
                self.dirty = True
            return Status.DELETE_FINISH

        ret = FaceHelper.process_img(img, img_mode)
        if isinstance(ret, Status):
            return ret
        target_img = ret

        face_coors, _detected_faces, descriptions = self.get_facial_areas(target_img)

        # Outline the detected faces on a copy so the stored image stays clean.
        detected_img = target_img.copy()
        for x, y, w, h in face_coors:
            cv2.rectangle(detected_img, (x, y), (x + w, y + h), (255, 0, 0), 3)
        detected_img_base64 = self._encode_png_base64(detected_img)

        if len(face_coors) >= 2:
            # too many faces
            retjson.update({"status": 0,
                            "detected_img": detected_img_base64,
                            "description": "too many faces, cannot select which one"})
        elif len(face_coors) <= 0:
            # no face
            retjson.update({"status": 0,
                            "detected_img": detected_img_base64,
                            "description": descriptions[0]})
        else:
            # exactly one face; it may still be rejected as too small
            retjson.update({"status": 1 if descriptions[0] == "success" else 0,
                            "detected_img": detected_img_base64,
                            "description": descriptions[0]})

        if retjson['status'] == 0:
            return retjson

        if opt_mode == Opt.ADD:
            if os.path.exists(target_path):
                return Status.ADD_HAS_EXISTED
            cv2.imwrite(target_path, target_img)
        elif opt_mode == Opt.CHANGE:
            # Saved whether or not the name already exists in the DB.
            cv2.imwrite(target_path, target_img)
        else:
            return Status.NotImplementedError

        # Refresh the cached representations so they reflect the new image.
        self.dirty = True
        self.keepDBLive()

        return retjson

    def _label_for_face(self, df: pd.DataFrame, detected_face) -> str:
        """Return the best matching identity label for one face, or "unknown"."""
        unknown_representation = self.model.find_embeddings(
            preprocessing.normalize_input(img=detected_face, normalization='base')
        )

        distances = []
        for _, instance in df.iterrows():
            source_representation = instance['embedding']
            if source_representation is None:
                # DB entry without an embedding can never match.
                distances.append(float('inf'))
                continue
            assert len(unknown_representation) == len(source_representation), 'wrong len of embedding '
            distances.append(
                verification.find_distance(source_representation, unknown_representation, self.distance_metric)
            )

        result_df = df.drop(columns=['embedding'])
        result_df['distance'] = distances
        result_df = result_df[result_df['distance'] < self.threshold]
        if result_df.shape[0] <= 0:
            return "unknown"

        result_df = result_df.sort_values(by=["distance"], ascending=True).reset_index(drop=True)
        target_label, min_dist = result_df.loc[0, "identity"], result_df.loc[0, "distance"]
        # Keep only the file stem of the identity path, then append the distance.
        return re.split(r'[\\\/]', target_label)[-1].split('.')[0] + f" {min_dist:.4f}"

    # interface 2: face recognition using the embeddings of the face DB
    def faceRecog(self, img: Union[str, np.ndarray], img_mode: Ftype):
        """Recognise every face found in ``img`` against the face DB.

        Args:
            img: the image, encoded as described by ``img_mode``.
            img_mode: how ``img`` is encoded (see ``Ftype``).

        Returns:
            A ``Status`` member when the DB is empty or the image cannot be
            loaded; otherwise a 3-tuple ``(labels, face_coors, image_b64)``
            where ``labels`` holds ``"<name> <distance>"`` or ``"unknown"``
            per face, ``face_coors`` the ``(x, y, w, h)`` boxes, and
            ``image_b64`` the annotated image as a base64 PNG (None if
            encoding failed).
        """
        # Make sure the cached representations are current.
        if self.dirty:
            self.keepDBLive()
        if not self.db_representations:
            return Status.DB_NO_FACE

        ret = FaceHelper.process_img(img, img_mode)
        if isinstance(ret, Status):
            return ret
        target_img = ret

        df = pd.DataFrame(self.db_representations)
        face_coors, detected_faces, _descriptions = self.get_facial_areas(target_img)
        labels = [self._label_for_face(df, detected_face) for detected_face in detected_faces]

        # Draw the face frames on a copy so a caller-supplied array is not modified.
        annotated = target_img.copy()
        for idx, (label, (x, y, w, h)) in enumerate(zip(labels, face_coors)):
            color = (0, 0, 255) if label == 'unknown' else (255, 0, 0)
            cv2.rectangle(annotated, (x, y), (x + w, y + h), color, 3)
            cv2.putText(annotated, f"{idx}: " + label, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.75, color, 1)

        target_img_base64 = self._encode_png_base64(annotated)
        return labels, face_coors, target_img_base64

    def keepDBLive(self):
        """Refresh the datastore and reload ``self.db_representations``.

        Returns:
            int: number of representations loaded from the pickle file.
        """
        # Called with a dummy all-zero face; presumably this makes deepface
        # rebuild/refresh the pickle datastore in db_path -- TODO confirm
        # against the installed deepface version.
        search_identity(
            detected_face=np.zeros([224, 224, 3]),
            db_path=self.db_path,
            detector_backend=self.detector_backend,
            distance_metric=self.distance_metric,
            model_name=self.model_name,
        )
        self.dirty = False

        # Load the representations from the face DB.
        # NOTE(security): pickle.load executes arbitrary code from the file;
        # db_path must only ever contain trusted datastore files.
        datestore_path = os.path.join(self.db_path, self.pkl_name)
        with open(datestore_path, "rb") as f:
            self.db_representations = pickle.load(f)

        return len(self.db_representations)


# Module-level helper shared by the HTTP endpoints and the CLI.
facehelper = FaceHelper(
    db_path="C:/Users/JIALE/Desktop/bns_proj/proj_deepface/face",
    detector_backend="opencv",
    distance_metric="euclidean_l2",
    model_name="GhostFaceNet",
)

app = FastAPI()


class item(BaseModel):
    """Request body of ``/interface/``."""

    TYPE: str  # "<INTERFACE>_<IMG_MODE>_<OPT_MODE>", e.g. "UPDATEDB_BASE64_ADD"
    img: str
    img_name: str


class config(BaseModel):
    """Request body of ``/init/``: the four ``FaceHelper`` settings."""

    db_path: str
    detector_backend: str
    distance_metric: str
    model_name: str


@app.post("/init/")
def init(input: config):
    """Replace the shared ``facehelper`` with one built from ``input``."""
    global facehelper
    # Build first, then rebind: if construction raises, the old helper stays.
    facehelper = FaceHelper(
        db_path=input.db_path,
        detector_backend=input.detector_backend,
        distance_metric=input.distance_metric,
        model_name=input.model_name,
    )


@app.get("/keepDBLastest")
def keepDBLastest():
    """Force a refresh of the face DB representations."""
    ret = facehelper.keepDBLive()
    return {"status": "the faceDB is lastest", "description": f"db has {ret} faces"}


@app.post("/interface/")
def main(input: item):
    """Dispatch an ``UPDATEDB`` or ``FACEID`` request to ``facehelper``."""
    parts = input.TYPE.split("_")
    if len(parts) != 3:
        return {"status": 0, "description": "wrong TYPE"}
    interface_type, img_mode_key, opt_mode_key = parts

    # GLOBAL_DICT mixes both enums, so check the type as well as the key.
    img_mode = GLOBAL_DICT.get(img_mode_key)
    if not isinstance(img_mode, Ftype):
        return {"status": 0, "description": "wrong img_mode"}
    opt_mode = GLOBAL_DICT.get(opt_mode_key)
    if not isinstance(opt_mode, Opt):
        return {"status": 0, "description": "wrong opt_mode"}

    if interface_type == "UPDATEDB":
        ret = facehelper.updateFaceDB(
            img=input.img,
            img_name=input.img_name,
            img_mode=img_mode,
            opt_mode=opt_mode,
        )
        if isinstance(ret, Status):
            return {"status": 0, "description": ret.value}
        return {"status": ret["status"], "description": ret["description"]}

    if interface_type == "FACEID":
        ret = facehelper.faceRecog(img=input.img, img_mode=img_mode)
        if isinstance(ret, Status):
            return {"status": 0, "description": ret.value}
        return {"status": 1, "name": ret[0], "position": ret[1], "resImg": ret[2]}

    return {"status": 0,
            "description": "wrong inferface_type, which just includes {'UPDATEDB','FACEID'}"}


def _run_cli(argv, stdin_text: str) -> int:
    """Command-line entry point.

    ``argv[1]`` is the TYPE string (``<INTERFACE>_<IMG_MODE>_<OPT_MODE>``);
    standard input carries the image on the first line and, optionally, the
    image name on the second.

    Returns:
        Process exit code: 0 when the request was dispatched, 2 on bad usage.
    """
    if len(argv) < 2:
        print("usage: <script> <INTERFACE>_<IMG_MODE>_<OPT_MODE>  (image and name on stdin)")
        return 2
    TYPE = argv[1]

    # Treat "\r\n" as a single line break rather than "\r" plus a line break.
    lines = stdin_text.replace('\r\n', '\n').split('\n')
    while lines and lines[-1] == "":
        lines.pop()  # a trailing newline must not hide the input
    img = lines[0] if lines else ""
    img_name = lines[1] if len(lines) > 1 else ""
    print(TYPE, img, img_name)

    parts = TYPE.split("_")
    if len(parts) != 3:
        print("wrong TYPE")
        return 2
    interface_type, img_mode_key, opt_mode_key = parts

    img_mode = GLOBAL_DICT.get(img_mode_key)
    if not isinstance(img_mode, Ftype):
        print("wrong img_mode")
        return 2

    if interface_type == "UPDATEDB":
        opt_mode = GLOBAL_DICT.get(opt_mode_key)
        if not isinstance(opt_mode, Opt):
            print("wrong opt_mode")
            return 2
        ret = facehelper.updateFaceDB(
            img=img,
            img_name=img_name,
            img_mode=img_mode,
            opt_mode=opt_mode,
        )
        if isinstance(ret, Status):
            print(ret.value)
        else:
            print(f'status: {ret["status"]}, description: {ret["description"]}')
    elif interface_type == "FACEID":
        ret = facehelper.faceRecog(img=img, img_mode=img_mode)
        if isinstance(ret, Status):
            print(ret.value)
        else:
            # ret[2] is already a base64-encoded PNG string.
            print(f"name: {ret[0]}, position: {ret[1]}, resImg: {ret[2]}")
    else:
        print(Status.NotImplementedError)
    return 0


if __name__ == "__main__":
    sys.exit(_run_cli(sys.argv, sys.stdin.read()))