import configparser import pandas as pd from sqlalchemy import create_engine from urllib.parse import quote_plus # 读取配置文件 config = configparser.ConfigParser() config.read('config.ini') # 获取数据库连接配置 source_config = { 'host': config.get('source_db', 'host'), 'user': config.get('source_db', 'user'), 'password': config.get('source_db', 'password'), 'database': config.get('source_db', 'database'), 'port': config.getint('source_db', 'port') } target_config = { 'host': config.get('target_db', 'host'), 'user': config.get('target_db', 'user'), 'password': config.get('target_db', 'password'), 'database': config.get('target_db', 'database'), 'port': config.getint('target_db', 'port') } # 创建数据库引擎 source_engine = create_engine( f"mysql+pymysql://{source_config['user']}:{quote_plus(source_config['password'])}@{source_config['host']}:{source_config['port']}/{source_config['database']}" ) target_engine = create_engine( f"mysql+pymysql://{target_config['user']}:{quote_plus(target_config['password'])}@{target_config['host']}:{target_config['port']}/{target_config['database']}" ) # 定义UNIT_ID到unit_name的映射 unit_mapping = { 9: 'm', 90: '根', 91: '个', 92: '付', 93: '支', 94: '双', 95: '套', 96: '组', 97: '只', 98: '台', 99: '顶', 100: '件', 101: '米', 102: '面', 103: '片', 104: '盘', 105: '张', 106: '块', 107: '副', 108: '袋', 109: '条', 110: '节', 111: '瓶', 112: '斤', 113: '把', 114: '平方米', 115: '床', 123: 'kg', 124: '份', 125: '卷' } def clean_existing_data(): """清理目标表中需要更新的数据""" try: # 删除重复的地锚规格等不需要的数据(根据实际业务需求调整) with target_engine.connect() as conn: conn.execute(""" DELETE FROM ma_type WHERE type_name IN ('重复的地锚规格', '其他需要删除的规格名称') """) print("已清理目标表中的重复/无效数据") return True except Exception as e: print(f"清理数据时出错: {str(e)}") return False def process_machine_types(): """处理ma_machine_type表数据""" try: # 读取源数据 df = pd.read_sql("SELECT * FROM ma_machine_type", source_engine) # 按照规则转换数据 result = pd.DataFrame() result['type_id'] = df['ID'] + 6000 # ID加6000 result['parent_id'] = df['PARENT_ID'] result['type_name'] = df['NAME'] result['level'] = df['LEVEL'] result['unit_name'] = df['UNIT_ID'].map(unit_mapping) # 单位映射 result['storage_num'] = df['STORAGE_NUM'] result['buy_price'] = df['PURCHASE_PRICE'] result['notax_price'] = df['NOTAX_PRICE'] result['lease_price'] = df['RENTAL_PRICE'] result['manage_type'] = df['IS_COUNT'] result['rated_load'] = df['CONSUMABLE'] # 处理新增规格(示例,根据实际情况调整) new_types = [ {'type_id': 6601, 'type_name': '新增规格1', 'unit_name': '个', 'level': 3}, {'type_id': 6602, 'type_name': '新增规格2', 'unit_name': '套', 'level': 3} ] new_df = pd.DataFrame(new_types) result = pd.concat([result, new_df], ignore_index=True) # 先清理目标表 if not clean_existing_data(): return False # 写入目标表(使用更新模式) result.to_sql('ma_type', target_engine, if_exists='append', index=False) print(f"成功转换并导入 {len(result)} 条记录到 ma_type") # 更新现有库存和租赁单价(根据type_id) with target_engine.connect() as conn: # 更新storage_num conn.execute(""" UPDATE ma_type t, ma_machine_type s SET t.storage_num = s.STORAGE_NUM, t.lease_price = s.LEASE_PRICE WHERE t.type_id = s.ID + 6000 """) print("已更新现有库存和租赁单价") return True except Exception as e: print(f"处理 ma_machine_type 时发生错误: {str(e)}") return False if __name__ == "__main__": process_machine_types()