"""Face database maintenance and face recognition helpers built on deepface.

Two public entry points are provided:

* ``updateFaceDB`` -- add / delete / change an image in the face database
  directory (``db_path``).
* ``faceRecog`` -- recognise every face found in an image against the stored
  database representations.

Images handled here are in OpenCV's BGR colour space.
"""
import base64
import os
import pickle
import re
import sys
import time
from enum import Enum
from typing import Union

from deepface import DeepFace
import cv2
import numpy as np
import pandas as pd
import requests
from deepface.modules.streaming import (
    build_demography_models,
    build_facial_recognition_model,
    search_identity,
    highlight_facial_areas,
    grab_facial_areas,
    extract_facial_areas,
    perform_facial_recognition,
    perform_demography_analysis,
)
from deepface.modules import recognition, verification, detection, representation
from deepface.models.FacialRecognition import FacialRecognition
from deepface.modules import preprocessing
from deepface.commons.logger import Logger

db_path = "./face"
detector_backend = "opencv"
distance_metric = "euclidean_l2"
model_name = "GhostFaceNet"

# The recognition model is built once at import time and shared by all calls.
model: FacialRecognition = DeepFace.build_model(model_name=model_name)
target_size = model.input_shape
threshold = verification.find_threshold(model_name, distance_metric)


class Opt(Enum):
    """Database operation requested by the caller."""

    ADD = 1
    DELETE = 2
    CHANGE = 3
    SEARCH = 4


class Ftype(Enum):
    """How the input image is supplied."""

    NDARRAY = 0
    PATH = 1
    BASE64 = 2
    URL = 3


class Status(Enum):
    """Non-dict outcomes returned by the public functions."""

    SUCCESS = 1  # success
    HTTP_ERROR = "http error"  # fetching the image over http failed
    DELETE_FINISH = "delete finished"
    BASE64_EMPTY = "base64 is empty"
    NotImplementedError = "Not Implemented Error"
    ADD_HAS_EXISTED = "when adding, file has existed"
    # The input could not be read or decoded into an image.
    IMG_DECODE_ERROR = "image cannot be read or decoded"


# Maps command-line strings to the corresponding enum members.
GLOBAL_DICT = {
    "Opt.ADD": Opt.ADD,
    "Opt.DELETE": Opt.DELETE,
    "Opt.CHANGE": Opt.CHANGE,
    "Opt.SEARCH": Opt.SEARCH,
    "Ftype.NDARRAY": Ftype.NDARRAY,
    "Ftype.PATH": Ftype.PATH,
    "Ftype.BASE64": Ftype.BASE64,
    "Ftype.URL": Ftype.URL,
}

_NO_FACE_MESSAGE = (
    "no face detected. Blurry Image, Non-frontal View or Face Pixels too small?"
)


def get_facial_areas(
    img: np.ndarray,
    detector_backend: str,
    target_size,
):
    """
    Find facial area coordinates in the given image

    Args:
        img (np.ndarray): image itself

        detector_backend (string): face detector backend. Options: 'opencv',
            'retinaface', 'mtcnn', 'ssd', 'dlib', 'mediapipe', 'yolov8'
            (default is opencv).

        target_size (tuple): input shape of the facial recognition model.

    Returns:
        A 3-tuple ``(faces_coordinates, faces, descriptions)``:

        * faces_coordinates (list): ``(x, y, w, h)`` tuple per detected face
        * faces (list): the extracted face image per detected face
        * descriptions (list): ``"success"`` or a "too small" message per
          face; when detection fails the first two lists are empty and this
          list holds a single "no face detected" message.
    """
    if img is None:
        # Nothing to inspect; report it the same way as a failed detection.
        return [], [], [_NO_FACE_MESSAGE]

    print("get facial areas, img.shape: ", img.shape)
    beg = time.time()
    try:
        face_objs = detection.extract_faces(
            img_path=img,
            target_size=target_size,
            detector_backend=detector_backend,
            enforce_detection=True,
            align=True,
            expand_percentage=0,
            grayscale=False,
            human_readable=False,
        )
        print("detection.extract_faces. use time: ", time.time() - beg)

        faces_coordinates = [
            (
                face_obj["facial_area"]["x"],
                face_obj["facial_area"]["y"],
                face_obj["facial_area"]["w"],
                face_obj["facial_area"]["h"],
            )
            for face_obj in face_objs
        ]
        faces = [face_obj["face"] for face_obj in face_objs]

        descriptions = []
        for _, _, w, h in faces_coordinates:
            if min(w, h) < 40:
                descriptions.append(
                    f"width={w}, height={h}. the face is too small, please upload bigger face."
                )
            else:
                descriptions.append("success")
        return faces_coordinates, faces, descriptions
    except Exception:
        # Deliberate best effort: any detection failure is reported as
        # "no face" rather than raised. Unlike a bare ``except`` this lets
        # KeyboardInterrupt / SystemExit propagate.
        return [], [], [_NO_FACE_MESSAGE]


