招标解析异步任务

This commit is contained in:
cwchen 2025-11-25 10:10:44 +08:00
parent 66226c8168
commit 6443da08ea
18 changed files with 2051 additions and 64 deletions

View File

@ -4,15 +4,14 @@ import com.bonus.common.annotation.RequiresPermissions;
import com.bonus.common.annotation.SysLog;
import com.bonus.common.core.controller.BaseController;
//import com.bonus.web.service.analysis.AnalysisService;
import com.bonus.common.core.domain.AjaxResult;
import com.bonus.common.core.page.TableDataInfo;
import com.bonus.common.domain.analysis.dto.AnalysisDto;
import com.bonus.common.domain.analysis.vo.AnalysisVo;
import com.bonus.common.enums.OperaType;
import com.bonus.web.service.analysis.AnalysisService;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.List;
@ -31,11 +30,11 @@ public class AnalysisController extends BaseController {
@Resource(name = "AnalysisService")
private AnalysisService analysisService;
/*@ApiOperation(value = "测试mq异步消息", notes = "测试mq异步消息")
@ApiOperation(value = "测试mq异步消息", notes = "测试mq异步消息")
@GetMapping("/testAsyncMq")
public AjaxResult testAsyncMq() {
return analysisService.testAsyncMq();
}*/
}
@ApiOperation(value = "招标解析", notes = "查询列表")
@GetMapping("getList")
@ -46,4 +45,12 @@ public class AnalysisController extends BaseController {
List<AnalysisVo> list = analysisService.getList(dto);
return getDataTable(list);
}
@ApiOperation(value = "招标解析", notes = "新建项目")
@PostMapping("saveData")
@SysLog(title = "招标解析", module = "招标解析->新建项目", businessType = OperaType.INSERT, details = "新建项目", logType = 1)
@RequiresPermissions("analysis:analysis:add")
public AjaxResult saveData(@RequestBody AnalysisDto.TemplateDto dto) {
return analysisService.saveData(dto);
}
}

View File

@ -0,0 +1,69 @@
/*
package com.bonus.web.controller.rabbitmq;
import com.bonus.common.domain.rabbitmq.vo.OrderMessage;
import com.bonus.rabbitmq.service.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.Map;
*/
/**
* @className:RabbitMQController
* @author:cwchen
* @date:2025-11-04-18:24
* @version:1.0
* @description:
*//*
@RestController
@RequestMapping("/api/rabbitmq")
public class RabbitMQController {
@Resource(name = "MessageProducer")
private MessageProducer messageProducer;
*/
/**
* 发送测试消息
*//*
@PostMapping("/send")
public ResponseEntity<String> sendMessage(@RequestBody Map<String, String> request) {
String productName = request.get("productName");
BigDecimal amount = new BigDecimal(request.get("amount"));
String orderId = "ORD" + System.currentTimeMillis();
OrderMessage orderMessage = new OrderMessage(orderId, productName, amount);
boolean success = messageProducer.sendOrderMessage(orderMessage);
if (success) {
return ResponseEntity.ok("消息发送成功: " + orderId);
} else {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("消息发送失败");
}
}
*/
/**
* 批量发送消息
*//*
@PostMapping("/send-batch")
public ResponseEntity<String> sendBatchMessages(@RequestParam(defaultValue = "5") int count) {
for (int i = 1; i <= count; i++) {
String orderId = "ORD" + System.currentTimeMillis() + "_" + i;
OrderMessage orderMessage = new OrderMessage(orderId, "产品" + i,
new BigDecimal(100 + i * 10));
messageProducer.sendOrderMessage(orderMessage);
}
return ResponseEntity.ok("批量发送 " + count + " 条消息完成");
}
}*/

View File

@ -0,0 +1,226 @@
package com.bonus.web.rabbitmq.config;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy;
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import java.util.HashMap;
import java.util.Map;
/**
* @className:RabbitMQConfig
* @author:cwchen
* @date:2025-11-13-13:30
* @version:1.0
* @description:rabbitmq配置类 - 手动确认模式
*/
@Configuration
@EnableRetry
@Slf4j
public class RabbitMQConfig {
/**
* JSON消息转换器
*/
@Bean
public MessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setClassMapper(classMapper());
return converter;
}
/**
* 类型映射器
*/
@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("rabbitMqMessage", RabbitMqMessage.class);
classMapper.setIdClassMapping(idClassMapping);
classMapper.setDefaultType(Map.class);
return classMapper;
}
/**
* 声明主队列交换机绑定
*/
@Bean
public Declarables declarables() {
// 主队列
Queue mainQueue = new Queue("myQueue", true, false, false);
DirectExchange mainExchange = new DirectExchange("myExchange", true, false);
Binding mainBinding = BindingBuilder.bind(mainQueue)
.to(mainExchange)
.with("myRoutingKey");
log.info("声明RabbitMQ组件 - 主队列: {}, 主交换机: {}, 路由键: {}",
"myQueue", "myExchange", "myRoutingKey");
return new Declarables(mainQueue, mainExchange, mainBinding);
}
/**
* 错误处理器 Bean - 用于手动确认模式下的异常处理
*/
@Bean
public RabbitListenerErrorHandler rabbitListenerErrorHandler() {
return (amqpMessage, message, exception) -> {
log.error("消息处理失败,将拒绝消息并重新入队: {}", exception.getMessage(), exception);
// 返回原始消息让重试机制处理
throw exception;
};
}
/**
* 多消费者容器工厂 - 手动确认模式
*/
@Bean
public SimpleRabbitListenerContainerFactory multiConsumerFactory(
ConnectionFactory connectionFactory,
MessageConverter jsonMessageConverter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonMessageConverter);
// 更改为手动确认模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setMissingQueuesFatal(false);
factory.setAutoStartup(true);
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(5);
factory.setPrefetchCount(10); // 每个消费者预取的消息数量
// 配置错误处理器
factory.setErrorHandler(new ConditionalRejectingErrorHandler(new FatalExceptionStrategy() {
@Override
public boolean isFatal(Throwable t) {
// 对于业务异常不认为是致命错误
if (t instanceof RuntimeException) {
String message = t.getMessage();
if (message != null && (message.contains("消息处理失败") ||
message.contains("OCR") ||
message.contains("Minio"))) {
return false;
}
}
// 其他异常直接拒绝
return false;
}
}));
// 使用简单的拒绝重试策略 - 重试后直接拒绝消息
MessageRecoverer messageRecoverer = new RejectAndDontRequeueRecoverer();
// 创建重试拦截器
RetryOperationsInterceptor interceptor = RetryInterceptorBuilder.stateless()
.maxAttempts(3) // 总共执行3次1次原始 + 2次重试
.backOffOptions(2000L, 2.0, 10000L) // 初始2秒倍数2最大10秒
.recoverer(messageRecoverer)
.build();
factory.setAdviceChain(interceptor);
log.info("多消费者容器工厂配置完成 - 使用手动确认模式");
log.info("重试配置最多重试2次初始延迟2秒最大延迟10秒");
return factory;
}
/**
* RabbitTemplate主配置
*/
@Bean
@Primary
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
MessageConverter jsonMessageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 基础配置
rabbitTemplate.setMessageConverter(jsonMessageConverter);
rabbitTemplate.setMandatory(true);
// 设置重试模板用于发送消息的重试
rabbitTemplate.setRetryTemplate(createRabbitTemplateRetryTemplate());
// 返回回调
/*rabbitTemplate.setReturnsCallback(returned -> {
log.error("消息路由失败 - 交换机: {}, 路由键: {}, 回应码: {}, 回应信息: {}",
returned.getExchange(),
returned.getRoutingKey(),
returned.getReplyCode(),
returned.getReplyText());
});*/
log.info("RabbitTemplate配置完成");
return rabbitTemplate;
}
/**
* 创建 RabbitTemplate 专用的重试模板
*/
private RetryTemplate createRabbitTemplateRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
// 发送消息的重试策略最多重试2次
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(2);
// 退避策略
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000L);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setMaxInterval(5000L);
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setBackOffPolicy(backOffPolicy);
return retryTemplate;
}
@Bean
@Primary
public CachingConnectionFactory connectionFactory(
@Value("${spring.rabbitmq.host}") String host,
@Value("${spring.rabbitmq.port}") int port,
@Value("${spring.rabbitmq.username}") String username,
@Value("${spring.rabbitmq.password}") String password,
@Value("${spring.rabbitmq.virtual-host}") String virtualHost) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
connectionFactory.setChannelCacheSize(25);
connectionFactory.setChannelCheckoutTimeout(2000);
log.info("RabbitMQ连接工厂配置完成 - 主机: {}, 端口: {}", host, port);
return connectionFactory;
}
}

