diff --git a/安全/1.py b/安全/1.py new file mode 100644 index 0000000..3af62b6 --- /dev/null +++ b/安全/1.py @@ -0,0 +1,289 @@ +import configparser +import pandas as pd +import sqlalchemy +from sqlalchemy import create_engine +from urllib.parse import quote_plus + +# 读取配置文件 +config = configparser.ConfigParser() +config.read('config.ini') + +# 获取数据库连接配置 +source_config = { + 'host': config.get('source_db', 'host'), + 'user': config.get('source_db', 'user'), + 'password': config.get('source_db', 'password'), + 'database': config.get('source_db', 'database'), + 'port': config.getint('source_db', 'port') +} + +target_config = { + 'host': config.get('target_db', 'host'), + 'user': config.get('target_db', 'user'), + 'password': config.get('target_db', 'password'), + 'database': config.get('target_db', 'database'), + 'port': config.getint('target_db', 'port') +} + + +def analyze_sql_and_output(): + # 2. 建立数据库连接 + # 创建数据库引擎 + source_engine = create_engine( + f"mysql+pymysql://{source_config['user']}:{quote_plus(source_config['password'])}@{source_config['host']}:{source_config['port']}/{source_config['database']}" + ) + target_engine = create_engine( + f"mysql+pymysql://{target_config['user']}:{quote_plus(target_config['password'])}@{target_config['host']}:{target_config['port']}/{target_config['database']}" + ) + + # 3. 定义SQL查询语句 + collar_sql = """ + SELECT + bat.AGREEMENT_ID AS agreement_id, + bmc.ID as lease_id, + mm.TYPE as type_id, + mm.ID as ma_id, + ttm.CREATE_TIME as start_time, + 1 as num, + IF(bma.IS_BALANCE = 0,1,0) as is_slt, + bma.SETTLEMENT_TIME as slt_time, + mt.IS_COUNT, + mt.lease_price + FROM + ba_ma_collar bmc + LEFT JOIN ba_agreement_task bat on bmc.ID = bat.TASK_ID + LEFT JOIN ba_ma_agreement bma on bat.AGREEMENT_ID = bma.ID + LEFT JOIN tm_task_ma ttm on bmc.ID = ttm.TASK_ID + LEFT JOIN ma_machines mm on ttm.MA_ID = mm.ID + LEFT JOIN ma_type mt on mm.TYPE = mt.ID + WHERE bmc.company_id =1 AND mt.IS_COUNT = 0 + UNION + SELECT + bat.AGREEMENT_ID AS agreement_id, + bmc.ID as lease_id, + ttot.MA_TYPE_ID as type_id, + null as ma_id, + ttot.CREATE_TIME as start_time, + ttot.NUM as num, + IF(bma.IS_BALANCE = 0,1,0) as is_slt, + bma.SETTLEMENT_TIME as slt_time, + mt.IS_COUNT, + mt.lease_price + FROM + ba_ma_collar bmc + LEFT JOIN ba_agreement_task bat on bmc.ID = bat.TASK_ID + LEFT JOIN ba_ma_agreement bma on bat.AGREEMENT_ID = bma.ID + LEFT JOIN tm_task_out_type ttot on bmc.ID = ttot.TASK_ID + LEFT JOIN ma_type mt on ttot.MA_TYPE_ID = mt.ID + WHERE bmc.company_id =1 AND ttot.IS_COUNT =1; + """ + + back_sql = """ + SELECT + bat.AGREEMENT_ID AS agreement_id, + bmb.ID as back_id, + mm.TYPE as type_id, + mm.ID as ma_id, + ttm.CREATE_TIME as end_time, + 1 as num, + mt.IS_COUNT + FROM + ba_ma_back bmb + LEFT JOIN ba_agreement_task bat on bmb.ID = bat.TASK_ID + LEFT JOIN ba_ma_agreement bma on bat.AGREEMENT_ID = bma.ID + LEFT JOIN tm_task_ma ttm on bmb.ID = ttm.TASK_ID + LEFT JOIN ma_machines mm on ttm.MA_ID = mm.ID + LEFT JOIN ma_type mt on mm.TYPE = mt.ID + WHERE mt.IS_COUNT = 0 + UNION + SELECT + bat.AGREEMENT_ID AS agreement_id, + bmb.id as back_id, + ttmt.MA_TYPE_ID as type_id, + null as maId, + bmb.CREATE_TIME as end_time, + ROUND(ttmt.MACHINES_NUM,3) as num, + ttmt.IS_COUNT + FROM + ba_ma_back bmb + LEFT JOIN ba_agreement_task bat on bmb.ID = bat.TASK_ID + LEFT JOIN ba_ma_agreement bma on bat.AGREEMENT_ID = bma.ID + LEFT JOIN tm_task_ma_type ttmt on bmb.ID = ttmt.TASK_ID + LEFT JOIN pm_user pu on bmb.CREATOR = pu.ID + WHERE ttmt.IS_COUNT =1 + """ + + price_sql = """ + SELECT + ph.id, + ph.type_id, + ph.start_time, + ph.end_time, + ph.lease_price + FROM + lease_price_his ph + WHERE is_active =1 + """ + + # 4. 执行SQL查询获取原始数据 + collar_df = pd.read_sql(collar_sql, source_engine) + back_df = pd.read_sql(back_sql, source_engine) + price_df = pd.read_sql(price_sql, source_engine) + + # 5. 处理数据,按照规则进行匹配 + result_list = [] + + # 处理IS_COUNT=0的情况 + count0_collar = collar_df[collar_df['IS_COUNT'] == 0] + for _, row in count0_collar.iterrows(): + # 在退料查询中查找相同的agreement_id和ma_id + matched_back = back_df[(back_df['agreement_id'] == row['agreement_id']) & + (back_df['ma_id'] == row['ma_id'])] + + if not matched_back.empty: + end_time = matched_back.iloc[0]['end_time'] + back_id = matched_back.iloc[0]['back_id'] + status = '已退料' + else: + end_time = None + back_id = None + status = '未退料' + + # 检查租赁单价区间 + lease_price = row['lease_price'] + price_match = price_df[(price_df['type_id'] == row['type_id']) & + (price_df['start_time'] <= row['start_time']) & + (price_df['end_time'] >= row['start_time'])] + if not price_match.empty: + lease_price = price_match.iloc[0]['lease_price'] + + # 构建结果行 + result_row = { + 'agreement_id': row['agreement_id'], + 'type_id': row['type_id'], + 'ma_id': row['ma_id'], + 'num': row['num'], + 'start_time': row['start_time'], + 'end_time': end_time, + 'status': status, + 'lease_id': row['lease_id'], + 'back_id': back_id, + 'lease_price': lease_price, + 'buy_price': None, # 根据实际情况补充 + 'is_slt': row['is_slt'], + 'slt_time': row['slt_time'] + } + result_list.append(result_row) + + # 处理IS_COUNT=1的情况 + count1_collar = collar_df[collar_df['IS_COUNT'] == 1] + for _, row in count1_collar.iterrows(): + # 按时间顺序正序查找相同的agreement_id和type_id + matched_backs = back_df[(back_df['agreement_id'] == row['agreement_id']) & + (back_df['type_id'] == row['type_id'])].sort_values('end_time') + + remaining_num = float(row['num']) + + for _, back_row in matched_backs.iterrows(): + if float(remaining_num) <= 0: + break + + if back_row['num'] == remaining_num: + # 数量相等,直接匹配 + result_row = { + 'agreement_id': row['agreement_id'], + 'type_id': row['type_id'], + 'ma_id': row['ma_id'], + 'num': remaining_num, + 'start_time': row['start_time'], + 'end_time': back_row['end_time'], + 'status': '已退料', + 'lease_id': row['lease_id'], + 'back_id': back_row['back_id'], + 'lease_price': row['lease_price'], + 'buy_price': None, + 'is_slt': row['is_slt'], + 'slt_time': row['slt_time'] + } + result_list.append(result_row) + remaining_num = 0.0 + elif float(back_row['num']) < remaining_num: + # 退料数量小于领料数量,拆分 + result_row1 = { + 'agreement_id': row['agreement_id'], + 'type_id': row['type_id'], + 'ma_id': row['ma_id'], + 'num': back_row['num'], + 'start_time': row['start_time'], + 'end_time': back_row['end_time'], + 'status': '已退料', + 'lease_id': row['lease_id'], + 'back_id': back_row['back_id'], + 'lease_price': row['lease_price'], + 'buy_price': None, + 'is_slt': row['is_slt'], + 'slt_time': row['slt_time'] + } + result_list.append(result_row1) + remaining_num -= float(back_row['num']) + else: + # 退料数量大于领料数量,匹配剩余部分 + result_row = { + 'agreement_id': row['agreement_id'], + 'type_id': row['type_id'], + 'ma_id': row['ma_id'], + 'num': remaining_num, + 'start_time': row['start_time'], + 'end_time': back_row['end_time'], + 'status': '已退料', + 'lease_id': row['lease_id'], + 'back_id': back_row['back_id'], + 'lease_price': row['lease_price'], + 'buy_price': None, + 'is_slt': row['is_slt'], + 'slt_time': row['slt_time'] + } + result_list.append(result_row) + remaining_num = 0 + + # 处理未匹配完的领料 + if float(remaining_num) > 0: + result_row = { + 'agreement_id': row['agreement_id'], + 'type_id': row['type_id'], + 'ma_id': row['ma_id'], + 'num': remaining_num, + 'start_time': row['start_time'], + 'end_time': None, + 'status': '未退料', + 'lease_id': row['lease_id'], + 'back_id': None, + 'lease_price': row['lease_price'], + 'buy_price': None, + 'is_slt': row['is_slt'], + 'slt_time': row['slt_time'] + } + result_list.append(result_row) + + # 6. 创建结果DataFrame并输出到目标表 + result_df = pd.DataFrame(result_list) + + # 检查租赁单价区间并更新lease_price + for idx, row in result_df.iterrows(): + if pd.isna(row['start_time']): + continue + + price_match = price_df[(price_df['type_id'] == row['type_id']) & + (price_df['start_time'] <= row['start_time']) & + (price_df['end_time'] >= row['start_time'])] + if not price_match.empty: + result_df.at[idx, 'lease_price'] = price_match.iloc[0]['lease_price'] + + # 输出到目标表 + result_df.to_sql('slt_agreement_info', target_engine, if_exists='replace', index=False) + + print("数据处理完成,已输出到slt_agreement_info表") + + +# 执行函数 +analyze_sql_and_output() diff --git a/安全/新购.py b/安全/新购.py new file mode 100644 index 0000000..45b9a23 --- /dev/null +++ b/安全/新购.py @@ -0,0 +1,197 @@ +import configparser +import pandas as pd +from sqlalchemy import create_engine +from urllib.parse import quote_plus +from datetime import datetime + +# 读取配置文件 +config = configparser.ConfigParser() +config.read('config.ini') + +# 获取数据库连接配置 +source_config = { + 'host': config.get('source_db', 'host'), + 'user': config.get('source_db', 'user'), + 'password': config.get('source_db', 'password'), + 'database': config.get('source_db', 'database'), + 'port': config.getint('source_db', 'port') +} + +target_config = { + 'host': config.get('target_db', 'host'), + 'user': config.get('target_db', 'user'), + 'password': config.get('target_db', 'password'), + 'database': config.get('target_db', 'database'), + 'port': config.getint('target_db', 'port') +} + +# 创建数据库引擎(增加charset参数避免中文乱码) +source_engine = create_engine( + f"mysql+pymysql://{source_config['user']}:{quote_plus(source_config['password'])}@{source_config['host']}:{source_config['port']}/{source_config['database']}?charset=utf8mb4" +) +target_engine = create_engine( + f"mysql+pymysql://{target_config['user']}:{quote_plus(target_config['password'])}@{target_config['host']}:{target_config['port']}/{target_config['database']}?charset=utf8mb4" +) + + +def safe_convert_to_int(series): + """安全转换为整数,处理空字符串、浮点字符串等情况""" + str_series = series.astype(str) + str_series = str_series.replace({'': '0', 'None': '0', 'nan': '0'}) + return pd.to_numeric(str_series, errors='coerce').fillna(0).astype(int) + + +def safe_convert_to_float(series): + """安全转换为浮点数""" + str_series = series.astype(str) + str_series = str_series.replace({'': '0.0', 'None': '0.0', 'nan': '0.0'}) + return pd.to_numeric(str_series, errors='coerce').fillna(0.0) + + +def process_tm_task(): + """处理新购任务主表""" + try: + sql = """ + SELECT + (be.ID + 500000) as task_id, + 0 as task_type, + IF(be.`STATUS` = 17, 19, 4) as task_status, + be.`CODE` as code, + pu.`NAME` as create_by, + be.CREATE_TIME + FROM + bpm_example be + LEFT JOIN sys_data_dict sdd on be.`STATUS` = sdd.ID + LEFT JOIN pm_user pu on be.CREATOR = pu.ID + WHERE be.DEFINITION_ID = 2 + AND be.CREATE_TIME BETWEEN '2025-01-01' AND NOW() + """ + df = pd.read_sql(sql, source_engine) + + # 转换状态字段 + df['task_status'] = safe_convert_to_int(df['task_status']) + + # 确保时间字段格式正确 + if 'CREATE_TIME' in df.columns: + df['CREATE_TIME'] = pd.to_datetime(df['CREATE_TIME']) + + df.to_sql('tm_task', target_engine, if_exists='append', index=False) + print(f"成功导入 {len(df)} 条新购任务主表数据") + return True + except Exception as e: + print(f"处理新购任务主表时出错: {str(e)}") + return False + + +def process_purchase_check_info(): + """处理采购验收信息表""" + try: + sql = """ + SELECT + (be.ID + 500000) as task_id, + baf.BUY_TIME as purchase_time, + be.CREATE_TIME as arrival_time, + (baf.SUPPLIER_ID + 500) as supplier_id, + pu.`NAME` as create_by, + be.CREATE_TIME + FROM + bpm_example be + LEFT JOIN bpm_accept_form baf on be.ID = baf.EXAMPLE_ID + LEFT JOIN sys_data_dict sdd on be.`STATUS` = sdd.ID + LEFT JOIN pm_user pu on be.CREATOR = pu.ID + WHERE be.DEFINITION_ID = 2 + AND be.CREATE_TIME BETWEEN '2025-01-01' AND NOW() + """ + df = pd.read_sql(sql, source_engine) + + # 处理时间字段 + time_cols = ['purchase_time', 'arrival_time', 'CREATE_TIME'] + for col in time_cols: + if col in df.columns: + df[col] = pd.to_datetime(df[col]) + + # 处理供应商ID + if 'supplier_id' in df.columns: + df['supplier_id'] = safe_convert_to_int(df['supplier_id']) + + df.to_sql('purchase_check_info', target_engine, if_exists='append', index=False) + print(f"成功导入 {len(df)} 条采购验收信息") + return True + except Exception as e: + print(f"处理采购验收信息时出错: {str(e)}") + return False + + +def process_purchase_check_details(): + """处理采购验收明细表""" + try: + sql = """ + SELECT + (be.ID + 500000) as task_id, + (baf.MATYPE_ID + 6000) as type_id, + baf.PURCHASE_PRICE as purchase_price, + baf.NOTAX_PRICE as purchase_tax_price, + (baf.SUPPLIER_ID + 500) as supplier_id, + baf.ACCEP_NUM as purchase_num, + baf.QUALIFIED_NUM as check_num, + baf.ACTUALNUM as bind_num, + baf.INPUT_NUM as input_num, + IF(baf.PURCHASE_STATUS = 17, 19, 4) as status, + pu.`NAME` as create_by, + be.CREATE_TIME + FROM + bpm_example be + LEFT JOIN bpm_accept_form baf on be.ID = baf.EXAMPLE_ID + LEFT JOIN sys_data_dict sdd on be.`STATUS` = sdd.ID + LEFT JOIN pm_user pu on be.CREATOR = pu.ID + WHERE be.DEFINITION_ID = 2 + AND be.CREATE_TIME BETWEEN '2025-01-01' AND NOW() + """ + df = pd.read_sql(sql, source_engine) + + # 处理数值字段 + num_cols = [ + 'type_id', 'purchase_num', 'check_num', + 'bind_num', 'input_num', 'status', 'supplier_id' + ] + for col in num_cols: + if col in df.columns: + df[col] = safe_convert_to_int(df[col]) + + # 处理价格字段 + price_cols = ['purchase_price', 'purchase_tax_price'] + for col in price_cols: + if col in df.columns: + df[col] = safe_convert_to_float(df[col]) + + # 添加默认的生产时间(当前时间) + df['production_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + + # 计算验收结果 + df['check_result'] = df.apply( + lambda row: 1 if row['check_num'] == row['purchase_num'] else 0, + axis=1 + ) + + df.to_sql('purchase_check_details', target_engine, if_exists='append', index=False) + print(f"成功导入 {len(df)} 条采购验收明细") + return True + except Exception as e: + print(f"处理采购验收明细时出错: {str(e)}") + return False + + +if __name__ == "__main__": + # 执行所有转换流程 + processes = [ + process_tm_task, + process_purchase_check_info, + process_purchase_check_details + ] + + success = all([p() for p in processes]) + + if success: + print("所有新购待验收相关表转换完成!") + else: + print("!!! 部分转换失败,请检查错误日志 !!!") \ No newline at end of file diff --git a/安全/结算.py b/安全/结算.py new file mode 100644 index 0000000..e69bfa4 --- /dev/null +++ b/安全/结算.py @@ -0,0 +1,187 @@ +import configparser +import pandas as pd +from sqlalchemy import create_engine +from urllib.parse import quote_plus +import numpy as np + +# 读取配置文件 +config = configparser.ConfigParser() +config.read('config.ini') + +# 获取数据库连接配置 +source_config = { + 'host': config.get('source_db', 'host'), + 'user': config.get('source_db', 'user'), + 'password': config.get('source_db', 'password'), + 'database': config.get('source_db', 'database'), + 'port': config.getint('source_db', 'port') +} + +target_config = { + 'host': config.get('target_db', 'host'), + 'user': config.get('target_db', 'user'), + 'password': config.get('target_db', 'password'), + 'database': config.get('target_db', 'database'), + 'port': config.getint('target_db', 'port') +} + +# 创建数据库引擎 +source_engine = create_engine( + f"mysql+pymysql://{source_config['user']}:{quote_plus(source_config['password'])}@{source_config['host']}:{source_config['port']}/{source_config['database']}" +) +target_engine = create_engine( + f"mysql+pymysql://{target_config['user']}:{quote_plus(target_config['password'])}@{target_config['host']}:{target_config['port']}/{target_config['database']}" +) + +# 定义type映射关系 +type_mapping = { + 6: 'sgd', # 独立施工队 + 7: 'xmb', # 项目部 + 122: 'fbs', # 分包商 + 126: 'hq' # 后勤 +} + + +def get_agreement_id(unit_id, project_id, type_code): + """根据组合条件查询agreement_id""" + try: + # 检查参数有效性 + if pd.isna(unit_id) or pd.isna(project_id) or pd.isna(type_code): + return None + + sql = f""" + SELECT (bpr.ID + 500000) as agreement_id + FROM bm_project_relation bpr + WHERE bpr.UNIT_ID = {int(unit_id)} + AND bpr.project_id = {int(project_id)} + AND bpr.type = '{type_code}' + LIMIT 1 + """ + result = pd.read_sql(sql, source_engine) + return result['agreement_id'].iloc[0] if not result.empty else None + except Exception as e: + print(f"查询agreement_id出错: unit_id={unit_id}, project_id={project_id}, type={type_code}, 错误: {str(e)}") + return None + + +def process_slt_agreement(): + """处理租赁结算信息""" + try: + # 第一步:查询基础数据 + base_sql = """ + SELECT + mps.DEPT_ID, + mps.SUB_ID, + mps.TEAM_ID, + mps.REAR_ID, + mps.PROJECT_ID, + mps.TYPE_ID as type, + (mps.MACHINE_ID+70000) as ma_id, + (mps.MATYPE_ID+6000) as type_id, + mps.NUM, + IF(mps.`STATUS` = 1, 0, 1) as status, + mps.LEASE_PRICE as lease_price, + mps.OUT_TIME as start_time, + mps.BACK_TIME as end_time, + (mps.PICK_ID+500000) as lease_id, + (mps.BACK_ID+500000) as back_id, + mps.IS_SLT as is_slt, + mps.SETTLEMENT_TIME as slt_time + FROM ma_type_project_storage mps + """ + base_df = pd.read_sql(base_sql, source_engine) + + # 填充空值并确保数值类型 + base_df = base_df.fillna({ + 'DEPT_ID': 0, + 'SUB_ID': 0, + 'TEAM_ID': 0, + 'REAR_ID': 0, + 'PROJECT_ID': 0 + }).astype({ + 'DEPT_ID': 'int64', + 'SUB_ID': 'int64', + 'TEAM_ID': 'int64', + 'REAR_ID': 'int64', + 'PROJECT_ID': 'int64' + }) + + # 第二步:处理每个记录获取agreement_id + results = [] + skipped_records = 0 + + for _, row in base_df.iterrows(): + try: + # 确定unit_id和type_code + unit_id = None + type_code = None + + if row['type'] == 6: # 施工队 + if row['TEAM_ID'] > 0: + unit_id = row['TEAM_ID'] + 5000 + type_code = 'sgd' + elif row['type'] == 7: # 项目部 + if row['DEPT_ID'] > 0: + unit_id = row['DEPT_ID'] + 4000 + type_code = 'xmb' + elif row['type'] == 122: # 分包商 + if row['SUB_ID'] > 0: + unit_id = row['SUB_ID'] + 6000 + type_code = 'fbs' + elif row['type'] == 126: # 后勤 + if row['REAR_ID'] > 0: + unit_id = row['REAR_ID'] + 6000 + type_code = 'hq' + + # 检查必要参数是否有效 + if not unit_id or not type_code or row['PROJECT_ID'] <= 0: + skipped_records += 1 + continue + + # 查询agreement_id + agreement_id = get_agreement_id(unit_id, row['PROJECT_ID'] + 3000, type_code) + if agreement_id is None: + skipped_records += 1 + continue + + # 组装结果 + result = { + 'agreement_id': agreement_id, + 'type_id': row['type_id'], + 'ma_id': row['ma_id'], + 'num': row['NUM'], + 'status': row['status'], + 'lease_price': row['lease_price'], + 'start_time': row['start_time'], + 'end_time': row['end_time'], + 'lease_id': row['lease_id'], + 'back_id': row['back_id'], + 'is_slt': row['is_slt'], + 'slt_time': row['slt_time'], + 'buy_price': None # 根据业务需要补充 + } + results.append(result) + + except Exception as e: + print(f"处理记录时出错: {str(e)},记录内容: {row}") + skipped_records += 1 + continue + + # 转换为DataFrame并写入 + if results: + result_df = pd.DataFrame(results) + result_df.to_sql('slt_agreement_info', target_engine, + if_exists='append', index=False) + print(f"成功导入 {len(result_df)} 条租赁结算信息,跳过 {skipped_records} 条无效记录") + else: + print("没有符合条件的数据需要导入") + + return True + + except Exception as e: + print(f"处理租赁结算信息时出错: {str(e)}") + return False + + +if __name__ == "__main__": + process_slt_agreement() \ No newline at end of file diff --git a/安全/维修.py b/安全/维修.py new file mode 100644 index 0000000..0cbca44 --- /dev/null +++ b/安全/维修.py @@ -0,0 +1,292 @@ +import configparser +import pandas as pd +from sqlalchemy import create_engine +from urllib.parse import quote_plus + +# 读取配置文件 +config = configparser.ConfigParser() +config.read('config.ini') + +# 获取数据库连接配置 +source_config = { + 'host': config.get('source_db', 'host'), + 'user': config.get('source_db', 'user'), + 'password': config.get('source_db', 'password'), + 'database': config.get('source_db', 'database'), + 'port': config.getint('source_db', 'port') +} + +target_config = { + 'host': config.get('target_db', 'host'), + 'user': config.get('target_db', 'user'), + 'password': config.get('target_db', 'password'), + 'database': config.get('target_db', 'database'), + 'port': config.getint('target_db', 'port') +} + +# 创建数据库引擎(修正1:增加charset参数避免中文乱码) +source_engine = create_engine( + f"mysql+pymysql://{source_config['user']}:{quote_plus(source_config['password'])}@{source_config['host']}:{source_config['port']}/{source_config['database']}?charset=utf8mb4" +) +target_engine = create_engine( + f"mysql+pymysql://{target_config['user']}:{quote_plus(target_config['password'])}@{target_config['host']}:{target_config['port']}/{target_config['database']}?charset=utf8mb4" +) + +# 定义type映射关系 +type_mapping = { + 6: 'sgd', # 独立施工队 + 7: 'xmb', # 项目部 + 122: 'fbs', # 分包商 + 126: 'hq' # 后勤 +} + + +def get_agreement_id(unit_id, project_id, type_code): + """根据组合条件查询agreement_id(使用参数化查询)""" + try: + # 修正2:添加参数有效性检查 + if pd.isna(unit_id) or pd.isna(project_id) or pd.isna(type_code): + return None + + sql = """ + SELECT (bpr.ID + 500000) as agreement_id + FROM bm_project_relation bpr + WHERE bpr.UNIT_ID = %s + AND bpr.project_id = %s + AND bpr.type = %s + LIMIT 1 + """ + params = (int(unit_id), int(project_id), str(type_code)) + result = pd.read_sql(sql, source_engine, params=params) + return result['agreement_id'].iloc[0] if not result.empty else None + except Exception as e: + print(f"查询agreement_id出错: unit_id={unit_id}, project_id={project_id}, type={type_code}, 错误: {str(e)}") + return None + + +def process_tm_task(): + """处理维修任务主表""" + try: + sql = """ + SELECT + (be.ID + 500000) as task_id, + be.`CODE`, + 4 as task_type, + pu.`NAME` as create_by, + be.CREATE_TIME, + IF(be.`STATUS` = 56, 1, 0) as task_status + FROM + bpm_example be + LEFT JOIN sys_data_dict sdd on be.`STATUS` = sdd.ID + LEFT JOIN pm_user pu on be.CREATOR = pu.ID + WHERE be.DEFINITION_ID = 7 + """ + df = pd.read_sql(sql, source_engine) + + # 修正3:确保datetime类型正确 + if 'CREATE_TIME' in df.columns: + df['CREATE_TIME'] = pd.to_datetime(df['CREATE_TIME']) + + df.to_sql('tm_task', target_engine, if_exists='append', index=False) + print(f"成功导入 {len(df)} 条维修任务主表数据") + return True + except Exception as e: + print(f"处理维修任务主表时出错: {str(e)}") + return False + + +def process_tm_task_agreement(): + """处理维修任务关联表""" + try: + # 第一步查询基础数据 + base_sql = """ + SELECT + (be.ID + 500000) as task_id, + be.DEPT_ID, + be.SUB_ID, + be.TEAM_ID, + be.REAR_ID, + be.TYPE, + be.PROJECT_ID + FROM bpm_example be + WHERE be.DEFINITION_ID = 7 + """ + base_df = pd.read_sql(base_sql, source_engine) + + # 修正4:更安全的空值处理 + base_df = base_df.fillna({ + 'DEPT_ID': 0, 'SUB_ID': 0, + 'TEAM_ID': 0, 'REAR_ID': 0, + 'PROJECT_ID': 0, 'TYPE': 0 + }) + + # 修正5:确保所有ID为整数 + for col in ['DEPT_ID', 'SUB_ID', 'TEAM_ID', 'REAR_ID', 'PROJECT_ID', 'TYPE']: + base_df[col] = pd.to_numeric(base_df[col], errors='coerce').fillna(0).astype(int) + + results = [] + skipped_records = 0 + + for _, row in base_df.iterrows(): + try: + # 确定unit_id和type_code + unit_id = None + type_code = None + + if row['TYPE'] == 6 and row['TEAM_ID'] > 0: # 施工队 + unit_id = row['TEAM_ID'] + 5000 + type_code = 'sgd' + elif row['TYPE'] == 7 and row['DEPT_ID'] > 0: # 项目部 + unit_id = row['DEPT_ID'] + 4000 + type_code = 'xmb' + elif row['TYPE'] == 122 and row['SUB_ID'] > 0: # 分包商 + unit_id = row['SUB_ID'] + 6000 + type_code = 'fbs' + elif row['TYPE'] == 126 and row['REAR_ID'] > 0: # 后勤 + unit_id = row['REAR_ID'] + 6000 + type_code = 'hq' + + if not unit_id or row['PROJECT_ID'] <= 0: + skipped_records += 1 + continue + + agreement_id = get_agreement_id(unit_id, row['PROJECT_ID'], type_code) + if agreement_id: + results.append({ + 'task_id': int(row['task_id']), + 'agreement_id': int(agreement_id) + }) + except Exception as e: + print(f"处理记录时出错: {str(e)},记录ID: {row.get('task_id', '未知')}") + skipped_records += 1 + continue + + if results: + result_df = pd.DataFrame(results) + result_df.to_sql('tm_task_agreement', target_engine, + if_exists='append', index=False) + print(f"成功导入 {len(result_df)} 条维修任务关联数据,跳过 {skipped_records} 条无效记录") + return True + except Exception as e: + print(f"处理维修任务关联表时出错: {str(e)}") + return False + + +def safe_convert_to_int(series): + """ + 安全转换为整数,处理以下情况: + - 空字符串('') + - 浮点字符串('1.0') + - 空值(None/NaN) + - 常规整数字符串('123') + """ + # 先转换为字符串,处理各种输入类型 + str_series = series.astype(str) + # 替换空字符串为'0' + str_series = str_series.replace({'': '0'}) + # 转换为float再转为int + return pd.to_numeric(str_series, errors='coerce').fillna(0).astype(int) + + +def process_repair_apply_details(): + """处理维修申请明细""" + try: + sql = """ + SELECT + (brf.EXAMPLE_ID + 500000) as task_id, + (brf.MATYPE_ID + 6000) as type_id, + (brf.MA_ID + 70000) as ma_id, + brf.BACK_NUM as repair_num, + brf.REPAIR_NUM as repaired_num, + brf.SCRAP_NUM as scrap_num, + IF(be.`STATUS` = 56, 1, 0) as status, + pu.`NAME` as create_by, + be.PARENT_ID as back_id, + IF(be.`STATUS` = 56, 0, 1) as is_ds + FROM + bpm_repair_form brf + LEFT JOIN bpm_example be on brf.EXAMPLE_ID = be.ID + LEFT JOIN pm_user pu on be.CREATOR = pu.ID + WHERE be.DEFINITION_ID = 7 + """ + df = pd.read_sql(sql, source_engine) + + # 转换所有数值列 + num_cols = ['repair_num', 'repaired_num', 'scrap_num', 'status', 'is_ds', 'back_id'] + for col in num_cols: + if col in df.columns: + df[col] = safe_convert_to_int(df[col]) + + # 记录转换前后的样本值 + print("维修申请明细表转换样例:") + for col in num_cols: + if col in df.columns and len(df) > 0: + sample = df[col].iloc[0] + print(f"{col}: {sample} (类型: {type(sample)})") + + df.to_sql('repair_apply_details', target_engine, + if_exists='append', index=False) + print(f"成功导入 {len(df)} 条维修申请明细") + return True + except Exception as e: + print(f"处理维修申请明细时出错: {str(e)}") + return False + + +def process_repair_apply_record(): + """处理维修申请记录""" + try: + sql = """ + SELECT + (brf.EXAMPLE_ID + 500000) as task_id, + (brf.MATYPE_ID + 6000) as type_id, + (brf.MA_ID + 70000) as ma_id, + brf.REPAIR_NUM as repair_num, + brf.SCRAP_NUM as scrap_num, + IF(be.`STATUS` = 56, 1, 0) as status, + pu.`NAME` as create_by + FROM + bpm_repair_form brf + LEFT JOIN bpm_example be on brf.EXAMPLE_ID = be.ID + LEFT JOIN pm_user pu on be.CREATOR = pu.ID + WHERE be.`STATUS` = 56 AND be.DEFINITION_ID = 7 + """ + df = pd.read_sql(sql, source_engine) + + # 转换所有数值列 + num_cols = ['repair_num', 'scrap_num', 'status'] + for col in num_cols: + if col in df.columns: + df[col] = safe_convert_to_int(df[col]) + + # 记录转换前后的样本值 + print("维修申请记录表转换样例:") + for col in num_cols: + if col in df.columns and len(df) > 0: + sample = df[col].iloc[0] + print(f"{col}: {sample} (类型: {type(sample)})") + + df.to_sql('repair_apply_record', target_engine, + if_exists='append', index=False) + print(f"成功导入 {len(df)} 条维修申请记录") + return True + except Exception as e: + print(f"处理维修申请记录时出错: {str(e)}") + return False + + +if __name__ == "__main__": + # 执行所有转换流程 + processes = [ + process_tm_task, + process_tm_task_agreement, + process_repair_apply_details, + process_repair_apply_record + ] + + success = all([p() for p in processes]) + + if success: + print("所有维修相关表转换完成!") + else: + print("!!! 部分转换失败,请检查错误日志 !!!") \ No newline at end of file diff --git a/安全/退料.py b/安全/退料.py new file mode 100644 index 0000000..05db356 --- /dev/null +++ b/安全/退料.py @@ -0,0 +1,291 @@ +import configparser +import pandas as pd +from sqlalchemy import create_engine +from urllib.parse import quote_plus +# 读取配置文件 +config = configparser.ConfigParser() +config.read('config.ini') + +# 获取数据库连接配置 +source_config = { + 'host': config.get('source_db', 'host'), + 'user': config.get('source_db', 'user'), + 'password': config.get('source_db', 'password'), + 'database': config.get('source_db', 'database'), + 'port': config.getint('source_db', 'port') +} + +target_config = { + 'host': config.get('target_db', 'host'), + 'user': config.get('target_db', 'user'), + 'password': config.get('target_db', 'password'), + 'database': config.get('target_db', 'database'), + 'port': config.getint('target_db', 'port') +} + +# 创建数据库引擎 +source_engine = create_engine( + f"mysql+pymysql://{source_config['user']}:{quote_plus(source_config['password'])}@{source_config['host']}:{source_config['port']}/{source_config['database']}" +) +target_engine = create_engine( + f"mysql+pymysql://{target_config['user']}:{quote_plus(target_config['password'])}@{target_config['host']}:{target_config['port']}/{target_config['database']}" +) + +# 定义type映射关系 +type_mapping = { + 6: 'sgd', # 独立施工队 + 7: 'xmb', # 项目部 + 122: 'fbs', # 分包商 + 126: 'hq' # 后勤 +} + + +def get_agreement_id(unit_id, project_id, type_code): + """根据组合条件查询agreement_id""" + try: + sql = f""" + SELECT (bpr.ID + 500000) as agreement_id + FROM bm_project_relation bpr + WHERE bpr.UNIT_ID = {unit_id} + AND bpr.project_id = {project_id} + AND bpr.type = '{type_code}' + LIMIT 1 + """ + result = pd.read_sql(sql, source_engine) + return result['agreement_id'].iloc[0] if not result.empty else None + except Exception as e: + print(f"查询agreement_id出错: {str(e)}") + return None + + +def process_tm_task(): + """处理任务主表""" + try: + sql = """ + SELECT + (be.ID + 500000) as task_id, + 3 as task_type, + IF(be.`STATUS` = 50, 2, 0) as task_status, + be.`CODE`, + pu.`NAME` as create_by, + be.CREATE_TIME + FROM + bpm_example be + LEFT JOIN sys_data_dict sdd on be.`STATUS` = sdd.ID + LEFT JOIN pm_user pu on be.CREATOR = pu.ID + WHERE be.DEFINITION_ID = 5 + """ + df = pd.read_sql(sql, source_engine) + df.to_sql('tm_task', target_engine, if_exists='append', index=False) + print(f"成功导入 {len(df)} 条任务主表数据") + return True + except Exception as e: + print(f"处理任务主表时出错: {str(e)}") + return False + + +def process_tm_task_agreement(): + """处理任务关联表""" + try: + # 第一步查询基础数据 + base_sql = """ + SELECT + (be.ID + 600000) as task_id, + be.DEPT_ID, + be.SUB_ID, + be.TEAM_ID, + be.REAR_ID, + be.TYPE, + be.PROJECT_ID + FROM bpm_example be + WHERE be.DEFINITION_ID = 5 + """ + base_df = pd.read_sql(base_sql, source_engine) + + # 填充空值并转换为适当类型 + base_df = base_df.fillna(0) + base_df['TYPE'] = base_df['TYPE'].astype(int) + for col in ['DEPT_ID', 'SUB_ID', 'TEAM_ID', 'REAR_ID', 'PROJECT_ID']: + base_df[col] = pd.to_numeric(base_df[col], errors='coerce').fillna(0).astype(int) + + # 第二步处理每个记录获取agreement_id + results = [] + skipped_records = 0 + + for _, row in base_df.iterrows(): + try: + # 确定unit_id和type_code + unit_id = None + type_code = None + + if row['TYPE'] == 6: # 施工队 + if row['TEAM_ID'] > 0: + unit_id = int(row['TEAM_ID']) + 5000 + type_code = 'sgd' + elif row['TYPE'] == 7: # 项目部 + if row['DEPT_ID'] > 0: + unit_id = int(row['DEPT_ID']) + 4000 + type_code = 'xmb' + elif row['TYPE'] == 122: # 分包商 + if row['SUB_ID'] > 0: + unit_id = int(row['SUB_ID']) + 6000 + type_code = 'fbs' + elif row['TYPE'] == 126: # 后勤 + if row['REAR_ID'] > 0: + unit_id = int(row['REAR_ID']) + 6000 + type_code = 'hq' + + # 检查必要参数是否有效 + if not unit_id or not type_code or row['PROJECT_ID'] <= 0: + skipped_records += 1 + continue + + # 使用参数化查询避免字符串拼接问题 + agreement_id = get_agreement_id(unit_id, int(row['PROJECT_ID']), type_code) + if agreement_id: + results.append({ + 'task_id': int(row['task_id']), + 'agreement_id': int(agreement_id) + }) + except Exception as e: + print(f"处理记录时出错: {str(e)},记录内容: {row.to_dict()}") + skipped_records += 1 + continue + + # 写入目标表 + if results: + result_df = pd.DataFrame(results) + result_df.to_sql('tm_task_agreement', target_engine, + if_exists='append', index=False) + print(f"成功导入 {len(result_df)} 条任务关联数据,跳过 {skipped_records} 条无效记录") + return True + except Exception as e: + print(f"处理任务关联表时出错: {str(e)}") + return False + + +def get_agreement_id(unit_id, project_id, type_code): + """根据组合条件查询agreement_id(使用参数化查询)""" + try: + # 使用参数化查询 + sql = """ + SELECT (bpr.ID + 500000) as agreement_id + FROM bm_project_relation bpr + WHERE bpr.UNIT_ID = %s + AND bpr.project_id = %s + AND bpr.type = %s + LIMIT 1 + """ + params = (int(unit_id), int(project_id), str(type_code)) + result = pd.read_sql(sql, source_engine, params=params) + return result['agreement_id'].iloc[0] if not result.empty else None + except Exception as e: + print(f"查询agreement_id出错: unit_id={unit_id}, project_id={project_id}, type={type_code}, 错误: {str(e)}") + return None + + +def process_back_apply_info(): + """处理退料申请信息""" + try: + sql = """ + SELECT + be.`CODE`, + (be.ID + 500000) as task_id, + be.LINK_MAN as back_person, + NULL as phone, -- 需根据实际情况补充 + pu.`NAME` as create_by, + be.CREATE_TIME, + IF(be.`STATUS` = 50, 1, 0) as status, + be.IS_PRINT as print_status + FROM + bpm_example be + LEFT JOIN sys_data_dict sdd on be.`STATUS` = sdd.ID + LEFT JOIN pm_user pu on be.CREATOR = pu.ID + WHERE be.DEFINITION_ID = 5 + """ + df = pd.read_sql(sql, source_engine) + df.to_sql('back_apply_info', target_engine, if_exists='append', index=False) + print(f"成功导入 {len(df)} 条退料申请信息") + return True + except Exception as e: + print(f"处理退料申请信息时出错: {str(e)}") + return False + + +def process_back_apply_details(): + """处理退料申请明细""" + try: + sql = """ + SELECT + be.`CODE`, + (be.ID + 500000) as parent_id, + (bbf.MATYPE_ID+6000) as type_id, + SUM(bbf.BACK_NUM) as pre_num, + SUM(bbf.BACK_NUM) as audit_num, + 0 as bad_num, -- 需根据实际情况调整 + 0 as good_num, -- 需根据实际情况调整 + IF(be.`STATUS` = 50, 2, 0) as status, + pu.`NAME` as create_by, + be.CREATE_TIME + FROM + bpm_example be + LEFT JOIN bpm_back_form bbf on be.ID = bbf.EXAMPLE_ID + LEFT JOIN pm_user pu on be.CREATOR = pu.ID + WHERE be.DEFINITION_ID = 5 + GROUP BY be.ID + """ + df = pd.read_sql(sql, source_engine) + df.to_sql('back_apply_details', target_engine, if_exists='append', index=False) + print(f"成功导入 {len(df)} 条退料申请明细") + return True + except Exception as e: + print(f"处理退料申请明细时出错: {str(e)}") + return False + + +def process_back_check_details(): + """处理退料验收明细""" + try: + sql = """ + SELECT + (bbf.EXAMPLE_ID + 500000) as parent_id, + (bbf.MATYPE_ID + 6000) as type_id, + (bbf.MA_ID + 70000) as ma_id, + bbf.BACK_NUM as back_num, + CASE + WHEN bbf.BACK_STATUS = 19 THEN 1 + WHEN bbf.BACK_STATUS = 20 THEN 0 + ELSE 2 + END AS back_status, + IF(bbf.BACK_STATUS = 19, bbf.BACK_NUM, 0) as good_num, + IF(bbf.BACK_STATUS IN (20,21), bbf.BACK_NUM, 0) as bad_num, + 1 as status, + pu.`NAME` as create_by, + bbf.CREATE_TIME + FROM + bpm_back_form bbf + LEFT JOIN pm_user pu on bbf.CREATOR = pu.ID + """ + df = pd.read_sql(sql, source_engine) + df.to_sql('back_check_details', target_engine, if_exists='append', index=False) + print(f"成功导入 {len(df)} 条退料验收明细") + return True + except Exception as e: + print(f"处理退料验收明细时出错: {str(e)}") + return False + + +if __name__ == "__main__": + # 执行所有转换流程 + results = [ + process_tm_task(), + process_tm_task_agreement(), + process_back_apply_info(), + process_back_apply_details(), + process_back_check_details() + ] + + if all(results): + print("所有退料相关表转换完成!") + else: + print("部分转换失败,请检查错误日志") \ No newline at end of file