659 lines
31 KiB
Python
659 lines
31 KiB
Python
import logging
|
||
import os
|
||
from enum import Enum
|
||
from typing import cast, Callable
|
||
|
||
import cn2an
|
||
from rapidfuzz import process, fuzz
|
||
from rapidfuzz.fuzz import WRatio
|
||
import json
|
||
from pypinyin import lazy_pinyin, Style
|
||
import re
|
||
|
||
from globalData import GlobalData
|
||
from constants import USELESS_COMPANY_WORDS, USELESS_PROJECT_WORDS, CONSTRUCTION_UNIT, IMPLEMENTATION_ORG, \
|
||
SUBCONTRACTOR, PROJECT_NAME, PROJECT_DEPARTMENT, RISK_LEVEL, TEAM_NAME, USELESS_PROGRAM_DEPARTMENT_WORDS
|
||
|
||
from logger_util import setup_logger
|
||
|
||
logger = setup_logger("utils", level=logging.DEBUG)
|
||
|
||
# Lookup table from the Arabic numeral strings "1".."20" to their Chinese numerals.
digit_to_chinese = dict(zip(
    (str(number) for number in range(1, 21)),
    (
        "一", "二", "三", "四", "五", "六", "七", "八", "九", "十",
        "十一", "十二", "十三", "十四", "十五", "十六", "十七", "十八", "十九", "二十",
    ),
))
|
||
|
||
|
||
def arabic_to_chinese_number(text):
    """Replace each ASCII digit in ``text`` with its Chinese numeral, one digit at a time.

    The conversion is positional, not numeric: "12" becomes "一二", not "十二".

    :param text: input string
    :return: ``text`` with every character 0-9 mapped to 零一二三四五六七八九
    """
    digit_table = str.maketrans("0123456789", "零一二三四五六七八九")
    return text.translate(digit_table)
|
||
|
||
|
||
def text_to_pinyin(text):
    """Return the pinyin of ``text`` as one string.

    The syllables produced by ``lazy_pinyin`` with ``Style.TONE2`` are
    concatenated without any separator.
    """
    syllables = lazy_pinyin(text, Style.TONE2)
    return ''.join(syllables)
|
||
|
||
|
||
def load_standard_json_data(path):
    """Load and parse the JSON file at ``path``.

    This is a best-effort loader: any failure is logged and an empty dict is
    returned instead of raising.

    :param path: path of the UTF-8 encoded JSON file
    :return: the parsed JSON value, or ``{}`` if the file is missing or cannot
        be read/parsed
    """
    # Open directly instead of checking os.path.exists() first: the file could
    # disappear between the check and the open.
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        logger.error("[Error] Local file not found:%s", path)
    except Exception as e:
        logger.error("Failed to load local JSON file", exc_info=e)
    return {}
|
||
|
||
def save_dict_to_file(data: dict, file_path: str):
    """Serialize ``data`` as JSON into ``file_path``.

    The parent directory is created when it does not exist. Failures are
    logged and swallowed (best-effort write); nothing is returned.

    :param data: dictionary to write
    :param file_path: destination JSON file path
    """
    try:
        parent_dir = os.path.dirname(file_path)
        # A bare file name has no directory part; os.makedirs("") would raise
        # and the file would never be written.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=4)
        logger.info("[Success] 数据已成功写入 JSON 文件:%s", file_path)
    except Exception as e:
        logger.error("[Error] 写入 JSON 文件失败:", exc_info=e)