View File

@ -0,0 +1,367 @@
/*
package com.bonus.web.rabbitmq.consumer;
import com.bonus.common.domain.ocr.dto.OcrRequest;
import com.bonus.common.domain.ocr.vo.OcrResponse;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import com.bonus.file.config.MinioConfig;
import com.bonus.file.util.MinioUtil;
import com.bonus.ocr.service.OcrService;
import com.bonus.web.rabbitmq.config.RabbitMQConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
*/
/**
* @className:RabbitMQConsumer
* @author:cwchen
* @date:2025-11-13-13:32
* @version:1.0
* @description:mq消费者
*//*
@Component(value = "RabbitMQConsumer")
@Slf4j
public class RabbitMQConsumer {
@Resource
private OcrService ocrService;
@Resource
private MinioConfig minioConfig;
@Resource
private MinioUtil minioUtil;
private final ObjectMapper objectMapper = new ObjectMapper();
*/
/**
* 主队列消息处理
* 优化确保异常处理逻辑正确消息确认机制清晰
* 关键在手动确认模式下成功时手动ACK失败时抛出异常让重试机制处理
*//*
@RabbitListener(queues = "myQueue", containerFactory = "multiConsumerFactory")
public void handleMessage(org.springframework.amqp.core.Message amqpMessage,
Channel channel) throws IOException {
RabbitMqMessage rabbitMq = null;
long deliveryTag = 0;
String messageId = "unknown";
try {
// 解析消息
rabbitMq = parseMessage(amqpMessage);
deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
messageId = rabbitMq.getMessageId();
log.info("开始处理消息 - ID: {}, 任务: {}", messageId, rabbitMq.getTaskName());
// 业务处理
processBusiness(rabbitMq);
// 处理成功 - 确认消息
channel.basicAck(deliveryTag, false);
log.info("✅ 消息处理完成并确认 - ID: {}", messageId);
} catch (Exception e) {
log.error("❌ 消息处理异常 - ID: {}, deliveryTag: {}, 异常信息: {}",
messageId, deliveryTag, e.getMessage(), e);
// 检查是否是 AmqpRejectAndDontRequeueException重试次数耗尽消息已发送到死信队列
AmqpRejectAndDontRequeueException rejectException = findRejectException(e);
if (rejectException != null) {
// 关键修复消息已发送到死信队列需要手动拒绝原消息
log.error("🚫 重试次数耗尽,消息已发送到死信队列,准备拒绝原消息 - ID: {}, deliveryTag: {}",
messageId, deliveryTag);
try {
if (deliveryTag > 0) {
// 手动拒绝消息NACK不重新入队
// requeue=false 确保消息不会重新入队服务重启后不会重新处理
channel.basicNack(deliveryTag, false, false);
log.error("✅ 原消息已永久拒绝并从主队列中删除 - ID: {}, deliveryTag: {}。服务重启后不会重新处理此消息。",
messageId, deliveryTag);
// 消息已经被处理发送到死信队列并永久拒绝不重新抛出异常
return;
} else {
log.error("❌ deliveryTag 无效,无法拒绝消息 - ID: {}, deliveryTag: {}",
messageId, deliveryTag);
}
} catch (IOException ioException) {
log.error("❌ 拒绝消息失败 - ID: {}, deliveryTag: {}, 异常: {}",
messageId, deliveryTag, ioException.getMessage(), ioException);
}
// 即使拒绝失败也不重新抛出异常避免重复处理
return;
}
// 其他异常抛出异常触发重试机制
throw new RuntimeException("消息处理失败: " + e.getMessage(), e);
}
}
*/
/**
* 递归查找 AmqpRejectAndDontRequeueException
*//*
private AmqpRejectAndDontRequeueException findRejectException(Throwable e) {
if (e == null) {
return null;
}
if (e instanceof AmqpRejectAndDontRequeueException) {
return (AmqpRejectAndDontRequeueException) e;
}
// 检查异常消息中是否包含死信队列相关的关键字
String errorMessage = e.getMessage();
if (errorMessage != null &&
(errorMessage.contains("死信队列") ||
errorMessage.contains("重试次数耗尽") ||
errorMessage.contains("消息已发送到死信队列"))) {
// 递归查找异常链
Throwable cause = e.getCause();
int depth = 0;
while (cause != null && depth < 10) {
if (cause instanceof AmqpRejectAndDontRequeueException) {
return (AmqpRejectAndDontRequeueException) cause;
}
cause = cause.getCause();
depth++;
}
}
// 递归查找 cause
return findRejectException(e.getCause());
}
*/
/**
* 死信队列消息处理 - 监听重试次数耗尽的消息
*//*
@RabbitListener(queues = RabbitMQConfig.DLX_QUEUE, containerFactory = "deadLetterContainerFactory")
public void handleDeadLetterMessage(org.springframework.amqp.core.Message amqpMessage,
Channel channel) throws IOException {
RabbitMqMessage rabbitMq = parseMessage(amqpMessage);
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
String messageId = rabbitMq.getMessageId();
try {
log.error("🚨 收到死信消息 - ID: {}, 任务: {}", messageId, rabbitMq.getTaskName());
// 处理死信消息记录日志发送告警人工干预等
processDeadLetterMessage(rabbitMq, amqpMessage);
// 确认死信消息
channel.basicAck(deliveryTag, false);
log.info("死信消息处理完成 - ID: {}", messageId);
} catch (Exception e) {
log.error("死信消息处理异常 - ID: {}", messageId, e);
// 死信消息处理失败可以选择重新入队或丢弃
channel.basicNack(deliveryTag, false, false);
}
}
*/
/**
* 处理死信消息
*//*
private void processDeadLetterMessage(RabbitMqMessage message, org.springframework.amqp.core.Message amqpMessage) {
String messageId = message.getMessageId();
try {
// 1. 记录死信消息到数据库或日志文件
logDeadLetterToDatabase(message);
// 2. 发送告警通知
sendDeadLetterAlert(message);
// 3. 记录详细错误信息
Map<String, Object> deadLetterInfo = collectDeadLetterInfo(message, amqpMessage);
log.error("死信消息详情 - ID: {}, 信息: {}", messageId, deadLetterInfo);
} catch (Exception e) {
log.error("处理死信消息时发生异常 - ID: {}", messageId, e);
}
}
*/
/**
* 记录死信消息到数据库示例
*//*
private void logDeadLetterToDatabase(RabbitMqMessage message) {
// 这里可以集成数据库操作记录死信消息
// 例如messageId, taskName, uploadPath, 失败时间, 失败原因等
log.warn("记录死信消息到数据库 - ID: {}, 任务: {}",
message.getMessageId(), message.getTaskName());
}
*/
/**
* 发送死信告警
*//*
private void sendDeadLetterAlert(RabbitMqMessage message) {
try {
String alertMessage = String.format(
"🚨 OCR处理死信告警\n" +
"消息ID: %s\n" +
"任务名称: %s\n" +
"文件路径: %s\n" +
"时间: %s\n" +
"请及时处理!",
message.getMessageId(),
message.getTaskName(),
message.getUploadPath(),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
);
log.error("死信告警: {}", alertMessage);
// 这里可以集成实际的告警系统
// 例如发送邮件短信钉钉企业微信等
// alertService.send(alertMessage);
} catch (Exception e) {
log.error("发送死信告警失败", e);
}
}
*/
/**
* 收集死信消息信息
*//*
private Map<String, Object> collectDeadLetterInfo(RabbitMqMessage message,
org.springframework.amqp.core.Message amqpMessage) {
Map<String, Object> info = new HashMap<>();
info.put("messageId", message.getMessageId());
info.put("taskName", message.getTaskName());
info.put("uploadPath", message.getUploadPath());
info.put("deadLetterTime", new Date());
info.put("originalHeaders", amqpMessage.getMessageProperties().getHeaders());
return info;
}
private RabbitMqMessage parseMessage(org.springframework.amqp.core.Message message) {
try {
String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
RabbitMqMessage rabbitMq = objectMapper.readValue(messageBody, RabbitMqMessage.class);
if (rabbitMq.getMessageId() == null) {
String messageId = message.getMessageProperties().getMessageId();
rabbitMq.setMessageId(messageId != null ? messageId : generateMessageId());
}
return rabbitMq;
} catch (Exception e) {
log.error("消息解析失败", e);
throw new RuntimeException("消息解析失败", e);
}
}
private void processBusiness(RabbitMqMessage message) {
String uploadPath = message.getUploadPath();
validateUploadPath(uploadPath);
File fileFromMinio = getFileFromMinio(uploadPath);
OcrResponse ocrResponse = performOcrRecognition(fileFromMinio);
processOcrResult(message, ocrResponse);
}
private void validateUploadPath(String uploadPath) {
if (StringUtils.isBlank(uploadPath)) {
throw new RuntimeException("文件路径不能为空");
}
}
private File getFileFromMinio(String uploadPath) {
try {
File file = minioUtil.getFileFromMinio(minioConfig.getBucketName(), uploadPath);
if (file == null || !file.exists()) {
throw new RuntimeException("Minio文件不存在: " + uploadPath);
}
return file;
} catch (Exception e) {
throw new RuntimeException("获取Minio文件失败: " + uploadPath, e);
}
}
private OcrResponse performOcrRecognition(File file) {
try {
// OcrRequest ocrRequest = buildOcrRequest(file);
// OcrResponse ocrResponse = ocrService.callOcrService(ocrRequest);
OcrResponse ocrResponse = null;
// 修复检查 OCR 响应是否为 null
if (Objects.isNull(ocrResponse)) {
throw new RuntimeException("OCR服务返回结果为空");
}
if (!isOcrResponseValid(ocrResponse)) {
throw new RuntimeException("OCR识别结果无效");
}
log.info("OCR识别成功 - 数据: {}", ocrResponse.getData());
return ocrResponse;
} catch (Exception e) {
log.error("OCR识别失败", e);
throw new RuntimeException("OCR识别失败: " + e.getMessage(), e);
}
}
private OcrRequest buildOcrRequest(File file) {
OcrRequest ocrRequest = new OcrRequest();
ocrRequest.setFile(file);
ocrRequest.setType("image/png");
ocrRequest.setFields_json("姓名,公民身份号码");
return ocrRequest;
}
private boolean isOcrResponseValid(OcrResponse ocrResponse) {
return ocrResponse.getData() != null && !ocrResponse.getData().toString().isEmpty();
}
public void processOcrResult(RabbitMqMessage originalMessage, OcrResponse ocrResponse) {
try {
log.info("处理OCR结果 - 消息ID: {}, 任务: {}",
originalMessage.getMessageId(), originalMessage.getTaskName());
// 业务逻辑...
log.info("OCR结果处理完成 - 消息ID: {}", originalMessage.getMessageId());
} catch (Exception e) {
log.error("OCR结果处理失败 - 消息ID: {}", originalMessage.getMessageId(), e);
throw new RuntimeException("OCR结果处理失败", e);
}
}
private String generateMessageId() {
return "gen_" + System.currentTimeMillis() + "_" + ThreadLocalRandom.current().nextInt(1000);
}
}
*/

