diff --git a/proj_deepface/webmain.py b/proj_deepface/webmain.py new file mode 100644 index 0000000..9c978e6 --- /dev/null +++ b/proj_deepface/webmain.py @@ -0,0 +1,423 @@ +import time +from deepface import DeepFace +import cv2 +import os +import sys +import numpy as np +import re +import pandas as pd +from typing import Union +import base64 +from enum import Enum +import requests + +from deepface.modules.streaming import search_identity +from deepface.modules import verification, detection +from deepface.models.FacialRecognition import FacialRecognition +from deepface.modules import preprocessing +from deepface.commons.logger import Logger +import pickle +from fastapi import FastAPI +from pydantic import BaseModel + + +# db_path = "C:/Users/JIALE/Desktop/bns_proj/proj_deepface/face" +# detector_backend = "opencv" +# distance_metric = "euclidean_l2" +# model_name = "GhostFaceNet" +# model: FacialRecognition = DeepFace.build_model(model_name=model_name) +# target_size = model.input_shape +# threshold = verification.find_threshold(model_name, distance_metric) + +# print("begin3 time: ", time.time() - start) +# start3 = time.time() + +class Opt(Enum): + ADD = 1 + DELETE = 2 + CHANGE = 3 + SEARCH = 4 + +class Ftype(Enum): + NDARRAY = 0 + PATH = 1 + BASE64 = 2 + URL = 3 + +class Status(Enum): + SUCCESS = 1 # 成功 + HTTP_ERROR = "http error" # http 检索报错 + DELETE_FINISH = "delete finished" + BASE64_EMPTY = "base64 is empty" + NotImplementedError = "Not Implemented Error" + ADD_HAS_EXISTED = "when adding, file has existed" + +GLOBAL_DICT = { + "ADD": Opt.ADD, + "DELETE":Opt.DELETE, + "CHANGE":Opt.CHANGE, + "SEARCH":Opt.SEARCH, + + "NDARRAY": Ftype.NDARRAY, + "PATH":Ftype.PATH, + "BASE64":Ftype.BASE64, + "URL":Ftype.URL +} + +class FaceHelper: + def __init__(self, db_path: str, detector_backend: str, distance_metric: str, model_name: str): + self.db_path = "C:/Users/JIALE/Desktop/bns_proj/proj_deepface/face" + self.detector_backend = "opencv" + self.distance_metric = "euclidean_l2" + self.model_name = "GhostFaceNet" + self.model = DeepFace.build_model(model_name=model_name) + self.target_size = self.model.input_shape + self.threshold = verification.find_threshold(model_name, distance_metric) + self.pkl_name = f"ds_{self.model_name}_{self.detector_backend}_v2.pkl".replace("-", "").lower() + + def get_facial_areas(self, img: np.ndarray): + """ + Find facial area coordinates in the given image + Args: + img (np.ndarray): image itself + detector_backend (string): face detector backend. Options: 'opencv', 'retinaface', + 'mtcnn', 'ssd', 'dlib', 'mediapipe', 'yolov8' (default is opencv). + target_size (tuple): input shape of the facial recognition model. + Returns + result (list): list of tuple with x, y, w and h coordinates + detected_face + descriptions + """ + # print("get facial areas, img.shape: ", img.shape) + # beg = time.time() + try: + face_objs = detection.extract_faces( + img_path = img, + target_size = self.target_size, + detector_backend = self.detector_backend, + enforce_detection = True, + align = True, + expand_percentage = 0, + grayscale = False, + human_readable = False, + ) + + # print("detection.extract_faces. use time: ", time.time()-beg ) + # beg = time.time() + + faces_coordinates = [ + ( + face_obj["facial_area"]["x"], + face_obj["facial_area"]["y"], + face_obj["facial_area"]["w"], + face_obj["facial_area"]["h"], + ) + for face_obj in face_objs + # if face_obj["facial_area"]["w"] > threshold + ] + faces = [ face_obj["face"] for face_obj in face_objs ] + descriptions = [] + for x,y,w,h in faces_coordinates: + if min(w,h)<40: + descriptions.append(f"width={w}, height={h}. the face is too small, please upload bigger face.") + else: + descriptions.append("success") + return faces_coordinates, faces, descriptions + except: # to avoid exception if no face detected + return [], [], ["no face detected. Blurry Image, Non-frontal View or Face Pixels too small?"] + + @staticmethod + def process_img(img, img_mode: Ftype): + target_img = None + if img_mode == Ftype.NDARRAY: # np.ndarray格式 + target_img = img + elif img_mode == Ftype.PATH: # img path + target_img = cv2.imread(img) + elif img_mode == Ftype.BASE64: # base64 + encoded_data_parts = img.split(",") + if len(encoded_data_parts) <= 0: + # raise "base64 is empty" + return Status.BASE64_EMPTY + encoded_data = encoded_data_parts[-1] + nparr = np.fromstring(base64.b64decode(encoded_data), np.uint8) + target_img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) + elif img_mode == Ftype.URL: # url + try: + # response = requests.get(url=img,stream=True,timeout=60) + response = requests.get(url=img, stream=True, timeout=60, proxies={"http": None, "https": None}) + response.raise_for_status() + image_array = np.asarray(bytearray(response.raw.read()), dtype=np.uint8) + target_img = cv2.imdecode(image_array, cv2.IMREAD_COLOR) + except requests.HTTPError as http_err: + # print(f"HTTP error occurred: {http_err}") # 输出 HTTP 错误信息 + return Status.HTTP_ERROR + else: + return Status.NotImplementedError + + return target_img + + # interface 1: update face db. add / remove /change imgs. 我们输出的都是`bgr`颜色空间 + def updateFaceDB(self, img: Union[str, np.ndarray], img_name: str, img_mode: Ftype, opt_mode: Opt ): + """ + description: in db_path, we just store `.jpg`, other Image format will be changed to `.jpg` + args: + - img: str or np.ndarray + - img_name: str + - img_mode: 0 np.ndarray; 1 img_path 2 base64 + - opt_mode: 0 add; 1 delete; 2 change + ret: + - retjson: dict. + """ + retjson = { + "status": None, # 入库成功 1 or 失败 0 + "detected_img": None, + "description": "None", + } + if opt_mode == Opt.DELETE: #delete + try: + os.remove(os.path.join(db_path, img_name+'.jpg')) + except FileNotFoundError: + pass + return Status.DELETE_FINISH + + ret = FaceHelper.process_img(img, img_mode) + if isinstance(ret, Status): + return ret + else: + target_img = ret + + face_coors, detected_faces, descriptions = self.get_facial_areas(target_img) + detected_img = target_img.copy() + for x,y,w,h in face_coors: + cv2.rectangle(detected_img, (x,y),(x+w,y+h),(255,0,0),3) + success, encoded_detected_img = cv2.imencode('.png', detected_img) + if success: + detected_img_base64 = base64.b64encode(encoded_detected_img).decode("utf-8") + if len(face_coors) >= 2: + retjson.update({"status": 0, "detected_img": detected_img_base64 , "description": "too many faces, cannot select which one" }) # too many face + elif len(face_coors) <= 0: + retjson.update({"status": 0, "detected_img": detected_img_base64 , "description": descriptions[0]}) # no face + else: # only one face is detected + retjson.update({"status": 1 if descriptions[0] =="success" else 0 , "detected_img": detected_img_base64 , "description": descriptions[0]}) # success + + if retjson['status'] == 0: + return retjson + + # db_imgs = os.listdir(self.db_path) + db_imgs = [db_img.split(".")[0] for db_img in os.listdir(self.db_path) if db_img.endswith(".jpg")] + if opt_mode == Opt.ADD: # add + if img_name in db_imgs: + return Status.ADD_HAS_EXISTED # raise "operate is add, but db has existed the img_name" + cv2.imwrite(os.path.join(self.db_path, img_name+'.jpg'), target_img ) + elif opt_mode == Opt.CHANGE: # change + cv2.imwrite(os.path.join(self.db_path, img_name+'.jpg'), target_img ) # whether exist or not in db, save it anymore. + else: + return Status.NotImplementedError + + # update the face_db to keep latest + search_identity( # 保证database是最新的 + detected_face=np.zeros([224, 224, 3]), + db_path=self.db_path, + detector_backend=self.detector_backend, + distance_metric=self.distance_metric, + model_name=self.model_name, + ) + + return retjson + + + #interface 2: face recognition using embeddings on face_db + def faceRecog(self, img: Union[str,np.ndarray], img_mode: Ftype): + # keep the database is the newest + search_identity( + detected_face=np.zeros([224, 224, 3]), + db_path=self.db_path, + detector_backend=self.detector_backend, + distance_metric=self.distance_metric, + model_name=self.model_name, + ) + + # open and read face db + datastore_path = os.path.join(self.db_path, self.pkl_name) + with open(datastore_path, "rb") as f: + db_representations = pickle.load(f) + assert len(db_representations) > 0, 'no face in database' + print(f"{len(db_representations)} faces representataion in {self.pkl_name}.") + + # process the img + ret = FaceHelper.process_img(img, img_mode) + if isinstance(ret, Status): + return ret + target_img = ret + + # begin face recognition + df = pd.DataFrame(db_representations) + labels = [] + + face_coors, detected_faces, descriptions = self.get_facial_areas(target_img) + + for idx, (x,y,w,h) in enumerate(face_coors): + detected_face = detected_faces[idx] + unknown_representation = self.model.find_embeddings( + preprocessing.normalize_input(img=detected_face,normalization='base') + ) + distances = [] + result_df = df.copy() + for _, instance in df.iterrows(): + source_representation = instance['embedding'] + if source_representation is None : + distances.append(float('inf')) + continue + + assert len(unknown_representation) == len(source_representation), 'wrong len of embedding ' + distance = verification.find_distance(source_representation, unknown_representation, self.distance_metric) + distances.append(distance) + + result_df['distance'] = distances + result_df = result_df.drop(columns=['embedding']) + result_df = result_df[result_df['distance'] < self.threshold] + if result_df.shape[0] <= 0: + labels.append("unknown") + else: + result_df = result_df.sort_values(by=["distance"], ascending=True).reset_index(drop=True) + target_label, min_dist = result_df.loc[0, "identity"], result_df.loc[0, "distance"] + target_label = re.split(r'[\\\/]', target_label)[-1].split('.')[0] + f" {min_dist:.4f}" + labels.append(target_label) + + # draw the face frame for easy to view + for idx, (label, (x, y, w, h)) in enumerate(zip(labels, face_coors)): + color = (0,0,255) if label == 'unknown' else (255,0,0) + cv2.rectangle(target_img,(x,y),(x+w,y+h), color, 3) + cv2.putText(target_img, f"{idx}: " + label, (x,y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.75, color, 1) + success, encoded_target_img = cv2.imencode('.png', target_img) + if success: + target_img_base64 = base64.b64encode(encoded_target_img).decode("utf-8") + return labels, face_coors, target_img_base64 + + +facehelper = FaceHelper( + db_path = "C:/Users/JIALE/Desktop/bns_proj/proj_deepface/face", + detector_backend = "opencv", + distance_metric = "euclidean_l2", + model_name = "GhostFaceNet" , +) + +app = FastAPI() + +class item(BaseModel): + type: str + img: str + img_name: str + + +# @app.post("/master/") +# def master(input: item ): +# return {"type": item["type"], "img":"fdasfsdf"} + + +@app.post("/interface/") +def main(input: item): + interface_type, img_mode, opt_mode = input.type.split("_") + img_mode = GLOBAL_DICT[img_mode] + if interface_type == "UPDATEDB": + opt_mode = GLOBAL_DICT[opt_mode] + # call updateFaceDB for handling the picture + ret = facehelper.updateFaceDB(img = input.img, + img_name = input.img_name, + img_mode = img_mode, + opt_mode = opt_mode, + ) + if isinstance(ret, Status): # means error happened + return {"status": 0, "description":ret.value} + else: + return {"status": ret["status"], "description": ret["description"] } + elif interface_type == "FACEID": + ret = facehelper.faceRecog( + img = input.img, + img_mode = img_mode, + ) + if isinstance(ret, Status): # means error happened + return {"status": 0, "description":ret.value} + else: + return {"status": 1, "name": ret[0], "position": ret[1], "resImg": ret[2]} + # print(f"name: {ret[0]}, position: {ret[1]}, resImg: {ret[2]}" ) + else: + # print(Status.NotImplementedError) + return {"status": 0, "description": "wrong inferface_type, which just includes {'UPDATEDB','FACEID'}" } + + +if __name__ == "__main__": + input_data = sys.stdin.read() + split_data = input_data.replace('\r\n', '\n').split('\n') # 将 `\r\n` 优先认定为换行符, 而不是认定为 `\r + 换行` + img, img_name = "","" + TYPE = sys.argv[1] + if len(split_data) == 2: + img = split_data[0] + img_name = split_data[1] + elif len(split_data) == 1: + img = split_data[0] + + print(TYPE, img, img_name) + + interface_type, img_mode, opt_mode = TYPE.split("_") + img_mode = GLOBAL_DICT[img_mode] + if interface_type == "UPDATEDB": + opt_mode = GLOBAL_DICT[opt_mode] + # call updateFaceDB for handling the picture + ret = updateFaceDB(img = img, + img_name = img_name, + img_mode = img_mode, + opt_mode = opt_mode, + ) + if isinstance(ret, Status): # means error happened + print(ret.value) + else: + print(f'status: {ret["status"]}, description: {ret["description"]}') + elif interface_type == "FACEID": + ret = faceRecog( + img = img, + img_mode = img_mode, + ) + success, encoded_res_img = cv2.imencode('.png', ret[2]) + if success: + res_img_base64 = base64.b64encode(encoded_res_img).decode("utf-8") + print(f"name: {ret[0]}, position: {ret[1]}, resImg: {res_img_base64}" ) + else: + print(Status.NotImplementedError) + + """ + img = sys.argv[1] + img_name = sys.argv[2] + img_mode = GLOBAL_DICT[ sys.argv[3] ] + opt_mode = GLOBAL_DICT[ sys.argv[4] ] + + # call updateFaceDB for handling the picture + ret = updateFaceDB(img = img, + img_name = img_name, + img_mode = img_mode, + opt_mode = opt_mode, + ) + if isinstance(ret, Status): # means error happened + print(ret.value) + else: + print('ret["status"], ret["description"]: ', ret["status"], ret["description"]) + """ + """ + # use func to face recognition + tic = time.time() + img = sys.argv[1] + # img_name = sys.argv[2] + img_mode = GLOBAL_DICT[ sys.argv[2] ] + # opt_mode = GLOBAL_DICT[ sys.argv[4] ] + + ret = faceRecog( + img = img, + img_mode = img_mode, + ) + tec = time.time() + + print(ret[0], ret[1], tec-tic) + print("img.size = ", ret[2].shape) + cv2.imshow("res",ret[2]) + cv2.waitKey(0) + """ \ No newline at end of file