Bonus-Transfer-Machines/machines/tm_task.py

155 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import configparser
import pandas as pd
from sqlalchemy import create_engine
from urllib.parse import quote_plus
# @author: 阮世耀
# Description: migrates data from the ba_ma_input_check table into the tm_task table.
# Read the connection settings from config.ini (looked up relative to the
# current working directory).
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it did parse, so check that list and fail with a clear message
# instead of a later, less obvious NoSectionError.
if not config.read('config.ini'):
    raise FileNotFoundError(
        "config.ini was not found (or could not be read) in the current working directory"
    )


def _load_db_config(section):
    """Return the connection settings stored in *section* of config.ini.

    Args:
        section: Name of the config section, e.g. ``'source_db'``.

    Returns:
        dict: ``host``, ``user``, ``password`` and ``database`` as strings and
        ``port`` as an int.
    """
    return {
        'host': config.get(section, 'host'),
        'user': config.get(section, 'user'),
        'password': config.get(section, 'password'),
        'database': config.get(section, 'database'),
        'port': config.getint(section, 'port'),
    }


def _build_engine(db_config):
    """Create a ``mysql+pymysql`` SQLAlchemy engine from a settings dict.

    Args:
        db_config: Mapping as produced by ``_load_db_config``.

    Returns:
        The engine returned by ``create_engine`` for the assembled URL.
    """
    # Escape both credentials: characters such as '@', ':' or '/' in either
    # the user name or the password would otherwise corrupt the URL.
    user = quote_plus(db_config['user'])
    password = quote_plus(db_config['password'])
    return create_engine(
        f"mysql+pymysql://{user}:{password}"
        f"@{db_config['host']}:{db_config['port']}/{db_config['database']}"
    )


# Connection settings for both ends of the migration.
source_config = _load_db_config('source_db')
target_config = _load_db_config('target_db')

# Database engines shared by the functions below.
source_engine = _build_engine(source_config)
target_engine = _build_engine(target_config)
def process_tm_task():
    """Copy ba_ma_input_check rows from the source database into tm_task.

    Each source row becomes a task with ``task_type`` 5. ``task_status`` is 11
    when ``IS_SURE = 1`` and 10 otherwise, ``code`` comes from the joined
    ba_ma_repair row's APPLY_NUMBER, and the creator / creation time come
    from the source-side tm_task row that shares the same ID.

    Returns:
        bool: True after the rows were appended to the target ``tm_task``
        table and the summary was printed; False if any exception occurred
        (the error is printed rather than re-raised).
    """
    # Target column -> column name as returned by the source query.
    column_map = {
        'task_id': 'task_id',
        'task_type': 'task_type',
        'task_status': 'task_status',
        'code': 'code',
        'create_by': 'CREATOR',
        'create_time': 'CREATE_TIME',
    }
    try:
        # Query against the source database.
        query = """
            SELECT
                bic.ID as task_id,
                5 as task_type,
                IF(bic.IS_SURE = 1, 11, 10) as task_status,
                bmr.APPLY_NUMBER as code,
                tt.CREATOR,
                tt.CREATE_TIME
            FROM
                ba_ma_input_check bic
                LEFT JOIN tm_task tt on bic.ID = tt.ID
                LEFT JOIN ba_ma_repair bmr on bic.REPAIR_ID = bmr.ID
        """
        source_rows = pd.read_sql(query, source_engine)

        # Field mapping: build the target-shaped frame column by column.
        mapped = pd.DataFrame(
            {target: source_rows[source] for target, source in column_map.items()}
        )
        # Data cleaning: a row without a task_id cannot be migrated.
        mapped = mapped.dropna(subset=['task_id'])

        # Append to the target table.
        mapped.to_sql('tm_task', target_engine, if_exists='append', index=False)

        migrated = len(mapped)
        print(f"成功转换并导入 {migrated} 条记录到 tm_task")
        print("任务类型分布:")
        print(f" - task_type = 5: {migrated}")
        print("任务状态分布:")
        for status_value, row_count in mapped['task_status'].value_counts().items():
            # 11 is labelled "confirmed"; every other status is labelled "pending".
            if status_value == 11:
                label = "已确认"
            else:
                label = "待确认"
            print(f" - task_status = {status_value} ({label}): {row_count}")
    except Exception as exc:
        print(f"处理 tm_task 时发生错误: {exc}")
        return False
    else:
        return True