def process_img(img, img_mode: Ftype):
    """Load ``img`` into a BGR ``np.ndarray`` according to ``img_mode``.

    Args:
        img: the image; an ndarray, a file path, a base64 string (optionally
            prefixed with a ``data:...;base64,`` header) or a URL.
        img_mode (Ftype): tells how ``img`` must be interpreted.

    Returns:
        The decoded image as ``np.ndarray``, or a ``Status`` member when the
        image cannot be obtained:

        * ``Status.BASE64_EMPTY`` -- the base64 payload is empty
        * ``Status.HTTP_ERROR`` -- the URL request failed
        * ``Status.IMG_DECODE_ERROR`` -- the data is not a readable image
        * ``Status.NotImplementedError`` -- unknown ``img_mode``
    """
    if img_mode == Ftype.NDARRAY:
        target_img = img
    elif img_mode == Ftype.PATH:
        # cv2.imread returns None (no exception) when the file is unreadable.
        target_img = cv2.imread(img)
    elif img_mode == Ftype.BASE64:
        # Drop an optional "data:image/...;base64," prefix.
        encoded_data = img.split(",")[-1].strip()
        if not encoded_data:
            return Status.BASE64_EMPTY
        try:
            raw_bytes = base64.b64decode(encoded_data)
        except ValueError:  # binascii.Error is a ValueError subclass
            return Status.IMG_DECODE_ERROR
        if not raw_bytes:
            return Status.BASE64_EMPTY
        target_img = cv2.imdecode(
            np.frombuffer(raw_bytes, np.uint8), cv2.IMREAD_COLOR
        )
    elif img_mode == Ftype.URL:
        try:
            response = requests.get(
                url=img,
                timeout=60,
                proxies={"http": None, "https": None},
            )
            response.raise_for_status()
        except requests.RequestException:
            # Covers HTTP status errors as well as timeouts / connection errors.
            return Status.HTTP_ERROR
        # ``content`` is the body with any transfer/content encoding undone.
        payload = response.content
        if not payload:
            return Status.IMG_DECODE_ERROR
        target_img = cv2.imdecode(
            np.frombuffer(payload, np.uint8), cv2.IMREAD_COLOR
        )
    else:
        return Status.NotImplementedError

    if target_img is None:
        return Status.IMG_DECODE_ERROR
    return target_img


def _refresh_face_db():
    """Run a dummy search so deepface brings the stored representations up to date."""
    search_identity(
        detected_face=np.zeros([224, 224, 3]),
        db_path=db_path,
        detector_backend=detector_backend,
        distance_metric=distance_metric,
        model_name=model_name,
    )