View File

@ -0,0 +1,304 @@
package com.bonus.web.rabbitmq.consumer;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @className:RabbitMQConsumerService
* @author:cwchen
* @date:2025-11-24-16:19
* @version:1.0
* @description:消费者服务
*/
@Component(value = "")
@Slf4j
public class RabbitMQConsumerService {
/**
* 主消息消费者 - 使用手动确认模式
*/
@RabbitListener(
queues = "myQueue",
containerFactory = "multiConsumerFactory",
errorHandler = "rabbitListenerErrorHandler"
)
public void handleMessage(RabbitMqMessage message,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
String messageId = message.getMessageId();
String taskName = message.getTaskName();
log.info("开始处理消息 - ID: {}, 任务: {}, 投递标签: {}",
messageId, taskName, deliveryTag);
try {
// 根据任务类型进行不同的处理
switch (taskName) {
case "FILE_UPLOAD":
processFileUpload(message);
break;
case "OCR_PROCESS":
processOcrTask(message);
break;
case "IMAGE_PROCESS":
processImage(message);
break;
case "DOCUMENT_EXTRACT":
processDocumentExtract(message);
break;
default:
processDefaultTask(message);
break;
}
// 处理成功手动确认消息
channel.basicAck(deliveryTag, false);
log.info("消息处理完成并确认 - ID: {}, 投递标签: {}", messageId, deliveryTag);
} catch (BusinessException e) {
// 业务异常记录日志但不重试
log.error("业务异常,拒绝消息并不重新入队 - ID: {}", messageId, e);
try {
channel.basicReject(deliveryTag, false);
} catch (IOException ioException) {
log.error("拒绝消息失败 - ID: {}", messageId, ioException);
}
} catch (Exception e) {
// 其他异常记录日志并重新入队进行重试
log.error("处理消息异常,将重新入队 - ID: {}", messageId, e);
try {
channel.basicReject(deliveryTag, true);
} catch (IOException ioException) {
log.error("拒绝消息失败 - ID: {}", messageId, ioException);
}
// 重新抛出异常让重试拦截器处理
throw new RuntimeException("消息处理失败,需要重试: " + e.getMessage(), e);
}
}
/**
* 处理文件上传任务
*/
private void processFileUpload(RabbitMqMessage message) {
String messageId = message.getMessageId();
String uploadPath = message.getUploadPath();
log.info("处理文件上传任务 - ID: {}, 路径: {}", messageId, uploadPath);
// 模拟业务处理
try {
// 1. 验证文件
validateFile(uploadPath);
// 2. 处理文件
processUploadedFile(uploadPath);
// 3. 更新处理状态
updateProcessStatus(messageId, "COMPLETED");
log.info("文件上传任务处理完成 - ID: {}", messageId);
} catch (Exception e) {
log.error("文件上传任务处理失败 - ID: {}", messageId, e);
throw new BusinessException("文件处理失败", e);
}
}
/**
* 处理OCR任务
*/
private void processOcrTask(RabbitMqMessage message) {
String messageId = message.getMessageId();
String filePath = message.getUploadPath();
String taskId = (String) message.getBusinessData().get("taskId");
log.info("处理OCR任务 - ID: {}, 任务ID: {}, 文件: {}", messageId, taskId, filePath);
try {
// 1. 检查OCR服务可用性
checkOcrServiceAvailability();
// 2. 执行OCR处理
String ocrResult = performOcrProcessing(filePath);
// 3. 保存OCR结果
saveOcrResult(taskId, ocrResult);
// 4. 更新任务状态
updateOcrTaskStatus(taskId, "SUCCESS");
log.info("OCR任务处理完成 - ID: {}, 任务ID: {}", messageId, taskId);
} catch (OcrServiceException e) {
log.error("OCR服务异常 - ID: {}", messageId, e);
throw new BusinessException("OCR处理失败", e);
} catch (Exception e) {
log.error("OCR任务处理异常 - ID: {}", messageId, e);
throw new RuntimeException("OCR处理异常需要重试", e);
}
}
/**
* 处理图片处理任务
*/
private void processImage(RabbitMqMessage message) {
String messageId = message.getMessageId();
String imagePath = message.getUploadPath();
String processType = (String) message.getBusinessData().get("processType");
log.info("处理图片任务 - ID: {}, 类型: {}, 路径: {}", messageId, processType, imagePath);
try {
// 模拟图片处理
Thread.sleep(1000);
// 这里添加实际的图片处理逻辑
processImageFile(imagePath, processType);
log.info("图片处理完成 - ID: {}", messageId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("图片处理被中断", e);
} catch (Exception e) {
log.error("图片处理失败 - ID: {}", messageId, e);
throw new RuntimeException("图片处理失败", e);
}
}
/**
* 处理文档提取任务
*/
private void processDocumentExtract(RabbitMqMessage message) {
String messageId = message.getMessageId();
String documentPath = message.getUploadPath();
String extractType = (String) message.getBusinessData().get("extractType");
log.info("处理文档提取任务 - ID: {}, 类型: {}, 路径: {}", messageId, extractType, documentPath);
try {
// 模拟文档提取处理
extractDocumentContent(documentPath, extractType);
log.info("文档提取完成 - ID: {}", messageId);
} catch (Exception e) {
log.error("文档提取失败 - ID: {}", messageId, e);
throw new RuntimeException("文档提取失败", e);
}
}
/**
* 处理默认任务
*/
private void processDefaultTask(RabbitMqMessage message) {
String messageId = message.getMessageId();
String taskName = message.getTaskName();
log.info("处理默认任务 - ID: {}, 任务名: {}", messageId, taskName);
// 模拟业务处理
try {
// 这里添加实际的任务处理逻辑
performBusinessLogic(message);
log.info("默认任务处理完成 - ID: {}", messageId);
} catch (Exception e) {
log.error("默认任务处理失败 - ID: {}", messageId, e);
throw new RuntimeException("任务处理失败", e);
}
}
// 以下为模拟的业务方法实现
private void validateFile(String filePath) {
// 模拟文件验证
if (filePath == null || filePath.isEmpty()) {
throw new BusinessException("文件路径不能为空");
}
log.debug("文件验证通过: {}", filePath);
}
private void processUploadedFile(String filePath) {
// 模拟文件处理
log.debug("处理上传文件: {}", filePath);
// 实际处理逻辑...
}
private void updateProcessStatus(String messageId, String status) {
// 模拟更新处理状态
log.debug("更新处理状态 - ID: {}, 状态: {}", messageId, status);
}
private void checkOcrServiceAvailability() {
// 模拟OCR服务检查
log.debug("检查OCR服务可用性");
// 实际检查逻辑...
}
private String performOcrProcessing(String filePath) {
// 模拟OCR处理
log.debug("执行OCR处理: {}", filePath);
return "OCR处理结果";
}
private void saveOcrResult(String taskId, String result) {
// 模拟保存OCR结果
log.debug("保存OCR结果 - 任务ID: {}", taskId);
}
private void updateOcrTaskStatus(String taskId, String status) {
// 模拟更新OCR任务状态
log.debug("更新OCR任务状态 - 任务ID: {}, 状态: {}", taskId, status);
}
private void processImageFile(String imagePath, String processType) {
// 模拟图片处理
log.debug("处理图片文件 - 路径: {}, 类型: {}", imagePath, processType);
}
private void extractDocumentContent(String documentPath, String extractType) {
// 模拟文档提取
log.debug("提取文档内容 - 路径: {}, 类型: {}", documentPath, extractType);
}
private void performBusinessLogic(RabbitMqMessage message) {
// 模拟业务逻辑执行
log.debug("执行业务逻辑 - 消息ID: {}", message.getMessageId());
}
/**
* 自定义业务异常
*/
public static class BusinessException extends RuntimeException {
public BusinessException(String message) {
super(message);
}
public BusinessException(String message, Throwable cause) {
super(message, cause);
}
}
/**
* OCR服务异常
*/
public static class OcrServiceException extends RuntimeException {
public OcrServiceException(String message) {
super(message);
}
public OcrServiceException(String message, Throwable cause) {
super(message, cause);
}
}
}

View File

@ -0,0 +1,141 @@
package com.bonus.web.rabbitmq.message;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
/**
* @className: RabbitMqMessageBuilder
* @description: RabbitMQ 消息构建器
*/
@Component
@Slf4j
public class RabbitMqMessageBuilder {
/**
* 构建基础消息
*/
public RabbitMqMessage buildBaseMessage(String taskName, String uploadPath) {
return RabbitMqMessage.createSimple(taskName, uploadPath);
}
/**
* 构建带自定义消息ID的消息
*/
public RabbitMqMessage buildMessageWithCustomId(String taskName, String uploadPath, String customMessageId) {
return RabbitMqMessage.builder()
.messageId(customMessageId)
.taskName(taskName)
.uploadPath(uploadPath)
.timestamp(System.currentTimeMillis())
.status(RabbitMqMessage.Status.PENDING)
.build();
}
/**
* 构建文件处理消息
*/
public RabbitMqMessage buildFileProcessMessage(String filePath) {
return RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.FILE_UPLOAD, filePath);
}
/**
* 构建OCR处理消息
*/
public RabbitMqMessage buildOcrProcessMessage(String filePath, String taskId) {
RabbitMqMessage message = RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.OCR_PROCESS, filePath);
message.addBusinessData("taskId", taskId);
message.addBusinessData("processType", "TEXT_EXTRACTION");
return message;
}
/**
* 构建带重试配置的消息
*/
public RabbitMqMessage buildMessageWithRetryConfig(String taskName, String uploadPath, int maxRetryCount) {
return RabbitMqMessage.builder()
.taskName(taskName)
.messageId(UUID.randomUUID().toString())
.uploadPath(uploadPath)
.timestamp(System.currentTimeMillis())
.maxRetryCount(maxRetryCount)
.status(RabbitMqMessage.Status.PENDING)
.build();
}
/**
* 构建高优先级消息
*/
public RabbitMqMessage buildHighPriorityMessage(String taskName, String uploadPath) {
return RabbitMqMessage.builder()
.taskName(taskName)
.messageId(UUID.randomUUID().toString())
.uploadPath(uploadPath)
.timestamp(System.currentTimeMillis())
.priority(9) // 最高优先级
.status(RabbitMqMessage.Status.PENDING)
.build();
}
/**
* 构建带过期时间的消息
*/
public RabbitMqMessage buildMessageWithTtl(String taskName, String uploadPath, long ttlMillis) {
RabbitMqMessage message = RabbitMqMessage.createSimple(taskName, uploadPath);
message.setExpirationRelative(ttlMillis);
return message;
}
/**
* 构建完整业务消息
*/
public RabbitMqMessage buildBusinessMessage(String taskName, String uploadPath,
String sourceSystem, String targetSystem,
Map<String, Object> businessData) {
RabbitMqMessage message = RabbitMqMessage.createWithBusinessData(taskName, uploadPath, businessData);
message.setSourceSystem(sourceSystem);
message.setTargetSystem(targetSystem);
return message;
}
/**
* 构建图片处理消息
*/
public RabbitMqMessage buildImageProcessMessage(String imagePath, String processType) {
RabbitMqMessage message = RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.IMAGE_PROCESS, imagePath);
message.addBusinessData("processType", processType);
message.addBusinessData("imageFormat", getFileExtension(imagePath));
return message;
}
/**
* 构建文档提取消息
*/
public RabbitMqMessage buildDocumentExtractMessage(String documentPath, String extractType) {
RabbitMqMessage message = RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.DOCUMENT_EXTRACT, documentPath);
message.addBusinessData("extractType", extractType);
message.addBusinessData("documentType", getFileExtension(documentPath));
return message;
}
/**
* 获取文件扩展名
*/
private String getFileExtension(String filePath) {
if (filePath == null || filePath.lastIndexOf(".") == -1) {
return "unknown";
}
return filePath.substring(filePath.lastIndexOf(".") + 1).toLowerCase();
}
/**
* 生成文件相关的消息ID
*/
private String generateFileMessageId(String taskName, String filePath) {
String fileHash = Integer.toHexString(filePath.hashCode());
return taskName + "_" + fileHash + "_" + System.currentTimeMillis();
}
}

