new faceregweb project. commit!
parent 3e20a4106a
commit fc3ffb6e31

(8 binary files not shown)
@@ -0,0 +1,53 @@
faceReg:
  sim_threshold: 0.7

# brightness of image B: v1 <= B <= v2 is high, v0 <= B < v1 or v2 < B <= v3 is middle, B < v0 or B > v3 is low.
# we only accept high brightness.
brightness:
  v0: 69.0
  v1: 70.0
  v2: 230.0
  v3: 231.0

# min resolution of the face box; if w or h is smaller, reject this image.
resolution:
  height: 112
  width: 112

# evaluate pose.
# Retrieve the yaw and pitch angles of the face. If abs(yaw) <= yaw_thrd and abs(pitch) <= pitch_thrd, the face is a front face (accepted);
# otherwise the off-angle direction is reported:
#   elif yaw < -1.0 * yaw_thrd:
#       return "rightFace"
#   elif yaw > yaw_thrd:
#       return "leftFace"
#   elif pitch > pitch_thrd:
#       return "upFace"
#   elif pitch < -1.0 * pitch_thrd:
#       return "downFace"
pose:
  yaw_thrd: 30.0
  pitch_thrd: 25.0
  var_onnx_path: ./checkpoints/fsanet-var.onnx
  conv_onnx_path: ./checkpoints/fsanet-conv.onnx

# evaluate clarity of the image.
#   if clarity < self.low_thresh:
#       level = "LOW"
#   elif self.low_thresh <= clarity < self.high_thresh:
#       level = "MEDIUM"
#   else:
#       level = "HIGH"
#   return level != 'LOW'
clarity:
  low_thrd: 0.10
  high_thrd: 0.20

ck_paths:
  landmk1: ./checkpoints/face_landmarker_pts5_net1.onnx
  landmk2: ./checkpoints/face_landmarker_pts5_net2.onnx
  num_threads: 4
  FcReg: ./checkpoints/face_recognizer.onnx
  FcBx: ./checkpoints/faceboxesv2-640x640.onnx
  rotifer: ./checkpoints/model_gray_mobilenetv2_rotcls.onnx
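
A minimal sketch of how this file is consumed (FaceHelper in the tool module further below loads it with yaml.safe_load, and ./config.yaml is its default config_path):

import yaml

with open("./config.yaml", "r") as f:
    config = yaml.safe_load(f)

# a few of the fields read by FaceHelper
print(config["faceReg"]["sim_threshold"])   # 0.7
print(config["brightness"]["v0"])           # 69.0
print(config["ck_paths"]["FcBx"])           # ./checkpoints/faceboxesv2-640x640.onnx
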
@@ -0,0 +1,22 @@
import redis
import os

# Redis config
redis_host = str(os.getenv("REDIS_HOST", 'localhost'))
redis_port = int(os.getenv("REDIS_PORT", 2012))
redis_password = str(os.getenv("REDIS_PASSWORD", 'Xjsfzb@Redis123!'))
num_workers = int(os.getenv("NUM_WORKERS", 10))

# connect to Redis
r = redis.Redis(host=redis_host, port=redis_port, password=redis_password, db=0)

# keys to delete from the Redis DB
keys_to_delete = ['write_lock', 'read_lock'] + [f"worker_{i}" for i in range(num_workers)]

print("Deleted keys:", end=" ")
for key in keys_to_delete:
    r.delete(key)
    print(f"{key}", end=" ")
print()

print("Specified keys deleted successfully.")
@@ -0,0 +1,6 @@
from .facerecoger import FaceRecoger
from .facedetector import FaceBoxesV2
from .facelandmarks5er import Landmark5er
from .facealign import FaceAlign
from .imgchecker import QualityOfClarity, QualityOfPose, QualityChecker
(6 binary files not shown)
@@ -0,0 +1,184 @@
"""
This module takes the original image plus the 5 facial landmark positions as input
and outputs the aligned, cropped face.
"""

import numpy as np
import cv2
import time
import os

# image_data: src image
# image_width, image_height, image_channels: width, height and channels of the src image
# src_x, src_y: for every pixel of the output image, the corresponding pixel position in the src image.
def sampling(image_data, image_width, image_height, image_channels, src_x, src_y):
    ux = np.floor(src_x).astype(int)
    uy = np.floor(src_y).astype(int)

    # empty array with the same spatial shape as src_x, holding the final pixel values
    pixel = np.zeros((*src_x.shape, image_channels), dtype=np.uint8)

    # mask marking the valid sampling points
    valid_mask = (ux >= 0) & (ux < image_height - 1) & (uy >= 0) & (uy < image_width - 1)

    # interpolation fractions
    x = src_x - ux
    y = src_y - uy

    # flatten the image data per channel
    image_data_reshape = image_data.reshape(-1, image_channels)  # (height * width, channels)
    ux_uy = ux * image_width + uy  # (height * width)
    ux_uy_next = ux_uy + 1
    ux_next = (ux + 1) * image_width + uy
    ux_next_next = ux_next + 1

    ux_uy[~valid_mask] = 0
    ux_uy_next[~valid_mask] = 0
    ux_next[~valid_mask] = 0
    ux_next_next[~valid_mask] = 0

    # gather the four neighbours of each sample point via broadcasting
    top_left = image_data_reshape[ux_uy]
    top_right = image_data_reshape[ux_uy_next]
    bottom_left = image_data_reshape[ux_next]
    bottom_right = image_data_reshape[ux_next_next]

    # bilinear interpolation
    interpolated_top = (1 - y[:, :, np.newaxis]) * top_left + y[:, :, np.newaxis] * top_right
    interpolated_bottom = (1 - y[:, :, np.newaxis]) * bottom_left + y[:, :, np.newaxis] * bottom_right
    interpolated_pixel = (1 - x[:, :, np.newaxis]) * interpolated_top + x[:, :, np.newaxis] * interpolated_bottom

    # fill in the final pixel values
    pixel[valid_mask] = np.clip(interpolated_pixel[valid_mask], 0, 255).astype(np.uint8)

    return pixel
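
In equation form, the sampling above is standard bilinear interpolation (a restatement of the code, not an addition to it): with fractional offsets x = src_x - ux and y = src_y - uy,

p = (1 - x)\,\bigl[(1 - y)\, p_{TL} + y\, p_{TR}\bigr] + x\,\bigl[(1 - y)\, p_{BL} + y\, p_{BR}\bigr]

where p_{TL}, p_{TR}, p_{BL}, p_{BR} are the four neighbouring source pixels gathered above.
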

def spatial_transform(image_data, image_width, image_height, image_channels,
                      crop_data, crop_width, crop_height, transformation,
                      pad_top=0, pad_bottom=0, pad_left=0, pad_right=0,
                      type='LINEAR', dtype='ZERO_PADDING', N=1):
    channels = image_channels
    dst_h = crop_height + pad_top + pad_bottom
    dst_w = crop_width + pad_left + pad_right

    for n in range(N):
        theta_data = transformation.reshape(-1)
        scale = np.sqrt(theta_data[0] ** 2 + theta_data[3] ** 2)

        bx, by = np.meshgrid(np.arange(dst_w) - pad_left, np.arange(dst_h) - pad_top)
        bx = bx.T
        by = by.T
        src_y = theta_data[0] * by + theta_data[1] * bx + theta_data[2]
        src_x = theta_data[3] * by + theta_data[4] * bx + theta_data[5]

        crop_data[:] = sampling(image_data, image_width, image_height, image_channels, src_x, src_y)

    return True


def transformation_maker(crop_width, crop_height, points, mean_shape, mean_shape_width, mean_shape_height):
    points_num = len(points)  # number of landmark points (5)
    std_points = np.zeros((points_num, 2), dtype=np.float32)  # standard (mean-face) points

    # coordinates of the standard points, scaled to the crop size
    for i in range(points_num):
        std_points[i, 0] = mean_shape[i * 2] * crop_width / mean_shape_width
        std_points[i, 1] = mean_shape[i * 2 + 1] * crop_height / mean_shape_height

    feat_points = np.array(points, dtype=np.float32).reshape(points_num, 2)

    # accumulators for the least-squares fit
    sum_x = 0.0
    sum_y = 0.0
    sum_u = 0.0
    sum_v = 0.0
    sum_xx_yy = 0.0
    sum_ux_vy = 0.0
    sum_vx_uy = 0.0

    for c in range(points_num):
        sum_x += std_points[c, 0]
        sum_y += std_points[c, 1]
        sum_u += feat_points[c, 0]
        sum_v += feat_points[c, 1]
        sum_xx_yy += std_points[c, 0] ** 2 + std_points[c, 1] ** 2
        sum_ux_vy += std_points[c, 0] * feat_points[c, 0] + std_points[c, 1] * feat_points[c, 1]
        sum_vx_uy += feat_points[c, 1] * std_points[c, 0] - feat_points[c, 0] * std_points[c, 1]

    if sum_xx_yy <= np.finfo(np.float32).eps:
        return False, None

    q = sum_u - sum_x * sum_ux_vy / sum_xx_yy + sum_y * sum_vx_uy / sum_xx_yy
    p = sum_v - sum_y * sum_ux_vy / sum_xx_yy - sum_x * sum_vx_uy / sum_xx_yy
    r = points_num - (sum_x ** 2 + sum_y ** 2) / sum_xx_yy

    if np.abs(r) <= np.finfo(np.float32).eps:
        return False, None

    a = (sum_ux_vy - sum_x * q / r - sum_y * p / r) / sum_xx_yy
    b = (sum_vx_uy + sum_y * q / r - sum_x * p / r) / sum_xx_yy
    c = q / r
    d = p / r

    transformation = np.zeros((2, 3), dtype=np.float64)
    transformation[0, 0] = transformation[1, 1] = a
    transformation[0, 1] = -b
    transformation[1, 0] = b
    transformation[0, 2] = c
    transformation[1, 2] = d

    return True, transformation
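
For reference, the closed form above is the least-squares fit of a similarity transform (uniform scale s and rotation theta, with a = s*cos(theta), b = s*sin(theta), plus a translation (c, d)) that maps the scaled mean-face points (x_i, y_i) onto the detected landmarks (u_i, v_i); this restates what the accumulators compute:

\begin{pmatrix} u_i \\ v_i \end{pmatrix} \approx
\begin{pmatrix} a & -b \\ b & a \end{pmatrix}
\begin{pmatrix} x_i \\ y_i \end{pmatrix} +
\begin{pmatrix} c \\ d \end{pmatrix},
\qquad
(a, b, c, d) = \arg\min \sum_i \left\| \begin{pmatrix} u_i \\ v_i \end{pmatrix} -
\begin{pmatrix} a x_i - b y_i + c \\ b x_i + a y_i + d \end{pmatrix} \right\|^2

spatial_transform then applies the resulting 2x3 matrix to every pixel of the 256x256 crop to look up (and bilinearly sample) the corresponding source pixel.
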

class FaceAlign:
    def __init__(self) -> None:
        self.crop_width, self.crop_height = 256, 256
        self.mean_shape_width, self.mean_shape_height = 256, 256
        self.mean_face = [  # landmark positions of the standard (mean) face
            89.3095, 72.9025,
            169.3095, 72.9025,
            127.8949, 127.0441,
            96.8796, 184.8907,
            159.1065, 184.7601
        ]

    # landmarks5 = [
    #     [268.99814285714285, 166.26619999999997],
    #     [342.636625, 164.43359999999998],
    #     [311.5448214285714, 221.24419999999998],
    #     [272.2709642857143, 243.23539999999997],
    #     [344.2730357142857, 241.40279999999996]
    # ]
    def align(self, image, landmarks5):  # original image and its 5 landmarks
        success, transformation = transformation_maker(self.crop_width, self.crop_height, landmarks5, self.mean_face, self.mean_shape_width, self.mean_shape_height)
        if not success:
            print("Failed to compute transformation matrix.")

        img_height, img_width, img_channels = image.shape

        crop_data = np.zeros((self.crop_height, self.crop_width, 3), dtype=np.uint8)
        success = spatial_transform(image, img_width, img_height, img_channels,
                                    crop_data, self.crop_width, self.crop_height,
                                    transformation,
                                    )
        if success:
            if os.path.exists("./images/result1.jpg"):
                cv2.imwrite("./images/result2.jpg", crop_data, [cv2.IMWRITE_JPEG_QUALITY, 100])
            else:
                cv2.imwrite("./images/result1.jpg", crop_data, [cv2.IMWRITE_JPEG_QUALITY, 100])
        else:
            print("error in spatial_transform...")

        return crop_data


if __name__ == "__main__":
    fa = FaceAlign()
    landmarks5 = [(240.56920098163752, 111.91879640513824),
                  (283.7146242409017, 93.30582481805237),
                  (268.9820406889578, 129.202270021718),
                  (259.51109411985107, 155.79222943184064),
                  (296.34255299971073, 137.17925784475477)]
    landmarks5 = [[ld5[0], ld5[1]] for ld5 in landmarks5]
    image = cv2.imread("/home/bns/seetaface6Python/seetaFace6Python/asserts/1.jpg")
    fa.align(image=image, landmarks5=landmarks5)
@@ -0,0 +1,409 @@
"""
Input: the original image.
Output: the face boxes found in the image.
"""

import cv2
import numpy as np
import onnxruntime as ort


class Box:
    def __init__(self, x1, y1, x2, y2, score, label=1, label_text='face', flag=True):
        self.x1 = x1
        self.y1 = y1
        self.x2 = x2
        self.y2 = y2
        self.score = score
        self.label = label
        self.label_text = label_text
        self.flag = flag

    def iou_of(self, other):
        inter_x1 = max(self.x1, other.x1)
        inter_y1 = max(self.y1, other.y1)
        inter_x2 = min(self.x2, other.x2)
        inter_y2 = min(self.y2, other.y2)

        if inter_x1 < inter_x2 and inter_y1 < inter_y2:
            inter_area = (inter_x2 - inter_x1 + 1.0) * (inter_y2 - inter_y1 + 1.0)
            self_area = (self.x2 - self.x1 + 1.0) * (self.y2 - self.y1 + 1.0)
            other_area = (other.x2 - other.x1 + 1.0) * (other.y2 - other.y1 + 1.0)
            union_area = self_area + other_area - inter_area
            return inter_area / union_area
        else:
            return 0

    def area(self):
        return (self.x2 - self.x1 + 1) * (self.y2 - self.y1 + 1)


def hard_nms(boxes, iou_threshold, topk):
    if not boxes:
        return []
    boxes.sort(key=lambda x: x.score, reverse=True)

    merged = [0] * len(boxes)
    output = []

    count = 0
    for i in range(len(boxes)):
        if merged[i]:
            continue
        buf = [boxes[i]]
        merged[i] = 1

        for j in range(i + 1, len(boxes)):
            if merged[j]:
                continue

            iou = boxes[i].iou_of(boxes[j])
            if iou > iou_threshold:
                merged[j] = 1
                buf.append(boxes[j])

        output.append(buf[0])

        count += 1
        if count >= topk:
            break
    return output


def blending_nms(boxes, iou_threshold, topk):
    if not boxes:
        return []
    boxes.sort(key=lambda x: x.score, reverse=True)
    merged = [0] * len(boxes)
    output = []

    count = 0
    for i in range(len(boxes)):
        if merged[i]:
            continue
        buf = [boxes[i]]
        merged[i] = 1

        for j in range(i + 1, len(boxes)):
            if merged[j]:
                continue

            iou = boxes[i].iou_of(boxes[j])
            if iou > iou_threshold:
                merged[j] = 1
                buf.append(boxes[j])

        total = sum([np.exp(box.score) for box in buf])
        rects = Box(0, 0, 0, 0, 0)
        for box in buf:
            rate = np.exp(box.score) / total
            rects.x1 += box.x1 * rate
            rects.y1 += box.y1 * rate
            rects.x2 += box.x2 * rate
            rects.y2 += box.y2 * rate
            rects.score += box.score * rate
        rects.flag = True
        output.append(rects)

        count += 1
        if count >= topk:
            break
    return output


def offset_nms(boxes, iou_threshold, topk):
    if not boxes:
        return []
    boxes.sort(key=lambda x: x.score, reverse=True)
    merged = [0] * len(boxes)
    offset = 4096.0

    for box in boxes:
        box.x1 += box.label * offset
        box.y1 += box.label * offset
        box.x2 += box.label * offset
        box.y2 += box.label * offset

    output = []
    count = 0
    for i in range(len(boxes)):
        if merged[i]:
            continue
        buf = [boxes[i]]
        merged[i] = 1

        for j in range(i + 1, len(boxes)):
            if merged[j]:
                continue

            iou = boxes[i].iou_of(boxes[j])
            if iou > iou_threshold:
                merged[j] = 1
                buf.append(boxes[j])

        output.append(buf[0])

        count += 1
        if count >= topk:
            break

    for box in output:
        box.x1 -= box.label * offset
        box.y1 -= box.label * offset
        box.x2 -= box.label * offset
        box.y2 -= box.label * offset

    return output


def draw_rectface(img, box):
    x = max(0, int(box.x1))
    y = max(0, int(box.y1))
    w = min(img.shape[1] - x, int(box.x2 - x + 1))
    h = min(img.shape[0] - y, int(box.y2 - y + 1))
    cv2.rectangle(img, (x, y), (x + w, y + h), (0, 0, 255), 3)
    # return img


def cut_rectface(img, box):
    x = max(0, int(box.x1))
    y = max(0, int(box.y1))
    w = min(img.shape[1] - x, int(box.x2 - x + 1))
    h = min(img.shape[0] - y, int(box.y2 - y + 1))
    return img[y:y + h, x:x + w]


def normalize_inplace(mat, mean, scale):
    mat = mat.astype(np.float32)
    mat -= mean
    mat *= scale
    return mat


def create_tensor(mat, tensor_dims, memory_info_handler, data_format):
    rows, cols, channels = mat.shape
    if len(tensor_dims) != 4:
        raise RuntimeError("dims mismatch.")
    if tensor_dims[0] != 1:
        raise RuntimeError("batch != 1")

    if data_format == "CHW":
        target_height = tensor_dims[2]
        target_width = tensor_dims[3]
        target_channel = tensor_dims[1]
        # target_tensor_size = target_channel * target_height * target_width
        if target_channel != channels:
            raise RuntimeError("channel mismatch.")

        if target_height != rows or target_width != cols:
            print("in create_tensor, resize mat...")
            mat = cv2.resize(mat, (target_width, target_height))

        mat = mat.transpose(2, 0, 1)  # HWC -> CHW (note: still to be double-checked)
        mat = np.expand_dims(mat, axis=0)
        return ort.OrtValue.ortvalue_from_numpy(mat, 'cpu')

    elif data_format == "HWC":
        target_height = tensor_dims[1]
        target_width = tensor_dims[2]
        target_channel = tensor_dims[3]
        target_tensor_size = target_channel * target_height * target_width
        if target_channel != channels:
            raise RuntimeError("channel mismatch.")

        if target_height != rows or target_width != cols:
            mat = cv2.resize(mat, (target_width, target_height))

        return ort.OrtValue.ortvalue_from_numpy(mat, 'cpu')


class BasicOrtHandler:
    def __init__(self, onnx_path, num_threads=1):
        self.onnx_path = onnx_path
        self.num_threads = num_threads
        self.initialize_handler()

    def initialize_handler(self):
        # self.ort_env = ort.Env(ort.logging.ERROR)
        session_options = ort.SessionOptions()
        session_options.intra_op_num_threads = self.num_threads
        session_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

        # Initialize session
        self.ort_session = ort.InferenceSession(self.onnx_path, session_options)
        self.memory_info_handler = ort.OrtMemoryInfo("Cpu", ort.OrtAllocatorType.ORT_ARENA_ALLOCATOR, 0, ort.OrtMemType.DEFAULT)

        self.input_node_names = [self.ort_session.get_inputs()[0].name]
        self.input_node_dims = self.ort_session.get_inputs()[0].shape  # shape of the input tensor
        self.input_tensor_size = np.prod(self.input_node_dims)

        self.output_node_names = [out.name for out in self.ort_session.get_outputs()]
        self.output_node_dims = [out.shape for out in self.ort_session.get_outputs()]
        self.num_outputs = len(self.output_node_names)

    def __del__(self):
        del self.ort_session


class FaceBoxesV2(BasicOrtHandler):
    def __init__(self, onnx_path, num_threads=1):
        super().__init__(onnx_path, num_threads)
        self.mean_vals = np.array([104.0, 117.0, 123.0], dtype=np.float32)
        self.scale_vals = np.array([1.0, 1.0, 1.0], dtype=np.float32)
        self.variance = [0.1, 0.2]
        self.steps = [32, 64, 128]
        self.min_sizes = [
            [32, 64, 128],
            [256],
            [512]
        ]
        self.max_nms = 30000

    def transform(self, mat):
        canvas = cv2.resize(mat, (self.input_node_dims[3], self.input_node_dims[2]))
        canvas = normalize_inplace(canvas, self.mean_vals, self.scale_vals)
        return create_tensor(canvas, self.input_node_dims, self.memory_info_handler, "CHW")

    def detect(self, mat, score_threshold=0.35, iou_threshold=0.45, topk=300, nms_type=0):
        if mat is None or mat.size == 0:
            return

        img_height = float(mat.shape[0])
        img_width = float(mat.shape[1])

        # 1. make input tensor
        input_tensor = self.transform(mat)
        # 2. inference scores & boxes.
        output_tensors = self.ort_session.run(self.output_node_names, {self.input_node_names[0]: input_tensor})

        # 3. rescale & exclude.
        bbox_collection = []
        bbox_collection = self.generate_bboxes(output_tensors, score_threshold, img_height, img_width)

        # 4. hard|blend|offset nms with topk. return detected_boxes
        return self.nms(bbox_collection, iou_threshold, topk, nms_type)

    def generate_bboxes(self, output_tensors, score_threshold, img_height, img_width):
        bboxes = output_tensors[0]  # e.g. (1, n, 4)
        probs = output_tensors[1]  # e.g. (1, n, 2) after softmax
        bbox_dims = self.output_node_dims[0]  # (1, n, 4)
        bbox_num = bbox_dims[1]  # n = ?
        input_height = self.input_node_dims[2]  # e.g. 640
        input_width = self.input_node_dims[3]  # e.g. 640

        anchors = self.generate_anchors(input_height, input_width)

        num_anchors = len(anchors)
        if num_anchors != bbox_num:
            print(f"num_anchors={num_anchors} but detected bbox_num={bbox_num}")
            raise RuntimeError("mismatch num_anchors != bbox_num")

        bbox_collection = []
        count = 0
        for i in range(num_anchors):
            conf = probs[0, i, 1]
            if conf < score_threshold:
                continue  # filter first.

            # prior_cx = anchors[i].cx
            # prior_cy = anchors[i].cy
            # prior_s_kx = anchors[i].s_kx
            # prior_s_ky = anchors[i].s_ky
            prior_cx, prior_cy, prior_s_kx, prior_s_ky = anchors[i]

            dx = bboxes[0, i, 0]
            dy = bboxes[0, i, 1]
            dw = bboxes[0, i, 2]
            dh = bboxes[0, i, 3]

            cx = prior_cx + dx * self.variance[0] * prior_s_kx
            cy = prior_cy + dy * self.variance[0] * prior_s_ky
            w = prior_s_kx * np.exp(dw * self.variance[1])
            h = prior_s_ky * np.exp(dh * self.variance[1])  # normalized coordinates in (0., 1.)

            box = Box(
                x1=(cx - w / 2.0) * img_width,
                y1=(cy - h / 2.0) * img_height,
                x2=(cx + w / 2.0) * img_width,
                y2=(cy + h / 2.0) * img_height,
                score=conf,
                label=1,
                label_text="face",
                flag=True
            )
            bbox_collection.append(box)

            count += 1  # limit boxes for nms.
            if count > self.max_nms:
                break

        return bbox_collection
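
Step 3 above is the standard prior-box (SSD-style) decode; restated as equations, with (v_0, v_1) = self.variance = (0.1, 0.2) and (p_{cx}, p_{cy}, s_{kx}, s_{ky}) the anchor returned by generate_anchors:

c_x = p_{cx} + d_x\, v_0\, s_{kx}, \qquad c_y = p_{cy} + d_y\, v_0\, s_{ky}, \qquad w = s_{kx}\, e^{d_w v_1}, \qquad h = s_{ky}\, e^{d_h v_1}

and the final corners (c_x \mp w/2,\ c_y \mp h/2) are scaled by the original image width and height.
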

    def nms(self, input_boxes, iou_threshold, topk, nms_type):
        if nms_type == 1:
            output_boxes = blending_nms(input_boxes, iou_threshold, topk)
        elif nms_type == 2:
            output_boxes = offset_nms(input_boxes, iou_threshold, topk)
        elif nms_type == 0:
            output_boxes = hard_nms(input_boxes, iou_threshold, topk)
        else:
            raise NotImplementedError
        return output_boxes

    def generate_anchors(self, target_height, target_width):
        feature_maps = []
        for step in self.steps:
            feature_maps.append([
                int(np.ceil(target_height / step)),
                int(np.ceil(target_width / step))
            ])

        anchors = []
        for k, f_map in enumerate(feature_maps):
            tmp_min_sizes = self.min_sizes[k]
            f_h, f_w = f_map

            offset_32 = [0.0, 0.25, 0.5, 0.75]
            offset_64 = [0.0, 0.5]

            for i in range(f_h):
                for j in range(f_w):
                    for min_size in tmp_min_sizes:
                        s_kx = min_size / target_width
                        s_ky = min_size / target_height

                        if min_size == 32:
                            for offset_y in offset_32:
                                for offset_x in offset_32:
                                    cx = (j + offset_x) * self.steps[k] / target_width
                                    cy = (i + offset_y) * self.steps[k] / target_height
                                    anchors.append([cx, cy, s_kx, s_ky])
                        elif min_size == 64:
                            for offset_y in offset_64:
                                for offset_x in offset_64:
                                    cx = (j + offset_x) * self.steps[k] / target_width
                                    cy = (i + offset_y) * self.steps[k] / target_height
                                    anchors.append([cx, cy, s_kx, s_ky])
                        else:
                            cx = (j + 0.5) * self.steps[k] / target_width
                            cy = (i + 0.5) * self.steps[k] / target_height
                            anchors.append([cx, cy, s_kx, s_ky])

        return anchors


# Usage example
if __name__ == "__main__":
    import sys
    import os
    img_path = sys.argv[1]
    reta = FaceBoxesV2(r"./checkpoints/faceboxesv2-640x640.onnx", 4)
    img = cv2.imread(img_path)
    detected_boxes = reta.detect(img)
    count = 0
    for box in detected_boxes:
        print(f"({box.x1:.3f},{box.y1:.3f},{box.x2:.3f},{box.y2:.3f})", end=" ")
        count += 1
    print("total face number:", count)

    for box in detected_boxes:
        draw_rectface(img, box)

    filename = os.path.basename(img_path)
    cv2.imwrite("./" + filename, img)
@@ -0,0 +1,192 @@
"""
Input: the original image and the face box in it.
Output: the positions of the 5 facial landmarks of each face.
"""
# from common.face_landmark_points_util import shape_index_process
# from common.face_detector_util import prior_box_forward, decode, nms_sorted


import cv2
import onnxruntime as ort
import numpy as np

# set the ONNX Runtime log level to ERROR
ort.set_default_logger_severity(3)  # 3 means ERROR
m_origin_patch = [15, 15]
m_origin = [112, 112]


class HypeShape:
    def __init__(self, shape):
        self.m_shape = shape
        self.m_weights = [0] * len(self.m_shape)
        size = len(self.m_shape)
        self.m_weights[size - 1] = self.m_shape[size - 1]
        for times in range(size - 1):
            self.m_weights[size - 1 - times - 1] = self.m_weights[size - 1 - times] * self.m_shape[size - 1 - times - 1]

    def to_index(self, coordinate):
        if len(coordinate) == 0:
            return 0
        size = len(coordinate)
        weight_start = len(self.m_weights) - size + 1
        index = 0
        for times in range(size - 1):
            index += self.m_weights[weight_start + times] * coordinate[times]
        index += coordinate[size - 1]
        return index
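
HypeShape simply converts an N-d coordinate into a row-major flat index. A minimal sanity check of that reading (the shape and coordinate below are made up for illustration):

import numpy as np

hs = HypeShape([1, 8, 56, 56])
coord = [0, 3, 10, 7]
# for a full-length coordinate this matches numpy's row-major flattening
assert hs.to_index(coord) == int(np.ravel_multi_index(coord, (1, 8, 56, 56)))
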

def shape_index_process(feat_data, pos_data):
    feat_h = feat_data.shape[2]
    feat_w = feat_data.shape[3]

    landmarkx2 = pos_data.shape[1]
    x_patch_h = int(m_origin_patch[0] * feat_data.shape[2] / float(m_origin[0]) + 0.5)
    x_patch_w = int(m_origin_patch[1] * feat_data.shape[3] / float(m_origin[1]) + 0.5)

    feat_patch_h = x_patch_h
    feat_patch_w = x_patch_w

    num = feat_data.shape[0]
    channels = feat_data.shape[1]

    r_h = (feat_patch_h - 1) / 2.0
    r_w = (feat_patch_w - 1) / 2.0
    landmark_num = int(landmarkx2 * 0.5)

    pos_offset = HypeShape([pos_data.shape[0], pos_data.shape[1]])
    feat_offset = HypeShape([feat_data.shape[0], feat_data.shape[1], feat_data.shape[2], feat_data.shape[3]])
    nmarks = int(landmarkx2 * 0.5)
    out_shape = [feat_data.shape[0], feat_data.shape[1], x_patch_h, nmarks, x_patch_w]
    out_offset = HypeShape([feat_data.shape[0], feat_data.shape[1], x_patch_h, nmarks, x_patch_w])
    buff = np.zeros(out_shape)
    zero = 0

    buff = buff.reshape((-1))
    pos_data = pos_data.reshape((-1))
    feat_data = feat_data.reshape((-1))

    for i in range(landmark_num):
        for n in range(num):
            # coordinate of the first patch pixel, scaled to the feature map coordinate
            y = int(pos_data[pos_offset.to_index([n, 2 * i + 1])] * (feat_h - 1) - r_h + 0.5)
            x = int(pos_data[pos_offset.to_index([n, 2 * i])] * (feat_w - 1) - r_w + 0.5)

            for c in range(channels):
                for ph in range(feat_patch_h):
                    for pw in range(feat_patch_w):
                        y_p = y + ph
                        x_p = x + pw
                        # set zero if it exceeds the image bound
                        if y_p < 0 or y_p >= feat_h or x_p < 0 or x_p >= feat_w:
                            buff[out_offset.to_index([n, c, ph, i, pw])] = zero
                        else:
                            buff[out_offset.to_index([n, c, ph, i, pw])] = feat_data[feat_offset.to_index([n, c, y_p, x_p])]

    return buff.reshape((1, -1, 1, 1)).astype(np.float32)


def crop_face(image: np.ndarray, face, H, W):
    """
    Crop a face from an image with padding if the face is out of bounds.

    Args:
        image (np.ndarray): The input image as a numpy array of shape (H, W, C).
        face (tuple): A tuple containing (x0, y0, x1, y1) for the face rectangle.
        H, W (int): Height and width of the input image.

    Returns:
        np.ndarray: Cropped and padded image, plus the rounded box coordinates.
    """
    x0, y0, x1, y1 = int(round(face[0])), int(round(face[1])), int(round(face[2])), int(round(face[3]))

    # Calculate padding
    pad_top = max(0, -y0)
    pad_bottom = max(0, y1 - H)
    pad_left = max(0, -x0)
    pad_right = max(0, x1 - W)

    # Apply padding
    padded_image = np.pad(image, ((pad_top, pad_bottom), (pad_left, pad_right), (0, 0)), mode='constant', constant_values=0)

    # Update coordinates after padding
    new_x0, new_y0 = x0 + pad_left, y0 + pad_top
    new_x1, new_y1 = x1 + pad_left, y1 + pad_top

    # Crop the face
    cropped_image = padded_image[new_y0:new_y1, new_x0:new_x1, :]

    return cropped_image, (x0, y0, x1, y1)


class Landmark5er():
    def __init__(self, onnx_path1, onnx_path2, num_threads=1) -> None:
        session_options = ort.SessionOptions()
        session_options.intra_op_num_threads = num_threads
        # pass the SessionOptions object when creating the InferenceSession
        self.ort_session1 = ort.InferenceSession(onnx_path1, session_options=session_options)
        self.ort_session2 = ort.InferenceSession(onnx_path2, session_options=session_options)
        self.first_input_name = self.ort_session1.get_inputs()[0].name
        self.second_input_name = self.ort_session2.get_inputs()[0].name

    def inference(self, image, box):
        # face_img = image[int(box[1]):int(box[3]), int(box[0]):int(box[2])]  # this kind of crop is not suitable here
        H, W, C = image.shape
        face_img, box = crop_face(image, box, H, W)

        x1, y1, x2, y2 = int(box[0]), int(box[1]), int(box[2]), int(box[3])
        if x1 < 0 or x1 > W - 1 or x2 < 0 or x2 > W:
            print("x is out of bounds")
        if y1 < 0 or y1 > H - 1 or y2 < 0 or y2 > H:
            print("y is out of bounds")

        face_img = cv2.resize(face_img, (112, 112))

        gray_img = cv2.cvtColor(face_img, cv2.COLOR_BGR2GRAY)
        gray_img = gray_img.reshape((1, 1, 112, 112)).astype(np.float32)  # input must be (1, 1, 112, 112)
        # points5 net1
        results_1 = self.ort_session1.run([], {self.first_input_name: gray_img})

        # shape index process
        feat_data = results_1[0]
        pos_data = results_1[1]
        shape_index_results = shape_index_process(feat_data, pos_data)
        results_2 = self.ort_session2.run([], {self.second_input_name: shape_index_results})

        landmarks = (results_2[0] + results_1[1]) * 112
        # print("results_2[0] , results_1[1]: ", results_2[0], results_1[1])
        # print("in find_landmarks, landmarks:", landmarks)
        landmarks = landmarks.reshape((-1)).astype(np.int32)

        scale_x = (box[2] - box[0]) / 112.0
        scale_y = (box[3] - box[1]) / 112.0
        mapped_landmarks = []
        for i in range(landmarks.size // 2):
            x = box[0] + landmarks[2 * i] * scale_x
            y = box[1] + landmarks[2 * i + 1] * scale_y
            x = max(0.01, min(x, W - 0.01))
            y = max(0.01, min(y, H - 0.01))
            mapped_landmarks.append((x, y))

        return mapped_landmarks


if __name__ == "__main__":
    import sys
    ld5 = Landmark5er(onnx_path1="./checkpoints/face_landmarker_pts5_net1.onnx",
                      onnx_path2="./checkpoints/face_landmarker_pts5_net2.onnx",
                      num_threads=1)

    if len(sys.argv) > 1:
        jpg_path = sys.argv[1]
    else:
        jpg_path = "asserts/1.jpg"

    image = cv2.imread(jpg_path)
    if image is None:
        print("Error: Could not load image.")
        exit()

    box = (201.633087308643, 42.78490193881931, 319.49375572419393, 191.68867463550623)
    landmarks5 = ld5.inference(image, box)
    print(landmarks5)
@@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
"""
Input: the aligned, cropped face.
Output: the face feature vector.
Note: also provides a function to compute the similarity of two features.
"""

import cv2
import onnxruntime as ort
import numpy as np

# set the ONNX Runtime log level to ERROR
ort.set_default_logger_severity(3)  # 3 means ERROR


class FaceRecoger():
    def __init__(self, onnx_path, num_threads=1) -> None:
        session_options = ort.SessionOptions()
        session_options.intra_op_num_threads = num_threads
        # pass the SessionOptions object when creating the InferenceSession
        self.ort_session = ort.InferenceSession(onnx_path, session_options=session_options)

        output_node_dims = [out.shape for out in self.ort_session.get_outputs()]
        self.len_feat = output_node_dims[0][1]  # length of the feature vector

    def inference(self, crop_img):  # crop_img = cv2.imread(img_path), BGR
        input_feed = {}
        if crop_img.shape[:2] != (248, 248):  # an alternative would be the center crop [4:252, 4:252, ...]
            crop_img = cv2.resize(crop_img, (248, 248))
        crop_img = crop_img[..., ::-1]
        input_data = crop_img.transpose((2, 0, 1))
        input_feed['_input_123'] = input_data.reshape((1, 3, 248, 248)).astype(np.float32)
        pred_result = self.ort_session.run([], input_feed=input_feed)
        temp_result = np.sqrt(pred_result[0])
        norm = temp_result / np.linalg.norm(temp_result, axis=1)
        return norm.flatten()  # return the normalized feature

    @staticmethod
    def compute_sim(feat1, feat2):
        feat1, feat2 = feat1.flatten(), feat2.flatten()
        assert feat1.shape == feat2.shape
        sim = np.sum(feat1 * feat2)
        return sim
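
Since inference() returns an L2-normalized feature, compute_sim is simply cosine similarity:

\mathrm{sim}(f_1, f_2) = f_1 \cdot f_2 = \cos\theta, \qquad \lVert f_1 \rVert = \lVert f_2 \rVert = 1

Elsewhere this value is compared against faceReg.sim_threshold (0.7 in config.yaml) to decide whether two faces match.
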

if __name__ == "__main__":
    fr = FaceRecoger(onnx_path="./checkpoints/face_recognizer.onnx", num_threads=1)
    import sys
    imgpath1 = sys.argv[1]
    imgpath2 = sys.argv[2]
    img1, img2 = cv2.imread(imgpath1), cv2.imread(imgpath2)
    feat1 = fr.inference(img1)
    feat2 = fr.inference(img2)

    print("sim: ", FaceRecoger.compute_sim(feat1, feat2))
@@ -0,0 +1,221 @@
import cv2
import numpy as np
import onnxruntime as ort


class QualityOfClarity:
    def __init__(self, low_thresh=0.10, high_thresh=0.20):
        self.low_thresh = low_thresh
        self.high_thresh = high_thresh

    def reblur(self, data, width, height):
        data = np.array(data, dtype=np.float32).reshape((height, width))

        # 1-D blur kernel
        kernel = np.ones((9,), np.float32) / 9.0

        # blur along the vertical direction
        BVer = cv2.filter2D(data, -1, kernel.reshape(-1, 1), borderType=cv2.BORDER_REPLICATE)

        # blur along the horizontal direction
        BHor = cv2.filter2D(data, -1, kernel.reshape(1, -1), borderType=cv2.BORDER_REPLICATE)

        s_FVer, s_FHor, s_Vver, s_VHor = 0.0, 0.0, 0.0, 0.0

        # vertical differences
        D_Fver = np.abs(data[1:, :] - data[:-1, :])
        D_BVer = np.abs(BVer[1:, :] - BVer[:-1, :])

        # vertical accumulation
        s_FVer = np.sum(D_Fver)
        s_Vver = np.sum(np.maximum(0.0, D_Fver - D_BVer))

        # horizontal differences
        D_FHor = np.abs(data[:, 1:] - data[:, :-1])
        D_BHor = np.abs(BHor[:, 1:] - BHor[:, :-1])

        # horizontal accumulation
        s_FHor = np.sum(D_FHor)
        s_VHor = np.sum(np.maximum(0.0, D_FHor - D_BHor))

        b_FVer = (s_FVer - s_Vver) / s_FVer
        b_FHor = (s_FHor - s_VHor) / s_FHor
        blur_val = max(b_FVer, b_FHor)

        return blur_val

    def grid_max_reblur(self, img, rows, cols):
        height, width = img.shape
        row_height = height // rows
        col_width = width // cols
        blur_val = float('-inf')

        for y in range(rows):
            for x in range(cols):
                grid = img[y * row_height: (y + 1) * row_height, x * col_width: (x + 1) * col_width]
                this_grad_blur_val = self.reblur(grid, col_width, row_height)
                if this_grad_blur_val > blur_val:
                    blur_val = this_grad_blur_val

        return max(blur_val, 0.0)

    def clarity_estimate(self, image):
        # x, y, w, h = rect
        # if w < 9 or h < 9:
        #     return 0.0

        src_data = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        # src_data = gray_data[y:y+h, x:x+w]
        blur_val = self.grid_max_reblur(src_data, 2, 2)
        clarity = 1.0 - blur_val

        T1, T2 = 0.0, 1.0
        if clarity <= T1:
            clarity = 0.0
        elif clarity >= T2:
            clarity = 1.0
        else:
            clarity = (clarity - T1) / (T2 - T1)

        return clarity

    def check(self, image):
        clarity = self.clarity_estimate(image)
        if clarity < self.low_thresh:
            level = "LOW"
        elif self.low_thresh <= clarity < self.high_thresh:
            level = "MEDIUM"
        else:
            level = "HIGH"
        return level != 'LOW'
        # return {'level': level, "score": clarity}


class QualityChecker:  # check resolution and brightness of the image
    def __init__(self, v0=70.0, v1=100.0, v2=210.0, v3=230.0, hw=(112, 112)):
        self.bright_thresh0 = v0
        self.bright_thresh1 = v1
        self.bright_thresh2 = v2
        self.bright_thresh3 = v3
        self.middle_thresh = (self.bright_thresh1 + self.bright_thresh2) / 2
        self.rolu_thrds = hw  # (h, w)

    def get_bright_score(self, bright):
        bright_score = 1.0 / (abs(bright - self.middle_thresh) + 1)
        return bright_score

    def grid_max_bright(self, img, rows, cols):
        row_height = img.shape[0] // rows
        col_width = img.shape[1] // cols

        # mean brightness of every grid cell
        grid_means = [
            np.mean(img[y * row_height:(y + 1) * row_height, x * col_width:(x + 1) * col_width])
            for y in range(rows)
            for x in range(cols)
        ]

        # maximum brightness over the cells
        bright_val = max(grid_means)
        return max(bright_val, 0)

    def check_bright(self, face_image):
        gray = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY) if len(face_image.shape) == 3 else face_image
        bright_value = self.grid_max_bright(gray, 3, 3)

        if bright_value < self.bright_thresh0 or bright_value > self.bright_thresh3:
            level = "LOW"
        elif (self.bright_thresh0 <= bright_value < self.bright_thresh1) or (self.bright_thresh2 < bright_value <= self.bright_thresh3):
            level = "MEDIUM"
        else:
            level = "HIGH"

        return level == "HIGH"

    def check_resolution(self, face_image):
        H, W = face_image.shape[:2]
        if H < self.rolu_thrds[0] or W < self.rolu_thrds[1]:
            return False
        return True
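
A minimal sketch of how these checkers could be chained on a face crop, returning the error codes defined in the tool module's ErrorMsg table further below. The actual ordering used by FaceHelper.check_face is not shown in this commit, so treat this as illustrative only:

# hypothetical chaining of the quality checks (not the project's check_face)
def passes_quality(face_crop, qc, qclarity, qpose):
    if not qc.check_resolution(face_crop):
        return False, "306"   # resolution too small
    if not qc.check_bright(face_crop):
        return False, "303"   # bad brightness
    pose = qpose.check(face_crop)
    if pose != "frontFace":
        pose_codes = {"rightFace": "3051", "leftFace": "3052", "upFace": "3053", "downFace": "3054"}
        return False, pose_codes.get(pose, "3051")
    if not qclarity.check(face_crop):
        return False, "307"   # bad clarity
    return True, "300"
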

class QualityOfPose:
    def __init__(self, pad=0.3, yaw_thrd=30, pitch_thrd=25, var_onnx_path='./checkpoints/fsanet-var.onnx', conv_onnx_path='./checkpoints/fsanet-conv.onnx') -> None:
        self.pad = pad
        self.input_width = 64
        self.input_height = 64
        self.yaw_thrd = abs(yaw_thrd)
        self.pitch_thrd = abs(pitch_thrd)
        self.var_fsanet = ort.InferenceSession(var_onnx_path)
        self.conv_fsanet = ort.InferenceSession(conv_onnx_path)

        self.var_input_names = [input_.name for input_ in self.var_fsanet.get_inputs()]
        self.var_output_names = [output.name for output in self.var_fsanet.get_outputs()]

        self.conv_input_names = [input_.name for input_ in self.conv_fsanet.get_inputs()]
        self.conv_output_names = [output.name for output in self.conv_fsanet.get_outputs()]

    def transform(self, mat):
        h, w = mat.shape[:2]
        nh = int(h + self.pad * h)
        nw = int(w + self.pad * w)
        nx1 = max(0, (nw - w) // 2)
        ny1 = max(0, (nh - h) // 2)

        # Create a padded canvas and copy the image into the center
        canvas = np.zeros((nh, nw, 3), dtype=np.uint8)
        canvas[ny1:ny1 + h, nx1:nx1 + w] = mat

        # Resize the image to the input dimensions
        canvas = cv2.resize(canvas, (self.input_width, self.input_height))

        # Normalize the image in-place
        canvas = canvas.astype(np.float32)
        mean = 127.5
        scale = 1.0 / 127.5
        canvas = (canvas - mean) * scale

        # Convert to CHW format
        canvas = np.transpose(canvas, (2, 0, 1))

        # Create a tensor
        input_tensor = np.expand_dims(canvas, axis=0).astype(np.float32)
        return input_tensor

    def detect_angle(self, img):
        input_tensor = self.transform(img)

        var_output = self.var_fsanet.run(
            self.var_output_names, {self.var_input_names[0]: input_tensor}
        )[0]
        conv_output = self.conv_fsanet.run(
            self.conv_output_names, {self.conv_input_names[0]: input_tensor}
        )[0]
        yaw, pitch, roll = np.mean(np.vstack((var_output, conv_output)), axis=0)

        return yaw, pitch, roll

    def check(self, image):
        yaw, pitch, roll = self.detect_angle(image)

        if abs(yaw) <= self.yaw_thrd and abs(pitch) <= self.pitch_thrd:
            return "frontFace"
        elif yaw < -1.0 * self.yaw_thrd:
            return "rightFace"
        elif yaw > self.yaw_thrd:
            return "leftFace"
        elif pitch > self.pitch_thrd:
            return "upFace"
        elif pitch < -1.0 * self.pitch_thrd:
            return "downFace"
        else:
            return "implementError"


if __name__ == "__main__":
    qp = QualityOfPose()
    img = cv2.imread("/home/bns/liteAitoolkit/Demos_onnx/examples/det_oeyecmouth.jpg")
    angles = qp.detect_angle(img)  # check() returns a direction string, so use detect_angle() for the raw angles
    print(f"ONNXRuntime Version yaw: {angles[0]} pitch: {angles[1]} roll: {angles[2]}")
@@ -0,0 +1,588 @@
# created on 2024/6/12
# modified on 2024/6/12
# description: tool file (.py)
import cv2
import os
import hashlib
import pickle
import requests
import base64
import logging
import numpy as np
from datetime import datetime
import redis
from redisLock import RedisReadWriteLock
import onnxruntime
import time
import yaml
from models import FaceRecoger, FaceBoxesV2, Landmark5er, FaceAlign, QualityOfClarity, QualityOfPose, QualityChecker

so = onnxruntime.SessionOptions()
so.log_severity_level = 3  # 0=VERBOSE, 1=INFO, 2=WARNING, 3=ERROR, 4=FATAL


# number of workers
if "NUM_WORKERS" not in os.environ:
    raise RuntimeError("Environment variable NUM_WORKERS is required but not set.")
NUM_WORKERS = int(os.getenv("NUM_WORKERS", 10))

# max readers
max_readers = int(os.getenv("MAX_READERS", 60))

# connect to Redis
redis_host = str(os.getenv("REDIS_HOST", 'localhost'))
redis_port = int(os.getenv("REDIS_PORT", 2012))
redis_password = str(os.getenv("REDIS_PASSWORD", 'Xjsfzb@Redis123!'))
# connected
redis_client = redis.Redis(host=redis_host, port=redis_port, password=redis_password, db=0)

PID_id = None
NUM_WORKERS = int(os.getenv("NUM_WORKERS", 10))
for i in range(NUM_WORKERS):
    if redis_client.setnx(f"worker_{i}", 0):  # mark this worker slot as dirty (not yet synced)
        PID_id = i
        break

# create the ReadWriteLock
rw_lock = RedisReadWriteLock(redis_client, max_readers=max_readers)

ErrorMsg = {
    "101": "no face in the database",
    "102": "invalid file path",
    "103": "invalid file suffix",
    "104": "base64 data is empty",

    "201": "input file type is not supported, only support: base64, url, local-path",
    "202": "this updateDB type is not supported",

    "300": "processed the image successfully.",
    "301": "no face in the image.",
    "302": "too many faces in the image.",
    "303": "bad brightness of the face in the image.",
    "3041": "face shifted left/right, partially not captured.",
    "3042": "face shifted top/bottom, partially not captured.",
    "3051": "rightFace in the image.",
    "3052": "leftFace in the image.",
    "3053": "upFace in the image.",
    "3054": "downFace in the image.",
    "306": "too small resolution of the face in the image.",
    "307": "bad clarity of the face in the image.",

    "401": "identity already exists; to protect the db, this operation is rejected",
}

class FileError(Exception):
    def __init__(self, arg: str):
        self.code = arg
        self.args = [f"{str(self.__class__.__name__)} {str(arg)}: {ErrorMsg[arg]}"]

class NotImpltError(Exception):
    def __init__(self, arg: str):
        self.code = arg
        self.args = [f"{str(self.__class__.__name__)} {str(arg)}: {ErrorMsg[arg]}"]

class FaceError(Exception):
    def __init__(self, arg: str):
        self.code = arg
        self.args = [f"{str(self.__class__.__name__)} {str(arg)}: {ErrorMsg[arg]}"]

class UpdatedbError(Exception):
    def __init__(self, arg: str):
        self.code = arg
        self.args = [f"{str(self.__class__.__name__)} {str(arg)}: {ErrorMsg[arg]}"]

# set up the logger
current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_dir = f"{os.path.dirname(os.path.abspath(__file__))}/log"
os.makedirs(log_dir, exist_ok=True)
logging.basicConfig(filename=f'{log_dir}/{current_time}.log', level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
print(log_dir)

def list_images(path: str):
    """
    List images in a given path
    Args:
        path (str): path's location
    Returns:
        images (list): list of exact image paths
    """
    images = []
    for r, _, f in os.walk(path, followlinks=True):
        for file in f:
            exact_path = os.path.join(r, file)

            _, ext = os.path.splitext(exact_path)
            ext_lower = ext.lower()

            if ext_lower not in {".jpg", ".jpeg", ".png"}:
                continue
            images.append(exact_path)

            # with Image.open(exact_path) as img:  # lazy
            #     if img.format.lower() in ["jpeg", "png"]:
            #         images.append(exact_path)
    return images


def find_image_hash(file_path: str) -> str:
    """
    Find the hash of the given image file from its properties;
    hashing the image content itself would be a costly operation.
    Args:
        file_path (str): exact image path
    Returns:
        hash (str): digest with the sha1 algorithm
    """
    file_stats = os.stat(file_path)

    # some properties
    file_size = file_stats.st_size
    creation_time = file_stats.st_ctime
    modification_time = file_stats.st_mtime

    properties = f"{file_size}-{creation_time}-{modification_time}"

    hasher = hashlib.sha1()
    hasher.update(properties.encode("utf-8"))
    return hasher.hexdigest()

# supports base64, local-path and url image sources; returns a numpy array
def load_img(img_path: str):
    image = None
    try:
        if img_path.startswith(("http", "www")):  # url
            response = requests.get(url=img_path, stream=True, timeout=60, proxies={"http": None, "https": None})
            response.raise_for_status()
            image_array = np.asarray(bytearray(response.raw.read()), dtype=np.uint8)
            image = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
        elif img_path.startswith(("./", "/", "C:", "D:", "E:", ".\\")) or os.path.isfile(img_path):  # local path
            if not os.path.isfile(img_path):
                raise FileError("102")  # push: invalid file path
            elif not img_path.lower().endswith((".jpg", '.jpeg', '.png')):
                raise FileError("103")  # push: invalid file suffix
            else:
                image = cv2.imread(img_path)
        elif img_path.startswith("data:") and "base64" in img_path:  # base64
            encoded_data_parts = img_path.split(",")
            if len(encoded_data_parts) <= 0:
                raise FileError("104")  # push: base64 is empty
            encoded_data = encoded_data_parts[-1]
            nparr = np.frombuffer(base64.b64decode(encoded_data), np.uint8)
            image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        else:
            raise NotImpltError("201")  # push: input file type is not supported; only base64, url and local-path are supported
    except Exception as e:
        logger.info(f"{e}")
        raise e
        # return e
    else:
        return image
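
A minimal usage sketch of the three accepted inputs (the paths, URL and variable below are placeholders for illustration only):

img_a = load_img("./images/person.jpg")                           # local path
img_b = load_img("http://example.com/face.jpg")                   # URL
img_c = load_img("data:image/jpeg;base64," + some_base64_string)  # base64 data URI
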
|
||||
def encoder_img2base64(img:np.ndarray):
|
||||
success, encoded_img = cv2.imencode('.png', img)
|
||||
if success:
|
||||
img_base64 = base64.b64encode(encoded_img).decode("utf-8")
|
||||
|
||||
return ",".join(["data:image/jpg;base64", img_base64])
|
||||
|
||||
|
||||
# from seetaface.api import *
|
||||
class FaceHelper:
|
||||
|
||||
def __init__(self, db_dir, config_path = './config.yaml'):
|
||||
self.db_dir = os.path.abspath(db_dir)
|
||||
self.pid = PID_id
|
||||
self.db_embeddings = None
|
||||
self.db_identities = None
|
||||
|
||||
# 根据config_path 读取ymal配置文件,然后进行初始化
|
||||
with open(config_path, 'r') as f:
|
||||
config = yaml.safe_load(f)
|
||||
|
||||
self.sim_threshold = config['faceReg']['sim_threshold'] # 0.7
|
||||
self.rotclsifer = onnxruntime.InferenceSession( config['ck_paths']['rotifer'], so) # "./checkpoints/model_gray_mobilenetv2_rotcls.onnx"
|
||||
self.db_path = os.path.join( db_dir, "seetaface6.pkl" ).lower()
|
||||
|
||||
self.fd = FaceBoxesV2(config['ck_paths']['FcBx'], config['ck_paths']['num_threads'] ) # r"./checkpoints/faceboxesv2-640x640.onnx" 4
|
||||
self.ld5er = Landmark5er( onnx_path1 = config['ck_paths']['landmk1'], # "./checkpoints/face_landmarker_pts5_net1.onnx",
|
||||
onnx_path2 = config['ck_paths']['landmk2'], # "./checkpoints/face_landmarker_pts5_net2.onnx",
|
||||
num_threads=config['ck_paths']['num_threads'] # 4
|
||||
)
|
||||
self.fa = FaceAlign()
|
||||
self.fr = FaceRecoger(onnx_path = config['ck_paths']['FcReg'], num_threads= config['ck_paths']['num_threads'] ) # "./checkpoints/face_recognizer.onnx" 4
|
||||
self.qc = QualityChecker(config['brightness']['v0'], config['brightness']['v1'],
|
||||
config['brightness']['v2'], config['brightness']['v3'],
|
||||
hw = (config['resolution']['height'], config['resolution']['width'])
|
||||
) # v0=70.0, v1=100.0, v2=210.0, v3=230.0
|
||||
self.qpose = QualityOfPose(yaw_thrd=config['pose']['yaw_thrd'], pitch_thrd=config['pose']['pitch_thrd'],
|
||||
var_onnx_path = config['pose']['var_onnx_path'], # './checkpoints/fsanet-var.onnx',
|
||||
conv_onnx_path = config['pose']['conv_onnx_path'], # './checkpoints/fsanet-conv.onnx'
|
||||
)
|
||||
self.qclarity = QualityOfClarity(low_thresh=config['clarity']['low_thrd'], high_thresh=config['clarity']['high_thrd'])
|
||||
|
||||
# refresh the db
|
||||
try:
|
||||
self.updateDB(None, None, None, Onlyrefresh=True)
|
||||
except Exception as e:
|
||||
# raise e
|
||||
pass
|
||||
|
||||
print(f"db_dir: {self.db_dir}; PID: {self.pid}")
|
||||
logger.info(f"db_dir: {self.db_dir} ; PID: {self.pid}")
|
||||
|
||||
# 读操作
|
||||
def faceRecognition(self, img_path:str):
|
||||
rw_lock.acquire_read()
|
||||
if int(redis_client.get(f"worker_{self.pid}")) == 0: # 说明self中的db和磁盘中的db不同步
|
||||
with open(self.db_path, "rb") as f:
|
||||
representations = pickle.load(f)
|
||||
if representations == []:
|
||||
self.db_embeddings, self.db_identities = None, None
|
||||
else:
|
||||
self.db_embeddings = np.array([rep["embedding"] for rep in representations], dtype=np.float32)
|
||||
self.db_identities = [os.path.splitext(os.path.basename(rep["identity"]))[0] for rep in representations]
|
||||
redis_client.set(f"worker_{self.pid}", 1) # 同步完毕
|
||||
|
||||
try:
|
||||
if self.db_embeddings is None:
|
||||
raise FileError("101") # push: no face in the database
|
||||
|
||||
image = load_img(img_path) # get bgr numpy image
|
||||
|
||||
start = time.time()
|
||||
unknown_embeddings, cropped_images, names = [], [], []
|
||||
image = self.rotadjust(image) # 调整角度
|
||||
detect_result = self.fd.detect(image)
|
||||
detect_result = [(box.x1, box.y1, box.x2, box.y2) for box in detect_result]
|
||||
rett = {'code':" ", 'msg':" ", 'data':" "}
|
||||
if len(detect_result) == 0:
|
||||
logger.info(f"{img_path[:200]}: no face in the image")
|
||||
print(f"{img_path[:200]}: no face in the image")
|
||||
raise FaceError("301") # push: no face in the image
|
||||
elif len(detect_result) > 1:
|
||||
rett['code'] = '302'
|
||||
rett['data'] = {'code':" ", 'msg':" ", 'data':" "}
|
||||
for facebox in detect_result:
|
||||
landmarks5 = self.ld5er.inference(image, facebox) # return: [(),(),(),(),()] 左眼 右眼 鼻子 左嘴角 右嘴角
|
||||
# print("5点关键点:",landmarks5)
|
||||
# 输入image 和5点特征点位置(基于原图image的位置) , return all cropped aligned face (裁剪后的对齐后的人脸部分图像, 简写为aligned_faces)
|
||||
landmarks5 = [ [ld5[0],ld5[1]] for ld5 in landmarks5]
|
||||
cropped_face = self.fa.align(image, landmarks5=landmarks5)
|
||||
# 输入aligned_faces ,return all features of aligned_faces
|
||||
feature = self.fr.inference(cropped_face)
|
||||
cropped_images.append(cropped_face)
|
||||
unknown_embeddings.append(feature)
|
||||
|
||||
unknown_embeddings = np.vstack(unknown_embeddings)
|
||||
results = np.dot(unknown_embeddings, self.db_embeddings.T)
|
||||
|
||||
max_values = np.max(results,axis=1)
|
||||
max_idxs = np.argmax(results,axis=1)
|
||||
|
||||
for i, (idx, value) in enumerate(zip(max_idxs, max_values)):
|
||||
name = "unknown"
|
||||
if value > self.sim_threshold:
|
||||
name = self.db_identities[idx]
|
||||
names.append(name)
|
||||
|
||||
ret_data = []
|
||||
for i, (facebox, name) in enumerate(zip(detect_result, names)):
|
||||
if name != 'unknown':
|
||||
ret_data.append({'code':"300", 'msg': ErrorMsg["300"], 'data':name})
|
||||
else:
|
||||
code = self.check_face("None", image, facebox, prefix='facereg')
|
||||
ret_data.append({'code':code, 'msg': ErrorMsg[code], 'data':name})
|
||||
if len(ret_data) != 1:
|
||||
ret_data = {'code':"302", 'msg': ErrorMsg["302"], 'data': ret_data}
|
||||
else:
|
||||
ret_data = ret_data[0]
|
||||
|
||||
print("facereg runtime:", time.time() - start)
|
||||
|
||||
except Exception as e:
|
||||
logger.info(f"{e}")
|
||||
rw_lock.release_read()
|
||||
raise e
|
||||
else:
|
||||
rw_lock.release_read()
|
||||
return ret_data
|
||||
# return names, [ encoder_img2base64(det) for det in cropped_images]
|
||||
|
||||
# opt in ['add','delete','replace'] identity作为检索的标识符,img_path只是提供文件路径
|
||||
# 写操作
|
||||
def updateDB(self, img_path :str, opt :str, identity :str, Onlyrefresh=False):
|
||||
global rw_lock
|
||||
rw_lock.acquire_write() # 写锁定
|
||||
print("come in the updatedb")
|
||||
try:
|
||||
if not Onlyrefresh:
|
||||
if int(redis_client.get(f"worker_{self.pid}")) == 0: # 说明self中的db和磁盘中的db不同步
|
||||
with open(self.db_path, "rb") as f:
|
||||
representations = pickle.load(f)
|
||||
if representations == []:
|
||||
self.db_embeddings, self.db_identities = None, None
|
||||
else:
|
||||
self.db_embeddings = np.array([rep["embedding"] for rep in representations], dtype=np.float32)
|
||||
self.db_identities = [os.path.splitext(os.path.basename(rep["identity"]))[0] for rep in representations]
|
||||
redis_client.set(f"worker_{self.pid}", 1) # 同步完毕
|
||||
|
||||
img = load_img(img_path)
|
||||
img = self.rotadjust(img) # 调整角度
|
||||
if opt in ["add","replace"]:
|
||||
if opt == "add" and self.db_identities is not None and identity in self.db_identities:
|
||||
raise UpdatedbError("401") # push: identity has exist. to pretect the db, reject opt of this time
|
||||
else:
|
||||
|
||||
detect_result = self.fd.detect(img)
|
||||
if len(detect_result) == 0: # no face
|
||||
logger.info(f"{img_path[:200]}: when update, no face in the image")
|
||||
print(f"{img_path[:200]}: when update, no face in the image")
|
||||
raise FaceError("301") # push: no face in the image
|
||||
else: # 获取最大的face,然后进行check
|
||||
# H, W = img.shape[:2]
|
||||
areas = [ box.area() for box in detect_result]
|
||||
max_idx = areas.index(max(areas))
|
||||
facebox = detect_result[max_idx]
|
||||
facebox = (facebox.x1, facebox.y1, facebox.x2, facebox.y2) # top_left point, bottom_right point
|
||||
FaceError_number = self.check_face(img_path=img_path[:200], img=img, facebox=facebox, prefix='update')
|
||||
if FaceError_number != "300":
|
||||
raise FaceError(FaceError_number)
|
||||
|
||||
cv2.imwrite(os.path.join(self.db_dir, identity+'.jpg'),img,[cv2.IMWRITE_JPEG_QUALITY, 100]) # 如果file已经存在,则会替换它
|
||||
|
||||
elif opt == "delete":
|
||||
try:
|
||||
os.remove(os.path.join(self.db_dir, identity+'.jpg'))
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
else:
|
||||
raise NotImpltError("202") # push: this updateDB type is not support
|
||||
|
||||
print("end the updateDB")
|
||||
logger.info(f"end the updateDB")
|
||||
|
||||
self.refresh_database(check = Onlyrefresh) # 结束时刷新下db, 并通知别的进程,dirty
|
||||
except Exception as e:
|
||||
logger.info(f"{e}")
|
||||
rw_lock.release_write()
|
||||
raise e
|
||||
else:
|
||||
rw_lock.release_write()
|
||||
return 0
    def refresh_database(self, check=True):
        # ensure the db exists
        os.makedirs(self.db_dir, exist_ok=True)
        if not os.path.exists(self.db_path):
            with open(self.db_path, "wb") as f:
                pickle.dump([], f)
        representations = []  # representations will be stored back into the db at the end

        # Load the representations from the pickle file
        with open(self.db_path, "rb") as f:
            representations = pickle.load(f)

        # get the identities of the stored images
        pickle_images = [rep["identity"] for rep in representations]

        # get the list of images in the directory
        storage_images = list_images(self.db_dir)

        # convert every image in storage_images to `.jpg`
        for idx in range(len(storage_images)):
            img_path = storage_images[idx]
            base_path, ext = os.path.splitext(img_path)
            if ext == '.jpg':
                continue
            iimg = cv2.imread(img_path)
            cv2.imwrite(base_path + '.jpg', iimg, [cv2.IMWRITE_JPEG_QUALITY, 100])
            storage_images[idx] = base_path + '.jpg'

        must_save_pickle = False
        new_images = []; old_images = []; replaced_images = []

        new_images = list(set(storage_images) - set(pickle_images))
        old_images = list(set(pickle_images) - set(storage_images))

        for current_representation in representations:  # find the images that were replaced on disk
            identity = current_representation["identity"]
            if identity in old_images:
                continue
            alpha_hash = current_representation["hash"]
            beta_hash = find_image_hash(identity)
            if alpha_hash != beta_hash:
                # logger.debug(f"Even though {identity} was represented before, it has been replaced since.")
                replaced_images.append(identity)

        new_images = new_images + replaced_images
        old_images = old_images + replaced_images

        # remove old images first
        if len(old_images) > 0:
            representations = [rep for rep in representations if rep["identity"] not in old_images]
            must_save_pickle = True

        # find representations for new images
        if len(new_images) > 0:
            print("find new images")
            new_representations = []
            for new_image in new_images:
                image = cv2.imread(new_image)
                image = self.rotadjust(image)  # correct the rotation
                detect_result = self.fd.detect(image)
                if len(detect_result) == 0:
                    logger.info(f"{new_image}: when refreshing, no face in the image, delete it")
                    print(f"{new_image}: when refreshing, no face in the image, delete it")
                else:
                    if len(detect_result) > 1:
                        logger.info(f"{new_image}: found more than one face, extracting the biggest one")
                    else:
                        logger.info(f"{new_image}: found exactly one face, perfect!")

                    areas = [box.area() for box in detect_result]
                    max_idx = areas.index(max(areas))
                    facebox = detect_result[max_idx]
                    facebox = (facebox.x1, facebox.y1, facebox.x2, facebox.y2)  # top-left point, bottom-right point

                    if check:
                        FaceError_number = self.check_face(img_path=new_image[:200], img=image, facebox=facebox, prefix='refreshdb')
                        if FaceError_number != "300":
                            continue

                    landmarks5 = self.ld5er.inference(image, facebox)  # returns [(x, y)] * 5: left eye, right eye, nose, left mouth corner, right mouth corner
                    landmarks5 = [[ld5[0], ld5[1]] for ld5 in landmarks5]
                    cropped_face = self.fa.align(image, landmarks5=landmarks5)
                    feature = self.fr.inference(cropped_face)

                    new_representations.append({
                        "identity": new_image,
                        "hash": find_image_hash(new_image),
                        "embedding": feature,
                        "detected_face_base64": encoder_img2base64(cropped_face),
                    })

            representations += new_representations
            must_save_pickle = True

        if must_save_pickle:
            print("must save the pickle")
            with open(self.db_path, "wb") as f:
                pickle.dump(representations, f)
            global redis_client, NUM_WORKERS
            for i in range(NUM_WORKERS):
                redis_client.set(f"worker_{i}", 0)  # tell the other workers that the db has changed

        # keep the images in db_dir consistent with self.db["identity"]
        remove_images = list(set(storage_images) - set([rep["identity"] for rep in representations]))
        for remove_img in remove_images:
            try:
                # os.remove(remove_img)
                fname = os.path.basename(remove_img)
                # os.rename(remove_img, os.path.join(self.db_dir, "..", "images", "remove", fname))
            except FileNotFoundError:
                pass

        if int(redis_client.get(f"worker_{self.pid}")) == 0:
            empty = False
            if len(representations) <= 0:
                self.db_embeddings = None
                empty = True
                # raise FileError("101")  # push: no face in db
            else:
                self.db_embeddings = np.array([rep["embedding"] for rep in representations], dtype=np.float32)
                self.db_identities = [os.path.splitext(os.path.basename(rep["identity"]))[0] for rep in representations]
            redis_client.set(f"worker_{self.pid}", 1)  # this worker is now up to date
            if empty:
                logger.info("no face in the database")
                raise FileError("101")  # push: no face in db
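    # Worker-sync protocol (summary of the code above, added for clarity): the redis key
    # `worker_{i}` acts as a per-worker "in sync" flag. After a successful write,
    # refresh_database() sets worker_{i} = 0 for every worker; a worker whose flag is 0
    # reloads the representations the next time it touches the db and then sets its own
    # flag back to 1.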
    def rotadjust(self, img: np.ndarray):
        image = img.copy()
        image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)  # convert to a grayscale image
        image = cv2.resize(image, (256, 256))  # resize to (256, 256)

        # center-crop to 224x224
        start = (256 - 224) // 2
        image = image[start:start + 224, start:start + 224]

        # expand the single grayscale channel to three channels
        image = np.stack((image,) * 3, axis=-1)

        # convert into the layout expected by the ONNX model
        image = image.astype(np.float32) / 255.0  # normalize
        image = image.transpose(2, 0, 1)  # HWC -> CHW
        image = np.expand_dims(image, axis=0)  # add a batch dimension

        inputs = {self.rotclsifer.get_inputs()[0].name: image}
        probs = self.rotclsifer.run(None, inputs)

        label = np.argmax(probs[0][0])  # predicted counter-clockwise rotation angle, one of [0, 90, 180, 270]
        if label == 1:
            img = cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE)
            logger.info("img is rotated to the left, use `cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE)` to repair")
            print("img is rotated to the left, use `cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE)` to repair")
        elif label == 2:
            img = cv2.rotate(img, cv2.ROTATE_180)
            logger.info("img is upside down, use `cv2.rotate(img, cv2.ROTATE_180)` to repair")
            print("img is upside down, use `cv2.rotate(img, cv2.ROTATE_180)` to repair")
        elif label == 3:
            img = cv2.rotate(img, cv2.ROTATE_90_COUNTERCLOCKWISE)
            logger.info("img is rotated to the right, use `cv2.rotate(img, cv2.ROTATE_90_COUNTERCLOCKWISE)` to repair")
            print("img is rotated to the right, use `cv2.rotate(img, cv2.ROTATE_90_COUNTERCLOCKWISE)` to repair")

        return img
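    # Why the inverse rotation (added for clarity): label k means the classifier believes the
    # input is rotated 90*k degrees counter-clockwise, so the code applies the opposite rotation
    # to undo it: 1 -> ROTATE_90_CLOCKWISE, 2 -> ROTATE_180, 3 -> ROTATE_90_COUNTERCLOCKWISE.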
    def get_feature(self, img: np.ndarray):
        time.sleep(0.08)
        # assert img.shape[0] == img.shape[1] and img.shape[0] == 256 and img.shape[2] == 3
        img = cv2.resize(img, (256, 256))
        input_feed = {}
        # crop_img = cv2.resize(img, (248, 248))
        # crop_img = crop_img[..., ::-1]
        crop_img = img[4:252, 4:252, :][..., ::-1]  # note: the width/height < 248 case would need handling, but the resize above guarantees 256x256 here
        input_data = crop_img.transpose((2, 0, 1))
        # resize_img = cv2.resize(img, (248, 248))
        # input_data = resize_img.transpose((2, 0, 1))
        input_feed['_input_123'] = input_data.reshape((1, 3, 248, 248)).astype(np.float32)
        pred_result = self.FR.run([], input_feed=input_feed)
        # print(pred_result[0].shape)
        # post process
        # 1. take the square root of the feature
        temp_result = np.sqrt(pred_result[0])
        # 2. L2-normalize the feature
        norm = temp_result / np.linalg.norm(temp_result, axis=1)
        return norm.flatten()
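    # Because get_feature() returns an L2-normalized embedding, comparing two faces reduces to a
    # dot product (cosine similarity). Illustrative only, the names below are hypothetical:
    #   sims = self.db_embeddings @ feature          # similarity against every stored identity
    #   best = int(np.argmax(sims))
    #   match = self.db_identities[best] if sims[best] >= sim_threshold else None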
    def check_face(self, img_path, img, facebox, prefix="update"):
        H, W = img.shape[:2]
        if facebox[0] < 0 or facebox[2] >= W:
            logger.info(f"{img_path}: when {prefix}, face shifted left/right")
            print(f"{img_path}: when {prefix}, face shifted left/right")
            return "3041"  # face shifted left/right, partially not captured
        if facebox[1] < 0 or facebox[3] >= H:
            logger.info(f"{img_path}: when {prefix}, face shifted top/bottom")
            print(f"{img_path}: when {prefix}, face shifted top/bottom")
            return "3042"  # face shifted top/bottom, partially not captured
        face_img = img[max(0, int(facebox[1])):int(facebox[3]), max(0, int(facebox[0])):int(facebox[2])]
        if not self.qc.check_bright(face_img):
            logger.info(f"{img_path}: when {prefix}, bad brightness of face in the image")
            print(f"{img_path}: when {prefix}, bad brightness of face in the image")
            return "303"  # bad brightness of face in the image
        if not self.qc.check_resolution(face_img):
            logger.info(f"{img_path}: when {prefix}, resolution of face in the image is too small")
            print(f"{img_path}: when {prefix}, resolution of face in the image is too small")
            return "306"  # face too small in the image
        pose = self.qpose.check(face_img)
        if pose != "frontFace":
            logger.info(f"{img_path}: when {prefix}, {pose} in the image")
            print(f"{img_path}: when {prefix}, {pose} in the image")
            dictt = {"rightFace": "3051", "leftFace": "3052", "upFace": "3053", "downFace": "3054"}
            return dictt[pose]  # non-frontal pose of the face in the image
        if not self.qclarity.check(face_img):
            logger.info(f"{img_path}: when {prefix}, bad clarity of face in the image")
            print(f"{img_path}: when {prefix}, bad clarity of face in the image")
            return "307"  # bad clarity of face in the image

        return "300"
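    # Status codes returned by check_face (for reference): "300" = face passed all checks,
    # "3041"/"3042" = face shifted horizontally/vertically out of frame, "303" = bad brightness,
    # "306" = face resolution too small, "3051"-"3054" = non-frontal pose (right/left/up/down),
    # "307" = bad clarity. Other codes ("301" no face, "401" identity already exists,
    # "202" unsupported operation, "101" empty database) are raised elsewhere in this class.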
@@ -0,0 +1,32 @@
# introduction
This is the web-side code of the face recognition service. It depends on onnx, fastapi, uvicorn and redis. The project runs on Windows, Linux and macOS (untested), and supports concurrent requests.
See the docx document for the project details.

# requirements
fastapi 0.103.2
uvicorn[standard] 0.22.0
opencv-python 4.9.0.80
requests 2.31.0
Pillow 9.4.0
redis 5.0.6
onnxruntime 1.18.1

Installing the libraries above with `pip install` is recommended; you do not have to match the version numbers exactly (they are listed only to reproduce the original environment).

# file structure

./
    checkpoints     directory holding the model files
    models          directory with the method/helper modules
    webmain.py      entry point
    process.py      bridge between the entry point and the method modules
    flushredis.py   script that resets the redis database (so redis can be used)
    redisLock.py    redis-based read/write lock
    redis.yaml      redis configuration file
    config.yaml     project configuration file
    setup.sh        script that starts the project (Linux version)
    readme.md       this file

# setup
`bash setup.sh [ip] [port] [number of worker processes] [max concurrent readers]`
e.g.: `bash setup.sh 0.0.0.0 8017 1 60`
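# usage example (sketch)
A minimal client sketch, not part of the original project: it assumes the service runs at `http://127.0.0.1:8017` (matching the setup example above) and that the `img` field carries the image as a base64-encoded string; check `load_img` in process.py for the exact format the server expects.

    import base64
    import requests

    BASE = "http://127.0.0.1:8017"  # assumed host/port

    with open("someone.jpg", "rb") as f:
        img_b64 = base64.b64encode(f.read()).decode()

    # add (or replace) a face in the database
    print(requests.post(f"{BASE}/updatedb/", json={"img": img_b64, "optMode": "add", "imgName": "someone"}).json())

    # recognize a face
    print(requests.post(f"{BASE}/facerecognition/", json={"img": img_b64}).json())

    # rebuild the database from the images on disk
    print(requests.post(f"{BASE}/refreshdb").json())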
@@ -0,0 +1,3 @@
REDIS_HOST=192.168.0.14
REDIS_PORT=2012
REDIS_PASSWORD=Xjsfzb@Redis123!
@@ -0,0 +1,31 @@
import time


class RedisReadWriteLock:
    def __init__(self, redis_client, lock_timeout=3600, max_readers=20):
        self.redis_client = redis_client
        self.read_lock_key = "read_lock"
        self.write_lock_key = "write_lock"
        self.lock_timeout = lock_timeout
        self.max_readers = max_readers

    def acquire_read(self):
        while True:
            if not self.redis_client.get(self.write_lock_key) and \
                    int(self.redis_client.get(self.read_lock_key) or 0) < self.max_readers:
                self.redis_client.incr(self.read_lock_key)
                return
            time.sleep(0.01)

    def release_read(self):
        self.redis_client.decr(self.read_lock_key)

    def acquire_write(self):
        while not self.redis_client.setnx(self.write_lock_key, 1):
            time.sleep(0.01)
        self.redis_client.expire(self.write_lock_key, self.lock_timeout)

        while int(self.redis_client.get(self.read_lock_key) or 0) > 0:
            time.sleep(0.01)

    def release_write(self):
        self.redis_client.delete(self.write_lock_key)
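The workers use this lock roughly as sketched below; this is illustrative only, and the connection values are placeholders (the real ones come from redis.yaml):

    import redis
    from redisLock import RedisReadWriteLock

    redis_client = redis.Redis(host="localhost", port=2012, password="...", db=0)
    rw_lock = RedisReadWriteLock(redis_client, max_readers=20)

    # read path (face recognition): many requests may hold the read lock at once
    rw_lock.acquire_read()
    try:
        pass  # search the embedding database
    finally:
        rw_lock.release_read()

    # write path (updateDB): exclusive, blocks new readers and waits for existing ones to drain
    rw_lock.acquire_write()
    try:
        pass  # modify the database on disk and notify the other workers
    finally:
        rw_lock.release_write()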
@@ -0,0 +1,16 @@
# read the Redis configuration from the redis.yaml file
REDIS_CONFIG=$(cat redis.yaml)

# number of worker processes
NUM_WORKERS=${3}

REDIS_HOST=$(echo $REDIS_CONFIG | grep -oP 'REDIS_HOST=\K[^ ]+')
REDIS_PORT=$(echo $REDIS_CONFIG | grep -oP 'REDIS_PORT=\K[^ ]+')
REDIS_PASSWORD=$(echo $REDIS_CONFIG | grep -oP 'REDIS_PASSWORD=\K[^ ]+')

echo "num_workers: $NUM_WORKERS"
echo "redis_host: $REDIS_HOST; redis_port: $REDIS_PORT"

REDIS_HOST=$REDIS_HOST REDIS_PORT=$REDIS_PORT REDIS_PASSWORD=$REDIS_PASSWORD NUM_WORKERS=$NUM_WORKERS python flushredis.py

REDIS_HOST=$REDIS_HOST REDIS_PORT=$REDIS_PORT REDIS_PASSWORD=$REDIS_PASSWORD NUM_WORKERS=$NUM_WORKERS MAX_READERS=${4} uvicorn webmain:app --host ${1} --port ${2} --workers $NUM_WORKERS
@@ -0,0 +1,67 @@
from fastapi import FastAPI
from pydantic import BaseModel
from process import FaceHelper, FileError, ErrorMsg
import threading
import os
import time

app = FastAPI()
lock = threading.Lock()


facehelper = FaceHelper(
    db_dir="./dbface",
)


class face(BaseModel):
    img: str


class dbface(BaseModel):
    img: str
    optMode: str
    imgName: str


@app.post("/refreshdb")
def refresh():
    global facehelper
    try:
        with lock:
            facehelper.updateDB(None, None, None, Onlyrefresh=True)
    except FileError as e:
        # return {"status": e.code, "detail": f"{e}"}
        return {'code': e.code, 'msg': f"{e}", 'data': 'None'}
    else:
        return {'code': "300", 'msg': ErrorMsg['300'], 'data': 'None'}


@app.post("/facerecognition/")
def faceRecognition(input: face):
    start = time.time()
    global facehelper
    try:
        ret_data = facehelper.faceRecognition(input.img)
        print("finished recognition...")
        end = time.time()
        print("runtime: ", end - start)
    except Exception as e:
        return {'code': e.code, 'msg': f"{e}", 'data': 'None'}
        # return {"status": f"{e.code}", "detail": f"{e}"}
    else:
        return ret_data
        # return {"status": 1, "name": identity, "resImg": res_img_base64}


@app.post("/updatedb/")
def updateDB(input: dbface):
    global facehelper
    # keep only the base name of imgName, without the directory or extension
    input.imgName = os.path.splitext(os.path.basename(input.imgName))[0]

    try:
        with lock:
            facehelper.updateDB(input.img, input.optMode, input.imgName)
    except Exception as e:
        return {'code': e.code, 'msg': f"{e}", 'data': 'None'}
        # return {"status": f"{e.code}", "detail": f"{e}"}
    else:
        return {'code': "300", 'msg': ErrorMsg['300'], 'data': 'None'}