137 lines
5.4 KiB
Python
137 lines
5.4 KiB
Python
import fitz # PyMuPDF
|
||
from paddleocr import PaddleOCR
|
||
from PIL import Image
|
||
import io
|
||
import numpy as np
|
||
from typing import List, Dict, Any
|
||
import traceback
|
||
import json
|
||
import traceback
|
||
import os
|
||
|
||
class PaddlePdfProcessor:
    """Extract OCR text blocks from a PDF using PyMuPDF and PaddleOCR.

    Each page is rendered to an image with PyMuPDF and handed to
    ``PaddleOCR.predict``; recognized lines are returned as plain
    dictionaries (see :meth:`_parse_paddle_result` for the layout).

    The instance keeps the PyMuPDF document open.  Call :meth:`close` when
    finished, or use the processor as a context manager.
    """

    def __init__(self, pdf_path: str):
        """Open ``pdf_path`` and initialize the OCR engine.

        Args:
            pdf_path: Path of the PDF file to process.
        """
        self.pdf_path = pdf_path
        self.doc = fitz.open(pdf_path)
        print("正在初始化 PaddleOCR 模型...")
        try:
            self.ocr_engine = PaddleOCR(use_textline_orientation=True, lang='ch')
        except Exception:
            # Do not leak the already-opened document if the engine fails
            # to initialize.
            self.doc.close()
            raise
        print("PaddleOCR 初始化完成。")

    def __enter__(self):
        """Return the processor itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, tb):
        """Close the document on leaving the ``with`` block; never suppress errors."""
        self.close()
        return False

    def process_page(self, page_num: int) -> List[Dict[str, Any]]:
        """Run OCR on a single page and return its text blocks.

        Args:
            page_num: 1-based page number.

        Returns:
            The page's text blocks in the format produced by
            :meth:`_parse_paddle_result`.

        Raises:
            IndexError: If ``page_num`` is outside ``1..len(self.doc)``.
        """
        page_count = len(self.doc)
        # ``page_num - 1`` is negative for page_num < 1; reject it explicitly
        # instead of passing a negative index to the document.
        if not 1 <= page_num <= page_count:
            raise IndexError(
                f"page_num {page_num} out of range (document has {page_count} pages)"
            )

        page = self.doc[page_num - 1]
        # Render at 200 DPI, then round-trip through PNG bytes so PIL can
        # hand an RGB array to the OCR engine.
        pix = page.get_pixmap(dpi=200)  # type: ignore
        img_bytes = pix.tobytes("png")
        image = Image.open(io.BytesIO(img_bytes)).convert("RGB")
        img_array = np.array(image)

        result = self.ocr_engine.predict(input=img_array)
        # Only the first element of the prediction list is parsed; an empty
        # prediction is treated as "no text on this page".
        first_result = result[0] if result else None
        return self._parse_paddle_result(first_result, page_num)

    @staticmethod
    def _to_native(value):
        """Return ``value`` as a built-in Python scalar when it offers ``.item()``.

        NumPy scalars expose ``.item()``; plain ``int``/``float`` values do
        not and are returned unchanged.
        """
        item = getattr(value, "item", None)
        return item() if callable(item) else value

    def _parse_paddle_result(self, ocr_result, page_num: int) -> List[Dict[str, Any]]:
        """Convert one OCR result mapping into a sorted list of text blocks.

        Args:
            ocr_result: Mapping with the parallel sequences ``'dt_polys'``,
                ``'rec_texts'`` and ``'rec_scores'``, or ``None``/empty.
            page_num: Page number recorded on every returned block.

        Returns:
            A list of dicts with keys ``type``, ``content``, ``bbox``
            (``[x_min, y_min, x_max, y_max]``), ``page_num`` and
            ``confidence``, sorted by the top edge and then the left edge
            of ``bbox``.  Numeric values are built-in Python numbers so the
            result can be passed straight to ``json.dump``.
        """
        blocks: List[Dict[str, Any]] = []
        if not ocr_result:
            return blocks

        polygons = ocr_result['dt_polys']
        texts = ocr_result['rec_texts']
        scores = ocr_result['rec_scores']

        to_native = self._to_native
        for polygon, text, score in zip(polygons, texts, scores):
            # Collapse the polygon (a sequence of [x, y] points) into its
            # axis-aligned bounding rectangle.
            xs = [point[0] for point in polygon]
            ys = [point[1] for point in polygon]
            # Polygon coordinates may be NumPy scalars, which ``json`` cannot
            # serialize; convert them to built-in numbers here.
            bbox = [
                to_native(min(xs)),
                to_native(min(ys)),
                to_native(max(xs)),
                to_native(max(ys)),
            ]
            blocks.append({
                "type": "text",
                "content": text,
                "bbox": bbox,
                "page_num": page_num,
                "confidence": to_native(score),
            })

        # Reading order: top-to-bottom, then left-to-right.
        blocks.sort(key=lambda block: (block['bbox'][1], block['bbox'][0]))
        return blocks

    def process_all_pages(self) -> List[Dict[str, Any]]:
        """Run OCR on every page and return all text blocks in page order."""
        all_content_blocks: List[Dict[str, Any]] = []
        total_pages = len(self.doc)
        for page_number in range(1, total_pages + 1):
            print(f"--- 正在处理第 {page_number}/{total_pages} 页 ---")
            all_content_blocks.extend(self.process_page(page_number))
        return all_content_blocks

    def close(self):
        """Close the underlying document; safe to call more than once."""
        # ``is_closed`` is read defensively so a document object without that
        # attribute is still closed exactly as before.
        if not getattr(self.doc, "is_closed", False):
            self.doc.close()
|
||
|
||
# --- Script entry point (parses the PDF once, then reuses the saved JSON) ---
def _json_default(obj):
    """Serialize NumPy values that ``json`` cannot handle natively.

    Raises:
        TypeError: For any other unsupported type, as ``json`` expects
            from a ``default`` hook.
    """
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def main() -> None:
    """Load cached OCR blocks if present, otherwise parse the PDF and cache them."""
    # 1. Input and output paths.
    pdf_path = "../document/sample_bidding_document.pdf"
    # Parsed blocks are cached in this JSON file.
    parsed_output_path = "parsed_output.json"

    all_blocks = []
    cache_loaded = False

    # 2. Reuse a previously saved result when one exists and is readable.
    if os.path.exists(parsed_output_path):
        print(f"✅ 找到了已保存的解析结果 '{parsed_output_path}',直接加载...")
        try:
            with open(parsed_output_path, 'r', encoding='utf-8') as f:
                all_blocks = json.load(f)
            cache_loaded = True
        except (OSError, ValueError) as e:
            # A truncated or corrupt cache must not abort the run; fall back
            # to parsing the PDF again.  (json.JSONDecodeError and
            # UnicodeDecodeError are both ValueError subclasses.)
            print(f"⚠️ 无法读取已保存的解析结果 ({e}),将重新解析PDF。")
            all_blocks = []
    else:
        print(f"未找到已保存的结果,启动完整的PDF解析流程...")

    if not cache_loaded:
        print(f"准备处理文件: {pdf_path}")
        parsed_ok = False
        try:
            # NOTE(review): checked up front so the "not found" message does
            # not depend on which exception type the PDF library raises for
            # a missing file.
            if not os.path.isfile(pdf_path):
                raise FileNotFoundError(pdf_path)

            # The expensive step: OCR every page.
            processor = PaddlePdfProcessor(pdf_path)
            try:
                all_blocks = processor.process_all_pages()
            finally:
                # Release the document even when a page fails.
                processor.close()
            parsed_ok = True
        except FileNotFoundError:
            print(f"❌ 错误: 文件 '{pdf_path}' 未找到。")
        except Exception as e:
            print(f"❌ 程序在解析过程中发生严重错误: {e}")
            traceback.print_exc()

        # 3. Cache the freshly parsed result for the next run.
        if parsed_ok:
            print(f"\n💾 正在将解析结果保存到 '{parsed_output_path}'...")
            # Write to a temporary file and rename it into place, so a failed
            # dump never leaves a half-written cache behind.
            tmp_path = parsed_output_path + ".tmp"
            try:
                with open(tmp_path, 'w', encoding='utf-8') as f:
                    # indent=4 keeps the file readable; ensure_ascii=False
                    # writes Chinese characters as-is.
                    json.dump(
                        all_blocks,
                        f,
                        indent=4,
                        ensure_ascii=False,
                        default=_json_default,
                    )
                os.replace(tmp_path, parsed_output_path)
                print("保存成功!")
            except (OSError, TypeError, ValueError) as e:
                print(f"❌ 保存解析结果失败: {e}")
                traceback.print_exc()
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)

    # 4. Downstream processing is the same for loaded and freshly parsed data.
    if all_blocks:
        print("\n" + "="*50)
        print(f"数据加载/处理完成,共获得 {len(all_blocks)} 个文本块。")
        print("="*50)

        print("\n--- 处理结果预览 (前 5 个文本块) ---")
        for block in all_blocks[:5]:
            print(f"P{block['page_num']}: {block['content']} (置信度: {block['confidence']:.4f})")

        # Hook for follow-up analysis (e.g. paragraph assembly):
        # print("\n--- 开始进行段落合并 ---")
        # paragraphs = assemble_paragraphs(all_blocks)
        # ...
    else:
        print("未能获取任何文本块,程序结束。")


if __name__ == '__main__':
    main()