diff --git a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java index 4e9f58a..9f1fff2 100644 --- a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java +++ b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java @@ -17,13 +17,14 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.UUID; /** * @className:RabbitMQProducerService * @author:cwchen * @date:2025-11-24-16:16 * @version:1.0 - * @description:生产者服务 + * @description:生产者服务 - 修复版本 */ @Service(value = "RabbitMQProducerService") @Slf4j @@ -78,20 +79,47 @@ public class RabbitMQProducerService { } /** - * 异步发送OCR处理消息 + * 异步发送OCR处理消息 - 修复版本 */ public CompletableFuture sendOcrMessageAsync(String filePath, String taskId) { return CompletableFuture.supplyAsync(() -> { + String messageId = null; try { + log.info("开始发送OCR消息 - TaskId: {}, File: {}", taskId, filePath); RabbitMqMessage message = messageBuilder.buildOcrProcessMessage(filePath, taskId); - return sendMessageWithConfirmation(message); + messageId = message.getMessageId(); + String result = sendMessageWithConfirmation(message); + log.info("OCR消息发送成功 - TaskId: {}, MessageId: {}", taskId, result); + return result; } catch (Exception e) { - log.error("异步发送OCR消息失败: {}", e.getMessage(), e); - throw new RuntimeException("OCR消息发送失败", e); + log.error("异步发送OCR消息失败 - TaskId: {}, File: {}, MessageId: {}, Error: {}", + taskId, filePath, messageId, e.getMessage(), e); + // 触发补偿机制 + handleOcrMessageSendFailure(taskId, filePath, messageId, e); + throw new RuntimeException("OCR消息发送失败: " + e.getMessage(), e); } }); } + /** + * 处理OCR消息发送失败 + */ + private void handleOcrMessageSendFailure(String taskId, String filePath, String messageId, Exception e) { + try { + log.warn("处理OCR消息发送失败补偿 - TaskId: {}, MessageId: {}", taskId, messageId); + // 这里可以实现补偿逻辑,比如: + // 1. 记录到失败表 + // 2. 触发告警 + // 3. 尝试其他通知方式 + // 4. 更新任务状态为发送失败 + + // 示例:记录到日志文件或数据库 + // failureRecordService.recordOcrMessageFailure(taskId, filePath, messageId, e.getMessage()); + } catch (Exception ex) { + log.error("处理OCR消息发送失败补偿时发生异常", ex); + } + } + /** * 异步发送带重试配置的消息 */ @@ -150,47 +178,90 @@ public class RabbitMQProducerService { } /** - * 发送消息并等待确认 + * 发送消息并等待确认 - 修复版本(带重试机制) */ private String sendMessageWithConfirmation(RabbitMqMessage message) { String messageId = message.getMessageId(); - CorrelationData correlationData = new CorrelationData(messageId); + int maxRetries = 2; // 最大重试次数 + int retryCount = 0; + + while (retryCount <= maxRetries) { + try { + return attemptSendMessage(message, retryCount); + } catch (TimeoutException e) { + retryCount++; + if (retryCount > maxRetries) { + log.error("消息发送重试{}次后仍失败 - ID: {}", maxRetries, messageId, e); + throw new RuntimeException("消息确认超时,重试失败", e); + } + log.warn("消息发送超时,进行第{}次重试 - ID: {}", retryCount, messageId); + try { + // 指数退避策略 + Thread.sleep(1000 * retryCount); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("重试被中断", ie); + } + } catch (Exception e) { + log.error("消息发送异常 - ID: {}", messageId, e); + throw new RuntimeException("消息发送异常", e); + } + } + + throw new RuntimeException("消息发送失败"); + } + + /** + * 尝试发送消息 + */ + private String attemptSendMessage(RabbitMqMessage message, int retryCount) throws Exception { + String messageId = message.getMessageId(); + + // 为每次尝试创建新的CorrelationData + CorrelationData correlationData = new CorrelationData(messageId + "_attempt_" + retryCount); - // 创建确认Future CompletableFuture confirmationFuture = new CompletableFuture<>(); - // 保存到全局的Map中,确保确认回调可以访问 - pendingConfirmations.put(messageId, confirmationFuture); + // 使用消息ID作为key,确保重试时能覆盖之前的Future + String futureKey = messageId; + pendingConfirmations.put(futureKey, confirmationFuture); try { - log.info("准备发送消息 - ID: {}, 任务: {}, 路径: {}", - messageId, message.getTaskName(), message.getUploadPath()); + log.info("准备发送消息 - ID: {}, 任务: {}, 路径: {}, 重试次数: {}", + messageId, message.getTaskName(), message.getUploadPath(), retryCount); // 发送消息 rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData); log.info("消息已发送到Broker - ID: {}", messageId); - // 等待确认(带超时) - CorrelationData result = confirmationFuture.get(10, TimeUnit.SECONDS); + // 动态超时时间 + int timeoutSeconds = retryCount == 0 ? 10 : 15; + CorrelationData result = confirmationFuture.get(timeoutSeconds, TimeUnit.SECONDS); log.info("消息发送确认成功 - ID: {}", messageId); return messageId; } catch (TimeoutException e) { - log.error("消息确认超时 - ID: {}", messageId, e); + log.error("消息确认超时 - ID: {}, 重试次数: {}", messageId, retryCount, e); // 超时时移除对应的Future - pendingConfirmations.remove(messageId); - throw new RuntimeException("消息确认超时", e); + pendingConfirmations.remove(futureKey, confirmationFuture); + throw e; } catch (Exception e) { - log.error("消息发送异常 - ID: {}", messageId, e); - pendingConfirmations.remove(messageId); - throw new RuntimeException("消息发送异常", e); + log.error("消息发送异常 - ID: {}, 重试次数: {}", messageId, retryCount, e); + // 异常时移除对应的Future + pendingConfirmations.remove(futureKey, confirmationFuture); + throw e; + } finally { + // 最终清理:如果Future还未完成,确保移除 + if (!confirmationFuture.isDone()) { + pendingConfirmations.remove(futureKey, confirmationFuture); + } } } /** - * 创建确认回调 + * 创建确认回调 - 修复版本(关键修复) */ private RabbitTemplate.ConfirmCallback createConfirmCallback() { return (correlationData, ack, cause) -> { @@ -199,19 +270,47 @@ public class RabbitMQProducerService { return; } - String messageId = correlationData.getId(); + String correlationId = correlationData.getId(); + if (correlationId == null) { + log.error("确认回调中correlationId为null"); + return; + } + + // 从correlationId中提取原始messageId(去掉重试后缀) + String messageId = extractOriginalMessageId(correlationId); + CompletableFuture future = pendingConfirmations.get(messageId); + + if (future == null) { + log.warn("未找到对应的确认Future, 可能已超时或已完成 - ID: {}, CorrelationId: {}", messageId, correlationId); + return; + } + if (ack) { - log.debug("消息发送确认成功 - ID: {}", messageId); - // 这里可以更新消息状态为已确认 + log.info("消息发送确认成功 - ID: {}, CorrelationId: {}", messageId, correlationId); + // 完成Future,解除阻塞 + future.complete(correlationData); } else { - log.error("消息发送确认失败 - ID: {}, 原因: {}", messageId, cause); - // 这里可以处理发送失败的消息 + String errorMsg = "Broker确认失败: " + (cause != null ? cause : "未知原因"); + log.error("消息发送确认失败 - ID: {}, CorrelationId: {}, 原因: {}", + messageId, correlationId, errorMsg); + // 完成Future并传递异常 + future.completeExceptionally(new RuntimeException(errorMsg)); } }; } /** - * 创建返回回调(处理路由失败) + * 从correlationId中提取原始messageId + */ + private String extractOriginalMessageId(String correlationId) { + if (correlationId.contains("_attempt_")) { + return correlationId.substring(0, correlationId.indexOf("_attempt_")); + } + return correlationId; + } + + /** + * 创建返回回调(处理路由失败) - 修复版本 */ private RabbitTemplate.ReturnsCallback createReturnsCallback() { return returned -> { @@ -219,13 +318,22 @@ public class RabbitMQProducerService { returned.getExchange(), returned.getRoutingKey(), returned.getReplyCode(), returned.getReplyText()); - // 如果有CorrelationData,可以在这里处理 - if (returned.getMessage().getMessageProperties().getCorrelationId() != null) { - String messageId = returned.getMessage().getMessageProperties().getCorrelationId(); - CompletableFuture future = pendingConfirmations.remove(messageId); + // 处理返回的消息 + String correlationId = returned.getMessage().getMessageProperties().getCorrelationId(); + if (correlationId != null) { + String messageId = extractOriginalMessageId(correlationId); + CompletableFuture future = pendingConfirmations.get(messageId); if (future != null) { - future.completeExceptionally(new RuntimeException("消息路由失败: " + returned.getReplyText())); + String errorMsg = String.format("消息路由失败 - 交换机: %s, 路由键: %s, 回复: %s", + returned.getExchange(), returned.getRoutingKey(), returned.getReplyText()); + future.completeExceptionally(new RuntimeException(errorMsg)); + // 路由失败后立即移除,避免重复处理 + pendingConfirmations.remove(messageId, future); + } else { + log.warn("路由失败的消息未找到对应的Future - CorrelationId: {}", correlationId); } + } else { + log.error("路由失败的消息没有CorrelationId"); } }; } @@ -250,6 +358,33 @@ public class RabbitMQProducerService { } } + /** + * 获取未确认的消息数量(监控用) + */ + public int getPendingConfirmationsCount() { + return pendingConfirmations.size(); + } + + /** + * 清理过期的确认Future(防止内存泄漏) + */ + public void cleanupExpiredConfirmations() { + int initialSize = pendingConfirmations.size(); + long now = System.currentTimeMillis(); + + pendingConfirmations.entrySet().removeIf(entry -> { + CompletableFuture future = entry.getValue(); + // 这里可以根据业务需求添加更复杂的过期判断逻辑 + // 例如:如果Future已经创建超过30分钟且未完成,则移除 + return future.isDone(); + }); + + int removed = initialSize - pendingConfirmations.size(); + if (removed > 0) { + log.info("清理了 {} 个已完成的确认Future", removed); + } + } + /** * 任务数据类 */ @@ -259,4 +394,4 @@ public class RabbitMQProducerService { private String taskName; private String uploadPath; } -} +} \ No newline at end of file