Intention/api/utils.py

216 lines
9.5 KiB
Python
Raw Normal View History

from enum import Enum
from rapidfuzz import process, fuzz
import re
import json
from pypinyin import lazy_pinyin
# Lookup table from the Arabic numerals "1".."20" (as strings) to the
# corresponding Chinese numerals. Used by extract_number() to normalise
# numbers written with digits into the Chinese form.
digit_to_chinese = {
    "1": "一", "2": "二", "3": "三", "4": "四", "5": "五",
    "6": "六", "7": "七", "8": "八", "9": "九", "10": "十",
    "11": "十一", "12": "十二", "13": "十三", "14": "十四",
    "15": "十五", "16": "十六", "17": "十七", "18": "十八",
    "19": "十九", "20": "二十",
}
def arabic_to_chinese_number(text):
    """
    Replace every ASCII digit in ``text`` with its Chinese numeral character.

    The conversion is done digit by digit (no positional reading), so
    ``"12"`` becomes ``"一二"``, not ``"十二"``.

    :param text: input text
    :return: the text with each of the digits 0-9 replaced by 零一二三四五六七八九
    """
    # One translation table handles all ten digits in a single pass; every
    # digit needs its own distinct Chinese character, otherwise the mapping
    # collapses and digits are dropped or left unconverted.
    digit_table = str.maketrans("0123456789", "零一二三四五六七八九")
    return text.translate(digit_table)
def text_to_pinyin(text):
    """Convert ``text`` to a single pinyin string.

    The pieces returned by ``lazy_pinyin`` are concatenated with no separator.
    """
    syllables = lazy_pinyin(text)
    return ''.join(syllables)
def load_standard_data(path):
    """Read the UTF-8 encoded JSON file at ``path`` and return the decoded object."""
    with open(path, "r", encoding="utf-8") as handle:
        parsed = json.load(handle)
    return parsed
def extract_number(text):
    """
    Extract the number from a project-department name in a unified form.

    The first run of Chinese numeral characters and/or digits is taken. An
    optional leading "第" is matched but NOT included in the result, so that
    "第三项目部" and "三项目部" both yield "三" and compare equal.

    :param text: project-department name to search
    :return: the number as a string. A run made only of digits is looked up in
             ``digit_to_chinese`` (and returned unchanged when it is not in
             that table); any other run is returned as found. ``None`` when
             ``text`` contains no number.
    """
    # The ordinal prefix sits outside the capture group, so group(1) is the
    # bare number and no post-processing is needed to strip it.
    match = re.search(r'第?([一二三四五六七八九十百千零\d]+)', text)
    if not match:
        return None
    num_str = match.group(1)
    if num_str.isdigit():
        # Arabic digits -> Chinese numeral where the table has an entry.
        return digit_to_chinese.get(num_str, num_str)
    # Already written with Chinese numerals: return as-is.
    return num_str
