diff --git a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/consumer/RabbitMQConsumerService.java b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/consumer/RabbitMQConsumerService.java index 5352eca..fb61e62 100644 --- a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/consumer/RabbitMQConsumerService.java +++ b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/consumer/RabbitMQConsumerService.java @@ -1,6 +1,12 @@ package com.bonus.web.rabbitmq.consumer; +import com.bonus.common.domain.ocr.dto.OcrRequest; +import com.bonus.common.domain.ocr.vo.OcrResponse; import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; +import com.bonus.file.config.MinioConfig; +import com.bonus.file.util.MinioUtil; +import com.bonus.ocr.service.OcrService; +import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; @@ -8,7 +14,10 @@ import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; +import javax.annotation.Resource; +import java.io.File; import java.io.IOException; +import java.util.Objects; /** * @className:RabbitMQConsumerService @@ -21,6 +30,17 @@ import java.io.IOException; @Slf4j public class RabbitMQConsumerService { + @Resource + private OcrService ocrService; + + @Resource + private MinioConfig minioConfig; + + @Resource + private MinioUtil minioUtil; + + private final ObjectMapper objectMapper = new ObjectMapper(); + /** * 主消息消费者 - 使用手动确认模式 */ @@ -248,9 +268,60 @@ public class RabbitMQConsumerService { private String performOcrProcessing(String filePath) { // 模拟OCR处理 log.debug("执行OCR处理: {}", filePath); + String uploadPath = filePath; +// File fileFromMinio = getFileFromMinio(uploadPath); + File fileFromMinio = new File("C:\\Users\\10488\\Desktop\\12.pdf"); + OcrResponse ocrResponse = performOcrRecognition(fileFromMinio); + log.info("ocrResponse响应结果:{}", ocrResponse); return "OCR处理结果"; } + private OcrResponse performOcrRecognition(File file) { + try { + OcrRequest ocrRequest = buildOcrRequest(file); + OcrResponse ocrResponse = ocrService.callOcrService(ocrRequest); + // 修复:检查 OCR 响应是否为 null + if (Objects.isNull(ocrResponse)) { + throw new BusinessException("OCR服务返回结果为空"); + } + + if (!isOcrResponseValid(ocrResponse)) { + throw new BusinessException("OCR识别结果无效"); + } + + log.info("OCR识别成功 - 数据: {}", ocrResponse.getData()); + return ocrResponse; + + } catch (Exception e) { + log.error("OCR识别失败", e); + throw new RuntimeException("OCR识别失败: " + e.getMessage(), e); + } + } + + private boolean isOcrResponseValid(OcrResponse ocrResponse) { + return ocrResponse.getData() != null && !ocrResponse.getData().toString().isEmpty(); + } + + private File getFileFromMinio(String uploadPath) { + try { + File file = minioUtil.getFileFromMinio(minioConfig.getBucketName(), uploadPath); + if (file == null || !file.exists()) { + throw new OcrServiceException("Minio文件不存在: " + uploadPath); + } + return file; + } catch (Exception e) { + throw new OcrServiceException("获取Minio文件失败: " + uploadPath, e); + } + } + + private OcrRequest buildOcrRequest(File file) { + OcrRequest ocrRequest = new OcrRequest(); + ocrRequest.setFile(file); + ocrRequest.setType("application/pdf"); + ocrRequest.setFields_json("项目名称,合同签订时间,合同金额,建设地点,建设单位,建设单位电话,建设单位开户行,建设单位账号"); + return ocrRequest; + } + private void saveOcrResult(String taskId, String result) { // 模拟保存OCR结果 log.debug("保存OCR结果 - 任务ID: {}", taskId); diff --git a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java index 45261f8..5054a39 100644 --- a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java +++ b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java @@ -37,15 +37,13 @@ public class RabbitMQProducerService { this.messageBuilder = messageBuilder; this.pendingConfirmations = new ConcurrentHashMap<>(); - // 关键修复:设置确认回调 + // 设置确认回调 RabbitTemplate.ConfirmCallback confirmCallback = createConfirmCallback(); this.rabbitTemplate.setConfirmCallback(confirmCallback); - log.info("✅ RabbitMQ确认回调已设置 - ConfirmCallback: {}", confirmCallback != null ? "已设置" : "未设置"); // 设置返回回调(用于处理路由失败) RabbitTemplate.ReturnsCallback returnsCallback = createReturnsCallback(); this.rabbitTemplate.setReturnsCallback(returnsCallback); - log.info("✅ RabbitMQ返回回调已设置 - ReturnsCallback: {}", returnsCallback != null ? "已设置" : "未设置"); } /** @@ -87,12 +85,9 @@ public class RabbitMQProducerService { return CompletableFuture.supplyAsync(() -> { String messageId = null; try { - log.info("开始发送OCR消息 - TaskId: {}, File: {}", taskId, filePath); RabbitMqMessage message = messageBuilder.buildOcrProcessMessage(filePath, taskId); messageId = message.getMessageId(); - String result = sendMessageWithConfirmation(message); - log.info("OCR消息发送成功 - TaskId: {}, MessageId: {}", taskId, result); - return result; + return sendMessageWithConfirmation(message); } catch (Exception e) { log.error("异步发送OCR消息失败 - TaskId: {}, File: {}, MessageId: {}, Error: {}", taskId, filePath, messageId, e.getMessage(), e); @@ -107,19 +102,11 @@ public class RabbitMQProducerService { * 处理OCR消息发送失败 */ private void handleOcrMessageSendFailure(String taskId, String filePath, String messageId, Exception e) { - try { - log.warn("处理OCR消息发送失败补偿 - TaskId: {}, MessageId: {}", taskId, messageId); - // 这里可以实现补偿逻辑,比如: - // 1. 记录到失败表 - // 2. 触发告警 - // 3. 尝试其他通知方式 - // 4. 更新任务状态为发送失败 - - // 示例:记录到日志文件或数据库 - // failureRecordService.recordOcrMessageFailure(taskId, filePath, messageId, e.getMessage()); - } catch (Exception ex) { - log.error("处理OCR消息发送失败补偿时发生异常", ex); - } + // 这里可以实现补偿逻辑,比如: + // 1. 记录到失败表 + // 2. 触发告警 + // 3. 尝试其他通知方式 + // 4. 更新任务状态为发送失败 } /** @@ -219,7 +206,6 @@ public class RabbitMQProducerService { */ private String attemptSendMessage(RabbitMqMessage message, int retryCount) throws Exception { String messageId = message.getMessageId(); - long sendStartTime = System.currentTimeMillis(); // 为每次尝试创建新的CorrelationData CorrelationData correlationData = new CorrelationData(messageId + "_attempt_" + retryCount); @@ -232,29 +218,20 @@ public class RabbitMQProducerService { // 如果之前有未完成的Future,先移除(重试场景) CompletableFuture oldFuture = pendingConfirmations.remove(futureKey); if (oldFuture != null && !oldFuture.isDone()) { - log.warn("发现未完成的Future,移除旧Future - ID: {}", messageId); oldFuture.cancel(true); } pendingConfirmations.put(futureKey, confirmationFuture); try { - log.info("📤 准备发送消息 - ID: {}, 任务: {}, 路径: {}, 重试次数: {}, CorrelationId: {}", - messageId, message.getTaskName(), message.getUploadPath(), retryCount, correlationData.getId()); - // 发送消息 try { rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData); } catch (Exception sendException) { - log.error("❌ 消息发送异常 - ID: {}, CorrelationId: {}, 异常: {}", - messageId, correlationData.getId(), sendException.getMessage(), sendException); + log.error("消息发送异常 - ID: {}, 异常: {}", messageId, sendException.getMessage(), sendException); pendingConfirmations.remove(futureKey, confirmationFuture); throw new RuntimeException("消息发送失败: " + sendException.getMessage(), sendException); } - - long sendEndTime = System.currentTimeMillis(); - log.info("📨 消息已发送到Broker,等待确认 - ID: {}, CorrelationId: {}, 发送耗时: {}ms", - messageId, correlationData.getId(), (sendEndTime - sendStartTime)); // 关键修复:由于确认回调可能不触发,采用短超时 + 备用方案 // 如果消息发送成功(没有异常),且确认回调在短时间内未触发,则认为成功 @@ -268,14 +245,10 @@ public class RabbitMQProducerService { if (confirmationFuture.isDone()) { try { confirmationFuture.get(); // 获取确认结果 - long confirmTime = System.currentTimeMillis(); - log.info("✅ 消息发送确认成功(通过确认回调) - ID: {}, CorrelationId: {}, 总耗时: {}ms", - messageId, correlationData.getId(), (confirmTime - sendStartTime)); confirmReceived = true; break; } catch (Exception e) { - log.error("❌ 确认回调返回异常 - ID: {}, CorrelationId: {}, 异常: {}", - messageId, correlationData.getId(), e.getMessage(), e); + log.error("确认回调返回异常 - ID: {}, 异常: {}", messageId, e.getMessage(), e); pendingConfirmations.remove(futureKey, confirmationFuture); throw new RuntimeException("消息确认失败: " + e.getMessage(), e); } @@ -285,34 +258,21 @@ public class RabbitMQProducerService { elapsed += checkInterval; } - // 关键修复:如果确认回调未触发,但消息发送成功,采用备用方案 + // 如果确认回调未触发,但消息发送成功,采用备用方案 if (!confirmReceived) { - log.warn("⚠️ 确认回调未在{}秒内触发,采用备用方案 - ID: {}, CorrelationId: {}", - shortTimeoutSeconds, messageId, correlationData.getId()); - - // 备用方案:如果消息发送成功(没有异常),且消费者可能已经消费,则认为成功 - // 这里我们等待一小段时间,然后检查Future是否完成 - // 如果仍未完成,但消息发送成功,则认为成功(因为确认回调可能延迟或未触发) Thread.sleep(500); // 再等待500ms if (confirmationFuture.isDone()) { try { confirmationFuture.get(); - log.info("✅ 消息发送确认成功(延迟确认回调) - ID: {}, CorrelationId: {}", - messageId, correlationData.getId()); confirmReceived = true; } catch (Exception e) { - log.warn("⚠️ 延迟确认回调返回异常,但消息已发送成功 - ID: {}, 异常: {}", - messageId, e.getMessage()); + // 忽略延迟确认回调的异常 } } if (!confirmReceived) { // 备用方案:消息发送成功(没有异常),认为成功 - // 因为如果消息发送失败,convertAndSend会抛出异常 - log.warn("⚠️ 确认回调未触发,但消息发送成功,采用备用方案认为成功 - ID: {}, CorrelationId: {}", - messageId, correlationData.getId()); - // 手动完成Future confirmationFuture.complete(correlationData); confirmReceived = true; } @@ -325,30 +285,13 @@ public class RabbitMQProducerService { } // 超时处理(如果备用方案也失败) - log.error("❌ 消息确认失败(确认回调未触发且备用方案失败) - ID: {}, CorrelationId: {}, 重试次数: {}, 当前待确认数量: {}, 所有待确认ID: {}", - messageId, correlationData.getId(), retryCount, pendingConfirmations.size(), pendingConfirmations.keySet()); + log.error("消息确认超时 - ID: {}, 重试次数: {}", messageId, retryCount); - // 检查确认回调是否已经触发但未匹配到Future - CompletableFuture currentFuture = pendingConfirmations.get(futureKey); - log.error("⚠️ 检查确认回调状态 - ID: {}, Future是否存在: {}, Future是否完成: {}, CorrelationId: {}", - messageId, pendingConfirmations.containsKey(futureKey), - currentFuture != null ? currentFuture.isDone() : "N/A", correlationData.getId()); - - // 打印所有待确认的Future状态 - for (Map.Entry> entry : pendingConfirmations.entrySet()) { - log.error("待确认的Future详情 - Key: {}, Future是否完成: {}, Future是否取消: {}", - entry.getKey(), entry.getValue().isDone(), entry.getValue().isCancelled()); - } - - // 超时时移除对应的Future,但保留一段时间以便确认回调可以记录日志 - // 延迟移除,给确认回调一个机会 + // 延迟移除Future,给确认回调一个机会 CompletableFuture.runAsync(() -> { try { - Thread.sleep(3000); // 等待3秒,让确认回调有机会执行 - CompletableFuture removed = pendingConfirmations.remove(futureKey); - if (removed != null) { - log.warn("超时后移除Future - ID: {}, Future是否完成: {}", messageId, removed.isDone()); - } + Thread.sleep(3000); + pendingConfirmations.remove(futureKey); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } @@ -361,10 +304,8 @@ public class RabbitMQProducerService { // 重新抛出TimeoutException,不在这里处理 throw e; } catch (Exception e) { - String errorMsg = e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName(); - log.error("❌ 消息发送异常 - ID: {}, CorrelationId: {}, 重试次数: {}, 异常类型: {}, 异常信息: {}", - messageId, correlationData != null ? correlationData.getId() : "N/A", retryCount, - e.getClass().getSimpleName(), errorMsg, e); + log.error("消息发送异常 - ID: {}, 重试次数: {}, 异常: {}", + messageId, retryCount, e.getMessage(), e); // 异常时移除对应的Future if (futureKey != null) { pendingConfirmations.remove(futureKey, confirmationFuture); @@ -380,68 +321,30 @@ public class RabbitMQProducerService { */ private RabbitTemplate.ConfirmCallback createConfirmCallback() { return (correlationData, ack, cause) -> { - long callbackTime = System.currentTimeMillis(); - - if (correlationData == null) { - log.error("❌ 确认回调中correlationData为null"); + if (correlationData == null || correlationData.getId() == null) { return; } String correlationId = correlationData.getId(); - if (correlationId == null) { - log.error("❌ 确认回调中correlationId为null"); - return; - } - - // 关键修复:使用ERROR级别记录,确保能看到确认回调是否触发 - log.error("📥 确认回调触发 - CorrelationId: {}, ACK: {}, 原因: {}, 当前待确认数量: {}, 所有待确认ID: {}", - correlationId, ack, cause, pendingConfirmations.size(), pendingConfirmations.keySet()); - - // 从correlationId中提取原始messageId(去掉重试后缀) String messageId = extractOriginalMessageId(correlationId); CompletableFuture future = pendingConfirmations.get(messageId); if (future == null) { - // Future可能已经被移除(超时或异常),但确认回调仍然会触发 - // 这是正常的,因为确认回调是异步的 - log.error("⚠️ 确认回调触发时未找到对应的Future(可能已超时) - ID: {}, CorrelationId: {}, ACK: {}, 当前待确认数量: {}, 所有待确认ID: {}", - messageId, correlationId, ack, pendingConfirmations.size(), pendingConfirmations.keySet()); - - // 尝试在所有待确认的Future中查找(可能是CorrelationId匹配问题) - for (Map.Entry> entry : pendingConfirmations.entrySet()) { - log.error("待确认的Future - Key: {}, Future是否完成: {}", entry.getKey(), entry.getValue().isDone()); - } + // Future可能已经被移除(超时或异常),这是正常的 return; } // 检查Future是否已经完成(防止重复完成) if (future.isDone()) { - log.warn("⚠️ Future已经完成,忽略重复的确认回调 - ID: {}, CorrelationId: {}, ACK: {}", - messageId, correlationId, ack); return; } if (ack) { - log.error("✅ 消息发送确认成功(Broker已接收) - ID: {}, CorrelationId: {}, 回调时间: {}ms", - messageId, correlationId, callbackTime); - // 完成Future,解除阻塞 - boolean completed = future.complete(correlationData); - if (!completed) { - log.error("⚠️ Future完成失败(可能已被其他线程完成) - ID: {}", messageId); - } else { - log.error("✅ Future完成成功 - ID: {}, 当前待确认数量: {}", messageId, pendingConfirmations.size()); - } + future.complete(correlationData); } else { String errorMsg = "Broker确认失败: " + (cause != null ? cause : "未知原因"); - log.error("❌ 消息发送确认失败(Broker拒绝接收) - ID: {}, CorrelationId: {}, 原因: {}", - messageId, correlationId, errorMsg); - // 完成Future并传递异常 - boolean completed = future.completeExceptionally(new RuntimeException(errorMsg)); - if (!completed) { - log.error("⚠️ Future异常完成失败(可能已被其他线程完成) - ID: {}", messageId); - } else { - log.error("✅ Future异常完成成功 - ID: {}", messageId); - } + log.error("消息发送确认失败 - ID: {}, 原因: {}", messageId, errorMsg); + future.completeExceptionally(new RuntimeException(errorMsg)); } }; } @@ -465,7 +368,6 @@ public class RabbitMQProducerService { returned.getExchange(), returned.getRoutingKey(), returned.getReplyCode(), returned.getReplyText()); - // 处理返回的消息 String correlationId = returned.getMessage().getMessageProperties().getCorrelationId(); if (correlationId != null) { String messageId = extractOriginalMessageId(correlationId); @@ -474,13 +376,8 @@ public class RabbitMQProducerService { String errorMsg = String.format("消息路由失败 - 交换机: %s, 路由键: %s, 回复: %s", returned.getExchange(), returned.getRoutingKey(), returned.getReplyText()); future.completeExceptionally(new RuntimeException(errorMsg)); - // 路由失败后立即移除,避免重复处理 pendingConfirmations.remove(messageId, future); - } else { - log.warn("路由失败的消息未找到对应的Future - CorrelationId: {}", correlationId); } - } else { - log.error("路由失败的消息没有CorrelationId"); } }; } @@ -493,11 +390,8 @@ public class RabbitMQProducerService { String messageId = message.getMessageId(); CorrelationData correlationData = new CorrelationData(messageId); - log.info("同步发送消息 - ID: {}, 任务: {}", messageId, message.getTaskName()); - rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData); - log.info("同步发送完成 - ID: {}", messageId); return messageId; } catch (Exception e) { log.error("同步发送消息失败: {}", e.getMessage(), e); @@ -516,19 +410,12 @@ public class RabbitMQProducerService { * 清理过期的确认Future(防止内存泄漏) */ public void cleanupExpiredConfirmations() { - int initialSize = pendingConfirmations.size(); - pendingConfirmations.entrySet().removeIf(entry -> { CompletableFuture future = entry.getValue(); // 这里可以根据业务需求添加更复杂的过期判断逻辑 // 例如:如果Future已经创建超过30分钟且未完成,则移除 return future.isDone(); }); - - int removed = initialSize - pendingConfirmations.size(); - if (removed > 0) { - log.info("清理了 {} 个已完成的确认Future", removed); - } } /**