招标解析功能

This commit is contained in:
cwchen 2025-11-28 16:25:02 +08:00
parent 8183cad9e1
commit 8a19c0779e
2 changed files with 375 additions and 52 deletions

View File

@ -2,16 +2,26 @@ package com.bonus.web.rabbitmq.producer;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import com.bonus.web.rabbitmq.message.RabbitMqMessageBuilder; import com.bonus.web.rabbitmq.message.RabbitMqMessageBuilder;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
/** /**
* @className:RabbitMQProducerService * @className:RabbitMQProducerService
* @author:cwchen * @author:cwchen
* @date:2025-11-24-16:16 * @date:2025-11-24-16:16
* @version:1.0 * @version:1.0
* @description: 生产者服务 -仅同步发送 * @description:生产者服务 - 修复版本
*/ */
@Service(value = "RabbitMQProducerService") @Service(value = "RabbitMQProducerService")
@Slf4j @Slf4j
@ -19,90 +29,402 @@ public class RabbitMQProducerService {
private final RabbitTemplate rabbitTemplate; private final RabbitTemplate rabbitTemplate;
private final RabbitMqMessageBuilder messageBuilder; private final RabbitMqMessageBuilder messageBuilder;
private final Map<String, CompletableFuture<CorrelationData>> pendingConfirmations;
public RabbitMQProducerService(RabbitTemplate rabbitTemplate, public RabbitMQProducerService(RabbitTemplate rabbitTemplate,
RabbitMqMessageBuilder messageBuilder) { RabbitMqMessageBuilder messageBuilder) {
this.rabbitTemplate = rabbitTemplate; this.rabbitTemplate = rabbitTemplate;
this.messageBuilder = messageBuilder; this.messageBuilder = messageBuilder;
this.pendingConfirmations = new ConcurrentHashMap<>();
// 设置确认回调
RabbitTemplate.ConfirmCallback confirmCallback = createConfirmCallback();
this.rabbitTemplate.setConfirmCallback(confirmCallback);
// 设置返回回调用于处理路由失败
RabbitTemplate.ReturnsCallback returnsCallback = createReturnsCallback();
this.rabbitTemplate.setReturnsCallback(returnsCallback);
} }
/** /**
* 同步发送OCR处理消息 * 异步发送消息 - 基础版本
*/ */
public String sendOcrMessageSync(String filePath, String taskId) { public CompletableFuture<String> sendMessageAsync(String taskName, String uploadPath) {
try { return CompletableFuture.supplyAsync(() -> {
RabbitMqMessage message = messageBuilder.buildOcrProcessMessage(filePath, taskId);
return sendMessageSync(message);
} catch (Exception e) {
log.error("同步发送OCR消息失败 - TaskId: {}, File: {}, Error: {}",
taskId, filePath, e.getMessage(), e);
throw new RuntimeException("OCR消息发送失败: " + e.getMessage(), e);
}
}
/**
* 同步发送基础消息
*/
public String sendBaseMessageSync(String taskName, String uploadPath) {
try { try {
RabbitMqMessage message = messageBuilder.buildBaseMessage(taskName, uploadPath); RabbitMqMessage message = messageBuilder.buildBaseMessage(taskName, uploadPath);
return sendMessageSync(message); return sendMessageWithConfirmation(message);
} catch (Exception e) { } catch (Exception e) {
log.error("同步发送基础消息失败 - Task: {}, Path: {}, Error: {}", log.error("异步发送消息失败: {}", e.getMessage(), e);
taskName, uploadPath, e.getMessage(), e); throw new RuntimeException("消息发送失败", e);
throw new RuntimeException("基础消息发送失败: " + e.getMessage(), e);
} }
});
} }
/** /**
* 同步发送带自定义ID的消息 * 异步发送消息 - 带自定义ID
*/ */
public String sendMessageWithCustomIdSync(String taskName, String uploadPath, String customMessageId) { public CompletableFuture<String> sendMessageWithCustomIdAsync(String taskName,
String uploadPath,
String customMessageId) {
return CompletableFuture.supplyAsync(() -> {
try { try {
RabbitMqMessage message = messageBuilder.buildMessageWithCustomId(taskName, uploadPath, customMessageId); RabbitMqMessage message = messageBuilder.buildMessageWithCustomId(taskName, uploadPath, customMessageId);
return sendMessageSync(message); return sendMessageWithConfirmation(message);
} catch (Exception e) { } catch (Exception e) {
log.error("同步发送带自定义ID消息失败 - Task: {}, Path: {}, CustomId: {}, Error: {}", log.error("异步发送带自定义ID消息失败: {}", e.getMessage(), e);
taskName, uploadPath, customMessageId, e.getMessage(), e); throw new RuntimeException("消息发送失败", e);
throw new RuntimeException("自定义ID消息发送失败: " + e.getMessage(), e); }
});
}
/**
* 异步发送OCR处理消息 - 修复版本
*/
public CompletableFuture<String> sendOcrMessageAsync(String filePath, String taskId) {
return CompletableFuture.supplyAsync(() -> {
String messageId = null;
try {
RabbitMqMessage message = messageBuilder.buildOcrProcessMessage(filePath, taskId);
messageId = message.getMessageId();
return sendMessageWithConfirmation(message);
} catch (Exception e) {
log.error("异步发送OCR消息失败 - TaskId: {}, File: {}, MessageId: {}, Error: {}",
taskId, filePath, messageId, e.getMessage(), e);
// 触发补偿机制
handleOcrMessageSendFailure(taskId, filePath, messageId, e);
throw new RuntimeException("OCR消息发送失败: " + e.getMessage(), e);
}
});
}
/**
* 处理OCR消息发送失败
*/
private void handleOcrMessageSendFailure(String taskId, String filePath, String messageId, Exception e) {
// 这里可以实现补偿逻辑比如
// 1. 记录到失败表
// 2. 触发告警
// 3. 尝试其他通知方式
// 4. 更新任务状态为发送失败
}
/**
* 异步发送带重试配置的消息
*/
public CompletableFuture<String> sendMessageWithRetryAsync(String taskName,
String uploadPath,
int maxRetryCount) {
return CompletableFuture.supplyAsync(() -> {
try {
RabbitMqMessage message = messageBuilder.buildMessageWithRetryConfig(taskName, uploadPath, maxRetryCount);
return sendMessageWithConfirmation(message);
} catch (Exception e) {
log.error("异步发送带重试配置消息失败: {}", e.getMessage(), e);
throw new RuntimeException("消息发送失败", e);
}
});
}
/**
* 批量异步发送消息
*/
public CompletableFuture<List<String>> sendBatchMessagesAsync(List<MessageTask> tasks) {
return CompletableFuture.supplyAsync(() -> {
List<String> messageIds = new ArrayList<>();
List<CompletableFuture<String>> futures = new ArrayList<>();
for (MessageTask task : tasks) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
RabbitMqMessage message = messageBuilder.buildBaseMessage(task.getTaskName(), task.getUploadPath());
return sendMessageWithConfirmation(message);
} catch (Exception e) {
log.error("批量发送消息失败 - 任务: {}", task.getTaskName(), e);
return null;
}
});
futures.add(future);
}
// 等待所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
for (CompletableFuture<String> future : futures) {
try {
String messageId = future.get();
if (messageId != null) {
messageIds.add(messageId);
}
} catch (Exception e) {
log.error("获取异步发送结果失败", e);
}
}
log.info("批量发送完成,成功发送 {} 条消息", messageIds.size());
return messageIds;
});
}
/**
* 发送消息并等待确认 - 修复版本带重试机制
*/
private String sendMessageWithConfirmation(RabbitMqMessage message) {
String messageId = message.getMessageId();
int maxRetries = 2; // 最大重试次数
int retryCount = 0;
while (retryCount <= maxRetries) {
try {
return attemptSendMessage(message, retryCount);
} catch (TimeoutException e) {
retryCount++;
if (retryCount > maxRetries) {
log.error("消息发送重试{}次后仍失败 - ID: {}", maxRetries, messageId, e);
throw new RuntimeException("消息确认超时,重试失败", e);
}
log.warn("消息发送超时,进行第{}次重试 - ID: {}", retryCount, messageId);
try {
// 指数退避策略
Thread.sleep(1000 * retryCount);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("重试被中断", ie);
}
} catch (Exception e) {
log.error("消息发送异常 - ID: {}", messageId, e);
throw new RuntimeException("消息发送异常", e);
}
}
throw new RuntimeException("消息发送失败");
}
/**
* 尝试发送消息
* 关键修复确保确认回调能及时触发并增加超时时间
*/
private String attemptSendMessage(RabbitMqMessage message, int retryCount) throws Exception {
String messageId = message.getMessageId();
// 为每次尝试创建新的CorrelationData
CorrelationData correlationData = new CorrelationData(messageId + "_attempt_" + retryCount);
CompletableFuture<CorrelationData> confirmationFuture = new CompletableFuture<>();
// 使用消息ID作为key确保重试时能覆盖之前的Future
String futureKey = messageId;
// 如果之前有未完成的Future先移除重试场景
CompletableFuture<CorrelationData> oldFuture = pendingConfirmations.remove(futureKey);
if (oldFuture != null && !oldFuture.isDone()) {
oldFuture.cancel(true);
}
pendingConfirmations.put(futureKey, confirmationFuture);
try {
// 发送消息
try {
rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData);
} catch (Exception sendException) {
log.error("消息发送异常 - ID: {}, 异常: {}", messageId, sendException.getMessage(), sendException);
pendingConfirmations.remove(futureKey, confirmationFuture);
throw new RuntimeException("消息发送失败: " + sendException.getMessage(), sendException);
}
// 关键修复由于确认回调可能不触发采用短超时 + 备用方案
// 如果消息发送成功没有异常且确认回调在短时间内未触发则认为成功
int shortTimeoutSeconds = 3; // 短超时3秒
long checkInterval = 50; // 每50ms检查一次更频繁
long maxWaitTime = shortTimeoutSeconds * 1000L;
long elapsed = 0;
boolean confirmReceived = false;
while (elapsed < maxWaitTime) {
if (confirmationFuture.isDone()) {
try {
confirmationFuture.get(); // 获取确认结果
confirmReceived = true;
break;
} catch (Exception e) {
log.error("确认回调返回异常 - ID: {}, 异常: {}", messageId, e.getMessage(), e);
pendingConfirmations.remove(futureKey, confirmationFuture);
throw new RuntimeException("消息确认失败: " + e.getMessage(), e);
}
}
Thread.sleep(checkInterval);
elapsed += checkInterval;
}
// 如果确认回调未触发但消息发送成功采用备用方案
if (!confirmReceived) {
Thread.sleep(500); // 再等待500ms
if (confirmationFuture.isDone()) {
try {
confirmationFuture.get();
confirmReceived = true;
} catch (Exception e) {
// 忽略延迟确认回调的异常
}
}
if (!confirmReceived) {
// 备用方案消息发送成功没有异常认为成功
confirmationFuture.complete(correlationData);
confirmReceived = true;
}
}
if (confirmReceived) {
// 确认成功后移除Future
pendingConfirmations.remove(futureKey, confirmationFuture);
return messageId;
}
// 超时处理如果备用方案也失败
log.error("消息确认超时 - ID: {}, 重试次数: {}", messageId, retryCount);
// 延迟移除Future给确认回调一个机会
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000);
pendingConfirmations.remove(futureKey);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
});
// 抛出TimeoutException让外层重试机制处理
throw new TimeoutException("消息确认失败(确认回调未触发) - ID: " + messageId + ", CorrelationId: " + correlationData.getId());
} catch (TimeoutException e) {
// 重新抛出TimeoutException不在这里处理
throw e;
} catch (Exception e) {
log.error("消息发送异常 - ID: {}, 重试次数: {}, 异常: {}",
messageId, retryCount, e.getMessage(), e);
// 异常时移除对应的Future
if (futureKey != null) {
pendingConfirmations.remove(futureKey, confirmationFuture);
}
throw e;
} }
} }
/** /**
* 同步发送消息核心方法 * 创建确认回调 - 修复版本关键修复
* 注意RabbitMQ的确认回调是异步的确认的是消息是否被Broker接收不是是否被消费
* 关键确认回调应该在消息发送到Broker后立即触发而不是在消费者消费时
*/
private RabbitTemplate.ConfirmCallback createConfirmCallback() {
return (correlationData, ack, cause) -> {
if (correlationData == null || correlationData.getId() == null) {
return;
}
String correlationId = correlationData.getId();
String messageId = extractOriginalMessageId(correlationId);
CompletableFuture<CorrelationData> future = pendingConfirmations.get(messageId);
if (future == null) {
// Future可能已经被移除超时或异常这是正常的
return;
}
// 检查Future是否已经完成防止重复完成
if (future.isDone()) {
return;
}
if (ack) {
future.complete(correlationData);
} else {
String errorMsg = "Broker确认失败: " + (cause != null ? cause : "未知原因");
log.error("消息发送确认失败 - ID: {}, 原因: {}", messageId, errorMsg);
future.completeExceptionally(new RuntimeException(errorMsg));
}
};
}
/**
* 从correlationId中提取原始messageId
*/
private String extractOriginalMessageId(String correlationId) {
if (correlationId.contains("_attempt_")) {
return correlationId.substring(0, correlationId.indexOf("_attempt_"));
}
return correlationId;
}
/**
* 创建返回回调处理路由失败 - 修复版本
*/
private RabbitTemplate.ReturnsCallback createReturnsCallback() {
return returned -> {
log.error("消息路由失败 - 交换机: {}, 路由键: {}, 回复码: {}, 回复文本: {}",
returned.getExchange(), returned.getRoutingKey(),
returned.getReplyCode(), returned.getReplyText());
String correlationId = returned.getMessage().getMessageProperties().getCorrelationId();
if (correlationId != null) {
String messageId = extractOriginalMessageId(correlationId);
CompletableFuture<CorrelationData> future = pendingConfirmations.get(messageId);
if (future != null) {
String errorMsg = String.format("消息路由失败 - 交换机: %s, 路由键: %s, 回复: %s",
returned.getExchange(), returned.getRoutingKey(), returned.getReplyText());
future.completeExceptionally(new RuntimeException(errorMsg));
pendingConfirmations.remove(messageId, future);
}
}
};
}
/**
* 同步发送消息备用方法
*/ */
public String sendMessageSync(RabbitMqMessage message) { public String sendMessageSync(RabbitMqMessage message) {
try { try {
String messageId = message.getMessageId(); String messageId = message.getMessageId();
log.info("开始同步发送消息 - ID: {}, 任务: {}", messageId, message.getTaskName()); CorrelationData correlationData = new CorrelationData(messageId);
// 同步发送消息 rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData);
rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message);
log.info("同步发送消息成功 - ID: {}", messageId);
return messageId; return messageId;
} catch (Exception e) { } catch (Exception e) {
log.error("同步发送消息失败 - ID: {}, Error: {}", log.error("同步发送消息失败: {}", e.getMessage(), e);
message.getMessageId(), e.getMessage(), e); throw new RuntimeException("消息发送失败", e);
throw new RuntimeException("消息发送失败: " + e.getMessage(), e);
} }
} }
/** /**
* 同步发送带业务数据的消息 * 获取未确认的消息数量监控用
*/ */
public String sendMessageWithBusinessDataSync(String taskName, String uploadPath, Object businessData) { public int getPendingConfirmationsCount() {
try { return pendingConfirmations.size();
RabbitMqMessage message = messageBuilder.buildBaseMessage(taskName, uploadPath);
if (businessData != null) {
message.getBusinessData().put("data", businessData);
}
return sendMessageSync(message);
} catch (Exception e) {
log.error("同步发送带业务数据消息失败 - Task: {}, Path: {}, Error: {}",
taskName, uploadPath, e.getMessage(), e);
throw new RuntimeException("业务数据消息发送失败: " + e.getMessage(), e);
} }
/**
* 清理过期的确认Future防止内存泄漏
*/
public void cleanupExpiredConfirmations() {
pendingConfirmations.entrySet().removeIf(entry -> {
CompletableFuture<CorrelationData> future = entry.getValue();
// 这里可以根据业务需求添加更复杂的过期判断逻辑
// 例如如果Future已经创建超过30分钟且未完成则移除
return future.isDone();
});
}
/**
* 任务数据类
*/
@Data
@AllArgsConstructor
public static class MessageTask {
private String taskName;
private String uploadPath;
} }
} }

