招标解析功能

This commit is contained in:
cwchen 2025-11-29 13:01:46 +08:00
parent 148833cc40
commit 31521edfd3
7 changed files with 84 additions and 914 deletions

View File

@ -2,9 +2,9 @@ spring:
rabbitmq:
host: 192.168.0.14
port: 5672
username: guest
password: guest
virtual-host: /
username: bidAdmin
password: bidAdmin
virtual-host: /smart_bid
# 生产者配置
publisher-confirm-type: correlated # 消息确认
publisher-returns: true # 开启返回模式

View File

@ -2,10 +2,6 @@ package com.bonus.common.domain.rabbitmq.dto;
import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@ -15,9 +11,6 @@ import java.util.Map;
* @description: RabbitMQ 消息实体类
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RabbitMqMessage implements Serializable {
private static final long serialVersionUID = 1L;
@ -40,220 +33,11 @@ public class RabbitMqMessage implements Serializable {
/**
* 消息创建时间戳
*/
@Builder.Default
private Long timestamp = System.currentTimeMillis();
/**
* 消息来源系统
*/
private String sourceSystem;
/**
* 消息目标系统
*/
private String targetSystem;
/**
* 消息优先级0-9数值越大优先级越高
*/
@Builder.Default
private Integer priority = 5;
/**
* 重试次数
*/
@Builder.Default
private Integer retryCount = 0;
/**
* 最大重试次数
*/
@Builder.Default
private Integer maxRetryCount = 3;
/**
* 消息过期时间毫秒时间戳
*/
private Long expirationTime;
private Long timestamp = System.nanoTime();
/**
* 业务数据扩展字段存储额外的业务信息
*/
@Builder.Default
private Map<String, Object> businessData = new HashMap<>();
/**
* 消息状态
*/
private String status;
/**
* 错误信息处理失败时记录
*/
private String errorMessage;
/**
* 处理开始时间
*/
private Long processStartTime;
/**
* 处理结束时间
*/
private Long processEndTime;
/**
* 版本号用于乐观锁
*/
@Builder.Default
private Integer version = 1;
// 常用任务类型常量
public static class TaskType {
public static final String OCR_PROCESS = "OCR_PROCESS";
public static final String FILE_UPLOAD = "FILE_UPLOAD";
public static final String IMAGE_PROCESS = "IMAGE_PROCESS";
public static final String DOCUMENT_EXTRACT = "DOCUMENT_EXTRACT";
public static final String DATA_SYNC = "DATA_SYNC";
public static final String NOTIFICATION = "NOTIFICATION";
}
// 消息状态常量
public static class Status {
public static final String PENDING = "PENDING";
public static final String PROCESSING = "PROCESSING";
public static final String SUCCESS = "SUCCESS";
public static final String FAILED = "FAILED";
public static final String RETRYING = "RETRYING";
public static final String EXPIRED = "EXPIRED";
}
/**
* 添加业务数据
*/
public void addBusinessData(String key, Object value) {
if (this.businessData == null) {
this.businessData = new HashMap<>();
}
this.businessData.put(key, value);
}
/**
* 获取业务数据
*/
public Object getBusinessData(String key) {
return this.businessData != null ? this.businessData.get(key) : null;
}
/**
* 增加重试次数
*/
public void incrementRetryCount() {
this.retryCount++;
}
/**
* 检查是否达到最大重试次数
*/
public boolean isMaxRetryReached() {
return this.retryCount >= this.maxRetryCount;
}
/**
* 检查消息是否过期
*/
public boolean isExpired() {
return this.expirationTime != null && System.currentTimeMillis() > this.expirationTime;
}
/**
* 设置过期时间相对时间单位毫秒
*/
public void setExpirationRelative(long ttlMillis) {
this.expirationTime = System.currentTimeMillis() + ttlMillis;
}
/**
* 开始处理
*/
public void startProcess() {
this.status = Status.PROCESSING;
this.processStartTime = System.currentTimeMillis();
}
/**
* 处理成功
*/
public void processSuccess() {
this.status = Status.SUCCESS;
this.processEndTime = System.currentTimeMillis();
}
/**
* 处理失败
*/
public void processFailed(String errorMessage) {
this.status = Status.FAILED;
this.errorMessage = errorMessage;
this.processEndTime = System.currentTimeMillis();
}
/**
* 重试处理
*/
public void retryProcess() {
this.status = Status.RETRYING;
incrementRetryCount();
}
/**
* 计算处理耗时毫秒
*/
public Long getProcessDuration() {
if (processStartTime != null && processEndTime != null) {
return processEndTime - processStartTime;
}
return null;
}
/**
* 创建简单的消息快速构建
*/
public static RabbitMqMessage createSimple(String taskName, String uploadPath) {
return RabbitMqMessage.builder()
.taskName(taskName)
.messageId(java.util.UUID.randomUUID().toString())
.uploadPath(uploadPath)
.timestamp(System.currentTimeMillis())
.status(Status.PENDING)
.build();
}
/**
* 创建带业务数据的消息
*/
public static RabbitMqMessage createWithBusinessData(String taskName, String uploadPath,
Map<String, Object> businessData) {
return RabbitMqMessage.builder()
.taskName(taskName)
.messageId(java.util.UUID.randomUUID().toString())
.uploadPath(uploadPath)
.timestamp(System.currentTimeMillis())
.businessData(businessData != null ? new HashMap<>(businessData) : new HashMap<>())
.status(Status.PENDING)
.build();
}
@Override
public String toString() {
return "RabbitMqMessage{" +
"taskName='" + taskName + '\'' +
", messageId='" + messageId + '\'' +
", uploadPath='" + uploadPath + '\'' +
", timestamp=" + timestamp +
", status='" + status + '\'' +
", retryCount=" + retryCount +
", maxRetryCount=" + maxRetryCount +
'}';
}
}

View File

@ -3,16 +3,11 @@ package com.bonus.rabbitmq.config;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy;
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
@ -22,7 +17,6 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
@ -107,15 +101,18 @@ public class RabbitMQConfig {
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonMessageConverter);
// 更改为手动确认模式确保按顺序一个一个处理
// 手动确认模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 当监听方法抛异常时不重新入队防止单条坏消息一直卡住后续消息
factory.setDefaultRequeueRejected(false);
factory.setMissingQueuesFatal(false);
factory.setAutoStartup(true);
factory.setConcurrentConsumers(1); // 设置为1确保只有一个消费者处理消息
factory.setMaxConcurrentConsumers(1); // 最大消费者数为1
factory.setPrefetchCount(1); // 每次只取一条消息
// 为了便于排查问题先强制顺序单线程消费确认一条消息后再取下一条
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
factory.setPrefetchCount(1);
log.info("多消费者容器工厂配置完成 - 使用手动确认模式");
log.info("多消费者容器工厂配置完成 - 手动确认模式单线程顺序消费defaultRequeueRejected=false");
return factory;
}