View File

@ -0,0 +1,212 @@
/*
package com.bonus.web.rabbitmq.producer;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.*;
*/
/**
* @className:RabbitMQProducer
* @author:cwchen
* @date:2025-11-13-13:32
* @version:1.0
* @description:mq生产者
*//*
@Component(value = "RabbitMQProducer")
@Slf4j
public class RabbitMQProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
private final ObjectMapper objectMapper = new ObjectMapper();
private final Map<String, CompletableFuture<Boolean>> pendingConfirmations = new ConcurrentHashMap<>();
private final ScheduledExecutorService timeoutScheduler = Executors.newSingleThreadScheduledExecutor();
*/
/**
* 发送消息 - 主入口默认使用异步确认
*//*
public CompletableFuture<Boolean> send(String exchange, String routingKey, RabbitMqMessage message) {
return sendAsync(exchange, routingKey, message);
}
*/
/**
* 异步发送
*//*
public CompletableFuture<Boolean> sendAsync(String exchange, String routingKey, RabbitMqMessage message) {
String messageId = generateMessageId();
message.setMessageId(messageId);
CompletableFuture<Boolean> future = new CompletableFuture<>();
pendingConfirmations.put(messageId, future);
CorrelationData correlationData = new CorrelationData(messageId);
try {
org.springframework.amqp.core.Message amqpMessage = buildMessage(message, messageId);
rabbitTemplate.convertAndSend(exchange, routingKey, amqpMessage, correlationData);
log.debug("消息发送完成,等待异步确认 - ID: {}", messageId);
// 30分钟超时
scheduleTimeout(messageId, future, 30, TimeUnit.MINUTES);
return future;
} catch (Exception e) {
pendingConfirmations.remove(messageId);
future.completeExceptionally(e);
log.error("消息发送异常 - ID: {}", messageId, e);
return future;
}
}
*/
/**
* 初始化确认回调
*//*
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this::handleConfirm);
// 优雅关闭
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
}
*/
/**
* 处理确认回调
*//*
private void handleConfirm(CorrelationData correlationData, boolean ack, String cause) {
if (correlationData == null || correlationData.getId() == null) {
return;
}
String messageId = correlationData.getId();
CompletableFuture<Boolean> future = pendingConfirmations.remove(messageId);
if (future != null) {
if (ack) {
future.complete(true);
} else {
log.error("Broker确认失败 - ID: {}, 原因: {}", messageId, cause);
future.complete(false);
}
}
}
*/
/**
* 调度超时
*//*
private void scheduleTimeout(String messageId, CompletableFuture<Boolean> future,
long timeout, TimeUnit unit) {
timeoutScheduler.schedule(() -> {
if (pendingConfirmations.remove(messageId) != null && !future.isDone()) {
log.warn("消息确认超时 - ID: {}", messageId);
future.complete(false);
}
}, timeout, unit);
}
*/
/**
* 资源清理
*//*
@PreDestroy
public void shutdown() {
timeoutScheduler.shutdown();
try {
if (!timeoutScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
timeoutScheduler.shutdownNow();
}
} catch (InterruptedException e) {
timeoutScheduler.shutdownNow();
Thread.currentThread().interrupt();
}
// 处理未完成的消息
pendingConfirmations.forEach((id, future) -> {
if (!future.isDone()) {
future.complete(false);
}
});
pendingConfirmations.clear();
}
*/
/**
* 生成消息ID
*//*
private String generateMessageId() {
return "rabbitmq_msg" + System.currentTimeMillis() + "_" +
ThreadLocalRandom.current().nextInt(100000, 999999);
}
*/
/**
* 构建消息对象
*//*
private org.springframework.amqp.core.Message buildMessage(RabbitMqMessage message, String messageId) {
try {
String jsonStr = objectMapper.writeValueAsString(message);
return MessageBuilder
.withBody(jsonStr.getBytes(StandardCharsets.UTF_8))
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("UTF-8")
.setMessageId(messageId)
.setTimestamp(new Date())
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setHeader("source", "ocr-service")
.setHeader("businessType", extractBusinessType(message.getUploadPath()))
.build();
} catch (Exception e) {
throw new RuntimeException("消息构建失败: " + messageId, e);
}
}
private String extractBusinessType(String uploadPath) {
if (uploadPath == null) return "unknown";
if (uploadPath.contains("personnelDatabase")) return "personnel";
if (uploadPath.contains("toolsDatabase")) return "tool";
return "other";
}
}
*/