def standardize_company_and_projectDepartment(input_company, input_project, origianl_company_list , company_project_department_map, company_pinyin_to_original_map = None):
    """
    Map a colloquial company name and project-department name to standard names.

    Steps:
      1. Standardize the company name with ``multiple_standardize_single_name``
         (thresholds 50 / 85, Arabic digits converted to Chinese first); the
         first returned candidate is used.
      2. Fuzzy-match ``input_project`` against that company's departments with
         ``fuzz.ratio``; a score of 85 or more is returned directly.
      3. Otherwise, return every department of that company whose number
         (per ``extract_number``) equals the number found in ``input_project``.

    Parameters:
        input_company (str): company name as given by the user.
        input_project (str): project-department name as given by the user.
        origianl_company_list (list): standard company names.
        company_project_department_map (dict): ``{company: [department, ...]}``.
        company_pinyin_to_original_map (dict | None): pinyin of each company
            name -> original company name. ``None`` is treated as "no pinyin
            data available".

    Returns:
        tuple: ``(standard_company, [department, ...])``. The list may be empty
        when no department matches. ``(None, None)`` is returned when the
        company cannot be matched or when any exception occurs (the exception
        is printed, not raised).
    """
    try:
        # 1. Standardize the company name. Fall back to an empty mapping so the
        #    documented default of None does not fail on ``.keys()``.
        pinyin_map = company_pinyin_to_original_map or {}
        best_company_match = multiple_standardize_single_name(
            input_company,
            origianl_company_list,
            list(pinyin_map.keys()),
            pinyin_map,
            50,
            85,
            True,
        )
        if not best_company_match:
            return None, None
        standard_company = best_company_match[0]
        # A company missing from the map raises KeyError, which the handler
        # below turns into (None, None).
        candidate_projects = company_project_department_map[standard_company]

        # 2. Try a direct fuzzy match on the department name first.
        project_match = process.extractOne(input_project, candidate_projects, scorer=fuzz.ratio)
        if project_match:
            # extractOne returns None when there is nothing to match, so the
            # debug output must only index the result when it exists.
            print(f"项目部名称最相似:{project_match[0]},{project_match[1]}", flush=True)
            if project_match[1] >= 85:
                return standard_company, [project_match[0]]

        # 3. Extract the number from the user's department name.
        query_number = extract_number(input_project)
        if not query_number:
            return standard_company, []

        # 4. Keep every department whose number equals the queried number.
        matched_projects = [
            project
            for project in candidate_projects
            if extract_number(project) == query_number
        ]
        return standard_company, matched_projects
    except Exception as e:
        # Deliberate best-effort boundary: report and signal "no match".
        print(f"standardize_company_and_projectDepartment{e}", flush=True)
        return None,None
def multiple_standardize_single_name(origin_input_name, origin_name_list, pinyin_name_list = None, pinyin_to_original_map = None, lower_score=70, high_score=85, isArabicNumConv = False):
    """
    Fuzzy-match a colloquial name against standard names, first on the
    original text and then on pinyin, and return the original standard name(s).

    :param origin_input_name: colloquial name (Chinese text)
    :param origin_name_list: list of standard names (Chinese text)
    :param pinyin_name_list: list of the standard names in pinyin
    :param pinyin_to_original_map: mapping from pinyin name to original standard name
    :param lower_score: minimum score for a candidate to be kept (default 70)
    :param high_score: score at or above which the best candidate is returned
                       immediately as the single answer (default 85)
    :param isArabicNumConv: when true, Arabic digits in the input are converted
                            to Chinese numerals before matching
    :return: ``[name]`` for a confident match; otherwise a de-duplicated list of
             up to three round-1 candidates followed by up to three pinyin
             candidates; ``None`` when round 1 finds nothing and no pinyin data
             was supplied. The result may also be an empty list.
    """
    if isArabicNumConv:
        origin_input_name = arabic_to_chinese_number(origin_input_name)

    # Round 1: match on the original names; a confident hit is returned at once.
    match_results = process.extract(origin_input_name, origin_name_list, scorer=fuzz.token_sort_ratio, limit=len(origin_name_list))
    # Keep every candidate scoring at least lower_score.
    original_high_confidence_matches = [(match[0], match[1]) for match in match_results if match[1] >= lower_score]
    print(f"standardize_pinyin_single_name 原始名匹配, high_confidence_matches:{original_high_confidence_matches[:3]}", flush=True)

    combined_low_confidence_matches = []
    if original_high_confidence_matches:
        origin_best_match = max(original_high_confidence_matches, key=lambda x: x[1])
        if origin_best_match[1] >= high_score:
            return [origin_best_match[0]]
        # Not confident enough: remember the first three as fallback candidates.
        combined_low_confidence_matches = [match[0] for match in original_high_confidence_matches[:3]]

    if not pinyin_name_list or not pinyin_to_original_map:
        # Without pinyin data round 2 cannot run. Return whatever round 1
        # collected (None when it collected nothing) instead of calling
        # len() / process.extract on a missing list.
        return combined_low_confidence_matches or None

    # Round 2: match on pinyin; a confident hit is returned at once.
    pinyin_input_name = text_to_pinyin(origin_input_name)
    match_results = process.extract(pinyin_input_name, pinyin_name_list, scorer=fuzz.ratio, limit=len(pinyin_name_list))
    # Keep every candidate scoring at least lower_score.
    pinyin_high_confidence_matches = [(match[0], match[1]) for match in match_results if match[1] >= lower_score]
    print(f"standardize_pinyin_single_name 拼音匹配, input_name{pinyin_input_name}, high_confidence_matches:{pinyin_high_confidence_matches[:3]}", flush=True)
    if not pinyin_high_confidence_matches:
        # Nothing from pinyin: fall back to the round-1 candidates (may be empty).
        return combined_low_confidence_matches

    pinyin_best_match = max(pinyin_high_confidence_matches, key=lambda x: x[1])
    if pinyin_best_match[1] >= high_score:
        # Map the pinyin key back to the original standard name.
        return [pinyin_to_original_map[pinyin_best_match[0]]]

    combined_low_confidence_matches.extend(
        pinyin_to_original_map[match[0]] for match in pinyin_high_confidence_matches[:3]
    )
    # dict.fromkeys drops duplicates while keeping first-seen order.
    return list(dict.fromkeys(combined_low_confidence_matches))
