首次提交

This commit is contained in:
jiang 2025-08-24 13:53:15 +08:00
parent 1d0135d33b
commit 78e1427c3c
9 changed files with 115 additions and 86 deletions

View File

@ -70,6 +70,7 @@ def process_bm_project_dept():
result['unit_id'] = df['ID'] + 4000
result['unit_name'] = df['NAME']
result['type_id'] = 36 # 项目部固定值
df['COMPANY_ID'] = df['COMPANY_ID'].astype(int)
result['dept_id'] = df['COMPANY_ID'].map(dept_id_mapping)
result['create_time'] = df['CREATE_TIME']
@ -84,6 +85,7 @@ def process_bm_team_info():
result['unit_id'] = df['ID'] + 5000
result['unit_name'] = df['NAME']
result['type_id'] = 33 # 施工队固定值
df['COMPANY_ID'] = df['COMPANY_ID'].astype(int)
result['dept_id'] = df['COMPANY_ID'].map(dept_id_mapping)
result['create_time'] = df['CREATE_TIME']
@ -111,7 +113,7 @@ def process_bm_rear_service():
result['unit_id'] = df['id'] + 7000
result['unit_name'] = df['rear_name']
result['type_id'] = 1685 # 后勤科室固定值
result['dept_id'] = df['company_id'] # 直接复制
result['dept_id'] = df['company_id'].map(dept_id_mapping) # look up each company_id in dept_id_mapping (not a direct copy)
# 注意此表没有create_time字段
return result

View File

@ -9,7 +9,7 @@ port = 13306
host = localhost
user = root
password = Bonus@caiqi0802!
database = test_jiju
database = bns_cloud_material
port = 13306
[settings]

View File

@ -1,6 +1,7 @@
import configparser
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy import text
from urllib.parse import quote_plus
# 读取配置文件
config = configparser.ConfigParser()
@ -66,7 +67,7 @@ def process_machine_types():
# 按照规则转换数据
result = pd.DataFrame()
result['type_id'] = df['ID'] + 6000 # ID加6000
result['parent_id'] = df['PARENT_ID']
result['parent_id'] = df['PARENT_ID'] + 6000
result['type_name'] = df['NAME']
result['level'] = df['LEVEL']
result['unit_name'] = df['UNIT_ID'].map(unit_mapping) # 单位映射
@ -97,12 +98,16 @@ def process_machine_types():
# 更新现有库存和租赁单价根据type_id
with target_engine.connect() as conn:
# 更新storage_num
conn.execute("""
UPDATE ma_type t, ma_machine_type s
# 更新 storage_num
conn.execute(text(f"""
UPDATE {target_config['database']}.ma_type t
JOIN {source_config['database']}.ma_machine_type s
ON t.type_id = s.ID + 6000
SET t.storage_num = s.STORAGE_NUM,
t.lease_price = s.LEASE_PRICE
WHERE t.type_id = s.ID + 6000
""")
t.lease_price = s.RENTAL_PRICE
"""))
conn.commit()
print("已更新现有库存和租赁单价")
return True

View File

