payoff_OCR/main-1.py

803 lines
31 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import json
import logging
import time
import glob
import re
import argparse
import shutil
import traceback
import concurrent.futures
from datetime import datetime
from pdf_to_table import process_pdf_to_table, post_process_money_texts
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# 优化批量处理的线程数
DEFAULT_MAX_WORKERS = max(4, os.cpu_count() or 4)
# 设置一个环境变量控制日志详细程度
VERBOSE_LOGGING = os.environ.get('OCR_VERBOSE_LOGGING', '0') == '1'
def log_verbose(message):
"""只在详细日志模式下记录信息"""
if VERBOSE_LOGGING:
logger.info(message)
def parse_regions_file(txt_file):
"""从regions文本文件提取name和money信息"""
name_data = []
money_data = []
current_section = None
try:
logger.info(f"开始解析文件: {txt_file}")
# 使用更高效的文件读取方式
with open(txt_file, 'r', encoding='utf-8') as f:
# 一次性读取所有行减少IO操作
lines = f.readlines()
# 预先定义正则表达式进行匹配,提高效率
item_pattern = re.compile(r'^\s+(\d+)\.\s+(.+)$')
# 直接遍历处理,避免不必要的日志记录
for line in lines:
line = line.strip()
if not line:
continue
if 'NAME 区域' in line:
current_section = 'name'
continue
elif 'MONEY 区域' in line:
current_section = 'money'
continue
elif '===' in line: # 区域分隔符,重置当前部分
current_section = None
continue
# 使用正则表达式匹配,更高效
if current_section:
match = item_pattern.match(line)
if match:
content = match.group(2).strip()
if current_section == 'name':
name_data.append(content)
elif current_section == 'money':
money_data.append(content)
# 只在处理完成后记录一次结果
logger.info(f"{txt_file}解析结果: {len(name_data)}个名称, {len(money_data)}个金额")
# 只有当实际有内容时才记录示例
if name_data and len(name_data) > 0:
examples = name_data[:min(3, len(name_data))]
logger.info(f"名称示例: {examples}")
if money_data and len(money_data) > 0:
examples = money_data[:min(3, len(money_data))]
logger.info(f"金额示例: {examples}")
except Exception as e:
logger.error(f"解析结果文件时出错: {str(e)}")
logger.error(f"详细错误: {traceback.format_exc()}")
return {
'name': name_data,
'money': money_data
}
def extract_texts_from_directory(directory, image_prefix):
"""
直接从目录中的文本文件提取name和money文本
作为parse_regions_file的备用方法
Args:
directory: 包含文本文件的目录路径
image_prefix: 图像前缀(如"5"表示提取5_name_*.txt和5_money_*.txt文件)
Returns:
包含name和money列表的字典
"""
name_data = []
money_data = []
try:
# 使用glob模式直接查找匹配的文件
name_patterns = [
os.path.join(directory, f"{image_prefix}_name_*_texts.txt"),
os.path.join(directory, f"{image_prefix}_rotated_name_*_texts.txt")
]
money_patterns = [
os.path.join(directory, f"{image_prefix}_money_*_texts.txt"),
os.path.join(directory, f"{image_prefix}_rotated_money_*_texts.txt")
]
name_files = []
for pattern in name_patterns:
name_files.extend(glob.glob(pattern))
money_files = []
for pattern in money_patterns:
money_files.extend(glob.glob(pattern))
# 如果在当前目录找不到,检查子目录
if not name_files and not money_files:
image_dir = os.path.join(directory, image_prefix)
if os.path.isdir(image_dir):
# 同样支持旋转与非旋转的文件格式
name_patterns = [
os.path.join(image_dir, f"{image_prefix}_name_*_texts.txt"),
os.path.join(image_dir, f"{image_prefix}_rotated_name_*_texts.txt")
]
money_patterns = [
os.path.join(image_dir, f"{image_prefix}_money_*_texts.txt"),
os.path.join(image_dir, f"{image_prefix}_rotated_money_*_texts.txt")
]
name_files = []
for pattern in name_patterns:
name_files.extend(glob.glob(pattern))
money_files = []
for pattern in money_patterns:
money_files.extend(glob.glob(pattern))
# 定义排序函数
def extract_index(file_path):
try:
# 使用split+basename更高效
parts = os.path.basename(file_path).split('_')
# 处理旋转图片的情况
if 'rotated' in parts:
# 如果是rotated文件则索引位置会不同
idx_pos = 3 if len(parts) >= 4 else 0
else:
# 非旋转图片的正常情况
idx_pos = 2 if len(parts) >= 3 else 0
return int(parts[idx_pos]) if idx_pos < len(parts) else 0
except (IndexError, ValueError):
return float('inf')
# 对文件列表进行排序
name_files.sort(key=extract_index)
money_files.sort(key=extract_index)
# 只有在找到文件时才记录
if name_files or money_files:
logger.info(f"找到 {len(name_files)} 个name文本文件和 {len(money_files)} 个money文本文件")
# 使用更高效的读取方式
for file_path in name_files:
with open(file_path, 'r', encoding='utf-8') as f:
# 一次性读取并处理
lines = [line.strip() for line in f if line.strip()]
name_data.extend(lines)
logger.info(f"{os.path.basename(file_path)} 提取了 {len(lines)} 个名称")
for file_path in money_files:
with open(file_path, 'r', encoding='utf-8') as f:
# 一次性读取并处理
lines = [line.strip() for line in f if line.strip()]
money_data.extend(lines)
logger.info(f"{os.path.basename(file_path)} 提取了 {len(lines)} 个金额")
if name_data or money_data:
logger.info(f"直接从文本文件提取: {len(name_data)}个名称, {len(money_data)}个金额")
except Exception as e:
logger.error(f"从目录提取文本时出错: {str(e)}")
logger.error(f"详细错误: {traceback.format_exc()}")
return {
'name': name_data,
'money': money_data
}
def process_pdf(pdf_path, output_dir, confidence_threshold=0.6):
"""
处理单个PDF文件
Args:
pdf_path: PDF文件路径
output_dir: 输出目录
confidence_threshold: 置信度阈值
Returns:
处理结果字典
"""
try:
# 创建输出目录
os.makedirs(output_dir, exist_ok=True)
# 获取PDF文件名
pdf_filename = os.path.basename(pdf_path)
pdf_name = os.path.splitext(pdf_filename)[0]
logger.info(f"开始处理PDF: {pdf_path}")
# 获取PDF文件大小
file_size = os.path.getsize(pdf_path) / (1024 * 1024) # 转换为MB
logger.info(f"文件大小: {file_size:.2f} MB")
# 创建临时目录存放PDF
pdf_folder = os.path.join(output_dir, "temp_pdf")
os.makedirs(pdf_folder, exist_ok=True)
# 复制PDF到临时目录
temp_pdf_path = os.path.join(pdf_folder, pdf_filename)
shutil.copy2(pdf_path, temp_pdf_path)
# 记录处理开始时间
start_time = time.time()
# 处理PDF文件
stats = process_pdf_to_table(
pdf_folder=pdf_folder,
output_folder=output_dir,
confidence_threshold=confidence_threshold
)
# 计算处理时间
processing_time = time.time() - start_time
logger.info(f"处理耗时: {processing_time:.2f}")
# 记录性能数据
stats['performance'] = {
'file_size_mb': round(file_size, 2),
'processing_time_seconds': round(processing_time, 2),
'processing_speed_mb_per_second': round(file_size / processing_time, 4) if processing_time > 0 else 0
}
# 解析结果
all_results = []
results_folder = os.path.join(output_dir, "results")
if os.path.exists(results_folder):
# 使用glob快速查找所有regions.txt文件
region_files = glob.glob(os.path.join(results_folder, "*_regions.txt"))
# 按文件名中的数字进行排序
def extract_number(file_path):
try:
filename = os.path.basename(file_path)
number_str = filename.split('_')[0]
return int(number_str)
except (IndexError, ValueError):
return float('inf')
# 按文件名中的数字从小到大排序
region_files.sort(key=extract_number)
if region_files:
logger.info(f"找到 {len(region_files)} 个区域文件用于解析")
# 顺序处理区域文件
for region_file in region_files:
image_prefix = os.path.basename(region_file).split('_')[0]
result = parse_regions_file(region_file)
# 只有当解析到内容时才添加结果
if result['name'] or result['money']:
# 对当前图片的数据进行处理和匹配
result = process_single_image_data(result, image_prefix)
all_results.append(result)
logger.info(f"{image_prefix} 解析到内容,并完成了匹配处理")
else:
# 备用方法直接从texts.txt文件读取
backup_result = extract_texts_from_directory(results_folder, image_prefix)
if backup_result['name'] or backup_result['money']:
# 对备用方法提取的数据也进行处理和匹配
backup_result = process_single_image_data(backup_result, image_prefix)
all_results.append(backup_result)
logger.info(f"使用备用方法从 {image_prefix} 成功提取内容,并完成了匹配处理")
else:
logger.warning("未找到任何regions.txt文件")
# 如果没有从regions.txt解析到结果尝试直接从子目录提取
if not all_results:
logger.warning("未从regions.txt文件解析到内容尝试从子目录直接提取")
# 获取所有子目录并排序
subdirs = []
if os.path.exists(results_folder):
subdirs = [d for d in os.listdir(results_folder)
if os.path.isdir(os.path.join(results_folder, d))]
# 按目录名中的数字排序
def extract_dir_number(dirname):
try:
return int(dirname)
except ValueError:
return float('inf')
subdirs.sort(key=extract_dir_number)
if subdirs:
logger.info(f"找到 {len(subdirs)} 个子目录,准备提取")
# 使用顺序处理子目录
ordered_results = []
for dirname in subdirs:
result_dir = os.path.join(results_folder, dirname)
backup_result = extract_texts_from_directory(result_dir, dirname)
if backup_result['name'] or backup_result['money']:
# 对子目录数据进行处理和匹配
backup_result = process_single_image_data(backup_result, dirname)
ordered_results.append((int(dirname) if dirname.isdigit() else float('inf'), backup_result))
# 按子目录编号排序后添加到结果
ordered_results.sort(key=lambda x: x[0])
for _, result in ordered_results:
all_results.append(result)
# 合并所有结果
merged_result = {'name': [], 'money': [], 'name_money': []}
if all_results:
# 合并所有图片的处理结果,保持每个图片内的匹配关系
for result in all_results:
merged_result['name'].extend(result.get('name', []))
merged_result['money'].extend(result.get('money', []))
merged_result['name_money'].extend(result.get('name_money', []))
# 注意:由于每个图片已经单独处理并匹配,此处不再需要全局匹配逻辑
# 记录处理结果摘要
logger.info(f"PDF {pdf_filename} 处理完成,找到 {len(merged_result['name'])} 个名称, {len(merged_result['money'])} 个金额, {len(merged_result['name_money'])} 个名称-金额对")
# 保存最终结果到JSON文件
result_json_path = os.path.join(output_dir, f"{pdf_name}_result.json")
with open(result_json_path, 'w', encoding='utf-8') as f:
json.dump({
'pdf_file': pdf_filename,
'name': merged_result['name'],
'money': merged_result['money'],
'name_money': merged_result['name_money'],
'stats': stats
}, f, ensure_ascii=False, indent=2)
logger.info(f"JSON结果已保存到: {result_json_path}")
# 保存最终结果到TXT文件
result_txt_path = os.path.join(output_dir, f"{pdf_name}_result.txt")
with open(result_txt_path, 'w', encoding='utf-8') as f:
f.write(f"PDF文件{pdf_filename} 处理结果\n")
f.write(f"处理时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("=" * 80 + "\n\n")
# 写入name-money对
f.write(f"共找到 {len(merged_result['name_money'])} 个名称-金额对:\n\n")
for i, pair in enumerate(merged_result['name_money'], 1):
f.write(f"{i}. {pair}\n")
f.write("\n" + "=" * 80 + "\n")
logger.info(f"TXT结果已保存到: {result_txt_path}")
# 清理临时目录
try:
shutil.rmtree(pdf_folder)
logger.info("已清理临时PDF目录")
except Exception as e:
logger.warning(f"清理临时目录时出错: {str(e)}")
return {
'pdf_file': pdf_filename,
'result': merged_result,
'stats': stats
}
except Exception as e:
logger.error(f"处理PDF {pdf_path} 时出错: {str(e)}")
logger.error(f"详细错误: {traceback.format_exc()}")
return {
'pdf_file': os.path.basename(pdf_path),
'error': str(e)
}
def process_single_image_data(result, image_prefix):
"""
处理单张图片的名称和金额数据,解决匹配问题
Args:
result: 包含name和money的字典
image_prefix: 图片前缀标识
Returns:
处理后的结果字典包含匹配好的name_money对
"""
log_verbose(f"开始处理图片 {image_prefix} 的数据匹配")
# 复制原始数据,避免修改原始数据
processed_result = {
'name': result.get('name', []).copy(),
'money': result.get('money', []).copy(),
'name_money': []
}
# 对金额应用后处理函数 - 使用专门的后处理函数
processed_result['money'] = post_process_money_texts(processed_result['money'])
log_verbose(f"图片 {image_prefix} 金额后处理后数量: {len(processed_result['money'])}")
# 金额格式化和验证 - 确保金额格式正确
validated_money = []
for money in processed_result['money']:
# 尝试清理和标准化金额格式
cleaned_money = money.strip()
# 移除可能的非数字字符(保留数字、小数点和逗号)
cleaned_money = re.sub(r'[^\d.,]', '', cleaned_money)
# 检查是否包含小数点,如果没有可能需要添加
if '.' not in cleaned_money and cleaned_money.isdigit():
cleaned_money = cleaned_money + '.00'
log_verbose(f"格式化金额: {money} -> {cleaned_money}")
# 确保小数点后有两位数字
elif '.' in cleaned_money:
parts = cleaned_money.split('.')
if len(parts) == 2:
integer_part, decimal_part = parts
# 如果小数部分长度不足2补零
if len(decimal_part) < 2:
decimal_part = decimal_part.ljust(2, '0')
# 如果小数部分超过2位可能需要截断或四舍五入(根据需求)
elif len(decimal_part) > 2:
decimal_part = decimal_part[:2]
cleaned_money = integer_part + '.' + decimal_part
log_verbose(f"标准化金额: {money} -> {cleaned_money}")
# 检查金额格式是否有效
try:
float_value = float(cleaned_money.replace(',', ''))
if float_value < 0.01:
logger.warning(f"忽略异常小的金额值: {cleaned_money}")
continue
validated_money.append(cleaned_money)
except ValueError:
logger.warning(f"忽略无效金额格式: {cleaned_money}")
# 更新为验证后的金额
processed_result['money'] = validated_money
# 处理name和money数量不匹配的情况
max_name_len = len(processed_result['name'])
max_money_len = len(processed_result['money'])
# 记录name和money配对前的情况
log_verbose(f"图片 {image_prefix} 配对前: {max_name_len}个名称, {max_money_len}个金额")
# 判断是否存在不匹配
if max_name_len != max_money_len:
logger.warning(f"图片 {image_prefix} 名称和金额数量不匹配: {max_name_len}个名称 vs {max_money_len}个金额")
# 处理姓名中可能存在的误识别情况
cleaned_names = []
for name in processed_result['name']:
# 判断姓名中是否包含错误识别的数字和非中文字符
clean_name = re.sub(r'[0-9a-zA-Z.,:;!@#$%^&*()_+={}\[\]|\\/<>?~`-]', '', name)
if clean_name != name:
log_verbose(f"清理姓名中的误识别: {name} -> {clean_name}")
# 如果清理后不为空,则保留
if clean_name.strip():
cleaned_names.append(clean_name.strip())
else:
logger.warning(f"姓名'{name}'在清理后为空,已移除")
# 更新清理后的姓名列表
processed_result['name'] = cleaned_names
# 再次检查数量
max_name_len = len(processed_result['name'])
log_verbose(f"清理后: {max_name_len}个名称, {max_money_len}个金额")
# 创建最终的配对结果
# 采用截断方式匹配,确保数据不会错位
match_len = min(max_name_len, max_money_len)
if match_len > 0:
for i in range(match_len):
name = processed_result['name'][i]
money = processed_result['money'][i]
pair = f"{name}-{money}"
processed_result['name_money'].append(pair)
log_verbose(f"图片 {image_prefix} 匹配对 #{i+1}: {pair}")
# 如果有多余的名称或金额,记录日志但不匹配
if max_name_len > match_len:
extra_names = processed_result['name'][match_len:]
logger.warning(f"图片 {image_prefix}{len(extra_names)} 个未匹配的名称: {extra_names}")
if max_money_len > match_len:
extra_money = processed_result['money'][match_len:]
logger.warning(f"图片 {image_prefix}{len(extra_money)} 个未匹配的金额: {extra_money}")
log_verbose(f"图片 {image_prefix} 处理完成,共生成 {len(processed_result['name_money'])} 个名称-金额对")
return processed_result
# 原始的顺序处理函数
def process_pdfs_in_directory_sequential(input_dir, output_dir, confidence_threshold=0.6):
"""
顺序处理目录中的所有PDF文件
Args:
input_dir: 输入目录包含PDF文件
output_dir: 输出目录
confidence_threshold: 置信度阈值
Returns:
处理结果统计
"""
# 确保输出目录存在
os.makedirs(output_dir, exist_ok=True)
# 查找所有PDF文件
pdf_files = [f for f in os.listdir(input_dir) if f.lower().endswith('.pdf')]
if not pdf_files:
logger.warning(f"{input_dir} 目录中未找到PDF文件")
return {'processed': 0, 'success': 0, 'failed': 0, 'files': []}
logger.info(f"{input_dir} 目录中找到 {len(pdf_files)} 个PDF文件")
# 按照文件名排序,确保处理顺序一致
pdf_files.sort()
results = []
success_count = 0
failed_count = 0
# 顺序处理每个PDF
for pdf_file in pdf_files:
pdf_path = os.path.join(input_dir, pdf_file)
pdf_name = os.path.splitext(pdf_file)[0]
# 为每个PDF创建单独的输出目录
pdf_output_dir = os.path.join(output_dir, pdf_name)
logger.info(f"开始处理 {pdf_file} ({len(results)+1}/{len(pdf_files)})")
# 处理PDF
result = process_pdf(pdf_path, pdf_output_dir, confidence_threshold)
if 'error' in result:
logger.error(f"处理 {pdf_file} 失败: {result['error']}")
failed_count += 1
else:
logger.info(f"成功处理 {pdf_file}")
success_count += 1
results.append(result)
# 创建汇总报告
summary = {
'processed': len(pdf_files),
'success': success_count,
'failed': failed_count,
'files': [r['pdf_file'] for r in results]
}
# 保存汇总报告
summary_path = os.path.join(output_dir, "processing_summary.json")
with open(summary_path, 'w', encoding='utf-8') as f:
json.dump(summary, f, ensure_ascii=False, indent=2)
logger.info(f"处理完成,共处理 {len(pdf_files)} 个PDF成功 {success_count} 个,失败 {failed_count}")
logger.info(f"汇总报告已保存到: {summary_path}")
return summary
def process_pdfs_in_directory_parallel(input_dir, output_dir, confidence_threshold=0.6, max_workers=None):
"""
并行处理目录中的所有PDF文件
Args:
input_dir: 输入目录包含PDF文件
output_dir: 输出目录
confidence_threshold: 置信度阈值
max_workers: 最大线程数默认为CPU核心数
Returns:
处理结果统计
"""
# 确保输出目录存在
os.makedirs(output_dir, exist_ok=True)
# 设置最大线程数
if max_workers is None:
max_workers = DEFAULT_MAX_WORKERS
# 查找所有PDF文件
pdf_files = [f for f in os.listdir(input_dir) if f.lower().endswith('.pdf')]
if not pdf_files:
logger.warning(f"{input_dir} 目录中未找到PDF文件")
return {'processed': 0, 'success': 0, 'failed': 0, 'files': []}
logger.info(f"{input_dir} 目录中找到 {len(pdf_files)} 个PDF文件将使用{max_workers}个线程并行处理")
# 按照文件名排序,确保处理顺序一致
pdf_files.sort()
# 使用线程安全的计数器和结果列表
from threading import Lock
results_lock = Lock()
results = []
success_count = 0
failed_count = 0
# 定义处理单个PDF的函数
def process_single_pdf(pdf_file):
nonlocal success_count, failed_count
pdf_path = os.path.join(input_dir, pdf_file)
pdf_name = os.path.splitext(pdf_file)[0]
# 为每个PDF创建单独的输出目录
pdf_output_dir = os.path.join(output_dir, pdf_name)
logger.info(f"开始处理 {pdf_file}")
try:
# 处理PDF
result = process_pdf(pdf_path, pdf_output_dir, confidence_threshold)
# 使用线程锁更新结果
with results_lock:
if 'error' in result:
logger.error(f"处理 {pdf_file} 失败: {result['error']}")
failed_count += 1
else:
logger.info(f"成功处理 {pdf_file}")
success_count += 1
results.append(result)
return result
except Exception as e:
logger.error(f"处理 {pdf_file} 时发生异常: {str(e)}")
logger.error(traceback.format_exc())
# 使用线程锁更新结果
with results_lock:
failed_count += 1
error_result = {
'pdf_file': pdf_file,
'error': str(e)
}
results.append(error_result)
return error_result
# 使用线程池并行处理
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任务
futures = [executor.submit(process_single_pdf, pdf_file) for pdf_file in pdf_files]
# 等待所有任务完成
concurrent.futures.wait(futures)
# 创建汇总报告
summary = {
'processed': len(pdf_files),
'success': success_count,
'failed': failed_count,
'files': [r['pdf_file'] for r in results]
}
# 保存汇总报告
summary_path = os.path.join(output_dir, "processing_summary.json")
with open(summary_path, 'w', encoding='utf-8') as f:
json.dump(summary, f, ensure_ascii=False, indent=2)
logger.info(f"处理完成,共处理 {len(pdf_files)} 个PDF成功 {success_count} 个,失败 {failed_count}")
logger.info(f"汇总报告已保存到: {summary_path}")
return summary
# 设置默认处理函数为并行版本
process_pdfs_in_directory = process_pdfs_in_directory_parallel
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='OCR处理工具处理PDF并提取姓名和金额')
# 添加参数
parser.add_argument('--input', '-i', required=True, help='输入PDF文件路径或包含PDF文件的目录')
parser.add_argument('--output', '-o', required=True, help='输出目录')
parser.add_argument('--confidence', '-c', type=float, default=0.6, help='置信度阈值默认0.6')
parser.add_argument('--parallel', '-p', action='store_true', help='是否使用并行处理(目录模式)')
parser.add_argument('--workers', '-w', type=int, default=None, help='并行处理的最大线程数默认为CPU核心数')
parser.add_argument('--verbose', '-v', action='store_true', help='启用详细日志输出')
parser.add_argument('--use-cache', action='store_true', help='启用缓存机制,加速重复处理')
parser.add_argument('--cache-dir', help='指定缓存目录,默认使用系统临时目录')
parser.add_argument('--clear-cache', action='store_true', help='在处理前清空缓存目录')
# 解析参数
args = parser.parse_args()
# 设置详细日志模式
global VERBOSE_LOGGING
if args.verbose:
VERBOSE_LOGGING = True
os.environ['OCR_VERBOSE_LOGGING'] = '1'
# 设置缓存相关环境变量
if args.use_cache:
os.environ['OCR_USE_CACHE'] = '1'
else:
os.environ['OCR_USE_CACHE'] = '0'
if args.cache_dir:
os.environ['OCR_CACHE_DIR'] = args.cache_dir
# 清空缓存
if args.clear_cache:
# 缓存目录通过环境变量获取
import tempfile
cache_dir = os.environ.get('OCR_CACHE_DIR', os.path.join(tempfile.gettempdir(), 'ocr_cache'))
if os.path.exists(cache_dir):
import shutil
shutil.rmtree(cache_dir)
logger.info(f"已清空缓存目录: {cache_dir}")
os.makedirs(cache_dir, exist_ok=True)
# 记录开始时间
start_time = time.time()
# 检查输入路径
if not os.path.exists(args.input):
logger.error(f"输入路径不存在: {args.input}")
return 1
# 创建输出目录
os.makedirs(args.output, exist_ok=True)
# 处理单个文件或目录
if os.path.isfile(args.input):
if not args.input.lower().endswith('.pdf'):
logger.error(f"输入文件不是PDF: {args.input}")
return 1
logger.info(f"开始处理单个PDF文件: {args.input}")
result = process_pdf(args.input, args.output, args.confidence)
if 'error' in result:
logger.error(f"处理失败: {result['error']}")
return 1
else:
logger.info("处理成功")
else:
logger.info(f"开始处理目录中的PDF文件: {args.input}")
# 根据参数选择处理方式
if args.parallel:
summary = process_pdfs_in_directory_parallel(args.input, args.output, args.confidence, args.workers)
else:
summary = process_pdfs_in_directory_sequential(args.input, args.output, args.confidence)
if summary['success'] == 0:
logger.warning("没有成功处理任何PDF文件")
if summary['processed'] > 0:
return 1
# 计算总处理时间
total_time = time.time() - start_time
logger.info(f"总处理时间: {total_time:.2f}")
return 0
if __name__ == "__main__":
try:
sys.exit(main())
except Exception as e:
logger.error(f"程序执行过程中发生错误: {str(e)}")
logger.error(f"详细错误: {traceback.format_exc()}")
sys.exit(1)