class ClzBmAgreementInfoUpdater:
    """Sync ``clz_bm_agreement_info.project_id`` in the source DB from the target DB.

    The same SELECT is run against both databases.  The two result sets are
    inner-joined on ``external_id`` and every matched source agreement gets
    the ``project_id`` of the matching target row.
    """

    # Run unchanged against both engines.
    _SELECT_SQL = """
        SELECT
            ca.agreement_id,
            ca.agreement_code,
            ca.project_id,
            bu.unit_id,
            bu.unit_name,
            bp.pro_id,
            bp.pro_name,
            bp.external_id
        FROM
            clz_bm_agreement_info ca
            LEFT JOIN bm_unit bu ON ca.unit_id = bu.unit_id
            LEFT JOIN bm_project bp ON ca.project_id = bp.pro_id
        WHERE
            bp.external_id is not null
    """
    # Values are bound, never interpolated into the statement text.
    _UPDATE_SQL = (
        "UPDATE clz_bm_agreement_info "
        "SET project_id = :new_value "
        "WHERE agreement_id = :row_id"
    )
    _MERGE_KEYS = ['external_id']
    _ROW_ID_COLUMN = 'agreement_id_source'
    _NEW_VALUE_COLUMN = 'project_id_target'

    def __init__(self):
        """Bind the module-level source and target engines to the instance."""
        self.source_engine = source_engine
        self.target_engine = target_engine

    @staticmethod
    def _to_native(value):
        """Return *value* as a plain Python scalar suitable for a bind parameter."""
        if hasattr(value, 'item'):  # numpy scalar -> Python scalar
            value = value.item()
        # pandas stores integer columns that contain NULLs as float64;
        # turn whole-number floats back into ints.
        if isinstance(value, float) and value.is_integer():
            value = int(value)
        return value

    @classmethod
    def _collect_updates(cls, merged_df):
        """Return one bind-parameter dict per distinct (row id, new value) pair.

        Rows with a missing id or value are skipped.  The merge is
        many-to-many, so identical pairs repeat; only the last occurrence of
        each pair is kept, which leaves the final database state the same as
        applying every merged row in order.
        """
        pairs = (
            merged_df[[cls._ROW_ID_COLUMN, cls._NEW_VALUE_COLUMN]]
            .dropna()
            .drop_duplicates(keep='last')
        )
        return [
            {'row_id': cls._to_native(row_id), 'new_value': cls._to_native(new_value)}
            for row_id, new_value in pairs.itertuples(index=False, name=None)
        ]

    def _query(self, engine, label):
        """Run the SELECT on *engine*; return an empty DataFrame if it fails."""
        try:
            logger.info(f"执行{label}数据库查询...")
            df = pd.read_sql(self._SELECT_SQL, engine)
            logger.info(f"{label}数据库查询到 {len(df)} 条记录")
            return df
        except Exception as exc:
            logger.exception(f"查询{label}数据库时发生错误: {exc}")
            return pd.DataFrame()

    def query_target_data(self):
        """Return the agreement rows of the target database as a DataFrame."""
        return self._query(self.target_engine, "目标")

    def query_source_data(self):
        """Return the agreement rows of the source database as a DataFrame."""
        return self._query(self.source_engine, "源")

    def _merge(self, target_df, source_df):
        """Inner-join both frames on the merge keys with _target/_source suffixes."""
        return pd.merge(
            target_df,
            source_df,
            on=self._MERGE_KEYS,
            how='inner',
            suffixes=('_target', '_source'),
        )

    def _compare_and_update(self, target_df, source_df):
        """Merge both frames and apply the updates; raises on any failure."""
        logger.info("开始数据对比...")
        merged_df = self._merge(target_df, source_df)
        logger.info(f"找到 {len(merged_df)} 条匹配记录")

        if merged_df.empty:
            logger.warning("没有找到匹配的数据")
            return 0

        statement = text(self._UPDATE_SQL)
        update_count = 0
        # begin() commits on success and rolls back if any statement raises.
        with self.source_engine.begin() as conn:
            for params in self._collect_updates(merged_df):
                logger.debug("执行更新SQL: %s %s", self._UPDATE_SQL, params)
                update_count += conn.execute(statement, params).rowcount

        logger.info(f"成功更新 {update_count} 条记录")
        return update_count

    def compare_and_update_data(self, target_df, source_df):
        """Compare both data sets and update the source database.

        Args:
            target_df: rows read from the target database.
            source_df: rows read from the source database.

        Returns:
            int: number of rows reported as updated; 0 when nothing matched
            or when an error occurred (the error is logged).
        """
        try:
            return self._compare_and_update(target_df, source_df)
        except Exception as exc:
            logger.exception(f"数据对比和更新时发生错误: {exc}")
            return 0

    def execute_update(self):
        """Run the query / compare / update flow.

        Returns:
            bool: False when either query returned no rows or any step
            failed, True otherwise (including "nothing to update").
        """
        try:
            logger.info("=== 开始clz_bm_agreement_info表数据对比和更新 ===")

            target_df = self.query_target_data()
            if target_df.empty:
                logger.error("目标数据库查询结果为空")
                return False

            source_df = self.query_source_data()
            if source_df.empty:
                logger.error("源数据库查询结果为空")
                return False

            # Use the raising variant so a failed UPDATE is not reported
            # as a successful run with zero changes.
            update_count = self._compare_and_update(target_df, source_df)

            if update_count > 0:
                logger.info(f"✅ 数据对比和更新完成,共更新 {update_count} 条记录")
            else:
                logger.warning("⚠️ 没有找到需要更新的数据")
            return True

        except Exception as exc:
            logger.exception(f"执行更新流程时发生错误: {exc}")
            return False

    def preview_changes(self):
        """Log the first ten matched rows without modifying any data."""
        try:
            logger.info("=== 预览数据对比结果 ===")

            target_df = self.query_target_data()
            source_df = self.query_source_data()

            if target_df.empty or source_df.empty:
                logger.warning("查询结果为空,无法预览")
                return

            merged_df = self._merge(target_df, source_df)
            logger.info(f"找到 {len(merged_df)} 条匹配记录")

            if merged_df.empty:
                return

            logger.info("预览将要更新的记录:")
            for _, row in merged_df.head(10).iterrows():  # first 10 only
                logger.info(f"agreement_id: {row['agreement_id_source']}, "
                            f"external_id: {row['external_id']}, "
                            f"原project_id: {row.get('project_id_source', 'NULL')}, "
                            f"新project_id: {row.get('project_id_target', 'NULL')}")

            if len(merged_df) > 10:
                logger.info(f"... 还有 {len(merged_df) - 10} 条记录")

        except Exception as exc:
            logger.exception(f"预览时发生错误: {exc}")