View File

@ -1,249 +1,81 @@
package com.bonus.rabbitmq.consumer;
import com.bonus.common.domain.ocr.dto.OcrRequest;
import com.bonus.common.domain.ocr.vo.OcrResponse;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import com.bonus.file.config.MinioConfig;
import com.bonus.file.util.MinioUtil;
import com.bonus.ocr.service.OcrService;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.util.Objects;
import java.nio.charset.StandardCharsets;
/**
* @className:RabbitMQConsumerService
* @author:cwchen
* @date:2025-11-24-16:19
* @version:1.0
* @description:消费者服务
* @description:简化版消费者服务 - 手动解析原始消息保证队列中的每条消息都能被尝试消费
*/
@Component(value = "")
@Component
@Slf4j
public class RabbitMQConsumerService {
@Resource
private OcrService ocrService;
@Resource
private MinioConfig minioConfig;
@Resource
private MinioUtil minioUtil;
private final ObjectMapper objectMapper = new ObjectMapper();
@RabbitListener(
queues = "myQueue",
containerFactory = "multiConsumerFactory", // 使用上面配置的工厂保证按顺序消费
errorHandler = "rabbitListenerErrorHandler"
containerFactory = "multiConsumerFactory" // 使用上面配置的工厂保证按顺序消费
)
public void handleMessage(RabbitMqMessage message,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
public void handleMessage(Message amqpMessage, Channel channel) {
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
String rawBody = new String(amqpMessage.getBody(), StandardCharsets.UTF_8);
log.info("🎯 RabbitMQConsumerService 收到原始消息 - deliveryTag: {}, body: {}",
deliveryTag, rawBody);
RabbitMqMessage message;
try {
// 手动反序列化 JSON RabbitMqMessage避免类型映射问题导致监听方法不执行
message = objectMapper.readValue(rawBody, RabbitMqMessage.class);
} catch (Exception e) {
log.error("❌ 消息反序列化失败,直接拒绝 - deliveryTag: {}, body: {}",
deliveryTag, rawBody, e);
try {
channel.basicReject(deliveryTag, false);
} catch (IOException ioException) {
log.error("basicReject 失败 - deliveryTag: {}", deliveryTag, ioException);
}
return;
}
String messageId = message.getMessageId();
String taskName = message.getTaskName();
log.info("开始处理消息 - ID: {}, 任务: {}, 投递标签: {}",
messageId, taskName, deliveryTag);
try {
// 根据任务类型进行不同的处理
switch (taskName) {
case "OCR_PROCESS":
processOcrTask(message);
break;
default:
processDefaultTask(message);
break;
}
log.info("🛠 开始处理消息内容 - ID: {}, 任务: {}, 业务数据: {}",
messageId, taskName, message.getBusinessData());
// 模拟少量业务处理耗时避免长时间阻塞
/*try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}*/
// 处理成功手动确认消息
channel.basicAck(deliveryTag, false); // 确认消息已处理
log.info("消息处理完成并确认 - ID: {}, 投递标签: {}", messageId, deliveryTag);
channel.basicAck(deliveryTag, false);
log.info("消息处理完成并确认 - ID: {}, 投递标签: {}", messageId, deliveryTag);
} catch (BusinessException e) {
// 业务异常记录日志但不重试直接拒绝消息并不重新入队
log.error("业务异常,拒绝消息并不重新入队 - ID: {}", messageId, e);
} catch (Exception e) {
log.error("❌ 处理消息异常,拒绝消息并不重新入队 - ID: {}, 投递标签: {}",
messageId, deliveryTag, e);
try {
channel.basicReject(deliveryTag, false); // 拒绝消息
channel.basicReject(deliveryTag, false);
} catch (IOException ioException) {
log.error("拒绝消息失败 - ID: {}", messageId, ioException);
}
} catch (Exception e) {
// 其他异常触发重试
log.error("处理消息异常,将触发重试机制 - ID: {}, 异常: {}", messageId, e.getMessage(), e);
// 抛出异常重试拦截器会处理重试
throw new RuntimeException("消息处理失败,需要重试: " + e.getMessage(), e);
}
}
/**
* 处理OCR任务
*/
private void processOcrTask(RabbitMqMessage message) {
String messageId = message.getMessageId();
String filePath = message.getUploadPath();
String taskId = (String) message.getBusinessData().get("taskId");
log.info("处理OCR任务 - ID: {}, 任务ID: {}, 文件: {}", messageId, taskId, filePath);
try {
// 1. 检查OCR服务可用性
checkOcrServiceAvailability();
// 2. 执行OCR处理
String ocrResult = performOcrProcessing(filePath);
// 3. 保存OCR结果
saveOcrResult(taskId, ocrResult);
// 4. 更新任务状态
updateOcrTaskStatus(taskId, "SUCCESS");
Thread.sleep(1000 * 10);
log.info("OCR任务处理完成 - ID: {}, 任务ID: {}", messageId, taskId);
} catch (OcrServiceException e) {
log.error("OCR服务异常 - ID: {}", messageId, e);
throw new BusinessException("OCR处理失败", e);
} catch (Exception e) {
log.error("OCR任务处理异常 - ID: {}", messageId, e);
throw new RuntimeException("OCR处理异常需要重试", e);
}
}
/**
* 处理默认任务
*/
private void processDefaultTask(RabbitMqMessage message) {
String messageId = message.getMessageId();
String taskName = message.getTaskName();
log.info("处理默认任务 - ID: {}, 任务名: {}", messageId, taskName);
// 模拟业务处理
try {
// 这里添加实际的任务处理逻辑
performBusinessLogic(message);
log.info("默认任务处理完成 - ID: {}", messageId);
} catch (Exception e) {
log.error("默认任务处理失败 - ID: {}", messageId, e);
throw new RuntimeException("任务处理失败", e);
}
}
private void checkOcrServiceAvailability() {
// 模拟OCR服务检查
log.debug("检查OCR服务可用性");
// 实际检查逻辑...
}
private String performOcrProcessing(String filePath) {
// 模拟OCR处理
log.debug("执行OCR处理: {}", filePath);
// String uploadPath = filePath;
// File fileFromMinio = getFileFromMinio(uploadPath);
// File fileFromMinio = new File("C:\\Users\\10488\\Desktop\\12.pdf");
// OcrResponse ocrResponse = performOcrRecognition(fileFromMinio);
// log.info("ocrResponse响应结果:{}", ocrResponse);
return "OCR处理结果";
}
private OcrResponse performOcrRecognition(File file) {
try {
OcrRequest ocrRequest = buildOcrRequest(file);
// OcrResponse ocrResponse = ocrService.callOcrService(ocrRequest);
OcrResponse ocrResponse = null;
// 修复检查 OCR 响应是否为 null
if (Objects.isNull(ocrResponse)) {
throw new BusinessException("OCR服务返回结果为空");
}
if (!isOcrResponseValid(ocrResponse)) {
throw new BusinessException("OCR识别结果无效");
}
log.info("OCR识别成功 - 数据: {}", ocrResponse.getData());
return ocrResponse;
} catch (Exception e) {
log.error("OCR识别失败", e);
throw new RuntimeException("OCR识别失败: " + e.getMessage(), e);
}
}
private boolean isOcrResponseValid(OcrResponse ocrResponse) {
return ocrResponse.getData() != null && !ocrResponse.getData().toString().isEmpty();
}
private File getFileFromMinio(String uploadPath) {
try {
File file = minioUtil.getFileFromMinio(minioConfig.getBucketName(), uploadPath);
if (file == null || !file.exists()) {
throw new OcrServiceException("Minio文件不存在: " + uploadPath);
}
return file;
} catch (Exception e) {
throw new OcrServiceException("获取Minio文件失败: " + uploadPath, e);
}
}
private OcrRequest buildOcrRequest(File file) {
OcrRequest ocrRequest = new OcrRequest();
ocrRequest.setFile(file);
ocrRequest.setType("application/pdf");
ocrRequest.setFields_json("项目名称,合同签订时间,合同金额,建设地点,建设单位,建设单位电话,建设单位开户行,建设单位账号");
return ocrRequest;
}
private void saveOcrResult(String taskId, String result) {
// 模拟保存OCR结果
log.debug("保存OCR结果 - 任务ID: {}", taskId);
}
private void updateOcrTaskStatus(String taskId, String status) {
// 模拟更新OCR任务状态
log.debug("更新OCR任务状态 - 任务ID: {}, 状态: {}", taskId, status);
}
private void performBusinessLogic(RabbitMqMessage message) {
// 模拟业务逻辑执行
log.debug("执行业务逻辑 - 消息ID: {}", message.getMessageId());
}
/**
* 自定义业务异常
*/
public static class BusinessException extends RuntimeException {
public BusinessException(String message) {
super(message);
}
public BusinessException(String message, Throwable cause) {
super(message, cause);
}
}
/**
* OCR服务异常
*/
public static class OcrServiceException extends RuntimeException {
public OcrServiceException(String message) {
super(message);
}
public OcrServiceException(String message, Throwable cause) {
super(message, cause);
log.error("basicReject 失败 - ID: {}, deliveryTag: {}", messageId, deliveryTag, ioException);
}
}
}
}

