wjs/yolo_safehat_det_flask_stream/app.py

195 lines
6.7 KiB
Python
Raw Permalink Normal View History

2024-05-15 14:46:56 +08:00
from importlib import import_module
import os
from typing import Union
import cv2
from flask import Flask, render_template, Response, request, redirect, url_for, jsonify, stream_with_context
# import requests
from flask_cors import CORS
from flask_socketio import SocketIO, emit
# import json
# import time
# import ijson
from req import ApiRequest
api = ApiRequest()
if os.environ.get('CAMERA'):
Camera = import_module('camera_' + os.environ['CAMERA']).Camera
else:
from camera import Camera
app = Flask(__name__)
CORS(app) # 允许跨域请求
socketio = SocketIO(app)
NAME = ""
FILE_FLAG = False
CAMERA_FLAG = False
@app.route('/')
def index():
"""Video streaming home page."""
return render_template('index.html')
def video_gen(camera):
"""Video streaming generator function."""
while True:
frame = camera.get_frame()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
def camera_gen():
vid = cv2.VideoCapture(1)
while True:
return_value, frame = vid.read()
image = cv2.imencode('.jpg', frame)[1].tobytes()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + image + b'\r\n')
@app.route('/video_start')
def video_feed():
"""Video streaming route. Put this in the src attribute of an img tag."""
if FILE_FLAG:
return Response(video_gen(Camera(NAME, True if NAME.endswith(('.jpg', '.png', '.jpeg', '.bmp')) else False)),
mimetype='multipart/x-mixed-replace; boundary=frame')
elif CAMERA_FLAG:
return Response(video_gen(Camera("0", False)), # 选择你的摄像头ID
mimetype='multipart/x-mixed-replace; boundary=frame')
else:
return Response(mimetype='multipart/x-mixed-replace; boundary=frame')
# pass
@app.route('/video', methods=['POST'])
def upload():
f = request.files['file']
basepath = os.path.dirname(__file__) # 当前文件所在路径
upload_path = './static/uploads'
if not os.path.exists(upload_path):
os.mkdir(upload_path)
upload_file_path = os.path.join(basepath, upload_path, (f.filename)) # 注意:没有的文件夹一定要先创建,不然会提示没有该路径
f.save(upload_file_path)
global NAME, FILE_FLAG, CAMERA_FLAG
NAME = upload_file_path
FILE_FLAG = True
CAMERA_FLAG = False
print('----------------', Camera.actions)
return redirect(url_for('index'))
# WebSocket
@socketio.on('connect')
def handle_connect():
print('Client connected')
def check_error_msg(data: Union[str, dict, list], key: str = "errorMsg") -> str:
'''
return error message if error occured when requests API
'''
if isinstance(data, dict):
if key in data:
return data[key]
if "code" in data and data["code"] != 200:
return data["msg"]
return ""
@socketio.on('start_stream')
def handle_start_stream():
# url = 'http://127.0.0.1:7861/chat/knowledge_base_chat'
# url = 'http://192.168.0.21:17861/chat/knowledge_base_chat'
data2 = "作业现场人员未佩戴安全帽是几级违规行为?具体违反了哪些条款?条款内容是什么?"
data4 = "高处作业未佩戴安全带是几级违规行为?具体违反了哪些条款?条款内容是什么?"
data5 = "作业现场人员跨越围栏是几级违规行为?具体违反了哪些条款?条款内容是什么?"
# Camera.actions = []
actions = set(Camera.actions)
# print('----------', actions)
if not actions:
emit('stream_data', {'data': '无违规行为。'})
if 2 in actions:
for d in api.knowledge_base_chat(data2,
knowledge_base_name="violation",
top_k=1,
score_threshold=2.0,
history=[],
model="chatglm3-6b",
prompt_name="default",
temperature=0.01):
error_msg = check_error_msg(d)
if len(error_msg) > 0:
print(error_msg)
else:
chunk = d.get("answer")
if not chunk is None:
# print('----1', chunk)
emit('stream_data', {'data': chunk})
emit('stream_end', {'message': 'Stream ended'}) # 发送流结束的事件
if 4 in actions:
for d in api.knowledge_base_chat(data4,
knowledge_base_name="violation",
top_k=1,
score_threshold=2.0,
history=[],
model="chatglm3-6b",
prompt_name="default",
temperature=0.01):
error_msg = check_error_msg(d)
if len(error_msg) > 0:
print(error_msg)
else:
chunk = d.get("answer")
if not chunk is None:
# print('----4', chunk)
emit('stream_data', {'data': chunk})
emit('stream_end', {'message': 'Stream ended'}) # 发送流结束的事件
if 5 in actions:
for d in api.knowledge_base_chat(data5,
knowledge_base_name="violation",
top_k=1,
score_threshold=2.0,
history=[],
model="chatglm3-6b",
prompt_name="default",
temperature=0.01):
error_msg = check_error_msg(d)
if len(error_msg) > 0:
print(error_msg)
else:
chunk = d.get("answer")
if not chunk is None:
# print('----5', chunk)
emit('stream_data', {'data': chunk})
emit('stream_end', {'message': 'Stream ended'}) # 发送流结束的事件
# Camera.actions = []
# @app.route('/streams')
# def list_streams():
# actions = set(Camera.actions)
# return jsonify(list(actions))
@app.route('/camera', methods=['POST'])
def camera_get():
global CAMERA_FLAG, FILE_FLAG
CAMERA_FLAG = True
FILE_FLAG = False
# return redirect(url_for('index'))
return redirect('/')
if __name__ == '__main__':
# app.run(host='127.0.0.1', threaded=True, port=5001)
socketio.run(app, debug=True, allow_unsafe_werkzeug=True)