"""ETL script: copy ``ma_machines`` rows from a source DB into ``ma_machine``."""

import configparser
import os
from datetime import datetime
from urllib.parse import quote_plus

import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine, text

# Maps source BATCH_STATUS codes to target ma_status codes.
_STATUS_MAPPING = {
    1: 0,      # awaiting notification -> 0
    2: 0,      # awaiting inspection -> 0
    3: 0,      # awaiting printing -> 0
    4: 0,      # awaiting warehousing -> 0
    5: 1,      # in stock -> 1
    6: 2,      # in use -> 2
    7: 3,      # under repair -> 3
    8: 3,      # under test -> 3
    9: 5,      # awaiting warehousing after repair/test -> 5
    10: 7,     # awaiting scrapping -> 7
    11: 8,     # scrapped -> 8
    12: 7,     # scrapped and sealed -> 7
    13: 4,     # under inspection -> 4
    14: 4,     # under review -> 4
    15: None,  # transferred -> NULL (handle per business requirements)
    16: 17,    # scrap inspection -> 17
    17: 17,    # sealing inspection -> 17
    18: 18,    # reported lost -> 18
    19: 18,    # lost at settlement -> 18
}

# Only rows with COMPANY_ID = 1 are migrated.
_SOURCE_QUERY = """
    SELECT ID, TYPE, BATCH_STATUS, DEVICE_CODE, OUT_FAC_TIME, model_name,
           OUT_FAC_NUM, VENDER_ID, QRCODE, ASSETS_NUM, THIS_CHECK_TIME,
           THIS_CHECK_MAN, NEXT_CHECK_TIME, CYCLE_NUM
    FROM ma_machines
    WHERE COMPANY_ID = 1
"""

# SQL column types handed to DataFrame.to_sql for the target table.
_TARGET_DTYPES = {
    'ma_id': sqlalchemy.types.INTEGER(),
    'type_id': sqlalchemy.types.INTEGER(),
    'ma_status': sqlalchemy.types.SmallInteger(),
    'ma_code': sqlalchemy.types.VARCHAR(length=50),
    'out_fac_time': sqlalchemy.types.DateTime(),
    'machine_name': sqlalchemy.types.VARCHAR(length=100),
    'out_fac_code': sqlalchemy.types.VARCHAR(length=50),
    'ma_vender': sqlalchemy.types.INTEGER(),
    'qr_code': sqlalchemy.types.VARCHAR(length=100),
    'assets_code': sqlalchemy.types.VARCHAR(length=50),
    'this_check_time': sqlalchemy.types.DateTime(),
    'check_man': sqlalchemy.types.VARCHAR(length=50),
    'next_check_time': sqlalchemy.types.DateTime(),
    'in_out_num': sqlalchemy.types.INTEGER(),
}


def get_db_connection_string(config, section):
    """Build a ``mysql+pymysql`` connection URL from one config section.

    :param config: mapping of section name -> mapping with the keys
        ``user``, ``password``, ``host``, ``port`` and ``database``
        (e.g. a ``configparser.ConfigParser``).
    :param section: name of the section to read.
    :return: the connection URL as a string.
    :raises KeyError: if the section or one of its keys is missing.
    """
    settings = config[section]
    # Escape both credentials so characters such as '@', ':' or '/' cannot
    # corrupt the URL structure.
    user = quote_plus(settings['user'])
    password = quote_plus(settings['password'])
    return (
        f"mysql+pymysql://{user}:{password}@"
        f"{settings['host']}:{settings['port']}/{settings['database']}"
    )