# interface 1: update face db. add / remove / change imgs.
# All images we output are in the `bgr` colour space.
def updateFaceDB(
    img: Union[str, np.ndarray],
    img_name: str,
    img_mode: Ftype,
    opt_mode: Opt,
):
    """
    description:
        in db_path, we just store `.jpg`, other Image format will be changed
        to `.jpg`

    args:
        - img: str or np.ndarray
        - img_name: str, file name (without extension) used inside db_path
        - img_mode: Ftype -- NDARRAY, PATH, BASE64 or URL
        - opt_mode: Opt -- ADD, DELETE or CHANGE

    ret:
        - retjson: dict with keys
            "status": 1 when the image was stored, 0 when it was rejected
            "detected_img": base64 PNG of the image with face boxes drawn
                (None if the PNG encoding failed)
            "description": reason / "success"
        - or a ``Status`` member: DELETE_FINISH after a delete,
          ADD_HAS_EXISTED when adding an existing name, NotImplementedError
          for an unsupported ``opt_mode``, or whatever ``process_img``
          reported when the image could not be loaded.
    """
    retjson = {
        "status": None,  # 1: stored successfully, 0: failed
        "detected_img": None,
        "description": "None",
    }

    if opt_mode == Opt.DELETE:
        try:
            os.remove(os.path.join(db_path, img_name + ".jpg"))
        except FileNotFoundError:
            # Deleting a missing image is treated as already done.
            pass
        return Status.DELETE_FINISH

    ret = process_img(img, img_mode)
    if isinstance(ret, Status):
        return ret
    target_img = ret

    face_coors, _, descriptions = get_facial_areas(
        target_img, detector_backend, target_size
    )

    # Draw on a copy so the image written to the database stays clean.
    detected_img = target_img.copy()
    for x, y, w, h in face_coors:
        cv2.rectangle(detected_img, (x, y), (x + w, y + h), (255, 0, 0), 3)

    detected_img_base64 = None
    success, encoded_detected_img = cv2.imencode(".png", detected_img)
    if success:
        detected_img_base64 = base64.b64encode(encoded_detected_img).decode("utf-8")

    first_description = descriptions[0] if descriptions else _NO_FACE_MESSAGE
    if len(face_coors) >= 2:
        # too many faces
        status = 0
        description = "too many faces, cannot select which one"
    elif not face_coors:
        # no face
        status = 0
        description = first_description
    else:
        # exactly one face; it may still be rejected for being too small
        status = 1 if first_description == "success" else 0
        description = first_description
    retjson.update(
        {
            "status": status,
            "detected_img": detected_img_base64,
            "description": description,
        }
    )
    if retjson["status"] == 0:
        return retjson

    os.makedirs(db_path, exist_ok=True)
    # splitext keeps names that themselves contain dots intact.
    db_imgs = {os.path.splitext(db_img)[0] for db_img in os.listdir(db_path)}

    if opt_mode == Opt.ADD:
        if img_name in db_imgs:
            return Status.ADD_HAS_EXISTED
    elif opt_mode != Opt.CHANGE:
        return Status.NotImplementedError
    # ADD of a new name, or CHANGE (saved whether or not it already exists).
    cv2.imwrite(os.path.join(db_path, img_name + ".jpg"), target_img)

    # keep the face database representations up to date
    _refresh_face_db()
    return retjson


