diff --git a/bonus-admin/src/main/java/com/bonus/BonusApplication.java b/bonus-admin/src/main/java/com/bonus/BonusApplication.java index d714fe0..6fd8d94 100644 --- a/bonus-admin/src/main/java/com/bonus/BonusApplication.java +++ b/bonus-admin/src/main/java/com/bonus/BonusApplication.java @@ -1,5 +1,6 @@ package com.bonus; +import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; @@ -12,6 +13,7 @@ import org.springframework.scheduling.annotation.EnableScheduling; */ @SpringBootApplication(exclude = { DataSourceAutoConfiguration.class }) @EnableScheduling +@EnableRabbit public class BonusApplication { public static void main(String[] args) diff --git a/bonus-admin/src/main/java/com/bonus/web/service/analysis/AnalysisService.java b/bonus-admin/src/main/java/com/bonus/web/service/analysis/AnalysisService.java index a45910e..48333f2 100644 --- a/bonus-admin/src/main/java/com/bonus/web/service/analysis/AnalysisService.java +++ b/bonus-admin/src/main/java/com/bonus/web/service/analysis/AnalysisService.java @@ -10,26 +10,19 @@ import com.bonus.common.domain.analysis.dto.AnalysisProDto; import com.bonus.common.domain.analysis.po.ProComposition; import com.bonus.common.domain.analysis.vo.AnalysisBidVo; import com.bonus.common.domain.analysis.vo.AnalysisVo; -import com.bonus.common.domain.file.po.ResourceFilePo; import com.bonus.common.domain.file.vo.ResourceFileVo; import com.bonus.common.domain.file.vo.SysFile; -import com.bonus.common.domain.mainDatabase.dto.EnterpriseDto; import com.bonus.common.domain.ocr.dto.OcrRequest; import com.bonus.common.domain.ocr.vo.WordConvertPdfResponse; -import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; import com.bonus.common.utils.Base64ToPdfConverter; -import com.bonus.common.utils.FileUtil; import com.bonus.common.utils.ValidatorsUtils; -import com.bonus.common.utils.uuid.UUID; import com.bonus.file.service.FileUploadService; import com.bonus.file.service.SourceFileService; -import com.bonus.ocr.service.OcrService; +import com.bonus.framework.manager.AsyncManager; import com.bonus.ocr.service.WordConvertPdfService; -import com.bonus.web.rabbitmq.producer.RabbitMQProducerService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.interceptor.TransactionAspectSupport; @@ -37,10 +30,7 @@ import org.springframework.transaction.interceptor.TransactionAspectSupport; import javax.annotation.Resource; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.CompletableFuture; +import java.util.*; import java.util.concurrent.Executor; @@ -56,7 +46,7 @@ import java.util.concurrent.Executor; @Slf4j public class AnalysisService { - @Resource(name = "taskExecutor") + @Resource(name = "rabbitmqTaskExecutor") private Executor taskExecutor; @Resource(name = "IASAnalysisService") @@ -65,8 +55,7 @@ public class AnalysisService { @Resource(name = "ValidatorsUtils") private ValidatorsUtils validatorsUtils; - @Autowired - private RabbitMQProducerService rabbitMQProducerService; + @Resource(name = "WordConvertPdfService") private WordConvertPdfService wordConvertPdfService; @@ -103,13 +92,37 @@ public class AnalysisService { } } });*/ - CompletableFuture.runAsync(() -> { - for (int i = 0; i < 1; i++) { - String taskId = UUID.randomUUID().toString(); - String uploadPath = "personnelDatabase/2025/10/24/89a266f4-f928-4ee4-95eb-1a73906da0a7.png"; - rabbitMQProducerService.sendOcrMessageAsync(taskId, uploadPath); - } - }, taskExecutor); + /*CompletableFuture.runAsync(() -> { + + }, taskExecutor);*/ + + /*for (int i = 0; i < 2; i++) { + String taskId = UUID.randomUUID().toString(); + String uploadPath = "personnelDatabase/2025/10/24/89a266f4-f928-4ee4-95eb-1a73906da0a7.png"; + + // 确保消息按顺序发送 + rabbitMQProducerService.sendMessageAsync(taskId, uploadPath) + .thenAccept(result -> { + // 处理成功结果 + log.info("消息发送成功: {}", result); + }) + .exceptionally(ex -> { + // 处理异常 + log.error("消息发送失败: {}", ex.getMessage()); + return null; + }) + .join(); // 确保每个消息发送完成后再进行下一个 + }*/ + List> list = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + Map map = new HashMap<>(); + String taskId = UUID.randomUUID().toString(); + String uploadPath = "personnelDatabase/2025/10/24/89a266f4-f928-4ee4-95eb-1a73906da0a7.png"; + map.put("taskId", taskId); + map.put("uploadPath", uploadPath); + list.add(map); + } + AsyncManager.me().executeSendRabbitMqMessage(list); return AjaxResult.success(); } diff --git a/bonus-framework/pom.xml b/bonus-framework/pom.xml index aae3b8c..fda78ce 100644 --- a/bonus-framework/pom.xml +++ b/bonus-framework/pom.xml @@ -68,6 +68,10 @@ com.bonus bonus-file + + com.bonus + bonus-rabbitmq + diff --git a/bonus-framework/src/main/java/com/bonus/framework/manager/AsyncManager.java b/bonus-framework/src/main/java/com/bonus/framework/manager/AsyncManager.java index a1ddaf2..f041f2d 100644 --- a/bonus-framework/src/main/java/com/bonus/framework/manager/AsyncManager.java +++ b/bonus-framework/src/main/java/com/bonus/framework/manager/AsyncManager.java @@ -10,6 +10,7 @@ import com.bonus.common.domain.file.po.ResourceFileRecordPo; import com.bonus.common.utils.Threads; import com.bonus.common.utils.spring.SpringUtils; import com.bonus.file.service.SourceFileService; +import com.bonus.rabbitmq.service.SendRabbitMqService; import com.bonus.system.domain.SysLogsVo; import com.bonus.system.service.ISysOperLogService; @@ -131,6 +132,24 @@ public class AsyncManager return execute(callable); } + /** + * 发送 mq 消息 + * @param list + * @return Future + * @author cwchen + * @date 2025/11/29 10:51 + */ + public Future executeSendRabbitMqMessage(List> list) { + Callable callable = new Callable() { + @Override + public Void call() throws Exception { + SpringUtils.getBean(SendRabbitMqService.class).sendRabbitMq(list); + return null; // 必须返回 null + } + }; + return execute(callable); + } + /** * 停止任务线程池 */ diff --git a/bonus-rabbitmq/pom.xml b/bonus-rabbitmq/pom.xml index f81e6f4..684a734 100644 --- a/bonus-rabbitmq/pom.xml +++ b/bonus-rabbitmq/pom.xml @@ -21,6 +21,14 @@ com.bonus bonus-common + + com.bonus + bonus-ocr + + + com.bonus + bonus-file + diff --git a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/config/RabbitMQConfig.java b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/config/RabbitMQConfig.java similarity index 81% rename from bonus-admin/src/main/java/com/bonus/web/rabbitmq/config/RabbitMQConfig.java rename to bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/config/RabbitMQConfig.java index 973f59b..3950bee 100644 --- a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/config/RabbitMQConfig.java +++ b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/config/RabbitMQConfig.java @@ -1,4 +1,4 @@ -package com.bonus.web.rabbitmq.config; +package com.bonus.rabbitmq.config; import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; import lombok.extern.slf4j.Slf4j; @@ -107,48 +107,19 @@ public class RabbitMQConfig { factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(jsonMessageConverter); - // 更改为手动确认模式 + // 更改为手动确认模式,确保按顺序一个一个处理 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setMissingQueuesFatal(false); factory.setAutoStartup(true); - factory.setConcurrentConsumers(1); - factory.setMaxConcurrentConsumers(1); - factory.setPrefetchCount(1); // 每个消费者预取的消息数量 + factory.setConcurrentConsumers(1); // 设置为1,确保只有一个消费者处理消息 + factory.setMaxConcurrentConsumers(1); // 最大消费者数为1 + factory.setPrefetchCount(1); // 每次只取一条消息 - // 配置错误处理器 - factory.setErrorHandler(new ConditionalRejectingErrorHandler(new FatalExceptionStrategy() { - @Override - public boolean isFatal(Throwable t) { - // 对于业务异常,不认为是致命错误 - if (t instanceof RuntimeException) { - String message = t.getMessage(); - if (message != null && (message.contains("消息处理失败") || - message.contains("OCR") || - message.contains("Minio"))) { - return false; - } - } - // 其他异常直接拒绝 - return false; - } - })); - - // 使用简单的拒绝重试策略 - 重试后直接拒绝消息 - MessageRecoverer messageRecoverer = new RejectAndDontRequeueRecoverer(); - - // 创建重试拦截器 - RetryOperationsInterceptor interceptor = RetryInterceptorBuilder.stateless() - .maxAttempts(3) // 总共执行3次(1次原始 + 2次重试) - .backOffOptions(2000L, 2.0, 10000L) // 初始2秒,倍数2,最大10秒 - .recoverer(messageRecoverer) - .build(); - - factory.setAdviceChain(interceptor); log.info("多消费者容器工厂配置完成 - 使用手动确认模式"); - log.info("重试配置:最多重试2次,初始延迟2秒,最大延迟10秒"); return factory; } + /** * RabbitTemplate主配置 */ diff --git a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/consumer/RabbitMQConsumer.java b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/consumer/RabbitMQConsumer.java similarity index 100% rename from bonus-admin/src/main/java/com/bonus/web/rabbitmq/consumer/RabbitMQConsumer.java rename to bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/consumer/RabbitMQConsumer.java diff --git a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/consumer/RabbitMQConsumerService.java b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/consumer/RabbitMQConsumerService.java similarity index 92% rename from bonus-admin/src/main/java/com/bonus/web/rabbitmq/consumer/RabbitMQConsumerService.java rename to bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/consumer/RabbitMQConsumerService.java index 0227f55..b3be051 100644 --- a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/consumer/RabbitMQConsumerService.java +++ b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/consumer/RabbitMQConsumerService.java @@ -1,4 +1,4 @@ -package com.bonus.web.rabbitmq.consumer; +package com.bonus.rabbitmq.consumer; import com.bonus.common.domain.ocr.dto.OcrRequest; import com.bonus.common.domain.ocr.vo.OcrResponse; @@ -38,12 +38,9 @@ public class RabbitMQConsumerService { @Resource private MinioUtil minioUtil; - /** - * 主消息消费者 - 使用手动确认模式 - */ @RabbitListener( queues = "myQueue", - containerFactory = "multiConsumerFactory", + containerFactory = "multiConsumerFactory", // 使用上面配置的工厂,保证按顺序消费 errorHandler = "rabbitListenerErrorHandler" ) public void handleMessage(RabbitMqMessage message, @@ -68,27 +65,27 @@ public class RabbitMQConsumerService { } // 处理成功,手动确认消息 - channel.basicAck(deliveryTag, false); + channel.basicAck(deliveryTag, false); // 确认消息已处理 log.info("消息处理完成并确认 - ID: {}, 投递标签: {}", messageId, deliveryTag); } catch (BusinessException e) { // 业务异常,记录日志但不重试,直接拒绝消息并不重新入队 log.error("业务异常,拒绝消息并不重新入队 - ID: {}", messageId, e); try { - channel.basicReject(deliveryTag, false); + channel.basicReject(deliveryTag, false); // 拒绝消息 } catch (IOException ioException) { log.error("拒绝消息失败 - ID: {}", messageId, ioException); } } catch (Exception e) { - // 其他异常:直接抛出异常,让重试拦截器处理 - // 关键修复:不要手动调用 basicReject(deliveryTag, true),否则会绕过重试拦截器的重试次数限制 - // 重试拦截器会在重试次数耗尽后调用 RejectAndDontRequeueRecoverer,它会拒绝消息并不重新入队 + // 其他异常,触发重试 log.error("处理消息异常,将触发重试机制 - ID: {}, 异常: {}", messageId, e.getMessage(), e); - // 直接抛出异常,让重试拦截器处理重试逻辑 + // 抛出异常,重试拦截器会处理重试 throw new RuntimeException("消息处理失败,需要重试: " + e.getMessage(), e); } } + + /** * 处理OCR任务 */ diff --git a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/message/RabbitMqMessageBuilder.java b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/message/RabbitMqMessageBuilder.java similarity index 99% rename from bonus-admin/src/main/java/com/bonus/web/rabbitmq/message/RabbitMqMessageBuilder.java rename to bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/message/RabbitMqMessageBuilder.java index 096c80e..557b4f6 100644 --- a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/message/RabbitMqMessageBuilder.java +++ b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/message/RabbitMqMessageBuilder.java @@ -1,4 +1,4 @@ -package com.bonus.web.rabbitmq.message; +package com.bonus.rabbitmq.message; import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; import lombok.extern.slf4j.Slf4j; diff --git a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducer.java b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/producer/RabbitMQProducer.java similarity index 100% rename from bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducer.java rename to bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/producer/RabbitMQProducer.java diff --git a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/producer/RabbitMQProducerService.java similarity index 99% rename from bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java rename to bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/producer/RabbitMQProducerService.java index 42b70b4..599f358 100644 --- a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java +++ b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/producer/RabbitMQProducerService.java @@ -1,7 +1,7 @@ -package com.bonus.web.rabbitmq.producer; +package com.bonus.rabbitmq.producer; import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; -import com.bonus.web.rabbitmq.message.RabbitMqMessageBuilder; +import com.bonus.rabbitmq.message.RabbitMqMessageBuilder; import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; diff --git a/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/service/SendRabbitMqService.java b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/service/SendRabbitMqService.java new file mode 100644 index 0000000..723ff39 --- /dev/null +++ b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/service/SendRabbitMqService.java @@ -0,0 +1,30 @@ +package com.bonus.rabbitmq.service; + +import com.bonus.rabbitmq.producer.RabbitMQProducerService; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.List; +import java.util.Map; + +/** + * @className:SendRabbitMqService + * @author:cwchen + * @date:2025-11-29-10:48 + * @version:1.0 + * @description:发送rabbitmq消息 + */ +@Service(value = "SendRabbitMqService") +public class SendRabbitMqService { + + @Resource(name = "RabbitMQProducerService") + private RabbitMQProducerService rabbitMQProducerService; + + public void sendRabbitMq(List> list){ + for (Map map : list) { + String taskId = map.get("taskId").toString(); + String uploadPath = map.get("uploadPath").toString(); + rabbitMQProducerService.sendOcrMessageAsync(taskId, uploadPath); + } + } +}