class ClzLeaseApplyInfoUpdater:
    """Sync ``clz_lease_apply_info.project_id`` in the source DB from the target DB.

    The same SELECT is run against both databases.  The two result sets are
    inner-joined on ``external_id`` and every matched source row gets the
    ``project_id`` of the matching target row.
    """

    # Run unchanged against both engines.
    _SELECT_SQL = """
        SELECT
            cli.id,
            cli.`code`,
            cli.project_id,
            bu.unit_id,
            bu.unit_name,
            bp.pro_id,
            bp.pro_name,
            bp.external_id
        FROM
            clz_lease_apply_info cli
            LEFT JOIN bm_unit bu on cli.team_id = bu.unit_id
            LEFT JOIN bm_project bp on cli.project_id = bp.pro_id
        WHERE bp.external_id is not null
    """
    # Values are bound, never interpolated into the statement text.
    _UPDATE_SQL = (
        "UPDATE clz_lease_apply_info "
        "SET project_id = :new_value "
        "WHERE id = :row_id"
    )
    _MERGE_KEYS = ['external_id']
    _ROW_ID_COLUMN = 'id_source'
    _NEW_VALUE_COLUMN = 'project_id_target'

    def __init__(self):
        """Bind the module-level source and target engines to the instance."""
        self.source_engine = source_engine
        self.target_engine = target_engine

    @staticmethod
    def _to_native(value):
        """Return *value* as a plain Python scalar suitable for a bind parameter."""
        if hasattr(value, 'item'):  # numpy scalar -> Python scalar
            value = value.item()
        # pandas stores integer columns that contain NULLs as float64;
        # turn whole-number floats back into ints.
        if isinstance(value, float) and value.is_integer():
            value = int(value)
        return value

    @classmethod
    def _collect_updates(cls, merged_df):
        """Return one bind-parameter dict per distinct (row id, new value) pair.

        Rows with a missing id or value are skipped.  The merge is
        many-to-many, so identical pairs repeat; only the last occurrence of
        each pair is kept, which leaves the final database state the same as
        applying every merged row in order.
        """
        pairs = (
            merged_df[[cls._ROW_ID_COLUMN, cls._NEW_VALUE_COLUMN]]
            .dropna()
            .drop_duplicates(keep='last')
        )
        return [
            {'row_id': cls._to_native(row_id), 'new_value': cls._to_native(new_value)}
            for row_id, new_value in pairs.itertuples(index=False, name=None)
        ]

    def _query(self, engine, label):
        """Run the SELECT on *engine*; return an empty DataFrame if it fails."""
        try:
            logger.info(f"执行{label}数据库查询...")
            df = pd.read_sql(self._SELECT_SQL, engine)
            logger.info(f"{label}数据库查询到 {len(df)} 条记录")
            return df
        except Exception as exc:
            logger.exception(f"查询{label}数据库时发生错误: {exc}")
            return pd.DataFrame()

    def query_target_data(self):
        """Return the lease-apply rows of the target database as a DataFrame."""
        return self._query(self.target_engine, "目标")

    def query_source_data(self):
        """Return the lease-apply rows of the source database as a DataFrame."""
        return self._query(self.source_engine, "源")

    def _merge(self, target_df, source_df):
        """Inner-join both frames on the merge keys with _target/_source suffixes."""
        return pd.merge(
            target_df,
            source_df,
            on=self._MERGE_KEYS,
            how='inner',
            suffixes=('_target', '_source'),
        )

    def _compare_and_update(self, target_df, source_df):
        """Merge both frames and apply the updates; raises on any failure."""
        logger.info("开始数据对比...")
        merged_df = self._merge(target_df, source_df)
        logger.info(f"找到 {len(merged_df)} 条匹配记录")

        if merged_df.empty:
            logger.warning("没有找到匹配的数据")
            return 0

        statement = text(self._UPDATE_SQL)
        update_count = 0
        # begin() commits on success and rolls back if any statement raises.
        with self.source_engine.begin() as conn:
            for params in self._collect_updates(merged_df):
                logger.debug("执行更新SQL: %s %s", self._UPDATE_SQL, params)
                update_count += conn.execute(statement, params).rowcount

        logger.info(f"成功更新 {update_count} 条记录")
        return update_count

    def compare_and_update_data(self, target_df, source_df):
        """Compare both data sets and update the source database.

        Args:
            target_df: rows read from the target database.
            source_df: rows read from the source database.

        Returns:
            int: number of rows reported as updated; 0 when nothing matched
            or when an error occurred (the error is logged).
        """
        try:
            return self._compare_and_update(target_df, source_df)
        except Exception as exc:
            logger.exception(f"数据对比和更新时发生错误: {exc}")
            return 0

    def execute_update(self):
        """Run the query / compare / update flow.

        Returns:
            bool: False when either query returned no rows or any step
            failed, True otherwise (including "nothing to update").
        """
        try:
            logger.info("=== 开始clz_lease_apply_info表数据对比和更新 ===")

            target_df = self.query_target_data()
            if target_df.empty:
                logger.error("目标数据库查询结果为空")
                return False

            source_df = self.query_source_data()
            if source_df.empty:
                logger.error("源数据库查询结果为空")
                return False

            # Use the raising variant so a failed UPDATE is not reported
            # as a successful run with zero changes.
            update_count = self._compare_and_update(target_df, source_df)

            if update_count > 0:
                logger.info(f"✅ 数据对比和更新完成,共更新 {update_count} 条记录")
            else:
                logger.warning("⚠️ 没有找到需要更新的数据")
            return True

        except Exception as exc:
            logger.exception(f"执行更新流程时发生错误: {exc}")
            return False

    def preview_changes(self):
        """Log the first ten matched rows without modifying any data."""
        try:
            logger.info("=== 预览数据对比结果 ===")

            target_df = self.query_target_data()
            source_df = self.query_source_data()

            if target_df.empty or source_df.empty:
                logger.warning("查询结果为空,无法预览")
                return

            merged_df = self._merge(target_df, source_df)
            logger.info(f"找到 {len(merged_df)} 条匹配记录")

            if merged_df.empty:
                return

            logger.info("预览将要更新的记录:")
            for _, row in merged_df.head(10).iterrows():  # first 10 only
                logger.info(f"ID: {row['id_source']}, "
                            f"code: {row.get('code_source', 'NULL')}, "
                            f"external_id: {row['external_id']}, "
                            f"原project_id: {row.get('project_id_source', 'NULL')}, "
                            f"新project_id: {row.get('project_id_target', 'NULL')}")

            if len(merged_df) > 10:
                logger.info(f"... 还有 {len(merged_df) - 10} 条记录")

        except Exception as exc:
            logger.exception(f"预览时发生错误: {exc}")