# interface 2: face recognition using embeddings on face_db
def faceRecog(img: Union[str, np.ndarray], img_mode: Ftype):
    """Recognise every face in ``img`` against the face database.

    Args:
        img: str or np.ndarray, interpreted according to ``img_mode``.
        img_mode (Ftype): NDARRAY, PATH, BASE64 or URL.

    Returns:
        ``(labels, face_coors, annotated_img)`` where ``labels[i]`` is
        ``"unknown"`` or ``"<identity file stem> <distance>"`` for the face at
        ``face_coors[i]`` (an ``(x, y, w, h)`` tuple), and ``annotated_img``
        is a copy of the input with boxes and labels drawn on it.
        A ``Status`` member is returned instead when the image cannot be
        loaded.

    Raises:
        AssertionError: the database holds no representation, or a stored
            embedding length differs from the query embedding length.
    """
    beg = time.time()

    # keep the database up to date
    _refresh_face_db()
    print("search_identity, use time: ", time.time() - beg)
    beg = time.time()

    # open and read face db
    file_name = f"ds_{model_name}_{detector_backend}_v2.pkl"
    file_name = file_name.replace("-", "").lower()
    datastore_path = os.path.join(db_path, file_name)
    # NOTE(security): pickle executes arbitrary code on load. This file is
    # expected to be the datastore written into db_path by deepface; never
    # point db_path at an untrusted location.
    with open(datastore_path, "rb") as f:
        db_representations = pickle.load(f)
    if not len(db_representations) > 0:
        # Raised explicitly so the check survives ``python -O``.
        raise AssertionError("no face in database")
    print(f"{len(db_representations)} faces representataion in {file_name}.")
    print("open and read face db, use time: ", time.time() - beg)
    beg = time.time()

    # process the img
    ret = process_img(img, img_mode)
    if isinstance(ret, Status):
        return ret
    target_img = ret
    print("process img, use time: ", time.time() - beg)
    beg = time.time()

    # begin face recognition
    df = pd.DataFrame(db_representations)
    identities = df["identity"].tolist()
    embeddings = df["embedding"].tolist()
    labels = []
    print("copy df, use time: ", time.time() - beg)
    beg = time.time()

    face_coors, detected_faces, _ = get_facial_areas(
        target_img, detector_backend, target_size
    )
    print("get facial areas, use time: ", time.time() - beg)
    beg = time.time()

    for detected_face in detected_faces:
        unknown_representation = model.find_embeddings(
            preprocessing.normalize_input(img=detected_face, normalization="base")
        )

        # Track the closest database entry that is under the threshold.
        best_identity = None
        best_distance = None
        for identity, source_representation in zip(identities, embeddings):
            if source_representation is None:
                # Entries without an embedding can never match.
                continue
            if len(unknown_representation) != len(source_representation):
                raise AssertionError("wrong len of embedding ")
            distance = verification.find_distance(
                source_representation, unknown_representation, distance_metric
            )
            if distance < threshold and (
                best_distance is None or distance < best_distance
            ):
                best_identity, best_distance = identity, distance

        if best_identity is None:
            labels.append("unknown")
        else:
            # Keep only the file stem of the identity path, then the distance.
            stem = re.split(r"[\\\/]", best_identity)[-1].split(".")[0]
            labels.append(stem + f" {best_distance:.4f}")
    print("face recognition, use time: ", time.time() - beg)

    # Draw the face frames for easy viewing, on a copy so an ndarray passed
    # in by the caller is not modified.
    annotated_img = target_img.copy()
    for idx, (label, (x, y, w, h)) in enumerate(zip(labels, face_coors)):
        color = (0, 0, 255) if label == "unknown" else (255, 0, 0)
        cv2.rectangle(annotated_img, (x, y), (x + w, y + h), color, 3)
        cv2.putText(
            annotated_img,
            f"{idx}: " + label,
            (x, y - 10),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.75,
            color,
            1,
        )
    return labels, face_coors, annotated_img


if __name__ == "__main__":
    # Example for updating the database instead of recognising:
    #
    #   img, img_name = sys.argv[1], sys.argv[2]
    #   img_mode = GLOBAL_DICT[sys.argv[3]]
    #   opt_mode = GLOBAL_DICT[sys.argv[4]]
    #   ret = updateFaceDB(img=img, img_name=img_name,
    #                      img_mode=img_mode, opt_mode=opt_mode)
    #   if isinstance(ret, Status):  # means error happened
    #       print(ret.value)
    #   else:
    #       print('ret["status"], ret["description"]: ',
    #             ret["status"], ret["description"])

    # use func to face recognition
    if len(sys.argv) < 3 or sys.argv[2] not in GLOBAL_DICT:
        print(f"usage: {sys.argv[0]} <img> <Ftype.PATH|Ftype.BASE64|Ftype.URL>")
        sys.exit(1)

    tic = time.time()
    img = sys.argv[1]
    img_mode = GLOBAL_DICT[sys.argv[2]]
    ret = faceRecog(
        img=img,
        img_mode=img_mode,
    )
    tec = time.time()

    if isinstance(ret, Status):
        # The image could not be loaded; there is nothing to display.
        print(ret.value)
        sys.exit(1)

    print(ret[0], ret[1], tec - tic)
    print("img.size = ", ret[2].shape)
    cv2.imshow("res", ret[2])
    cv2.waitKey(0)