View File

@ -0,0 +1,262 @@
package com.bonus.web.rabbitmq.producer;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import com.bonus.web.rabbitmq.message.RabbitMqMessageBuilder;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @className:RabbitMQProducerService
* @author:cwchen
* @date:2025-11-24-16:16
* @version:1.0
* @description:生产者服务
*/
@Service(value = "RabbitMQProducerService")
@Slf4j
public class RabbitMQProducerService {
private final RabbitTemplate rabbitTemplate;
private final RabbitMqMessageBuilder messageBuilder;
private final Map<String, CompletableFuture<CorrelationData>> pendingConfirmations;
public RabbitMQProducerService(RabbitTemplate rabbitTemplate,
RabbitMqMessageBuilder messageBuilder) {
this.rabbitTemplate = rabbitTemplate;
this.messageBuilder = messageBuilder;
this.pendingConfirmations = new ConcurrentHashMap<>();
// 设置确认回调
this.rabbitTemplate.setConfirmCallback(createConfirmCallback());
// 设置返回回调用于处理路由失败
this.rabbitTemplate.setReturnsCallback(createReturnsCallback());
}
/**
* 异步发送消息 - 基础版本
*/
public CompletableFuture<String> sendMessageAsync(String taskName, String uploadPath) {
return CompletableFuture.supplyAsync(() -> {
try {
RabbitMqMessage message = messageBuilder.buildBaseMessage(taskName, uploadPath);
return sendMessageWithConfirmation(message);
} catch (Exception e) {
log.error("异步发送消息失败: {}", e.getMessage(), e);
throw new RuntimeException("消息发送失败", e);
}
});
}
/**
* 异步发送消息 - 带自定义ID
*/
public CompletableFuture<String> sendMessageWithCustomIdAsync(String taskName,
String uploadPath,
String customMessageId) {
return CompletableFuture.supplyAsync(() -> {
try {
RabbitMqMessage message = messageBuilder.buildMessageWithCustomId(taskName, uploadPath, customMessageId);
return sendMessageWithConfirmation(message);
} catch (Exception e) {
log.error("异步发送带自定义ID消息失败: {}", e.getMessage(), e);
throw new RuntimeException("消息发送失败", e);
}
});
}
/**
* 异步发送OCR处理消息
*/
public CompletableFuture<String> sendOcrMessageAsync(String filePath, String taskId) {
return CompletableFuture.supplyAsync(() -> {
try {
RabbitMqMessage message = messageBuilder.buildOcrProcessMessage(filePath, taskId);
return sendMessageWithConfirmation(message);
} catch (Exception e) {
log.error("异步发送OCR消息失败: {}", e.getMessage(), e);
throw new RuntimeException("OCR消息发送失败", e);
}
});
}
/**
* 异步发送带重试配置的消息
*/
public CompletableFuture<String> sendMessageWithRetryAsync(String taskName,
String uploadPath,
int maxRetryCount) {
return CompletableFuture.supplyAsync(() -> {
try {
RabbitMqMessage message = messageBuilder.buildMessageWithRetryConfig(taskName, uploadPath, maxRetryCount);
return sendMessageWithConfirmation(message);
} catch (Exception e) {
log.error("异步发送带重试配置消息失败: {}", e.getMessage(), e);
throw new RuntimeException("消息发送失败", e);
}
});
}
/**
* 批量异步发送消息
*/
public CompletableFuture<List<String>> sendBatchMessagesAsync(List<MessageTask> tasks) {
return CompletableFuture.supplyAsync(() -> {
List<String> messageIds = new ArrayList<>();
List<CompletableFuture<String>> futures = new ArrayList<>();
for (MessageTask task : tasks) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
RabbitMqMessage message = messageBuilder.buildBaseMessage(task.getTaskName(), task.getUploadPath());
return sendMessageWithConfirmation(message);
} catch (Exception e) {
log.error("批量发送消息失败 - 任务: {}", task.getTaskName(), e);
return null;
}
});
futures.add(future);
}
// 等待所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
for (CompletableFuture<String> future : futures) {
try {
String messageId = future.get();
if (messageId != null) {
messageIds.add(messageId);
}
} catch (Exception e) {
log.error("获取异步发送结果失败", e);
}
}
log.info("批量发送完成,成功发送 {} 条消息", messageIds.size());
return messageIds;
});
}
/**
* 发送消息并等待确认
*/
private String sendMessageWithConfirmation(RabbitMqMessage message) {
String messageId = message.getMessageId();
CorrelationData correlationData = new CorrelationData(messageId);
// 创建确认Future
CompletableFuture<CorrelationData> confirmationFuture = new CompletableFuture<>();
// 保存到全局的Map中确保确认回调可以访问
pendingConfirmations.put(messageId, confirmationFuture);
try {
log.info("准备发送消息 - ID: {}, 任务: {}, 路径: {}",
messageId, message.getTaskName(), message.getUploadPath());
// 发送消息
rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData);
log.info("消息已发送到Broker - ID: {}", messageId);
// 等待确认带超时
CorrelationData result = confirmationFuture.get(10, TimeUnit.SECONDS);
log.info("消息发送确认成功 - ID: {}", messageId);
return messageId;
} catch (TimeoutException e) {
log.error("消息确认超时 - ID: {}", messageId, e);
// 超时时移除对应的Future
pendingConfirmations.remove(messageId);
throw new RuntimeException("消息确认超时", e);
} catch (Exception e) {
log.error("消息发送异常 - ID: {}", messageId, e);
pendingConfirmations.remove(messageId);
throw new RuntimeException("消息发送异常", e);
}
}
/**
* 创建确认回调
*/
private RabbitTemplate.ConfirmCallback createConfirmCallback() {
return (correlationData, ack, cause) -> {
if (correlationData == null) {
log.error("确认回调中correlationData为null");
return;
}
String messageId = correlationData.getId();
if (ack) {
log.debug("消息发送确认成功 - ID: {}", messageId);
// 这里可以更新消息状态为已确认
} else {
log.error("消息发送确认失败 - ID: {}, 原因: {}", messageId, cause);
// 这里可以处理发送失败的消息
}
};
}
/**
* 创建返回回调处理路由失败
*/
private RabbitTemplate.ReturnsCallback createReturnsCallback() {
return returned -> {
log.error("消息路由失败 - 交换机: {}, 路由键: {}, 回复码: {}, 回复文本: {}",
returned.getExchange(), returned.getRoutingKey(),
returned.getReplyCode(), returned.getReplyText());
// 如果有CorrelationData可以在这里处理
if (returned.getMessage().getMessageProperties().getCorrelationId() != null) {
String messageId = returned.getMessage().getMessageProperties().getCorrelationId();
CompletableFuture<CorrelationData> future = pendingConfirmations.remove(messageId);
if (future != null) {
future.completeExceptionally(new RuntimeException("消息路由失败: " + returned.getReplyText()));
}
}
};
}
/**
* 同步发送消息备用方法
*/
public String sendMessageSync(RabbitMqMessage message) {
try {
String messageId = message.getMessageId();
CorrelationData correlationData = new CorrelationData(messageId);
log.info("同步发送消息 - ID: {}, 任务: {}", messageId, message.getTaskName());
rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData);
log.info("同步发送完成 - ID: {}", messageId);
return messageId;
} catch (Exception e) {
log.error("同步发送消息失败: {}", e.getMessage(), e);
throw new RuntimeException("消息发送失败", e);
}
}
/**
* 任务数据类
*/
@Data
@AllArgsConstructor
public static class MessageTask {
private String taskName;
private String uploadPath;
}
}

