招标解析功能

This commit is contained in:
cwchen 2025-11-29 11:00:51 +08:00
parent 8a19c0779e
commit 148833cc40
12 changed files with 115 additions and 71 deletions

View File

@ -1,5 +1,6 @@
package com.bonus; package com.bonus;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
@ -12,6 +13,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
*/ */
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class }) @SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
@EnableScheduling @EnableScheduling
@EnableRabbit
public class BonusApplication public class BonusApplication
{ {
public static void main(String[] args) public static void main(String[] args)

View File

@ -10,26 +10,19 @@ import com.bonus.common.domain.analysis.dto.AnalysisProDto;
import com.bonus.common.domain.analysis.po.ProComposition; import com.bonus.common.domain.analysis.po.ProComposition;
import com.bonus.common.domain.analysis.vo.AnalysisBidVo; import com.bonus.common.domain.analysis.vo.AnalysisBidVo;
import com.bonus.common.domain.analysis.vo.AnalysisVo; import com.bonus.common.domain.analysis.vo.AnalysisVo;
import com.bonus.common.domain.file.po.ResourceFilePo;
import com.bonus.common.domain.file.vo.ResourceFileVo; import com.bonus.common.domain.file.vo.ResourceFileVo;
import com.bonus.common.domain.file.vo.SysFile; import com.bonus.common.domain.file.vo.SysFile;
import com.bonus.common.domain.mainDatabase.dto.EnterpriseDto;
import com.bonus.common.domain.ocr.dto.OcrRequest; import com.bonus.common.domain.ocr.dto.OcrRequest;
import com.bonus.common.domain.ocr.vo.WordConvertPdfResponse; import com.bonus.common.domain.ocr.vo.WordConvertPdfResponse;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import com.bonus.common.utils.Base64ToPdfConverter; import com.bonus.common.utils.Base64ToPdfConverter;
import com.bonus.common.utils.FileUtil;
import com.bonus.common.utils.ValidatorsUtils; import com.bonus.common.utils.ValidatorsUtils;
import com.bonus.common.utils.uuid.UUID;
import com.bonus.file.service.FileUploadService; import com.bonus.file.service.FileUploadService;
import com.bonus.file.service.SourceFileService; import com.bonus.file.service.SourceFileService;
import com.bonus.ocr.service.OcrService; import com.bonus.framework.manager.AsyncManager;
import com.bonus.ocr.service.WordConvertPdfService; import com.bonus.ocr.service.WordConvertPdfService;
import com.bonus.web.rabbitmq.producer.RabbitMQProducerService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.TransactionAspectSupport; import org.springframework.transaction.interceptor.TransactionAspectSupport;
@ -37,10 +30,7 @@ import org.springframework.transaction.interceptor.TransactionAspectSupport;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.*;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -56,7 +46,7 @@ import java.util.concurrent.Executor;
@Slf4j @Slf4j
public class AnalysisService { public class AnalysisService {
@Resource(name = "taskExecutor") @Resource(name = "rabbitmqTaskExecutor")
private Executor taskExecutor; private Executor taskExecutor;
@Resource(name = "IASAnalysisService") @Resource(name = "IASAnalysisService")
@ -65,8 +55,7 @@ public class AnalysisService {
@Resource(name = "ValidatorsUtils") @Resource(name = "ValidatorsUtils")
private ValidatorsUtils validatorsUtils; private ValidatorsUtils validatorsUtils;
@Autowired
private RabbitMQProducerService rabbitMQProducerService;
@Resource(name = "WordConvertPdfService") @Resource(name = "WordConvertPdfService")
private WordConvertPdfService wordConvertPdfService; private WordConvertPdfService wordConvertPdfService;
@ -103,13 +92,37 @@ public class AnalysisService {
} }
} }
});*/ });*/
CompletableFuture.runAsync(() -> { /*CompletableFuture.runAsync(() -> {
for (int i = 0; i < 1; i++) {
}, taskExecutor);*/
/*for (int i = 0; i < 2; i++) {
String taskId = UUID.randomUUID().toString(); String taskId = UUID.randomUUID().toString();
String uploadPath = "personnelDatabase/2025/10/24/89a266f4-f928-4ee4-95eb-1a73906da0a7.png"; String uploadPath = "personnelDatabase/2025/10/24/89a266f4-f928-4ee4-95eb-1a73906da0a7.png";
rabbitMQProducerService.sendOcrMessageAsync(taskId, uploadPath);
// 确保消息按顺序发送
rabbitMQProducerService.sendMessageAsync(taskId, uploadPath)
.thenAccept(result -> {
// 处理成功结果
log.info("消息发送成功: {}", result);
})
.exceptionally(ex -> {
// 处理异常
log.error("消息发送失败: {}", ex.getMessage());
return null;
})
.join(); // 确保每个消息发送完成后再进行下一个
}*/
List<Map<String,Object>> list = new ArrayList<>();
for (int i = 0; i < 2; i++) {
Map<String, Object> map = new HashMap<>();
String taskId = UUID.randomUUID().toString();
String uploadPath = "personnelDatabase/2025/10/24/89a266f4-f928-4ee4-95eb-1a73906da0a7.png";
map.put("taskId", taskId);
map.put("uploadPath", uploadPath);
list.add(map);
} }
}, taskExecutor); AsyncManager.me().executeSendRabbitMqMessage(list);
return AjaxResult.success(); return AjaxResult.success();
} }