def _build_target_frame(source_df):
    """Rename/convert the source columns into the ``ma_machine`` layout."""
    return pd.DataFrame({
        # Columns copied as-is.
        'ma_id': source_df['ID'],
        'type_id': source_df['TYPE'],
        'ma_code': source_df['DEVICE_CODE'],
        # Unparseable dates become NaT instead of aborting the run.
        'out_fac_time': pd.to_datetime(source_df['OUT_FAC_TIME'], errors='coerce'),
        'machine_name': source_df['model_name'],
        'out_fac_code': source_df['OUT_FAC_NUM'],
        'ma_vender': source_df['VENDER_ID'],
        'qr_code': source_df['QRCODE'],
        'assets_code': source_df['ASSETS_NUM'],
        'this_check_time': pd.to_datetime(source_df['THIS_CHECK_TIME'], errors='coerce'),
        'check_man': source_df['THIS_CHECK_MAN'],
        'next_check_time': pd.to_datetime(source_df['NEXT_CHECK_TIME'], errors='coerce'),
        'in_out_num': source_df['CYCLE_NUM'],
        # Status codes without a mapping entry (or mapped to None) become NaN.
        'ma_status': source_df['BATCH_STATUS'].map(_STATUS_MAPPING),
    })


def _print_quality_report(source_df, target_df):
    """Print counts of missing dates and the status codes left without a value."""
    print("\n数据质量检查:")
    print(f"- 无效出厂时间记录: {target_df['out_fac_time'].isna().sum()}")
    print(f"- 无效本次检验时间记录: {target_df['this_check_time'].isna().sum()}")
    print(f"- 无效下次检验时间记录: {target_df['next_check_time'].isna().sum()}")
    print(f"- 未映射的状态值: {source_df[target_df['ma_status'].isna()]['BATCH_STATUS'].unique()}")


def _replace_target_table(target_engine, target_df):
    """Truncate ``ma_machine`` and insert *target_df* into it exactly once."""
    # Engine.begin() commits on successful exit, so no explicit commit() call
    # (which plain connections only offer in SQLAlchemy 2.x style) is needed.
    with target_engine.begin() as conn:
        conn.execute(text("TRUNCATE TABLE ma_machine"))

    target_df.to_sql(
        'ma_machine',
        target_engine,
        if_exists='append',
        index=False,
        dtype=_TARGET_DTYPES,
    )


def transform_and_load_ma_machines(config_file_path):
    """Extract ``ma_machines`` from the source DB, transform, load ``ma_machine``.

    Reads the ``source_db`` and ``target_db`` sections of the config file,
    pulls the rows with ``COMPANY_ID = 1``, maps columns and status codes,
    then truncates the target table and inserts the rows de-duplicated on
    ``(type_id, ma_code)``. If the source query returns no rows the target
    table is left untouched.

    :param config_file_path: path to the INI configuration file.
    :raises FileNotFoundError: if the configuration file does not exist.
    :raises Exception: any error raised while reading, transforming or
        writing is printed and re-raised.
    """
    if not os.path.exists(config_file_path):
        raise FileNotFoundError(f"配置文件不存在: {config_file_path}")

    config = configparser.ConfigParser()
    config.read(config_file_path)

    # Pre-declare so the cleanup in ``finally`` works even if engine creation fails.
    source_engine = None
    target_engine = None
    try:
        source_engine = create_engine(get_db_connection_string(config, 'source_db'))
        target_engine = create_engine(get_db_connection_string(config, 'target_db'))

        print("正在从源表ma_machines读取数据...")
        source_df = pd.read_sql(_SOURCE_QUERY, source_engine)

        if source_df.empty:
            print("没有符合条件的数据需要转换")
            return

        print(f"读取到{len(source_df)}条待转换数据")

        print("正在进行数据转换...")
        target_df = _build_target_frame(source_df)
        _print_quality_report(source_df, target_df)

        print("\n正在写入目标表ma_machine...")
        # De-duplicate once and write once: writing both the de-duplicated
        # frame and the full frame would insert every row at least twice.
        unique_df = target_df.drop_duplicates(['type_id', 'ma_code'])
        _replace_target_table(target_engine, unique_df)

        print(f"成功写入{len(unique_df)}条数据")

    except Exception as e:
        print(f"\n处理过程中发生错误: {str(e)}")
        raise
    finally:
        if source_engine is not None:
            source_engine.dispose()
        if target_engine is not None:
            target_engine.dispose()


if __name__ == "__main__":
    # Path to the configuration file.
    config_file = "config.ini"

    try:
        transform_and_load_ma_machines(config_file)
    except Exception as e:
        print(f"程序执行失败: {str(e)}")