View File

@ -5,9 +5,16 @@ import com.bonus.analysis.service.IASAnalysisService;
import com.bonus.common.core.domain.AjaxResult;
import com.bonus.common.domain.analysis.dto.AnalysisDto;
import com.bonus.common.domain.analysis.vo.AnalysisVo;
import com.bonus.common.domain.mainDatabase.dto.EnterpriseDto;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import com.bonus.common.utils.ValidatorsUtils;
import com.bonus.common.utils.uuid.UUID;
import com.bonus.web.rabbitmq.producer.RabbitMQProducerService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.TransactionAspectSupport;
import javax.annotation.Resource;
import java.util.List;
@ -36,6 +43,12 @@ public class AnalysisService {
@Resource(name = "IASAnalysisService")
private IASAnalysisService analysisService;
@Resource(name = "ValidatorsUtils")
private ValidatorsUtils validatorsUtils;
@Resource(name = "RabbitMQProducerService")
private RabbitMQProducerService rabbitMQProducerService;
public AjaxResult testAsyncMq() {
/*taskExecutor.submit(() -> {
@ -63,6 +76,9 @@ public class AnalysisService {
}
});*/
CompletableFuture.runAsync(() -> {
String taskId = UUID.randomUUID().toString();
String uploadPath = "personnelDatabase/2025/10/24/89a266f4-f928-4ee4-95eb-1a73906da0a7.png";
rabbitMQProducerService.sendOcrMessageAsync(uploadPath,taskId);
}, taskExecutor);
return AjaxResult.success();
}
@ -78,5 +94,34 @@ public class AnalysisService {
public List<AnalysisVo> getList(AnalysisDto dto) {
return analysisService.getList(dto);
}
/**
* 招标解析->新建项目
* @param dto
* @return AjaxResult
* @author cwchen
* @date 2025/11/24 13:54
*/
@Transactional(rollbackFor = Exception.class)
public AjaxResult saveData(AnalysisDto.TemplateDto dto) {
try {
// 校验数据是否合法
String validResult = validatorsUtils.valid(dto, AnalysisDto.TemplateDto.ADD.class);
if (StringUtils.isNotBlank(validResult)) {
return AjaxResult.error(validResult);
}
// 保存项目数据
analysisService.addProData(dto);
// 同步解析规则数据
// 执行异步解析任务
CompletableFuture.runAsync(() -> {
}, taskExecutor);
return AjaxResult.success();
} catch (Exception e) {
log.error(e.toString(),e);
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
return AjaxResult.error();
}
}
}

View File

@ -17,8 +17,8 @@ ignoreUrl:
callBackUrl: /documents/callback
#minio:
# url: http://192.168.31.170:9000
# endpoint: http://192.168.31.170:9000
# url: http://192.168.31.169:9000
# endpoint: http://192.168.31.169:9000
# access-key: name
# secret-key: password
# bucket-name: smart-bid
@ -34,15 +34,16 @@ only-office:
url: http://192.168.0.39:8080/documents/only/office/download
permissions:
# 是否可以编辑
edit: true
print: true
edit: false
print: false
download: true
# 是否可以填写表格如果将mode参数设置为edit则填写表单仅对文档编辑器可用。 默认值与edit或review参数的值一致。
fillForms: false
# 跟踪变化
review: true
review: false
editorConfig:
# onlyoffice回调接口这个接口也需要在springboot后端中实现
mode: view
callbackUrl: http://192.168.0.39:8080/smartBid/documents/callback
lang: zh-CN
coEditing:

View File

@ -1,63 +1,26 @@
spring:
rabbitmq:
# 连接配置
host: localhost
port: 15672
host: 192.168.0.14
port: 5672
username: guest
password: guest
virtual-host: /
# 连接超时和心跳
connection-timeout: 30000
requested-heartbeat: 60
# 通道缓存配置
cache:
channel:
size: 25
checkout-timeout: 2000
# 发布确认
publisher-confirm-type: correlated
publisher-returns: true
# 模板配置
# 生产者配置
publisher-confirm-type: correlated # 消息确认
publisher-returns: true # 开启返回模式
template:
mandatory: true
receive-timeout: 30000
reply-timeout: 30000
mandatory: true # 消息无法路由时返回给生产者
retry:
enabled: true
initial-interval: 2000
max-attempts: 3
multiplier: 1.5
max-interval: 10000
# 监听器配置
enabled: true # 启用重试
initial-interval: 1000ms # 初始间隔
max-attempts: 3 # 最大重试次数
multiplier: 2.0 # 倍数
# 消费者配置
listener:
type: simple
simple:
acknowledge-mode: auto
concurrency: 2
max-concurrency: 10
prefetch: 1
# 重要:解决队列检查超时问题
missing-queues-fatal: false
auto-startup: true
default-requeue-rejected: false
retry:
enabled: true
initial-interval: 3000
max-attempts: 3
max-interval: 10000
multiplier: 2.0
# Actuator 配置
management:
endpoints:
web:
exposure:
include: health,info,metrics
endpoint:
health:
show-details: always
acknowledge-mode: manual # 手动确认
prefetch: 10 # 预取数量
concurrency: 3 # 最小消费者数量
max-concurrency: 5 # 最大消费者数量
missing-queues-fatal: false # 队列不存在时不致命
auto-startup: true # 自动启动

View File

@ -23,4 +23,13 @@ public interface IASAnalysisMapper {
* @date 2025/11/24 11:17
*/
List<AnalysisVo> getList(AnalysisDto dto);
/**
* 保存项目数据
* @param dto
* @return void
* @author cwchen
* @date 2025/11/24 14:05
*/
void addProData(AnalysisDto.TemplateDto dto);
}

View File

@ -22,4 +22,13 @@ public interface IASAnalysisService {
* @date 2025/11/24 11:06
*/
List<AnalysisVo> getList(AnalysisDto dto);
/**
* 保存项目数据
* @param dto
* @return void
* @author cwchen
* @date 2025/11/24 14:04
*/
void addProData(AnalysisDto.TemplateDto dto);
}

View File

@ -29,4 +29,9 @@ public class ASAnalysisServiceImpl implements IASAnalysisService {
public List<AnalysisVo> getList(AnalysisDto dto) {
return analysisMapper.getList(dto);
}
@Override
public void addProData(AnalysisDto.TemplateDto dto) {
analysisMapper.addProData(dto);
}
}

View File

@ -30,4 +30,13 @@
</if>
ORDER BY tp.create_time DESC
</select>
<!--保存项目数据-->
<insert id="addProData" keyProperty="proId" useGeneratedKeys="true" keyColumn="pro_id">
INSERT INTO tb_pro(template_id,create_user_id,create_user_name,update_user_id,
update_user_name,analysis_status)
VALUES (
#{templateId},#{createUserId},#{createUserName},#{updateUserId},#{updateUserName},'0'
)
</insert>
</mapper>

View File

@ -41,4 +41,33 @@ public class AsyncConfig {
executor.initialize();
return executor;
}
/**
* RabbitMQ专用的线程池
*/
@Bean("rabbitmqTaskExecutor")
public Executor rabbitmqTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(20);
// 最大线程数
executor.setMaxPoolSize(100);
// 队列容量
executor.setQueueCapacity(5000);
// 线程存活时间
executor.setKeepAliveSeconds(120);
// 线程名称前缀
executor.setThreadNamePrefix("RabbitMQ-Async-");
// 拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 等待时间
executor.setAwaitTerminationSeconds(120);
// 允许核心线程超时
executor.setAllowCoreThreadTimeOut(true);
executor.initialize();
return executor;
}
}