View File

@ -1,141 +0,0 @@
package com.bonus.rabbitmq.message;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
/**
* @className: RabbitMqMessageBuilder
* @description: RabbitMQ 消息构建器
*/
@Component
@Slf4j
public class RabbitMqMessageBuilder {
/**
* 构建基础消息
*/
public RabbitMqMessage buildBaseMessage(String taskName, String uploadPath) {
return RabbitMqMessage.createSimple(taskName, uploadPath);
}
/**
* 构建带自定义消息ID的消息
*/
public RabbitMqMessage buildMessageWithCustomId(String taskName, String uploadPath, String customMessageId) {
return RabbitMqMessage.builder()
.messageId(customMessageId)
.taskName(taskName)
.uploadPath(uploadPath)
.timestamp(System.currentTimeMillis())
.status(RabbitMqMessage.Status.PENDING)
.build();
}
/**
* 构建文件处理消息
*/
public RabbitMqMessage buildFileProcessMessage(String filePath) {
return RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.FILE_UPLOAD, filePath);
}
/**
* 构建OCR处理消息
*/
public RabbitMqMessage buildOcrProcessMessage(String filePath, String taskId) {
RabbitMqMessage message = RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.OCR_PROCESS, filePath);
message.addBusinessData("taskId", taskId);
message.addBusinessData("processType", "OCR_PROCESS");
return message;
}
/**
* 构建带重试配置的消息
*/
public RabbitMqMessage buildMessageWithRetryConfig(String taskName, String uploadPath, int maxRetryCount) {
return RabbitMqMessage.builder()
.taskName(taskName)
.messageId(UUID.randomUUID().toString())
.uploadPath(uploadPath)
.timestamp(System.currentTimeMillis())
.maxRetryCount(maxRetryCount)
.status(RabbitMqMessage.Status.PENDING)
.build();
}
/**
* 构建高优先级消息
*/
public RabbitMqMessage buildHighPriorityMessage(String taskName, String uploadPath) {
return RabbitMqMessage.builder()
.taskName(taskName)
.messageId(UUID.randomUUID().toString())
.uploadPath(uploadPath)
.timestamp(System.currentTimeMillis())
.priority(9) // 最高优先级
.status(RabbitMqMessage.Status.PENDING)
.build();
}
/**
* 构建带过期时间的消息
*/
public RabbitMqMessage buildMessageWithTtl(String taskName, String uploadPath, long ttlMillis) {
RabbitMqMessage message = RabbitMqMessage.createSimple(taskName, uploadPath);
message.setExpirationRelative(ttlMillis);
return message;
}
/**
* 构建完整业务消息
*/
public RabbitMqMessage buildBusinessMessage(String taskName, String uploadPath,
String sourceSystem, String targetSystem,
Map<String, Object> businessData) {
RabbitMqMessage message = RabbitMqMessage.createWithBusinessData(taskName, uploadPath, businessData);
message.setSourceSystem(sourceSystem);
message.setTargetSystem(targetSystem);
return message;
}
/**
* 构建图片处理消息
*/
public RabbitMqMessage buildImageProcessMessage(String imagePath, String processType) {
RabbitMqMessage message = RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.IMAGE_PROCESS, imagePath);
message.addBusinessData("processType", processType);
message.addBusinessData("imageFormat", getFileExtension(imagePath));
return message;
}
/**
* 构建文档提取消息
*/
public RabbitMqMessage buildDocumentExtractMessage(String documentPath, String extractType) {
RabbitMqMessage message = RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.DOCUMENT_EXTRACT, documentPath);
message.addBusinessData("extractType", extractType);
message.addBusinessData("documentType", getFileExtension(documentPath));
return message;
}
/**
* 获取文件扩展名
*/
private String getFileExtension(String filePath) {
if (filePath == null || filePath.lastIndexOf(".") == -1) {
return "unknown";
}
return filePath.substring(filePath.lastIndexOf(".") + 1).toLowerCase();
}
/**
* 生成文件相关的消息ID
*/
private String generateFileMessageId(String taskName, String filePath) {
String fileHash = Integer.toHexString(filePath.hashCode());
return taskName + "_" + fileHash + "_" + System.currentTimeMillis();
}
}