class ClzLeaseOutDetailsUpdater:
    """Re-point ``clz_lease_out_details.ma_id`` in the source DB at target-DB machines.

    Source detail rows (joined to the source ``ma_machine``) are matched with
    the target ``ma_machine`` rows on ``(type_id, ma_code)``; each matched
    detail row gets the target machine's ``ma_id``.
    """

    # Target side: the machine table only.
    _TARGET_SQL = """
        SELECT
            mm.type_id,
            mm.ma_code,
            mm.ma_id
        FROM
            ma_machine mm
    """
    # Source side: detail rows with their machine code.
    _SOURCE_SQL = """
        SELECT
            cod.id,
            cod.type_id,
            mm.ma_code,
            mm.ma_id
        FROM
            clz_lease_out_details cod
            LEFT JOIN ma_machine mm on cod.ma_id = mm.ma_id
        WHERE cod.type_id > 6000 and cod.ma_id is not null AND mm.ma_code is not null
    """
    # Values are bound, never interpolated into the statement text.
    _UPDATE_SQL = (
        "UPDATE clz_lease_out_details "
        "SET ma_id = :new_value "
        "WHERE id = :row_id"
    )
    _MERGE_KEYS = ['type_id', 'ma_code']
    _ROW_ID_COLUMN = 'id'
    _NEW_VALUE_COLUMN = 'ma_id_target'

    def __init__(self):
        """Bind the module-level source and target engines to the instance."""
        self.source_engine = source_engine
        self.target_engine = target_engine

    @staticmethod
    def _to_native(value):
        """Return *value* as a plain Python scalar suitable for a bind parameter."""
        if hasattr(value, 'item'):  # numpy scalar -> Python scalar
            value = value.item()
        # pandas stores integer columns that contain NULLs as float64;
        # turn whole-number floats back into ints.
        if isinstance(value, float) and value.is_integer():
            value = int(value)
        return value

    @classmethod
    def _collect_updates(cls, merged_df):
        """Return one bind-parameter dict per distinct (row id, new value) pair.

        Rows with a missing id or value are skipped.  Identical pairs are
        collapsed to their last occurrence, which leaves the final database
        state the same as applying every merged row in order.
        """
        pairs = (
            merged_df[[cls._ROW_ID_COLUMN, cls._NEW_VALUE_COLUMN]]
            .dropna()
            .drop_duplicates(keep='last')
        )
        return [
            {'row_id': cls._to_native(row_id), 'new_value': cls._to_native(new_value)}
            for row_id, new_value in pairs.itertuples(index=False, name=None)
        ]

    def _query(self, engine, sql, label):
        """Run *sql* on *engine*; return an empty DataFrame if it fails."""
        try:
            logger.info(f"执行{label}数据库查询...")
            df = pd.read_sql(sql, engine)
            logger.info(f"{label}数据库查询到 {len(df)} 条记录")
            return df
        except Exception as exc:
            logger.exception(f"查询{label}数据库时发生错误: {exc}")
            return pd.DataFrame()

    def query_target_data(self):
        """Return the ``ma_machine`` rows of the target database as a DataFrame."""
        return self._query(self.target_engine, self._TARGET_SQL, "目标")

    def query_source_data(self):
        """Return the source ``clz_lease_out_details`` rows with their machine code."""
        return self._query(self.source_engine, self._SOURCE_SQL, "源")

    def _merge(self, target_df, source_df):
        """Inner-join both frames on the merge keys with _target/_source suffixes."""
        return pd.merge(
            target_df,
            source_df,
            on=self._MERGE_KEYS,
            how='inner',
            suffixes=('_target', '_source'),
        )

    def _compare_and_update(self, target_df, source_df):
        """Merge both frames and apply the updates; raises on any failure."""
        logger.info("开始数据对比...")
        merged_df = self._merge(target_df, source_df)
        logger.info(f"找到 {len(merged_df)} 条匹配记录")

        if merged_df.empty:
            logger.warning("没有找到匹配的数据")
            return 0

        statement = text(self._UPDATE_SQL)
        update_count = 0
        # begin() commits on success and rolls back if any statement raises.
        with self.source_engine.begin() as conn:
            for params in self._collect_updates(merged_df):
                logger.debug("执行更新SQL: %s %s", self._UPDATE_SQL, params)
                update_count += conn.execute(statement, params).rowcount

        logger.info(f"成功更新 {update_count} 条记录")
        return update_count

    def compare_and_update_data(self, target_df, source_df):
        """Compare both data sets and update the source database.

        Args:
            target_df: machine rows read from the target database.
            source_df: detail rows read from the source database.

        Returns:
            int: number of rows reported as updated; 0 when nothing matched
            or when an error occurred (the error is logged).
        """
        try:
            return self._compare_and_update(target_df, source_df)
        except Exception as exc:
            logger.exception(f"数据对比和更新时发生错误: {exc}")
            return 0

    def execute_update(self):
        """Run the query / compare / update flow.

        Returns:
            bool: False when either query returned no rows or any step
            failed, True otherwise (including "nothing to update").
        """
        try:
            logger.info("=== 开始clz_lease_out_details表数据对比和更新 ===")

            target_df = self.query_target_data()
            if target_df.empty:
                logger.error("目标数据库查询结果为空")
                return False

            source_df = self.query_source_data()
            if source_df.empty:
                logger.error("源数据库查询结果为空")
                return False

            # Use the raising variant so a failed UPDATE is not reported
            # as a successful run with zero changes.
            update_count = self._compare_and_update(target_df, source_df)

            if update_count > 0:
                logger.info(f"✅ 数据对比和更新完成,共更新 {update_count} 条记录")
            else:
                logger.warning("⚠️ 没有找到需要更新的数据")
            return True

        except Exception as exc:
            logger.exception(f"执行更新流程时发生错误: {exc}")
            return False

    def preview_changes(self):
        """Log the first ten matched rows without modifying any data."""
        try:
            logger.info("=== 预览数据对比结果 ===")

            target_df = self.query_target_data()
            source_df = self.query_source_data()

            if target_df.empty or source_df.empty:
                logger.warning("查询结果为空,无法预览")
                return

            merged_df = self._merge(target_df, source_df)
            logger.info(f"找到 {len(merged_df)} 条匹配记录")

            if merged_df.empty:
                return

            logger.info("预览将要更新的记录:")
            for _, row in merged_df.head(10).iterrows():  # first 10 only
                logger.info(f"ID: {row['id']}, "
                            f"type_id: {row['type_id']}, "
                            f"ma_code: {row['ma_code']}, "
                            f"原ma_id: {row.get('ma_id_source', 'NULL')}, "
                            f"新ma_id: {row.get('ma_id_target', 'NULL')}")

            if len(merged_df) > 10:
                logger.info(f"... 还有 {len(merged_df) - 10} 条记录")

        except Exception as exc:
            logger.exception(f"预览时发生错误: {exc}")