View File

@ -1,7 +1,18 @@
package com.bonus.common.domain.analysis.dto;
import com.bonus.common.core.domain.model.LoginUser;
import com.bonus.common.domain.file.po.ResourceFilePo;
import com.bonus.common.domain.mainDatabase.dto.EnterpriseDto;
import com.bonus.common.utils.SecurityUtils;
import lombok.Data;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import java.util.Date;
import java.util.List;
import java.util.Optional;
/**
* @className:AnalysisDto
* @author:cwchen
@ -20,4 +31,63 @@ public class AnalysisDto {
private String startDate;
/**开标结束日期*/
private String endDate;
private TemplateDto templateDto;
@Data
public static class TemplateDto{
/**
* 项目id
* */
private Long proId;
@NotNull(message = "模板不能为空", groups = {ADD.class})
private Long templateId;
/**
* 创建人
*/
private Long createUserId = Optional.ofNullable(SecurityUtils.getLoginUser())
.map(LoginUser::getUserId)
.orElse(null);
;
/**
* 创建人姓名
*/
private String createUserName = Optional.ofNullable(SecurityUtils.getLoginUser())
.map(LoginUser::getUsername)
.orElse(null);
;
/**
* 修改人
*/
private Long updateUserId = Optional.ofNullable(SecurityUtils.getLoginUser())
.map(LoginUser::getUserId)
.orElse(null);
;
/**
* 修改人姓名
*/
private String updateUserName = Optional.ofNullable(SecurityUtils.getLoginUser())
.map(LoginUser::getUsername)
.orElse(null);
/**
* 资源文件
*/
@NotEmpty(message = "文件不能为空", groups = {ADD.class})
private List<ResourceFilePo> files;
/**
* 新增条件限制
*/
public interface ADD {
}
}
}