def validate_data():
    """Check the migration by comparing source and target row counts.

    The source count uses the same joins as the migration query; the target
    count is the number of ``tm_task`` rows with ``task_type = 5``.

    Returns:
        bool: True when both counts are equal; False when they differ or when
        any exception occurred (the error is printed rather than re-raised).
    """
    try:
        # Total number of rows the migration query yields on the source side.
        source_sql = """
            SELECT COUNT(*) as total_count
            FROM ba_ma_input_check bic
            LEFT JOIN tm_task tt on bic.ID = tt.ID
            LEFT JOIN ba_ma_repair bmr on bic.REPAIR_ID = bmr.ID
        """
        source_frame = pd.read_sql(source_sql, source_engine)
        source_count = source_frame['total_count'].iloc[0]

        # Total number of migrated rows on the target side.
        target_sql = "SELECT COUNT(*) as total_count FROM tm_task WHERE task_type = 5"
        target_frame = pd.read_sql(target_sql, target_engine)
        target_count = target_frame['total_count'].iloc[0]

        print("\n数据验证结果:")
        print(f"源数据记录数: {source_count}")
        print(f"目标数据记录数: {target_count}")

        if source_count != target_count:
            print("❌ 数据迁移完整性验证失败,记录数不匹配")
            return False

        print("✅ 数据迁移完整性验证通过")
        return True
    except Exception as exc:
        print(f"数据验证时发生错误: {exc}")
        return False
def clear_existing_data():
    """Optionally delete the existing ``task_type = 5`` rows from the target tm_task table.

    Asks for confirmation on stdin; only the answer ``y`` (case-insensitive,
    surrounding whitespace ignored) triggers the delete.

    Returns:
        bool: True when the rows were deleted or the user chose to skip the
        cleanup; False if any exception occurred (the error is printed rather
        than re-raised).
    """
    # Function-scope import keeps this block self-contained; the file already
    # depends on SQLAlchemy for create_engine.
    from sqlalchemy import text

    try:
        confirmation = input("是否要清理目标表中 task_type=5 的现有数据?(y/N): ")
        if confirmation.strip().lower() != 'y':
            print("跳过数据清理")
            return True

        # engine.begin() commits when the block exits normally and rolls back
        # on error. With a plain connect() the DELETE is never committed under
        # SQLAlchemy 2.0, and a raw SQL string must be wrapped in text() there.
        with target_engine.begin() as conn:
            result = conn.execute(text("DELETE FROM tm_task WHERE task_type = 5"))
            deleted = result.rowcount
        # Reported only after the transaction has been committed.
        print(f"已清理 {deleted} 条现有数据")
        return True
    except Exception as e:
        print(f"清理数据时发生错误: {e}")
        return False
def main():
    """Run the optional cleanup, the migration and the validation in order.

    Returns:
        int: Process exit status. 0 when the migration ran and validation
        passed; 1 when the cleanup step or the migration itself failed;
        2 when the data was migrated but validation did not pass.
    """
    print("=== tm_task 审核表数据迁移 ===")
    print("开始执行数据迁移...")

    # Optional: clear the existing rows first.
    if not clear_existing_data():
        print("\n=== 迁移中止 ===")
        return 1

    # Run the migration itself.
    if not process_tm_task():
        print("\n=== 迁移失败 ===")
        return 1

    # Verify the result; the rows are already written at this point, so the
    # completion banner is printed either way and only the status differs.
    validated = validate_data()
    print("\n=== 迁移完成 ===")
    return 0 if validated else 2


if __name__ == "__main__":
    # Propagate the outcome as the exit status so shells, cron jobs and CI
    # can detect a failed run instead of always seeing status 0.
    raise SystemExit(main())