Bonus-Transfer-Machines/materialSite/data_comparison_update.py

325 lines
12 KiB
Python
Raw Normal View History

import configparser
import pandas as pd
from sqlalchemy import create_engine, text
from urllib.parse import quote_plus
import logging
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 读取配置文件
config = configparser.ConfigParser()
config.read('config.ini')
# 获取数据库连接配置
source_config = {
'host': config.get('source_db', 'host'),
'user': config.get('source_db', 'user'),
'password': config.get('source_db', 'password'),
'database': config.get('source_db', 'database'),
'port': config.getint('source_db', 'port')
}
target_config = {
'host': config.get('target_db', 'host'),
'user': config.get('target_db', 'user'),
'password': config.get('target_db', 'password'),
'database': config.get('target_db', 'database'),
'port': config.getint('target_db', 'port')
}
# 创建数据库引擎
source_engine = create_engine(
f"mysql+pymysql://{source_config['user']}:{quote_plus(source_config['password'])}@{source_config['host']}:{source_config['port']}/{source_config['database']}"
)
target_engine = create_engine(
f"mysql+pymysql://{target_config['user']}:{quote_plus(target_config['password'])}@{target_config['host']}:{target_config['port']}/{target_config['database']}"
)
class DataComparisonUpdater:
"""数据对比和更新器"""
def __init__(self):
self.source_engine = source_engine
self.target_engine = target_engine
self.update_rules = {} # 存储更新规则
self.comparison_results = [] # 存储对比结果
def set_update_rules(self, rules):
"""
设置更新规则
rules: 字典格式包含表名条件要更新的字段等
示例
{
'table_name': 'ma_supplier_info',
'source_table': 'bm_supplier',
'join_conditions': {'supplier': 'NAME'},
'update_fields': ['address', 'phone'],
'where_conditions': {'status': 'active'}
}
"""
self.update_rules = rules
logger.info(f"已设置更新规则: {rules}")
def query_target_data(self, table_name, columns=None, conditions=None):
"""
从目标数据库查询数据
Args:
table_name: 表名
columns: 要查询的列默认为所有列
conditions: 查询条件字典格式
Returns:
DataFrame: 查询结果
"""
try:
# 构建查询SQL
if columns:
columns_str = ', '.join(columns)
else:
columns_str = '*'
sql = f"SELECT {columns_str} FROM {table_name}"
# 添加WHERE条件
if conditions:
where_clauses = []
for key, value in conditions.items():
if isinstance(value, str):
where_clauses.append(f"{key} = '{value}'")
else:
where_clauses.append(f"{key} = {value}")
if where_clauses:
sql += " WHERE " + " AND ".join(where_clauses)
logger.info(f"执行目标数据库查询: {sql}")
df = pd.read_sql(sql, self.target_engine)
logger.info(f"查询到 {len(df)} 条记录")
return df
except Exception as e:
logger.error(f"查询目标数据库时发生错误: {str(e)}")
return pd.DataFrame()
def query_source_data(self, table_name, columns=None, conditions=None):
"""
从源数据库查询数据
Args:
table_name: 表名
columns: 要查询的列默认为所有列
conditions: 查询条件字典格式
Returns:
DataFrame: 查询结果
"""
try:
# 构建查询SQL
if columns:
columns_str = ', '.join(columns)
else:
columns_str = '*'
sql = f"SELECT {columns_str} FROM {table_name}"
# 添加WHERE条件
if conditions:
where_clauses = []
for key, value in conditions.items():
if isinstance(value, str):
where_clauses.append(f"{key} = '{value}'")
else:
where_clauses.append(f"{key} = {value}")
if where_clauses:
sql += " WHERE " + " AND ".join(where_clauses)
logger.info(f"执行源数据库查询: {sql}")
df = pd.read_sql(sql, self.source_engine)
logger.info(f"查询到 {len(df)} 条记录")
return df
except Exception as e:
logger.error(f"查询源数据库时发生错误: {str(e)}")
return pd.DataFrame()
def compare_data(self, target_df, source_df, join_columns):
"""
对比目标数据库和源数据库的数据
Args:
target_df: 目标数据库数据
source_df: 源数据库数据
join_columns: 用于关联的列名字典格式 {'target_col': 'source_col'}
Returns:
DataFrame: 对比结果
"""
try:
# 重命名源数据库列名以便关联
source_df_renamed = source_df.copy()
for target_col, source_col in join_columns.items():
if source_col in source_df_renamed.columns:
source_df_renamed = source_df_renamed.rename(columns={source_col: target_col})
# 执行关联
merged_df = pd.merge(target_df, source_df_renamed,
on=list(join_columns.keys()),
how='inner',
suffixes=('_target', '_source'))
logger.info(f"数据对比完成,找到 {len(merged_df)} 条匹配记录")
return merged_df
except Exception as e:
logger.error(f"数据对比时发生错误: {str(e)}")
return pd.DataFrame()
def update_source_data(self, comparison_df, update_fields, table_name):
"""
根据对比结果更新源数据库
Args:
comparison_df: 对比结果数据
update_fields: 要更新的字段列表
table_name: 要更新的表名
Returns:
int: 更新的记录数
"""
try:
update_count = 0
with self.source_engine.connect() as conn:
for _, row in comparison_df.iterrows():
# 构建UPDATE语句
set_clauses = []
for field in update_fields:
target_field = f"{field}_target"
if target_field in row and pd.notna(row[target_field]):
if isinstance(row[target_field], str):
set_clauses.append(f"{field} = '{row[target_field]}'")
else:
set_clauses.append(f"{field} = {row[target_field]}")
if set_clauses:
# 构建WHERE条件使用主键或唯一标识
where_conditions = []
for key in comparison_df.columns:
if key.endswith('_source') and not key.endswith('_target'):
if pd.notna(row[key]):
if isinstance(row[key], str):
where_conditions.append(f"{key.replace('_source', '')} = '{row[key]}'")
else:
where_conditions.append(f"{key.replace('_source', '')} = {row[key]}")
if where_conditions:
sql = f"UPDATE {table_name} SET {', '.join(set_clauses)} WHERE {' AND '.join(where_conditions)}"
logger.debug(f"执行更新SQL: {sql}")
result = conn.execute(text(sql))
update_count += result.rowcount
conn.commit()
logger.info(f"成功更新 {update_count} 条记录")
return update_count
except Exception as e:
logger.error(f"更新源数据库时发生错误: {str(e)}")
return 0
def execute_comparison_update(self):
"""
执行完整的数据对比和更新流程
"""
if not self.update_rules:
logger.error("未设置更新规则,请先调用 set_update_rules()")
return False
try:
rules = self.update_rules
# 1. 查询目标数据库数据
target_df = self.query_target_data(
rules['table_name'],
rules.get('target_columns'),
rules.get('target_conditions')
)
if target_df.empty:
logger.warning("目标数据库查询结果为空")
return False
# 2. 查询源数据库数据
source_df = self.query_source_data(
rules['source_table'],
rules.get('source_columns'),
rules.get('source_conditions')
)
if source_df.empty:
logger.warning("源数据库查询结果为空")
return False
# 3. 对比数据
comparison_df = self.compare_data(
target_df,
source_df,
rules['join_conditions']
)
if comparison_df.empty:
logger.warning("没有找到匹配的数据")
return False
# 4. 更新源数据库
update_count = self.update_source_data(
comparison_df,
rules['update_fields'],
rules['source_table']
)
logger.info(f"数据对比和更新完成,共更新 {update_count} 条记录")
return True
except Exception as e:
logger.error(f"执行数据对比更新时发生错误: {str(e)}")
return False
def main():
"""主函数 - 示例用法"""
updater = DataComparisonUpdater()
# 示例:设置更新规则
# 这里您需要根据具体需求设置规则
example_rules = {
'table_name': 'ma_supplier_info', # 目标表
'source_table': 'bm_supplier', # 源表
'join_conditions': {'supplier': 'NAME'}, # 关联条件
'update_fields': ['address', 'phone'], # 要更新的字段
'target_conditions': {'status': 'active'}, # 目标表查询条件
'source_conditions': None, # 源表查询条件
'target_columns': ['supplier', 'address', 'phone'], # 目标表查询列
'source_columns': ['NAME', 'ADDRESS', 'PHONE_NUMBER'] # 源表查询列
}
# 设置更新规则
updater.set_update_rules(example_rules)
# 执行对比和更新
success = updater.execute_comparison_update()
if success:
print("✅ 数据对比和更新完成")
else:
print("❌ 数据对比和更新失败")
if __name__ == "__main__":
main()