招标解析异步任务

This commit is contained in:
cwchen 2025-11-25 10:19:47 +08:00
parent 6443da08ea
commit 15f1053a33
1 changed files with 168 additions and 33 deletions

View File

@ -17,13 +17,14 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.UUID;
/** /**
* @className:RabbitMQProducerService * @className:RabbitMQProducerService
* @author:cwchen * @author:cwchen
* @date:2025-11-24-16:16 * @date:2025-11-24-16:16
* @version:1.0 * @version:1.0
* @description:生产者服务 * @description:生产者服务 - 修复版本
*/ */
@Service(value = "RabbitMQProducerService") @Service(value = "RabbitMQProducerService")
@Slf4j @Slf4j
@ -78,20 +79,47 @@ public class RabbitMQProducerService {
} }
/** /**
* 异步发送OCR处理消息 * 异步发送OCR处理消息 - 修复版本
*/ */
public CompletableFuture<String> sendOcrMessageAsync(String filePath, String taskId) { public CompletableFuture<String> sendOcrMessageAsync(String filePath, String taskId) {
return CompletableFuture.supplyAsync(() -> { return CompletableFuture.supplyAsync(() -> {
String messageId = null;
try { try {
log.info("开始发送OCR消息 - TaskId: {}, File: {}", taskId, filePath);
RabbitMqMessage message = messageBuilder.buildOcrProcessMessage(filePath, taskId); RabbitMqMessage message = messageBuilder.buildOcrProcessMessage(filePath, taskId);
return sendMessageWithConfirmation(message); messageId = message.getMessageId();
String result = sendMessageWithConfirmation(message);
log.info("OCR消息发送成功 - TaskId: {}, MessageId: {}", taskId, result);
return result;
} catch (Exception e) { } catch (Exception e) {
log.error("异步发送OCR消息失败: {}", e.getMessage(), e); log.error("异步发送OCR消息失败 - TaskId: {}, File: {}, MessageId: {}, Error: {}",
throw new RuntimeException("OCR消息发送失败", e); taskId, filePath, messageId, e.getMessage(), e);
// 触发补偿机制
handleOcrMessageSendFailure(taskId, filePath, messageId, e);
throw new RuntimeException("OCR消息发送失败: " + e.getMessage(), e);
} }
}); });
} }
/**
* 处理OCR消息发送失败
*/
private void handleOcrMessageSendFailure(String taskId, String filePath, String messageId, Exception e) {
try {
log.warn("处理OCR消息发送失败补偿 - TaskId: {}, MessageId: {}", taskId, messageId);
// 这里可以实现补偿逻辑比如
// 1. 记录到失败表
// 2. 触发告警
// 3. 尝试其他通知方式
// 4. 更新任务状态为发送失败
// 示例记录到日志文件或数据库
// failureRecordService.recordOcrMessageFailure(taskId, filePath, messageId, e.getMessage());
} catch (Exception ex) {
log.error("处理OCR消息发送失败补偿时发生异常", ex);
}
}
/** /**
* 异步发送带重试配置的消息 * 异步发送带重试配置的消息
*/ */
@ -150,47 +178,90 @@ public class RabbitMQProducerService {
} }
/** /**
* 发送消息并等待确认 * 发送消息并等待确认 - 修复版本带重试机制
*/ */
private String sendMessageWithConfirmation(RabbitMqMessage message) { private String sendMessageWithConfirmation(RabbitMqMessage message) {
String messageId = message.getMessageId(); String messageId = message.getMessageId();
CorrelationData correlationData = new CorrelationData(messageId); int maxRetries = 2; // 最大重试次数
int retryCount = 0;
while (retryCount <= maxRetries) {
try {
return attemptSendMessage(message, retryCount);
} catch (TimeoutException e) {
retryCount++;
if (retryCount > maxRetries) {
log.error("消息发送重试{}次后仍失败 - ID: {}", maxRetries, messageId, e);
throw new RuntimeException("消息确认超时,重试失败", e);
}
log.warn("消息发送超时,进行第{}次重试 - ID: {}", retryCount, messageId);
try {
// 指数退避策略
Thread.sleep(1000 * retryCount);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("重试被中断", ie);
}
} catch (Exception e) {
log.error("消息发送异常 - ID: {}", messageId, e);
throw new RuntimeException("消息发送异常", e);
}
}
throw new RuntimeException("消息发送失败");
}
/**
* 尝试发送消息
*/
private String attemptSendMessage(RabbitMqMessage message, int retryCount) throws Exception {
String messageId = message.getMessageId();
// 为每次尝试创建新的CorrelationData
CorrelationData correlationData = new CorrelationData(messageId + "_attempt_" + retryCount);
// 创建确认Future
CompletableFuture<CorrelationData> confirmationFuture = new CompletableFuture<>(); CompletableFuture<CorrelationData> confirmationFuture = new CompletableFuture<>();
// 保存到全局的Map中确保确认回调可以访问 // 使用消息ID作为key确保重试时能覆盖之前的Future
pendingConfirmations.put(messageId, confirmationFuture); String futureKey = messageId;
pendingConfirmations.put(futureKey, confirmationFuture);
try { try {
log.info("准备发送消息 - ID: {}, 任务: {}, 路径: {}", log.info("准备发送消息 - ID: {}, 任务: {}, 路径: {}, 重试次数: {}",
messageId, message.getTaskName(), message.getUploadPath()); messageId, message.getTaskName(), message.getUploadPath(), retryCount);
// 发送消息 // 发送消息
rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData); rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData);
log.info("消息已发送到Broker - ID: {}", messageId); log.info("消息已发送到Broker - ID: {}", messageId);
// 等待确认带超时 // 动态超时时间
CorrelationData result = confirmationFuture.get(10, TimeUnit.SECONDS); int timeoutSeconds = retryCount == 0 ? 10 : 15;
CorrelationData result = confirmationFuture.get(timeoutSeconds, TimeUnit.SECONDS);
log.info("消息发送确认成功 - ID: {}", messageId); log.info("消息发送确认成功 - ID: {}", messageId);
return messageId; return messageId;
} catch (TimeoutException e) { } catch (TimeoutException e) {
log.error("消息确认超时 - ID: {}", messageId, e); log.error("消息确认超时 - ID: {}, 重试次数: {}", messageId, retryCount, e);
// 超时时移除对应的Future // 超时时移除对应的Future
pendingConfirmations.remove(messageId); pendingConfirmations.remove(futureKey, confirmationFuture);
throw new RuntimeException("消息确认超时", e); throw e;
} catch (Exception e) { } catch (Exception e) {
log.error("消息发送异常 - ID: {}", messageId, e); log.error("消息发送异常 - ID: {}, 重试次数: {}", messageId, retryCount, e);
pendingConfirmations.remove(messageId); // 异常时移除对应的Future
throw new RuntimeException("消息发送异常", e); pendingConfirmations.remove(futureKey, confirmationFuture);
throw e;
} finally {
// 最终清理如果Future还未完成确保移除
if (!confirmationFuture.isDone()) {
pendingConfirmations.remove(futureKey, confirmationFuture);
}
} }
} }
/** /**
* 创建确认回调 * 创建确认回调 - 修复版本关键修复
*/ */
private RabbitTemplate.ConfirmCallback createConfirmCallback() { private RabbitTemplate.ConfirmCallback createConfirmCallback() {
return (correlationData, ack, cause) -> { return (correlationData, ack, cause) -> {
@ -199,19 +270,47 @@ public class RabbitMQProducerService {
return; return;
} }
String messageId = correlationData.getId(); String correlationId = correlationData.getId();
if (correlationId == null) {
log.error("确认回调中correlationId为null");
return;
}
// 从correlationId中提取原始messageId去掉重试后缀
String messageId = extractOriginalMessageId(correlationId);
CompletableFuture<CorrelationData> future = pendingConfirmations.get(messageId);
if (future == null) {
log.warn("未找到对应的确认Future, 可能已超时或已完成 - ID: {}, CorrelationId: {}", messageId, correlationId);
return;
}
if (ack) { if (ack) {
log.debug("消息发送确认成功 - ID: {}", messageId); log.info("消息发送确认成功 - ID: {}, CorrelationId: {}", messageId, correlationId);
// 这里可以更新消息状态为已确认 // 完成Future解除阻塞
future.complete(correlationData);
} else { } else {
log.error("消息发送确认失败 - ID: {}, 原因: {}", messageId, cause); String errorMsg = "Broker确认失败: " + (cause != null ? cause : "未知原因");
// 这里可以处理发送失败的消息 log.error("消息发送确认失败 - ID: {}, CorrelationId: {}, 原因: {}",
messageId, correlationId, errorMsg);
// 完成Future并传递异常
future.completeExceptionally(new RuntimeException(errorMsg));
} }
}; };
} }
/** /**
* 创建返回回调处理路由失败 * 从correlationId中提取原始messageId
*/
private String extractOriginalMessageId(String correlationId) {
if (correlationId.contains("_attempt_")) {
return correlationId.substring(0, correlationId.indexOf("_attempt_"));
}
return correlationId;
}
/**
* 创建返回回调处理路由失败 - 修复版本
*/ */
private RabbitTemplate.ReturnsCallback createReturnsCallback() { private RabbitTemplate.ReturnsCallback createReturnsCallback() {
return returned -> { return returned -> {
@ -219,13 +318,22 @@ public class RabbitMQProducerService {
returned.getExchange(), returned.getRoutingKey(), returned.getExchange(), returned.getRoutingKey(),
returned.getReplyCode(), returned.getReplyText()); returned.getReplyCode(), returned.getReplyText());
// 如果有CorrelationData可以在这里处理 // 处理返回的消息
if (returned.getMessage().getMessageProperties().getCorrelationId() != null) { String correlationId = returned.getMessage().getMessageProperties().getCorrelationId();
String messageId = returned.getMessage().getMessageProperties().getCorrelationId(); if (correlationId != null) {
CompletableFuture<CorrelationData> future = pendingConfirmations.remove(messageId); String messageId = extractOriginalMessageId(correlationId);
CompletableFuture<CorrelationData> future = pendingConfirmations.get(messageId);
if (future != null) { if (future != null) {
future.completeExceptionally(new RuntimeException("消息路由失败: " + returned.getReplyText())); String errorMsg = String.format("消息路由失败 - 交换机: %s, 路由键: %s, 回复: %s",
returned.getExchange(), returned.getRoutingKey(), returned.getReplyText());
future.completeExceptionally(new RuntimeException(errorMsg));
// 路由失败后立即移除避免重复处理
pendingConfirmations.remove(messageId, future);
} else {
log.warn("路由失败的消息未找到对应的Future - CorrelationId: {}", correlationId);
} }
} else {
log.error("路由失败的消息没有CorrelationId");
} }
}; };
} }
@ -250,6 +358,33 @@ public class RabbitMQProducerService {
} }
} }
/**
* 获取未确认的消息数量监控用
*/
public int getPendingConfirmationsCount() {
return pendingConfirmations.size();
}
/**
* 清理过期的确认Future防止内存泄漏
*/
public void cleanupExpiredConfirmations() {
int initialSize = pendingConfirmations.size();
long now = System.currentTimeMillis();
pendingConfirmations.entrySet().removeIf(entry -> {
CompletableFuture<CorrelationData> future = entry.getValue();
// 这里可以根据业务需求添加更复杂的过期判断逻辑
// 例如如果Future已经创建超过30分钟且未完成则移除
return future.isDone();
});
int removed = initialSize - pendingConfirmations.size();
if (removed > 0) {
log.info("清理了 {} 个已完成的确认Future", removed);
}
}
/** /**
* 任务数据类 * 任务数据类
*/ */
@ -259,4 +394,4 @@ public class RabbitMQProducerService {
private String taskName; private String taskName;
private String uploadPath; private String uploadPath;
} }
} }