From 8183cad9e148247c85fa4f2ff8c9c72fc8caccb6 Mon Sep 17 00:00:00 2001 From: cwchen <1048842385@qq.com> Date: Fri, 28 Nov 2025 15:06:22 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8B=9B=E6=A0=87=E8=A7=A3=E6=9E=90=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../web/rabbitmq/config/RabbitMQConfig.java | 6 +- .../consumer/RabbitMQConsumerService.java | 128 +----- .../message/RabbitMqMessageBuilder.java | 2 +- .../producer/RabbitMQProducerService.java | 420 ++---------------- .../web/service/analysis/AnalysisService.java | 11 +- 5 files changed, 63 insertions(+), 504 deletions(-) diff --git a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/config/RabbitMQConfig.java b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/config/RabbitMQConfig.java index 29a3373..973f59b 100644 --- a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/config/RabbitMQConfig.java +++ b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/config/RabbitMQConfig.java @@ -111,9 +111,9 @@ public class RabbitMQConfig { factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setMissingQueuesFatal(false); factory.setAutoStartup(true); - factory.setConcurrentConsumers(3); - factory.setMaxConcurrentConsumers(5); - factory.setPrefetchCount(10); // 每个消费者预取的消息数量 + factory.setConcurrentConsumers(1); + factory.setMaxConcurrentConsumers(1); + factory.setPrefetchCount(1); // 每个消费者预取的消息数量 // 配置错误处理器 factory.setErrorHandler(new ConditionalRejectingErrorHandler(new FatalExceptionStrategy() { diff --git a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/consumer/RabbitMQConsumerService.java b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/consumer/RabbitMQConsumerService.java index fb28fab..0227f55 100644 --- a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/consumer/RabbitMQConsumerService.java +++ b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/consumer/RabbitMQConsumerService.java @@ -59,18 +59,9 @@ public class RabbitMQConsumerService { try { // 根据任务类型进行不同的处理 switch (taskName) { - case "FILE_UPLOAD": - processFileUpload(message); - break; case "OCR_PROCESS": processOcrTask(message); break; - case "IMAGE_PROCESS": - processImage(message); - break; - case "DOCUMENT_EXTRACT": - processDocumentExtract(message); - break; default: processDefaultTask(message); break; @@ -98,34 +89,6 @@ public class RabbitMQConsumerService { } } - /** - * 处理文件上传任务 - */ - private void processFileUpload(RabbitMqMessage message) { - String messageId = message.getMessageId(); - String uploadPath = message.getUploadPath(); - - log.info("处理文件上传任务 - ID: {}, 路径: {}", messageId, uploadPath); - - // 模拟业务处理 - try { - // 1. 验证文件 - validateFile(uploadPath); - - // 2. 处理文件 - processUploadedFile(uploadPath); - - // 3. 更新处理状态 - updateProcessStatus(messageId, "COMPLETED"); - - log.info("文件上传任务处理完成 - ID: {}", messageId); - - } catch (Exception e) { - log.error("文件上传任务处理失败 - ID: {}", messageId, e); - throw new BusinessException("文件处理失败", e); - } - } - /** * 处理OCR任务 */ @@ -148,7 +111,7 @@ public class RabbitMQConsumerService { // 4. 更新任务状态 updateOcrTaskStatus(taskId, "SUCCESS"); - + Thread.sleep(1000 * 10); log.info("OCR任务处理完成 - ID: {}, 任务ID: {}", messageId, taskId); } catch (OcrServiceException e) { @@ -160,56 +123,6 @@ public class RabbitMQConsumerService { } } - /** - * 处理图片处理任务 - */ - private void processImage(RabbitMqMessage message) { - String messageId = message.getMessageId(); - String imagePath = message.getUploadPath(); - String processType = (String) message.getBusinessData().get("processType"); - - log.info("处理图片任务 - ID: {}, 类型: {}, 路径: {}", messageId, processType, imagePath); - - try { - // 模拟图片处理 - Thread.sleep(1000); - - // 这里添加实际的图片处理逻辑 - processImageFile(imagePath, processType); - - log.info("图片处理完成 - ID: {}", messageId); - - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("图片处理被中断", e); - } catch (Exception e) { - log.error("图片处理失败 - ID: {}", messageId, e); - throw new RuntimeException("图片处理失败", e); - } - } - - /** - * 处理文档提取任务 - */ - private void processDocumentExtract(RabbitMqMessage message) { - String messageId = message.getMessageId(); - String documentPath = message.getUploadPath(); - String extractType = (String) message.getBusinessData().get("extractType"); - - log.info("处理文档提取任务 - ID: {}, 类型: {}, 路径: {}", messageId, extractType, documentPath); - - try { - // 模拟文档提取处理 - extractDocumentContent(documentPath, extractType); - - log.info("文档提取完成 - ID: {}", messageId); - - } catch (Exception e) { - log.error("文档提取失败 - ID: {}", messageId, e); - throw new RuntimeException("文档提取失败", e); - } - } - /** * 处理默认任务 */ @@ -232,27 +145,6 @@ public class RabbitMQConsumerService { } } - // 以下为模拟的业务方法实现 - - private void validateFile(String filePath) { - // 模拟文件验证 - if (filePath == null || filePath.isEmpty()) { - throw new BusinessException("文件路径不能为空"); - } - log.debug("文件验证通过: {}", filePath); - } - - private void processUploadedFile(String filePath) { - // 模拟文件处理 - log.debug("处理上传文件: {}", filePath); - // 实际处理逻辑... - } - - private void updateProcessStatus(String messageId, String status) { - // 模拟更新处理状态 - log.debug("更新处理状态 - ID: {}, 状态: {}", messageId, status); - } - private void checkOcrServiceAvailability() { // 模拟OCR服务检查 log.debug("检查OCR服务可用性"); @@ -262,11 +154,11 @@ public class RabbitMQConsumerService { private String performOcrProcessing(String filePath) { // 模拟OCR处理 log.debug("执行OCR处理: {}", filePath); - String uploadPath = filePath; +// String uploadPath = filePath; // File fileFromMinio = getFileFromMinio(uploadPath); - File fileFromMinio = new File("C:\\Users\\10488\\Desktop\\12.pdf"); - OcrResponse ocrResponse = performOcrRecognition(fileFromMinio); - log.info("ocrResponse响应结果:{}", ocrResponse); +// File fileFromMinio = new File("C:\\Users\\10488\\Desktop\\12.pdf"); +// OcrResponse ocrResponse = performOcrRecognition(fileFromMinio); +// log.info("ocrResponse响应结果:{}", ocrResponse); return "OCR处理结果"; } @@ -327,16 +219,6 @@ public class RabbitMQConsumerService { log.debug("更新OCR任务状态 - 任务ID: {}, 状态: {}", taskId, status); } - private void processImageFile(String imagePath, String processType) { - // 模拟图片处理 - log.debug("处理图片文件 - 路径: {}, 类型: {}", imagePath, processType); - } - - private void extractDocumentContent(String documentPath, String extractType) { - // 模拟文档提取 - log.debug("提取文档内容 - 路径: {}, 类型: {}", documentPath, extractType); - } - private void performBusinessLogic(RabbitMqMessage message) { // 模拟业务逻辑执行 log.debug("执行业务逻辑 - 消息ID: {}", message.getMessageId()); diff --git a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/message/RabbitMqMessageBuilder.java b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/message/RabbitMqMessageBuilder.java index 62a3407..096c80e 100644 --- a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/message/RabbitMqMessageBuilder.java +++ b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/message/RabbitMqMessageBuilder.java @@ -48,7 +48,7 @@ public class RabbitMqMessageBuilder { public RabbitMqMessage buildOcrProcessMessage(String filePath, String taskId) { RabbitMqMessage message = RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.OCR_PROCESS, filePath); message.addBusinessData("taskId", taskId); - message.addBusinessData("processType", "TEXT_EXTRACTION"); + message.addBusinessData("processType", "OCR_PROCESS"); return message; } diff --git a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java index 5054a39..751322a 100644 --- a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java +++ b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java @@ -2,26 +2,16 @@ package com.bonus.web.rabbitmq.producer; import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; import com.bonus.web.rabbitmq.message.RabbitMqMessageBuilder; -import lombok.AllArgsConstructor; -import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeoutException; - /** * @className:RabbitMQProducerService * @author:cwchen * @date:2025-11-24-16:16 * @version:1.0 - * @description:生产者服务 - 修复版本 + * @description: 生产者服务 -(仅同步发送) */ @Service(value = "RabbitMQProducerService") @Slf4j @@ -29,402 +19,90 @@ public class RabbitMQProducerService { private final RabbitTemplate rabbitTemplate; private final RabbitMqMessageBuilder messageBuilder; - private final Map> pendingConfirmations; public RabbitMQProducerService(RabbitTemplate rabbitTemplate, RabbitMqMessageBuilder messageBuilder) { this.rabbitTemplate = rabbitTemplate; this.messageBuilder = messageBuilder; - this.pendingConfirmations = new ConcurrentHashMap<>(); - - // 设置确认回调 - RabbitTemplate.ConfirmCallback confirmCallback = createConfirmCallback(); - this.rabbitTemplate.setConfirmCallback(confirmCallback); - - // 设置返回回调(用于处理路由失败) - RabbitTemplate.ReturnsCallback returnsCallback = createReturnsCallback(); - this.rabbitTemplate.setReturnsCallback(returnsCallback); } /** - * 异步发送消息 - 基础版本 + * 同步发送OCR处理消息 */ - public CompletableFuture sendMessageAsync(String taskName, String uploadPath) { - return CompletableFuture.supplyAsync(() -> { - try { - RabbitMqMessage message = messageBuilder.buildBaseMessage(taskName, uploadPath); - return sendMessageWithConfirmation(message); - } catch (Exception e) { - log.error("异步发送消息失败: {}", e.getMessage(), e); - throw new RuntimeException("消息发送失败", e); - } - }); - } - - /** - * 异步发送消息 - 带自定义ID - */ - public CompletableFuture sendMessageWithCustomIdAsync(String taskName, - String uploadPath, - String customMessageId) { - return CompletableFuture.supplyAsync(() -> { - try { - RabbitMqMessage message = messageBuilder.buildMessageWithCustomId(taskName, uploadPath, customMessageId); - return sendMessageWithConfirmation(message); - } catch (Exception e) { - log.error("异步发送带自定义ID消息失败: {}", e.getMessage(), e); - throw new RuntimeException("消息发送失败", e); - } - }); - } - - /** - * 异步发送OCR处理消息 - 修复版本 - */ - public CompletableFuture sendOcrMessageAsync(String filePath, String taskId) { - return CompletableFuture.supplyAsync(() -> { - String messageId = null; - try { - RabbitMqMessage message = messageBuilder.buildOcrProcessMessage(filePath, taskId); - messageId = message.getMessageId(); - return sendMessageWithConfirmation(message); - } catch (Exception e) { - log.error("异步发送OCR消息失败 - TaskId: {}, File: {}, MessageId: {}, Error: {}", - taskId, filePath, messageId, e.getMessage(), e); - // 触发补偿机制 - handleOcrMessageSendFailure(taskId, filePath, messageId, e); - throw new RuntimeException("OCR消息发送失败: " + e.getMessage(), e); - } - }); - } - - /** - * 处理OCR消息发送失败 - */ - private void handleOcrMessageSendFailure(String taskId, String filePath, String messageId, Exception e) { - // 这里可以实现补偿逻辑,比如: - // 1. 记录到失败表 - // 2. 触发告警 - // 3. 尝试其他通知方式 - // 4. 更新任务状态为发送失败 - } - - /** - * 异步发送带重试配置的消息 - */ - public CompletableFuture sendMessageWithRetryAsync(String taskName, - String uploadPath, - int maxRetryCount) { - return CompletableFuture.supplyAsync(() -> { - try { - RabbitMqMessage message = messageBuilder.buildMessageWithRetryConfig(taskName, uploadPath, maxRetryCount); - return sendMessageWithConfirmation(message); - } catch (Exception e) { - log.error("异步发送带重试配置消息失败: {}", e.getMessage(), e); - throw new RuntimeException("消息发送失败", e); - } - }); - } - - /** - * 批量异步发送消息 - */ - public CompletableFuture> sendBatchMessagesAsync(List tasks) { - return CompletableFuture.supplyAsync(() -> { - List messageIds = new ArrayList<>(); - List> futures = new ArrayList<>(); - - for (MessageTask task : tasks) { - CompletableFuture future = CompletableFuture.supplyAsync(() -> { - try { - RabbitMqMessage message = messageBuilder.buildBaseMessage(task.getTaskName(), task.getUploadPath()); - return sendMessageWithConfirmation(message); - } catch (Exception e) { - log.error("批量发送消息失败 - 任务: {}", task.getTaskName(), e); - return null; - } - }); - futures.add(future); - } - - // 等待所有任务完成 - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); - - for (CompletableFuture future : futures) { - try { - String messageId = future.get(); - if (messageId != null) { - messageIds.add(messageId); - } - } catch (Exception e) { - log.error("获取异步发送结果失败", e); - } - } - - log.info("批量发送完成,成功发送 {} 条消息", messageIds.size()); - return messageIds; - }); - } - - /** - * 发送消息并等待确认 - 修复版本(带重试机制) - */ - private String sendMessageWithConfirmation(RabbitMqMessage message) { - String messageId = message.getMessageId(); - int maxRetries = 2; // 最大重试次数 - int retryCount = 0; - - while (retryCount <= maxRetries) { - try { - return attemptSendMessage(message, retryCount); - } catch (TimeoutException e) { - retryCount++; - if (retryCount > maxRetries) { - log.error("消息发送重试{}次后仍失败 - ID: {}", maxRetries, messageId, e); - throw new RuntimeException("消息确认超时,重试失败", e); - } - log.warn("消息发送超时,进行第{}次重试 - ID: {}", retryCount, messageId); - try { - // 指数退避策略 - Thread.sleep(1000 * retryCount); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException("重试被中断", ie); - } - } catch (Exception e) { - log.error("消息发送异常 - ID: {}", messageId, e); - throw new RuntimeException("消息发送异常", e); - } - } - - throw new RuntimeException("消息发送失败"); - } - - /** - * 尝试发送消息 - * 关键修复:确保确认回调能及时触发,并增加超时时间 - */ - private String attemptSendMessage(RabbitMqMessage message, int retryCount) throws Exception { - String messageId = message.getMessageId(); - - // 为每次尝试创建新的CorrelationData - CorrelationData correlationData = new CorrelationData(messageId + "_attempt_" + retryCount); - - CompletableFuture confirmationFuture = new CompletableFuture<>(); - - // 使用消息ID作为key,确保重试时能覆盖之前的Future - String futureKey = messageId; - - // 如果之前有未完成的Future,先移除(重试场景) - CompletableFuture oldFuture = pendingConfirmations.remove(futureKey); - if (oldFuture != null && !oldFuture.isDone()) { - oldFuture.cancel(true); - } - - pendingConfirmations.put(futureKey, confirmationFuture); - + public String sendOcrMessageSync(String filePath, String taskId) { try { - // 发送消息 - try { - rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData); - } catch (Exception sendException) { - log.error("消息发送异常 - ID: {}, 异常: {}", messageId, sendException.getMessage(), sendException); - pendingConfirmations.remove(futureKey, confirmationFuture); - throw new RuntimeException("消息发送失败: " + sendException.getMessage(), sendException); - } - - // 关键修复:由于确认回调可能不触发,采用短超时 + 备用方案 - // 如果消息发送成功(没有异常),且确认回调在短时间内未触发,则认为成功 - int shortTimeoutSeconds = 3; // 短超时:3秒 - long checkInterval = 50; // 每50ms检查一次,更频繁 - long maxWaitTime = shortTimeoutSeconds * 1000L; - long elapsed = 0; - boolean confirmReceived = false; - - while (elapsed < maxWaitTime) { - if (confirmationFuture.isDone()) { - try { - confirmationFuture.get(); // 获取确认结果 - confirmReceived = true; - break; - } catch (Exception e) { - log.error("确认回调返回异常 - ID: {}, 异常: {}", messageId, e.getMessage(), e); - pendingConfirmations.remove(futureKey, confirmationFuture); - throw new RuntimeException("消息确认失败: " + e.getMessage(), e); - } - } - - Thread.sleep(checkInterval); - elapsed += checkInterval; - } - - // 如果确认回调未触发,但消息发送成功,采用备用方案 - if (!confirmReceived) { - Thread.sleep(500); // 再等待500ms - - if (confirmationFuture.isDone()) { - try { - confirmationFuture.get(); - confirmReceived = true; - } catch (Exception e) { - // 忽略延迟确认回调的异常 - } - } - - if (!confirmReceived) { - // 备用方案:消息发送成功(没有异常),认为成功 - confirmationFuture.complete(correlationData); - confirmReceived = true; - } - } - - if (confirmReceived) { - // 确认成功后移除Future - pendingConfirmations.remove(futureKey, confirmationFuture); - return messageId; - } - - // 超时处理(如果备用方案也失败) - log.error("消息确认超时 - ID: {}, 重试次数: {}", messageId, retryCount); - - // 延迟移除Future,给确认回调一个机会 - CompletableFuture.runAsync(() -> { - try { - Thread.sleep(3000); - pendingConfirmations.remove(futureKey); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - }); - - // 抛出TimeoutException,让外层重试机制处理 - throw new TimeoutException("消息确认失败(确认回调未触发) - ID: " + messageId + ", CorrelationId: " + correlationData.getId()); - - } catch (TimeoutException e) { - // 重新抛出TimeoutException,不在这里处理 - throw e; + RabbitMqMessage message = messageBuilder.buildOcrProcessMessage(filePath, taskId); + return sendMessageSync(message); } catch (Exception e) { - log.error("消息发送异常 - ID: {}, 重试次数: {}, 异常: {}", - messageId, retryCount, e.getMessage(), e); - // 异常时移除对应的Future - if (futureKey != null) { - pendingConfirmations.remove(futureKey, confirmationFuture); - } - throw e; + log.error("同步发送OCR消息失败 - TaskId: {}, File: {}, Error: {}", + taskId, filePath, e.getMessage(), e); + throw new RuntimeException("OCR消息发送失败: " + e.getMessage(), e); } } /** - * 创建确认回调 - 修复版本(关键修复) - * 注意:RabbitMQ的确认回调是异步的,确认的是消息是否被Broker接收,不是是否被消费 - * 关键:确认回调应该在消息发送到Broker后立即触发,而不是在消费者消费时 + * 同步发送基础消息 */ - private RabbitTemplate.ConfirmCallback createConfirmCallback() { - return (correlationData, ack, cause) -> { - if (correlationData == null || correlationData.getId() == null) { - return; - } - - String correlationId = correlationData.getId(); - String messageId = extractOriginalMessageId(correlationId); - CompletableFuture future = pendingConfirmations.get(messageId); - - if (future == null) { - // Future可能已经被移除(超时或异常),这是正常的 - return; - } - - // 检查Future是否已经完成(防止重复完成) - if (future.isDone()) { - return; - } - - if (ack) { - future.complete(correlationData); - } else { - String errorMsg = "Broker确认失败: " + (cause != null ? cause : "未知原因"); - log.error("消息发送确认失败 - ID: {}, 原因: {}", messageId, errorMsg); - future.completeExceptionally(new RuntimeException(errorMsg)); - } - }; - } - - /** - * 从correlationId中提取原始messageId - */ - private String extractOriginalMessageId(String correlationId) { - if (correlationId.contains("_attempt_")) { - return correlationId.substring(0, correlationId.indexOf("_attempt_")); + public String sendBaseMessageSync(String taskName, String uploadPath) { + try { + RabbitMqMessage message = messageBuilder.buildBaseMessage(taskName, uploadPath); + return sendMessageSync(message); + } catch (Exception e) { + log.error("同步发送基础消息失败 - Task: {}, Path: {}, Error: {}", + taskName, uploadPath, e.getMessage(), e); + throw new RuntimeException("基础消息发送失败: " + e.getMessage(), e); } - return correlationId; } /** - * 创建返回回调(处理路由失败) - 修复版本 + * 同步发送带自定义ID的消息 */ - private RabbitTemplate.ReturnsCallback createReturnsCallback() { - return returned -> { - log.error("消息路由失败 - 交换机: {}, 路由键: {}, 回复码: {}, 回复文本: {}", - returned.getExchange(), returned.getRoutingKey(), - returned.getReplyCode(), returned.getReplyText()); - - String correlationId = returned.getMessage().getMessageProperties().getCorrelationId(); - if (correlationId != null) { - String messageId = extractOriginalMessageId(correlationId); - CompletableFuture future = pendingConfirmations.get(messageId); - if (future != null) { - String errorMsg = String.format("消息路由失败 - 交换机: %s, 路由键: %s, 回复: %s", - returned.getExchange(), returned.getRoutingKey(), returned.getReplyText()); - future.completeExceptionally(new RuntimeException(errorMsg)); - pendingConfirmations.remove(messageId, future); - } - } - }; + public String sendMessageWithCustomIdSync(String taskName, String uploadPath, String customMessageId) { + try { + RabbitMqMessage message = messageBuilder.buildMessageWithCustomId(taskName, uploadPath, customMessageId); + return sendMessageSync(message); + } catch (Exception e) { + log.error("同步发送带自定义ID消息失败 - Task: {}, Path: {}, CustomId: {}, Error: {}", + taskName, uploadPath, customMessageId, e.getMessage(), e); + throw new RuntimeException("自定义ID消息发送失败: " + e.getMessage(), e); + } } /** - * 同步发送消息(备用方法) + * 同步发送消息(核心方法) */ public String sendMessageSync(RabbitMqMessage message) { try { String messageId = message.getMessageId(); - CorrelationData correlationData = new CorrelationData(messageId); + log.info("开始同步发送消息 - ID: {}, 任务: {}", messageId, message.getTaskName()); - rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData); + // 同步发送消息 + rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message); + log.info("同步发送消息成功 - ID: {}", messageId); return messageId; + } catch (Exception e) { - log.error("同步发送消息失败: {}", e.getMessage(), e); - throw new RuntimeException("消息发送失败", e); + log.error("同步发送消息失败 - ID: {}, Error: {}", + message.getMessageId(), e.getMessage(), e); + throw new RuntimeException("消息发送失败: " + e.getMessage(), e); } } /** - * 获取未确认的消息数量(监控用) + * 同步发送带业务数据的消息 */ - public int getPendingConfirmationsCount() { - return pendingConfirmations.size(); - } - - /** - * 清理过期的确认Future(防止内存泄漏) - */ - public void cleanupExpiredConfirmations() { - pendingConfirmations.entrySet().removeIf(entry -> { - CompletableFuture future = entry.getValue(); - // 这里可以根据业务需求添加更复杂的过期判断逻辑 - // 例如:如果Future已经创建超过30分钟且未完成,则移除 - return future.isDone(); - }); - } - - /** - * 任务数据类 - */ - @Data - @AllArgsConstructor - public static class MessageTask { - private String taskName; - private String uploadPath; + public String sendMessageWithBusinessDataSync(String taskName, String uploadPath, Object businessData) { + try { + RabbitMqMessage message = messageBuilder.buildBaseMessage(taskName, uploadPath); + if (businessData != null) { + message.getBusinessData().put("data", businessData); + } + return sendMessageSync(message); + } catch (Exception e) { + log.error("同步发送带业务数据消息失败 - Task: {}, Path: {}, Error: {}", + taskName, uploadPath, e.getMessage(), e); + throw new RuntimeException("业务数据消息发送失败: " + e.getMessage(), e); + } } } \ No newline at end of file diff --git a/bonus-admin/src/main/java/com/bonus/web/service/analysis/AnalysisService.java b/bonus-admin/src/main/java/com/bonus/web/service/analysis/AnalysisService.java index e1e9a67..0f4faa8 100644 --- a/bonus-admin/src/main/java/com/bonus/web/service/analysis/AnalysisService.java +++ b/bonus-admin/src/main/java/com/bonus/web/service/analysis/AnalysisService.java @@ -55,9 +55,6 @@ import java.util.concurrent.Executor; @Slf4j public class AnalysisService { -// @Resource(name = "RabbitMQProducer") -// private RabbitMQProducer rabbitMQProducer; - @Resource(name = "taskExecutor") private Executor taskExecutor; @@ -106,9 +103,11 @@ public class AnalysisService { } });*/ CompletableFuture.runAsync(() -> { - String taskId = UUID.randomUUID().toString(); - String uploadPath = "personnelDatabase/2025/10/24/89a266f4-f928-4ee4-95eb-1a73906da0a7.png"; - rabbitMQProducerService.sendOcrMessageAsync(uploadPath,taskId); + for (int i = 0; i < 2; i++) { + String taskId = UUID.randomUUID().toString(); + String uploadPath = "personnelDatabase/2025/10/24/89a266f4-f928-4ee4-95eb-1a73906da0a7.png"; + rabbitMQProducerService.sendOcrMessageSync(taskId, uploadPath); + } }, taskExecutor); return AjaxResult.success(); }