View File

@ -68,6 +68,10 @@
<groupId>com.bonus</groupId> <groupId>com.bonus</groupId>
<artifactId>bonus-file</artifactId> <artifactId>bonus-file</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.bonus</groupId>
<artifactId>bonus-rabbitmq</artifactId>
</dependency>
</dependencies> </dependencies>

View File

@ -10,6 +10,7 @@ import com.bonus.common.domain.file.po.ResourceFileRecordPo;
import com.bonus.common.utils.Threads; import com.bonus.common.utils.Threads;
import com.bonus.common.utils.spring.SpringUtils; import com.bonus.common.utils.spring.SpringUtils;
import com.bonus.file.service.SourceFileService; import com.bonus.file.service.SourceFileService;
import com.bonus.rabbitmq.service.SendRabbitMqService;
import com.bonus.system.domain.SysLogsVo; import com.bonus.system.domain.SysLogsVo;
import com.bonus.system.service.ISysOperLogService; import com.bonus.system.service.ISysOperLogService;
@ -131,6 +132,24 @@ public class AsyncManager
return execute(callable); return execute(callable);
} }
/**
* 发送 mq 消息
* @param list
* @return Future<Void>
* @author cwchen
* @date 2025/11/29 10:51
*/
public Future<Void> executeSendRabbitMqMessage(List<Map<String,Object>> list) {
Callable<Void> callable = new Callable<Void>() {
@Override
public Void call() throws Exception {
SpringUtils.getBean(SendRabbitMqService.class).sendRabbitMq(list);
return null; // 必须返回 null
}
};
return execute(callable);
}
/** /**
* 停止任务线程池 * 停止任务线程池
*/ */

View File

@ -21,6 +21,14 @@
<groupId>com.bonus</groupId> <groupId>com.bonus</groupId>
<artifactId>bonus-common</artifactId> <artifactId>bonus-common</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.bonus</groupId>
<artifactId>bonus-ocr</artifactId>
</dependency>
<dependency>
<groupId>com.bonus</groupId>
<artifactId>bonus-file</artifactId>
</dependency>
</dependencies> </dependencies>

View File

@ -1,4 +1,4 @@
package com.bonus.web.rabbitmq.config; package com.bonus.rabbitmq.config;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -107,48 +107,19 @@ public class RabbitMQConfig {
factory.setConnectionFactory(connectionFactory); factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonMessageConverter); factory.setMessageConverter(jsonMessageConverter);
// 更改为手动确认模式 // 更改为手动确认模式确保按顺序一个一个处理
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setMissingQueuesFatal(false); factory.setMissingQueuesFatal(false);
factory.setAutoStartup(true); factory.setAutoStartup(true);
factory.setConcurrentConsumers(1); factory.setConcurrentConsumers(1); // 设置为1确保只有一个消费者处理消息
factory.setMaxConcurrentConsumers(1); factory.setMaxConcurrentConsumers(1); // 最大消费者数为1
factory.setPrefetchCount(1); // 个消费者预取的消息数量 factory.setPrefetchCount(1); // 次只取一条消息
// 配置错误处理器
factory.setErrorHandler(new ConditionalRejectingErrorHandler(new FatalExceptionStrategy() {
@Override
public boolean isFatal(Throwable t) {
// 对于业务异常不认为是致命错误
if (t instanceof RuntimeException) {
String message = t.getMessage();
if (message != null && (message.contains("消息处理失败") ||
message.contains("OCR") ||
message.contains("Minio"))) {
return false;
}
}
// 其他异常直接拒绝
return false;
}
}));
// 使用简单的拒绝重试策略 - 重试后直接拒绝消息
MessageRecoverer messageRecoverer = new RejectAndDontRequeueRecoverer();
// 创建重试拦截器
RetryOperationsInterceptor interceptor = RetryInterceptorBuilder.stateless()
.maxAttempts(3) // 总共执行3次1次原始 + 2次重试
.backOffOptions(2000L, 2.0, 10000L) // 初始2秒倍数2最大10秒
.recoverer(messageRecoverer)
.build();
factory.setAdviceChain(interceptor);
log.info("多消费者容器工厂配置完成 - 使用手动确认模式"); log.info("多消费者容器工厂配置完成 - 使用手动确认模式");
log.info("重试配置最多重试2次初始延迟2秒最大延迟10秒");
return factory; return factory;
} }
/** /**
* RabbitTemplate主配置 * RabbitTemplate主配置
*/ */