View File

@ -1,7 +1,6 @@
package com.bonus.rabbitmq.producer;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import com.bonus.rabbitmq.message.RabbitMqMessageBuilder;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@ -14,6 +13,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
@ -21,326 +21,58 @@ import java.util.concurrent.TimeoutException;
* @author:cwchen
* @date:2025-11-24-16:16
* @version:1.0
* @description:生产者服务 - 修复版本
* @description:生产者服务 - 简化版本无重试机制
*/
@Service(value = "RabbitMQProducerService")
@Slf4j
public class RabbitMQProducerService {
private final RabbitTemplate rabbitTemplate;
private final RabbitMqMessageBuilder messageBuilder;
private final Map<String, CompletableFuture<CorrelationData>> pendingConfirmations;
public RabbitMQProducerService(RabbitTemplate rabbitTemplate,
RabbitMqMessageBuilder messageBuilder) {
// 配置常量
private static final int CONFIRMATION_TIMEOUT_SECONDS = 60;
private static final String EXCHANGE_NAME = "myExchange";
private static final String ROUTING_KEY = "myRoutingKey";
public RabbitMQProducerService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.messageBuilder = messageBuilder;
this.pendingConfirmations = new ConcurrentHashMap<>();
// 设置确认回调
RabbitTemplate.ConfirmCallback confirmCallback = createConfirmCallback();
this.rabbitTemplate.setConfirmCallback(confirmCallback);
this.rabbitTemplate.setConfirmCallback(createConfirmCallback());
// 设置返回回调用于处理路由失败
RabbitTemplate.ReturnsCallback returnsCallback = createReturnsCallback();
this.rabbitTemplate.setReturnsCallback(returnsCallback);
this.rabbitTemplate.setReturnsCallback(createReturnsCallback());
}
/**
* 异步发送消息 - 基础版本
*/
public CompletableFuture<String> sendMessageAsync(String taskName, String uploadPath) {
return CompletableFuture.supplyAsync(() -> {
try {
RabbitMqMessage message = messageBuilder.buildBaseMessage(taskName, uploadPath);
return sendMessageWithConfirmation(message);
} catch (Exception e) {
log.error("异步发送消息失败: {}", e.getMessage(), e);
throw new RuntimeException("消息发送失败", e);
}
});
}
/**
* 异步发送消息 - 带自定义ID
*/
public CompletableFuture<String> sendMessageWithCustomIdAsync(String taskName,
String uploadPath,
String customMessageId) {
return CompletableFuture.supplyAsync(() -> {
try {
RabbitMqMessage message = messageBuilder.buildMessageWithCustomId(taskName, uploadPath, customMessageId);
return sendMessageWithConfirmation(message);
} catch (Exception e) {
log.error("异步发送带自定义ID消息失败: {}", e.getMessage(), e);
throw new RuntimeException("消息发送失败", e);
}
});
}
/**
* 异步发送OCR处理消息 - 修复版本
*/
public CompletableFuture<String> sendOcrMessageAsync(String filePath, String taskId) {
return CompletableFuture.supplyAsync(() -> {
String messageId = null;
try {
RabbitMqMessage message = messageBuilder.buildOcrProcessMessage(filePath, taskId);
messageId = message.getMessageId();
return sendMessageWithConfirmation(message);
} catch (Exception e) {
log.error("异步发送OCR消息失败 - TaskId: {}, File: {}, MessageId: {}, Error: {}",
taskId, filePath, messageId, e.getMessage(), e);
// 触发补偿机制
handleOcrMessageSendFailure(taskId, filePath, messageId, e);
throw new RuntimeException("OCR消息发送失败: " + e.getMessage(), e);
}
});
}
/**
* 处理OCR消息发送失败
*/
private void handleOcrMessageSendFailure(String taskId, String filePath, String messageId, Exception e) {
// 这里可以实现补偿逻辑比如
// 1. 记录到失败表
// 2. 触发告警
// 3. 尝试其他通知方式
// 4. 更新任务状态为发送失败
}
/**
* 异步发送带重试配置的消息
*/
public CompletableFuture<String> sendMessageWithRetryAsync(String taskName,
String uploadPath,
int maxRetryCount) {
return CompletableFuture.supplyAsync(() -> {
try {
RabbitMqMessage message = messageBuilder.buildMessageWithRetryConfig(taskName, uploadPath, maxRetryCount);
return sendMessageWithConfirmation(message);
} catch (Exception e) {
log.error("异步发送带重试配置消息失败: {}", e.getMessage(), e);
throw new RuntimeException("消息发送失败", e);
}
});
}
/**
* 批量异步发送消息
*/
public CompletableFuture<List<String>> sendBatchMessagesAsync(List<MessageTask> tasks) {
return CompletableFuture.supplyAsync(() -> {
List<String> messageIds = new ArrayList<>();
List<CompletableFuture<String>> futures = new ArrayList<>();
for (MessageTask task : tasks) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
RabbitMqMessage message = messageBuilder.buildBaseMessage(task.getTaskName(), task.getUploadPath());
return sendMessageWithConfirmation(message);
} catch (Exception e) {
log.error("批量发送消息失败 - 任务: {}", task.getTaskName(), e);
return null;
}
});
futures.add(future);
}
// 等待所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
for (CompletableFuture<String> future : futures) {
try {
String messageId = future.get();
if (messageId != null) {
messageIds.add(messageId);
}
} catch (Exception e) {
log.error("获取异步发送结果失败", e);
}
}
log.info("批量发送完成,成功发送 {} 条消息", messageIds.size());
return messageIds;
});
}
/**
* 发送消息并等待确认 - 修复版本带重试机制
*/
private String sendMessageWithConfirmation(RabbitMqMessage message) {
String messageId = message.getMessageId();
int maxRetries = 2; // 最大重试次数
int retryCount = 0;
while (retryCount <= maxRetries) {
try {
return attemptSendMessage(message, retryCount);
} catch (TimeoutException e) {
retryCount++;
if (retryCount > maxRetries) {
log.error("消息发送重试{}次后仍失败 - ID: {}", maxRetries, messageId, e);
throw new RuntimeException("消息确认超时,重试失败", e);
}
log.warn("消息发送超时,进行第{}次重试 - ID: {}", retryCount, messageId);
try {
// 指数退避策略
Thread.sleep(1000 * retryCount);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("重试被中断", ie);
}
} catch (Exception e) {
log.error("消息发送异常 - ID: {}", messageId, e);
throw new RuntimeException("消息发送异常", e);
}
}
throw new RuntimeException("消息发送失败");
}
/**
* 尝试发送消息
* 关键修复确保确认回调能及时触发并增加超时时间
*/
private String attemptSendMessage(RabbitMqMessage message, int retryCount) throws Exception {
String messageId = message.getMessageId();
// 为每次尝试创建新的CorrelationData
CorrelationData correlationData = new CorrelationData(messageId + "_attempt_" + retryCount);
CompletableFuture<CorrelationData> confirmationFuture = new CompletableFuture<>();
// 使用消息ID作为key确保重试时能覆盖之前的Future
String futureKey = messageId;
// 如果之前有未完成的Future先移除重试场景
CompletableFuture<CorrelationData> oldFuture = pendingConfirmations.remove(futureKey);
if (oldFuture != null && !oldFuture.isDone()) {
oldFuture.cancel(true);
}
pendingConfirmations.put(futureKey, confirmationFuture);
try {
// 发送消息
try {
rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData);
} catch (Exception sendException) {
log.error("消息发送异常 - ID: {}, 异常: {}", messageId, sendException.getMessage(), sendException);
pendingConfirmations.remove(futureKey, confirmationFuture);
throw new RuntimeException("消息发送失败: " + sendException.getMessage(), sendException);
}
// 关键修复由于确认回调可能不触发采用短超时 + 备用方案
// 如果消息发送成功没有异常且确认回调在短时间内未触发则认为成功
int shortTimeoutSeconds = 3; // 短超时3秒
long checkInterval = 50; // 每50ms检查一次更频繁
long maxWaitTime = shortTimeoutSeconds * 1000L;
long elapsed = 0;
boolean confirmReceived = false;
while (elapsed < maxWaitTime) {
if (confirmationFuture.isDone()) {
try {
confirmationFuture.get(); // 获取确认结果
confirmReceived = true;
break;
} catch (Exception e) {
log.error("确认回调返回异常 - ID: {}, 异常: {}", messageId, e.getMessage(), e);
pendingConfirmations.remove(futureKey, confirmationFuture);
throw new RuntimeException("消息确认失败: " + e.getMessage(), e);
}
}
Thread.sleep(checkInterval);
elapsed += checkInterval;
}
// 如果确认回调未触发但消息发送成功采用备用方案
if (!confirmReceived) {
Thread.sleep(500); // 再等待500ms
if (confirmationFuture.isDone()) {
try {
confirmationFuture.get();
confirmReceived = true;
} catch (Exception e) {
// 忽略延迟确认回调的异常
}
}
if (!confirmReceived) {
// 备用方案消息发送成功没有异常认为成功
confirmationFuture.complete(correlationData);
confirmReceived = true;
}
}
if (confirmReceived) {
// 确认成功后移除Future
pendingConfirmations.remove(futureKey, confirmationFuture);
return messageId;
}
// 超时处理如果备用方案也失败
log.error("消息确认超时 - ID: {}, 重试次数: {}", messageId, retryCount);
// 延迟移除Future给确认回调一个机会
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000);
pendingConfirmations.remove(futureKey);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
});
// 抛出TimeoutException让外层重试机制处理
throw new TimeoutException("消息确认失败(确认回调未触发) - ID: " + messageId + ", CorrelationId: " + correlationData.getId());
} catch (TimeoutException e) {
// 重新抛出TimeoutException不在这里处理
throw e;
} catch (Exception e) {
log.error("消息发送异常 - ID: {}, 重试次数: {}, 异常: {}",
messageId, retryCount, e.getMessage(), e);
// 异常时移除对应的Future
if (futureKey != null) {
pendingConfirmations.remove(futureKey, confirmationFuture);
}
throw e;
}
}
/**
* 创建确认回调 - 修复版本关键修复
* 注意RabbitMQ的确认回调是异步的确认的是消息是否被Broker接收不是是否被消费
* 关键确认回调应该在消息发送到Broker后立即触发而不是在消费者消费时
* 创建确认回调
*/
private RabbitTemplate.ConfirmCallback createConfirmCallback() {
return (correlationData, ack, cause) -> {
if (correlationData == null || correlationData.getId() == null) {
log.warn("收到空的确认回调数据");
return;
}
String correlationId = correlationData.getId();
String messageId = extractOriginalMessageId(correlationId);
String messageId = correlationData.getId();
CompletableFuture<CorrelationData> future = pendingConfirmations.get(messageId);
if (future == null) {
// Future可能已经被移除超时或异常这是正常的
log.debug("确认回调对应的Future不存在 - ID: {}", messageId);
return;
}
// 检查Future是否已经完成防止重复完成
if (future.isDone()) {
log.debug("确认回调对应的Future已完成 - ID: {}", messageId);
return;
}
if (ack) {
future.complete(correlationData);
log.debug("消息确认成功 - ID: {}", messageId);
} else {
String errorMsg = "Broker确认失败: " + (cause != null ? cause : "未知原因");
log.error("消息发送确认失败 - ID: {}, 原因: {}", messageId, errorMsg);
@ -350,17 +82,7 @@ public class RabbitMQProducerService {
}
/**
* 从correlationId中提取原始messageId
*/
private String extractOriginalMessageId(String correlationId) {
if (correlationId.contains("_attempt_")) {
return correlationId.substring(0, correlationId.indexOf("_attempt_"));
}
return correlationId;
}
/**
* 创建返回回调处理路由失败 - 修复版本
* 创建返回回调处理路由失败
*/
private RabbitTemplate.ReturnsCallback createReturnsCallback() {
return returned -> {
@ -370,28 +92,26 @@ public class RabbitMQProducerService {
String correlationId = returned.getMessage().getMessageProperties().getCorrelationId();
if (correlationId != null) {
String messageId = extractOriginalMessageId(correlationId);
CompletableFuture<CorrelationData> future = pendingConfirmations.get(messageId);
CompletableFuture<CorrelationData> future = pendingConfirmations.get(correlationId);
if (future != null) {
String errorMsg = String.format("消息路由失败 - 交换机: %s, 路由键: %s, 回复: %s",
returned.getExchange(), returned.getRoutingKey(), returned.getReplyText());
future.completeExceptionally(new RuntimeException(errorMsg));
pendingConfirmations.remove(messageId, future);
pendingConfirmations.remove(correlationId, future);
}
}
};
}
/**
* 同步发送消息备用方法
* 同步发送消息
*/
public String sendMessageSync(RabbitMqMessage message) {
try {
String messageId = message.getMessageId();
CorrelationData correlationData = new CorrelationData(messageId);
rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData);
log.info("Sending message: {}", message);
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message, correlationData);
return messageId;
} catch (Exception e) {
log.error("同步发送消息失败: {}", e.getMessage(), e);
@ -399,32 +119,5 @@ public class RabbitMQProducerService {
}
}
/**
* 获取未确认的消息数量监控用
*/
public int getPendingConfirmationsCount() {
return pendingConfirmations.size();
}
/**
* 清理过期的确认Future防止内存泄漏
*/
public void cleanupExpiredConfirmations() {
pendingConfirmations.entrySet().removeIf(entry -> {
CompletableFuture<CorrelationData> future = entry.getValue();
// 这里可以根据业务需求添加更复杂的过期判断逻辑
// 例如如果Future已经创建超过30分钟且未完成则移除
return future.isDone();
});
}
/**
* 任务数据类
*/
@Data
@AllArgsConstructor
public static class MessageTask {
private String taskName;
private String uploadPath;
}
}

View File

@ -1,5 +1,6 @@
package com.bonus.rabbitmq.service;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import com.bonus.rabbitmq.producer.RabbitMQProducerService;
import org.springframework.stereotype.Service;
@ -24,7 +25,11 @@ public class SendRabbitMqService {
for (Map<String,Object> map : list) {
String taskId = map.get("taskId").toString();
String uploadPath = map.get("uploadPath").toString();
rabbitMQProducerService.sendOcrMessageAsync(taskId, uploadPath);
RabbitMqMessage mqMessage = new RabbitMqMessage();
mqMessage.setMessageId(taskId);
mqMessage.setUploadPath(uploadPath);
mqMessage.setTaskName("OCR_PROCESS");
rabbitMQProducerService.sendMessageSync(mqMessage);
}
}
}