首次提交
This commit is contained in:
parent
2768ef835a
commit
7d85515b2e
|
|
@ -0,0 +1,129 @@
|
|||
import pandas as pd
|
||||
import sqlalchemy
|
||||
from sqlalchemy import create_engine
|
||||
import configparser
|
||||
import os
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
def get_db_connection_string(config, section):
|
||||
"""从配置文件中构建数据库连接字符串"""
|
||||
return f"mysql+pymysql://{config[section]['user']}:{quote_plus(config[section]['password'])}@" \
|
||||
f"{config[section]['host']}:{config[section]['port']}/{config[section]['database']}"
|
||||
|
||||
|
||||
def transform_and_load_agreement(config_file_path):
|
||||
"""
|
||||
从源数据库提取ba_ma_agreement数据,转换后加载到目标数据库bm_agreement_info
|
||||
:param config_file_path: 配置文件路径
|
||||
"""
|
||||
# 读取配置文件
|
||||
if not os.path.exists(config_file_path):
|
||||
raise FileNotFoundError(f"配置文件不存在: {config_file_path}")
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read(config_file_path)
|
||||
|
||||
# 定义替换规则
|
||||
is_slt_mapping = {
|
||||
0: 1, # 已结算 → 1
|
||||
1: 0 # 未结算 → 0
|
||||
}
|
||||
|
||||
try:
|
||||
# 获取数据库连接
|
||||
source_conn_str = get_db_connection_string(config, 'source_db')
|
||||
target_conn_str = get_db_connection_string(config, 'target_db')
|
||||
source_engine = create_engine(source_conn_str)
|
||||
target_engine = create_engine(target_conn_str)
|
||||
|
||||
# 从源表读取数据(过滤COMPANY_ID=1的记录)
|
||||
print("正在从源表ba_ma_agreement读取数据...")
|
||||
source_query = """
|
||||
SELECT ID, \
|
||||
CODE, \
|
||||
SIGN_DATE, \
|
||||
LEASE_COMPANY, \
|
||||
PROJECT, \
|
||||
START_TIME,
|
||||
AUTHORIZING_PERSON, \
|
||||
AUTHORIZING_PHONE, \
|
||||
CONTRACT_NUMBER,
|
||||
IS_BALANCE, \
|
||||
is_push, CREATE_TIME
|
||||
FROM ba_ma_agreement
|
||||
WHERE COMPANY_ID = 1 \
|
||||
"""
|
||||
source_df = pd.read_sql(source_query, source_engine)
|
||||
|
||||
if source_df.empty:
|
||||
print("没有符合条件的数据需要转换")
|
||||
return
|
||||
print(f"读取到{len(source_df)}条待转换数据")
|
||||
|
||||
# 数据转换
|
||||
print("正在进行数据转换...")
|
||||
target_df = pd.DataFrame()
|
||||
|
||||
# 直接复制字段
|
||||
target_df['agreement_id'] = source_df['ID']
|
||||
target_df['agreement_code'] = source_df['CODE']
|
||||
target_df['sign_time'] = source_df['SIGN_DATE']
|
||||
target_df['unit_id'] = source_df['LEASE_COMPANY']
|
||||
target_df['project_id'] = source_df['PROJECT']
|
||||
target_df['plan_start_time'] = source_df['START_TIME']
|
||||
target_df['auth_person'] = source_df['AUTHORIZING_PERSON']
|
||||
target_df['phone'] = source_df['AUTHORIZING_PHONE']
|
||||
target_df['contract_code'] = source_df['CONTRACT_NUMBER']
|
||||
target_df['is_push'] = source_df['is_push']
|
||||
target_df['create_time'] = source_df['CREATE_TIME']
|
||||
|
||||
# 替换字段(IS_BALANCE → is_slt)
|
||||
target_df['is_slt'] = source_df['IS_BALANCE'].map(is_slt_mapping)
|
||||
|
||||
# 检查未映射的值
|
||||
if target_df['is_slt'].isna().any():
|
||||
unmapped = source_df[target_df['is_slt'].isna()]['IS_BALANCE'].unique()
|
||||
print(f"警告: 发现未映射的IS_BALANCE值: {unmapped}")
|
||||
|
||||
# 写入目标表
|
||||
print("正在写入目标表bm_agreement_info...")
|
||||
target_df.to_sql(
|
||||
'bm_agreement_info',
|
||||
target_engine,
|
||||
if_exists='append',
|
||||
index=False,
|
||||
dtype={
|
||||
'agreement_id': sqlalchemy.types.INTEGER(),
|
||||
'agreement_code': sqlalchemy.types.VARCHAR(length=50),
|
||||
'sign_time': sqlalchemy.types.DateTime(),
|
||||
'unit_id': sqlalchemy.types.INTEGER(),
|
||||
'project_id': sqlalchemy.types.INTEGER(),
|
||||
'plan_start_time': sqlalchemy.types.DateTime(),
|
||||
'auth_person': sqlalchemy.types.VARCHAR(length=50),
|
||||
'phone': sqlalchemy.types.VARCHAR(length=20),
|
||||
'contract_code': sqlalchemy.types.VARCHAR(length=50),
|
||||
'is_slt': sqlalchemy.types.SmallInteger(),
|
||||
'is_push': sqlalchemy.types.SmallInteger(),
|
||||
'create_time': sqlalchemy.types.DateTime()
|
||||
}
|
||||
)
|
||||
print(f"成功写入{len(target_df)}条数据")
|
||||
|
||||
except Exception as e:
|
||||
print(f"处理过程中发生错误: {str(e)}")
|
||||
raise
|
||||
finally:
|
||||
if 'source_engine' in locals():
|
||||
source_engine.dispose()
|
||||
if 'target_engine' in locals():
|
||||
target_engine.dispose()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 配置文件路径(根据实际情况修改)
|
||||
config_file = "config.ini"
|
||||
|
||||
try:
|
||||
transform_and_load_agreement(config_file)
|
||||
except Exception as e:
|
||||
print(f"程序执行失败: {str(e)}")
|
||||
|
|
@ -0,0 +1,150 @@
|
|||
import pandas as pd
|
||||
import sqlalchemy
|
||||
from sqlalchemy import create_engine
|
||||
import configparser
|
||||
import os
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
def get_db_connection_string(config, section):
|
||||
"""
|
||||
从配置文件中构建数据库连接字符串
|
||||
:param config: 配置解析器对象
|
||||
:param section: 配置节名(source_db或target_db)
|
||||
:return: 数据库连接字符串
|
||||
"""
|
||||
db_type = 'mysql' # 假设使用MySQL数据库
|
||||
driver = 'pymysql' # MySQL驱动
|
||||
|
||||
return f"{db_type}+{driver}://{config[section]['user']}:{quote_plus(config[section]['password'])}@" \
|
||||
f"{config[section]['host']}:{config[section]['port']}/{config[section]['database']}"
|
||||
|
||||
|
||||
def transform_and_load_bm_project(config_file_path):
|
||||
"""
|
||||
从源数据库提取bm_project数据,转换后加载到目标数据库
|
||||
:param config_file_path: 配置文件路径
|
||||
"""
|
||||
# 读取配置文件
|
||||
if not os.path.exists(config_file_path):
|
||||
raise FileNotFoundError(f"配置文件不存在: {config_file_path}")
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read(config_file_path)
|
||||
|
||||
# 定义替换映射
|
||||
imp_unit_mapping = {
|
||||
1: 327, # 送电一分公司 → 327
|
||||
2: 102, # 送电二分公司 → 102
|
||||
3: 309, # 宏源变电工程处 → 309
|
||||
5: 338, # 土建分公司 → 338
|
||||
8: 309, # 宏源送电工程处 → 309
|
||||
9: 100, # 变电分公司 → 100
|
||||
10: 101, # 机具(物流)分公司 → 101
|
||||
11: 345, # 外部往来单位 → 345
|
||||
12: 344, # 机械化分公司 → 344
|
||||
13: 346, # 运检分公司 → 346
|
||||
15: 340, # 安徽顺全电力工程有限公司 → 340
|
||||
16: 337, # 检修试验分公司 → 337
|
||||
17: 339, # 安徽顺安电网建设有限公司 → 339
|
||||
18: 342, # 公司机关 → 342
|
||||
21: 341 # 班组管理中心 → 341
|
||||
}
|
||||
|
||||
pro_type_mapping = {
|
||||
1: 0, # 线路工程 → 0
|
||||
2: 1, # 变电工程 → 1
|
||||
3: 2, # 业务工程 → 2
|
||||
4: 3 # 其他工程 → 3
|
||||
}
|
||||
|
||||
try:
|
||||
# 获取数据库连接字符串
|
||||
source_conn_str = get_db_connection_string(config, 'source_db')
|
||||
target_conn_str = get_db_connection_string(config, 'target_db')
|
||||
|
||||
# 创建数据库引擎
|
||||
source_engine = create_engine(source_conn_str)
|
||||
target_engine = create_engine(target_conn_str)
|
||||
|
||||
# 从源数据库读取数据
|
||||
print("正在从源数据库读取bm_project表数据...")
|
||||
source_query = """
|
||||
SELECT ID, NAME, NUM, PRO_ID, HTZT, time, COMPANY_ID, TYPE_ID, COMPANY, IS_ACTIVE
|
||||
FROM bm_project
|
||||
WHERE COMPANY = 1 AND IS_ACTIVE = 1 \
|
||||
"""
|
||||
source_df = pd.read_sql(source_query, source_engine)
|
||||
|
||||
if source_df.empty:
|
||||
print("没有符合条件的数据需要转换")
|
||||
return
|
||||
|
||||
print(f"读取到{len(source_df)}条待转换数据")
|
||||
|
||||
# 数据转换
|
||||
print("正在进行数据转换...")
|
||||
target_df = pd.DataFrame()
|
||||
target_df['pro_id'] = source_df['ID'] # 复制ID → pro_id
|
||||
target_df['pro_name'] = source_df['NAME'] # 复制NAME → pro_name
|
||||
target_df['pro_code'] = source_df['NUM'] # 复制NUM → pro_code
|
||||
target_df['external_id'] = source_df['PRO_ID'] # 复制PRO_ID → external_id
|
||||
target_df['contract_part'] = source_df['HTZT'] # 复制HTZT → contract_part
|
||||
target_df['create_time'] = source_df['time'] # 复制time → create_time
|
||||
|
||||
# 替换COMPANY_ID → imp_unit
|
||||
target_df['imp_unit'] = source_df['COMPANY_ID'].map(imp_unit_mapping)
|
||||
|
||||
# 替换TYPE_ID → pro_type_id
|
||||
target_df['pro_type_id'] = source_df['TYPE_ID'].map(pro_type_mapping)
|
||||
|
||||
# 检查是否有未映射的值
|
||||
if target_df['imp_unit'].isna().any():
|
||||
unmapped_units = source_df[target_df['imp_unit'].isna()]['COMPANY_ID'].unique()
|
||||
print(f"警告: 发现未映射的COMPANY_ID值: {unmapped_units}")
|
||||
|
||||
if target_df['pro_type_id'].isna().any():
|
||||
unmapped_types = source_df[target_df['pro_type_id'].isna()]['TYPE_ID'].unique()
|
||||
print(f"警告: 发现未映射的TYPE_ID值: {unmapped_types}")
|
||||
|
||||
# 写入目标数据库
|
||||
print("正在将数据写入目标数据库...")
|
||||
target_df.to_sql(
|
||||
'bm_project',
|
||||
target_engine,
|
||||
if_exists='append',
|
||||
index=False,
|
||||
dtype={
|
||||
'pro_id': sqlalchemy.types.INTEGER(),
|
||||
'pro_name': sqlalchemy.types.VARCHAR(length=255),
|
||||
'pro_code': sqlalchemy.types.VARCHAR(length=50),
|
||||
'external_id': sqlalchemy.types.VARCHAR(length=50),
|
||||
'contract_part': sqlalchemy.types.VARCHAR(length=50),
|
||||
'create_time': sqlalchemy.types.DateTime(),
|
||||
'imp_unit': sqlalchemy.types.INTEGER(),
|
||||
'pro_type_id': sqlalchemy.types.INTEGER()
|
||||
}
|
||||
)
|
||||
|
||||
print(f"成功写入{len(target_df)}条数据到目标数据库")
|
||||
|
||||
except Exception as e:
|
||||
print(f"处理过程中发生错误: {str(e)}")
|
||||
raise
|
||||
finally:
|
||||
# 关闭数据库连接
|
||||
if 'source_engine' in locals():
|
||||
source_engine.dispose()
|
||||
if 'target_engine' in locals():
|
||||
target_engine.dispose()
|
||||
|
||||
|
||||
# 使用示例
|
||||
if __name__ == "__main__":
|
||||
# 配置文件路径
|
||||
config_file = "config.ini" # 假设配置文件在当前目录下
|
||||
|
||||
try:
|
||||
# 执行转换
|
||||
transform_and_load_bm_project(config_file)
|
||||
except Exception as e:
|
||||
print(f"程序执行失败: {str(e)}")
|
||||
|
|
@ -0,0 +1,121 @@
|
|||
import pandas as pd
|
||||
import sqlalchemy
|
||||
from sqlalchemy import create_engine, text
|
||||
import configparser
|
||||
import os
|
||||
from datetime import datetime
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
def get_db_connection_string(config, section):
|
||||
"""从配置文件中构建数据库连接字符串"""
|
||||
return f"mysql+pymysql://{config[section]['user']}:{quote_plus(config[section]['password'])}@" \
|
||||
f"{config[section]['host']}:{config[section]['port']}/{config[section]['database']}"
|
||||
|
||||
|
||||
def transform_and_load_qrcode(config_file_path):
|
||||
"""
|
||||
从源数据库提取qr_code数据,转换后加载到目标数据库bm_qrcode_info
|
||||
:param config_file_path: 配置文件路径
|
||||
"""
|
||||
# 读取配置文件
|
||||
if not os.path.exists(config_file_path):
|
||||
raise FileNotFoundError(f"配置文件不存在: {config_file_path}")
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read(config_file_path)
|
||||
|
||||
try:
|
||||
# 获取数据库连接
|
||||
source_conn_str = get_db_connection_string(config, 'source_db')
|
||||
target_conn_str = get_db_connection_string(config, 'target_db')
|
||||
source_engine = create_engine(source_conn_str)
|
||||
target_engine = create_engine(target_conn_str)
|
||||
|
||||
# 从源表读取数据(过滤COMPANY_ID=1的记录)
|
||||
print("正在从源表qr_code读取数据...")
|
||||
source_query = """
|
||||
SELECT code, ma_model, vender, is_bind, gen_month, nullif(task_id, '') as task_id
|
||||
FROM bm_qrcode
|
||||
WHERE COMPANY_ID = 1 \
|
||||
"""
|
||||
source_df = pd.read_sql(source_query, source_engine)
|
||||
|
||||
if source_df.empty:
|
||||
print("没有符合条件的数据需要转换")
|
||||
return
|
||||
print(f"读取到{len(source_df)}条待转换数据")
|
||||
|
||||
# 数据转换
|
||||
print("正在进行数据转换...")
|
||||
target_df = pd.DataFrame()
|
||||
|
||||
# 字段映射(源字段 → 目标字段)
|
||||
field_mapping = {
|
||||
'code': 'qr_code',
|
||||
'ma_model': 'type_id',
|
||||
'vender': 'supplier_id',
|
||||
'is_bind': 'is_bind',
|
||||
'gen_month': 'create_time',
|
||||
'task_id': 'task_id'
|
||||
}
|
||||
|
||||
# 复制字段
|
||||
for source_field, target_field in field_mapping.items():
|
||||
target_df[target_field] = source_df[source_field]
|
||||
|
||||
# 检查数据质量
|
||||
print("\n数据质量检查:")
|
||||
print(f"- 空qr_code记录: {target_df['qr_code'].isna().sum()}")
|
||||
print(f"- 空type_id记录: {target_df['type_id'].isna().sum()}")
|
||||
|
||||
# 写入目标表(使用connection避免重复记录问题)
|
||||
print("\n正在写入目标表bm_qrcode_info...")
|
||||
with target_engine.begin() as conn:
|
||||
# 先检查并删除可能重复的qr_code(根据业务需求决定是否保留)
|
||||
existing_codes = pd.read_sql(
|
||||
"SELECT qr_code FROM bm_qrcode_info",
|
||||
conn
|
||||
)['qr_code'].tolist()
|
||||
|
||||
new_records = target_df[~target_df['qr_code'].isin(existing_codes)]
|
||||
dup_count = len(target_df) - len(new_records)
|
||||
if dup_count > 0:
|
||||
print(f"发现{dup_count}条重复qr_code记录,将自动跳过")
|
||||
|
||||
if not new_records.empty:
|
||||
new_records.to_sql(
|
||||
'bm_qrcode_info',
|
||||
conn,
|
||||
if_exists='append',
|
||||
index=False,
|
||||
dtype={
|
||||
'qr_code': sqlalchemy.types.VARCHAR(length=100),
|
||||
'type_id': sqlalchemy.types.INTEGER(),
|
||||
'supplier_id': sqlalchemy.types.INTEGER(),
|
||||
'is_bind': sqlalchemy.types.SmallInteger(),
|
||||
'create_time': sqlalchemy.types.DateTime(),
|
||||
'task_id': sqlalchemy.types.VARCHAR(length=50)
|
||||
}
|
||||
)
|
||||
print(f"成功写入{len(new_records)}条新数据")
|
||||
else:
|
||||
print("没有新数据需要写入")
|
||||
|
||||
except Exception as e:
|
||||
print(f"\n处理过程中发生错误: {str(e)}")
|
||||
raise
|
||||
finally:
|
||||
if 'source_engine' in locals():
|
||||
source_engine.dispose()
|
||||
if 'target_engine' in locals():
|
||||
target_engine.dispose()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 配置文件路径
|
||||
config_file = "config.ini"
|
||||
|
||||
try:
|
||||
transform_and_load_qrcode(config_file)
|
||||
except Exception as e:
|
||||
print(f"程序执行失败: {str(e)}")
|
||||
|
|
@ -0,0 +1,142 @@
|
|||
import pandas as pd
|
||||
import sqlalchemy
|
||||
from sqlalchemy import create_engine
|
||||
import configparser
|
||||
import os
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
def get_db_connection_string(config, section):
|
||||
"""
|
||||
从配置文件中构建数据库连接字符串
|
||||
:param config: 配置解析器对象
|
||||
:param section: 配置节名(source_db或target_db)
|
||||
:return: 数据库连接字符串
|
||||
"""
|
||||
db_type = 'mysql' # 假设使用MySQL数据库,可根据实际情况修改
|
||||
driver = 'pymysql' # MySQL驱动
|
||||
|
||||
return f"{db_type}+{driver}://{config[section]['user']}:{quote_plus(config[section]['password'])}@" \
|
||||
f"{config[section]['host']}:{config[section]['port']}/{config[section]['database']}"
|
||||
|
||||
|
||||
def transform_and_load_bm_unit(config_file_path):
|
||||
"""
|
||||
从源数据库提取bm_unit数据,转换后加载到目标数据库
|
||||
:param config_file_path: 配置文件路径
|
||||
"""
|
||||
# 读取配置文件
|
||||
if not os.path.exists(config_file_path):
|
||||
raise FileNotFoundError(f"配置文件不存在: {config_file_path}")
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read(config_file_path)
|
||||
|
||||
# 定义替换映射
|
||||
type_id_mapping = {
|
||||
1: 36, # 项目部 → 36
|
||||
2: 33, # 施工队 → 33
|
||||
3: 32, # 分包商 → 32
|
||||
4: 1685, # 后勤科室 → 1685
|
||||
5: 1704, # 外单位 → 1704
|
||||
6: 1706 # 修试部门 → 1706
|
||||
}
|
||||
|
||||
dept_id_mapping = {
|
||||
1: 327, # 送电一分公司 → 327
|
||||
2: 102, # 送电二分公司 → 102
|
||||
3: 309, # 宏源变电工程处 → 309
|
||||
5: 338, # 土建分公司 → 338
|
||||
8: 309, # 宏源送电工程处 → 309
|
||||
9: 100, # 变电分公司 → 100
|
||||
10: 101, # 机具(物流)分公司 → 101
|
||||
11: 345, # 外部往来单位 → 345
|
||||
12: 344, # 机械化分公司 → 344
|
||||
13: 346, # 运检分公司 → 346
|
||||
15: 340, # 安徽顺全电力工程有限公司 → 340
|
||||
16: 337, # 检修试验分公司 → 337
|
||||
17: 339, # 安徽顺安电网建设有限公司 → 339
|
||||
18: 342, # 公司机关 → 342
|
||||
21: 341 # 班组管理中心 → 341
|
||||
}
|
||||
|
||||
try:
|
||||
# 获取数据库连接字符串
|
||||
source_conn_str = get_db_connection_string(config, 'source_db')
|
||||
target_conn_str = get_db_connection_string(config, 'target_db')
|
||||
|
||||
# 创建数据库引擎
|
||||
source_engine = create_engine(source_conn_str)
|
||||
target_engine = create_engine(target_conn_str)
|
||||
|
||||
# 从源数据库读取数据
|
||||
print("正在从源数据库读取bm_unit表数据...")
|
||||
source_query = """
|
||||
SELECT ID, NAME, TYPE_ID, COMPANY_ID, time, COMPANY, IS_ACTIVE
|
||||
FROM bm_unit
|
||||
WHERE COMPANY = 1 AND IS_ACTIVE = 1 \
|
||||
"""
|
||||
source_df = pd.read_sql(source_query, source_engine)
|
||||
|
||||
if source_df.empty:
|
||||
print("没有符合条件的数据需要转换")
|
||||
return
|
||||
|
||||
print(f"读取到{len(source_df)}条待转换数据")
|
||||
|
||||
# 数据转换
|
||||
print("正在进行数据转换...")
|
||||
target_df = pd.DataFrame()
|
||||
target_df['unit_id'] = source_df['ID']
|
||||
target_df['unit_name'] = source_df['NAME']
|
||||
target_df['type_id'] = source_df['TYPE_ID'].map(type_id_mapping)
|
||||
target_df['dept_id'] = source_df['COMPANY_ID'].map(dept_id_mapping)
|
||||
target_df['create_time'] = source_df['time']
|
||||
|
||||
# 检查是否有未映射的值
|
||||
if target_df['type_id'].isna().any():
|
||||
unmapped_types = source_df[target_df['type_id'].isna()]['TYPE_ID'].unique()
|
||||
print(f"警告: 发现未映射的TYPE_ID值: {unmapped_types}")
|
||||
|
||||
if target_df['dept_id'].isna().any():
|
||||
unmapped_depts = source_df[target_df['dept_id'].isna()]['COMPANY_ID'].unique()
|
||||
print(f"警告: 发现未映射的COMPANY_ID值: {unmapped_depts}")
|
||||
|
||||
# 写入目标数据库
|
||||
print("正在将数据写入目标数据库...")
|
||||
target_df.to_sql(
|
||||
'bm_unit',
|
||||
target_engine,
|
||||
if_exists='append',
|
||||
index=False,
|
||||
dtype={
|
||||
'unit_id': sqlalchemy.types.INTEGER(),
|
||||
'unit_name': sqlalchemy.types.VARCHAR(length=255),
|
||||
'type_id': sqlalchemy.types.INTEGER(),
|
||||
'dept_id': sqlalchemy.types.INTEGER(),
|
||||
'create_time': sqlalchemy.types.DateTime()
|
||||
}
|
||||
)
|
||||
|
||||
print(f"成功写入{len(target_df)}条数据到目标数据库")
|
||||
|
||||
except Exception as e:
|
||||
print(f"处理过程中发生错误: {str(e)}")
|
||||
raise
|
||||
finally:
|
||||
# 关闭数据库连接
|
||||
if 'source_engine' in locals():
|
||||
source_engine.dispose()
|
||||
if 'target_engine' in locals():
|
||||
target_engine.dispose()
|
||||
|
||||
|
||||
# 使用示例
|
||||
if __name__ == "__main__":
|
||||
# 配置文件路径
|
||||
config_file = "config.ini" # 假设配置文件在当前目录下
|
||||
|
||||
try:
|
||||
# 执行转换
|
||||
transform_and_load_bm_unit(config_file)
|
||||
except Exception as e:
|
||||
print(f"程序执行失败: {str(e)}")
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
[source_db]
|
||||
host = 192.168.1.114
|
||||
user = boot
|
||||
password = boot@123
|
||||
database = newimt
|
||||
port = 13306
|
||||
|
||||
[target_db]
|
||||
host = 192.168.1.114
|
||||
user = boot
|
||||
password = boot@123
|
||||
database = test_jiju
|
||||
port = 13306
|
||||
|
||||
[settings]
|
||||
rule_file = C:\Users\bonus-lvjilong\Desktop\机具系统数据迁移\ma_supplier_info.xlsx
|
||||
|
|
@ -0,0 +1,170 @@
|
|||
import pandas as pd
|
||||
import sqlalchemy
|
||||
from sqlalchemy import create_engine,text
|
||||
import configparser
|
||||
import os
|
||||
from datetime import datetime
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
|
||||
def get_db_connection_string(config, section):
|
||||
"""从配置文件中构建数据库连接字符串"""
|
||||
return f"mysql+pymysql://{config[section]['user']}:{quote_plus(config[section]['password'])}@" \
|
||||
f"{config[section]['host']}:{config[section]['port']}/{config[section]['database']}"
|
||||
|
||||
|
||||
def transform_and_load_ma_machines(config_file_path):
|
||||
"""
|
||||
从源数据库提取ma_machines数据,转换后加载到目标数据库ma_machine
|
||||
:param config_file_path: 配置文件路径
|
||||
"""
|
||||
# 读取配置文件
|
||||
if not os.path.exists(config_file_path):
|
||||
raise FileNotFoundError(f"配置文件不存在: {config_file_path}")
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read(config_file_path)
|
||||
|
||||
# 定义状态替换映射
|
||||
status_mapping = {
|
||||
1: 0, # 待通知 → 0
|
||||
2: 0, # 待检验 → 0
|
||||
3: 0, # 待打印 → 0
|
||||
4: 0, # 待入库 → 0
|
||||
5: 1, # 在库 → 1
|
||||
6: 2, # 在用 → 2
|
||||
7: 3, # 在修 → 3
|
||||
8: 3, # 在试 → 3
|
||||
9: 5, # 修试后待入库 → 5
|
||||
10: 7, # 待报废 → 7
|
||||
11: 8, # 已报废 → 8
|
||||
12: 7, # 报废封存 → 7
|
||||
13: 4, # 在检 → 4
|
||||
14: 4, # 在审 → 4
|
||||
15: None, # 移交 → NULL(根据业务需求处理)
|
||||
16: 17, # 报废检验 → 17
|
||||
17: 17, # 封存检验 → 17
|
||||
18: 18, # 报备丢失 → 18
|
||||
19: 18 # 结算丢失 → 18
|
||||
}
|
||||
|
||||
try:
|
||||
# 获取数据库连接
|
||||
source_conn_str = get_db_connection_string(config, 'source_db')
|
||||
target_conn_str = get_db_connection_string(config, 'target_db')
|
||||
source_engine = create_engine(source_conn_str)
|
||||
target_engine = create_engine(target_conn_str)
|
||||
|
||||
# 从源表读取数据(过滤COMPANY_ID=1的记录)
|
||||
print("正在从源表ma_machines读取数据...")
|
||||
source_query = """
|
||||
SELECT ID, \
|
||||
TYPE, \
|
||||
BATCH_STATUS, \
|
||||
DEVICE_CODE, \
|
||||
OUT_FAC_TIME, \
|
||||
model_name,
|
||||
OUT_FAC_NUM, \
|
||||
VENDER_ID, \
|
||||
QRCODE, \
|
||||
ASSETS_NUM, \
|
||||
THIS_CHECK_TIME,
|
||||
THIS_CHECK_MAN, \
|
||||
NEXT_CHECK_TIME, \
|
||||
CYCLE_NUM
|
||||
FROM ma_machines
|
||||
WHERE COMPANY_ID = 1 \
|
||||
"""
|
||||
source_df = pd.read_sql(source_query, source_engine)
|
||||
|
||||
if source_df.empty:
|
||||
print("没有符合条件的数据需要转换")
|
||||
return
|
||||
print(f"读取到{len(source_df)}条待转换数据")
|
||||
|
||||
# 数据转换
|
||||
print("正在进行数据转换...")
|
||||
target_df = pd.DataFrame()
|
||||
|
||||
# 直接复制字段
|
||||
target_df['ma_id'] = source_df['ID']
|
||||
target_df['type_id'] = source_df['TYPE']
|
||||
target_df['ma_code'] = source_df['DEVICE_CODE']
|
||||
target_df['out_fac_time'] = pd.to_datetime(source_df['OUT_FAC_TIME'], errors='coerce')
|
||||
target_df['machine_name'] = source_df['model_name']
|
||||
target_df['out_fac_code'] = source_df['OUT_FAC_NUM']
|
||||
target_df['ma_vender'] = source_df['VENDER_ID']
|
||||
target_df['qr_code'] = source_df['QRCODE']
|
||||
target_df['assets_code'] = source_df['ASSETS_NUM']
|
||||
target_df['this_check_time'] = pd.to_datetime(source_df['THIS_CHECK_TIME'], errors='coerce')
|
||||
target_df['check_man'] = source_df['THIS_CHECK_MAN']
|
||||
target_df['next_check_time'] = pd.to_datetime(source_df['NEXT_CHECK_TIME'], errors='coerce')
|
||||
target_df['in_out_num'] = source_df['CYCLE_NUM']
|
||||
|
||||
# 替换状态字段
|
||||
target_df['ma_status'] = source_df['BATCH_STATUS'].map(status_mapping)
|
||||
|
||||
# 检查数据质量
|
||||
print("\n数据质量检查:")
|
||||
print(f"- 无效出厂时间记录: {target_df['out_fac_time'].isna().sum()}")
|
||||
print(f"- 无效本次检验时间记录: {target_df['this_check_time'].isna().sum()}")
|
||||
print(f"- 无效下次检验时间记录: {target_df['next_check_time'].isna().sum()}")
|
||||
print(f"- 未映射的状态值: {source_df[target_df['ma_status'].isna()]['BATCH_STATUS'].unique()}")
|
||||
|
||||
# 写入目标表
|
||||
print("\n正在写入目标表ma_machine...")
|
||||
# 步骤1:清空目标表
|
||||
with target_engine.connect() as conn:
|
||||
conn.execute(text("TRUNCATE TABLE ma_machine")) # 注意需要从sqlalchemy导入text
|
||||
conn.commit() # 显式提交事务
|
||||
|
||||
# 步骤2:写入去重后的数据
|
||||
target_df.drop_duplicates(['type_id', 'ma_code']).to_sql(
|
||||
'ma_machine',
|
||||
target_engine,
|
||||
if_exists='append',
|
||||
index=False
|
||||
)
|
||||
|
||||
target_df.to_sql(
|
||||
'ma_machine',
|
||||
target_engine,
|
||||
if_exists='append',
|
||||
index=False,
|
||||
dtype={
|
||||
'ma_id': sqlalchemy.types.INTEGER(),
|
||||
'type_id': sqlalchemy.types.INTEGER(),
|
||||
'ma_status': sqlalchemy.types.SmallInteger(),
|
||||
'ma_code': sqlalchemy.types.VARCHAR(length=50),
|
||||
'out_fac_time': sqlalchemy.types.DateTime(),
|
||||
'machine_name': sqlalchemy.types.VARCHAR(length=100),
|
||||
'out_fac_code': sqlalchemy.types.VARCHAR(length=50),
|
||||
'ma_vender': sqlalchemy.types.INTEGER(),
|
||||
'qr_code': sqlalchemy.types.VARCHAR(length=100),
|
||||
'assets_code': sqlalchemy.types.VARCHAR(length=50),
|
||||
'this_check_time': sqlalchemy.types.DateTime(),
|
||||
'check_man': sqlalchemy.types.VARCHAR(length=50),
|
||||
'next_check_time': sqlalchemy.types.DateTime(),
|
||||
'in_out_num': sqlalchemy.types.INTEGER()
|
||||
}
|
||||
)
|
||||
print(f"成功写入{len(target_df)}条数据")
|
||||
|
||||
except Exception as e:
|
||||
print(f"\n处理过程中发生错误: {str(e)}")
|
||||
raise
|
||||
finally:
|
||||
if 'source_engine' in locals():
|
||||
source_engine.dispose()
|
||||
if 'target_engine' in locals():
|
||||
target_engine.dispose()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 配置文件路径
|
||||
config_file = "config.ini"
|
||||
|
||||
try:
|
||||
transform_and_load_ma_machines(config_file)
|
||||
except Exception as e:
|
||||
print(f"程序执行失败: {str(e)}")
|
||||
|
|
@ -0,0 +1,133 @@
|
|||
import pandas as pd
|
||||
import sqlalchemy
|
||||
from sqlalchemy import create_engine, text
|
||||
import configparser
|
||||
import os
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
def get_db_connection_string(config, section):
|
||||
"""从配置文件中构建数据库连接字符串"""
|
||||
return f"mysql+pymysql://{config[section]['user']}:{quote_plus(config[section]['password'])}@" \
|
||||
f"{config[section]['host']}:{config[section]['port']}/{config[section]['database']}"
|
||||
|
||||
|
||||
def transform_and_load_part_type(config_file_path):
|
||||
"""
|
||||
从源数据库提取pa_type数据,转换后加载到目标数据库ma_part_type
|
||||
:param config_file_path: 配置文件路径
|
||||
"""
|
||||
# 读取配置文件
|
||||
if not os.path.exists(config_file_path):
|
||||
raise FileNotFoundError(f"配置文件不存在: {config_file_path}")
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read(config_file_path)
|
||||
|
||||
try:
|
||||
# 获取数据库连接
|
||||
source_conn_str = get_db_connection_string(config, 'source_db')
|
||||
target_conn_str = get_db_connection_string(config, 'target_db')
|
||||
source_engine = create_engine(source_conn_str)
|
||||
target_engine = create_engine(target_conn_str)
|
||||
|
||||
# 从源表读取数据(过滤company_id=1且IS_ACTIVE=1的记录)
|
||||
print("正在从源表pa_type读取数据...")
|
||||
source_query = """
|
||||
SELECT id, \
|
||||
PARENT_ID, \
|
||||
NAME, \
|
||||
NUM, \
|
||||
PRICE, \
|
||||
UNIT, \
|
||||
LEVEL, \
|
||||
IS_CONSUMABLES
|
||||
FROM pa_type
|
||||
WHERE company_id = 1 \
|
||||
AND IS_ACTIVE = 1 \
|
||||
"""
|
||||
source_df = pd.read_sql(source_query, source_engine)
|
||||
|
||||
if source_df.empty:
|
||||
print("没有符合条件的数据需要转换")
|
||||
return
|
||||
print(f"读取到{len(source_df)}条待转换数据")
|
||||
|
||||
# 数据转换
|
||||
print("正在进行数据转换...")
|
||||
target_df = pd.DataFrame()
|
||||
|
||||
# 字段映射(源字段 → 目标字段)
|
||||
field_mapping = {
|
||||
'id': 'pa_id',
|
||||
'PARENT_ID': 'parent_id',
|
||||
'NAME': 'pa_name',
|
||||
'NUM': 'storage_num',
|
||||
'PRICE': 'buy_price',
|
||||
'UNIT': 'unit_name',
|
||||
'LEVEL': 'level',
|
||||
'IS_CONSUMABLES': 'is_comsumable'
|
||||
}
|
||||
|
||||
# 复制字段
|
||||
for source_field, target_field in field_mapping.items():
|
||||
target_df[target_field] = source_df[source_field]
|
||||
|
||||
# 检查数据质量
|
||||
print("\n数据质量检查:")
|
||||
print(f"- 空ID记录: {target_df['pa_id'].isna().sum()}")
|
||||
print(f"- 空名称记录: {target_df['pa_name'].isna().sum()}")
|
||||
|
||||
# 写入目标表(使用事务确保原子性)
|
||||
print("\n正在写入目标表ma_part_type...")
|
||||
with target_engine.begin() as conn:
|
||||
# 检查并处理可能的主键冲突
|
||||
existing_ids = pd.read_sql(
|
||||
"SELECT pa_id FROM ma_part_type",
|
||||
conn
|
||||
)['pa_id'].tolist()
|
||||
|
||||
new_records = target_df[~target_df['pa_id'].isin(existing_ids)]
|
||||
dup_count = len(target_df) - len(new_records)
|
||||
if dup_count > 0:
|
||||
print(f"发现{dup_count}条重复主键记录,将自动跳过")
|
||||
print("重复ID示例:", target_df[target_df['pa_id'].isin(existing_ids)]['pa_id'].head(5).tolist())
|
||||
|
||||
if not new_records.empty:
|
||||
new_records.to_sql(
|
||||
'ma_part_type',
|
||||
conn,
|
||||
if_exists='append',
|
||||
index=False,
|
||||
dtype={
|
||||
'pa_id': sqlalchemy.types.INTEGER(),
|
||||
'parent_id': sqlalchemy.types.INTEGER(),
|
||||
'pa_name': sqlalchemy.types.VARCHAR(length=100),
|
||||
'storage_num': sqlalchemy.types.VARCHAR(length=50),
|
||||
'buy_price': sqlalchemy.types.DECIMAL(10, 2),
|
||||
'unit_name': sqlalchemy.types.VARCHAR(length=20),
|
||||
'level': sqlalchemy.types.SmallInteger(),
|
||||
'is_comsumable': sqlalchemy.types.SmallInteger()
|
||||
}
|
||||
)
|
||||
print(f"成功写入{len(new_records)}条新数据")
|
||||
else:
|
||||
print("没有新数据需要写入")
|
||||
|
||||
except Exception as e:
|
||||
print(f"\n处理过程中发生错误: {str(e)}")
|
||||
raise
|
||||
finally:
|
||||
if 'source_engine' in locals():
|
||||
source_engine.dispose()
|
||||
if 'target_engine' in locals():
|
||||
target_engine.dispose()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 配置文件路径
|
||||
config_file = "config.ini"
|
||||
|
||||
try:
|
||||
transform_and_load_part_type(config_file)
|
||||
except Exception as e:
|
||||
print(f"程序执行失败: {str(e)}")
|
||||
|
|
@ -0,0 +1,126 @@
|
|||
import pandas as pd
|
||||
import sqlalchemy
|
||||
from sqlalchemy import create_engine, text
|
||||
import configparser
|
||||
import os
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
def get_db_connection_string(config, section):
|
||||
"""从配置文件中构建数据库连接字符串"""
|
||||
return f"mysql+pymysql://{config[section]['user']}:{quote_plus(config[section]['password'])}@" \
|
||||
f"{config[section]['host']}:{config[section]['port']}/{config[section]['database']}"
|
||||
|
||||
|
||||
def transform_and_load_supplier(config_file_path):
|
||||
"""
|
||||
从源数据库提取ma_vender数据,转换后加载到目标数据库ma_supplier_info
|
||||
:param config_file_path: 配置文件路径
|
||||
"""
|
||||
# 读取配置文件
|
||||
if not os.path.exists(config_file_path):
|
||||
raise FileNotFoundError(f"配置文件不存在: {config_file_path}")
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read(config_file_path)
|
||||
|
||||
try:
|
||||
# 获取数据库连接
|
||||
source_conn_str = get_db_connection_string(config, 'source_db')
|
||||
target_conn_str = get_db_connection_string(config, 'target_db')
|
||||
source_engine = create_engine(source_conn_str)
|
||||
target_engine = create_engine(target_conn_str)
|
||||
|
||||
# 从源表读取数据(过滤company_id=1且IS_ACTIVE=1的记录)
|
||||
print("正在从源表ma_vender读取数据...")
|
||||
source_query = """
|
||||
SELECT id, NAME, ADDRESS, COMPANY_MAN, MAIN_PERSON, PHONE, SCOPE_BUSINESS
|
||||
FROM ma_vender
|
||||
WHERE company_id = 1 \
|
||||
AND IS_ACTIVE = 1 \
|
||||
"""
|
||||
source_df = pd.read_sql(source_query, source_engine)
|
||||
|
||||
if source_df.empty:
|
||||
print("没有符合条件的数据需要转换")
|
||||
return
|
||||
print(f"读取到{len(source_df)}条待转换数据")
|
||||
|
||||
# 数据转换
|
||||
print("正在进行数据转换...")
|
||||
target_df = pd.DataFrame()
|
||||
|
||||
# 字段映射(源字段 → 目标字段)
|
||||
field_mapping = {
|
||||
'id': 'supplier_id',
|
||||
'NAME': 'supplier',
|
||||
'ADDRESS': 'address',
|
||||
'COMPANY_MAN': 'legal_person',
|
||||
'MAIN_PERSON': 'primary_contact',
|
||||
'PHONE': 'phone',
|
||||
'SCOPE_BUSINESS': 'business_scope'
|
||||
}
|
||||
|
||||
# 复制字段
|
||||
for source_field, target_field in field_mapping.items():
|
||||
target_df[target_field] = source_df[source_field]
|
||||
|
||||
# 检查数据质量
|
||||
print("\n数据质量检查:")
|
||||
print(f"- 空供应商ID记录: {target_df['supplier_id'].isna().sum()}")
|
||||
print(f"- 空供应商名称记录: {target_df['supplier'].isna().sum()}")
|
||||
print(f"- 无效电话号码记录: {target_df['phone'].str.contains('[^0-9-]').sum()}")
|
||||
|
||||
# 写入目标表(使用事务确保原子性)
|
||||
print("\n正在写入目标表ma_supplier_info...")
|
||||
with target_engine.begin() as conn:
|
||||
# 检查并处理可能的主键冲突
|
||||
existing_ids = pd.read_sql(
|
||||
"SELECT supplier_id FROM ma_supplier_info",
|
||||
conn
|
||||
)['supplier_id'].tolist()
|
||||
|
||||
new_records = target_df[~target_df['supplier_id'].isin(existing_ids)]
|
||||
dup_count = len(target_df) - len(new_records)
|
||||
if dup_count > 0:
|
||||
print(f"发现{dup_count}条重复供应商记录,将自动跳过")
|
||||
print("重复ID示例:",
|
||||
target_df[target_df['supplier_id'].isin(existing_ids)]['supplier_id'].head(3).tolist())
|
||||
|
||||
if not new_records.empty:
|
||||
new_records.to_sql(
|
||||
'ma_supplier_info',
|
||||
conn,
|
||||
if_exists='append',
|
||||
index=False,
|
||||
dtype={
|
||||
'supplier_id': sqlalchemy.types.INTEGER(),
|
||||
'supplier': sqlalchemy.types.VARCHAR(length=100),
|
||||
'address': sqlalchemy.types.VARCHAR(length=200),
|
||||
'legal_person': sqlalchemy.types.VARCHAR(length=50),
|
||||
'primary_contact': sqlalchemy.types.VARCHAR(length=50),
|
||||
'phone': sqlalchemy.types.VARCHAR(length=20),
|
||||
'business_scope': sqlalchemy.types.TEXT()
|
||||
}
|
||||
)
|
||||
print(f"成功写入{len(new_records)}条新供应商数据")
|
||||
else:
|
||||
print("没有新供应商数据需要写入")
|
||||
|
||||
except Exception as e:
|
||||
print(f"\n处理过程中发生错误: {str(e)}")
|
||||
raise
|
||||
finally:
|
||||
if 'source_engine' in locals():
|
||||
source_engine.dispose()
|
||||
if 'target_engine' in locals():
|
||||
target_engine.dispose()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 配置文件路径
|
||||
config_file = "config.ini"
|
||||
|
||||
try:
|
||||
transform_and_load_supplier(config_file)
|
||||
except Exception as e:
|
||||
print(f"程序执行失败: {str(e)}")
|
||||
|
|
@ -0,0 +1,143 @@
|
|||
import pandas as pd
|
||||
import sqlalchemy
|
||||
from sqlalchemy import create_engine
|
||||
import configparser
|
||||
import os
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
def fix_date(date_str):
|
||||
try:
|
||||
# 处理只有年月的情况(如"2018-07")
|
||||
if isinstance(date_str, str) and len(date_str) == 7 and date_str[4] == '-':
|
||||
return datetime.strptime(date_str + '-01', '%Y-%m-%d')
|
||||
return date_str
|
||||
except:
|
||||
return None # 对于无法解析的日期返回NULL
|
||||
|
||||
def get_db_connection_string(config, section):
|
||||
"""从配置文件中构建数据库连接字符串"""
|
||||
return f"mysql+pymysql://{config[section]['user']}:{quote_plus(config[section]['password'])}@" \
|
||||
f"{config[section]['host']}:{config[section]['port']}/{config[section]['database']}"
|
||||
|
||||
|
||||
def transform_and_load_ma_type(config_file_path):
|
||||
"""
|
||||
从源数据库提取ma_type数据,转换后加载到目标数据库ma_type
|
||||
:param config_file_path: 配置文件路径
|
||||
"""
|
||||
# 读取配置文件
|
||||
if not os.path.exists(config_file_path):
|
||||
raise FileNotFoundError(f"配置文件不存在: {config_file_path}")
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read(config_file_path)
|
||||
|
||||
try:
|
||||
# 获取数据库连接
|
||||
source_conn_str = get_db_connection_string(config, 'source_db')
|
||||
target_conn_str = get_db_connection_string(config, 'target_db')
|
||||
source_engine = create_engine(source_conn_str)
|
||||
target_engine = create_engine(target_conn_str)
|
||||
|
||||
# 从源表读取数据(过滤COMPANY_ID=1且IS_ACTIVE=1的记录)
|
||||
print("正在从源表ma_type读取数据...")
|
||||
source_query = """
|
||||
SELECT ID, \
|
||||
PARENT_ID, \
|
||||
NAME, \
|
||||
LEVEL, \
|
||||
UNIT, \
|
||||
NUM, \
|
||||
nullif(BUY_PRICE,0) as BUY_PRICE, \
|
||||
nullif(LEASE_PRICE,0) as LEASE_PRICE, \
|
||||
IS_COUNT, \
|
||||
RATED_LOAD, \
|
||||
TEST_LOAD, \
|
||||
HOLDING_TIME, \
|
||||
manage_type,
|
||||
nullif(IS_TEST,0) as IS_TEST,
|
||||
nullif(TIME,0) as TIME
|
||||
FROM ma_type
|
||||
WHERE COMPANY_ID = 1 AND IS_ACTIVE = 1 \
|
||||
"""
|
||||
source_df = pd.read_sql(source_query, source_engine)
|
||||
|
||||
if source_df.empty:
|
||||
print("没有符合条件的数据需要转换")
|
||||
return
|
||||
print(f"读取到{len(source_df)}条待转换数据")
|
||||
|
||||
# 数据转换(直接字段复制)
|
||||
print("正在进行数据转换...")
|
||||
target_df = pd.DataFrame()
|
||||
|
||||
# 字段映射(源字段 → 目标字段)
|
||||
field_mapping = {
|
||||
'ID': 'type_id',
|
||||
'PARENT_ID': 'parent_id',
|
||||
'NAME': 'type_name',
|
||||
'LEVEL': 'level',
|
||||
'UNIT': 'unit_name',
|
||||
'NUM': 'storage_num',
|
||||
'BUY_PRICE': 'buy_price',
|
||||
'LEASE_PRICE': 'lease_price',
|
||||
'IS_COUNT': 'manage_type',
|
||||
'RATED_LOAD': 'rated_load',
|
||||
'TEST_LOAD': 'test_load',
|
||||
'HOLDING_TIME': 'holding_time',
|
||||
'manage_type': 'is_enter',
|
||||
'IS_TEST': 'is_test',
|
||||
'TIME': 'create_time'
|
||||
}
|
||||
|
||||
for source_field, target_field in field_mapping.items():
|
||||
target_df[target_field] = source_df[source_field]
|
||||
# 应用日期修正
|
||||
target_df['create_time'] = source_df['TIME'].apply(fix_date)
|
||||
# 写入目标表
|
||||
print("正在写入目标表ma_type...")
|
||||
target_df.to_sql(
|
||||
'ma_type',
|
||||
target_engine,
|
||||
if_exists='append',
|
||||
index=False,
|
||||
dtype={
|
||||
'type_id': sqlalchemy.types.INTEGER(),
|
||||
'parent_id': sqlalchemy.types.INTEGER(),
|
||||
'type_name': sqlalchemy.types.VARCHAR(length=100),
|
||||
'level': sqlalchemy.types.SmallInteger(),
|
||||
'unit_name': sqlalchemy.types.VARCHAR(length=20),
|
||||
'storage_num': sqlalchemy.types.VARCHAR(length=50),
|
||||
'buy_price': sqlalchemy.types.DECIMAL(10, 2),
|
||||
'lease_price': sqlalchemy.types.DECIMAL(10, 2),
|
||||
'manage_type': sqlalchemy.types.SmallInteger(),
|
||||
'rated_load': sqlalchemy.types.DECIMAL(10, 2),
|
||||
'test_load': sqlalchemy.types.DECIMAL(10, 2),
|
||||
'holding_time': sqlalchemy.types.INTEGER(),
|
||||
'is_enter': sqlalchemy.types.SmallInteger(),
|
||||
'is_test': sqlalchemy.types.SmallInteger(),
|
||||
'create_time': sqlalchemy.types.DateTime()
|
||||
}
|
||||
)
|
||||
print(f"成功写入{len(target_df)}条数据")
|
||||
|
||||
except Exception as e:
|
||||
print(f"处理过程中发生错误: {str(e)}")
|
||||
raise
|
||||
finally:
|
||||
if 'source_engine' in locals():
|
||||
source_engine.dispose()
|
||||
if 'target_engine' in locals():
|
||||
target_engine.dispose()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 配置文件路径(根据实际情况修改)
|
||||
config_file = "config.ini"
|
||||
|
||||
try:
|
||||
transform_and_load_ma_type(config_file)
|
||||
except Exception as e:
|
||||
print(f"程序执行失败: {str(e)}")
|
||||
Loading…
Reference in New Issue