class ClzSltAgreementInfoUpdater:
    """Re-point ``clz_slt_agreement_info.ma_id`` in the source DB at target-DB machines.

    Source rows (joined to the source ``ma_machine``) are matched with the
    target ``ma_machine`` rows on ``(type_id, ma_code)``; each matched row
    gets the target machine's ``ma_id``.
    """

    # Target side: the machine table only.
    _TARGET_SQL = """
        SELECT
            mm.type_id,
            mm.ma_code,
            mm.ma_id
        FROM
            ma_machine mm
    """
    # Source side: settlement rows with their machine code.
    _SOURCE_SQL = """
        SELECT
            cod.id,
            cod.type_id,
            mm.ma_code,
            mm.ma_id
        FROM
            clz_slt_agreement_info cod
            LEFT JOIN ma_machine mm on cod.ma_id = mm.ma_id
        WHERE cod.type_id > 6000 and cod.ma_id is not null AND mm.ma_code is not null
    """
    # Values are bound, never interpolated into the statement text.
    _UPDATE_SQL = (
        "UPDATE clz_slt_agreement_info "
        "SET ma_id = :new_value "
        "WHERE id = :row_id"
    )
    _MERGE_KEYS = ['type_id', 'ma_code']
    _ROW_ID_COLUMN = 'id'
    _NEW_VALUE_COLUMN = 'ma_id_target'

    def __init__(self):
        """Bind the module-level source and target engines to the instance."""
        self.source_engine = source_engine
        self.target_engine = target_engine

    @staticmethod
    def _to_native(value):
        """Return *value* as a plain Python scalar suitable for a bind parameter."""
        if hasattr(value, 'item'):  # numpy scalar -> Python scalar
            value = value.item()
        # pandas stores integer columns that contain NULLs as float64;
        # turn whole-number floats back into ints.
        if isinstance(value, float) and value.is_integer():
            value = int(value)
        return value

    @classmethod
    def _collect_updates(cls, merged_df):
        """Return one bind-parameter dict per distinct (row id, new value) pair.

        Rows with a missing id or value are skipped.  Identical pairs are
        collapsed to their last occurrence, which leaves the final database
        state the same as applying every merged row in order.
        """
        pairs = (
            merged_df[[cls._ROW_ID_COLUMN, cls._NEW_VALUE_COLUMN]]
            .dropna()
            .drop_duplicates(keep='last')
        )
        return [
            {'row_id': cls._to_native(row_id), 'new_value': cls._to_native(new_value)}
            for row_id, new_value in pairs.itertuples(index=False, name=None)
        ]

    def _query(self, engine, sql, label):
        """Run *sql* on *engine*; return an empty DataFrame if it fails."""
        try:
            logger.info(f"执行{label}数据库查询...")
            df = pd.read_sql(sql, engine)
            logger.info(f"{label}数据库查询到 {len(df)} 条记录")
            return df
        except Exception as exc:
            logger.exception(f"查询{label}数据库时发生错误: {exc}")
            return pd.DataFrame()

    def query_target_data(self):
        """Return the ``ma_machine`` rows of the target database as a DataFrame."""
        return self._query(self.target_engine, self._TARGET_SQL, "目标")

    def query_source_data(self):
        """Return the source ``clz_slt_agreement_info`` rows with their machine code."""
        return self._query(self.source_engine, self._SOURCE_SQL, "源")

    def _merge(self, target_df, source_df):
        """Inner-join both frames on the merge keys with _target/_source suffixes."""
        return pd.merge(
            target_df,
            source_df,
            on=self._MERGE_KEYS,
            how='inner',
            suffixes=('_target', '_source'),
        )

    def _compare_and_update(self, target_df, source_df):
        """Merge both frames and apply the updates; raises on any failure."""
        logger.info("开始数据对比...")
        merged_df = self._merge(target_df, source_df)
        logger.info(f"找到 {len(merged_df)} 条匹配记录")

        if merged_df.empty:
            logger.warning("没有找到匹配的数据")
            return 0

        statement = text(self._UPDATE_SQL)
        update_count = 0
        # begin() commits on success and rolls back if any statement raises.
        with self.source_engine.begin() as conn:
            for params in self._collect_updates(merged_df):
                logger.debug("执行更新SQL: %s %s", self._UPDATE_SQL, params)
                update_count += conn.execute(statement, params).rowcount

        logger.info(f"成功更新 {update_count} 条记录")
        return update_count

    def compare_and_update_data(self, target_df, source_df):
        """Compare both data sets and update the source database.

        Args:
            target_df: machine rows read from the target database.
            source_df: settlement rows read from the source database.

        Returns:
            int: number of rows reported as updated; 0 when nothing matched
            or when an error occurred (the error is logged).
        """
        try:
            return self._compare_and_update(target_df, source_df)
        except Exception as exc:
            logger.exception(f"数据对比和更新时发生错误: {exc}")
            return 0

    def execute_update(self):
        """Run the query / compare / update flow.

        Returns:
            bool: False when either query returned no rows or any step
            failed, True otherwise (including "nothing to update").
        """
        try:
            logger.info("=== 开始clz_slt_agreement_info表数据对比和更新 ===")

            target_df = self.query_target_data()
            if target_df.empty:
                logger.error("目标数据库查询结果为空")
                return False

            source_df = self.query_source_data()
            if source_df.empty:
                logger.error("源数据库查询结果为空")
                return False

            # Use the raising variant so a failed UPDATE is not reported
            # as a successful run with zero changes.
            update_count = self._compare_and_update(target_df, source_df)

            if update_count > 0:
                logger.info(f"✅ 数据对比和更新完成,共更新 {update_count} 条记录")
            else:
                logger.warning("⚠️ 没有找到需要更新的数据")
            return True

        except Exception as exc:
            logger.exception(f"执行更新流程时发生错误: {exc}")
            return False

    def preview_changes(self):
        """Log the first ten matched rows without modifying any data."""
        try:
            logger.info("=== 预览数据对比结果 ===")

            target_df = self.query_target_data()
            source_df = self.query_source_data()

            if target_df.empty or source_df.empty:
                logger.warning("查询结果为空,无法预览")
                return

            merged_df = self._merge(target_df, source_df)
            logger.info(f"找到 {len(merged_df)} 条匹配记录")

            if merged_df.empty:
                return

            logger.info("预览将要更新的记录:")
            for _, row in merged_df.head(10).iterrows():  # first 10 only
                logger.info(f"ID: {row['id']}, "
                            f"type_id: {row['type_id']}, "
                            f"ma_code: {row['ma_code']}, "
                            f"原cod.ma_id: {row.get('ma_id_source', 'NULL')}, "
                            f"新cod.ma_id: {row.get('ma_id_target', 'NULL')}")

            if len(merged_df) > 10:
                logger.info(f"... 还有 {len(merged_df) - 10} 条记录")

        except Exception as exc:
            logger.exception(f"预览时发生错误: {exc}")