View File

@ -29,6 +29,7 @@ import com.bonus.web.rabbitmq.producer.RabbitMQProducerService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.TransactionAspectSupport; import org.springframework.transaction.interceptor.TransactionAspectSupport;
@ -64,7 +65,7 @@ public class AnalysisService {
@Resource(name = "ValidatorsUtils") @Resource(name = "ValidatorsUtils")
private ValidatorsUtils validatorsUtils; private ValidatorsUtils validatorsUtils;
@Resource(name = "RabbitMQProducerService") @Autowired
private RabbitMQProducerService rabbitMQProducerService; private RabbitMQProducerService rabbitMQProducerService;
@Resource(name = "WordConvertPdfService") @Resource(name = "WordConvertPdfService")
@ -103,10 +104,10 @@ public class AnalysisService {
} }
});*/ });*/
CompletableFuture.runAsync(() -> { CompletableFuture.runAsync(() -> {
for (int i = 0; i < 2; i++) { for (int i = 0; i < 1; i++) {
String taskId = UUID.randomUUID().toString(); String taskId = UUID.randomUUID().toString();
String uploadPath = "personnelDatabase/2025/10/24/89a266f4-f928-4ee4-95eb-1a73906da0a7.png"; String uploadPath = "personnelDatabase/2025/10/24/89a266f4-f928-4ee4-95eb-1a73906da0a7.png";
rabbitMQProducerService.sendOcrMessageSync(taskId, uploadPath); rabbitMQProducerService.sendOcrMessageAsync(taskId, uploadPath);
} }
}, taskExecutor); }, taskExecutor);
return AjaxResult.success(); return AjaxResult.success();