View File

@ -1,4 +1,4 @@
package com.bonus.web.rabbitmq.consumer; package com.bonus.rabbitmq.consumer;
import com.bonus.common.domain.ocr.dto.OcrRequest; import com.bonus.common.domain.ocr.dto.OcrRequest;
import com.bonus.common.domain.ocr.vo.OcrResponse; import com.bonus.common.domain.ocr.vo.OcrResponse;
@ -38,12 +38,9 @@ public class RabbitMQConsumerService {
@Resource @Resource
private MinioUtil minioUtil; private MinioUtil minioUtil;
/**
* 主消息消费者 - 使用手动确认模式
*/
@RabbitListener( @RabbitListener(
queues = "myQueue", queues = "myQueue",
containerFactory = "multiConsumerFactory", containerFactory = "multiConsumerFactory", // 使用上面配置的工厂保证按顺序消费
errorHandler = "rabbitListenerErrorHandler" errorHandler = "rabbitListenerErrorHandler"
) )
public void handleMessage(RabbitMqMessage message, public void handleMessage(RabbitMqMessage message,
@ -68,27 +65,27 @@ public class RabbitMQConsumerService {
} }
// 处理成功手动确认消息 // 处理成功手动确认消息
channel.basicAck(deliveryTag, false); channel.basicAck(deliveryTag, false); // 确认消息已处理
log.info("消息处理完成并确认 - ID: {}, 投递标签: {}", messageId, deliveryTag); log.info("消息处理完成并确认 - ID: {}, 投递标签: {}", messageId, deliveryTag);
} catch (BusinessException e) { } catch (BusinessException e) {
// 业务异常记录日志但不重试直接拒绝消息并不重新入队 // 业务异常记录日志但不重试直接拒绝消息并不重新入队
log.error("业务异常,拒绝消息并不重新入队 - ID: {}", messageId, e); log.error("业务异常,拒绝消息并不重新入队 - ID: {}", messageId, e);
try { try {
channel.basicReject(deliveryTag, false); channel.basicReject(deliveryTag, false); // 拒绝消息
} catch (IOException ioException) { } catch (IOException ioException) {
log.error("拒绝消息失败 - ID: {}", messageId, ioException); log.error("拒绝消息失败 - ID: {}", messageId, ioException);
} }
} catch (Exception e) { } catch (Exception e) {
// 其他异常直接抛出异常让重试拦截器处理 // 其他异常触发重试
// 关键修复不要手动调用 basicReject(deliveryTag, true)否则会绕过重试拦截器的重试次数限制
// 重试拦截器会在重试次数耗尽后调用 RejectAndDontRequeueRecoverer它会拒绝消息并不重新入队
log.error("处理消息异常,将触发重试机制 - ID: {}, 异常: {}", messageId, e.getMessage(), e); log.error("处理消息异常,将触发重试机制 - ID: {}, 异常: {}", messageId, e.getMessage(), e);
// 直接抛出异常重试拦截器处理重试逻辑 // 抛出异常重试拦截器会处理重试
throw new RuntimeException("消息处理失败,需要重试: " + e.getMessage(), e); throw new RuntimeException("消息处理失败,需要重试: " + e.getMessage(), e);
} }
} }
/** /**
* 处理OCR任务 * 处理OCR任务
*/ */

View File

@ -1,4 +1,4 @@
package com.bonus.web.rabbitmq.message; package com.bonus.rabbitmq.message;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;

View File

@ -1,7 +1,7 @@
package com.bonus.web.rabbitmq.producer; package com.bonus.rabbitmq.producer;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import com.bonus.web.rabbitmq.message.RabbitMqMessageBuilder; import com.bonus.rabbitmq.message.RabbitMqMessageBuilder;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;

View File

@ -0,0 +1,30 @@
package com.bonus.rabbitmq.service;
import com.bonus.rabbitmq.producer.RabbitMQProducerService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
/**
* @className:SendRabbitMqService
* @author:cwchen
* @date:2025-11-29-10:48
* @version:1.0
* @description:发送rabbitmq消息
*/
@Service(value = "SendRabbitMqService")
public class SendRabbitMqService {
@Resource(name = "RabbitMQProducerService")
private RabbitMQProducerService rabbitMQProducerService;
public void sendRabbitMq(List<Map<String,Object>> list){
for (Map<String,Object> map : list) {
String taskId = map.get("taskId").toString();
String uploadPath = map.get("uploadPath").toString();
rabbitMQProducerService.sendOcrMessageAsync(taskId, uploadPath);
}
}
}