class DataComparisonUpdater:
    """Rule-driven comparison of a target-DB table with a source-DB table.

    Rows of both tables are inner-joined on the configured join columns, and
    the configured fields of the source table are overwritten with the
    target table's values.
    """

    _SOURCE_SUFFIX = '_source'
    _TARGET_SUFFIX = '_target'

    def __init__(self):
        """Bind the module-level engines and start with no rules configured."""
        self.source_engine = source_engine
        self.target_engine = target_engine
        self.update_rules = {}  # rules consumed by execute_comparison_update()
        self.comparison_results = []  # reserved for comparison results

    def set_update_rules(self, rules):
        """Store the rule dict used by :meth:`execute_comparison_update`.

        Keys read by this class: ``table_name``, ``source_table``,
        ``join_conditions`` (``{target_col: source_col}``), ``update_fields``
        and the optional ``target_columns``, ``target_conditions``,
        ``source_columns``, ``source_conditions``.
        """
        self.update_rules = rules
        logger.info(f"已设置更新规则: {rules}")

    @staticmethod
    def _to_native(value):
        """Return *value* as a plain Python scalar suitable for a bind parameter."""
        if hasattr(value, 'item'):  # numpy scalar -> Python scalar
            value = value.item()
        # pandas stores integer columns that contain NULLs as float64;
        # turn whole-number floats back into ints.
        if isinstance(value, float) and value.is_integer():
            value = int(value)
        return value

    @staticmethod
    def _build_select(table_name, columns=None, conditions=None):
        """Return ``(sql, params)`` for a SELECT with equality conditions.

        Condition values are emitted as bind parameters; a ``None`` value
        becomes ``IS NULL``.  Table and column names cannot be bound and are
        interpolated as given, so they must come from trusted rules.
        """
        columns_str = ', '.join(columns) if columns else '*'
        sql = f"SELECT {columns_str} FROM {table_name}"

        params = {}
        clauses = []
        for key, value in (conditions or {}).items():
            if value is None:
                clauses.append(f"{key} IS NULL")
                continue
            name = f"cond_{len(params)}"
            clauses.append(f"{key} = :{name}")
            params[name] = value

        if clauses:
            sql += " WHERE " + " AND ".join(clauses)
        return sql, params

    def _query(self, engine, label, table_name, columns, conditions):
        """Run the SELECT on *engine*; return an empty DataFrame if it fails."""
        try:
            sql, params = self._build_select(table_name, columns, conditions)
            logger.info(f"执行{label}数据库查询: {sql}")
            df = pd.read_sql(text(sql), engine, params=params or None)
            logger.info(f"查询到 {len(df)} 条记录")
            return df
        except Exception as exc:
            logger.exception(f"查询{label}数据库时发生错误: {exc}")
            return pd.DataFrame()

    def query_target_data(self, table_name, columns=None, conditions=None):
        """Query the target database.

        Args:
            table_name: table to read.
            columns: columns to select; all columns when omitted.
            conditions: ``{column: value}`` equality filters.

        Returns:
            DataFrame: the result, or an empty frame when the query failed.
        """
        return self._query(self.target_engine, "目标", table_name, columns, conditions)

    def query_source_data(self, table_name, columns=None, conditions=None):
        """Query the source database.

        Args:
            table_name: table to read.
            columns: columns to select; all columns when omitted.
            conditions: ``{column: value}`` equality filters.

        Returns:
            DataFrame: the result, or an empty frame when the query failed.
        """
        return self._query(self.source_engine, "源", table_name, columns, conditions)

    def compare_data(self, target_df, source_df, join_columns):
        """Inner-join target and source rows on the configured join columns.

        Args:
            target_df: rows from the target database.
            source_df: rows from the source database.
            join_columns: ``{target_col: source_col}``; each source column is
                renamed to its target name before merging.

        Returns:
            DataFrame: merged rows; other columns present in both frames get
            ``_target`` / ``_source`` suffixes.  Empty when an error occurred.
        """
        try:
            source_df_renamed = source_df.copy()
            for target_col, source_col in join_columns.items():
                if source_col in source_df_renamed.columns:
                    source_df_renamed = source_df_renamed.rename(columns={source_col: target_col})

            merged_df = pd.merge(target_df, source_df_renamed,
                                 on=list(join_columns.keys()),
                                 how='inner',
                                 suffixes=(self._TARGET_SUFFIX, self._SOURCE_SUFFIX))

            logger.info(f"数据对比完成,找到 {len(merged_df)} 条匹配记录")
            return merged_df

        except Exception as exc:
            logger.exception(f"数据对比时发生错误: {exc}")
            return pd.DataFrame()

    @classmethod
    def _build_update(cls, row, update_fields, table_name, join_columns=None):
        """Return ``(sql, params)`` for one merged row, or None if nothing to do.

        SET takes every update field whose ``<field>_target`` value is
        present.  WHERE uses the join key (when *join_columns* is given)
        followed by every non-null ``*_source`` column, with only the
        trailing suffix removed from the column name.
        """
        params = {}

        set_clauses = []
        for field in update_fields:
            target_field = f"{field}{cls._TARGET_SUFFIX}"
            if target_field in row.index and pd.notna(row[target_field]):
                name = f"set_{len(set_clauses)}"
                set_clauses.append(f"{field} = :{name}")
                params[name] = cls._to_native(row[target_field])
        if not set_clauses:
            return None

        where_clauses = []

        def add_condition(column, value):
            name = f"where_{len(where_clauses)}"
            where_clauses.append(f"{column} = :{name}")
            params[name] = cls._to_native(value)

        # The merged key column carries the target name; the source table
        # still uses the source column name.
        for target_col, source_col in (join_columns or {}).items():
            if target_col in row.index and pd.notna(row[target_col]):
                add_condition(source_col, row[target_col])

        suffix = cls._SOURCE_SUFFIX
        for key in row.index:
            if isinstance(key, str) and key.endswith(suffix) and pd.notna(row[key]):
                add_condition(key[:-len(suffix)], row[key])

        if not where_clauses:
            return None

        sql = (f"UPDATE {table_name} SET {', '.join(set_clauses)} "
               f"WHERE {' AND '.join(where_clauses)}")
        return sql, params

    def _apply_updates(self, comparison_df, update_fields, table_name, join_columns=None):
        """Execute one UPDATE per merged row in one transaction; raises on failure."""
        update_count = 0
        # begin() commits on success and rolls back if any statement raises.
        with self.source_engine.begin() as conn:
            for _, row in comparison_df.iterrows():
                built = self._build_update(row, update_fields, table_name, join_columns)
                if built is None:
                    continue
                sql, params = built
                logger.debug("执行更新SQL: %s %s", sql, params)
                update_count += conn.execute(text(sql), params).rowcount

        logger.info(f"成功更新 {update_count} 条记录")
        return update_count

    def update_source_data(self, comparison_df, update_fields, table_name, join_columns=None):
        """Update the source database from the comparison result.

        Args:
            comparison_df: merged rows produced by :meth:`compare_data`.
            update_fields: names of the fields to overwrite.
            table_name: table to update.
            join_columns: optional ``{target_col: source_col}`` mapping; when
                given, the join key is added to every WHERE clause.

        Returns:
            int: number of rows reported as updated; 0 when an error occurred
            (the error is logged).
        """
        try:
            return self._apply_updates(comparison_df, update_fields, table_name, join_columns)
        except Exception as exc:
            logger.exception(f"更新源数据库时发生错误: {exc}")
            return 0

    def execute_comparison_update(self):
        """Run the query / compare / update flow described by the rules.

        Returns:
            bool: True when the flow completed; False when no rules are set,
            a query or the comparison produced no rows, or any step failed.
        """
        if not self.update_rules:
            logger.error("未设置更新规则,请先调用 set_update_rules()")
            return False

        try:
            rules = self.update_rules

            target_df = self.query_target_data(
                rules['table_name'],
                rules.get('target_columns'),
                rules.get('target_conditions')
            )
            if target_df.empty:
                logger.warning("目标数据库查询结果为空")
                return False

            source_df = self.query_source_data(
                rules['source_table'],
                rules.get('source_columns'),
                rules.get('source_conditions')
            )
            if source_df.empty:
                logger.warning("源数据库查询结果为空")
                return False

            comparison_df = self.compare_data(
                target_df,
                source_df,
                rules['join_conditions']
            )
            if comparison_df.empty:
                logger.warning("没有找到匹配的数据")
                return False

            # Use the raising variant so a failed UPDATE is not reported
            # as a successful run.
            update_count = self._apply_updates(
                comparison_df,
                rules['update_fields'],
                rules['source_table'],
                rules['join_conditions']
            )

            logger.info(f"数据对比和更新完成,共更新 {update_count} 条记录")
            return True

        except Exception as exc:
            logger.exception(f"执行数据对比更新时发生错误: {exc}")
            return False