招标解析异步任务

This commit is contained in:
cwchen 2025-11-25 13:13:58 +08:00
parent b919635515
commit 10c49e55bc
2 changed files with 93 additions and 135 deletions

View File

@ -1,6 +1,12 @@
package com.bonus.web.rabbitmq.consumer;
import com.bonus.common.domain.ocr.dto.OcrRequest;
import com.bonus.common.domain.ocr.vo.OcrResponse;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import com.bonus.file.config.MinioConfig;
import com.bonus.file.util.MinioUtil;
import com.bonus.ocr.service.OcrService;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@ -8,7 +14,10 @@ import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.util.Objects;
/**
* @className:RabbitMQConsumerService
@ -21,6 +30,17 @@ import java.io.IOException;
@Slf4j
public class RabbitMQConsumerService {
@Resource
private OcrService ocrService;
@Resource
private MinioConfig minioConfig;
@Resource
private MinioUtil minioUtil;
private final ObjectMapper objectMapper = new ObjectMapper();
/**
* 主消息消费者 - 使用手动确认模式
*/
@ -248,9 +268,60 @@ public class RabbitMQConsumerService {
private String performOcrProcessing(String filePath) {
// 模拟OCR处理
log.debug("执行OCR处理: {}", filePath);
String uploadPath = filePath;
// File fileFromMinio = getFileFromMinio(uploadPath);
File fileFromMinio = new File("C:\\Users\\10488\\Desktop\\12.pdf");
OcrResponse ocrResponse = performOcrRecognition(fileFromMinio);
log.info("ocrResponse响应结果:{}", ocrResponse);
return "OCR处理结果";
}
private OcrResponse performOcrRecognition(File file) {
try {
OcrRequest ocrRequest = buildOcrRequest(file);
OcrResponse ocrResponse = ocrService.callOcrService(ocrRequest);
// 修复检查 OCR 响应是否为 null
if (Objects.isNull(ocrResponse)) {
throw new BusinessException("OCR服务返回结果为空");
}
if (!isOcrResponseValid(ocrResponse)) {
throw new BusinessException("OCR识别结果无效");
}
log.info("OCR识别成功 - 数据: {}", ocrResponse.getData());
return ocrResponse;
} catch (Exception e) {
log.error("OCR识别失败", e);
throw new RuntimeException("OCR识别失败: " + e.getMessage(), e);
}
}
private boolean isOcrResponseValid(OcrResponse ocrResponse) {
return ocrResponse.getData() != null && !ocrResponse.getData().toString().isEmpty();
}
private File getFileFromMinio(String uploadPath) {
try {
File file = minioUtil.getFileFromMinio(minioConfig.getBucketName(), uploadPath);
if (file == null || !file.exists()) {
throw new OcrServiceException("Minio文件不存在: " + uploadPath);
}
return file;
} catch (Exception e) {
throw new OcrServiceException("获取Minio文件失败: " + uploadPath, e);
}
}
private OcrRequest buildOcrRequest(File file) {
OcrRequest ocrRequest = new OcrRequest();
ocrRequest.setFile(file);
ocrRequest.setType("application/pdf");
ocrRequest.setFields_json("项目名称,合同签订时间,合同金额,建设地点,建设单位,建设单位电话,建设单位开户行,建设单位账号");
return ocrRequest;
}
private void saveOcrResult(String taskId, String result) {
// 模拟保存OCR结果
log.debug("保存OCR结果 - 任务ID: {}", taskId);

View File

@ -37,15 +37,13 @@ public class RabbitMQProducerService {
this.messageBuilder = messageBuilder;
this.pendingConfirmations = new ConcurrentHashMap<>();
// 关键修复设置确认回调
// 设置确认回调
RabbitTemplate.ConfirmCallback confirmCallback = createConfirmCallback();
this.rabbitTemplate.setConfirmCallback(confirmCallback);
log.info("✅ RabbitMQ确认回调已设置 - ConfirmCallback: {}", confirmCallback != null ? "已设置" : "未设置");
// 设置返回回调用于处理路由失败
RabbitTemplate.ReturnsCallback returnsCallback = createReturnsCallback();
this.rabbitTemplate.setReturnsCallback(returnsCallback);
log.info("✅ RabbitMQ返回回调已设置 - ReturnsCallback: {}", returnsCallback != null ? "已设置" : "未设置");
}
/**
@ -87,12 +85,9 @@ public class RabbitMQProducerService {
return CompletableFuture.supplyAsync(() -> {
String messageId = null;
try {
log.info("开始发送OCR消息 - TaskId: {}, File: {}", taskId, filePath);
RabbitMqMessage message = messageBuilder.buildOcrProcessMessage(filePath, taskId);
messageId = message.getMessageId();
String result = sendMessageWithConfirmation(message);
log.info("OCR消息发送成功 - TaskId: {}, MessageId: {}", taskId, result);
return result;
return sendMessageWithConfirmation(message);
} catch (Exception e) {
log.error("异步发送OCR消息失败 - TaskId: {}, File: {}, MessageId: {}, Error: {}",
taskId, filePath, messageId, e.getMessage(), e);
@ -107,19 +102,11 @@ public class RabbitMQProducerService {
* 处理OCR消息发送失败
*/
private void handleOcrMessageSendFailure(String taskId, String filePath, String messageId, Exception e) {
try {
log.warn("处理OCR消息发送失败补偿 - TaskId: {}, MessageId: {}", taskId, messageId);
// 这里可以实现补偿逻辑比如
// 1. 记录到失败表
// 2. 触发告警
// 3. 尝试其他通知方式
// 4. 更新任务状态为发送失败
// 示例记录到日志文件或数据库
// failureRecordService.recordOcrMessageFailure(taskId, filePath, messageId, e.getMessage());
} catch (Exception ex) {
log.error("处理OCR消息发送失败补偿时发生异常", ex);
}
// 这里可以实现补偿逻辑比如
// 1. 记录到失败表
// 2. 触发告警
// 3. 尝试其他通知方式
// 4. 更新任务状态为发送失败
}
/**
@ -219,7 +206,6 @@ public class RabbitMQProducerService {
*/
private String attemptSendMessage(RabbitMqMessage message, int retryCount) throws Exception {
String messageId = message.getMessageId();
long sendStartTime = System.currentTimeMillis();
// 为每次尝试创建新的CorrelationData
CorrelationData correlationData = new CorrelationData(messageId + "_attempt_" + retryCount);
@ -232,30 +218,21 @@ public class RabbitMQProducerService {
// 如果之前有未完成的Future先移除重试场景
CompletableFuture<CorrelationData> oldFuture = pendingConfirmations.remove(futureKey);
if (oldFuture != null && !oldFuture.isDone()) {
log.warn("发现未完成的Future移除旧Future - ID: {}", messageId);
oldFuture.cancel(true);
}
pendingConfirmations.put(futureKey, confirmationFuture);
try {
log.info("📤 准备发送消息 - ID: {}, 任务: {}, 路径: {}, 重试次数: {}, CorrelationId: {}",
messageId, message.getTaskName(), message.getUploadPath(), retryCount, correlationData.getId());
// 发送消息
try {
rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData);
} catch (Exception sendException) {
log.error("❌ 消息发送异常 - ID: {}, CorrelationId: {}, 异常: {}",
messageId, correlationData.getId(), sendException.getMessage(), sendException);
log.error("消息发送异常 - ID: {}, 异常: {}", messageId, sendException.getMessage(), sendException);
pendingConfirmations.remove(futureKey, confirmationFuture);
throw new RuntimeException("消息发送失败: " + sendException.getMessage(), sendException);
}
long sendEndTime = System.currentTimeMillis();
log.info("📨 消息已发送到Broker等待确认 - ID: {}, CorrelationId: {}, 发送耗时: {}ms",
messageId, correlationData.getId(), (sendEndTime - sendStartTime));
// 关键修复由于确认回调可能不触发采用短超时 + 备用方案
// 如果消息发送成功没有异常且确认回调在短时间内未触发则认为成功
int shortTimeoutSeconds = 3; // 短超时3秒
@ -268,14 +245,10 @@ public class RabbitMQProducerService {
if (confirmationFuture.isDone()) {
try {
confirmationFuture.get(); // 获取确认结果
long confirmTime = System.currentTimeMillis();
log.info("✅ 消息发送确认成功(通过确认回调) - ID: {}, CorrelationId: {}, 总耗时: {}ms",
messageId, correlationData.getId(), (confirmTime - sendStartTime));
confirmReceived = true;
break;
} catch (Exception e) {
log.error("❌ 确认回调返回异常 - ID: {}, CorrelationId: {}, 异常: {}",
messageId, correlationData.getId(), e.getMessage(), e);
log.error("确认回调返回异常 - ID: {}, 异常: {}", messageId, e.getMessage(), e);
pendingConfirmations.remove(futureKey, confirmationFuture);
throw new RuntimeException("消息确认失败: " + e.getMessage(), e);
}
@ -285,34 +258,21 @@ public class RabbitMQProducerService {
elapsed += checkInterval;
}
// 关键修复如果确认回调未触发但消息发送成功采用备用方案
// 如果确认回调未触发但消息发送成功采用备用方案
if (!confirmReceived) {
log.warn("⚠️ 确认回调未在{}秒内触发,采用备用方案 - ID: {}, CorrelationId: {}",
shortTimeoutSeconds, messageId, correlationData.getId());
// 备用方案如果消息发送成功没有异常且消费者可能已经消费则认为成功
// 这里我们等待一小段时间然后检查Future是否完成
// 如果仍未完成但消息发送成功则认为成功因为确认回调可能延迟或未触发
Thread.sleep(500); // 再等待500ms
if (confirmationFuture.isDone()) {
try {
confirmationFuture.get();
log.info("✅ 消息发送确认成功(延迟确认回调) - ID: {}, CorrelationId: {}",
messageId, correlationData.getId());
confirmReceived = true;
} catch (Exception e) {
log.warn("⚠️ 延迟确认回调返回异常,但消息已发送成功 - ID: {}, 异常: {}",
messageId, e.getMessage());
// 忽略延迟确认回调的异常
}
}
if (!confirmReceived) {
// 备用方案消息发送成功没有异常认为成功
// 因为如果消息发送失败convertAndSend会抛出异常
log.warn("⚠️ 确认回调未触发,但消息发送成功,采用备用方案认为成功 - ID: {}, CorrelationId: {}",
messageId, correlationData.getId());
// 手动完成Future
confirmationFuture.complete(correlationData);
confirmReceived = true;
}
@ -325,30 +285,13 @@ public class RabbitMQProducerService {
}
// 超时处理如果备用方案也失败
log.error("❌ 消息确认失败(确认回调未触发且备用方案失败) - ID: {}, CorrelationId: {}, 重试次数: {}, 当前待确认数量: {}, 所有待确认ID: {}",
messageId, correlationData.getId(), retryCount, pendingConfirmations.size(), pendingConfirmations.keySet());
log.error("消息确认超时 - ID: {}, 重试次数: {}", messageId, retryCount);
// 检查确认回调是否已经触发但未匹配到Future
CompletableFuture<CorrelationData> currentFuture = pendingConfirmations.get(futureKey);
log.error("⚠️ 检查确认回调状态 - ID: {}, Future是否存在: {}, Future是否完成: {}, CorrelationId: {}",
messageId, pendingConfirmations.containsKey(futureKey),
currentFuture != null ? currentFuture.isDone() : "N/A", correlationData.getId());
// 打印所有待确认的Future状态
for (Map.Entry<String, CompletableFuture<CorrelationData>> entry : pendingConfirmations.entrySet()) {
log.error("待确认的Future详情 - Key: {}, Future是否完成: {}, Future是否取消: {}",
entry.getKey(), entry.getValue().isDone(), entry.getValue().isCancelled());
}
// 超时时移除对应的Future但保留一段时间以便确认回调可以记录日志
// 延迟移除给确认回调一个机会
// 延迟移除Future给确认回调一个机会
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000); // 等待3秒让确认回调有机会执行
CompletableFuture<CorrelationData> removed = pendingConfirmations.remove(futureKey);
if (removed != null) {
log.warn("超时后移除Future - ID: {}, Future是否完成: {}", messageId, removed.isDone());
}
Thread.sleep(3000);
pendingConfirmations.remove(futureKey);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
@ -361,10 +304,8 @@ public class RabbitMQProducerService {
// 重新抛出TimeoutException不在这里处理
throw e;
} catch (Exception e) {
String errorMsg = e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName();
log.error("❌ 消息发送异常 - ID: {}, CorrelationId: {}, 重试次数: {}, 异常类型: {}, 异常信息: {}",
messageId, correlationData != null ? correlationData.getId() : "N/A", retryCount,
e.getClass().getSimpleName(), errorMsg, e);
log.error("消息发送异常 - ID: {}, 重试次数: {}, 异常: {}",
messageId, retryCount, e.getMessage(), e);
// 异常时移除对应的Future
if (futureKey != null) {
pendingConfirmations.remove(futureKey, confirmationFuture);
@ -380,68 +321,30 @@ public class RabbitMQProducerService {
*/
private RabbitTemplate.ConfirmCallback createConfirmCallback() {
return (correlationData, ack, cause) -> {
long callbackTime = System.currentTimeMillis();
if (correlationData == null) {
log.error("❌ 确认回调中correlationData为null");
if (correlationData == null || correlationData.getId() == null) {
return;
}
String correlationId = correlationData.getId();
if (correlationId == null) {
log.error("❌ 确认回调中correlationId为null");
return;
}
// 关键修复使用ERROR级别记录确保能看到确认回调是否触发
log.error("📥 确认回调触发 - CorrelationId: {}, ACK: {}, 原因: {}, 当前待确认数量: {}, 所有待确认ID: {}",
correlationId, ack, cause, pendingConfirmations.size(), pendingConfirmations.keySet());
// 从correlationId中提取原始messageId去掉重试后缀
String messageId = extractOriginalMessageId(correlationId);
CompletableFuture<CorrelationData> future = pendingConfirmations.get(messageId);
if (future == null) {
// Future可能已经被移除超时或异常但确认回调仍然会触发
// 这是正常的因为确认回调是异步的
log.error("⚠️ 确认回调触发时未找到对应的Future可能已超时 - ID: {}, CorrelationId: {}, ACK: {}, 当前待确认数量: {}, 所有待确认ID: {}",
messageId, correlationId, ack, pendingConfirmations.size(), pendingConfirmations.keySet());
// 尝试在所有待确认的Future中查找可能是CorrelationId匹配问题
for (Map.Entry<String, CompletableFuture<CorrelationData>> entry : pendingConfirmations.entrySet()) {
log.error("待确认的Future - Key: {}, Future是否完成: {}", entry.getKey(), entry.getValue().isDone());
}
// Future可能已经被移除超时或异常这是正常的
return;
}
// 检查Future是否已经完成防止重复完成
if (future.isDone()) {
log.warn("⚠️ Future已经完成忽略重复的确认回调 - ID: {}, CorrelationId: {}, ACK: {}",
messageId, correlationId, ack);
return;
}
if (ack) {
log.error("✅ 消息发送确认成功Broker已接收 - ID: {}, CorrelationId: {}, 回调时间: {}ms",
messageId, correlationId, callbackTime);
// 完成Future解除阻塞
boolean completed = future.complete(correlationData);
if (!completed) {
log.error("⚠️ Future完成失败可能已被其他线程完成 - ID: {}", messageId);
} else {
log.error("✅ Future完成成功 - ID: {}, 当前待确认数量: {}", messageId, pendingConfirmations.size());
}
future.complete(correlationData);
} else {
String errorMsg = "Broker确认失败: " + (cause != null ? cause : "未知原因");
log.error("❌ 消息发送确认失败Broker拒绝接收 - ID: {}, CorrelationId: {}, 原因: {}",
messageId, correlationId, errorMsg);
// 完成Future并传递异常
boolean completed = future.completeExceptionally(new RuntimeException(errorMsg));
if (!completed) {
log.error("⚠️ Future异常完成失败可能已被其他线程完成 - ID: {}", messageId);
} else {
log.error("✅ Future异常完成成功 - ID: {}", messageId);
}
log.error("消息发送确认失败 - ID: {}, 原因: {}", messageId, errorMsg);
future.completeExceptionally(new RuntimeException(errorMsg));
}
};
}
@ -465,7 +368,6 @@ public class RabbitMQProducerService {
returned.getExchange(), returned.getRoutingKey(),
returned.getReplyCode(), returned.getReplyText());
// 处理返回的消息
String correlationId = returned.getMessage().getMessageProperties().getCorrelationId();
if (correlationId != null) {
String messageId = extractOriginalMessageId(correlationId);
@ -474,13 +376,8 @@ public class RabbitMQProducerService {
String errorMsg = String.format("消息路由失败 - 交换机: %s, 路由键: %s, 回复: %s",
returned.getExchange(), returned.getRoutingKey(), returned.getReplyText());
future.completeExceptionally(new RuntimeException(errorMsg));
// 路由失败后立即移除避免重复处理
pendingConfirmations.remove(messageId, future);
} else {
log.warn("路由失败的消息未找到对应的Future - CorrelationId: {}", correlationId);
}
} else {
log.error("路由失败的消息没有CorrelationId");
}
};
}
@ -493,11 +390,8 @@ public class RabbitMQProducerService {
String messageId = message.getMessageId();
CorrelationData correlationData = new CorrelationData(messageId);
log.info("同步发送消息 - ID: {}, 任务: {}", messageId, message.getTaskName());
rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData);
log.info("同步发送完成 - ID: {}", messageId);
return messageId;
} catch (Exception e) {
log.error("同步发送消息失败: {}", e.getMessage(), e);
@ -516,19 +410,12 @@ public class RabbitMQProducerService {
* 清理过期的确认Future防止内存泄漏
*/
public void cleanupExpiredConfirmations() {
int initialSize = pendingConfirmations.size();
pendingConfirmations.entrySet().removeIf(entry -> {
CompletableFuture<CorrelationData> future = entry.getValue();
// 这里可以根据业务需求添加更复杂的过期判断逻辑
// 例如如果Future已经创建超过30分钟且未完成则移除
return future.isDone();
});
int removed = initialSize - pendingConfirmations.size();
if (removed > 0) {
log.info("清理了 {} 个已完成的确认Future", removed);
}
}
/**