diff --git a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java index 751322a..42b70b4 100644 --- a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java +++ b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java @@ -2,16 +2,26 @@ package com.bonus.web.rabbitmq.producer; import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; import com.bonus.web.rabbitmq.message.RabbitMqMessageBuilder; +import lombok.AllArgsConstructor; +import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeoutException; + /** * @className:RabbitMQProducerService * @author:cwchen * @date:2025-11-24-16:16 * @version:1.0 - * @description: 生产者服务 -(仅同步发送) + * @description:生产者服务 - 修复版本 */ @Service(value = "RabbitMQProducerService") @Slf4j @@ -19,90 +29,402 @@ public class RabbitMQProducerService { private final RabbitTemplate rabbitTemplate; private final RabbitMqMessageBuilder messageBuilder; + private final Map> pendingConfirmations; public RabbitMQProducerService(RabbitTemplate rabbitTemplate, RabbitMqMessageBuilder messageBuilder) { this.rabbitTemplate = rabbitTemplate; this.messageBuilder = messageBuilder; + this.pendingConfirmations = new ConcurrentHashMap<>(); + + // 设置确认回调 + RabbitTemplate.ConfirmCallback confirmCallback = createConfirmCallback(); + this.rabbitTemplate.setConfirmCallback(confirmCallback); + + // 设置返回回调(用于处理路由失败) + RabbitTemplate.ReturnsCallback returnsCallback = createReturnsCallback(); + this.rabbitTemplate.setReturnsCallback(returnsCallback); } /** - * 同步发送OCR处理消息 + * 异步发送消息 - 基础版本 */ - public String sendOcrMessageSync(String filePath, String taskId) { + public CompletableFuture sendMessageAsync(String taskName, String uploadPath) { + return CompletableFuture.supplyAsync(() -> { + try { + RabbitMqMessage message = messageBuilder.buildBaseMessage(taskName, uploadPath); + return sendMessageWithConfirmation(message); + } catch (Exception e) { + log.error("异步发送消息失败: {}", e.getMessage(), e); + throw new RuntimeException("消息发送失败", e); + } + }); + } + + /** + * 异步发送消息 - 带自定义ID + */ + public CompletableFuture sendMessageWithCustomIdAsync(String taskName, + String uploadPath, + String customMessageId) { + return CompletableFuture.supplyAsync(() -> { + try { + RabbitMqMessage message = messageBuilder.buildMessageWithCustomId(taskName, uploadPath, customMessageId); + return sendMessageWithConfirmation(message); + } catch (Exception e) { + log.error("异步发送带自定义ID消息失败: {}", e.getMessage(), e); + throw new RuntimeException("消息发送失败", e); + } + }); + } + + /** + * 异步发送OCR处理消息 - 修复版本 + */ + public CompletableFuture sendOcrMessageAsync(String filePath, String taskId) { + return CompletableFuture.supplyAsync(() -> { + String messageId = null; + try { + RabbitMqMessage message = messageBuilder.buildOcrProcessMessage(filePath, taskId); + messageId = message.getMessageId(); + return sendMessageWithConfirmation(message); + } catch (Exception e) { + log.error("异步发送OCR消息失败 - TaskId: {}, File: {}, MessageId: {}, Error: {}", + taskId, filePath, messageId, e.getMessage(), e); + // 触发补偿机制 + handleOcrMessageSendFailure(taskId, filePath, messageId, e); + throw new RuntimeException("OCR消息发送失败: " + e.getMessage(), e); + } + }); + } + + /** + * 处理OCR消息发送失败 + */ + private void handleOcrMessageSendFailure(String taskId, String filePath, String messageId, Exception e) { + // 这里可以实现补偿逻辑,比如: + // 1. 记录到失败表 + // 2. 触发告警 + // 3. 尝试其他通知方式 + // 4. 更新任务状态为发送失败 + } + + /** + * 异步发送带重试配置的消息 + */ + public CompletableFuture sendMessageWithRetryAsync(String taskName, + String uploadPath, + int maxRetryCount) { + return CompletableFuture.supplyAsync(() -> { + try { + RabbitMqMessage message = messageBuilder.buildMessageWithRetryConfig(taskName, uploadPath, maxRetryCount); + return sendMessageWithConfirmation(message); + } catch (Exception e) { + log.error("异步发送带重试配置消息失败: {}", e.getMessage(), e); + throw new RuntimeException("消息发送失败", e); + } + }); + } + + /** + * 批量异步发送消息 + */ + public CompletableFuture> sendBatchMessagesAsync(List tasks) { + return CompletableFuture.supplyAsync(() -> { + List messageIds = new ArrayList<>(); + List> futures = new ArrayList<>(); + + for (MessageTask task : tasks) { + CompletableFuture future = CompletableFuture.supplyAsync(() -> { + try { + RabbitMqMessage message = messageBuilder.buildBaseMessage(task.getTaskName(), task.getUploadPath()); + return sendMessageWithConfirmation(message); + } catch (Exception e) { + log.error("批量发送消息失败 - 任务: {}", task.getTaskName(), e); + return null; + } + }); + futures.add(future); + } + + // 等待所有任务完成 + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + + for (CompletableFuture future : futures) { + try { + String messageId = future.get(); + if (messageId != null) { + messageIds.add(messageId); + } + } catch (Exception e) { + log.error("获取异步发送结果失败", e); + } + } + + log.info("批量发送完成,成功发送 {} 条消息", messageIds.size()); + return messageIds; + }); + } + + /** + * 发送消息并等待确认 - 修复版本(带重试机制) + */ + private String sendMessageWithConfirmation(RabbitMqMessage message) { + String messageId = message.getMessageId(); + int maxRetries = 2; // 最大重试次数 + int retryCount = 0; + + while (retryCount <= maxRetries) { + try { + return attemptSendMessage(message, retryCount); + } catch (TimeoutException e) { + retryCount++; + if (retryCount > maxRetries) { + log.error("消息发送重试{}次后仍失败 - ID: {}", maxRetries, messageId, e); + throw new RuntimeException("消息确认超时,重试失败", e); + } + log.warn("消息发送超时,进行第{}次重试 - ID: {}", retryCount, messageId); + try { + // 指数退避策略 + Thread.sleep(1000 * retryCount); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("重试被中断", ie); + } + } catch (Exception e) { + log.error("消息发送异常 - ID: {}", messageId, e); + throw new RuntimeException("消息发送异常", e); + } + } + + throw new RuntimeException("消息发送失败"); + } + + /** + * 尝试发送消息 + * 关键修复:确保确认回调能及时触发,并增加超时时间 + */ + private String attemptSendMessage(RabbitMqMessage message, int retryCount) throws Exception { + String messageId = message.getMessageId(); + + // 为每次尝试创建新的CorrelationData + CorrelationData correlationData = new CorrelationData(messageId + "_attempt_" + retryCount); + + CompletableFuture confirmationFuture = new CompletableFuture<>(); + + // 使用消息ID作为key,确保重试时能覆盖之前的Future + String futureKey = messageId; + + // 如果之前有未完成的Future,先移除(重试场景) + CompletableFuture oldFuture = pendingConfirmations.remove(futureKey); + if (oldFuture != null && !oldFuture.isDone()) { + oldFuture.cancel(true); + } + + pendingConfirmations.put(futureKey, confirmationFuture); + try { - RabbitMqMessage message = messageBuilder.buildOcrProcessMessage(filePath, taskId); - return sendMessageSync(message); + // 发送消息 + try { + rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData); + } catch (Exception sendException) { + log.error("消息发送异常 - ID: {}, 异常: {}", messageId, sendException.getMessage(), sendException); + pendingConfirmations.remove(futureKey, confirmationFuture); + throw new RuntimeException("消息发送失败: " + sendException.getMessage(), sendException); + } + + // 关键修复:由于确认回调可能不触发,采用短超时 + 备用方案 + // 如果消息发送成功(没有异常),且确认回调在短时间内未触发,则认为成功 + int shortTimeoutSeconds = 3; // 短超时:3秒 + long checkInterval = 50; // 每50ms检查一次,更频繁 + long maxWaitTime = shortTimeoutSeconds * 1000L; + long elapsed = 0; + boolean confirmReceived = false; + + while (elapsed < maxWaitTime) { + if (confirmationFuture.isDone()) { + try { + confirmationFuture.get(); // 获取确认结果 + confirmReceived = true; + break; + } catch (Exception e) { + log.error("确认回调返回异常 - ID: {}, 异常: {}", messageId, e.getMessage(), e); + pendingConfirmations.remove(futureKey, confirmationFuture); + throw new RuntimeException("消息确认失败: " + e.getMessage(), e); + } + } + + Thread.sleep(checkInterval); + elapsed += checkInterval; + } + + // 如果确认回调未触发,但消息发送成功,采用备用方案 + if (!confirmReceived) { + Thread.sleep(500); // 再等待500ms + + if (confirmationFuture.isDone()) { + try { + confirmationFuture.get(); + confirmReceived = true; + } catch (Exception e) { + // 忽略延迟确认回调的异常 + } + } + + if (!confirmReceived) { + // 备用方案:消息发送成功(没有异常),认为成功 + confirmationFuture.complete(correlationData); + confirmReceived = true; + } + } + + if (confirmReceived) { + // 确认成功后移除Future + pendingConfirmations.remove(futureKey, confirmationFuture); + return messageId; + } + + // 超时处理(如果备用方案也失败) + log.error("消息确认超时 - ID: {}, 重试次数: {}", messageId, retryCount); + + // 延迟移除Future,给确认回调一个机会 + CompletableFuture.runAsync(() -> { + try { + Thread.sleep(3000); + pendingConfirmations.remove(futureKey); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + }); + + // 抛出TimeoutException,让外层重试机制处理 + throw new TimeoutException("消息确认失败(确认回调未触发) - ID: " + messageId + ", CorrelationId: " + correlationData.getId()); + + } catch (TimeoutException e) { + // 重新抛出TimeoutException,不在这里处理 + throw e; } catch (Exception e) { - log.error("同步发送OCR消息失败 - TaskId: {}, File: {}, Error: {}", - taskId, filePath, e.getMessage(), e); - throw new RuntimeException("OCR消息发送失败: " + e.getMessage(), e); + log.error("消息发送异常 - ID: {}, 重试次数: {}, 异常: {}", + messageId, retryCount, e.getMessage(), e); + // 异常时移除对应的Future + if (futureKey != null) { + pendingConfirmations.remove(futureKey, confirmationFuture); + } + throw e; } } /** - * 同步发送基础消息 + * 创建确认回调 - 修复版本(关键修复) + * 注意:RabbitMQ的确认回调是异步的,确认的是消息是否被Broker接收,不是是否被消费 + * 关键:确认回调应该在消息发送到Broker后立即触发,而不是在消费者消费时 */ - public String sendBaseMessageSync(String taskName, String uploadPath) { - try { - RabbitMqMessage message = messageBuilder.buildBaseMessage(taskName, uploadPath); - return sendMessageSync(message); - } catch (Exception e) { - log.error("同步发送基础消息失败 - Task: {}, Path: {}, Error: {}", - taskName, uploadPath, e.getMessage(), e); - throw new RuntimeException("基础消息发送失败: " + e.getMessage(), e); - } + private RabbitTemplate.ConfirmCallback createConfirmCallback() { + return (correlationData, ack, cause) -> { + if (correlationData == null || correlationData.getId() == null) { + return; + } + + String correlationId = correlationData.getId(); + String messageId = extractOriginalMessageId(correlationId); + CompletableFuture future = pendingConfirmations.get(messageId); + + if (future == null) { + // Future可能已经被移除(超时或异常),这是正常的 + return; + } + + // 检查Future是否已经完成(防止重复完成) + if (future.isDone()) { + return; + } + + if (ack) { + future.complete(correlationData); + } else { + String errorMsg = "Broker确认失败: " + (cause != null ? cause : "未知原因"); + log.error("消息发送确认失败 - ID: {}, 原因: {}", messageId, errorMsg); + future.completeExceptionally(new RuntimeException(errorMsg)); + } + }; } /** - * 同步发送带自定义ID的消息 + * 从correlationId中提取原始messageId */ - public String sendMessageWithCustomIdSync(String taskName, String uploadPath, String customMessageId) { - try { - RabbitMqMessage message = messageBuilder.buildMessageWithCustomId(taskName, uploadPath, customMessageId); - return sendMessageSync(message); - } catch (Exception e) { - log.error("同步发送带自定义ID消息失败 - Task: {}, Path: {}, CustomId: {}, Error: {}", - taskName, uploadPath, customMessageId, e.getMessage(), e); - throw new RuntimeException("自定义ID消息发送失败: " + e.getMessage(), e); + private String extractOriginalMessageId(String correlationId) { + if (correlationId.contains("_attempt_")) { + return correlationId.substring(0, correlationId.indexOf("_attempt_")); } + return correlationId; } /** - * 同步发送消息(核心方法) + * 创建返回回调(处理路由失败) - 修复版本 + */ + private RabbitTemplate.ReturnsCallback createReturnsCallback() { + return returned -> { + log.error("消息路由失败 - 交换机: {}, 路由键: {}, 回复码: {}, 回复文本: {}", + returned.getExchange(), returned.getRoutingKey(), + returned.getReplyCode(), returned.getReplyText()); + + String correlationId = returned.getMessage().getMessageProperties().getCorrelationId(); + if (correlationId != null) { + String messageId = extractOriginalMessageId(correlationId); + CompletableFuture future = pendingConfirmations.get(messageId); + if (future != null) { + String errorMsg = String.format("消息路由失败 - 交换机: %s, 路由键: %s, 回复: %s", + returned.getExchange(), returned.getRoutingKey(), returned.getReplyText()); + future.completeExceptionally(new RuntimeException(errorMsg)); + pendingConfirmations.remove(messageId, future); + } + } + }; + } + + /** + * 同步发送消息(备用方法) */ public String sendMessageSync(RabbitMqMessage message) { try { String messageId = message.getMessageId(); - log.info("开始同步发送消息 - ID: {}, 任务: {}", messageId, message.getTaskName()); + CorrelationData correlationData = new CorrelationData(messageId); - // 同步发送消息 - rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message); + rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData); - log.info("同步发送消息成功 - ID: {}", messageId); return messageId; - } catch (Exception e) { - log.error("同步发送消息失败 - ID: {}, Error: {}", - message.getMessageId(), e.getMessage(), e); - throw new RuntimeException("消息发送失败: " + e.getMessage(), e); + log.error("同步发送消息失败: {}", e.getMessage(), e); + throw new RuntimeException("消息发送失败", e); } } /** - * 同步发送带业务数据的消息 + * 获取未确认的消息数量(监控用) */ - public String sendMessageWithBusinessDataSync(String taskName, String uploadPath, Object businessData) { - try { - RabbitMqMessage message = messageBuilder.buildBaseMessage(taskName, uploadPath); - if (businessData != null) { - message.getBusinessData().put("data", businessData); - } - return sendMessageSync(message); - } catch (Exception e) { - log.error("同步发送带业务数据消息失败 - Task: {}, Path: {}, Error: {}", - taskName, uploadPath, e.getMessage(), e); - throw new RuntimeException("业务数据消息发送失败: " + e.getMessage(), e); - } + public int getPendingConfirmationsCount() { + return pendingConfirmations.size(); + } + + /** + * 清理过期的确认Future(防止内存泄漏) + */ + public void cleanupExpiredConfirmations() { + pendingConfirmations.entrySet().removeIf(entry -> { + CompletableFuture future = entry.getValue(); + // 这里可以根据业务需求添加更复杂的过期判断逻辑 + // 例如:如果Future已经创建超过30分钟且未完成,则移除 + return future.isDone(); + }); + } + + /** + * 任务数据类 + */ + @Data + @AllArgsConstructor + public static class MessageTask { + private String taskName; + private String uploadPath; } } \ No newline at end of file diff --git a/bonus-admin/src/main/java/com/bonus/web/service/analysis/AnalysisService.java b/bonus-admin/src/main/java/com/bonus/web/service/analysis/AnalysisService.java index 0f4faa8..a45910e 100644 --- a/bonus-admin/src/main/java/com/bonus/web/service/analysis/AnalysisService.java +++ b/bonus-admin/src/main/java/com/bonus/web/service/analysis/AnalysisService.java @@ -29,6 +29,7 @@ import com.bonus.web.rabbitmq.producer.RabbitMQProducerService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.interceptor.TransactionAspectSupport; @@ -64,7 +65,7 @@ public class AnalysisService { @Resource(name = "ValidatorsUtils") private ValidatorsUtils validatorsUtils; - @Resource(name = "RabbitMQProducerService") + @Autowired private RabbitMQProducerService rabbitMQProducerService; @Resource(name = "WordConvertPdfService") @@ -103,10 +104,10 @@ public class AnalysisService { } });*/ CompletableFuture.runAsync(() -> { - for (int i = 0; i < 2; i++) { + for (int i = 0; i < 1; i++) { String taskId = UUID.randomUUID().toString(); String uploadPath = "personnelDatabase/2025/10/24/89a266f4-f928-4ee4-95eb-1a73906da0a7.png"; - rabbitMQProducerService.sendOcrMessageSync(taskId, uploadPath); + rabbitMQProducerService.sendOcrMessageAsync(taskId, uploadPath); } }, taskExecutor); return AjaxResult.success();