import logging import time from flask import Flask, jsonify, request from pydantic import BaseModel, Field from werkzeug.exceptions import HTTPException from typing import List from pydantic import ValidationError from logger_util import setup_logger from intentRecognition import IntentRecognition from slotRecognition import SlotRecognition from utils import CheckResult, check_standard_name_slot_probability, check_lost, process_msg_content from config import * from globalData import GlobalData from apscheduler.schedulers.background import BackgroundScheduler MODEL_ERNIE_PATH = R"../ernie/output/checkpoint-14672" MODEL_UIE_PATH = R"../uie/output_temp/checkpoint-20860" # 类别名称列表 labels = [ "天气查询", "互联网查询", "页面切换", "日计划数量查询", "周计划数量查询", "日计划作业内容", "周计划作业内容", "施工人数", "作业考勤人数", "知识问答", "通用对话", "作业面查询", "班组人数查询", "班组数查询", "作业面内容", "班组详情", "工程进度查询", "人员查询", "分公司查询","工程数量查询","工程详情查询","项目部数量查询", "建管单位数量查询","建管单位详情","分包单位数量查询","分包单位详情" ] # 标签映射 label_map = { 0: 'O', # 非实体 1: 'B-date', 20: 'I-date', 2: 'B-projectName', 21: 'I-projectName', 3: 'B-projectType', 22: 'I-projectType', 4: 'B-constructionUnit', 23: 'I-constructionUnit', 5: 'B-implementationOrganization', 24: 'I-implementationOrganization', 6: 'B-projectDepartment', 25: 'I-projectDepartment', 7: 'B-projectManager', 26: 'I-projectManager', 8: 'B-subcontractor', 27: 'I-subcontractor', 9: 'B-teamLeader', 28: 'I-teamLeader', 10: 'B-riskLevel', 29: 'I-riskLevel', 11: 'B-page', 30: 'I-page', 12: 'B-operating', 31: 'I-operating', 13: 'B-teamName', 32: 'I-teamName', 14: 'B-constructionArea', 33: 'I-constructionArea', 15: 'B-personName', 34: 'I-personName', 16: 'B-personQueryType', 35: 'I-personQueryType', 17: 'B-projectStatus', 36: 'I-projectStatus', 18: 'B-picName', 37: 'I-picName', 19: 'B-designSpecificationName', 38: 'I-designSpecificationName' } logger = setup_logger("main", level=logging.DEBUG) # 初始化工具类 intent_recognizer = IntentRecognition(MODEL_ERNIE_PATH, labels) # 初始化槽位识别工具类 slot_recognizer = SlotRecognition(MODEL_UIE_PATH, label_map) # 设置Flask应用 app = Flask(__name__) def job(): logger.info(f"✅ [Info] Executing update_from_redis...at {time.strftime('%Y-%m-%d %H:%M:%S')}") GlobalData.update_from_redis() job() # 创建后台调度器 scheduler = BackgroundScheduler() scheduler.add_job(job, 'cron', hour=3, minute=0) # 每天凌晨3点执行 scheduler.start() # 统一的异常处理函数 @app.errorhandler(Exception) def handle_exception(e): """统一异常处理""" if isinstance(e, HTTPException): return jsonify({ "error": { "type": e.name, "message": e.description, "status_code": e.code } }), e.code return jsonify({ "error": { "type": "InternalServerError", "message": str(e) } }), 500 def validate_user(data): """验证用户ID""" if data.get("user_id") != '3bb66776-1722-4c36-b14a-73dd210fe750': return jsonify( code=401, msg='权限验证失败,请联系接口开发人员', label=-1, probability=-1 ), 401 return None class LabelMessage(BaseModel): text: str = Field(..., description="消息内容") user_id: str = Field(..., description="消息内容") # 每条消息的结构 class Message(BaseModel): role: str = Field(..., description="消息内容") content: str = Field(..., description="消息内容") # 请求数据的结构 class RequestData(BaseModel): messages: List[Message] = Field(..., description="消息列表") user_id: str = Field(..., description="用户ID") # 意图识别 @app.route('/intent_reco', methods=['POST']) def intent_reco(): """意图识别""" try: # 获取请求中的 JSON 数据 data = request.get_json() request_data = LabelMessage(**data) # Pydantic 会验证数据结构 text = request_data.text user_id = request_data.user_id # 检查必需字段 if not text: return jsonify({"error": "text is required"}), 400 if not user_id: return jsonify({"error": "user_id is required"}), 400 # 验证用户ID user_validation_error = validate_user(data) if user_validation_error: return user_validation_error # 调用predict方法进行意图识别 predicted_label, predicted_probability, predicted_id = intent_recognizer.predict(text) return jsonify( code=200, msg="成功", int=predicted_id, label=predicted_label, probability=float(predicted_probability) ) except Exception as e: logger.error(f"error:{e}") return jsonify({"error": str(e)}), 500 # 槽位抽取 @app.route('/slot_reco', methods=['POST']) def slot_reco(): """槽位识别""" try: # 获取请求中的 JSON 数据 data = request.get_json() request_data = LabelMessage(**data) # Pydantic 会验证数据结构 text = request_data.text user_id = request_data.user_id # 检查必需字段 if not text: return jsonify({"error": "text is required"}), 400 if not user_id: return jsonify({"error": "user_id is required"}), 400 # 验证用户ID user_validation_error = validate_user(data) if user_validation_error: return user_validation_error # 调用 recognize 方法进行槽位识别 # entities = slot_recognizer.recognize(text) entities, slot_probability = slot_recognizer.recognize_probability(text) logger.info(f"槽位抽取后的实体:{entities},实体后的可能值:{slot_probability}") return jsonify( code=200, msg="成功", slot=entities) except Exception as e: logger.error(f"error:{e}") return jsonify({"error": str(e)}), 500 @app.route('/agent', methods=['POST']) def agent(): try: data = request.get_json() except Exception as e: logger.error(f"body不是一个有效的json") return jsonify({"error": str(e)}), 500 # 捕捉其他错误并返回 try: # 使用 Pydantic 来验证数据结构 request_data = RequestData(**data) # Pydantic 会验证数据结构 messages = request_data.messages user_id = request_data.user_id # 检查必需字段是否存在 if not messages: return jsonify({"error": "messages is required"}), 400 if not user_id: return jsonify({"error": "user_id is required"}), 400 # 验证用户ID(假设这个函数已经定义) user_validation_error = validate_user(data) if user_validation_error: return user_validation_error if len(messages) == 1: # 首轮 query = messages[0].content # 使用 Message 对象的 .content 属性 # 先进行意图识别 predicted_label, predicted_probability, predicted_id = intent_recognizer.predict(query) # 再进行槽位抽取 entities, slot_probability = slot_recognizer.recognize_probability(query) logger.info( f"第一轮意图识别后的label:{predicted_label}, id:{predicted_id},槽位抽取后的实体:{entities},,slot_probability:{slot_probability},message:{messages}", ) # 多轮 else: res = extract_multi_chat(messages) predicted_label, predicted_probability, predicted_id = intent_recognizer.predict(res) #0:天气,1:互联网查询,9:知识问答,10:通用对话 if predicted_id in [0, 1, 9, 10]: logger.info(f"多轮意图识别后的label:{predicted_label}, id:{predicted_id},message:{messages}") return jsonify({ "code": 200, "msg": "成功", "answer": {"int": predicted_id, "label": predicted_label, "probability": predicted_probability}, "finalQuery": res }) # entities = slot_recognizer.recognize(res) entities, slot_probability = slot_recognizer.recognize_probability(res) logger.info( f"多轮意图识别后的label:{predicted_label}, id:{predicted_id},槽位抽取后的实体:{entities},slot_probability:{slot_probability},message:{messages}") #必须槽位缺失检查 status, sk = check_lost(predicted_id, entities) if status == CheckResult.NEEDS_MORE_ROUNDS: return jsonify({"code": 10001, "msg": "成功", "answer": {"miss": sk}, }) #工程名、分公司名和项目名标准化 result, information = check_standard_name_slot_probability(predicted_id, entities) if result == CheckResult.NEEDS_MORE_ROUNDS: return jsonify({ "code": 10001, "msg": "成功", "answer": {"miss": information}, }) return jsonify({ "code": 200, "msg": "成功", "answer": {"int": predicted_id, "label": predicted_label, "probability": predicted_probability, "slot": entities}, }) except ValidationError as e: return jsonify({"error": e.errors()}), 400 # 捕捉 Pydantic 错误并返回 except Exception as e: return jsonify({"error": str(e)}), 500 # 捕捉其他错误并返回 def format_chat_history(history_messages): reset_keywords = ["当前", "今天", "昨天", "本周", "下周", "明天", "今日", "打开","工程进度"] keep_index = 0 # Step 1: 查找最靠近当前的 user 消息,若其包含关键词,则记录其索引 for i in reversed(range(len(history_messages))): msg = history_messages[i] if msg.role == "user" and any(kw in msg.content and len(msg.content) > 5 for kw in reset_keywords): keep_index = i break # Step 2: 截取需要保留的历史消息 filtered_messages = history_messages[keep_index:] # Step 3: 构建格式化历史 formatted_history = "" for i, msg in enumerate(filtered_messages): formatted_history += f"\n\n" formatted_history += f"{msg.role}\n" formatted_history += f"{process_msg_content(msg.content)}\n" formatted_history += "\n" return formatted_history def extract_multi_chat(messages): from openai import OpenAI client = OpenAI(base_url=api_base_url, api_key=api_key) history_messages = messages[-7:] if len(messages) >= 7 else messages chat_history = format_chat_history(history_messages) logger.info(f"chat_history:{chat_history}") prompt = f''' 你是一个多轮对话理解和还原专家,擅长从复杂的上下文中提取关键信息,理清语义逻辑,最终还原出用户的真实意图。请你以"自然语言编程"的方式逐步思考并处理以下任务,最终生成完整明确的用户查询。 请严格按照以下步骤执行: ## 第一步:初始化变量 初始化以下变量(用于逻辑推理,请勿输出): - 当前_用户问题 = "" # 表示最终需要还原的完整问题 - 当前_实体 = "" # 表示用户所提及的公司、项目部、工程名称等具体业务实体 - 当前_时间 = "" # 表示与查询相关的时间点或时间段 - 下一步_操作 = "" # 表示当前对话中模型需要用户补充的信息类型 - 下一步_选择列表 = "" # 表示需要从中选择具体内容的候选项列表 ## 第二步:逐轮解析对话历史 从最早的对话轮次开始,依次处理每一轮对话。 ### 轮次 X(X从1开始递增) ### 如果当前角色为user: 请根据之前 assistant 的引导语,结合当前用户输入,判断是否需要进行补全操作,并按以下规则处理: - 如果 `下一步_操作 == "补充时间"`:调用函数 `补全时间(当前_用户问题, 用户输入)` 并更新 `当前_用户问题` - 否则 如果 `下一步_操作 == "补充分公司"`:调用函数 `补全分公司(当前_用户问题, 用户输入)` 并更新 `当前_用户问题` - 否则 如果 `下一步_操作 == "选择列表"`:调用函数 `替换序号为实体(当前_用户问题, 用户输入, 下一步_选择列表)` 并更新 `当前_用户问题` - 否则 如果 有完整的句意(用户输入):将当前用户输入作为 `当前_用户问题` - 否则 如果 包含模糊表达(用户输入):调用函数 `补全模糊表达(当前_用户问题, 用户输入)`并更新 `当前_用户问题` - 否则 如果 是查询新属性(用户输入):调用函数 `替换新属性(当前_用户问题, 用户输入)`并更新 `当前_用户问题` - 否则 将当前用户输入作为 `当前_用户问题` 处理完成后请清空 `下一步_操作` 和`下一步_选择列表` ### 如果当前角色为 assistant: 请根据 assistant 的输出内容,判断接下来用户是否被引导进行补全,并更新 `下一步_操作`: - 如果 assistant 的回复中包含"请问你想查询什么时间的"或类似引导时间的内容:设定 `下一步_操作 = "补充时间"` - 否则 如果包含"请补充该项目部所属的分公司名称":设定 `下一步_操作 = "补充分公司"` - 否则 如果包含"请确认您要选择哪一个":设定 `下一步_操作 = "选择列表"`,并将当前 assistant 回复中列出的选项存入 `下一步_选择列表` - 否则 什么都不做 ### 重复以上解析过程 请逐条处理每轮对话,直到所有历史对话处理完毕,然后进入第三步。 ## 第三步:输出还原后的最终用户问题 在完成全部历史消息处理后,请输出变量 `当前_用户问题`,它即为根据上下文补全后的完整查询。 --- 辅助函数说明: 函数 补全时间(文本, 时间词): 支持识别如"2025-5-15"、"昨天""等模糊时间 从时间词里提取时间 在文本开头添加提取到的时间并返回,保持其他内容不变 如果没有提取到时间则直接返回原文本内容 示例:补全时间(""第一项目部有多少作业计划", "今天的") 返回 "今天第一项目部有多少作业计划" 函数 补全分公司(文本, 分公司词): 从分公司词里提取分公司 在文本里的时间词之后添加提取到的分公司并返回,保持其他内容不变 如果没有提取到分公司信息则直接返回原文本内容 示例:补全分公司("今天第一项目有多少作业计划", "送二分公司") 返回 "今天送二分公司第一项目有多少作业计划" 函数 替换序号为实体(文本, 选择项, 选择列表): 从选择项如"第X个"中提取X的值并处理中文数字和阿拉伯数字作为序号 根据序号识别出"第1个:XXX,第2个:YYY"格式的选项列表提取出完整的实体名称 保留文本中的动作词和目标对象,将文本中的实体引用替换为完整实体名称 示例:替换序号为实体("中心变工程进度", "第1个", "第1个:燃气工程,第2个:给水工程") 返回 "燃气工程进度" 函数 包含模糊表达(文本): 检查是否包含"具体是哪些项"、"是哪两个"等模糊表达 返回布尔值表示是否包含 示例:包含模糊表达("具体是哪些项") 返回 True 函数 补全模糊表达(文本,模糊表达): 结合文本内容和模糊表达,补全语义并返回 示例:补全模糊表达("今天送一分公司有多少作业计划", "具体是哪些") 返回 "今天送一分公司具体有哪些作业计划" 函数 是查询新属性(文本, 新问题): 如果新问题中提取不到主体 且仅能提取到查询属性 且这个查询属性和文本中提取到的查询属性不同 则返回TRUE 其他情况均返回FALSE 示例:是查询新属性("今天送一分公司有多少作业计划", "作业内容") 返回 True 函数 替换新属性(文本,新查询属性): 先删除文本中的"有多少"等类似的表达数量表达, 再将文本里的查询属性替换为新查询属性,并保持其他内容不变并返回 且保持新查询属性的语气 示例:替换新属性("今天送一分公司有多少作业计划", "作业内容") 返回 "今天送一分公司的作业内容" 函数 有完整的句意(新问题): 如果新问题里有主体同时有操作对象或查询对象则返回TRUE 其他情况均返回FALSE 对话历史如下: {chat_history} 请你仅输出还原后的完整问题,不要输出任何变量、中间步骤或解释说明,确保结果自然通顺,语义完整。 ''' message = [ {"role": "user", "content": prompt} ] response = client.chat.completions.create( messages=message, model=model_name, max_tokens=100, temperature=0.1, # 降低随机性,提高确定性 stream=False ) res = response.choices[0].message.content.strip() logger.info(f"多轮意图后用户想要的问题是:{res}") return res if __name__ == '__main__': # 启动时立即执行一次 app.run(host='0.0.0.0', port=18073, debug=False)