Bonus-Transfer-Machines/materialSite/data_comparison_update.py

325 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import configparser
import pandas as pd
from sqlalchemy import create_engine, text
from urllib.parse import quote_plus
import logging
# Configure logging for the whole script.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Read the configuration file (path is relative to the current working directory).
config = configparser.ConfigParser()
if not config.read('config.ini'):
    # ConfigParser.read() silently skips files it cannot open; without this
    # message the failure only shows up below as an unexplained NoSectionError.
    logger.error("未找到或无法读取配置文件: config.ini")


def _load_db_config(section):
    """Return the connection settings stored in *section* of the config file.

    Raises configparser.NoSectionError / NoOptionError when the section or
    one of the keys is missing, and ValueError when ``port`` is not an integer.
    """
    return {
        'host': config.get(section, 'host'),
        'user': config.get(section, 'user'),
        'password': config.get(section, 'password'),
        'database': config.get(section, 'database'),
        'port': config.getint(section, 'port'),
    }


def _build_engine(db_config):
    """Create a ``mysql+pymysql`` SQLAlchemy engine from a settings dict.

    Both the user name and the password are URL-escaped: an unescaped ``@``,
    ``:`` or ``/`` in either of them would otherwise corrupt the URL.
    """
    user = quote_plus(db_config['user'])
    password = quote_plus(db_config['password'])
    return create_engine(
        f"mysql+pymysql://{user}:{password}"
        f"@{db_config['host']}:{db_config['port']}/{db_config['database']}"
    )


# Database connection settings.
source_config = _load_db_config('source_db')
target_config = _load_db_config('target_db')

# Database engines (created at import time; connections are opened lazily on first use).
source_engine = _build_engine(source_config)
target_engine = _build_engine(target_config)


