from enum import Enum from typing import cast, Callable from rapidfuzz import process, fuzz from rapidfuzz.fuzz import WRatio import json from pypinyin import lazy_pinyin from constants import USELESS_COMPANY_WORDS, USELESS_PROJECT_WORDS # 数字转换表(1-20,常见数字) digit_to_chinese = { "1": "一", "2": "二", "3": "三", "4": "四", "5": "五", "6": "六", "7": "七", "8": "八", "9": "九", "10": "十", "11": "十一", "12": "十二", "13": "十三", "14": "十四", "15": "十五", "16": "十六", "17": "十七", "18": "十八", "19": "十九", "20": "二十" } def arabic_to_chinese_number(text): """ 将文中阿拉伯数字转换为中文数字 :param text: 输入文本 :return: 转换后的文本 """ cn_to_arabic = {'一': '1', '二': '2', '三': '3', '四': '4', '五': '5', '六': '6', '七': '7', '八': '8', '九': '9', '零': '0'} arabic_to_cn = {v: k for k, v in cn_to_arabic.items()} # 反向映射 for num, cn in arabic_to_cn.items(): text = text.replace(num, cn) return text def text_to_pinyin(text): """将文本转换为拼音字符串""" return ''.join(lazy_pinyin(text)) def load_standard_data(path): with open(path, "r", encoding="utf-8") as f: return json.load(f) def extract_number(text): """ 提取项目部中的数字(支持阿拉伯数字和中文数字),并转换为统一格式(中文数字)。 """ match = re.search(r'(第?[一二三四五六七八九十百千零\d]+)', text) if match: num_str = match.group(1).replace("第", "") if num_str.isdigit(): return digit_to_chinese.get(num_str, num_str) # 阿拉伯数字转中文 return num_str # 中文数字直接返回 return None def fuzzy_match_and_filter(input_key, match_pool, mapping_dict, lower_score=70, high_score=85, top_k=3): """ 对输入字符串在候选池中执行模糊匹配,并返回匹配程度高的映射原始值。 :param input_key: 清洗后的用于匹配的关键词(如简化名或拼音) :param match_pool: 可用于匹配的候选集合(一般是映射表的 key) :param mapping_dict: 匹配项到标准原始名的映射字典 :param lower_score: 匹配分数的下限,低于该分数视为无效 :param high_score: 高置信度匹配分数,超过则直接返回所有等分结果 :param top_k: 如果不满足高置信度,则返回前 top_k 个结果 :return: 标准原始名的列表,可能为空 """ match_results = process.extract(input_key, match_pool, scorer=cast(Callable, WRatio), limit=len(match_pool)) high_conf_matches = [(m[0], m[1]) for m in match_results if m[1] >= lower_score] if not high_conf_matches: return [] max_score = max(high_conf_matches, key=lambda x: x[1]) best_matches = [m for m in high_conf_matches if m[1] == max_score[1]] if max_score[1] >= high_score: return [mapping_dict[m[0]] for m in best_matches] else: return [mapping_dict[m[0]] for m in high_conf_matches[:top_k]] def standardize_name(input_name, clean_func, simply_map, pinyin_map, lower_score=70, high_score=85): """ 通用名称标准化函数,按中文 → 清洗 → 简化匹配 → 拼音匹配 的顺序进行处理。 :param input_name: 用户输入的原始中文名 :param clean_func: 清洗函数(针对不同实体类型,如工程名/公司名) :param simply_map: 简化后的名 → 原始标准名 映射 :param pinyin_map: 拼音名 → 原始标准名 映射 :param lower_score: 模糊匹配最低置信分数 :param high_score: 模糊匹配高置信分数阈值 :return: 标准名列表,可能为空 """ simply_input = clean_func(input_name) result = fuzzy_match_and_filter(simply_input, list(simply_map.keys()), simply_map, lower_score, high_score) if result: return result # 拼音匹配 pinyin_input = text_to_pinyin(simply_input) result = fuzzy_match_and_filter(pinyin_input, list(pinyin_map.keys()), pinyin_map, lower_score, high_score) return result def standardize_sub_company(input_name, simply_map, pinyin_map, lower_score=55, high_score=80): """ 对用户输入的子公司名称进行标准化,返回最匹配的标准公司名列表。 :param input_name: 原始中文子公司名 :param simply_map: 清洗后的公司名 → 标准公司名映射 :param pinyin_map: 洗后公司的拼音 → 标准公司名映射 :param lower_score: 模糊匹配分数下限 :param high_score: 高置信匹配分数阈值 :return: 匹配的标准公司名列表 """ return standardize_name(input_name, clean_useless_company_name, simply_map, pinyin_map, lower_score, high_score) def standardize_project_name(input_name, simply_map, pinyin_map, lower_score=70, high_score=90): """ 对用户输入的项目名称进行标准化,返回最匹配的标准项目名列表。 :param input_name: 原始中文项目名 :param simply_map: 清洗后的项目名 → 标准项目名映射 :param pinyin_map: 清洗后项目的拼音 → 标准项目名映射 :param lower_score: 模糊匹配分数下限 :param high_score: 高置信匹配分数阈值 :return: 匹配的标准项目名列表 """ return standardize_name(input_name, clean_useless_project_name, simply_map, pinyin_map, lower_score, high_score) #标准化项目部名 def standardize_projectDepartment(standard_company, input_project, company_project_department_map, high_score=90): """ 将口语化的公司名和项目部名转换为标准化名称。 参数: standard_company (str): 标准化公司名。 input_project (str): 用户输入的项目部名(可能是口语化或不完整的名称)。 company_project_department_map (dict): 标准化的公司名和项目部名数据,格式为 {公司名: [项目部名1, 项目部名2, ...]}。 pinyin_to_original_map:分公司拼音和分公司原始名的映射 返回: tuple: (标准化公司名, 匹配的项目部名列表)。如果无法匹配,返回 (None, None)。 """ try: # **2. 先尝试直接匹配最相似的项目名** project_match = process.extractOne(input_project, company_project_department_map[standard_company], scorer=cast(Callable, WRatio)) print(f"项目部名称最相似:{project_match[0]},{project_match[1]}", flush=True) if project_match and project_match[1] >= high_score: return [project_match[0]] # 直接返回匹配的项目名 # **3. 提取项目部的数字部分** query_number = extract_number(input_project) # **4. 过滤所有符合数字的项目部** matched_projects = [] for project in company_project_department_map[standard_company]: project_number = extract_number(project) if query_number and query_number == project_number: matched_projects.append(project) return matched_projects except Exception as e: print(f"standardize_projectDepartment:{e}", flush=True) return None def multiple_standardize_single_name(origin_input_name, origin_name_list, pinyin_name_list=None, pinyin_to_original_map=None, lower_score=70, high_score=85, isArabicNumConv=False): """ 使用拼音 + rapidfuzz 进行关键词模糊匹配,并返回原始的标准名 :param origin_input_name: 口语化的名称(中文) :param origin_name_list: 关键词列表(中文) :pinyin_name_list: 关键词列表(拼音) :param pinyin_to_original_map: 拼音到原始标准名的映射 :param lower_score: 低匹配分数阈值(默认70) :param high_score: 高匹配分数阈值(默认85) :return: 最匹配的原始关键词,或 None """ #First round, 原始标准名的匹配性查找,能找到直接返回 if isArabicNumConv: origin_input_name = arabic_to_chinese_number(origin_input_name) match_results = process.extract(origin_input_name, origin_name_list, scorer=fuzz.token_sort_ratio, limit=len(origin_name_list)) # 找到所有相似度 > 80 的匹配项 original_high_confidence_matches = [(match[0], match[1]) for match in match_results if match[1] >= lower_score] print(f"standardize_pinyin_single_name 原始名匹配, high_confidence_matches:{original_high_confidence_matches[:3]}", flush=True) combined_low_confidence_matches = [] if original_high_confidence_matches: origin_best_match = max(original_high_confidence_matches, key=lambda x: x[1], default=None) if origin_best_match and origin_best_match[1] > high_score: return [origin_best_match[0]] else: combined_low_confidence_matches = [match[0] for match in original_high_confidence_matches[:3]] else: if not pinyin_name_list or not pinyin_to_original_map: return None # return list(dict.fromkeys(combined_low_confidence_matches)) def generate_project_prompt(matched_projects, original_name="", type="项目部名"): """ 生成提示信息,用于让用户确认匹配的项目名或分公司名或项目名。 参数: matched_projects (list): 匹配的项目或分公司名称列表。 type (str): 提示信息的类型(例如 "项目名" 或 "分公司名"),默认值为 "项目名"。 返回: str: 生成的提示信息。如果未找到匹配项,返回提示用户提供更准确信息的字符串。 """ if not matched_projects: return f"