@ -45,7 +45,6 @@ UNIT_TYPE_MAPPING = {
def process_tm_task():
"""处理任务表数据"""
try:
# 执行转换SQL
sql = """
SELECT
(be.ID + 500000) as task_id,
@ -63,12 +62,9 @@ def process_tm_task():
"""
df = pd.read_sql(sql, source_engine)
# 写入目标表
df.to_sql('tm_task', target_engine, if_exists='append', index=False)
print(f"成功导入 {len(df)} 条任务数据到tm_task表")
return True
except Exception as e:
print(f"处理任务数据时出错: {str(e)}")
return False
@ -77,7 +73,6 @@ def process_tm_task():
def process_tm_task_agreement():
"""处理任务协议关联表数据"""
try:
# 第一步:查询基础数据
base_sql = """
SELECT
(be.ID + 500000) as task_id,
@ -95,58 +90,55 @@ def process_tm_task_agreement():
"""
base_df = pd.read_sql(base_sql, source_engine)
# 准备存储结果
results = []
skipped_records = 0
# 对每条记录处理
for _, row in base_df.iterrows():
try:
# 确定unit_id和type_code
# Coerce the ID fields to numeric values. With errors='coerce', anything
# that cannot be parsed becomes NaN (not None), which is why the checks
# further down use pd.notna / pd.isna before comparing or casting to int.
dept_id = pd.to_numeric(row['DEPT_ID'], errors='coerce')
sub_id = pd.to_numeric(row['SUB_ID'], errors='coerce')
team_id = pd.to_numeric(row['TEAM_ID'], errors='coerce')
rear_id = pd.to_numeric(row['REAR_ID'], errors='coerce')
project_id = pd.to_numeric(row['PROJECT_ID'], errors='coerce')
unit_id = None
type_code = None
if row['TYPE'] == 6: # 施工队
if row['TEAM_ID'] > 0:
unit_id = int(row['TEAM_ID'])
type_code = 'sgd'
elif row['TYPE'] == 7: # 项目部
if row['DEPT_ID'] > 0:
unit_id = int(row['DEPT_ID'])
type_code = 'xmb'
elif row['TYPE'] == 122: # 分包商
if row['SUB_ID'] > 0:
unit_id = int(row['SUB_ID'])
type_code = 'fbs'
elif row['TYPE'] == 126: # 后勤
if row['REAR_ID'] > 0:
unit_id = int(row['REAR_ID'])
type_code = 'hq'
if row['TYPE'] == 6 and pd.notna(team_id) and team_id > 0:
unit_id = int(team_id)
type_code = 'sgd'
elif row['TYPE'] == 7 and pd.notna(dept_id) and dept_id > 0:
unit_id = int(dept_id)
type_code = 'xmb'
elif row['TYPE'] == 122 and pd.notna(sub_id) and sub_id > 0:
unit_id = int(sub_id)
type_code = 'fbs'
elif row['TYPE'] == 126 and pd.notna(rear_id) and rear_id > 0:
unit_id = int(rear_id)
type_code = 'hq'
# 检查必要参数是否有效
if not unit_id or not type_code or row['PROJECT_ID'] <= 0:
if not unit_id or not type_code or pd.isna(project_id) or project_id <= 0:
skipped_records += 1
continue
# 使用参数化查询避免字符串拼接问题
agreement_id = get_agreement_id(unit_id, int(row['PROJECT_ID']), type_code)
agreement_id = get_agreement_id(unit_id, int(project_id), type_code)
if agreement_id:
if agreement_id:
results.append({
'task_id': int(row['task_id']),
'agreement_id': int(agreement_id)
})
results.append({
'task_id': int(row['task_id']),
'agreement_id': int(agreement_id)
})
except Exception as e:
print(f"查询agreement_id时出错: {str(e)}")
continue
# 转换为DataFrame并写入
if results:
result_df = pd.DataFrame(results)
result_df.to_sql('tm_task_agreement', target_engine, if_exists='append', index=False)
print(f"成功导入 {len(result_df)} 条任务协议关联数据到tm_task_agreement表")
return True
else:
print("没有找到匹配的任务协议关联数据")
print(f"没有找到匹配的任务协议关联数据,跳过 {skipped_records} 条记录")
return False
except Exception as e:
@ -175,7 +167,6 @@ def get_agreement_id(unit_id, project_id, type_code):
def process_lease_apply_info():
"""处理租赁申请信息表数据"""
try:
# 执行转换SQL
sql = """
SELECT
(be.ID + 500000) as id,
@ -194,13 +185,9 @@ def process_lease_apply_info():
be.DEFINITION_ID = 1
"""
df = pd.read_sql(sql, source_engine)
# 写入目标表
df.to_sql('lease_apply_info', target_engine, if_exists='append', index=False)
print(f"成功导入 {len(df)} 条租赁申请信息数据")
return True
except Exception as e:
print(f"处理租赁申请信息时出错: {str(e)}")
return False
@ -209,7 +196,6 @@ def process_lease_apply_info():
def process_lease_apply_details():
"""处理租赁申请明细数据"""
try:
# 执行转换SQL
sql = """
SELECT
(be.ID + 500000) as parent_id,
@ -227,13 +213,9 @@ def process_lease_apply_details():
bpf.ID
"""
df = pd.read_sql(sql, source_engine)
# 写入目标表
df.to_sql('lease_apply_details', target_engine, if_exists='append', index=False)
print(f"成功导入 {len(df)} 条租赁申请明细数据")
return True
except Exception as e:
print(f"处理租赁申请明细时出错: {str(e)}")
return False
@ -242,7 +224,6 @@ def process_lease_apply_details():
def process_lease_out_details():
"""处理租赁出库明细数据"""
try:
# 执行转换SQL
sql = """
SELECT
(bfd.EXAMPLE_ID + 500000) as parent_id,
@ -256,20 +237,15 @@ def process_lease_out_details():
LEFT JOIN pm_user pu on bfd.CREATOR = pu.ID
"""
df = pd.read_sql(sql, source_engine)
# 写入目标表
df.to_sql('lease_out_details', target_engine, if_exists='append', index=False)
print(f"成功导入 {len(df)} 条租赁出库明细数据")
return True
except Exception as e:
print(f"处理租赁出库明细时出错: {str(e)}")
return False
if __name__ == "__main__":
# 执行所有转换流程
results = [
process_tm_task(),
process_tm_task_agreement(),

View File

@ -78,7 +78,8 @@ def transform_and_load_agreement(config_file_path):
target_df['create_time'] = source_df['CREATE_TIME']
# 替换字段IS_BALANCE → is_slt
target_df['is_slt'] = source_df['IS_BALANCE'].map(is_slt_mapping)
# 替换字段IS_BALANCE → is_slt
target_df['is_slt'] = source_df['IS_BALANCE'].astype(int).map(is_slt_mapping)
# 检查未映射的值
if target_df['is_slt'].isna().any():

View File

@ -1,16 +1,15 @@
[source_db]
host = 192.168.1.114
user = boot
password = boot@123
host = localhost
user = root
password = Bonus@caiqi0802!
database = newimt
port = 13306
[target_db]
host = 192.168.1.114
user = boot
password = boot@123
database = test_jiju
host = 10.138.55.133
user = prod_jiju
password = bns@sbdahjj2025
database = bns_jiju_material
port = 13306
[settings]
rule_file = C:\Users\bonus-lvjilong\Desktop\机具系统数据迁移\ma_supplier_info.xlsx

View File

@ -80,7 +80,7 @@ def process_purchase_check_info():
LEFT JOIN tm_task tt on bms.ID = tt.ID
LEFT JOIN tm_task_status tts on tt.`STATUS` = tts.`CODE`
LEFT JOIN tm_task_ma_type ttmt on bms.ID = ttmt.TASK_ID
WHERE tt.CREATE_TIME BETWEEN '2025-01-01' and NOW()
WHERE tt.CREATE_TIME BETWEEN '2025-01-01' and NOW() AND ttmt.MA_TYPE_ID IS NOT NULL
GROUP BY bms.ID
"""
df = pd.read_sql(sql, source_engine)
@ -116,7 +116,7 @@ def process_purchase_check_details():
ba_ma_shop bms
LEFT JOIN tm_task tt on bms.ID = tt.ID
LEFT JOIN tm_task_ma_type ttmt on bms.ID = ttmt.TASK_ID
WHERE tt.CREATE_TIME BETWEEN '2025-01-01' and NOW()
WHERE tt.CREATE_TIME BETWEEN '2025-01-01' and NOW() AND ttmt.MA_TYPE_ID IS NOT NULL
GROUP BY bms.ID,ttmt.MA_TYPE_ID
"""
df = pd.read_sql(sql, source_engine)

View File

@ -84,7 +84,8 @@ def process_purchase_part_info():
GROUP BY bps.ID
"""
df = pd.read_sql(sql, source_engine)
df['purchase_time'] = df['purchase_time'].replace({'': None})
df['arrival_time'] = df['arrival_time'].replace({'': None})
# 写入目标表
df.to_sql('purchase_part_info', target_engine,
if_exists='append', index=False)
@ -110,7 +111,7 @@ def process_purchase_part_details():
ttmt.MANUFACTURER_ID as supplier_id,
IF(ttmt.MACHINES_NUM - IFNULL(ttmt.INPUT_NUM,0) = 0,1,0) as status,
bps.BUYER as create_by,
tt.CREATE_TIME as as create_time
tt.CREATE_TIME as create_time
FROM
ba_pa_shop bps
LEFT JOIN tm_task tt on bps.ID = tt.ID

View File

@ -106,7 +106,7 @@ def process_repair_apply_details():
SELECT
bmr.ID as task_id,
ttmt.MA_TYPE_ID as type_id,
NULL as ma_id,
null as ma_id,
ttmt.MACHINES_NUM as repair_num,
ttmt.ACTUAL_NUM as repaired_num,
ttmt2.MACHINES_NUM as scrap_num,
@ -121,7 +121,7 @@ def process_repair_apply_details():
LEFT JOIN tm_task_ma_type ttmt on bmr.ID = ttmt.TASK_ID
LEFT JOIN ba_ma_scarp bms on bms.REPAIR_ID = bmr.ID
LEFT JOIN tm_task_ma_type ttmt2 on bms.ID = ttmt2.TASK_ID AND ttmt.MA_TYPE_ID = ttmt2.MA_TYPE_ID
WHERE ttmt.IS_COUNT = 1
WHERE ttmt.IS_COUNT = 1 and bmr.ID not in(519)
GROUP BY ttmt.TASK_ID, ttmt.MA_TYPE_ID
"""
@ -147,6 +147,7 @@ FROM
LEFT JOIN ba_ma_repair bmr ON brp.repair_id = bmr.ID
LEFT JOIN tm_task tt ON bmr.ID = tt.ID
LEFT JOIN ma_machines mm ON brp.ma_id = mm.id
WHERE bmr.ID not in(519)
"""
# 来源3编码申请维修含报废
@ -169,22 +170,65 @@ FROM
LEFT JOIN tm_task tt on bmr.ID = tt.ID
LEFT JOIN tm_ma_scarp_reason tmsr on bms.ID = tmsr.TASK_ID
LEFT JOIN ma_machines mm on tmsr.ma_id = mm.id
WHERE tmsr.IS_COUNT = 0
WHERE tmsr.IS_COUNT = 0 and bmr.ID not in(519)
"""
# 合并三个来源的数据
df1 = pd.read_sql(sql1, source_engine)
#df1 = pd.read_sql(sql1, source_engine)
df2 = pd.read_sql(sql2, source_engine)
df3 = pd.read_sql(sql3, source_engine)
df = pd.concat([df1, df2, df3], ignore_index=True)
df = pd.concat([ df2, df3], ignore_index=True)
# 过滤掉type_id为NULL的记录
initial_count = len(df)
df = df[df['type_id'].notna()]
filtered_count = len(df)
if initial_count > filtered_count:
print(f"过滤掉了 {initial_count - filtered_count} 条type_id为NULL的记录")
# 转换数值字段
num_cols = ['repair_num', 'repaired_num', 'scrap_num', 'status', 'is_ds']
for col in num_cols:
df[col] = safe_convert_to_int(df[col])
df.to_sql('repair_apply_details', target_engine, if_exists='append', index=False)
print(f"成功导入 {len(df)} 条维修申请明细合并3个来源")
# 检查目标表中已存在的记录
check_existing_sql = """
SELECT task_id, ma_id, type_id
FROM repair_apply_details
"""
existing_records = pd.read_sql(check_existing_sql, target_engine)
# 创建唯一标识符进行比较
if not existing_records.empty:
# 为现有记录创建复合键
existing_records['composite_key'] = existing_records['task_id'].astype(str) + '_' + \
existing_records['ma_id'].astype(str) + '_' + \
existing_records['type_id'].astype(str)
# 为新记录创建复合键
df['composite_key'] = df['task_id'].astype(str) + '_' + \
df['ma_id'].astype(str) + '_' + \
df['type_id'].astype(str)
# 过滤掉已存在的记录
before_dedup_count = len(df)
df = df[~df['composite_key'].isin(existing_records['composite_key'])]
after_dedup_count = len(df)
# 移除临时列
df = df.drop(columns=['composite_key'])
if before_dedup_count > after_dedup_count:
print(f"去除了 {before_dedup_count - after_dedup_count} 条已存在的记录")
# 插入新记录
if not df.empty:
df.to_sql('repair_apply_details', target_engine, if_exists='append', index=False)
print(f"成功导入 {len(df)} 条维修申请明细合并3个来源")
else:
print("没有新的维修申请明细需要导入")
return True
except Exception as e:
print(f"处理维修申请明细时出错: {str(e)}")
@ -212,7 +256,7 @@ def process_repair_apply_record():
LEFT JOIN tm_task_ma_type ttmt on bmr.ID = ttmt.TASK_ID
LEFT JOIN ba_ma_scarp bms on bms.REPAIR_ID = bmr.ID
LEFT JOIN tm_task_ma_type ttmt2 on bms.ID = ttmt2.TASK_ID AND ttmt.MA_TYPE_ID = ttmt2.MA_TYPE_ID
WHERE ttmt.IS_COUNT = 1
WHERE ttmt.IS_COUNT = 1 and bmr.ID not in(519)
GROUP BY ttmt.TASK_ID, ttmt.MA_TYPE_ID
HAVING repair_num > 0
"""
@ -236,7 +280,7 @@ def process_repair_apply_record():
LEFT JOIN ba_ma_scarp bms on bms.REPAIR_ID = bmr.ID
LEFT JOIN tm_ma_scarp_reason tmsr on bms.ID = tmsr.TASK_ID
LEFT JOIN tm_task_ma_type ttmt2 on bms.ID = ttmt2.TASK_ID AND ttmt.MA_TYPE_ID = ttmt2.MA_TYPE_ID
WHERE ttmt.IS_COUNT = 1
WHERE ttmt.IS_COUNT = 1 and bmr.ID not in(519)
GROUP BY ttmt.TASK_ID, ttmt.MA_TYPE_ID
HAVING scrap_num > 0
"""
@ -258,6 +302,7 @@ def process_repair_apply_record():
ba_ma_repair_pass brp
LEFT JOIN ba_ma_repair bmr on brp.repair_id = bmr.ID
LEFT JOIN ma_machines mm on brp.ma_id = mm.id
WHERE bmr.ID not in(519)
"""
# 来源4编码维修报废
@ -278,7 +323,7 @@ def process_repair_apply_record():
LEFT JOIN ba_ma_repair bmr on bms.repair_id = bmr.ID
LEFT JOIN tm_ma_scarp_reason tmsr on bms.ID = tmsr.TASK_ID
LEFT JOIN ma_machines mm on tmsr.ma_id = mm.id
WHERE tmsr.IS_COUNT = 0
WHERE tmsr.IS_COUNT = 0 and bmr.ID not in(519)
"""
# 合并四个来源的数据
@ -336,11 +381,11 @@ def process_repair_cost():
if __name__ == "__main__":
# 执行所有转换流程
processes = [
process_tm_task,
process_tm_task_agreement,
#process_tm_task,
#process_tm_task_agreement,
process_repair_apply_details,
process_repair_apply_record,
process_repair_cost
#process_repair_apply_record,
#process_repair_cost
]
success = all([p() for p in processes])