class DataComparisonUpdater:
    """Compare rows of a target table with a source table and write target values into the source.

    Values are always sent to the database as bound parameters. Table and
    column names cannot be bound and are interpolated into the SQL text as
    given, so they must come from trusted configuration, never from user input.
    """

    def __init__(self):
        self.source_engine = source_engine
        self.target_engine = target_engine
        self.update_rules = {}  # rules consumed by execute_comparison_update()
        self.comparison_results = []  # never written by the methods of this class

    def set_update_rules(self, rules):
        """
        Store the rules used by execute_comparison_update().

        rules: dict. Keys read by execute_comparison_update():
            'table_name'        (required) target table to read from
            'source_table'      (required) source table to read from and update
            'join_conditions'   (required) {'target_col': 'source_col'}
            'update_fields'     (required) list of column names to update
            'target_columns' / 'source_columns'        (optional) columns to select
            'target_conditions' / 'source_conditions'  (optional) equality filters
        Example:
            {
                'table_name': 'ma_supplier_info',
                'source_table': 'bm_supplier',
                'join_conditions': {'supplier': 'NAME'},
                'update_fields': ['address', 'phone'],
                'target_conditions': {'status': 'active'}
            }
        """
        self.update_rules = rules
        logger.info(f"已设置更新规则: {rules}")

    @staticmethod
    def _to_native(value):
        """Convert a pandas/NumPy scalar to a plain Python object a DB driver can bind."""
        if isinstance(value, pd.Timestamp):
            return value.to_pydatetime()
        if hasattr(value, 'item'):  # NumPy scalar such as numpy.int64
            return value.item()
        return value

    @staticmethod
    def _build_where(conditions, prefix='p'):
        """Return ``(clauses, params)`` for a dict of equality conditions.

        Each value becomes a named bind parameter (``:p0``, ``:p1`` ...);
        a ``None`` value is rendered as ``IS NULL``.
        """
        clauses = []
        params = {}
        for index, (column, value) in enumerate((conditions or {}).items()):
            if value is None:
                clauses.append(f"{column} IS NULL")
                continue
            name = f"{prefix}{index}"
            clauses.append(f"{column} = :{name}")
            params[name] = DataComparisonUpdater._to_native(value)
        return clauses, params

    @staticmethod
    def _build_select(table_name, columns=None, conditions=None):
        """Return ``(sql, params)`` for a SELECT with optional columns and equality filters."""
        columns_str = ', '.join(columns) if columns else '*'
        sql = f"SELECT {columns_str} FROM {table_name}"
        clauses, params = DataComparisonUpdater._build_where(conditions)
        if clauses:
            sql += " WHERE " + " AND ".join(clauses)
        return sql, params

    @staticmethod
    def _build_set_clause(row, update_fields):
        """Return ``(clauses, params)`` assigning the target-side values of *row*.

        The value is read from ``<field>_target`` when the merge produced that
        suffixed column (both sides had a column named ``<field>``) and from
        the unsuffixed ``<field>`` column otherwise. Missing or null values
        are skipped so existing source data is never overwritten with NULL.
        """
        clauses = []
        params = {}
        for index, field in enumerate(update_fields):
            suffixed = f"{field}_target"
            column = suffixed if suffixed in row.index else field
            if column not in row.index:
                continue
            value = row[column]
            if pd.isna(value):
                continue
            name = f"set_{index}"
            clauses.append(f"{field} = :{name}")
            params[name] = DataComparisonUpdater._to_native(value)
        return clauses, params

    @staticmethod
    def _build_row_filter(row, join_columns=None):
        """Return ``(clauses, params)`` identifying the source row(s) to update.

        With *join_columns* ({'target_col': 'source_col'}) the filter is the
        join key itself: ``source_col = <row[target_col]>``. If any key value
        is missing or null the result is empty, so the caller skips the row
        instead of running a wider UPDATE than intended.

        Without *join_columns* the legacy filter is used: every non-null
        ``*_source`` column, compared under its name with the suffix removed.
        """
        clauses = []
        params = {}
        if join_columns:
            for index, (target_col, source_col) in enumerate(join_columns.items()):
                if target_col not in row.index or pd.isna(row[target_col]):
                    return [], {}
                name = f"key_{index}"
                clauses.append(f"{source_col} = :{name}")
                params[name] = DataComparisonUpdater._to_native(row[target_col])
            return clauses, params

        suffix = '_source'
        for index, column in enumerate(row.index):
            if not str(column).endswith(suffix) or pd.isna(row[column]):
                continue
            name = f"key_{index}"
            # Strip only the trailing suffix; str.replace would also remove
            # '_source' from the middle of a column name.
            clauses.append(f"{str(column)[:-len(suffix)]} = :{name}")
            params[name] = DataComparisonUpdater._to_native(row[column])
        return clauses, params

    def _run_query(self, engine, db_label, table_name, columns, conditions):
        """Run a SELECT on *engine*; return the rows, or an empty DataFrame on any error."""
        try:
            sql, params = self._build_select(table_name, columns, conditions)
            logger.info(f"执行{db_label}查询: {sql} | 参数: {params}")
            df = pd.read_sql(text(sql), engine, params=params or None)
            logger.info(f"查询到 {len(df)} 条记录")
            return df
        except Exception as e:
            logger.error(f"查询{db_label}时发生错误: {str(e)}")
            return pd.DataFrame()

    def query_target_data(self, table_name, columns=None, conditions=None):
        """
        Query rows from the target database.
        Args:
            table_name: table to read
            columns: list of columns to select; all columns when omitted
            conditions: dict of equality filters {column: value}
        Returns:
            DataFrame: the query result (empty when the query fails)
        """
        return self._run_query(self.target_engine, "目标数据库", table_name, columns, conditions)

    def query_source_data(self, table_name, columns=None, conditions=None):
        """
        Query rows from the source database.
        Args:
            table_name: table to read
            columns: list of columns to select; all columns when omitted
            conditions: dict of equality filters {column: value}
        Returns:
            DataFrame: the query result (empty when the query fails)
        """
        return self._run_query(self.source_engine, "源数据库", table_name, columns, conditions)

    def compare_data(self, target_df, source_df, join_columns):
        """
        Inner-join target rows with source rows on the configured key columns.
        Args:
            target_df: rows from the target database
            source_df: rows from the source database
            join_columns: dict {'target_col': 'source_col'} naming the key columns
        Returns:
            DataFrame: matched rows. Key columns carry the target names;
            other columns present on both sides get the '_target' /
            '_source' suffixes. Empty when the merge fails.
        """
        try:
            # Rename the source key columns to the target names so both
            # frames can be merged on the same labels (missing labels are ignored).
            rename_map = {source_col: target_col for target_col, source_col in join_columns.items()}
            source_renamed = source_df.rename(columns=rename_map)
            merged_df = pd.merge(
                target_df,
                source_renamed,
                on=list(join_columns.keys()),
                how='inner',
                suffixes=('_target', '_source'),
            )
            logger.info(f"数据对比完成,找到 {len(merged_df)} 条匹配记录")
            return merged_df
        except Exception as e:
            logger.error(f"数据对比时发生错误: {str(e)}")
            return pd.DataFrame()

    def update_source_data(self, comparison_df, update_fields, table_name, join_columns=None) -> int:
        """
        Update the source database from the comparison result.
        Args:
            comparison_df: merged rows as returned by compare_data()
            update_fields: list of column names to update
            table_name: table to update
            join_columns: optional dict {'target_col': 'source_col'}; when given,
                each row is located in the source table by its join key. When
                omitted, the legacy filter on the '*_source' columns is used.
        Returns:
            int: sum of the rowcounts reported by the UPDATE statements,
            or 0 when an error occurs (nothing is committed in that case)
        """
        try:
            update_count = 0
            with self.source_engine.connect() as conn:
                for _, row in comparison_df.iterrows():
                    set_clauses, params = self._build_set_clause(row, update_fields)
                    if not set_clauses:
                        continue
                    where_clauses, where_params = self._build_row_filter(row, join_columns)
                    if not where_clauses:
                        # Never issue an UPDATE without a WHERE clause.
                        continue
                    params.update(where_params)
                    sql = (
                        f"UPDATE {table_name} SET {', '.join(set_clauses)} "
                        f"WHERE {' AND '.join(where_clauses)}"
                    )
                    logger.debug(f"执行更新SQL: {sql} | 参数: {params}")
                    result = conn.execute(text(sql), params)
                    update_count += result.rowcount
                # Single commit: either every row update is persisted or none is.
                conn.commit()
            logger.info(f"成功更新 {update_count} 条记录")
            return update_count
        except Exception as e:
            logger.error(f"更新源数据库时发生错误: {str(e)}")
            return 0

    def execute_comparison_update(self) -> bool:
        """
        Run the full workflow: query target, query source, compare, update source.

        Returns:
            bool: True when the update step was reached; False when no rules
            are set, any intermediate result is empty, or an error occurs.
        """
        if not self.update_rules:
            logger.error("未设置更新规则,请先调用 set_update_rules()")
            return False
        try:
            rules = self.update_rules
            # 1. Query the target database.
            target_df = self.query_target_data(
                rules['table_name'],
                rules.get('target_columns'),
                rules.get('target_conditions'),
            )
            if target_df.empty:
                logger.warning("目标数据库查询结果为空")
                return False
            # 2. Query the source database.
            source_df = self.query_source_data(
                rules['source_table'],
                rules.get('source_columns'),
                rules.get('source_conditions'),
            )
            if source_df.empty:
                logger.warning("源数据库查询结果为空")
                return False
            # 3. Match the rows of both sides.
            comparison_df = self.compare_data(target_df, source_df, rules['join_conditions'])
            if comparison_df.empty:
                logger.warning("没有找到匹配的数据")
                return False
            # 4. Update the source database, locating each row by its join key.
            update_count = self.update_source_data(
                comparison_df,
                rules['update_fields'],
                rules['source_table'],
                join_columns=rules['join_conditions'],
            )
            logger.info(f"数据对比和更新完成,共更新 {update_count} 条记录")
            return True
        except Exception as e:
            logger.error(f"执行数据对比更新时发生错误: {str(e)}")
            return False


def main():
    """Entry point showing example usage of DataComparisonUpdater."""
    # Example rule set; adjust it to your own requirements.
    example_rules = {
        'table_name': 'ma_supplier_info',  # target table
        'source_table': 'bm_supplier',  # source table
        'join_conditions': {'supplier': 'NAME'},  # join condition
        'update_fields': ['address', 'phone'],  # fields to update
        'target_conditions': {'status': 'active'},  # filter for the target table
        'source_conditions': None,  # filter for the source table
        'target_columns': ['supplier', 'address', 'phone'],  # columns read from the target table
        'source_columns': ['NAME', 'ADDRESS', 'PHONE_NUMBER'],  # columns read from the source table
    }

    updater = DataComparisonUpdater()
    # Register the rules, then run the comparison and update.
    updater.set_update_rules(example_rules)
    outcome = "✅ 数据对比和更新完成" if updater.execute_comparison_update() else "❌ 数据对比和更新失败"
    print(outcome)


if __name__ == "__main__":
    main()