import configparser
import os
from urllib.parse import quote_plus

import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine, text

# Source column (ma_vender) -> target column (ma_supplier_info).
_FIELD_MAPPING = {
    'id': 'supplier_id',
    'NAME': 'supplier',
    'ADDRESS': 'address',
    'COMPANY_MAN': 'legal_person',
    'MAIN_PERSON': 'primary_contact',
    'PHONE': 'phone',
    'SCOPE_BUSINESS': 'business_scope',
}

# Column types handed to DataFrame.to_sql for the target table.
_TARGET_DTYPES = {
    'supplier_id': sqlalchemy.types.INTEGER(),
    'supplier': sqlalchemy.types.VARCHAR(length=100),
    'address': sqlalchemy.types.VARCHAR(length=200),
    'legal_person': sqlalchemy.types.VARCHAR(length=50),
    'primary_contact': sqlalchemy.types.VARCHAR(length=50),
    'phone': sqlalchemy.types.VARCHAR(length=20),
    'business_scope': sqlalchemy.types.TEXT(),
}

# Only rows with company_id = 1 and IS_ACTIVE = 1 are extracted.
_SOURCE_QUERY = """
    SELECT id,
           NAME,
           ADDRESS,
           COMPANY_MAN,
           MAIN_PERSON,
           PHONE,
           SCOPE_BUSINESS
    FROM ma_vender
    WHERE company_id = 1
      AND IS_ACTIVE = 1
"""


def get_db_connection_string(config, section):
    """Build a ``mysql+pymysql`` connection URL from one config section.

    :param config: mapping-like configuration (for example a
        ``configparser.ConfigParser``) indexed as ``config[section][key]``
    :param section: name of the section holding ``user``, ``password``,
        ``host``, ``port`` and ``database``
    :return: URL of the form ``mysql+pymysql://user:password@host:port/database``
    :raises KeyError: if the section or one of the keys is missing
    """
    db = config[section]
    # Escape both credentials so characters such as '@', ':' or '/' cannot
    # break the URL structure (previously only the password was escaped).
    return (
        f"mysql+pymysql://{quote_plus(db['user'])}:{quote_plus(db['password'])}@"
        f"{db['host']}:{db['port']}/{db['database']}"
    )


def _build_target_frame(source_df):
    """Select the mapped source columns and rename them to the target names."""
    return source_df[list(_FIELD_MAPPING)].rename(columns=_FIELD_MAPPING)


def _print_quality_report(target_df):
    """Print counts of missing supplier ids/names and suspicious phone values."""
    # Drop nulls and cast to str first: the .str accessor raises on
    # non-string columns (e.g. a phone column read back as a numeric dtype).
    phones = target_df['phone'].dropna().astype(str)
    invalid_phones = phones.str.contains(r'[^0-9-]', regex=True).sum()

    print("\n数据质量检查:")
    print(f"- 空供应商ID记录: {target_df['supplier_id'].isna().sum()}")
    print(f"- 空供应商名称记录: {target_df['supplier'].isna().sum()}")
    print(f"- 无效电话号码记录: {invalid_phones}")


def _load_new_suppliers(target_df, target_engine):
    """Append rows whose supplier_id is not yet in ma_supplier_info.

    The lookup of existing ids and the insert run inside one
    ``engine.begin()`` block, so the insert is committed on success and
    rolled back if an exception is raised.
    """
    with target_engine.begin() as conn:
        existing_ids = pd.read_sql(
            text("SELECT supplier_id FROM ma_supplier_info"), conn
        )['supplier_id']

        # Compute the duplicate mask once and reuse it for both subsets.
        is_duplicate = target_df['supplier_id'].isin(existing_ids)
        new_records = target_df[~is_duplicate]
        dup_count = int(is_duplicate.sum())

        if dup_count > 0:
            print(f"发现{dup_count}条重复供应商记录,将自动跳过")
            print(
                "重复ID示例:",
                target_df.loc[is_duplicate, 'supplier_id'].head(3).tolist(),
            )

        if new_records.empty:
            print("没有新供应商数据需要写入")
            return

        new_records.to_sql(
            'ma_supplier_info',
            conn,
            if_exists='append',
            index=False,
            dtype=_TARGET_DTYPES,
        )
        print(f"成功写入{len(new_records)}条新供应商数据")


def transform_and_load_supplier(config_file_path):
    """Copy active suppliers from ``ma_vender`` into ``ma_supplier_info``.

    Reads the ``source_db`` and ``target_db`` sections of the config file,
    extracts the rows selected by the source query, renames the columns to
    the target schema, prints a small data-quality report, and appends the
    rows whose ``supplier_id`` does not already exist in the target table.

    :param config_file_path: path to the INI configuration file
    :raises FileNotFoundError: if the configuration file does not exist
    :raises Exception: any error raised while reading the config sections or
        talking to the databases is printed and re-raised
    """
    if not os.path.exists(config_file_path):
        raise FileNotFoundError(f"配置文件不存在: {config_file_path}")

    config = configparser.ConfigParser()
    config.read(config_file_path)

    # Initialised up front so the cleanup in ``finally`` needs no locals() probing.
    source_engine = None
    target_engine = None
    try:
        source_engine = create_engine(get_db_connection_string(config, 'source_db'))
        target_engine = create_engine(get_db_connection_string(config, 'target_db'))

        print("正在从源表ma_vender读取数据...")
        source_df = pd.read_sql(text(_SOURCE_QUERY), source_engine)

        if source_df.empty:
            print("没有符合条件的数据需要转换")
            return

        print(f"读取到{len(source_df)}条待转换数据")

        print("正在进行数据转换...")
        target_df = _build_target_frame(source_df)

        _print_quality_report(target_df)

        print("\n正在写入目标表ma_supplier_info...")
        _load_new_suppliers(target_df, target_engine)

    except Exception as e:
        print(f"\n处理过程中发生错误: {str(e)}")
        raise

    finally:
        # Release pooled connections whether or not the run succeeded.
        if source_engine is not None:
            source_engine.dispose()
        if target_engine is not None:
            target_engine.dispose()


if __name__ == "__main__":
    # Path to the configuration file.
    config_file = "config.ini"

    try:
        transform_and_load_supplier(config_file)
    except Exception as e:
        print(f"程序执行失败: {str(e)}")
        # Exit non-zero so schedulers and shells can detect the failure.
        raise SystemExit(1)