View File

@ -0,0 +1,259 @@
package com.bonus.common.domain.rabbitmq.dto;
import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
/**
* @className: RabbitMqMessage
* @description: RabbitMQ 消息实体类
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RabbitMqMessage implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 消息名称/任务类型
*/
private String taskName;
/**
* 消息ID唯一标识
*/
private String messageId;
/**
* 文件上传路径
*/
private String uploadPath;
/**
* 消息创建时间戳
*/
@Builder.Default
private Long timestamp = System.currentTimeMillis();
/**
* 消息来源系统
*/
private String sourceSystem;
/**
* 消息目标系统
*/
private String targetSystem;
/**
* 消息优先级0-9数值越大优先级越高
*/
@Builder.Default
private Integer priority = 5;
/**
* 重试次数
*/
@Builder.Default
private Integer retryCount = 0;
/**
* 最大重试次数
*/
@Builder.Default
private Integer maxRetryCount = 3;
/**
* 消息过期时间毫秒时间戳
*/
private Long expirationTime;
/**
* 业务数据扩展字段存储额外的业务信息
*/
@Builder.Default
private Map<String, Object> businessData = new HashMap<>();
/**
* 消息状态
*/
private String status;
/**
* 错误信息处理失败时记录
*/
private String errorMessage;
/**
* 处理开始时间
*/
private Long processStartTime;
/**
* 处理结束时间
*/
private Long processEndTime;
/**
* 版本号用于乐观锁
*/
@Builder.Default
private Integer version = 1;
// 常用任务类型常量
public static class TaskType {
public static final String OCR_PROCESS = "OCR_PROCESS";
public static final String FILE_UPLOAD = "FILE_UPLOAD";
public static final String IMAGE_PROCESS = "IMAGE_PROCESS";
public static final String DOCUMENT_EXTRACT = "DOCUMENT_EXTRACT";
public static final String DATA_SYNC = "DATA_SYNC";
public static final String NOTIFICATION = "NOTIFICATION";
}
// 消息状态常量
public static class Status {
public static final String PENDING = "PENDING";
public static final String PROCESSING = "PROCESSING";
public static final String SUCCESS = "SUCCESS";
public static final String FAILED = "FAILED";
public static final String RETRYING = "RETRYING";
public static final String EXPIRED = "EXPIRED";
}
/**
* 添加业务数据
*/
public void addBusinessData(String key, Object value) {
if (this.businessData == null) {
this.businessData = new HashMap<>();
}
this.businessData.put(key, value);
}
/**
* 获取业务数据
*/
public Object getBusinessData(String key) {
return this.businessData != null ? this.businessData.get(key) : null;
}
/**
* 增加重试次数
*/
public void incrementRetryCount() {
this.retryCount++;
}
/**
* 检查是否达到最大重试次数
*/
public boolean isMaxRetryReached() {
return this.retryCount >= this.maxRetryCount;
}
/**
* 检查消息是否过期
*/
public boolean isExpired() {
return this.expirationTime != null && System.currentTimeMillis() > this.expirationTime;
}
/**
* 设置过期时间相对时间单位毫秒
*/
public void setExpirationRelative(long ttlMillis) {
this.expirationTime = System.currentTimeMillis() + ttlMillis;
}
/**
* 开始处理
*/
public void startProcess() {
this.status = Status.PROCESSING;
this.processStartTime = System.currentTimeMillis();
}
/**
* 处理成功
*/
public void processSuccess() {
this.status = Status.SUCCESS;
this.processEndTime = System.currentTimeMillis();
}
/**
* 处理失败
*/
public void processFailed(String errorMessage) {
this.status = Status.FAILED;
this.errorMessage = errorMessage;
this.processEndTime = System.currentTimeMillis();
}
/**
* 重试处理
*/
public void retryProcess() {
this.status = Status.RETRYING;
incrementRetryCount();
}
/**
* 计算处理耗时毫秒
*/
public Long getProcessDuration() {
if (processStartTime != null && processEndTime != null) {
return processEndTime - processStartTime;
}
return null;
}
/**
* 创建简单的消息快速构建
*/
public static RabbitMqMessage createSimple(String taskName, String uploadPath) {
return RabbitMqMessage.builder()
.taskName(taskName)
.messageId(java.util.UUID.randomUUID().toString())
.uploadPath(uploadPath)
.timestamp(System.currentTimeMillis())
.status(Status.PENDING)
.build();
}
/**
* 创建带业务数据的消息
*/
public static RabbitMqMessage createWithBusinessData(String taskName, String uploadPath,
Map<String, Object> businessData) {
return RabbitMqMessage.builder()
.taskName(taskName)
.messageId(java.util.UUID.randomUUID().toString())
.uploadPath(uploadPath)
.timestamp(System.currentTimeMillis())
.businessData(businessData != null ? new HashMap<>(businessData) : new HashMap<>())
.status(Status.PENDING)
.build();
}
@Override
public String toString() {
return "RabbitMqMessage{" +
"taskName='" + taskName + '\'' +
", messageId='" + messageId + '\'' +
", uploadPath='" + uploadPath + '\'' +
", timestamp=" + timestamp +
", status='" + status + '\'' +
", retryCount=" + retryCount +
", maxRetryCount=" + maxRetryCount +
'}';
}
}