add web service
This commit is contained in:
parent
8cb2db80d9
commit
93c597dd94
|
|
@ -0,0 +1,423 @@
|
|||
import time
|
||||
from deepface import DeepFace
|
||||
import cv2
|
||||
import os
|
||||
import sys
|
||||
import numpy as np
|
||||
import re
|
||||
import pandas as pd
|
||||
from typing import Union
|
||||
import base64
|
||||
from enum import Enum
|
||||
import requests
|
||||
|
||||
from deepface.modules.streaming import search_identity
|
||||
from deepface.modules import verification, detection
|
||||
from deepface.models.FacialRecognition import FacialRecognition
|
||||
from deepface.modules import preprocessing
|
||||
from deepface.commons.logger import Logger
|
||||
import pickle
|
||||
from fastapi import FastAPI
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
# db_path = "C:/Users/JIALE/Desktop/bns_proj/proj_deepface/face"
|
||||
# detector_backend = "opencv"
|
||||
# distance_metric = "euclidean_l2"
|
||||
# model_name = "GhostFaceNet"
|
||||
# model: FacialRecognition = DeepFace.build_model(model_name=model_name)
|
||||
# target_size = model.input_shape
|
||||
# threshold = verification.find_threshold(model_name, distance_metric)
|
||||
|
||||
# print("begin3 time: ", time.time() - start)
|
||||
# start3 = time.time()
|
||||
|
||||
class Opt(Enum):
|
||||
ADD = 1
|
||||
DELETE = 2
|
||||
CHANGE = 3
|
||||
SEARCH = 4
|
||||
|
||||
class Ftype(Enum):
|
||||
NDARRAY = 0
|
||||
PATH = 1
|
||||
BASE64 = 2
|
||||
URL = 3
|
||||
|
||||
class Status(Enum):
|
||||
SUCCESS = 1 # 成功
|
||||
HTTP_ERROR = "http error" # http 检索报错
|
||||
DELETE_FINISH = "delete finished"
|
||||
BASE64_EMPTY = "base64 is empty"
|
||||
NotImplementedError = "Not Implemented Error"
|
||||
ADD_HAS_EXISTED = "when adding, file has existed"
|
||||
|
||||
GLOBAL_DICT = {
|
||||
"ADD": Opt.ADD,
|
||||
"DELETE":Opt.DELETE,
|
||||
"CHANGE":Opt.CHANGE,
|
||||
"SEARCH":Opt.SEARCH,
|
||||
|
||||
"NDARRAY": Ftype.NDARRAY,
|
||||
"PATH":Ftype.PATH,
|
||||
"BASE64":Ftype.BASE64,
|
||||
"URL":Ftype.URL
|
||||
}
|
||||
|
||||
class FaceHelper:
|
||||
def __init__(self, db_path: str, detector_backend: str, distance_metric: str, model_name: str):
|
||||
self.db_path = "C:/Users/JIALE/Desktop/bns_proj/proj_deepface/face"
|
||||
self.detector_backend = "opencv"
|
||||
self.distance_metric = "euclidean_l2"
|
||||
self.model_name = "GhostFaceNet"
|
||||
self.model = DeepFace.build_model(model_name=model_name)
|
||||
self.target_size = self.model.input_shape
|
||||
self.threshold = verification.find_threshold(model_name, distance_metric)
|
||||
self.pkl_name = f"ds_{self.model_name}_{self.detector_backend}_v2.pkl".replace("-", "").lower()
|
||||
|
||||
def get_facial_areas(self, img: np.ndarray):
|
||||
"""
|
||||
Find facial area coordinates in the given image
|
||||
Args:
|
||||
img (np.ndarray): image itself
|
||||
detector_backend (string): face detector backend. Options: 'opencv', 'retinaface',
|
||||
'mtcnn', 'ssd', 'dlib', 'mediapipe', 'yolov8' (default is opencv).
|
||||
target_size (tuple): input shape of the facial recognition model.
|
||||
Returns
|
||||
result (list): list of tuple with x, y, w and h coordinates
|
||||
detected_face
|
||||
descriptions
|
||||
"""
|
||||
# print("get facial areas, img.shape: ", img.shape)
|
||||
# beg = time.time()
|
||||
try:
|
||||
face_objs = detection.extract_faces(
|
||||
img_path = img,
|
||||
target_size = self.target_size,
|
||||
detector_backend = self.detector_backend,
|
||||
enforce_detection = True,
|
||||
align = True,
|
||||
expand_percentage = 0,
|
||||
grayscale = False,
|
||||
human_readable = False,
|
||||
)
|
||||
|
||||
# print("detection.extract_faces. use time: ", time.time()-beg )
|
||||
# beg = time.time()
|
||||
|
||||
faces_coordinates = [
|
||||
(
|
||||
face_obj["facial_area"]["x"],
|
||||
face_obj["facial_area"]["y"],
|
||||
face_obj["facial_area"]["w"],
|
||||
face_obj["facial_area"]["h"],
|
||||
)
|
||||
for face_obj in face_objs
|
||||
# if face_obj["facial_area"]["w"] > threshold
|
||||
]
|
||||
faces = [ face_obj["face"] for face_obj in face_objs ]
|
||||
descriptions = []
|
||||
for x,y,w,h in faces_coordinates:
|
||||
if min(w,h)<40:
|
||||
descriptions.append(f"width={w}, height={h}. the face is too small, please upload bigger face.")
|
||||
else:
|
||||
descriptions.append("success")
|
||||
return faces_coordinates, faces, descriptions
|
||||
except: # to avoid exception if no face detected
|
||||
return [], [], ["no face detected. Blurry Image, Non-frontal View or Face Pixels too small?"]
|
||||
|
||||
@staticmethod
|
||||
def process_img(img, img_mode: Ftype):
|
||||
target_img = None
|
||||
if img_mode == Ftype.NDARRAY: # np.ndarray格式
|
||||
target_img = img
|
||||
elif img_mode == Ftype.PATH: # img path
|
||||
target_img = cv2.imread(img)
|
||||
elif img_mode == Ftype.BASE64: # base64
|
||||
encoded_data_parts = img.split(",")
|
||||
if len(encoded_data_parts) <= 0:
|
||||
# raise "base64 is empty"
|
||||
return Status.BASE64_EMPTY
|
||||
encoded_data = encoded_data_parts[-1]
|
||||
nparr = np.fromstring(base64.b64decode(encoded_data), np.uint8)
|
||||
target_img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
|
||||
elif img_mode == Ftype.URL: # url
|
||||
try:
|
||||
# response = requests.get(url=img,stream=True,timeout=60)
|
||||
response = requests.get(url=img, stream=True, timeout=60, proxies={"http": None, "https": None})
|
||||
response.raise_for_status()
|
||||
image_array = np.asarray(bytearray(response.raw.read()), dtype=np.uint8)
|
||||
target_img = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
|
||||
except requests.HTTPError as http_err:
|
||||
# print(f"HTTP error occurred: {http_err}") # 输出 HTTP 错误信息
|
||||
return Status.HTTP_ERROR
|
||||
else:
|
||||
return Status.NotImplementedError
|
||||
|
||||
return target_img
|
||||
|
||||
# interface 1: update face db. add / remove /change imgs. 我们输出的都是`bgr`颜色空间
|
||||
def updateFaceDB(self, img: Union[str, np.ndarray], img_name: str, img_mode: Ftype, opt_mode: Opt ):
|
||||
"""
|
||||
description: in db_path, we just store `.jpg`, other Image format will be changed to `.jpg`
|
||||
args:
|
||||
- img: str or np.ndarray
|
||||
- img_name: str
|
||||
- img_mode: 0 np.ndarray; 1 img_path 2 base64
|
||||
- opt_mode: 0 add; 1 delete; 2 change
|
||||
ret:
|
||||
- retjson: dict.
|
||||
"""
|
||||
retjson = {
|
||||
"status": None, # 入库成功 1 or 失败 0
|
||||
"detected_img": None,
|
||||
"description": "None",
|
||||
}
|
||||
if opt_mode == Opt.DELETE: #delete
|
||||
try:
|
||||
os.remove(os.path.join(db_path, img_name+'.jpg'))
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
return Status.DELETE_FINISH
|
||||
|
||||
ret = FaceHelper.process_img(img, img_mode)
|
||||
if isinstance(ret, Status):
|
||||
return ret
|
||||
else:
|
||||
target_img = ret
|
||||
|
||||
face_coors, detected_faces, descriptions = self.get_facial_areas(target_img)
|
||||
detected_img = target_img.copy()
|
||||
for x,y,w,h in face_coors:
|
||||
cv2.rectangle(detected_img, (x,y),(x+w,y+h),(255,0,0),3)
|
||||
success, encoded_detected_img = cv2.imencode('.png', detected_img)
|
||||
if success:
|
||||
detected_img_base64 = base64.b64encode(encoded_detected_img).decode("utf-8")
|
||||
if len(face_coors) >= 2:
|
||||
retjson.update({"status": 0, "detected_img": detected_img_base64 , "description": "too many faces, cannot select which one" }) # too many face
|
||||
elif len(face_coors) <= 0:
|
||||
retjson.update({"status": 0, "detected_img": detected_img_base64 , "description": descriptions[0]}) # no face
|
||||
else: # only one face is detected
|
||||
retjson.update({"status": 1 if descriptions[0] =="success" else 0 , "detected_img": detected_img_base64 , "description": descriptions[0]}) # success
|
||||
|
||||
if retjson['status'] == 0:
|
||||
return retjson
|
||||
|
||||
# db_imgs = os.listdir(self.db_path)
|
||||
db_imgs = [db_img.split(".")[0] for db_img in os.listdir(self.db_path) if db_img.endswith(".jpg")]
|
||||
if opt_mode == Opt.ADD: # add
|
||||
if img_name in db_imgs:
|
||||
return Status.ADD_HAS_EXISTED # raise "operate is add, but db has existed the img_name"
|
||||
cv2.imwrite(os.path.join(self.db_path, img_name+'.jpg'), target_img )
|
||||
elif opt_mode == Opt.CHANGE: # change
|
||||
cv2.imwrite(os.path.join(self.db_path, img_name+'.jpg'), target_img ) # whether exist or not in db, save it anymore.
|
||||
else:
|
||||
return Status.NotImplementedError
|
||||
|
||||
# update the face_db to keep latest
|
||||
search_identity( # 保证database是最新的
|
||||
detected_face=np.zeros([224, 224, 3]),
|
||||
db_path=self.db_path,
|
||||
detector_backend=self.detector_backend,
|
||||
distance_metric=self.distance_metric,
|
||||
model_name=self.model_name,
|
||||
)
|
||||
|
||||
return retjson
|
||||
|
||||
|
||||
#interface 2: face recognition using embeddings on face_db
|
||||
def faceRecog(self, img: Union[str,np.ndarray], img_mode: Ftype):
|
||||
# keep the database is the newest
|
||||
search_identity(
|
||||
detected_face=np.zeros([224, 224, 3]),
|
||||
db_path=self.db_path,
|
||||
detector_backend=self.detector_backend,
|
||||
distance_metric=self.distance_metric,
|
||||
model_name=self.model_name,
|
||||
)
|
||||
|
||||
# open and read face db
|
||||
datastore_path = os.path.join(self.db_path, self.pkl_name)
|
||||
with open(datastore_path, "rb") as f:
|
||||
db_representations = pickle.load(f)
|
||||
assert len(db_representations) > 0, 'no face in database'
|
||||
print(f"{len(db_representations)} faces representataion in {self.pkl_name}.")
|
||||
|
||||
# process the img
|
||||
ret = FaceHelper.process_img(img, img_mode)
|
||||
if isinstance(ret, Status):
|
||||
return ret
|
||||
target_img = ret
|
||||
|
||||
# begin face recognition
|
||||
df = pd.DataFrame(db_representations)
|
||||
labels = []
|
||||
|
||||
face_coors, detected_faces, descriptions = self.get_facial_areas(target_img)
|
||||
|
||||
for idx, (x,y,w,h) in enumerate(face_coors):
|
||||
detected_face = detected_faces[idx]
|
||||
unknown_representation = self.model.find_embeddings(
|
||||
preprocessing.normalize_input(img=detected_face,normalization='base')
|
||||
)
|
||||
distances = []
|
||||
result_df = df.copy()
|
||||
for _, instance in df.iterrows():
|
||||
source_representation = instance['embedding']
|
||||
if source_representation is None :
|
||||
distances.append(float('inf'))
|
||||
continue
|
||||
|
||||
assert len(unknown_representation) == len(source_representation), 'wrong len of embedding '
|
||||
distance = verification.find_distance(source_representation, unknown_representation, self.distance_metric)
|
||||
distances.append(distance)
|
||||
|
||||
result_df['distance'] = distances
|
||||
result_df = result_df.drop(columns=['embedding'])
|
||||
result_df = result_df[result_df['distance'] < self.threshold]
|
||||
if result_df.shape[0] <= 0:
|
||||
labels.append("unknown")
|
||||
else:
|
||||
result_df = result_df.sort_values(by=["distance"], ascending=True).reset_index(drop=True)
|
||||
target_label, min_dist = result_df.loc[0, "identity"], result_df.loc[0, "distance"]
|
||||
target_label = re.split(r'[\\\/]', target_label)[-1].split('.')[0] + f" {min_dist:.4f}"
|
||||
labels.append(target_label)
|
||||
|
||||
# draw the face frame for easy to view
|
||||
for idx, (label, (x, y, w, h)) in enumerate(zip(labels, face_coors)):
|
||||
color = (0,0,255) if label == 'unknown' else (255,0,0)
|
||||
cv2.rectangle(target_img,(x,y),(x+w,y+h), color, 3)
|
||||
cv2.putText(target_img, f"{idx}: " + label, (x,y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.75, color, 1)
|
||||
success, encoded_target_img = cv2.imencode('.png', target_img)
|
||||
if success:
|
||||
target_img_base64 = base64.b64encode(encoded_target_img).decode("utf-8")
|
||||
return labels, face_coors, target_img_base64
|
||||
|
||||
|
||||
facehelper = FaceHelper(
|
||||
db_path = "C:/Users/JIALE/Desktop/bns_proj/proj_deepface/face",
|
||||
detector_backend = "opencv",
|
||||
distance_metric = "euclidean_l2",
|
||||
model_name = "GhostFaceNet" ,
|
||||
)
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
class item(BaseModel):
|
||||
type: str
|
||||
img: str
|
||||
img_name: str
|
||||
|
||||
|
||||
# @app.post("/master/")
|
||||
# def master(input: item ):
|
||||
# return {"type": item["type"], "img":"fdasfsdf"}
|
||||
|
||||
|
||||
@app.post("/interface/")
|
||||
def main(input: item):
|
||||
interface_type, img_mode, opt_mode = input.type.split("_")
|
||||
img_mode = GLOBAL_DICT[img_mode]
|
||||
if interface_type == "UPDATEDB":
|
||||
opt_mode = GLOBAL_DICT[opt_mode]
|
||||
# call updateFaceDB for handling the picture
|
||||
ret = facehelper.updateFaceDB(img = input.img,
|
||||
img_name = input.img_name,
|
||||
img_mode = img_mode,
|
||||
opt_mode = opt_mode,
|
||||
)
|
||||
if isinstance(ret, Status): # means error happened
|
||||
return {"status": 0, "description":ret.value}
|
||||
else:
|
||||
return {"status": ret["status"], "description": ret["description"] }
|
||||
elif interface_type == "FACEID":
|
||||
ret = facehelper.faceRecog(
|
||||
img = input.img,
|
||||
img_mode = img_mode,
|
||||
)
|
||||
if isinstance(ret, Status): # means error happened
|
||||
return {"status": 0, "description":ret.value}
|
||||
else:
|
||||
return {"status": 1, "name": ret[0], "position": ret[1], "resImg": ret[2]}
|
||||
# print(f"name: {ret[0]}, position: {ret[1]}, resImg: {ret[2]}" )
|
||||
else:
|
||||
# print(Status.NotImplementedError)
|
||||
return {"status": 0, "description": "wrong inferface_type, which just includes {'UPDATEDB','FACEID'}" }
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
input_data = sys.stdin.read()
|
||||
split_data = input_data.replace('\r\n', '\n').split('\n') # 将 `\r\n` 优先认定为换行符, 而不是认定为 `\r + 换行`
|
||||
img, img_name = "",""
|
||||
TYPE = sys.argv[1]
|
||||
if len(split_data) == 2:
|
||||
img = split_data[0]
|
||||
img_name = split_data[1]
|
||||
elif len(split_data) == 1:
|
||||
img = split_data[0]
|
||||
|
||||
print(TYPE, img, img_name)
|
||||
|
||||
interface_type, img_mode, opt_mode = TYPE.split("_")
|
||||
img_mode = GLOBAL_DICT[img_mode]
|
||||
if interface_type == "UPDATEDB":
|
||||
opt_mode = GLOBAL_DICT[opt_mode]
|
||||
# call updateFaceDB for handling the picture
|
||||
ret = updateFaceDB(img = img,
|
||||
img_name = img_name,
|
||||
img_mode = img_mode,
|
||||
opt_mode = opt_mode,
|
||||
)
|
||||
if isinstance(ret, Status): # means error happened
|
||||
print(ret.value)
|
||||
else:
|
||||
print(f'status: {ret["status"]}, description: {ret["description"]}')
|
||||
elif interface_type == "FACEID":
|
||||
ret = faceRecog(
|
||||
img = img,
|
||||
img_mode = img_mode,
|
||||
)
|
||||
success, encoded_res_img = cv2.imencode('.png', ret[2])
|
||||
if success:
|
||||
res_img_base64 = base64.b64encode(encoded_res_img).decode("utf-8")
|
||||
print(f"name: {ret[0]}, position: {ret[1]}, resImg: {res_img_base64}" )
|
||||
else:
|
||||
print(Status.NotImplementedError)
|
||||
|
||||
"""
|
||||
img = sys.argv[1]
|
||||
img_name = sys.argv[2]
|
||||
img_mode = GLOBAL_DICT[ sys.argv[3] ]
|
||||
opt_mode = GLOBAL_DICT[ sys.argv[4] ]
|
||||
|
||||
# call updateFaceDB for handling the picture
|
||||
ret = updateFaceDB(img = img,
|
||||
img_name = img_name,
|
||||
img_mode = img_mode,
|
||||
opt_mode = opt_mode,
|
||||
)
|
||||
if isinstance(ret, Status): # means error happened
|
||||
print(ret.value)
|
||||
else:
|
||||
print('ret["status"], ret["description"]: ', ret["status"], ret["description"])
|
||||
"""
|
||||
"""
|
||||
# use func to face recognition
|
||||
tic = time.time()
|
||||
img = sys.argv[1]
|
||||
# img_name = sys.argv[2]
|
||||
img_mode = GLOBAL_DICT[ sys.argv[2] ]
|
||||
# opt_mode = GLOBAL_DICT[ sys.argv[4] ]
|
||||
|
||||
ret = faceRecog(
|
||||
img = img,
|
||||
img_mode = img_mode,
|
||||
)
|
||||
tec = time.time()
|
||||
|
||||
print(ret[0], ret[1], tec-tic)
|
||||
print("img.size = ", ret[2].shape)
|
||||
cv2.imshow("res",ret[2])
|
||||
cv2.waitKey(0)
|
||||
"""
|
||||
Loading…
Reference in New Issue