383 lines
13 KiB
Python
383 lines
13 KiB
Python
import time
|
|
# start = time.time()
|
|
from deepface import DeepFace
|
|
# print("begin time: ", time.time() - start)
|
|
import cv2
|
|
import os
|
|
import sys
|
|
import numpy as np
|
|
import re
|
|
import pandas as pd
|
|
from typing import Union
|
|
import base64
|
|
from enum import Enum
|
|
import requests
|
|
|
|
from deepface.modules.streaming import search_identity
|
|
from deepface.modules import verification, detection
|
|
from deepface.models.FacialRecognition import FacialRecognition
|
|
from deepface.modules import preprocessing
|
|
from deepface.commons.logger import Logger
|
|
import pickle
|
|
|
|
# print("begin2 time: ", time.time() - start)
|
|
# start2 = time.time()
|
|
|
|
db_path = "C:/Users/JIALE/Desktop/bns_proj/proj_deepface/face"
|
|
detector_backend = "opencv"
|
|
distance_metric = "euclidean_l2"
|
|
model_name = "GhostFaceNet"
|
|
model: FacialRecognition = DeepFace.build_model(model_name=model_name)
|
|
target_size = model.input_shape
|
|
threshold = verification.find_threshold(model_name, distance_metric)
|
|
|
|
# print("begin3 time: ", time.time() - start)
|
|
# start3 = time.time()
|
|
|
|
class Opt(Enum):
|
|
ADD = 1
|
|
DELETE = 2
|
|
CHANGE = 3
|
|
SEARCH = 4
|
|
|
|
class Ftype(Enum):
|
|
NDARRAY = 0
|
|
PATH = 1
|
|
BASE64 = 2
|
|
URL = 3
|
|
|
|
class Status(Enum):
|
|
SUCCESS = 1 # 成功
|
|
HTTP_ERROR = "http error" # http 检索报错
|
|
DELETE_FINISH = "delete finished"
|
|
BASE64_EMPTY = "base64 is empty"
|
|
NotImplementedError = "Not Implemented Error"
|
|
ADD_HAS_EXISTED = "when adding, file has existed"
|
|
|
|
GLOBAL_DICT = {
|
|
"ADD": Opt.ADD,
|
|
"DELETE":Opt.DELETE,
|
|
"CHANGE":Opt.CHANGE,
|
|
"SEARCH":Opt.SEARCH,
|
|
|
|
"NDARRAY": Ftype.NDARRAY,
|
|
"PATH":Ftype.PATH,
|
|
"BASE64":Ftype.BASE64,
|
|
"URL":Ftype.URL
|
|
}
|
|
|
|
|
|
def get_facial_areas(
|
|
img: np.ndarray, detector_backend: str, target_size,
|
|
):
|
|
"""
|
|
Find facial area coordinates in the given image
|
|
Args:
|
|
img (np.ndarray): image itself
|
|
detector_backend (string): face detector backend. Options: 'opencv', 'retinaface',
|
|
'mtcnn', 'ssd', 'dlib', 'mediapipe', 'yolov8' (default is opencv).
|
|
target_size (tuple): input shape of the facial recognition model.
|
|
Returns
|
|
result (list): list of tuple with x, y, w and h coordinates
|
|
detected_face
|
|
descriptions
|
|
"""
|
|
print("get facial areas, img.shape: ", img.shape)
|
|
beg = time.time()
|
|
try:
|
|
face_objs = detection.extract_faces(
|
|
img_path=img,
|
|
target_size=target_size,
|
|
detector_backend=detector_backend,
|
|
enforce_detection=True,
|
|
align=True,
|
|
expand_percentage=0,
|
|
grayscale=False,
|
|
human_readable=False,
|
|
)
|
|
|
|
print("detection.extract_faces. use time: ", time.time()-beg )
|
|
beg = time.time()
|
|
|
|
faces_coordinates = [
|
|
(
|
|
face_obj["facial_area"]["x"],
|
|
face_obj["facial_area"]["y"],
|
|
face_obj["facial_area"]["w"],
|
|
face_obj["facial_area"]["h"],
|
|
)
|
|
for face_obj in face_objs
|
|
# if face_obj["facial_area"]["w"] > threshold
|
|
]
|
|
faces = [ face_obj["face"] for face_obj in face_objs ]
|
|
descriptions = []
|
|
for x,y,w,h in faces_coordinates:
|
|
if min(w,h)<40:
|
|
descriptions.append(f"width={w}, height={h}. the face is too small, please upload bigger face.")
|
|
else:
|
|
descriptions.append("success")
|
|
return faces_coordinates, faces, descriptions
|
|
except: # to avoid exception if no face detected
|
|
return [], [], ["no face detected. Blurry Image, Non-frontal View or Face Pixels too small?"]
|
|
|
|
def process_img(img, img_mode: Ftype):
|
|
target_img = None
|
|
if img_mode == Ftype.NDARRAY: # np.ndarray格式
|
|
target_img = img
|
|
elif img_mode == Ftype.PATH: # img path
|
|
target_img = cv2.imread(img)
|
|
elif img_mode == Ftype.BASE64: # base64
|
|
encoded_data_parts = img.split(",")
|
|
if len(encoded_data_parts) <= 0:
|
|
# raise "base64 is empty"
|
|
return Status.BASE64_EMPTY
|
|
encoded_data = encoded_data_parts[-1]
|
|
nparr = np.fromstring(base64.b64decode(encoded_data), np.uint8)
|
|
target_img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
|
|
elif img_mode == Ftype.URL: # url
|
|
try:
|
|
# response = requests.get(url=img,stream=True,timeout=60)
|
|
response = requests.get(url=img, stream=True, timeout=60, proxies={"http": None, "https": None})
|
|
response.raise_for_status()
|
|
image_array = np.asarray(bytearray(response.raw.read()), dtype=np.uint8)
|
|
target_img = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
|
|
except requests.HTTPError as http_err:
|
|
# print(f"HTTP error occurred: {http_err}") # 输出 HTTP 错误信息
|
|
return Status.HTTP_ERROR
|
|
else:
|
|
return Status.NotImplementedError
|
|
|
|
return target_img
|
|
|
|
# interface 1: update face db. add / remove /change imgs. 我们输出的都是`bgr`颜色空间
|
|
def updateFaceDB(img: Union[str, np.ndarray], img_name: str, img_mode: Ftype, opt_mode: Opt ):
|
|
"""
|
|
description: in db_path, we just store `.jpg`, other Image format will be changed to `.jpg`
|
|
args:
|
|
- img: str or np.ndarray
|
|
- img_name: str
|
|
- img_mode: 0 np.ndarray; 1 img_path 2 base64
|
|
- opt_mode: 0 add; 1 delete; 2 change
|
|
ret:
|
|
- retjson: dict.
|
|
"""
|
|
retjson = {
|
|
"status": None, # 入库成功 1 or 失败 0
|
|
"detected_img": None,
|
|
"description": "None",
|
|
}
|
|
if opt_mode == Opt.DELETE: #delete
|
|
try:
|
|
os.remove(os.path.join(db_path, img_name+'.jpg'))
|
|
except FileNotFoundError:
|
|
pass
|
|
return Status.DELETE_FINISH
|
|
|
|
ret = process_img(img, img_mode)
|
|
if isinstance(ret, Status):
|
|
return ret
|
|
else:
|
|
target_img = ret
|
|
|
|
face_coors, detected_faces, descriptions = get_facial_areas(target_img, detector_backend, target_size)
|
|
detected_img = target_img.copy()
|
|
for x,y,w,h in face_coors:
|
|
cv2.rectangle(detected_img, (x,y),(x+w,y+h),(255,0,0),3)
|
|
success, encoded_detected_img = cv2.imencode('.png', detected_img)
|
|
if success:
|
|
detected_img_base64 = base64.b64encode(encoded_detected_img).decode("utf-8")
|
|
if len(face_coors) >= 2:
|
|
retjson.update({"status": 0, "detected_img": detected_img_base64 , "description": "too many faces, cannot select which one" }) # too many face
|
|
elif len(face_coors) <= 0:
|
|
retjson.update({"status": 0, "detected_img": detected_img_base64 , "description": descriptions[0]}) # no face
|
|
else: # only one face is detected
|
|
retjson.update({"status": 1 if descriptions[0] =="success" else 0 , "detected_img": detected_img_base64 , "description": descriptions[0]}) # success
|
|
|
|
if retjson['status'] == 0:
|
|
return retjson
|
|
|
|
db_imgs = os.listdir(db_path)
|
|
db_imgs = [db_img.split(".")[0] for db_img in db_imgs]
|
|
if opt_mode == Opt.ADD: # add
|
|
if img_name in db_imgs:
|
|
return Status.ADD_HAS_EXISTED # raise "operate is add, but db has existed the img_name"
|
|
cv2.imwrite(os.path.join(db_path, img_name+'.jpg'), target_img )
|
|
elif opt_mode == Opt.CHANGE: # change
|
|
cv2.imwrite(os.path.join(db_path, img_name+'.jpg'), target_img ) # whether exist or not in db, save it anymore.
|
|
else:
|
|
return Status.NotImplementedError
|
|
|
|
# update the face_db to keep latest
|
|
search_identity( # 保证database是最新的
|
|
detected_face=np.zeros([224, 224, 3]),
|
|
db_path=db_path,
|
|
detector_backend=detector_backend,
|
|
distance_metric=distance_metric,
|
|
model_name=model_name,
|
|
)
|
|
|
|
return retjson
|
|
|
|
|
|
#interface 2: face recognition using embeddings on face_db
|
|
def faceRecog(img: Union[str,np.ndarray], img_mode: Ftype):
|
|
beg = time.time()
|
|
# keep the database is the newest
|
|
search_identity(
|
|
detected_face=np.zeros([224, 224, 3]),
|
|
db_path=db_path,
|
|
detector_backend=detector_backend,
|
|
distance_metric=distance_metric,
|
|
model_name=model_name,
|
|
)
|
|
print("search_identity, use time: ", time.time()-beg)
|
|
beg = time.time()
|
|
|
|
# open and read face db
|
|
file_name = f"ds_{model_name}_{detector_backend}_v2.pkl"
|
|
file_name = file_name.replace("-", "").lower()
|
|
datastore_path = os.path.join(db_path, file_name)
|
|
with open(datastore_path, "rb") as f:
|
|
db_representations = pickle.load(f)
|
|
assert len(db_representations) > 0, 'no face in database'
|
|
print(f"{len(db_representations)} faces representataion in {file_name}.")
|
|
|
|
print("open and read face db, use time: ", time.time()-beg)
|
|
beg = time.time()
|
|
|
|
# process the img
|
|
ret = process_img(img, img_mode)
|
|
if isinstance(ret,Status):
|
|
return ret
|
|
target_img = ret
|
|
|
|
print("process img, use time: ", time.time()-beg)
|
|
beg = time.time()
|
|
|
|
# begin face recognition
|
|
df = pd.DataFrame(db_representations)
|
|
labels = []
|
|
|
|
print("copy df, use time: ", time.time()-beg)
|
|
beg = time.time()
|
|
|
|
face_coors, detected_faces, descriptions = get_facial_areas(target_img, detector_backend, target_size)
|
|
|
|
print("get facial areas, use time: ", time.time()-beg)
|
|
beg = time.time()
|
|
|
|
for idx, (x,y,w,h) in enumerate(face_coors):
|
|
detected_face = detected_faces[idx]
|
|
unknown_representation = model.find_embeddings(
|
|
preprocessing.normalize_input(img=detected_face,normalization='base')
|
|
)
|
|
distances = []
|
|
result_df = df.copy()
|
|
for _, instance in df.iterrows():
|
|
source_representation = instance['embedding']
|
|
if source_representation is None :
|
|
distances.append(float('inf'))
|
|
continue
|
|
|
|
assert len(unknown_representation) == len(source_representation), 'wrong len of embedding '
|
|
distance = verification.find_distance(source_representation,unknown_representation,distance_metric)
|
|
distances.append(distance)
|
|
|
|
result_df['distance'] = distances
|
|
result_df = result_df.drop(columns=['embedding'])
|
|
result_df = result_df[result_df['distance'] < threshold]
|
|
if result_df.shape[0] <= 0:
|
|
labels.append("unknown")
|
|
else:
|
|
result_df = result_df.sort_values(by=["distance"], ascending=True).reset_index(drop=True)
|
|
target_label, min_dist = result_df.loc[0, "identity"], result_df.loc[0, "distance"]
|
|
target_label = re.split(r'[\\\/]', target_label)[-1].split('.')[0] + f" {min_dist:.4f}"
|
|
labels.append(target_label)
|
|
|
|
print("face recognition, use time: ", time.time()-beg)
|
|
# draw the face frame for easy to view
|
|
# for label, (x,y,w,h) in zip(labels, face_coors):
|
|
for idx, (label, (x, y, w, h)) in enumerate(zip(labels, face_coors)):
|
|
color = (0,0,255) if label == 'unknown' else (255,0,0)
|
|
cv2.rectangle(target_img,(x,y),(x+w,y+h), color, 3)
|
|
cv2.putText(target_img, f"{idx}: " + label, (x,y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.75, color, 1)
|
|
|
|
return labels, face_coors, target_img
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
input_data = sys.stdin.read()
|
|
split_data = input_data.replace('\r\n', '\n').split('\n') # 将 `\r\n` 优先认定为换行符, 而不是认定为 `\r + 换行`
|
|
img, img_name = "",""
|
|
TYPE = sys.argv[1]
|
|
if len(split_data) == 2:
|
|
img = split_data[0]
|
|
img_name = split_data[1]
|
|
elif len(split_data) == 1:
|
|
img = split_data[0]
|
|
|
|
print(TYPE, img, img_name)
|
|
|
|
interface_type, img_mode, opt_mode = TYPE.split("_")
|
|
img_mode = GLOBAL_DICT[img_mode]
|
|
if interface_type == "UPDATEDB":
|
|
opt_mode = GLOBAL_DICT[opt_mode]
|
|
# call updateFaceDB for handling the picture
|
|
ret = updateFaceDB(img = img,
|
|
img_name = img_name,
|
|
img_mode = img_mode,
|
|
opt_mode = opt_mode,
|
|
)
|
|
if isinstance(ret, Status): # means error happened
|
|
print(ret.value)
|
|
else:
|
|
print(f'status: {ret["status"]}, description: {ret["description"]}')
|
|
elif interface_type == "FACEID":
|
|
ret = faceRecog(
|
|
img = img,
|
|
img_mode = img_mode,
|
|
)
|
|
success, encoded_res_img = cv2.imencode('.png', ret[2])
|
|
if success:
|
|
res_img_base64 = base64.b64encode(encoded_res_img).decode("utf-8")
|
|
print(f"name: {ret[0]}, position: {ret[1]}, resImg: {res_img_base64}" )
|
|
else:
|
|
print(Status.NotImplementedError)
|
|
|
|
"""
|
|
img = sys.argv[1]
|
|
img_name = sys.argv[2]
|
|
img_mode = GLOBAL_DICT[ sys.argv[3] ]
|
|
opt_mode = GLOBAL_DICT[ sys.argv[4] ]
|
|
|
|
# call updateFaceDB for handling the picture
|
|
ret = updateFaceDB(img = img,
|
|
img_name = img_name,
|
|
img_mode = img_mode,
|
|
opt_mode = opt_mode,
|
|
)
|
|
if isinstance(ret, Status): # means error happened
|
|
print(ret.value)
|
|
else:
|
|
print('ret["status"], ret["description"]: ', ret["status"], ret["description"])
|
|
"""
|
|
"""
|
|
# use func to face recognition
|
|
tic = time.time()
|
|
img = sys.argv[1]
|
|
# img_name = sys.argv[2]
|
|
img_mode = GLOBAL_DICT[ sys.argv[2] ]
|
|
# opt_mode = GLOBAL_DICT[ sys.argv[4] ]
|
|
|
|
ret = faceRecog(
|
|
img = img,
|
|
img_mode = img_mode,
|
|
)
|
|
tec = time.time()
|
|
|
|
print(ret[0], ret[1], tec-tic)
|
|
print("img.size = ", ret[2].shape)
|
|
cv2.imshow("res",ret[2])
|
|
cv2.waitKey(0)
|
|
""" |