From 7d85515b2e28132245571e5490e1727817f76b9c Mon Sep 17 00:00:00 2001 From: jiang Date: Tue, 5 Aug 2025 13:49:50 +0800 Subject: [PATCH] =?UTF-8?q?=E9=A6=96=E6=AC=A1=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 机具/bm_agreement_info.py | 129 +++++++++++++++++++++++++++++ 机具/bm_project.py | 150 +++++++++++++++++++++++++++++++++ 机具/bm_qrcode_info.py | 121 +++++++++++++++++++++++++++ 机具/bm_unit.py | 142 +++++++++++++++++++++++++++++++ 机具/config.ini | 16 ++++ 机具/ma_machine.py | 170 ++++++++++++++++++++++++++++++++++++++ 机具/ma_part_type.py | 133 +++++++++++++++++++++++++++++ 机具/ma_supplier_info.py | 126 ++++++++++++++++++++++++++++ 机具/ma_type.py | 143 ++++++++++++++++++++++++++++++++ 9 files changed, 1130 insertions(+) create mode 100644 机具/bm_agreement_info.py create mode 100644 机具/bm_project.py create mode 100644 机具/bm_qrcode_info.py create mode 100644 机具/bm_unit.py create mode 100644 机具/config.ini create mode 100644 机具/ma_machine.py create mode 100644 机具/ma_part_type.py create mode 100644 机具/ma_supplier_info.py create mode 100644 机具/ma_type.py diff --git a/机具/bm_agreement_info.py b/机具/bm_agreement_info.py new file mode 100644 index 0000000..101af61 --- /dev/null +++ b/机具/bm_agreement_info.py @@ -0,0 +1,129 @@ +import pandas as pd +import sqlalchemy +from sqlalchemy import create_engine +import configparser +import os +from urllib.parse import quote_plus + +def get_db_connection_string(config, section): + """从配置文件中构建数据库连接字符串""" + return f"mysql+pymysql://{config[section]['user']}:{quote_plus(config[section]['password'])}@" \ + f"{config[section]['host']}:{config[section]['port']}/{config[section]['database']}" + + +def transform_and_load_agreement(config_file_path): + """ + 从源数据库提取ba_ma_agreement数据,转换后加载到目标数据库bm_agreement_info + :param config_file_path: 配置文件路径 + """ + # 读取配置文件 + if not os.path.exists(config_file_path): + raise FileNotFoundError(f"配置文件不存在: {config_file_path}") + + config = configparser.ConfigParser() + config.read(config_file_path) + + # 定义替换规则 + is_slt_mapping = { + 0: 1, # 已结算 → 1 + 1: 0 # 未结算 → 0 + } + + try: + # 获取数据库连接 + source_conn_str = get_db_connection_string(config, 'source_db') + target_conn_str = get_db_connection_string(config, 'target_db') + source_engine = create_engine(source_conn_str) + target_engine = create_engine(target_conn_str) + + # 从源表读取数据(过滤COMPANY_ID=1的记录) + print("正在从源表ba_ma_agreement读取数据...") + source_query = """ + SELECT ID, \ + CODE, \ + SIGN_DATE, \ + LEASE_COMPANY, \ + PROJECT, \ + START_TIME, + AUTHORIZING_PERSON, \ + AUTHORIZING_PHONE, \ + CONTRACT_NUMBER, + IS_BALANCE, \ + is_push, CREATE_TIME + FROM ba_ma_agreement + WHERE COMPANY_ID = 1 \ + """ + source_df = pd.read_sql(source_query, source_engine) + + if source_df.empty: + print("没有符合条件的数据需要转换") + return + print(f"读取到{len(source_df)}条待转换数据") + + # 数据转换 + print("正在进行数据转换...") + target_df = pd.DataFrame() + + # 直接复制字段 + target_df['agreement_id'] = source_df['ID'] + target_df['agreement_code'] = source_df['CODE'] + target_df['sign_time'] = source_df['SIGN_DATE'] + target_df['unit_id'] = source_df['LEASE_COMPANY'] + target_df['project_id'] = source_df['PROJECT'] + target_df['plan_start_time'] = source_df['START_TIME'] + target_df['auth_person'] = source_df['AUTHORIZING_PERSON'] + target_df['phone'] = source_df['AUTHORIZING_PHONE'] + target_df['contract_code'] = source_df['CONTRACT_NUMBER'] + target_df['is_push'] = source_df['is_push'] + target_df['create_time'] = source_df['CREATE_TIME'] + + # 替换字段(IS_BALANCE → is_slt) + target_df['is_slt'] = source_df['IS_BALANCE'].map(is_slt_mapping) + + # 检查未映射的值 + if target_df['is_slt'].isna().any(): + unmapped = source_df[target_df['is_slt'].isna()]['IS_BALANCE'].unique() + print(f"警告: 发现未映射的IS_BALANCE值: {unmapped}") + + # 写入目标表 + print("正在写入目标表bm_agreement_info...") + target_df.to_sql( + 'bm_agreement_info', + target_engine, + if_exists='append', + index=False, + dtype={ + 'agreement_id': sqlalchemy.types.INTEGER(), + 'agreement_code': sqlalchemy.types.VARCHAR(length=50), + 'sign_time': sqlalchemy.types.DateTime(), + 'unit_id': sqlalchemy.types.INTEGER(), + 'project_id': sqlalchemy.types.INTEGER(), + 'plan_start_time': sqlalchemy.types.DateTime(), + 'auth_person': sqlalchemy.types.VARCHAR(length=50), + 'phone': sqlalchemy.types.VARCHAR(length=20), + 'contract_code': sqlalchemy.types.VARCHAR(length=50), + 'is_slt': sqlalchemy.types.SmallInteger(), + 'is_push': sqlalchemy.types.SmallInteger(), + 'create_time': sqlalchemy.types.DateTime() + } + ) + print(f"成功写入{len(target_df)}条数据") + + except Exception as e: + print(f"处理过程中发生错误: {str(e)}") + raise + finally: + if 'source_engine' in locals(): + source_engine.dispose() + if 'target_engine' in locals(): + target_engine.dispose() + + +if __name__ == "__main__": + # 配置文件路径(根据实际情况修改) + config_file = "config.ini" + + try: + transform_and_load_agreement(config_file) + except Exception as e: + print(f"程序执行失败: {str(e)}") \ No newline at end of file diff --git a/机具/bm_project.py b/机具/bm_project.py new file mode 100644 index 0000000..6200713 --- /dev/null +++ b/机具/bm_project.py @@ -0,0 +1,150 @@ +import pandas as pd +import sqlalchemy +from sqlalchemy import create_engine +import configparser +import os +from urllib.parse import quote_plus + +def get_db_connection_string(config, section): + """ + 从配置文件中构建数据库连接字符串 + :param config: 配置解析器对象 + :param section: 配置节名(source_db或target_db) + :return: 数据库连接字符串 + """ + db_type = 'mysql' # 假设使用MySQL数据库 + driver = 'pymysql' # MySQL驱动 + + return f"{db_type}+{driver}://{config[section]['user']}:{quote_plus(config[section]['password'])}@" \ + f"{config[section]['host']}:{config[section]['port']}/{config[section]['database']}" + + +def transform_and_load_bm_project(config_file_path): + """ + 从源数据库提取bm_project数据,转换后加载到目标数据库 + :param config_file_path: 配置文件路径 + """ + # 读取配置文件 + if not os.path.exists(config_file_path): + raise FileNotFoundError(f"配置文件不存在: {config_file_path}") + + config = configparser.ConfigParser() + config.read(config_file_path) + + # 定义替换映射 + imp_unit_mapping = { + 1: 327, # 送电一分公司 → 327 + 2: 102, # 送电二分公司 → 102 + 3: 309, # 宏源变电工程处 → 309 + 5: 338, # 土建分公司 → 338 + 8: 309, # 宏源送电工程处 → 309 + 9: 100, # 变电分公司 → 100 + 10: 101, # 机具(物流)分公司 → 101 + 11: 345, # 外部往来单位 → 345 + 12: 344, # 机械化分公司 → 344 + 13: 346, # 运检分公司 → 346 + 15: 340, # 安徽顺全电力工程有限公司 → 340 + 16: 337, # 检修试验分公司 → 337 + 17: 339, # 安徽顺安电网建设有限公司 → 339 + 18: 342, # 公司机关 → 342 + 21: 341 # 班组管理中心 → 341 + } + + pro_type_mapping = { + 1: 0, # 线路工程 → 0 + 2: 1, # 变电工程 → 1 + 3: 2, # 业务工程 → 2 + 4: 3 # 其他工程 → 3 + } + + try: + # 获取数据库连接字符串 + source_conn_str = get_db_connection_string(config, 'source_db') + target_conn_str = get_db_connection_string(config, 'target_db') + + # 创建数据库引擎 + source_engine = create_engine(source_conn_str) + target_engine = create_engine(target_conn_str) + + # 从源数据库读取数据 + print("正在从源数据库读取bm_project表数据...") + source_query = """ + SELECT ID, NAME, NUM, PRO_ID, HTZT, time, COMPANY_ID, TYPE_ID, COMPANY, IS_ACTIVE + FROM bm_project + WHERE COMPANY = 1 AND IS_ACTIVE = 1 \ + """ + source_df = pd.read_sql(source_query, source_engine) + + if source_df.empty: + print("没有符合条件的数据需要转换") + return + + print(f"读取到{len(source_df)}条待转换数据") + + # 数据转换 + print("正在进行数据转换...") + target_df = pd.DataFrame() + target_df['pro_id'] = source_df['ID'] # 复制ID → pro_id + target_df['pro_name'] = source_df['NAME'] # 复制NAME → pro_name + target_df['pro_code'] = source_df['NUM'] # 复制NUM → pro_code + target_df['external_id'] = source_df['PRO_ID'] # 复制PRO_ID → external_id + target_df['contract_part'] = source_df['HTZT'] # 复制HTZT → contract_part + target_df['create_time'] = source_df['time'] # 复制time → create_time + + # 替换COMPANY_ID → imp_unit + target_df['imp_unit'] = source_df['COMPANY_ID'].map(imp_unit_mapping) + + # 替换TYPE_ID → pro_type_id + target_df['pro_type_id'] = source_df['TYPE_ID'].map(pro_type_mapping) + + # 检查是否有未映射的值 + if target_df['imp_unit'].isna().any(): + unmapped_units = source_df[target_df['imp_unit'].isna()]['COMPANY_ID'].unique() + print(f"警告: 发现未映射的COMPANY_ID值: {unmapped_units}") + + if target_df['pro_type_id'].isna().any(): + unmapped_types = source_df[target_df['pro_type_id'].isna()]['TYPE_ID'].unique() + print(f"警告: 发现未映射的TYPE_ID值: {unmapped_types}") + + # 写入目标数据库 + print("正在将数据写入目标数据库...") + target_df.to_sql( + 'bm_project', + target_engine, + if_exists='append', + index=False, + dtype={ + 'pro_id': sqlalchemy.types.INTEGER(), + 'pro_name': sqlalchemy.types.VARCHAR(length=255), + 'pro_code': sqlalchemy.types.VARCHAR(length=50), + 'external_id': sqlalchemy.types.VARCHAR(length=50), + 'contract_part': sqlalchemy.types.VARCHAR(length=50), + 'create_time': sqlalchemy.types.DateTime(), + 'imp_unit': sqlalchemy.types.INTEGER(), + 'pro_type_id': sqlalchemy.types.INTEGER() + } + ) + + print(f"成功写入{len(target_df)}条数据到目标数据库") + + except Exception as e: + print(f"处理过程中发生错误: {str(e)}") + raise + finally: + # 关闭数据库连接 + if 'source_engine' in locals(): + source_engine.dispose() + if 'target_engine' in locals(): + target_engine.dispose() + + +# 使用示例 +if __name__ == "__main__": + # 配置文件路径 + config_file = "config.ini" # 假设配置文件在当前目录下 + + try: + # 执行转换 + transform_and_load_bm_project(config_file) + except Exception as e: + print(f"程序执行失败: {str(e)}") \ No newline at end of file diff --git a/机具/bm_qrcode_info.py b/机具/bm_qrcode_info.py new file mode 100644 index 0000000..3205014 --- /dev/null +++ b/机具/bm_qrcode_info.py @@ -0,0 +1,121 @@ +import pandas as pd +import sqlalchemy +from sqlalchemy import create_engine, text +import configparser +import os +from datetime import datetime +from urllib.parse import quote_plus + +def get_db_connection_string(config, section): + """从配置文件中构建数据库连接字符串""" + return f"mysql+pymysql://{config[section]['user']}:{quote_plus(config[section]['password'])}@" \ + f"{config[section]['host']}:{config[section]['port']}/{config[section]['database']}" + + +def transform_and_load_qrcode(config_file_path): + """ + 从源数据库提取qr_code数据,转换后加载到目标数据库bm_qrcode_info + :param config_file_path: 配置文件路径 + """ + # 读取配置文件 + if not os.path.exists(config_file_path): + raise FileNotFoundError(f"配置文件不存在: {config_file_path}") + + config = configparser.ConfigParser() + config.read(config_file_path) + + try: + # 获取数据库连接 + source_conn_str = get_db_connection_string(config, 'source_db') + target_conn_str = get_db_connection_string(config, 'target_db') + source_engine = create_engine(source_conn_str) + target_engine = create_engine(target_conn_str) + + # 从源表读取数据(过滤COMPANY_ID=1的记录) + print("正在从源表qr_code读取数据...") + source_query = """ + SELECT code, ma_model, vender, is_bind, gen_month, nullif(task_id, '') as task_id + FROM bm_qrcode + WHERE COMPANY_ID = 1 \ + """ + source_df = pd.read_sql(source_query, source_engine) + + if source_df.empty: + print("没有符合条件的数据需要转换") + return + print(f"读取到{len(source_df)}条待转换数据") + + # 数据转换 + print("正在进行数据转换...") + target_df = pd.DataFrame() + + # 字段映射(源字段 → 目标字段) + field_mapping = { + 'code': 'qr_code', + 'ma_model': 'type_id', + 'vender': 'supplier_id', + 'is_bind': 'is_bind', + 'gen_month': 'create_time', + 'task_id': 'task_id' + } + + # 复制字段 + for source_field, target_field in field_mapping.items(): + target_df[target_field] = source_df[source_field] + + # 检查数据质量 + print("\n数据质量检查:") + print(f"- 空qr_code记录: {target_df['qr_code'].isna().sum()}") + print(f"- 空type_id记录: {target_df['type_id'].isna().sum()}") + + # 写入目标表(使用connection避免重复记录问题) + print("\n正在写入目标表bm_qrcode_info...") + with target_engine.begin() as conn: + # 先检查并删除可能重复的qr_code(根据业务需求决定是否保留) + existing_codes = pd.read_sql( + "SELECT qr_code FROM bm_qrcode_info", + conn + )['qr_code'].tolist() + + new_records = target_df[~target_df['qr_code'].isin(existing_codes)] + dup_count = len(target_df) - len(new_records) + if dup_count > 0: + print(f"发现{dup_count}条重复qr_code记录,将自动跳过") + + if not new_records.empty: + new_records.to_sql( + 'bm_qrcode_info', + conn, + if_exists='append', + index=False, + dtype={ + 'qr_code': sqlalchemy.types.VARCHAR(length=100), + 'type_id': sqlalchemy.types.INTEGER(), + 'supplier_id': sqlalchemy.types.INTEGER(), + 'is_bind': sqlalchemy.types.SmallInteger(), + 'create_time': sqlalchemy.types.DateTime(), + 'task_id': sqlalchemy.types.VARCHAR(length=50) + } + ) + print(f"成功写入{len(new_records)}条新数据") + else: + print("没有新数据需要写入") + + except Exception as e: + print(f"\n处理过程中发生错误: {str(e)}") + raise + finally: + if 'source_engine' in locals(): + source_engine.dispose() + if 'target_engine' in locals(): + target_engine.dispose() + + +if __name__ == "__main__": + # 配置文件路径 + config_file = "config.ini" + + try: + transform_and_load_qrcode(config_file) + except Exception as e: + print(f"程序执行失败: {str(e)}") \ No newline at end of file diff --git a/机具/bm_unit.py b/机具/bm_unit.py new file mode 100644 index 0000000..9a9ce26 --- /dev/null +++ b/机具/bm_unit.py @@ -0,0 +1,142 @@ +import pandas as pd +import sqlalchemy +from sqlalchemy import create_engine +import configparser +import os +from urllib.parse import quote_plus + +def get_db_connection_string(config, section): + """ + 从配置文件中构建数据库连接字符串 + :param config: 配置解析器对象 + :param section: 配置节名(source_db或target_db) + :return: 数据库连接字符串 + """ + db_type = 'mysql' # 假设使用MySQL数据库,可根据实际情况修改 + driver = 'pymysql' # MySQL驱动 + + return f"{db_type}+{driver}://{config[section]['user']}:{quote_plus(config[section]['password'])}@" \ + f"{config[section]['host']}:{config[section]['port']}/{config[section]['database']}" + + +def transform_and_load_bm_unit(config_file_path): + """ + 从源数据库提取bm_unit数据,转换后加载到目标数据库 + :param config_file_path: 配置文件路径 + """ + # 读取配置文件 + if not os.path.exists(config_file_path): + raise FileNotFoundError(f"配置文件不存在: {config_file_path}") + + config = configparser.ConfigParser() + config.read(config_file_path) + + # 定义替换映射 + type_id_mapping = { + 1: 36, # 项目部 → 36 + 2: 33, # 施工队 → 33 + 3: 32, # 分包商 → 32 + 4: 1685, # 后勤科室 → 1685 + 5: 1704, # 外单位 → 1704 + 6: 1706 # 修试部门 → 1706 + } + + dept_id_mapping = { + 1: 327, # 送电一分公司 → 327 + 2: 102, # 送电二分公司 → 102 + 3: 309, # 宏源变电工程处 → 309 + 5: 338, # 土建分公司 → 338 + 8: 309, # 宏源送电工程处 → 309 + 9: 100, # 变电分公司 → 100 + 10: 101, # 机具(物流)分公司 → 101 + 11: 345, # 外部往来单位 → 345 + 12: 344, # 机械化分公司 → 344 + 13: 346, # 运检分公司 → 346 + 15: 340, # 安徽顺全电力工程有限公司 → 340 + 16: 337, # 检修试验分公司 → 337 + 17: 339, # 安徽顺安电网建设有限公司 → 339 + 18: 342, # 公司机关 → 342 + 21: 341 # 班组管理中心 → 341 + } + + try: + # 获取数据库连接字符串 + source_conn_str = get_db_connection_string(config, 'source_db') + target_conn_str = get_db_connection_string(config, 'target_db') + + # 创建数据库引擎 + source_engine = create_engine(source_conn_str) + target_engine = create_engine(target_conn_str) + + # 从源数据库读取数据 + print("正在从源数据库读取bm_unit表数据...") + source_query = """ + SELECT ID, NAME, TYPE_ID, COMPANY_ID, time, COMPANY, IS_ACTIVE + FROM bm_unit + WHERE COMPANY = 1 AND IS_ACTIVE = 1 \ + """ + source_df = pd.read_sql(source_query, source_engine) + + if source_df.empty: + print("没有符合条件的数据需要转换") + return + + print(f"读取到{len(source_df)}条待转换数据") + + # 数据转换 + print("正在进行数据转换...") + target_df = pd.DataFrame() + target_df['unit_id'] = source_df['ID'] + target_df['unit_name'] = source_df['NAME'] + target_df['type_id'] = source_df['TYPE_ID'].map(type_id_mapping) + target_df['dept_id'] = source_df['COMPANY_ID'].map(dept_id_mapping) + target_df['create_time'] = source_df['time'] + + # 检查是否有未映射的值 + if target_df['type_id'].isna().any(): + unmapped_types = source_df[target_df['type_id'].isna()]['TYPE_ID'].unique() + print(f"警告: 发现未映射的TYPE_ID值: {unmapped_types}") + + if target_df['dept_id'].isna().any(): + unmapped_depts = source_df[target_df['dept_id'].isna()]['COMPANY_ID'].unique() + print(f"警告: 发现未映射的COMPANY_ID值: {unmapped_depts}") + + # 写入目标数据库 + print("正在将数据写入目标数据库...") + target_df.to_sql( + 'bm_unit', + target_engine, + if_exists='append', + index=False, + dtype={ + 'unit_id': sqlalchemy.types.INTEGER(), + 'unit_name': sqlalchemy.types.VARCHAR(length=255), + 'type_id': sqlalchemy.types.INTEGER(), + 'dept_id': sqlalchemy.types.INTEGER(), + 'create_time': sqlalchemy.types.DateTime() + } + ) + + print(f"成功写入{len(target_df)}条数据到目标数据库") + + except Exception as e: + print(f"处理过程中发生错误: {str(e)}") + raise + finally: + # 关闭数据库连接 + if 'source_engine' in locals(): + source_engine.dispose() + if 'target_engine' in locals(): + target_engine.dispose() + + +# 使用示例 +if __name__ == "__main__": + # 配置文件路径 + config_file = "config.ini" # 假设配置文件在当前目录下 + + try: + # 执行转换 + transform_and_load_bm_unit(config_file) + except Exception as e: + print(f"程序执行失败: {str(e)}") \ No newline at end of file diff --git a/机具/config.ini b/机具/config.ini new file mode 100644 index 0000000..73013e0 --- /dev/null +++ b/机具/config.ini @@ -0,0 +1,16 @@ +[source_db] +host = 192.168.1.114 +user = boot +password = boot@123 +database = newimt +port = 13306 + +[target_db] +host = 192.168.1.114 +user = boot +password = boot@123 +database = test_jiju +port = 13306 + +[settings] +rule_file = C:\Users\bonus-lvjilong\Desktop\机具系统数据迁移\ma_supplier_info.xlsx diff --git a/机具/ma_machine.py b/机具/ma_machine.py new file mode 100644 index 0000000..c13b1d5 --- /dev/null +++ b/机具/ma_machine.py @@ -0,0 +1,170 @@ +import pandas as pd +import sqlalchemy +from sqlalchemy import create_engine,text +import configparser +import os +from datetime import datetime +from urllib.parse import quote_plus + + +def get_db_connection_string(config, section): + """从配置文件中构建数据库连接字符串""" + return f"mysql+pymysql://{config[section]['user']}:{quote_plus(config[section]['password'])}@" \ + f"{config[section]['host']}:{config[section]['port']}/{config[section]['database']}" + + +def transform_and_load_ma_machines(config_file_path): + """ + 从源数据库提取ma_machines数据,转换后加载到目标数据库ma_machine + :param config_file_path: 配置文件路径 + """ + # 读取配置文件 + if not os.path.exists(config_file_path): + raise FileNotFoundError(f"配置文件不存在: {config_file_path}") + + config = configparser.ConfigParser() + config.read(config_file_path) + + # 定义状态替换映射 + status_mapping = { + 1: 0, # 待通知 → 0 + 2: 0, # 待检验 → 0 + 3: 0, # 待打印 → 0 + 4: 0, # 待入库 → 0 + 5: 1, # 在库 → 1 + 6: 2, # 在用 → 2 + 7: 3, # 在修 → 3 + 8: 3, # 在试 → 3 + 9: 5, # 修试后待入库 → 5 + 10: 7, # 待报废 → 7 + 11: 8, # 已报废 → 8 + 12: 7, # 报废封存 → 7 + 13: 4, # 在检 → 4 + 14: 4, # 在审 → 4 + 15: None, # 移交 → NULL(根据业务需求处理) + 16: 17, # 报废检验 → 17 + 17: 17, # 封存检验 → 17 + 18: 18, # 报备丢失 → 18 + 19: 18 # 结算丢失 → 18 + } + + try: + # 获取数据库连接 + source_conn_str = get_db_connection_string(config, 'source_db') + target_conn_str = get_db_connection_string(config, 'target_db') + source_engine = create_engine(source_conn_str) + target_engine = create_engine(target_conn_str) + + # 从源表读取数据(过滤COMPANY_ID=1的记录) + print("正在从源表ma_machines读取数据...") + source_query = """ + SELECT ID, \ + TYPE, \ + BATCH_STATUS, \ + DEVICE_CODE, \ + OUT_FAC_TIME, \ + model_name, + OUT_FAC_NUM, \ + VENDER_ID, \ + QRCODE, \ + ASSETS_NUM, \ + THIS_CHECK_TIME, + THIS_CHECK_MAN, \ + NEXT_CHECK_TIME, \ + CYCLE_NUM + FROM ma_machines + WHERE COMPANY_ID = 1 \ + """ + source_df = pd.read_sql(source_query, source_engine) + + if source_df.empty: + print("没有符合条件的数据需要转换") + return + print(f"读取到{len(source_df)}条待转换数据") + + # 数据转换 + print("正在进行数据转换...") + target_df = pd.DataFrame() + + # 直接复制字段 + target_df['ma_id'] = source_df['ID'] + target_df['type_id'] = source_df['TYPE'] + target_df['ma_code'] = source_df['DEVICE_CODE'] + target_df['out_fac_time'] = pd.to_datetime(source_df['OUT_FAC_TIME'], errors='coerce') + target_df['machine_name'] = source_df['model_name'] + target_df['out_fac_code'] = source_df['OUT_FAC_NUM'] + target_df['ma_vender'] = source_df['VENDER_ID'] + target_df['qr_code'] = source_df['QRCODE'] + target_df['assets_code'] = source_df['ASSETS_NUM'] + target_df['this_check_time'] = pd.to_datetime(source_df['THIS_CHECK_TIME'], errors='coerce') + target_df['check_man'] = source_df['THIS_CHECK_MAN'] + target_df['next_check_time'] = pd.to_datetime(source_df['NEXT_CHECK_TIME'], errors='coerce') + target_df['in_out_num'] = source_df['CYCLE_NUM'] + + # 替换状态字段 + target_df['ma_status'] = source_df['BATCH_STATUS'].map(status_mapping) + + # 检查数据质量 + print("\n数据质量检查:") + print(f"- 无效出厂时间记录: {target_df['out_fac_time'].isna().sum()}") + print(f"- 无效本次检验时间记录: {target_df['this_check_time'].isna().sum()}") + print(f"- 无效下次检验时间记录: {target_df['next_check_time'].isna().sum()}") + print(f"- 未映射的状态值: {source_df[target_df['ma_status'].isna()]['BATCH_STATUS'].unique()}") + + # 写入目标表 + print("\n正在写入目标表ma_machine...") + # 步骤1:清空目标表 + with target_engine.connect() as conn: + conn.execute(text("TRUNCATE TABLE ma_machine")) # 注意需要从sqlalchemy导入text + conn.commit() # 显式提交事务 + + # 步骤2:写入去重后的数据 + target_df.drop_duplicates(['type_id', 'ma_code']).to_sql( + 'ma_machine', + target_engine, + if_exists='append', + index=False + ) + + target_df.to_sql( + 'ma_machine', + target_engine, + if_exists='append', + index=False, + dtype={ + 'ma_id': sqlalchemy.types.INTEGER(), + 'type_id': sqlalchemy.types.INTEGER(), + 'ma_status': sqlalchemy.types.SmallInteger(), + 'ma_code': sqlalchemy.types.VARCHAR(length=50), + 'out_fac_time': sqlalchemy.types.DateTime(), + 'machine_name': sqlalchemy.types.VARCHAR(length=100), + 'out_fac_code': sqlalchemy.types.VARCHAR(length=50), + 'ma_vender': sqlalchemy.types.INTEGER(), + 'qr_code': sqlalchemy.types.VARCHAR(length=100), + 'assets_code': sqlalchemy.types.VARCHAR(length=50), + 'this_check_time': sqlalchemy.types.DateTime(), + 'check_man': sqlalchemy.types.VARCHAR(length=50), + 'next_check_time': sqlalchemy.types.DateTime(), + 'in_out_num': sqlalchemy.types.INTEGER() + } + ) + print(f"成功写入{len(target_df)}条数据") + + except Exception as e: + print(f"\n处理过程中发生错误: {str(e)}") + raise + finally: + if 'source_engine' in locals(): + source_engine.dispose() + if 'target_engine' in locals(): + target_engine.dispose() + + +if __name__ == "__main__": + # 配置文件路径 + config_file = "config.ini" + + try: + transform_and_load_ma_machines(config_file) + except Exception as e: + print(f"程序执行失败: {str(e)}") \ No newline at end of file diff --git a/机具/ma_part_type.py b/机具/ma_part_type.py new file mode 100644 index 0000000..63b47a0 --- /dev/null +++ b/机具/ma_part_type.py @@ -0,0 +1,133 @@ +import pandas as pd +import sqlalchemy +from sqlalchemy import create_engine, text +import configparser +import os +from urllib.parse import quote_plus + +def get_db_connection_string(config, section): + """从配置文件中构建数据库连接字符串""" + return f"mysql+pymysql://{config[section]['user']}:{quote_plus(config[section]['password'])}@" \ + f"{config[section]['host']}:{config[section]['port']}/{config[section]['database']}" + + +def transform_and_load_part_type(config_file_path): + """ + 从源数据库提取pa_type数据,转换后加载到目标数据库ma_part_type + :param config_file_path: 配置文件路径 + """ + # 读取配置文件 + if not os.path.exists(config_file_path): + raise FileNotFoundError(f"配置文件不存在: {config_file_path}") + + config = configparser.ConfigParser() + config.read(config_file_path) + + try: + # 获取数据库连接 + source_conn_str = get_db_connection_string(config, 'source_db') + target_conn_str = get_db_connection_string(config, 'target_db') + source_engine = create_engine(source_conn_str) + target_engine = create_engine(target_conn_str) + + # 从源表读取数据(过滤company_id=1且IS_ACTIVE=1的记录) + print("正在从源表pa_type读取数据...") + source_query = """ + SELECT id, \ + PARENT_ID, \ + NAME, \ + NUM, \ + PRICE, \ + UNIT, \ + LEVEL, \ + IS_CONSUMABLES + FROM pa_type + WHERE company_id = 1 \ + AND IS_ACTIVE = 1 \ + """ + source_df = pd.read_sql(source_query, source_engine) + + if source_df.empty: + print("没有符合条件的数据需要转换") + return + print(f"读取到{len(source_df)}条待转换数据") + + # 数据转换 + print("正在进行数据转换...") + target_df = pd.DataFrame() + + # 字段映射(源字段 → 目标字段) + field_mapping = { + 'id': 'pa_id', + 'PARENT_ID': 'parent_id', + 'NAME': 'pa_name', + 'NUM': 'storage_num', + 'PRICE': 'buy_price', + 'UNIT': 'unit_name', + 'LEVEL': 'level', + 'IS_CONSUMABLES': 'is_comsumable' + } + + # 复制字段 + for source_field, target_field in field_mapping.items(): + target_df[target_field] = source_df[source_field] + + # 检查数据质量 + print("\n数据质量检查:") + print(f"- 空ID记录: {target_df['pa_id'].isna().sum()}") + print(f"- 空名称记录: {target_df['pa_name'].isna().sum()}") + + # 写入目标表(使用事务确保原子性) + print("\n正在写入目标表ma_part_type...") + with target_engine.begin() as conn: + # 检查并处理可能的主键冲突 + existing_ids = pd.read_sql( + "SELECT pa_id FROM ma_part_type", + conn + )['pa_id'].tolist() + + new_records = target_df[~target_df['pa_id'].isin(existing_ids)] + dup_count = len(target_df) - len(new_records) + if dup_count > 0: + print(f"发现{dup_count}条重复主键记录,将自动跳过") + print("重复ID示例:", target_df[target_df['pa_id'].isin(existing_ids)]['pa_id'].head(5).tolist()) + + if not new_records.empty: + new_records.to_sql( + 'ma_part_type', + conn, + if_exists='append', + index=False, + dtype={ + 'pa_id': sqlalchemy.types.INTEGER(), + 'parent_id': sqlalchemy.types.INTEGER(), + 'pa_name': sqlalchemy.types.VARCHAR(length=100), + 'storage_num': sqlalchemy.types.VARCHAR(length=50), + 'buy_price': sqlalchemy.types.DECIMAL(10, 2), + 'unit_name': sqlalchemy.types.VARCHAR(length=20), + 'level': sqlalchemy.types.SmallInteger(), + 'is_comsumable': sqlalchemy.types.SmallInteger() + } + ) + print(f"成功写入{len(new_records)}条新数据") + else: + print("没有新数据需要写入") + + except Exception as e: + print(f"\n处理过程中发生错误: {str(e)}") + raise + finally: + if 'source_engine' in locals(): + source_engine.dispose() + if 'target_engine' in locals(): + target_engine.dispose() + + +if __name__ == "__main__": + # 配置文件路径 + config_file = "config.ini" + + try: + transform_and_load_part_type(config_file) + except Exception as e: + print(f"程序执行失败: {str(e)}") \ No newline at end of file diff --git a/机具/ma_supplier_info.py b/机具/ma_supplier_info.py new file mode 100644 index 0000000..f20a27c --- /dev/null +++ b/机具/ma_supplier_info.py @@ -0,0 +1,126 @@ +import pandas as pd +import sqlalchemy +from sqlalchemy import create_engine, text +import configparser +import os +from urllib.parse import quote_plus + +def get_db_connection_string(config, section): + """从配置文件中构建数据库连接字符串""" + return f"mysql+pymysql://{config[section]['user']}:{quote_plus(config[section]['password'])}@" \ + f"{config[section]['host']}:{config[section]['port']}/{config[section]['database']}" + + +def transform_and_load_supplier(config_file_path): + """ + 从源数据库提取ma_vender数据,转换后加载到目标数据库ma_supplier_info + :param config_file_path: 配置文件路径 + """ + # 读取配置文件 + if not os.path.exists(config_file_path): + raise FileNotFoundError(f"配置文件不存在: {config_file_path}") + + config = configparser.ConfigParser() + config.read(config_file_path) + + try: + # 获取数据库连接 + source_conn_str = get_db_connection_string(config, 'source_db') + target_conn_str = get_db_connection_string(config, 'target_db') + source_engine = create_engine(source_conn_str) + target_engine = create_engine(target_conn_str) + + # 从源表读取数据(过滤company_id=1且IS_ACTIVE=1的记录) + print("正在从源表ma_vender读取数据...") + source_query = """ + SELECT id, NAME, ADDRESS, COMPANY_MAN, MAIN_PERSON, PHONE, SCOPE_BUSINESS + FROM ma_vender + WHERE company_id = 1 \ + AND IS_ACTIVE = 1 \ + """ + source_df = pd.read_sql(source_query, source_engine) + + if source_df.empty: + print("没有符合条件的数据需要转换") + return + print(f"读取到{len(source_df)}条待转换数据") + + # 数据转换 + print("正在进行数据转换...") + target_df = pd.DataFrame() + + # 字段映射(源字段 → 目标字段) + field_mapping = { + 'id': 'supplier_id', + 'NAME': 'supplier', + 'ADDRESS': 'address', + 'COMPANY_MAN': 'legal_person', + 'MAIN_PERSON': 'primary_contact', + 'PHONE': 'phone', + 'SCOPE_BUSINESS': 'business_scope' + } + + # 复制字段 + for source_field, target_field in field_mapping.items(): + target_df[target_field] = source_df[source_field] + + # 检查数据质量 + print("\n数据质量检查:") + print(f"- 空供应商ID记录: {target_df['supplier_id'].isna().sum()}") + print(f"- 空供应商名称记录: {target_df['supplier'].isna().sum()}") + print(f"- 无效电话号码记录: {target_df['phone'].str.contains('[^0-9-]').sum()}") + + # 写入目标表(使用事务确保原子性) + print("\n正在写入目标表ma_supplier_info...") + with target_engine.begin() as conn: + # 检查并处理可能的主键冲突 + existing_ids = pd.read_sql( + "SELECT supplier_id FROM ma_supplier_info", + conn + )['supplier_id'].tolist() + + new_records = target_df[~target_df['supplier_id'].isin(existing_ids)] + dup_count = len(target_df) - len(new_records) + if dup_count > 0: + print(f"发现{dup_count}条重复供应商记录,将自动跳过") + print("重复ID示例:", + target_df[target_df['supplier_id'].isin(existing_ids)]['supplier_id'].head(3).tolist()) + + if not new_records.empty: + new_records.to_sql( + 'ma_supplier_info', + conn, + if_exists='append', + index=False, + dtype={ + 'supplier_id': sqlalchemy.types.INTEGER(), + 'supplier': sqlalchemy.types.VARCHAR(length=100), + 'address': sqlalchemy.types.VARCHAR(length=200), + 'legal_person': sqlalchemy.types.VARCHAR(length=50), + 'primary_contact': sqlalchemy.types.VARCHAR(length=50), + 'phone': sqlalchemy.types.VARCHAR(length=20), + 'business_scope': sqlalchemy.types.TEXT() + } + ) + print(f"成功写入{len(new_records)}条新供应商数据") + else: + print("没有新供应商数据需要写入") + + except Exception as e: + print(f"\n处理过程中发生错误: {str(e)}") + raise + finally: + if 'source_engine' in locals(): + source_engine.dispose() + if 'target_engine' in locals(): + target_engine.dispose() + + +if __name__ == "__main__": + # 配置文件路径 + config_file = "config.ini" + + try: + transform_and_load_supplier(config_file) + except Exception as e: + print(f"程序执行失败: {str(e)}") \ No newline at end of file diff --git a/机具/ma_type.py b/机具/ma_type.py new file mode 100644 index 0000000..b97aa1e --- /dev/null +++ b/机具/ma_type.py @@ -0,0 +1,143 @@ +import pandas as pd +import sqlalchemy +from sqlalchemy import create_engine +import configparser +import os +from urllib.parse import quote_plus + +from datetime import datetime + +def fix_date(date_str): + try: + # 处理只有年月的情况(如"2018-07") + if isinstance(date_str, str) and len(date_str) == 7 and date_str[4] == '-': + return datetime.strptime(date_str + '-01', '%Y-%m-%d') + return date_str + except: + return None # 对于无法解析的日期返回NULL + +def get_db_connection_string(config, section): + """从配置文件中构建数据库连接字符串""" + return f"mysql+pymysql://{config[section]['user']}:{quote_plus(config[section]['password'])}@" \ + f"{config[section]['host']}:{config[section]['port']}/{config[section]['database']}" + + +def transform_and_load_ma_type(config_file_path): + """ + 从源数据库提取ma_type数据,转换后加载到目标数据库ma_type + :param config_file_path: 配置文件路径 + """ + # 读取配置文件 + if not os.path.exists(config_file_path): + raise FileNotFoundError(f"配置文件不存在: {config_file_path}") + + config = configparser.ConfigParser() + config.read(config_file_path) + + try: + # 获取数据库连接 + source_conn_str = get_db_connection_string(config, 'source_db') + target_conn_str = get_db_connection_string(config, 'target_db') + source_engine = create_engine(source_conn_str) + target_engine = create_engine(target_conn_str) + + # 从源表读取数据(过滤COMPANY_ID=1且IS_ACTIVE=1的记录) + print("正在从源表ma_type读取数据...") + source_query = """ + SELECT ID, \ + PARENT_ID, \ + NAME, \ + LEVEL, \ + UNIT, \ + NUM, \ + nullif(BUY_PRICE,0) as BUY_PRICE, \ + nullif(LEASE_PRICE,0) as LEASE_PRICE, \ + IS_COUNT, \ + RATED_LOAD, \ + TEST_LOAD, \ + HOLDING_TIME, \ + manage_type, + nullif(IS_TEST,0) as IS_TEST, + nullif(TIME,0) as TIME + FROM ma_type + WHERE COMPANY_ID = 1 AND IS_ACTIVE = 1 \ + """ + source_df = pd.read_sql(source_query, source_engine) + + if source_df.empty: + print("没有符合条件的数据需要转换") + return + print(f"读取到{len(source_df)}条待转换数据") + + # 数据转换(直接字段复制) + print("正在进行数据转换...") + target_df = pd.DataFrame() + + # 字段映射(源字段 → 目标字段) + field_mapping = { + 'ID': 'type_id', + 'PARENT_ID': 'parent_id', + 'NAME': 'type_name', + 'LEVEL': 'level', + 'UNIT': 'unit_name', + 'NUM': 'storage_num', + 'BUY_PRICE': 'buy_price', + 'LEASE_PRICE': 'lease_price', + 'IS_COUNT': 'manage_type', + 'RATED_LOAD': 'rated_load', + 'TEST_LOAD': 'test_load', + 'HOLDING_TIME': 'holding_time', + 'manage_type': 'is_enter', + 'IS_TEST': 'is_test', + 'TIME': 'create_time' + } + + for source_field, target_field in field_mapping.items(): + target_df[target_field] = source_df[source_field] + # 应用日期修正 + target_df['create_time'] = source_df['TIME'].apply(fix_date) + # 写入目标表 + print("正在写入目标表ma_type...") + target_df.to_sql( + 'ma_type', + target_engine, + if_exists='append', + index=False, + dtype={ + 'type_id': sqlalchemy.types.INTEGER(), + 'parent_id': sqlalchemy.types.INTEGER(), + 'type_name': sqlalchemy.types.VARCHAR(length=100), + 'level': sqlalchemy.types.SmallInteger(), + 'unit_name': sqlalchemy.types.VARCHAR(length=20), + 'storage_num': sqlalchemy.types.VARCHAR(length=50), + 'buy_price': sqlalchemy.types.DECIMAL(10, 2), + 'lease_price': sqlalchemy.types.DECIMAL(10, 2), + 'manage_type': sqlalchemy.types.SmallInteger(), + 'rated_load': sqlalchemy.types.DECIMAL(10, 2), + 'test_load': sqlalchemy.types.DECIMAL(10, 2), + 'holding_time': sqlalchemy.types.INTEGER(), + 'is_enter': sqlalchemy.types.SmallInteger(), + 'is_test': sqlalchemy.types.SmallInteger(), + 'create_time': sqlalchemy.types.DateTime() + } + ) + print(f"成功写入{len(target_df)}条数据") + + except Exception as e: + print(f"处理过程中发生错误: {str(e)}") + raise + finally: + if 'source_engine' in locals(): + source_engine.dispose() + if 'target_engine' in locals(): + target_engine.dispose() + + +if __name__ == "__main__": + # 配置文件路径(根据实际情况修改) + config_file = "config.ini" + + try: + transform_and_load_ma_type(config_file) + except Exception as e: + print(f"程序执行失败: {str(e)}") \ No newline at end of file