From 871e41bb390316f812c486fa8e777cedb568262c Mon Sep 17 00:00:00 2001
From: syruan <15555146157@163.com>
Date: Mon, 4 Aug 2025 19:40:49 +0800
Subject: [PATCH] =?UTF-8?q?=E6=9C=BA=E5=85=B7=E5=AE=A1=E6=A0=B8=E8=BF=81?=
 =?UTF-8?q?=E7=A7=BB=E8=84=9A=E6=9C=AC?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 machines/repair_audit_details.py | 273 +++++++++++++++++++++++++++++++
 machines/tm_task.py              | 172 ++++++++++++++++++++
 2 files changed, 445 insertions(+)
 create mode 100644 machines/repair_audit_details.py
 create mode 100644 machines/tm_task.py

diff --git a/machines/repair_audit_details.py b/machines/repair_audit_details.py
new file mode 100644
index 0000000..79d2adf
--- /dev/null
+++ b/machines/repair_audit_details.py
@@ -0,0 +1,273 @@
+import configparser
+import pandas as pd
+from sqlalchemy import create_engine, text
+from urllib.parse import quote_plus
+
+# @author: 阮世耀
+# Description: migrate ba_ma_input_check rows into repair_audit_details.
+
+# Read the configuration file
+config = configparser.ConfigParser()
+config.read('config.ini')
+
+# Database connection settings
+source_config = {
+    'host': config.get('source_db', 'host'),
+    'user': config.get('source_db', 'user'),
+    'password': config.get('source_db', 'password'),
+    'database': config.get('source_db', 'database'),
+    'port': config.getint('source_db', 'port')
+}
+
+target_config = {
+    'host': config.get('target_db', 'host'),
+    'user': config.get('target_db', 'user'),
+    'password': config.get('target_db', 'password'),
+    'database': config.get('target_db', 'database'),
+    'port': config.getint('target_db', 'port')
+}
+
+# Create the database engines (passwords are URL-escaped for the DSN)
+source_engine = create_engine(
+    f"mysql+pymysql://{source_config['user']}:{quote_plus(source_config['password'])}@{source_config['host']}:{source_config['port']}/{source_config['database']}"
+)
+target_engine = create_engine(
+    f"mysql+pymysql://{target_config['user']}:{quote_plus(target_config['password'])}@{target_config['host']}:{target_config['port']}/{target_config['database']}"
+)
+
+
+def process_repair_audit_details():
+    """Migrate ba_ma_input_check rows into repair_audit_details.
+
+    Runs the source query, drops rows missing a key field, appends the
+    rest to the target table and prints a statistics report.
+
+    Returns:
+        bool: True on success, False if any step raised an exception.
+    """
+    try:
+        # Source query. The first branch yields one row per individually
+        # tracked machine; MAX() + GROUP BY collapse the fan-out of the
+        # scrap-reason join so a machine cannot appear as both scrapped
+        # and repaired. The second branch yields one row per machine
+        # type; IFNULL keeps repaired_num = repair_num when no scrap row
+        # matched instead of turning it into NULL (and later 0).
+        sql = """
+        SELECT
+            bic.ID as task_id,
+            bic.REPAIR_ID as repair_id,
+            mm.TYPE AS type_id,
+            ttm.MA_ID as ma_id,
+            1 as repair_num,
+            MAX(IF(ttm.MA_ID = tmsr.MA_ID, 1, 0)) as scrap_num,
+            1 - MAX(IF(ttm.MA_ID = tmsr.MA_ID, 1, 0)) as repaired_num,
+            bic.IS_SURE as status,
+            bic.CREATE_TIME,
+            bic.CREATOR as create_by
+        FROM
+            ba_ma_input_check bic
+            LEFT JOIN ba_ma_scarp bms on bic.REPAIR_ID = bms.REPAIR_ID
+            LEFT JOIN tm_ma_scarp_reason tmsr on bms.ID = tmsr.TASK_ID and tmsr.IS_COUNT = 0
+            LEFT JOIN tm_task_ma ttm on ttm.task_id = bic.id
+            LEFT JOIN ma_machines mm on ttm.ma_id = mm.id
+        WHERE mm.id is not null
+        GROUP BY
+            bic.ID, bic.REPAIR_ID, mm.TYPE, ttm.MA_ID,
+            bic.IS_SURE, bic.CREATE_TIME, bic.CREATOR
+
+        UNION
+
+        SELECT
+            bic.ID as task_id,
+            bic.REPAIR_ID as repair_id,
+            ttmt.MA_TYPE_ID as type_id,
+            null as ma_id,
+            ttmt.MACHINES_NUM as repair_num,
+            IFNULL(ttmt2.MACHINES_NUM, 0) as scrap_num,
+            ttmt.MACHINES_NUM - IFNULL(ttmt2.MACHINES_NUM, 0) as repaired_num,
+            bic.IS_SURE as status,
+            bic.CREATE_TIME,
+            bic.CREATOR as create_by
+        FROM
+            ba_ma_input_check bic
+            LEFT JOIN tm_task_ma_type ttmt on bic.ID = ttmt.TASK_ID
+            LEFT JOIN ba_ma_scarp bms on bic.REPAIR_ID = bms.REPAIR_ID
+            LEFT JOIN tm_task_ma_type ttmt2 on bms.ID = ttmt2.TASK_ID and ttmt.MA_TYPE_ID = ttmt2.MA_TYPE_ID
+        WHERE ttmt.IS_COUNT = 1
+        """
+
+        print("正在执行复杂查询,请稍候...")
+
+        # Run the query against the source database
+        df = pd.read_sql(sql, source_engine)
+
+        print(f"查询完成,获取到 {len(df)} 条原始记录")
+
+        # Map the query columns onto the target table's columns
+        result = pd.DataFrame()
+        result['task_id'] = df['task_id']
+        result['repair_id'] = df['repair_id']
+        result['ma_id'] = df['ma_id']  # NULL for machine-type rows
+        result['type_id'] = df['type_id']
+        result['repair_num'] = df['repair_num']
+        result['repaired_num'] = df['repaired_num']
+        result['scrap_num'] = df['scrap_num']
+        result['create_by'] = df['create_by']
+        result['create_time'] = df['CREATE_TIME']
+        result['status'] = df['status']
+
+        # Data cleaning: drop rows that lack any of the key fields
+        before_clean = len(result)
+        result = result.dropna(subset=['task_id', 'repair_id', 'type_id'])
+        after_clean = len(result)
+
+        if before_clean != after_clean:
+            print(f"数据清洗:移除了 {before_clean - after_clean} 条关键字段为空的记录")
+
+        # Remaining NULL quantities are stored as 0
+        numeric_fields = ['repair_num', 'repaired_num', 'scrap_num']
+        for field in numeric_fields:
+            result[field] = result[field].fillna(0)
+
+        # Append to the target table
+        result.to_sql('repair_audit_details', target_engine,
+                      if_exists='append', index=False)
+
+        print(f"成功转换并导入 {len(result)} 条记录到 repair_audit_details")
+
+        # Print the statistics report
+        generate_statistics_report(result)
+
+        return True
+
+    except Exception as e:
+        print(f"处理 repair_audit_details 时发生错误: {str(e)}")
+        import traceback
+        print("详细错误信息:")
+        traceback.print_exc()
+        return False
+
+
+def generate_statistics_report(result):
+    """Print summary statistics for the migrated rows.
+
+    Args:
+        result: DataFrame that was written to repair_audit_details.
+    """
+    print("\n=== 数据统计报告 ===")
+
+    # Basic statistics
+    print(f"总记录数: {len(result)}")
+
+    # Row count per audit status
+    print(f"\n审核状态分布:")
+    status_counts = result['status'].value_counts()
+    for status, count in status_counts.items():
+        status_desc = "已审核" if status == 1 else "待审核"
+        print(f" - status = {status} ({status_desc}): {count} 条")
+
+    # Rows with and without a concrete machine id
+    print(f"\n设备记录类型分布:")
+    ma_id_not_null = result['ma_id'].notna().sum()
+    ma_id_null = result['ma_id'].isna().sum()
+    print(f" - 具体设备记录 (ma_id不为空): {ma_id_not_null} 条")
+    print(f" - 设备类型记录 (ma_id为空): {ma_id_null} 条")
+
+    # Quantity totals
+    print(f"\n维修数量统计:")
+    print(f" - 总维修数量: {result['repair_num'].sum()}")
+    print(f" - 总修复数量: {result['repaired_num'].sum()}")
+    print(f" - 总报废数量: {result['scrap_num'].sum()}")
+
+    # Rows per repair order
+    repair_groups = result.groupby('repair_id').size()
+    print(f"\n维修单分布:")
+    print(f" - 涉及维修单数量: {len(repair_groups)}")
+    print(f" - 平均每个维修单记录数: {repair_groups.mean():.2f}")
+
+
+def validate_data():
+    """Print row counts and quantity totals of repair_audit_details.
+
+    Returns:
+        bool: True if the checks ran, False if a query raised.
+    """
+    try:
+        print("\n=== 开始数据验证 ===")
+
+        # Row count of the target table
+        target_count_sql = "SELECT COUNT(*) as total_count FROM repair_audit_details"
+        target_count = pd.read_sql(target_count_sql, target_engine)['total_count'].iloc[0]
+        print(f"目标表总记录数: {target_count}")
+
+        # Aggregate figures used to sanity-check the migrated data
+        validation_sql = """
+        SELECT
+            COUNT(*) as total_records,
+            COUNT(CASE WHEN ma_id IS NOT NULL THEN 1 END) as with_ma_id,
+            COUNT(CASE WHEN ma_id IS NULL THEN 1 END) as without_ma_id,
+            SUM(repair_num) as total_repair,
+            SUM(repaired_num) as total_repaired,
+            SUM(scrap_num) as total_scrapped
+        FROM repair_audit_details
+        """
+
+        validation_result = pd.read_sql(validation_sql, target_engine)
+        print(f"\n逻辑验证结果:")
+        print(f" - 总记录数: {validation_result['total_records'].iloc[0]}")
+        print(f" - 有设备ID记录: {validation_result['with_ma_id'].iloc[0]}")
+        print(f" - 无设备ID记录: {validation_result['without_ma_id'].iloc[0]}")
+        print(f" - 总维修数: {validation_result['total_repair'].iloc[0]}")
+        print(f" - 总修复数: {validation_result['total_repaired'].iloc[0]}")
+        print(f" - 总报废数: {validation_result['total_scrapped'].iloc[0]}")
+
+        print(f"\n✅ 数据验证完成")
+        return True
+
+    except Exception as e:
+        print(f"数据验证时发生错误: {str(e)}")
+        return False
+
+
+def clear_existing_data():
+    """Optionally delete the existing rows of repair_audit_details.
+
+    Asks for confirmation on stdin; only an answer of 'y' or 'Y'
+    triggers the delete.
+
+    Returns:
+        bool: True if the delete ran or was skipped, False on error.
+    """
+    try:
+        confirmation = input("是否要清理目标表 repair_audit_details 中的现有数据?(y/N): ")
+        if confirmation.strip().lower() == 'y':
+            # begin() commits on success, and text() is needed because
+            # SQLAlchemy 2.x no longer executes plain SQL strings.
+            with target_engine.begin() as conn:
+                result = conn.execute(text("DELETE FROM repair_audit_details"))
+                print(f"已清理 {result.rowcount} 条现有数据")
+            return True
+        else:
+            print("跳过数据清理")
+            return True
+    except Exception as e:
+        print(f"清理数据时发生错误: {str(e)}")
+        return False
+
+
+if __name__ == "__main__":
+    print("=== repair_audit_details 审核明细数据迁移 ===")
+    print("注意:此脚本将执行复杂的UNION查询,可能需要较长时间")
+    print("开始执行数据迁移...")
+
+    # Optional: clear existing data first
+    if clear_existing_data():
+        # Run the migration
+        if process_repair_audit_details():
+            # Verify the migrated data
+            validate_data()
+            print("\n=== 迁移完成 ===")
+        else:
+            print("\n=== 迁移失败 ===")
+    else:
+        print("\n=== 迁移中止 ===")
\ No newline at end of file
diff --git a/machines/tm_task.py b/machines/tm_task.py
new file mode 100644
index 0000000..90186b9
--- /dev/null
+++ b/machines/tm_task.py
@@ -0,0 +1,172 @@
+import configparser
+import pandas as pd
+from sqlalchemy import create_engine, text
+from urllib.parse import quote_plus
+
+# @author: 阮世耀
+# Description: migrate ba_ma_input_check rows into tm_task.
+
+# Read the configuration file
+config = configparser.ConfigParser()
+config.read('config.ini')
+
+# Database connection settings
+source_config = {
+    'host': config.get('source_db', 'host'),
+    'user': config.get('source_db', 'user'),
+    'password': config.get('source_db', 'password'),
+    'database': config.get('source_db', 'database'),
+    'port': config.getint('source_db', 'port')
+}
+
+target_config = {
+    'host': config.get('target_db', 'host'),
+    'user': config.get('target_db', 'user'),
+    'password': config.get('target_db', 'password'),
+    'database': config.get('target_db', 'database'),
+    'port': config.getint('target_db', 'port')
+}
+
+# Create the database engines (passwords are URL-escaped for the DSN)
+source_engine = create_engine(
+    f"mysql+pymysql://{source_config['user']}:{quote_plus(source_config['password'])}@{source_config['host']}:{source_config['port']}/{source_config['database']}"
+)
+target_engine = create_engine(
+    f"mysql+pymysql://{target_config['user']}:{quote_plus(target_config['password'])}@{target_config['host']}:{target_config['port']}/{target_config['database']}"
+)
+
+
+def process_tm_task():
+    """Migrate ba_ma_input_check rows into tm_task as task_type 5.
+
+    Returns:
+        bool: True on success, False if any step raised an exception.
+    """
+    try:
+        # Source query
+        sql = """
+        SELECT
+            bic.ID as task_id,
+            5 as task_type,
+            IF(bic.IS_SURE = 1, 11, 10) as task_status,
+            bmr.APPLY_NUMBER as code,
+            tt.CREATOR,
+            tt.CREATE_TIME
+        FROM
+            ba_ma_input_check bic
+            LEFT JOIN tm_task tt on bic.ID = tt.ID
+            LEFT JOIN ba_ma_repair bmr on bic.REPAIR_ID = bmr.ID
+        """
+
+        # Run the query against the source database
+        df = pd.read_sql(sql, source_engine)
+
+        # Map the query columns onto the target table's columns
+        result = pd.DataFrame()
+        result['task_id'] = df['task_id']
+        result['task_type'] = df['task_type']
+        result['task_status'] = df['task_status']
+        result['code'] = df['code']
+        result['create_by'] = df['CREATOR']
+        result['create_time'] = df['CREATE_TIME']
+
+        # Data cleaning: drop rows without a task id
+        result = result.dropna(subset=['task_id'])
+
+        # Append to the target table
+        result.to_sql('tm_task', target_engine,
+                      if_exists='append', index=False)
+
+        print(f"成功转换并导入 {len(result)} 条记录到 tm_task")
+        print(f"任务类型分布:")
+        print(f" - task_type = 5: {len(result)} 条")
+        print(f"任务状态分布:")
+        status_counts = result['task_status'].value_counts()
+        for status, count in status_counts.items():
+            status_desc = "已确认" if status == 11 else "待确认"
+            print(f" - task_status = {status} ({status_desc}): {count} 条")
+
+        return True
+
+    except Exception as e:
+        print(f"处理 tm_task 时发生错误: {str(e)}")
+        return False
+
+
+def validate_data():
+    """Compare the source row count with the migrated tm_task rows.
+
+    Returns:
+        bool: True if both counts match, False on mismatch or error.
+    """
+    try:
+        # Row count produced by the source query
+        source_count_sql = """
+        SELECT COUNT(*) as total_count
+        FROM ba_ma_input_check bic
+        LEFT JOIN tm_task tt on bic.ID = tt.ID
+        LEFT JOIN ba_ma_repair bmr on bic.REPAIR_ID = bmr.ID
+        """
+        source_count = pd.read_sql(source_count_sql, source_engine)['total_count'].iloc[0]
+
+        # Row count of the migrated rows in the target table
+        target_count_sql = "SELECT COUNT(*) as total_count FROM tm_task WHERE task_type = 5"
+        target_count = pd.read_sql(target_count_sql, target_engine)['total_count'].iloc[0]
+
+        print(f"\n数据验证结果:")
+        print(f"源数据记录数: {source_count}")
+        print(f"目标数据记录数: {target_count}")
+
+        if source_count == target_count:
+            print("✅ 数据迁移完整性验证通过")
+            return True
+        else:
+            print("❌ 数据迁移完整性验证失败,记录数不匹配")
+            return False
+
+    except Exception as e:
+        print(f"数据验证时发生错误: {str(e)}")
+        return False
+
+
+def clear_existing_data():
+    """Optionally delete the existing task_type = 5 rows of tm_task.
+
+    Asks for confirmation on stdin; only an answer of 'y' or 'Y'
+    triggers the delete.
+
+    Returns:
+        bool: True if the delete ran or was skipped, False on error.
+    """
+    try:
+        confirmation = input("是否要清理目标表中 task_type=5 的现有数据?(y/N): ")
+        if confirmation.strip().lower() == 'y':
+            # begin() commits on success, and text() is needed because
+            # SQLAlchemy 2.x no longer executes plain SQL strings.
+            with target_engine.begin() as conn:
+                result = conn.execute(text("DELETE FROM tm_task WHERE task_type = 5"))
+                print(f"已清理 {result.rowcount} 条现有数据")
+            return True
+        else:
+            print("跳过数据清理")
+            return True
+    except Exception as e:
+        print(f"清理数据时发生错误: {str(e)}")
+        return False
+
+
+if __name__ == "__main__":
+    print("=== tm_task 审核表数据迁移 ===")
+    print("开始执行数据迁移...")
+
+    # Optional: clear existing data first
+    if clear_existing_data():
+        # Run the migration
+        if process_tm_task():
+            # Verify the migrated data
+            validate_data()
+            print("\n=== 迁移完成 ===")
+        else:
+            print("\n=== 迁移失败 ===")
+    else:
+        print("\n=== 迁移中止 ===")
\ No newline at end of file