import os
import sys
import json
import logging
import time
import glob
import re
import argparse
import shutil
import traceback
import concurrent.futures
from datetime import datetime
from pdf_to_table import process_pdf_to_table, post_process_money_texts

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

# Worker-thread count tuned for batch processing
DEFAULT_MAX_WORKERS = max(4, os.cpu_count() or 4)

# Environment variable controlling log verbosity
VERBOSE_LOGGING = os.environ.get('OCR_VERBOSE_LOGGING', '0') == '1'

def log_verbose(message):
    """Log a message only when verbose logging is enabled."""
    if VERBOSE_LOGGING:
        logger.info(message)

def parse_regions_file(txt_file):
    """Extract name and money entries from a regions text file."""
    name_data = []
    money_data = []
    current_section = None

    try:
        logger.info(f"Parsing file: {txt_file}")

        with open(txt_file, 'r', encoding='utf-8') as f:
            # Read all lines at once to reduce I/O calls
            lines = f.readlines()

        # Precompile the item pattern for efficiency
        item_pattern = re.compile(r'^\s+(\d+)\.\s+(.+)$')

        # Iterate directly, without per-line logging
        for line in lines:
            line = line.strip()
            if not line:
                continue

            if 'NAME 区域' in line:
                current_section = 'name'
                continue
            elif 'MONEY 区域' in line:
                current_section = 'money'
                continue
            elif '===' in line:  # Section separator: reset the current section
                current_section = None
                continue

            if current_section:
                match = item_pattern.match(line)
                if match:
                    content = match.group(2).strip()
                    if current_section == 'name':
                        name_data.append(content)
                    elif current_section == 'money':
                        money_data.append(content)

        # Log the result once, after parsing completes
        logger.info(f"Parsed {txt_file}: {len(name_data)} names, {len(money_data)} amounts")

        # Log examples only when there is actual content
        if name_data:
            logger.info(f"Name examples: {name_data[:3]}")
        if money_data:
            logger.info(f"Amount examples: {money_data[:3]}")

    except Exception as e:
        logger.error(f"Error while parsing results file: {str(e)}")
        logger.error(f"Details: {traceback.format_exc()}")

    return {
        'name': name_data,
        'money': money_data
    }

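# For reference, a sketch of the regions-file layout the parser above
# expects, inferred from its matching logic (names and amounts below are
# hypothetical examples, not real data):
#
#     === 图像 1 ===
#     NAME 区域:
#       1. 张三
#       2. 李四
#     MONEY 区域:
#       1. 1,200.00
#       2. 850.50
#
# 'NAME 区域'/'MONEY 区域' lines switch the active section, '===' lines
# reset it, and indented 'N. content' items are collected into the
# active section.
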
def extract_texts_from_directory(directory, image_prefix):
    """
    Extract name and money texts directly from text files in a directory.
    Serves as a fallback for parse_regions_file.

    Args:
        directory: path to the directory containing the text files
        image_prefix: image prefix (e.g. "5" selects 5_name_*.txt and 5_money_*.txt files)

    Returns:
        Dict with 'name' and 'money' lists
    """
    name_data = []
    money_data = []

    try:
        # Use glob patterns to find matching files directly
        name_patterns = [
            os.path.join(directory, f"{image_prefix}_name_*_texts.txt"),
            os.path.join(directory, f"{image_prefix}_rotated_name_*_texts.txt")
        ]
        money_patterns = [
            os.path.join(directory, f"{image_prefix}_money_*_texts.txt"),
            os.path.join(directory, f"{image_prefix}_rotated_money_*_texts.txt")
        ]

        name_files = []
        for pattern in name_patterns:
            name_files.extend(glob.glob(pattern))

        money_files = []
        for pattern in money_patterns:
            money_files.extend(glob.glob(pattern))

        # If nothing is found in the directory itself, check its subdirectory
        if not name_files and not money_files:
            image_dir = os.path.join(directory, image_prefix)
            if os.path.isdir(image_dir):
                # Support both rotated and non-rotated file formats here too
                name_patterns = [
                    os.path.join(image_dir, f"{image_prefix}_name_*_texts.txt"),
                    os.path.join(image_dir, f"{image_prefix}_rotated_name_*_texts.txt")
                ]
                money_patterns = [
                    os.path.join(image_dir, f"{image_prefix}_money_*_texts.txt"),
                    os.path.join(image_dir, f"{image_prefix}_rotated_money_*_texts.txt")
                ]

                name_files = []
                for pattern in name_patterns:
                    name_files.extend(glob.glob(pattern))

                money_files = []
                for pattern in money_patterns:
                    money_files.extend(glob.glob(pattern))

        # Sort key: the numeric region index embedded in the filename
        def extract_index(file_path):
            try:
                # basename + split is cheap and sufficient here
                parts = os.path.basename(file_path).split('_')
                if 'rotated' in parts:
                    # Rotated files carry the index one position later
                    idx_pos = 3 if len(parts) >= 4 else 0
                else:
                    # Normal, non-rotated case
                    idx_pos = 2 if len(parts) >= 3 else 0
                return int(parts[idx_pos]) if idx_pos < len(parts) else 0
            except (IndexError, ValueError):
                return float('inf')

        # Sort the file lists
        name_files.sort(key=extract_index)
        money_files.sort(key=extract_index)

        # Log only when files were actually found
        if name_files or money_files:
            logger.info(f"Found {len(name_files)} name text files and {len(money_files)} money text files")

        for file_path in name_files:
            with open(file_path, 'r', encoding='utf-8') as f:
                # Read and filter in one pass
                lines = [line.strip() for line in f if line.strip()]
                name_data.extend(lines)
                logger.info(f"Extracted {len(lines)} names from {os.path.basename(file_path)}")

        for file_path in money_files:
            with open(file_path, 'r', encoding='utf-8') as f:
                # Read and filter in one pass
                lines = [line.strip() for line in f if line.strip()]
                money_data.extend(lines)
                logger.info(f"Extracted {len(lines)} amounts from {os.path.basename(file_path)}")

        if name_data or money_data:
            logger.info(f"Extracted directly from text files: {len(name_data)} names, {len(money_data)} amounts")

    except Exception as e:
        logger.error(f"Error extracting texts from directory: {str(e)}")
        logger.error(f"Details: {traceback.format_exc()}")

    return {
        'name': name_data,
        'money': money_data
    }

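# Filename layout assumed by the glob patterns and sort key above
# (illustrative values; the real files come from pdf_to_table):
#
#     "5_name_2_texts.txt"          -> parts[2] == "2"  (region index)
#     "5_rotated_name_2_texts.txt"  -> parts[3] == "2"  (rotated variant)
#
# Files whose index cannot be parsed sort last via float('inf').
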
def process_pdf(pdf_path, output_dir, confidence_threshold=0.6):
    """
    Process a single PDF file.

    Args:
        pdf_path: path to the PDF file
        output_dir: output directory
        confidence_threshold: OCR confidence threshold

    Returns:
        Dict with the processing result
    """
    try:
        # Create the output directory
        os.makedirs(output_dir, exist_ok=True)

        # Derive the PDF file name
        pdf_filename = os.path.basename(pdf_path)
        pdf_name = os.path.splitext(pdf_filename)[0]

        logger.info(f"Processing PDF: {pdf_path}")

        # Record the PDF file size
        file_size = os.path.getsize(pdf_path) / (1024 * 1024)  # convert to MB
        logger.info(f"File size: {file_size:.2f} MB")

        # Create a temporary directory to hold the PDF
        pdf_folder = os.path.join(output_dir, "temp_pdf")
        os.makedirs(pdf_folder, exist_ok=True)

        # Copy the PDF into the temporary directory
        temp_pdf_path = os.path.join(pdf_folder, pdf_filename)
        shutil.copy2(pdf_path, temp_pdf_path)

        # Record the start time
        start_time = time.time()

        # Process the PDF file
        stats = process_pdf_to_table(
            pdf_folder=pdf_folder,
            output_folder=output_dir,
            confidence_threshold=confidence_threshold
        )

        # Compute the processing time
        processing_time = time.time() - start_time
        logger.info(f"Processing took {processing_time:.2f} seconds")

        # Record performance data
        stats['performance'] = {
            'file_size_mb': round(file_size, 2),
            'processing_time_seconds': round(processing_time, 2),
            'processing_speed_mb_per_second': round(file_size / processing_time, 4) if processing_time > 0 else 0
        }

        # Parse the results
        all_results = []
        results_folder = os.path.join(output_dir, "results")

        if os.path.exists(results_folder):
            # Use glob to find all regions.txt files quickly
            region_files = glob.glob(os.path.join(results_folder, "*_regions.txt"))

            # Sort by the number embedded in the filename
            def extract_number(file_path):
                try:
                    filename = os.path.basename(file_path)
                    number_str = filename.split('_')[0]
                    return int(number_str)
                except (IndexError, ValueError):
                    return float('inf')

            # Ascending by the filename number
            region_files.sort(key=extract_number)

            if region_files:
                logger.info(f"Found {len(region_files)} region files to parse")

                # Process region files sequentially
                for region_file in region_files:
                    image_prefix = os.path.basename(region_file).split('_')[0]
                    result = parse_regions_file(region_file)

                    # Only keep results that actually contain content
                    if result['name'] or result['money']:
                        # Post-process and pair the data for this image
                        result = process_single_image_data(result, image_prefix)
                        all_results.append(result)
                        logger.info(f"Parsed content from {image_prefix} and completed pairing")
                    else:
                        # Fallback: read the texts.txt files directly
                        backup_result = extract_texts_from_directory(results_folder, image_prefix)
                        if backup_result['name'] or backup_result['money']:
                            # Post-process and pair the fallback data as well
                            backup_result = process_single_image_data(backup_result, image_prefix)
                            all_results.append(backup_result)
                            logger.info(f"Fallback extraction from {image_prefix} succeeded and pairing completed")
            else:
                logger.warning("No regions.txt files found")

        # If nothing was parsed from regions.txt, try extracting directly from subdirectories
        if not all_results:
            logger.warning("No content parsed from regions.txt files; trying direct extraction from subdirectories")

            # Collect and sort all subdirectories
            subdirs = []
            if os.path.exists(results_folder):
                subdirs = [d for d in os.listdir(results_folder)
                           if os.path.isdir(os.path.join(results_folder, d))]

            # Sort by the number in the directory name
            def extract_dir_number(dirname):
                try:
                    return int(dirname)
                except ValueError:
                    return float('inf')

            subdirs.sort(key=extract_dir_number)

            if subdirs:
                logger.info(f"Found {len(subdirs)} subdirectories to extract from")

                # Process the subdirectories sequentially
                ordered_results = []
                for dirname in subdirs:
                    result_dir = os.path.join(results_folder, dirname)
                    backup_result = extract_texts_from_directory(result_dir, dirname)
                    if backup_result['name'] or backup_result['money']:
                        # Post-process and pair the subdirectory data
                        backup_result = process_single_image_data(backup_result, dirname)
                        ordered_results.append((int(dirname) if dirname.isdigit() else float('inf'), backup_result))

                # Sort by subdirectory number before appending
                ordered_results.sort(key=lambda x: x[0])
                for _, result in ordered_results:
                    all_results.append(result)

        # Merge all results
        merged_result = {'name': [], 'money': [], 'name_money': []}

        if all_results:
            # Merge per-image results, preserving the pairing within each image
            for result in all_results:
                merged_result['name'].extend(result.get('name', []))
                merged_result['money'].extend(result.get('money', []))
                merged_result['name_money'].extend(result.get('name_money', []))

        # Note: each image has already been processed and paired individually,
        # so no global pairing logic is needed here.

        # Log a summary of the result
        logger.info(f"Finished PDF {pdf_filename}: {len(merged_result['name'])} names, {len(merged_result['money'])} amounts, {len(merged_result['name_money'])} name-amount pairs")

        # Save the final result as JSON
        result_json_path = os.path.join(output_dir, f"{pdf_name}_result.json")
        with open(result_json_path, 'w', encoding='utf-8') as f:
            json.dump({
                'pdf_file': pdf_filename,
                'name': merged_result['name'],
                'money': merged_result['money'],
                'name_money': merged_result['name_money'],
                'stats': stats
            }, f, ensure_ascii=False, indent=2)
        logger.info(f"JSON result saved to: {result_json_path}")

        # Save the final result as TXT
        result_txt_path = os.path.join(output_dir, f"{pdf_name}_result.txt")
        with open(result_txt_path, 'w', encoding='utf-8') as f:
            f.write(f"Processing result for PDF file: {pdf_filename}\n")
            f.write(f"Processed at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write("=" * 80 + "\n\n")

            # Write the name-money pairs
            f.write(f"Found {len(merged_result['name_money'])} name-amount pairs:\n\n")
            for i, pair in enumerate(merged_result['name_money'], 1):
                f.write(f"{i}. {pair}\n")

            f.write("\n" + "=" * 80 + "\n")
        logger.info(f"TXT result saved to: {result_txt_path}")

        # Clean up the temporary directory
        try:
            shutil.rmtree(pdf_folder)
            logger.info("Temporary PDF directory removed")
        except Exception as e:
            logger.warning(f"Error removing temporary directory: {str(e)}")

        return {
            'pdf_file': pdf_filename,
            'result': merged_result,
            'stats': stats
        }

    except Exception as e:
        logger.error(f"Error processing PDF {pdf_path}: {str(e)}")
        logger.error(f"Details: {traceback.format_exc()}")
        return {
            'pdf_file': os.path.basename(pdf_path),
            'error': str(e)
        }

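# Shape of the per-PDF JSON written by process_pdf above (values are
# illustrative; 'stats' is whatever process_pdf_to_table returned plus
# the 'performance' block added here):
#
#     {
#       "pdf_file": "invoice.pdf",
#       "name": ["张三", "李四"],
#       "money": ["1,200.00", "850.50"],
#       "name_money": ["张三-1,200.00", "李四-850.50"],
#       "stats": {"performance": {"file_size_mb": 1.2, ...}, ...}
#     }
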
def process_single_image_data(result, image_prefix):
    """
    Post-process the names and amounts of a single image and pair them up.

    Args:
        result: dict with 'name' and 'money' lists
        image_prefix: identifier prefix of the image

    Returns:
        Processed result dict including the paired name_money list
    """
    log_verbose(f"Pairing data for image {image_prefix}")

    # Copy the input so the original data stays untouched
    processed_result = {
        'name': result.get('name', []).copy(),
        'money': result.get('money', []).copy(),
        'name_money': []
    }

    # Run the dedicated post-processing over the amounts
    processed_result['money'] = post_process_money_texts(processed_result['money'])
    log_verbose(f"Image {image_prefix}: {len(processed_result['money'])} amounts after post-processing")

    # Normalize and validate the amount formats
    validated_money = []
    for money in processed_result['money']:
        # Clean and standardize the amount
        cleaned_money = money.strip()

        # Drop everything except digits, decimal points and commas
        cleaned_money = re.sub(r'[^\d.,]', '', cleaned_money)

        # Append a decimal part when there is none
        if '.' not in cleaned_money and cleaned_money.isdigit():
            cleaned_money = cleaned_money + '.00'
            log_verbose(f"Formatted amount: {money} -> {cleaned_money}")
        # Ensure exactly two decimal digits
        elif '.' in cleaned_money:
            parts = cleaned_money.split('.')
            if len(parts) == 2:
                integer_part, decimal_part = parts
                # Pad short decimal parts with zeros
                if len(decimal_part) < 2:
                    decimal_part = decimal_part.ljust(2, '0')
                # Truncate decimal parts longer than two digits
                elif len(decimal_part) > 2:
                    decimal_part = decimal_part[:2]
                cleaned_money = integer_part + '.' + decimal_part
                log_verbose(f"Normalized amount: {money} -> {cleaned_money}")

        # Check whether the amount is a valid number
        try:
            float_value = float(cleaned_money.replace(',', ''))
            if float_value < 0.01:
                logger.warning(f"Ignoring implausibly small amount: {cleaned_money}")
                continue
            validated_money.append(cleaned_money)
        except ValueError:
            logger.warning(f"Ignoring invalid amount format: {cleaned_money}")

    # Keep only the validated amounts
    processed_result['money'] = validated_money

    # Handle mismatched name/amount counts
    max_name_len = len(processed_result['name'])
    max_money_len = len(processed_result['money'])

    # Log the counts before pairing
    log_verbose(f"Image {image_prefix} before pairing: {max_name_len} names, {max_money_len} amounts")

    # Check for a mismatch
    if max_name_len != max_money_len:
        logger.warning(f"Image {image_prefix}: name/amount count mismatch: {max_name_len} names vs {max_money_len} amounts")

        # Clean possible OCR misreads out of the names
        cleaned_names = []
        for name in processed_result['name']:
            # Strip digits and non-Chinese characters that were misrecognized
            clean_name = re.sub(r'[0-9a-zA-Z.,:;!@#$%^&*()_+={}\[\]|\\/<>?~`-]', '', name)
            if clean_name != name:
                log_verbose(f"Cleaned misread characters: {name} -> {clean_name}")

            # Keep the name only if something remains after cleaning
            if clean_name.strip():
                cleaned_names.append(clean_name.strip())
            else:
                logger.warning(f"Name '{name}' was empty after cleaning and has been removed")

        # Replace with the cleaned names
        processed_result['name'] = cleaned_names

        # Recount after cleaning
        max_name_len = len(processed_result['name'])
        log_verbose(f"After cleaning: {max_name_len} names, {max_money_len} amounts")

    # Build the final pairs.
    # Truncate to the shorter list so entries never shift out of alignment.
    match_len = min(max_name_len, max_money_len)

    if match_len > 0:
        for i in range(match_len):
            name = processed_result['name'][i]
            money = processed_result['money'][i]
            pair = f"{name}-{money}"
            processed_result['name_money'].append(pair)
            log_verbose(f"Image {image_prefix} pair #{i+1}: {pair}")

    # Log leftover names or amounts instead of pairing them
    if max_name_len > match_len:
        extra_names = processed_result['name'][match_len:]
        logger.warning(f"Image {image_prefix} has {len(extra_names)} unmatched names: {extra_names}")

    if max_money_len > match_len:
        extra_money = processed_result['money'][match_len:]
        logger.warning(f"Image {image_prefix} has {len(extra_money)} unmatched amounts: {extra_money}")

    log_verbose(f"Image {image_prefix} done: {len(processed_result['name_money'])} name-amount pairs generated")

    return processed_result

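# Worked examples of the amount normalization above (inputs hypothetical):
#
#     "¥1,234.5" -> strip non-numeric chars -> "1,234.5" -> pad -> "1,234.50"
#     "ABC500"   -> strip letters           -> "500"     -> pad -> "500.00"
#     "12.3456"  -> truncate to two decimal digits             -> "12.34"
#     "0.001"    -> becomes "0.00", then dropped as below 0.01
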
# Original sequential processing function
def process_pdfs_in_directory_sequential(input_dir, output_dir, confidence_threshold=0.6):
    """
    Process all PDF files in a directory sequentially.

    Args:
        input_dir: input directory containing PDF files
        output_dir: output directory
        confidence_threshold: OCR confidence threshold

    Returns:
        Processing summary statistics
    """
    # Make sure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Find all PDF files
    pdf_files = [f for f in os.listdir(input_dir) if f.lower().endswith('.pdf')]

    if not pdf_files:
        logger.warning(f"No PDF files found in directory {input_dir}")
        return {'processed': 0, 'success': 0, 'failed': 0, 'files': []}

    logger.info(f"Found {len(pdf_files)} PDF files in directory {input_dir}")

    # Sort by filename for a consistent processing order
    pdf_files.sort()

    results = []
    success_count = 0
    failed_count = 0

    # Process each PDF in turn
    for pdf_file in pdf_files:
        pdf_path = os.path.join(input_dir, pdf_file)
        pdf_name = os.path.splitext(pdf_file)[0]

        # Each PDF gets its own output directory
        pdf_output_dir = os.path.join(output_dir, pdf_name)

        logger.info(f"Processing {pdf_file} ({len(results)+1}/{len(pdf_files)})")

        # Process the PDF
        result = process_pdf(pdf_path, pdf_output_dir, confidence_threshold)

        if 'error' in result:
            logger.error(f"Failed to process {pdf_file}: {result['error']}")
            failed_count += 1
        else:
            logger.info(f"Successfully processed {pdf_file}")
            success_count += 1

        results.append(result)

    # Build the summary report
    summary = {
        'processed': len(pdf_files),
        'success': success_count,
        'failed': failed_count,
        'files': [r['pdf_file'] for r in results]
    }

    # Save the summary report
    summary_path = os.path.join(output_dir, "processing_summary.json")
    with open(summary_path, 'w', encoding='utf-8') as f:
        json.dump(summary, f, ensure_ascii=False, indent=2)

    logger.info(f"Done: {len(pdf_files)} PDFs processed, {success_count} succeeded, {failed_count} failed")
    logger.info(f"Summary report saved to: {summary_path}")

    return summary

def process_pdfs_in_directory_parallel(input_dir, output_dir, confidence_threshold=0.6, max_workers=None):
    """
    Process all PDF files in a directory in parallel.

    Args:
        input_dir: input directory containing PDF files
        output_dir: output directory
        confidence_threshold: OCR confidence threshold
        max_workers: maximum number of worker threads; defaults to DEFAULT_MAX_WORKERS

    Returns:
        Processing summary statistics
    """
    # Make sure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Resolve the maximum worker count
    if max_workers is None:
        max_workers = DEFAULT_MAX_WORKERS

    # Find all PDF files
    pdf_files = [f for f in os.listdir(input_dir) if f.lower().endswith('.pdf')]

    if not pdf_files:
        logger.warning(f"No PDF files found in directory {input_dir}")
        return {'processed': 0, 'success': 0, 'failed': 0, 'files': []}

    logger.info(f"Found {len(pdf_files)} PDF files in directory {input_dir}; processing with {max_workers} threads")

    # Sort by filename for a consistent processing order
    pdf_files.sort()

    # Thread-safe counters and result list
    from threading import Lock
    results_lock = Lock()
    results = []
    success_count = 0
    failed_count = 0

    # Worker that processes a single PDF
    def process_single_pdf(pdf_file):
        nonlocal success_count, failed_count

        pdf_path = os.path.join(input_dir, pdf_file)
        pdf_name = os.path.splitext(pdf_file)[0]

        # Each PDF gets its own output directory
        pdf_output_dir = os.path.join(output_dir, pdf_name)

        logger.info(f"Processing {pdf_file}")

        try:
            # Process the PDF
            result = process_pdf(pdf_path, pdf_output_dir, confidence_threshold)

            # Update shared state under the lock
            with results_lock:
                if 'error' in result:
                    logger.error(f"Failed to process {pdf_file}: {result['error']}")
                    failed_count += 1
                else:
                    logger.info(f"Successfully processed {pdf_file}")
                    success_count += 1

                results.append(result)

            return result
        except Exception as e:
            logger.error(f"Exception while processing {pdf_file}: {str(e)}")
            logger.error(traceback.format_exc())

            # Update shared state under the lock
            with results_lock:
                failed_count += 1
                error_result = {
                    'pdf_file': pdf_file,
                    'error': str(e)
                }
                results.append(error_result)

            return error_result

    # Run the workers in a thread pool
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        futures = [executor.submit(process_single_pdf, pdf_file) for pdf_file in pdf_files]

        # Wait for all tasks to finish
        concurrent.futures.wait(futures)

    # Build the summary report
    summary = {
        'processed': len(pdf_files),
        'success': success_count,
        'failed': failed_count,
        'files': [r['pdf_file'] for r in results]
    }

    # Save the summary report
    summary_path = os.path.join(output_dir, "processing_summary.json")
    with open(summary_path, 'w', encoding='utf-8') as f:
        json.dump(summary, f, ensure_ascii=False, indent=2)

    logger.info(f"Done: {len(pdf_files)} PDFs processed, {success_count} succeeded, {failed_count} failed")
    logger.info(f"Summary report saved to: {summary_path}")

    return summary

# Use the parallel version as the default processing function
process_pdfs_in_directory = process_pdfs_in_directory_parallel

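# Minimal programmatic usage sketch (paths and worker count are
# hypothetical):
#
#     summary = process_pdfs_in_directory("./pdfs", "./output",
#                                         confidence_threshold=0.6,
#                                         max_workers=8)
#     print(f"{summary['success']}/{summary['processed']} PDFs succeeded")
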
def main():
    """Entry point."""
    parser = argparse.ArgumentParser(description='OCR tool: process PDFs and extract names and amounts')

    # Define arguments
    parser.add_argument('--input', '-i', required=True, help='Input PDF file or directory containing PDF files')
    parser.add_argument('--output', '-o', required=True, help='Output directory')
    parser.add_argument('--confidence', '-c', type=float, default=0.6, help='Confidence threshold, default 0.6')
    parser.add_argument('--parallel', '-p', action='store_true', help='Process in parallel (directory mode)')
    parser.add_argument('--workers', '-w', type=int, default=None, help='Maximum worker threads for parallel processing; defaults to the CPU-based value')
    parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging')
    parser.add_argument('--use-cache', action='store_true', help='Enable the cache to speed up repeated runs')
    parser.add_argument('--cache-dir', help='Cache directory; defaults to the system temp directory')
    parser.add_argument('--clear-cache', action='store_true', help='Clear the cache directory before processing')

    # Parse arguments
    args = parser.parse_args()

    # Enable verbose logging if requested
    global VERBOSE_LOGGING
    if args.verbose:
        VERBOSE_LOGGING = True
        os.environ['OCR_VERBOSE_LOGGING'] = '1'

    # Cache-related environment variables
    if args.use_cache:
        os.environ['OCR_USE_CACHE'] = '1'
    else:
        os.environ['OCR_USE_CACHE'] = '0'

    if args.cache_dir:
        os.environ['OCR_CACHE_DIR'] = args.cache_dir

    # Clear the cache
    if args.clear_cache:
        # The cache directory comes from the environment
        import tempfile
        cache_dir = os.environ.get('OCR_CACHE_DIR', os.path.join(tempfile.gettempdir(), 'ocr_cache'))
        if os.path.exists(cache_dir):
            shutil.rmtree(cache_dir)
            logger.info(f"Cache directory cleared: {cache_dir}")
        os.makedirs(cache_dir, exist_ok=True)

    # Record the start time
    start_time = time.time()

    # Validate the input path
    if not os.path.exists(args.input):
        logger.error(f"Input path does not exist: {args.input}")
        return 1

    # Create the output directory
    os.makedirs(args.output, exist_ok=True)

    # Handle a single file or a directory
    if os.path.isfile(args.input):
        if not args.input.lower().endswith('.pdf'):
            logger.error(f"Input file is not a PDF: {args.input}")
            return 1

        logger.info(f"Processing single PDF file: {args.input}")
        result = process_pdf(args.input, args.output, args.confidence)

        if 'error' in result:
            logger.error(f"Processing failed: {result['error']}")
            return 1
        else:
            logger.info("Processing succeeded")
    else:
        logger.info(f"Processing PDF files in directory: {args.input}")

        # Choose the processing mode
        if args.parallel:
            summary = process_pdfs_in_directory_parallel(args.input, args.output, args.confidence, args.workers)
        else:
            summary = process_pdfs_in_directory_sequential(args.input, args.output, args.confidence)

        if summary['success'] == 0:
            logger.warning("No PDF file was processed successfully")
            if summary['processed'] > 0:
                return 1

    # Total processing time
    total_time = time.time() - start_time
    logger.info(f"Total processing time: {total_time:.2f} seconds")

    return 0

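# Example invocations (the script filename is assumed; adjust to the
# actual file name):
#
#     python ocr_batch.py -i ./invoices -o ./out
#     python ocr_batch.py -i ./invoices -o ./out --parallel -w 8 --verbose
#     python ocr_batch.py -i statement.pdf -o ./out -c 0.7 --use-cache
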
if __name__ == "__main__":
    try:
        sys.exit(main())
    except Exception as e:
        logger.error(f"Error during program execution: {str(e)}")
        logger.error(f"Details: {traceback.format_exc()}")
        sys.exit(1)