def generate_project_prompt(matched_projects, original_name = "", type="项目部名"):
    """
    Build an HTML prompt asking the user to confirm which candidate they meant.

    Parameters:
        matched_projects (list): candidate names to offer to the user.
        original_name (str): the name the user originally gave; only shown in
            the "not found" message. Defaults to "".
        type (str): label for the kind of name being confirmed, inserted into
            the message as-is. Defaults to "项目部名".

    Returns:
        str: when ``matched_projects`` is empty, a single paragraph asking for
        more accurate information; otherwise an intro paragraph, one numbered
        entry per candidate, and a closing question, joined by newlines.
    """
    # Names come from user input / data files and are placed inside HTML, so
    # they are escaped to keep characters such as < > & from being read as markup.
    from html import escape

    if not matched_projects:
        return f"<p>未找到匹配的<strong>{type}</strong>{escape(str(original_name))},请提供更准确的{type}信息。</p>"

    html_parts = [f"<p>您说的<strong>{type}</strong>可能是:</p>"]
    for idx, item in enumerate(matched_projects, start=1):
        html_parts.append(
            "\n"
            '<div class="project-entry">\n'
            f'<text class="label"><strong>{idx}</strong>{escape(str(item))}</text><br>\n'
            "</div>\n"
        )
    html_parts.append("<p>请确认您要选择哪一个?</p>")
    return "\n".join(html_parts)
def load_standard_name(file_path: str) -> list:
    """
    Load a list of standard names from a text file.

    Parameters:
        file_path (str): path to a UTF-8 text file with one name per line.

    Returns:
        list: the lines of the file with surrounding whitespace stripped;
        lines that are empty after stripping are skipped.

    Raises:
        FileNotFoundError: if the file does not exist.
        Exception: if any other error occurs while reading; the message
            carries the underlying error and the original exception is
            chained as the cause.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            stripped_lines = (line.strip() for line in file)
            return [line for line in stripped_lines if line]
    except FileNotFoundError as err:
        print(f"错误:文件 {file_path} 不存在", flush=True)
        raise FileNotFoundError(f"错误:文件 {file_path} 不存在") from err
    except Exception as e:
        print(f"读取文件时发生错误:{e}", flush=True)
        # Report the real failure; this branch is reached for errors other
        # than a missing file, so "file does not exist" would be misleading.
        raise Exception(f"读取文件时发生错误:{e}") from e
class CheckResult(Enum):
    """Result codes of a check."""

    # The input does not satisfy the check condition.
    NO_MATCH = 0
    # A value was matched.
    MATCH_FOUND = 1
    # Further rounds are required.
    NEEDS_MORE_ROUNDS = 2
class StandardType(Enum):
    """Kinds of name check."""

    # Check of an engineering-project (工程) name.
    PROJECT_CHECK = 0

    # Check of a project (项目) name.
    PROGRAM_CHECK = 1