未找到匹配的{type}:{original_name},请提供更准确的{type}信息。

" html_parts = [f"

您说的{type}可能是:

"] for idx, item in enumerate(matched_projects, start=1): html_parts.append(f"""
第{idx}个:{item}
""") html_parts.append("

请确认您要选择哪一个?

") return "\n".join(html_parts) def load_standard_name(file_path: str): """ 从指定文件中加载标准化的名称列表。 参数: file_path (str): 文件路径,文件应包含标准化的名称列表,每行一个名称。 返回: list: 从文件中读取的标准化名称列表。 异常: FileNotFoundError: 如果文件不存在,抛出此异常。 Exception: 如果读取文件时发生其他错误,抛出此异常。 """ try: with open(file_path, 'r', encoding='utf-8') as file: lines = [line.strip() for line in file if line.strip()] return lines except FileNotFoundError: print(f"错误:文件 {file_path} 不存在", flush=True) raise FileNotFoundError(f"错误:文件 {file_path} 不存在") except Exception as e: print(f"读取文件时发生错误:{e}", flush=True) raise Exception(f"错误:文件 {file_path} 不存在") class CheckResult(Enum): NO_MATCH = 0 # 不符合检查条件 MATCH_FOUND = 1 # 匹配到了值 NEEDS_MORE_ROUNDS = 2 # 需要多轮 class StandardType(Enum): #工程名检查 PROJECT_CHECK = 0 #项目名检查 PROGRAM_CHECK = 1 import re # 构建一个用于替换的正则表达式 useless_project_words_pattern = re.compile("|".join(USELESS_PROJECT_WORDS)) useless_company_words_pattern = re.compile("|".join(USELESS_COMPANY_WORDS)) # 匹配所有数字、字母(含大小写)、特殊字符(包括空格、标点) project_symbols_pattern = re.compile(r"[A-Za-z0-9\s\W_]+") company_symbols_pattern = re.compile(r"[\s\W_]+") def clean_useless_project_name(name: str) -> str: # 去掉无意义词 name = useless_project_words_pattern.sub("", name) # 去掉数字、字母、符号 name = project_symbols_pattern.sub("", name) return name.strip() def clean_useless_company_name(name: str) -> str: # 去掉无意义词 name = useless_company_words_pattern.sub("", name) name = company_symbols_pattern.sub("", name) return name.strip()