diff --git a/安全/bm_unit.py b/安全/bm_unit.py index 80e5d77..68aad8b 100644 --- a/安全/bm_unit.py +++ b/安全/bm_unit.py @@ -70,6 +70,7 @@ def process_bm_project_dept(): result['unit_id'] = df['ID'] + 4000 result['unit_name'] = df['NAME'] result['type_id'] = 36 # 项目部固定值 + df['COMPANY_ID'] = df['COMPANY_ID'].astype(int) result['dept_id'] = df['COMPANY_ID'].map(dept_id_mapping) result['create_time'] = df['CREATE_TIME'] @@ -84,6 +85,7 @@ def process_bm_team_info(): result['unit_id'] = df['ID'] + 5000 result['unit_name'] = df['NAME'] result['type_id'] = 33 # 施工队固定值 + df['COMPANY_ID'] = df['COMPANY_ID'].astype(int) result['dept_id'] = df['COMPANY_ID'].map(dept_id_mapping) result['create_time'] = df['CREATE_TIME'] @@ -111,7 +113,7 @@ def process_bm_rear_service(): result['unit_id'] = df['id'] + 7000 result['unit_name'] = df['rear_name'] result['type_id'] = 1685 # 后勤科室固定值 - result['dept_id'] = df['company_id'] # 直接复制 + result['dept_id'] = df['company_id'].map(dept_id_mapping) # 直接复制 # 注意:此表没有create_time字段 return result diff --git a/安全/config.ini b/安全/config.ini index 23c7d63..267b5cf 100644 --- a/安全/config.ini +++ b/安全/config.ini @@ -9,7 +9,7 @@ port = 13306 host = localhost user = root password = Bonus@caiqi0802! -database = test_jiju +database = bns_cloud_material port = 13306 [settings] diff --git a/安全/ma_type.py b/安全/ma_type.py index 94c43db..74f363d 100644 --- a/安全/ma_type.py +++ b/安全/ma_type.py @@ -1,6 +1,7 @@ import configparser import pandas as pd from sqlalchemy import create_engine +from sqlalchemy import text from urllib.parse import quote_plus # 读取配置文件 config = configparser.ConfigParser() @@ -66,7 +67,7 @@ def process_machine_types(): # 按照规则转换数据 result = pd.DataFrame() result['type_id'] = df['ID'] + 6000 # ID加6000 - result['parent_id'] = df['PARENT_ID'] + result['parent_id'] = df['PARENT_ID'] + 6000 result['type_name'] = df['NAME'] result['level'] = df['LEVEL'] result['unit_name'] = df['UNIT_ID'].map(unit_mapping) # 单位映射 @@ -97,12 +98,16 @@ def process_machine_types(): # 更新现有库存和租赁单价(根据type_id) with target_engine.connect() as conn: # 更新storage_num - conn.execute(""" - UPDATE ma_type t, ma_machine_type s + # 更新 storage_num + conn.execute(text(f""" + UPDATE {target_config['database']}.ma_type t + JOIN {source_config['database']}.ma_machine_type s + ON t.type_id = s.ID + 6000 SET t.storage_num = s.STORAGE_NUM, - t.lease_price = s.LEASE_PRICE - WHERE t.type_id = s.ID + 6000 - """) + t.lease_price = s.RENTAL_PRICE + """)) + conn.commit() + print("已更新现有库存和租赁单价") return True diff --git a/安全/领料.py b/安全/领料.py index a3f794e..7decf8c 100644 --- a/安全/领料.py +++ b/安全/领料.py @@ -45,7 +45,6 @@ UNIT_TYPE_MAPPING = { def process_tm_task(): """处理任务表数据""" try: - # 执行转换SQL sql = """ SELECT (be.ID + 500000) as task_id, @@ -63,12 +62,9 @@ def process_tm_task(): """ df = pd.read_sql(sql, source_engine) - # 写入目标表 df.to_sql('tm_task', target_engine, if_exists='append', index=False) - print(f"成功导入 {len(df)} 条任务数据到tm_task表") return True - except Exception as e: print(f"处理任务数据时出错: {str(e)}") return False @@ -77,7 +73,6 @@ def process_tm_task(): def process_tm_task_agreement(): """处理任务协议关联表数据""" try: - # 第一步:查询基础数据 base_sql = """ SELECT (be.ID + 500000) as task_id, @@ -95,58 +90,55 @@ def process_tm_task_agreement(): """ base_df = pd.read_sql(base_sql, source_engine) - # 准备存储结果 results = [] skipped_records = 0 - # 对每条记录处理 + for _, row in base_df.iterrows(): try: - # 确定unit_id和type_code + # 转换字段为整数(遇到非数字自动变 NaN -> 转成 None) + dept_id = pd.to_numeric(row['DEPT_ID'], errors='coerce') + sub_id = pd.to_numeric(row['SUB_ID'], errors='coerce') + team_id = pd.to_numeric(row['TEAM_ID'], errors='coerce') + rear_id = pd.to_numeric(row['REAR_ID'], errors='coerce') + project_id = pd.to_numeric(row['PROJECT_ID'], errors='coerce') + unit_id = None type_code = None - if row['TYPE'] == 6: # 施工队 - if row['TEAM_ID'] > 0: - unit_id = int(row['TEAM_ID']) - type_code = 'sgd' - elif row['TYPE'] == 7: # 项目部 - if row['DEPT_ID'] > 0: - unit_id = int(row['DEPT_ID']) - type_code = 'xmb' - elif row['TYPE'] == 122: # 分包商 - if row['SUB_ID'] > 0: - unit_id = int(row['SUB_ID']) - type_code = 'fbs' - elif row['TYPE'] == 126: # 后勤 - if row['REAR_ID'] > 0: - unit_id = int(row['REAR_ID']) - type_code = 'hq' + if row['TYPE'] == 6 and pd.notna(team_id) and team_id > 0: + unit_id = int(team_id) + type_code = 'sgd' + elif row['TYPE'] == 7 and pd.notna(dept_id) and dept_id > 0: + unit_id = int(dept_id) + type_code = 'xmb' + elif row['TYPE'] == 122 and pd.notna(sub_id) and sub_id > 0: + unit_id = int(sub_id) + type_code = 'fbs' + elif row['TYPE'] == 126 and pd.notna(rear_id) and rear_id > 0: + unit_id = int(rear_id) + type_code = 'hq' - # 检查必要参数是否有效 - if not unit_id or not type_code or row['PROJECT_ID'] <= 0: + if not unit_id or not type_code or pd.isna(project_id) or project_id <= 0: skipped_records += 1 continue - # 使用参数化查询避免字符串拼接问题 - agreement_id = get_agreement_id(unit_id, int(row['PROJECT_ID']), type_code) + agreement_id = get_agreement_id(unit_id, int(project_id), type_code) if agreement_id: - if agreement_id: - results.append({ - 'task_id': int(row['task_id']), - 'agreement_id': int(agreement_id) - }) + results.append({ + 'task_id': int(row['task_id']), + 'agreement_id': int(agreement_id) + }) except Exception as e: print(f"查询agreement_id时出错: {str(e)}") continue - # 转换为DataFrame并写入 if results: result_df = pd.DataFrame(results) result_df.to_sql('tm_task_agreement', target_engine, if_exists='append', index=False) print(f"成功导入 {len(result_df)} 条任务协议关联数据到tm_task_agreement表") return True else: - print("没有找到匹配的任务协议关联数据") + print(f"没有找到匹配的任务协议关联数据,跳过 {skipped_records} 条记录") return False except Exception as e: @@ -175,7 +167,6 @@ def get_agreement_id(unit_id, project_id, type_code): def process_lease_apply_info(): """处理租赁申请信息表数据""" try: - # 执行转换SQL sql = """ SELECT (be.ID + 500000) as id, @@ -194,13 +185,9 @@ def process_lease_apply_info(): be.DEFINITION_ID = 1 """ df = pd.read_sql(sql, source_engine) - - # 写入目标表 df.to_sql('lease_apply_info', target_engine, if_exists='append', index=False) - print(f"成功导入 {len(df)} 条租赁申请信息数据") return True - except Exception as e: print(f"处理租赁申请信息时出错: {str(e)}") return False @@ -209,7 +196,6 @@ def process_lease_apply_info(): def process_lease_apply_details(): """处理租赁申请明细数据""" try: - # 执行转换SQL sql = """ SELECT (be.ID + 500000) as parent_id, @@ -227,13 +213,9 @@ def process_lease_apply_details(): bpf.ID """ df = pd.read_sql(sql, source_engine) - - # 写入目标表 df.to_sql('lease_apply_details', target_engine, if_exists='append', index=False) - print(f"成功导入 {len(df)} 条租赁申请明细数据") return True - except Exception as e: print(f"处理租赁申请明细时出错: {str(e)}") return False @@ -242,7 +224,6 @@ def process_lease_apply_details(): def process_lease_out_details(): """处理租赁出库明细数据""" try: - # 执行转换SQL sql = """ SELECT (bfd.EXAMPLE_ID + 500000) as parent_id, @@ -256,20 +237,15 @@ def process_lease_out_details(): LEFT JOIN pm_user pu on bfd.CREATOR = pu.ID """ df = pd.read_sql(sql, source_engine) - - # 写入目标表 df.to_sql('lease_out_details', target_engine, if_exists='append', index=False) - print(f"成功导入 {len(df)} 条租赁出库明细数据") return True - except Exception as e: print(f"处理租赁出库明细时出错: {str(e)}") return False if __name__ == "__main__": - # 执行所有转换流程 results = [ process_tm_task(), process_tm_task_agreement(), diff --git a/机具/bm_agreement_info.py b/机具/bm_agreement_info.py index 101af61..3fea15d 100644 --- a/机具/bm_agreement_info.py +++ b/机具/bm_agreement_info.py @@ -78,7 +78,8 @@ def transform_and_load_agreement(config_file_path): target_df['create_time'] = source_df['CREATE_TIME'] # 替换字段(IS_BALANCE → is_slt) - target_df['is_slt'] = source_df['IS_BALANCE'].map(is_slt_mapping) + # 替换字段(IS_BALANCE → is_slt) + target_df['is_slt'] = source_df['IS_BALANCE'].astype(int).map(is_slt_mapping) # 检查未映射的值 if target_df['is_slt'].isna().any(): diff --git a/机具/config.ini b/机具/config.ini index 73013e0..29dd872 100644 --- a/机具/config.ini +++ b/机具/config.ini @@ -1,16 +1,15 @@ [source_db] -host = 192.168.1.114 -user = boot -password = boot@123 +host = localhost +user = root +password = Bonus@caiqi0802! database = newimt port = 13306 [target_db] -host = 192.168.1.114 -user = boot -password = boot@123 -database = test_jiju +host = 10.138.55.133 +user = prod_jiju +password = bns@sbdahjj2025 +database = bns_jiju_material port = 13306 - [settings] rule_file = C:\Users\bonus-lvjilong\Desktop\机具系统数据迁移\ma_supplier_info.xlsx diff --git a/机具/新购机具.py b/机具/新购机具.py index f73a596..e917363 100644 --- a/机具/新购机具.py +++ b/机具/新购机具.py @@ -80,7 +80,7 @@ def process_purchase_check_info(): LEFT JOIN tm_task tt on bms.ID = tt.ID LEFT JOIN tm_task_status tts on tt.`STATUS` = tts.`CODE` LEFT JOIN tm_task_ma_type ttmt on bms.ID = ttmt.TASK_ID - WHERE tt.CREATE_TIME BETWEEN '2025-01-01' and NOW() + WHERE tt.CREATE_TIME BETWEEN '2025-01-01' and NOW() AND ttmt.MA_TYPE_ID IS NOT NULL GROUP BY bms.ID """ df = pd.read_sql(sql, source_engine) @@ -116,7 +116,7 @@ def process_purchase_check_details(): ba_ma_shop bms LEFT JOIN tm_task tt on bms.ID = tt.ID LEFT JOIN tm_task_ma_type ttmt on bms.ID = ttmt.TASK_ID - WHERE tt.CREATE_TIME BETWEEN '2025-01-01' and NOW() + WHERE tt.CREATE_TIME BETWEEN '2025-01-01' and NOW() AND ttmt.MA_TYPE_ID IS NOT NULL GROUP BY bms.ID,ttmt.MA_TYPE_ID """ df = pd.read_sql(sql, source_engine) diff --git a/机具/新购配件.py b/机具/新购配件.py index 5277128..c3679f0 100644 --- a/机具/新购配件.py +++ b/机具/新购配件.py @@ -84,7 +84,8 @@ def process_purchase_part_info(): GROUP BY bps.ID """ df = pd.read_sql(sql, source_engine) - + df['purchase_time'] = df['purchase_time'].replace({'': None}) + df['arrival_time'] = df['arrival_time'].replace({'': None}) # 写入目标表 df.to_sql('purchase_part_info', target_engine, if_exists='append', index=False) @@ -110,7 +111,7 @@ def process_purchase_part_details(): ttmt.MANUFACTURER_ID as supplier_id, IF(ttmt.MACHINES_NUM - IFNULL(ttmt.INPUT_NUM,0) = 0,1,0) as status, bps.BUYER as create_by, - tt.CREATE_TIME as as create_time + tt.CREATE_TIME as create_time FROM ba_pa_shop bps LEFT JOIN tm_task tt on bps.ID = tt.ID diff --git a/机具/维修.py b/机具/维修.py index 4d97539..ed01617 100644 --- a/机具/维修.py +++ b/机具/维修.py @@ -106,7 +106,7 @@ def process_repair_apply_details(): SELECT bmr.ID as task_id, ttmt.MA_TYPE_ID as type_id, - NULL as ma_id, + null as ma_id, ttmt.MACHINES_NUM as repair_num, ttmt.ACTUAL_NUM as repaired_num, ttmt2.MACHINES_NUM as scrap_num, @@ -121,7 +121,7 @@ def process_repair_apply_details(): LEFT JOIN tm_task_ma_type ttmt on bmr.ID = ttmt.TASK_ID LEFT JOIN ba_ma_scarp bms on bms.REPAIR_ID = bmr.ID LEFT JOIN tm_task_ma_type ttmt2 on bms.ID = ttmt2.TASK_ID AND ttmt.MA_TYPE_ID = ttmt2.MA_TYPE_ID - WHERE ttmt.IS_COUNT = 1 + WHERE ttmt.IS_COUNT = 1 and bmr.ID not in(519) GROUP BY ttmt.TASK_ID, ttmt.MA_TYPE_ID """ @@ -147,6 +147,7 @@ FROM LEFT JOIN ba_ma_repair bmr ON brp.repair_id = bmr.ID LEFT JOIN tm_task tt ON bmr.ID = tt.ID LEFT JOIN ma_machines mm ON brp.ma_id = mm.id + WHERE bmr.ID not in(519) """ # 来源3:编码申请维修含报废 @@ -169,22 +170,65 @@ FROM LEFT JOIN tm_task tt on bmr.ID = tt.ID LEFT JOIN tm_ma_scarp_reason tmsr on bms.ID = tmsr.TASK_ID LEFT JOIN ma_machines mm on tmsr.ma_id = mm.id - WHERE tmsr.IS_COUNT = 0 + WHERE tmsr.IS_COUNT = 0 and bmr.ID not in(519) """ # 合并三个来源的数据 - df1 = pd.read_sql(sql1, source_engine) + #df1 = pd.read_sql(sql1, source_engine) df2 = pd.read_sql(sql2, source_engine) df3 = pd.read_sql(sql3, source_engine) - df = pd.concat([df1, df2, df3], ignore_index=True) + df = pd.concat([ df2, df3], ignore_index=True) + + # 过滤掉type_id为NULL的记录 + initial_count = len(df) + df = df[df['type_id'].notna()] + filtered_count = len(df) + + if initial_count > filtered_count: + print(f"过滤掉了 {initial_count - filtered_count} 条type_id为NULL的记录") # 转换数值字段 num_cols = ['repair_num', 'repaired_num', 'scrap_num', 'status', 'is_ds'] for col in num_cols: df[col] = safe_convert_to_int(df[col]) - df.to_sql('repair_apply_details', target_engine, if_exists='append', index=False) - print(f"成功导入 {len(df)} 条维修申请明细(合并3个来源)") + # 检查目标表中已存在的记录 + check_existing_sql = """ + SELECT task_id, ma_id, type_id + FROM repair_apply_details + """ + existing_records = pd.read_sql(check_existing_sql, target_engine) + + # 创建唯一标识符进行比较 + if not existing_records.empty: + # 为现有记录创建复合键 + existing_records['composite_key'] = existing_records['task_id'].astype(str) + '_' + \ + existing_records['ma_id'].astype(str) + '_' + \ + existing_records['type_id'].astype(str) + + # 为新记录创建复合键 + df['composite_key'] = df['task_id'].astype(str) + '_' + \ + df['ma_id'].astype(str) + '_' + \ + df['type_id'].astype(str) + + # 过滤掉已存在的记录 + before_dedup_count = len(df) + df = df[~df['composite_key'].isin(existing_records['composite_key'])] + after_dedup_count = len(df) + + # 移除临时列 + df = df.drop(columns=['composite_key']) + + if before_dedup_count > after_dedup_count: + print(f"去除了 {before_dedup_count - after_dedup_count} 条已存在的记录") + + # 插入新记录 + if not df.empty: + df.to_sql('repair_apply_details', target_engine, if_exists='append', index=False) + print(f"成功导入 {len(df)} 条维修申请明细(合并3个来源)") + else: + print("没有新的维修申请明细需要导入") + return True except Exception as e: print(f"处理维修申请明细时出错: {str(e)}") @@ -212,7 +256,7 @@ def process_repair_apply_record(): LEFT JOIN tm_task_ma_type ttmt on bmr.ID = ttmt.TASK_ID LEFT JOIN ba_ma_scarp bms on bms.REPAIR_ID = bmr.ID LEFT JOIN tm_task_ma_type ttmt2 on bms.ID = ttmt2.TASK_ID AND ttmt.MA_TYPE_ID = ttmt2.MA_TYPE_ID - WHERE ttmt.IS_COUNT = 1 + WHERE ttmt.IS_COUNT = 1 and bmr.ID not in(519) GROUP BY ttmt.TASK_ID, ttmt.MA_TYPE_ID HAVING repair_num > 0 """ @@ -236,7 +280,7 @@ def process_repair_apply_record(): LEFT JOIN ba_ma_scarp bms on bms.REPAIR_ID = bmr.ID LEFT JOIN tm_ma_scarp_reason tmsr on bms.ID = tmsr.TASK_ID LEFT JOIN tm_task_ma_type ttmt2 on bms.ID = ttmt2.TASK_ID AND ttmt.MA_TYPE_ID = ttmt2.MA_TYPE_ID - WHERE ttmt.IS_COUNT = 1 + WHERE ttmt.IS_COUNT = 1 and bmr.ID not in(519) GROUP BY ttmt.TASK_ID, ttmt.MA_TYPE_ID HAVING scrap_num > 0 """ @@ -258,6 +302,7 @@ def process_repair_apply_record(): ba_ma_repair_pass brp LEFT JOIN ba_ma_repair bmr on brp.repair_id = bmr.ID LEFT JOIN ma_machines mm on brp.ma_id = mm.id + WHERE bmr.ID not in(519) """ # 来源4:编码维修报废 @@ -278,7 +323,7 @@ def process_repair_apply_record(): LEFT JOIN ba_ma_repair bmr on bms.repair_id = bmr.ID LEFT JOIN tm_ma_scarp_reason tmsr on bms.ID = tmsr.TASK_ID LEFT JOIN ma_machines mm on tmsr.ma_id = mm.id - WHERE tmsr.IS_COUNT = 0 + WHERE tmsr.IS_COUNT = 0 and bmr.ID not in(519) """ # 合并四个来源的数据 @@ -336,11 +381,11 @@ def process_repair_cost(): if __name__ == "__main__": # 执行所有转换流程 processes = [ - process_tm_task, - process_tm_task_agreement, + #process_tm_task, + #process_tm_task_agreement, process_repair_apply_details, - process_repair_apply_record, - process_repair_cost + #process_repair_apply_record, + #process_repair_cost ] success = all([p() for p in processes])