|
||
|
||
def load_standard_name_list(file_path: str):
    """Load the list of standardized names stored one per line in ``file_path``.

    Lines are stripped of surrounding whitespace and blank lines are skipped.

    :param file_path: path of the UTF-8 text file
    :return: list of non-empty, stripped lines in file order
    :raises FileNotFoundError: if the file does not exist
    :raises Exception: if reading fails for any other reason; the original
        error is attached as ``__cause__``
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            stripped_lines = (line.strip() for line in file)
            return [name for name in stripped_lines if name]
    except FileNotFoundError as e:
        logger.error("错误:文件 %s 不存在", file_path)
        raise FileNotFoundError(f"错误:文件 {file_path} 不存在") from e
    except Exception as e:
        logger.error("读取文件时发生错误:", exc_info=e)
        # Report the real failure; the previous message claimed the file was missing.
        raise Exception(f"读取文件 {file_path} 时发生错误:{e}") from e
|
||
|
||
def save_standard_name_list_to_file(name_list, file_path):
    """Write ``name_list`` to ``file_path``, one name per line.

    The parent directory is created when it does not exist. Failures are
    logged and swallowed (best-effort write); nothing is returned.

    :param name_list: iterable of names to write
    :param file_path: destination text file path
    """
    try:
        parent_dir = os.path.dirname(file_path)
        # A bare file name has no directory part; os.makedirs("") would raise
        # and the file would never be written.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(file_path, 'w', encoding='utf-8') as file:
            file.writelines(f"{name}\n" for name in name_list)
        logger.info("[Success] 名称列表已写入文件:%s", file_path)
    except Exception as e:
        # Keep the traceback, as the other file helpers in this module do.
        logger.error("[Error] 写入文件失败:%s", e, exc_info=e)
|
||
|
||
def extract_number(text):
    """Extract the first number in ``text`` and return it as a Chinese numeral.

    The first run of Chinese numeral characters and/or digits is used, with an
    optional leading "第" dropped. A run consisting only of digits is
    converted to Chinese: small values come from ``digit_to_chinese``, anything
    else is converted with ``cn2an``. This way "21" and "二十一" compare equal.
    Runs that already contain Chinese numerals are returned unchanged.

    :param text: string to search, e.g. a project department name
    :return: the number as a string, or ``None`` when ``text`` contains none
    """
    match = re.search(r'(第?[一二三四五六七八九十百千零\d]+)', text)
    if not match:
        return None

    num_str = match.group(1).replace("第", "")
    if not num_str.isdigit():
        return num_str  # already Chinese (or mixed): return as found

    # Normalize leading zeros ("01" -> "1") so the table lookup can succeed.
    # int() accepts every character that \d can match.
    normalized = str(int(num_str))
    chinese = digit_to_chinese.get(normalized)
    if chinese is not None:
        return chinese
    try:
        # Same conversion mode as replace_arabic_with_chinese, so both sides
        # of a comparison use the same spelling.
        return cn2an.an2cn(normalized, "low")
    except Exception:
        return num_str  # conversion failed: fall back to the raw digits
|
||
|
||
|
||
def replace_arabic_with_chinese(text):
    """Convert every run of consecutive digits in ``text`` to a Chinese number.

    Each run is converted as a whole number via ``cn2an.an2cn`` in "low" mode.
    Example: 2024年25号 -> 二千零二十四年二十五号. A run that cannot be
    converted is logged and left unchanged.
    """
    def _digits_to_chinese(digit_run):
        digits = digit_run.group()
        try:
            converted = cn2an.an2cn(digits, "low")
        except Exception as err:
            logger.error("转换失败,异常信息:", exc_info=err)
            return digits  # keep the original digits when conversion fails
        return converted

    return re.sub(r'\d+', _digits_to_chinese, text)
|
||
|
||
|
||
def fuzzy_match_and_filter(input_key, match_pool, mapping_dict, lower_score=70, high_score=85, top_k=3):
    """Fuzzy-match ``input_key`` against ``match_pool`` and map the hits to standard names.

    :param input_key: cleaned search key (e.g. a simplified name or its pinyin)
    :param match_pool: candidate strings to match against (normally the keys of ``mapping_dict``)
    :param mapping_dict: candidate string -> standard name
    :param lower_score: candidates scoring below this are discarded
    :param high_score: if the best score reaches this, only the best-scoring candidates are returned
    :param top_k: otherwise the first ``top_k`` remaining candidates are returned
    :return: list of standard names, possibly empty
    """
    scored = process.extract(input_key, match_pool, scorer=cast(Callable, WRatio), limit=len(match_pool))
    kept = [(hit[0], hit[1]) for hit in scored if hit[1] >= lower_score]
    logger.info(f"{input_key}匹配结果:{kept}")

    if not kept:
        return []

    top_score = max(score for _, score in kept)
    if top_score < high_score:
        # No confident winner: offer the first few candidates instead.
        return [mapping_dict[candidate] for candidate, _ in kept[:top_k]]
    return [mapping_dict[candidate] for candidate, score in kept if score == top_score]
|
||
|
||
def fuzzy_match_and_filter_only_high_score(input_key, match_pool, mapping_dict, high_score=90, top_k=3):
    """Fuzzy-match ``input_key`` against ``match_pool``, keeping only high-confidence hits.

    Candidates scoring below ``high_score`` are discarded; among the rest,
    every candidate tied for the best score is returned.

    :param input_key: cleaned search key (e.g. a simplified name or its pinyin)
    :param match_pool: candidate strings to match against (normally the keys of ``mapping_dict``)
    :param mapping_dict: candidate string -> standard name
    :param high_score: minimum score a candidate must reach to be considered
    :param top_k: unused; kept only so existing callers keep working. Every
        surviving candidate already scores >= ``high_score``, so the former
        "first top_k" fallback could never run.
    :return: list of standard names, possibly empty
    """
    scored = process.extract(input_key, match_pool, scorer=cast(Callable, WRatio), limit=len(match_pool))
    high_conf_matches = [(hit[0], hit[1]) for hit in scored if hit[1] >= high_score]
    logger.info(f"匹配结果:{high_conf_matches}")
    if not high_conf_matches:
        return []

    best_score = max(score for _, score in high_conf_matches)
    return [mapping_dict[candidate] for candidate, score in high_conf_matches if score == best_score]
|
||
|
||
def standardize_name(input_name, clean_func, simply_map, pinyin_map, lower_score=70, high_score=85):
    """Standardize a name: clean it, match on the cleaned text, then fall back to pinyin.

    :param input_name: raw name as entered by the user
    :param clean_func: cleaning function for the entity type (project, company, ...)
    :param simply_map: cleaned name -> standard name
    :param pinyin_map: pinyin of cleaned name -> standard name
    :param lower_score: minimum fuzzy-match score
    :param high_score: high-confidence fuzzy-match score
    :return: list of standard names, possibly empty
    """
    simply_input = clean_func(input_name)

    text_hits = fuzzy_match_and_filter(simply_input, list(simply_map), simply_map, lower_score, high_score)
    if text_hits:
        return text_hits

    # Nothing matched on the characters themselves: retry on the pronunciation.
    return fuzzy_match_and_filter(
        text_to_pinyin(simply_input), list(pinyin_map), pinyin_map, lower_score, high_score
    )
|
||
|
||
def standardize_name_only_high_score(input_name, clean_func, simply_map, pinyin_map, high_score=90):
    """Standardize a name, accepting only high-confidence matches.

    Works like ``standardize_name`` but uses
    ``fuzzy_match_and_filter_only_high_score`` for both the text stage and the
    pinyin fallback.

    :param input_name: raw name as entered by the user
    :param clean_func: cleaning function for the entity type (project, company, ...)
    :param simply_map: cleaned name -> standard name
    :param pinyin_map: pinyin of cleaned name -> standard name
    :param high_score: minimum score a candidate must reach
    :return: list of standard names, possibly empty
    """
    simply_input = clean_func(input_name)

    text_hits = fuzzy_match_and_filter_only_high_score(simply_input, list(simply_map), simply_map, high_score)
    if text_hits:
        return text_hits

    # Nothing matched on the characters themselves: retry on the pronunciation.
    return fuzzy_match_and_filter_only_high_score(
        text_to_pinyin(simply_input), list(pinyin_map), pinyin_map, high_score
    )
|
||
|
||
def standardize_team_name(input_name, simply_map, pinyin_map, lower_score=70, high_score=90):
    """Standardize a team name entered by the user.

    :param input_name: raw team name
    :param simply_map: cleaned team name -> standard team name
    :param pinyin_map: pinyin of cleaned team name -> standard team name
    :param lower_score: minimum fuzzy-match score
    :param high_score: high-confidence fuzzy-match score
    :return: list of matching standard team names, possibly empty
    """
    return standardize_name(
        input_name,
        clean_useless_team_leader_name,
        simply_map,
        pinyin_map,
        lower_score=lower_score,
        high_score=high_score,
    )
|
||
|
||
|
||
def standardize_sub_company(input_name, simply_map, pinyin_map, lower_score=55, high_score=80):
    """Standardize a (sub-)company name entered by the user.

    Digit runs in the input are converted to Chinese numbers before matching.

    :param input_name: raw company name
    :param simply_map: cleaned company name -> standard company name
    :param pinyin_map: pinyin of cleaned company name -> standard company name
    :param lower_score: minimum fuzzy-match score
    :param high_score: high-confidence fuzzy-match score
    :return: list of matching standard company names, possibly empty
    """
    return standardize_name(
        replace_arabic_with_chinese(input_name),
        clean_useless_company_name,
        simply_map,
        pinyin_map,
        lower_score=lower_score,
        high_score=high_score,
    )
|
||
|
||
def standardize_project_name(input_name, simply_map, pinyin_map, lower_score=70, high_score=90):
    """Standardize a project name entered by the user.

    :param input_name: raw project name
    :param simply_map: cleaned project name -> standard project name
    :param pinyin_map: pinyin of cleaned project name -> standard project name
    :param lower_score: minimum fuzzy-match score
    :param high_score: high-confidence fuzzy-match score
    :return: list of matching standard project names, possibly empty
    """
    return standardize_name(
        input_name,
        clean_useless_project_name,
        simply_map,
        pinyin_map,
        lower_score=lower_score,
        high_score=high_score,
    )
|
||
|
||
|
||
def standardize_projectDepartment(standard_company, input_project, company_project_department_map, high_score=90):
    """Resolve a colloquial project-department name to the standard name(s) of a company.

    Two strategies are tried in order:

    1. Fuzzy-match the cleaned input against the cleaned department names of
       ``standard_company``; a best score >= ``high_score`` wins outright.
    2. Otherwise return every department of the company whose number (see
       ``extract_number``) equals the number found in the input.

    :param standard_company: standardized company name used as the lookup key
    :param input_project: department name as entered by the user
    :param company_project_department_map: ``{company name: [department name, ...]}``
    :param high_score: minimum score for accepting the fuzzy match directly
    :return: list of matching standard department names (possibly empty), or
        ``None`` when the company has no known departments or an unexpected
        error occurs
    """
    try:
        program_list = company_project_department_map.get(standard_company, [])
        if not program_list:
            # Previously this case surfaced as a TypeError on the empty match
            # result and was logged as an error; the return value is the same.
            logger.info(f"standardize_projectDepartment: no departments known for {standard_company}")
            return None

        temp_input_project = clean_useless_program_departement_name(replace_arabic_with_chinese(input_project))
        cleaned_map = {clean_useless_program_departement_name(p): p for p in program_list}

        # Strategy 1: direct fuzzy match on the cleaned names.
        project_match = process.extractOne(temp_input_project, list(cleaned_map.keys()),
                                           scorer=cast(Callable, WRatio))
        if project_match:
            logger.info(f"{input_project} 名称最相似:{project_match[0]}, {project_match[1]}")
            if project_match[1] >= high_score:
                return [cleaned_map[project_match[0]]]  # map back to the original name

        # Strategy 2: match on the department number.
        query_number = extract_number(temp_input_project)
        if not query_number:
            return []
        return [project for project in program_list if extract_number(project) == query_number]
    except Exception as e:
        logger.error("standardize_projectDepartment:", exc_info=e)
        return None
|
||
|
||
|
||
def multiple_standardize_single_name(origin_input_name, origin_name_list, pinyin_name_list=None,
                                     pinyin_to_original_map=None, lower_score=70, high_score=85, isArabicNumConv=False):
    """Fuzzy-match a colloquial name against the list of standard names.

    Matching uses ``fuzz.token_sort_ratio`` on the original (Chinese) names.
    NOTE(review): the pinyin arguments only decide whether ``None`` or ``[]``
    is returned when nothing matches; no pinyin matching is done here.

    :param origin_input_name: colloquial name (Chinese)
    :param origin_name_list: standard names (Chinese)
    :param pinyin_name_list: standard names in pinyin
    :param pinyin_to_original_map: pinyin -> standard name
    :param lower_score: candidates scoring below this are discarded (default 70)
    :param high_score: a best score strictly above this is returned alone (default 85)
    :param isArabicNumConv: convert ASCII digits to Chinese numerals, digit by digit, before matching
    :return: ``[best]`` for a confident match; otherwise up to three
        de-duplicated candidates; ``[]`` or ``None`` when nothing reached
        ``lower_score`` (``None`` if pinyin data was not supplied)
    """
    query = arabic_to_chinese_number(origin_input_name) if isArabicNumConv else origin_input_name

    ranked = process.extract(query, origin_name_list, scorer=fuzz.token_sort_ratio,
                             limit=len(origin_name_list))
    kept = [(hit[0], hit[1]) for hit in ranked if hit[1] >= lower_score]
    logger.info(f"standardize_pinyin_single_name 原始名匹配, high_confidence_matches:{kept[:3]}")

    if not kept:
        if not pinyin_name_list or not pinyin_to_original_map:
            return None
        return []

    top_hit = max(kept, key=lambda hit: hit[1])
    if top_hit[1] > high_score:
        return [top_hit[0]]

    # No confident winner: first three candidates, de-duplicated in order.
    return list(dict.fromkeys(candidate for candidate, _ in kept[:3]))
|
||
|
||
|
||
def generate_project_prompt_with_key(matched_projects, original_name="", slot_key = IMPLEMENTATION_ORG):
    """Build the HTML prompt asking the user to confirm which candidate they meant.

    ``original_name`` and the candidate names are HTML-escaped before being
    inserted into the markup.

    :param matched_projects: candidate standard names; may be empty or ``None``
    :param original_name: the name the user originally supplied
    :param slot_key: slot the name belongs to; selects the label shown to the user
    :return: HTML listing the candidates, or an HTML "not found" message when
        there are none
    """
    # Local import: the module does not import html at file level.
    from html import escape

    logger.info(f"generate_project_prompt_with_key slot_key:{slot_key},original_name:{original_name},matched_projects:{matched_projects} ")

    slot_labels = {
        CONSTRUCTION_UNIT: "建管单位名",
        IMPLEMENTATION_ORG: "实施组织名",
        SUBCONTRACTOR: "分包单位名",
        PROJECT_NAME: "工程名",
        PROJECT_DEPARTMENT: "项目名",
    }
    label = slot_labels.get(slot_key, "")
    logger.info(f"generate_project_prompt_with_key type:{label} ")

    if not matched_projects:
        safe_name = escape(str(original_name))
        if slot_key in (CONSTRUCTION_UNIT, IMPLEMENTATION_ORG, SUBCONTRACTOR):
            return f"<p>未找到匹配的<strong>公司名</strong>:{safe_name},请提供更准确的公司名信息。</p>"
        return f"<p>未找到匹配的:{safe_name},请提供更准确的信息。</p>"

    html_parts = [f"<p>您说的<strong>{label}</strong>可能是:</p>"]
    for idx, item in enumerate(matched_projects, start=1):
        html_parts.append(f"""
            <div class="project-entry">
                <text class="label"><strong>第{idx}个:</strong>{escape(str(item))}</text><br>
            </div>
        """)
    html_parts.append("<p>请确认您要选择哪一个?</p>")
    return "\n".join(html_parts)
|
||
|
||
def generate_project_prompt(matched_projects, original_name="", type="项目部名"):
    """Build the HTML prompt asking the user to confirm which candidate they meant.

    ``original_name`` and the candidate names are HTML-escaped before being
    inserted into the markup.

    :param matched_projects: candidate standard names; may be empty or ``None``
    :param original_name: the name the user originally supplied
    :param type: label for the kind of name (default "项目部名"); inserted as-is
    :return: HTML listing the candidates, or an HTML "not found" message when
        there are none
    """
    # Local import: the module does not import html at file level.
    from html import escape

    if not matched_projects:
        safe_name = escape(str(original_name))
        return f"<p>未找到匹配的<strong>{type}</strong>:{safe_name},请提供更准确的{type}信息。</p>"

    html_parts = [f"<p>您说的<strong>{type}</strong>可能是:</p>"]
    for idx, item in enumerate(matched_projects, start=1):
        html_parts.append(f"""
            <div class="project-entry">
                <text class="label"><strong>第{idx}个:</strong>{escape(str(item))},</text><br>
            </div>
        """)
    html_parts.append("<p>请确认您要选择哪一个?</p>")
    return "\n".join(html_parts)
|
||
|
||
def generate_confirm_prompt(matched_projects, original_name="", type="项目部名"):
    """Build the HTML prompt asking the user to pick one of the candidates.

    Unlike ``generate_project_prompt`` there is no "not found" branch: the
    header and the closing question are emitted even when no candidate is given.

    :param matched_projects: candidate standard names
    :param original_name: accepted but not used
    :param type: label for the kind of name (default "项目部名")
    :return: HTML listing the candidates
    """
    entries = [
        f"""
            <div class="project-entry">
                <text class="label"><strong>第{position}个:</strong>{candidate},</text><br>
            </div>
        """
        for position, candidate in enumerate(matched_projects, start=1)
    ]
    header = f"<p>您说的<strong>{type}</strong>可能是:</p>"
    footer = "<p>请确认您要选择哪一个?</p>"
    return "\n".join([header, *entries, footer])
|
||
|
||
|
||
class CheckResult(Enum):
    """Outcome of a slot check."""

    # The check does not apply / nothing to report.
    NO_MATCH = 0
    # A value was matched.
    MATCH_FOUND = 1
    # Another dialogue round with the user is needed.
    NEEDS_MORE_ROUNDS = 2
|
||
|
||
|
||
class StandardType(Enum):
    """Kind of name being checked."""

    # Check of the 工程 name.
    PROJECT_CHECK = 0
    # Check of the 项目 name.
    PROGRAM_CHECK = 1
|
||
|
||
|
||
# Alternation patterns built from the configured filler-word lists; the clean_*
# helpers use them to delete those words from a name.
# NOTE(review): the words are joined as-is, so each entry is interpreted as a
# regular expression, not as a literal - confirm against the constants module.
useless_project_words_pattern = re.compile("|".join(USELESS_PROJECT_WORDS))
useless_company_words_pattern = re.compile("|".join(USELESS_COMPANY_WORDS))
useless_program_department_words_pattern = re.compile("|".join(USELESS_PROGRAM_DEPARTMENT_WORDS))

# Runs of ASCII letters and digits, whitespace, underscore, non-word characters
# (punctuation and symbols) and Roman-numeral characters.
project_symbols_pattern = re.compile(r"[A-Za-z0-9\s\W_ⅰ-ⅷⅠ-Ⅻⅸⅹⅺⅻ]+", re.UNICODE)
# Runs of whitespace, underscore and non-word characters only.
company_symbols_pattern = re.compile(r"[\s\W_]+")

# The literal word "班组", removed from team names.
useless_team_leader_words_pattern = re.compile("班组")
|
||
|
||
|
||
def clean_useless_project_name(name: str) -> str:
    """Return ``name`` with project filler words and symbol/alphanumeric runs removed.

    Filler words are removed first, then everything matched by
    ``project_symbols_pattern``; the result is stripped.
    """
    without_filler = useless_project_words_pattern.sub("", name)
    return project_symbols_pattern.sub("", without_filler).strip()
|
||
|
||
|
||
def clean_useless_company_name(name: str) -> str:
    """Return ``name`` with company filler words and symbol runs removed.

    Filler words are removed first, then everything matched by
    ``company_symbols_pattern``; the result is stripped.
    """
    without_filler = useless_company_words_pattern.sub("", name)
    return company_symbols_pattern.sub("", without_filler).strip()
|
||
|
||
def clean_useless_team_leader_name(name: str) -> str:
    """Return ``name`` with everything matched by ``useless_team_leader_words_pattern`` removed, stripped."""
    return useless_team_leader_words_pattern.sub("", name).strip()
|
||
|
||
def clean_useless_program_departement_name(name: str) -> str:
    """Return a project-department ``name`` with filler words and symbol/alphanumeric runs removed.

    Filler words are removed first, then everything matched by
    ``project_symbols_pattern``; the result is stripped.
    """
    without_filler = useless_program_department_words_pattern.sub("", name)
    return project_symbols_pattern.sub("", without_filler).strip()
|
||
|
||
def check_lost(int_res, slot):
    """Check whether the slots required by intent ``int_res`` were extracted.

    Each intent lists one or more alternative sets of required slot names; the
    check passes as soon as one alternative is fully present in ``slot``.

    :param int_res: intent id
    :param slot: extracted slots, keyed by slot name
    :return: ``(0, "")`` when the intent has no required slots;
        ``(CheckResult.NO_MATCH, list_of_slot_keys)`` when the requirement is
        met; otherwise ``(CheckResult.NEEDS_MORE_ROUNDS, question_for_user)``
    """
    # NOTE(review): the first return below uses a bare 0 while the others use
    # CheckResult members; confirm what callers compare against before unifying.
    date_only_intents = (3, 4, 5, 6, 7, 8, 11, 12, 13, 14, 15)
    required_slots = {2: [['page'], ['app'], ['module']]}
    required_slots.update({intent: [['date']] for intent in date_only_intents})

    intention_mapping = {2: "页面切换", 3: "日计划数量查询", 4: "周计划数量查询", 5: "日计划作业内容",
                         6: "周计划作业内容", 7: "施工人数", 8: "作业考勤人数", 11: "作业面查询",
                         12: "班组人数查询", 13: "班组数查询", 14: "作业面内容", 15: "班组详情"}

    if int_res not in required_slots:
        return 0, ""

    cur_k = list(slot.keys())
    # For every alternative, the required slot names that were not extracted.
    missing_per_alternative = [
        [name for name in alternative if name not in cur_k]
        for alternative in required_slots[int_res]
    ]
    if any(not missing for missing in missing_per_alternative):
        return CheckResult.NO_MATCH, cur_k

    # The first alternative with the fewest missing slots.
    left = min(missing_per_alternative, key=len)
    logger.info(f"符合当前意图的的必须槽位,但是不在提取的槽位信息里, {left}")

    apologize_str = "非常抱歉,"
    if int_res == 2:
        return CheckResult.NEEDS_MORE_ROUNDS, f"{apologize_str}请问你想查询哪个页面?"
    # Every remaining intent in the table requires a date.
    return CheckResult.NEEDS_MORE_ROUNDS, f"{apologize_str}请问你想查询什么时间的{intention_mapping[int_res]}?"
|
||
|
||
|
||
def _accept_unique_match(slot, key, matches):
    """Store ``matches[0]`` in ``slot[key]`` when exactly one candidate matched; return whether it did."""
    if matches and len(matches) == 1:
        slot[key] = matches[0]
        return True
    return False


def check_standard_name_slot_probability(int_res, slot) -> tuple:
    """Standardize the name slots of ``slot`` in place, or ask the user to disambiguate.

    For each supported slot the raw value is matched against the standard
    names. Exactly one match replaces the slot value; zero or several matches
    stop processing and return a prompt for the user.

    The project department is always handled after the other slots, because
    its lookup is keyed by the already standardized implementation
    organization in ``slot[IMPLEMENTATION_ORG]``.

    :param int_res: intent id; only a fixed set of intents is checked
    :param slot: extracted slots, keyed by slot name; modified in place
    :return: ``(CheckResult.NO_MATCH, "")`` when nothing needs clarification,
        otherwise ``(CheckResult.NEEDS_MORE_ROUNDS, prompt)``
    """
    intention_list = {3, 4, 5, 6, 7, 8, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26}
    if int_res not in intention_list:
        return CheckResult.NO_MATCH, ""

    # A project department can only be resolved within its organization.
    if PROJECT_DEPARTMENT in slot and IMPLEMENTATION_ORG not in slot:
        return CheckResult.NEEDS_MORE_ROUNDS, "请补充该项目部所属的分公司名称"

    # Stable sort: keeps the caller's order but moves PROJECT_DEPARTMENT last,
    # so the organization is standardized before it is used as a lookup key.
    ordered_keys = sorted(slot, key=lambda slot_key: slot_key == PROJECT_DEPARTMENT)

    for key in ordered_keys:
        value = slot[key]

        if key == PROJECT_NAME:
            logger.info(f"check_standard_name_slot_probability 原始工程名 : {value}")
            match_results = standardize_project_name(value, GlobalData.simply_to_standard_project_name_map,
                                                     GlobalData.pinyin_simply_to_standard_project_name_map, 70, 90)
            logger.info(f"check_standard_name_slot_probability 匹配后工程名 :result:{match_results}")
            if not _accept_unique_match(slot, key, match_results):
                prompt = generate_project_prompt(match_results, original_name=value, type="工程名")
                return CheckResult.NEEDS_MORE_ROUNDS, prompt

        elif key == IMPLEMENTATION_ORG:
            # The literal value "公司" is left as it is.
            if value != "公司":
                logger.info(f"check_standard_name_slot_probability 原始分公司名 : {value}")
                match_results = standardize_sub_company(value, GlobalData.simply_to_standard_company_name_map,
                                                        GlobalData.pinyin_simply_to_standard_company_name_map, 70, 90)
                logger.info(f"check_standard_name_slot_probability 匹配后分公司名: result:{match_results}")
                if not _accept_unique_match(slot, key, match_results):
                    prompt = generate_project_prompt_with_key(match_results, original_name=value,
                                                              slot_key=IMPLEMENTATION_ORG)
                    return CheckResult.NEEDS_MORE_ROUNDS, prompt

        elif key == CONSTRUCTION_UNIT:
            logger.info(f"check_standard_name_slot_probability 原始建管单位名 : {value}")
            match_results = standardize_sub_company(value, GlobalData.simply_to_standard_construct_name_map,
                                                    GlobalData.pinyin_simply_to_standard_construct_name_map, 70, 90)
            logger.info(f"check_standard_name_slot_probability 匹配后建管单位名: result:{match_results}")
            if not _accept_unique_match(slot, key, match_results):
                prompt = generate_project_prompt_with_key(match_results, original_name=value,
                                                          slot_key=CONSTRUCTION_UNIT)
                return CheckResult.NEEDS_MORE_ROUNDS, prompt

        elif key == SUBCONTRACTOR:
            logger.info(f"check_standard_name_slot_probability 原始分包单位名 : {value}")
            match_results = standardize_sub_company(value, GlobalData.simply_to_standard_constractor_name_map,
                                                    GlobalData.pinyin_simply_to_standard_constractor_name_map, 70, 90)
            logger.info(f"check_standard_name_slot_probability 匹配后分包单位名: result:{match_results}")
            if not _accept_unique_match(slot, key, match_results):
                prompt = generate_project_prompt_with_key(match_results, original_name=value,
                                                          slot_key=SUBCONTRACTOR)
                return CheckResult.NEEDS_MORE_ROUNDS, prompt

        elif key == PROJECT_DEPARTMENT:
            logger.info(f"check_standard_name_slot 原始项目部名 : {value}")
            match_results = standardize_projectDepartment(slot[IMPLEMENTATION_ORG], value,
                                                          GlobalData.standard_company_program, high_score=90)
            logger.info(f"check_standard_name_slot 匹配后项目部名: result:{match_results}")
            if not _accept_unique_match(slot, key, match_results):
                prompt = generate_project_prompt(match_results, original_name=value, type="项目名")
                return CheckResult.NEEDS_MORE_ROUNDS, prompt

        elif key == TEAM_NAME:
            logger.info(f"check_standard_name_slot 原始班组名 : {value}")
            match_results = standardize_team_name(value, GlobalData.simply_to_standard_team_leader_name_map,
                                                  GlobalData.pinyin_simply_to_standard_team_leader_name_map, 70, 90)
            logger.info(f"check_standard_name_slot 匹配后班组名: result:{match_results}")
            if not _accept_unique_match(slot, key, match_results):
                prompt = generate_project_prompt(match_results, original_name=value, type="班组名称")
                return CheckResult.NEEDS_MORE_ROUNDS, prompt

        elif key == RISK_LEVEL:
            if value not in ("2级", "3级", "4级", "5级", "二级", "三级", "四级", "五级"):
                return CheckResult.NEEDS_MORE_ROUNDS, "您查询的风险等级在系统中未找到,请确认风险等级后再次提问"

    return CheckResult.NO_MATCH, ""
|
||
# |