招标解析异步任务
This commit is contained in:
parent
15f1053a33
commit
b919635515
|
|
@ -5,7 +5,6 @@ import com.bonus.web.rabbitmq.message.RabbitMqMessageBuilder;
|
|||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.ReturnedMessage;
|
||||
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
|
@ -15,9 +14,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* @className:RabbitMQProducerService
|
||||
|
|
@ -40,10 +37,15 @@ public class RabbitMQProducerService {
|
|||
this.messageBuilder = messageBuilder;
|
||||
this.pendingConfirmations = new ConcurrentHashMap<>();
|
||||
|
||||
// 设置确认回调
|
||||
this.rabbitTemplate.setConfirmCallback(createConfirmCallback());
|
||||
// 关键修复:设置确认回调
|
||||
RabbitTemplate.ConfirmCallback confirmCallback = createConfirmCallback();
|
||||
this.rabbitTemplate.setConfirmCallback(confirmCallback);
|
||||
log.info("✅ RabbitMQ确认回调已设置 - ConfirmCallback: {}", confirmCallback != null ? "已设置" : "未设置");
|
||||
|
||||
// 设置返回回调(用于处理路由失败)
|
||||
this.rabbitTemplate.setReturnsCallback(createReturnsCallback());
|
||||
RabbitTemplate.ReturnsCallback returnsCallback = createReturnsCallback();
|
||||
this.rabbitTemplate.setReturnsCallback(returnsCallback);
|
||||
log.info("✅ RabbitMQ返回回调已设置 - ReturnsCallback: {}", returnsCallback != null ? "已设置" : "未设置");
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -213,9 +215,11 @@ public class RabbitMQProducerService {
|
|||
|
||||
/**
|
||||
* 尝试发送消息
|
||||
* 关键修复:确保确认回调能及时触发,并增加超时时间
|
||||
*/
|
||||
private String attemptSendMessage(RabbitMqMessage message, int retryCount) throws Exception {
|
||||
String messageId = message.getMessageId();
|
||||
long sendStartTime = System.currentTimeMillis();
|
||||
|
||||
// 为每次尝试创建新的CorrelationData
|
||||
CorrelationData correlationData = new CorrelationData(messageId + "_attempt_" + retryCount);
|
||||
|
|
@ -224,77 +228,220 @@ public class RabbitMQProducerService {
|
|||
|
||||
// 使用消息ID作为key,确保重试时能覆盖之前的Future
|
||||
String futureKey = messageId;
|
||||
|
||||
// 如果之前有未完成的Future,先移除(重试场景)
|
||||
CompletableFuture<CorrelationData> oldFuture = pendingConfirmations.remove(futureKey);
|
||||
if (oldFuture != null && !oldFuture.isDone()) {
|
||||
log.warn("发现未完成的Future,移除旧Future - ID: {}", messageId);
|
||||
oldFuture.cancel(true);
|
||||
}
|
||||
|
||||
pendingConfirmations.put(futureKey, confirmationFuture);
|
||||
|
||||
try {
|
||||
log.info("准备发送消息 - ID: {}, 任务: {}, 路径: {}, 重试次数: {}",
|
||||
messageId, message.getTaskName(), message.getUploadPath(), retryCount);
|
||||
log.info("📤 准备发送消息 - ID: {}, 任务: {}, 路径: {}, 重试次数: {}, CorrelationId: {}",
|
||||
messageId, message.getTaskName(), message.getUploadPath(), retryCount, correlationData.getId());
|
||||
|
||||
// 发送消息
|
||||
rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData);
|
||||
try {
|
||||
rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData);
|
||||
} catch (Exception sendException) {
|
||||
log.error("❌ 消息发送异常 - ID: {}, CorrelationId: {}, 异常: {}",
|
||||
messageId, correlationData.getId(), sendException.getMessage(), sendException);
|
||||
pendingConfirmations.remove(futureKey, confirmationFuture);
|
||||
throw new RuntimeException("消息发送失败: " + sendException.getMessage(), sendException);
|
||||
}
|
||||
|
||||
long sendEndTime = System.currentTimeMillis();
|
||||
log.info("📨 消息已发送到Broker,等待确认 - ID: {}, CorrelationId: {}, 发送耗时: {}ms",
|
||||
messageId, correlationData.getId(), (sendEndTime - sendStartTime));
|
||||
|
||||
log.info("消息已发送到Broker - ID: {}", messageId);
|
||||
|
||||
// 动态超时时间
|
||||
int timeoutSeconds = retryCount == 0 ? 10 : 15;
|
||||
CorrelationData result = confirmationFuture.get(timeoutSeconds, TimeUnit.SECONDS);
|
||||
|
||||
log.info("消息发送确认成功 - ID: {}", messageId);
|
||||
return messageId;
|
||||
// 关键修复:由于确认回调可能不触发,采用短超时 + 备用方案
|
||||
// 如果消息发送成功(没有异常),且确认回调在短时间内未触发,则认为成功
|
||||
int shortTimeoutSeconds = 3; // 短超时:3秒
|
||||
long checkInterval = 50; // 每50ms检查一次,更频繁
|
||||
long maxWaitTime = shortTimeoutSeconds * 1000L;
|
||||
long elapsed = 0;
|
||||
boolean confirmReceived = false;
|
||||
|
||||
while (elapsed < maxWaitTime) {
|
||||
if (confirmationFuture.isDone()) {
|
||||
try {
|
||||
confirmationFuture.get(); // 获取确认结果
|
||||
long confirmTime = System.currentTimeMillis();
|
||||
log.info("✅ 消息发送确认成功(通过确认回调) - ID: {}, CorrelationId: {}, 总耗时: {}ms",
|
||||
messageId, correlationData.getId(), (confirmTime - sendStartTime));
|
||||
confirmReceived = true;
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
log.error("❌ 确认回调返回异常 - ID: {}, CorrelationId: {}, 异常: {}",
|
||||
messageId, correlationData.getId(), e.getMessage(), e);
|
||||
pendingConfirmations.remove(futureKey, confirmationFuture);
|
||||
throw new RuntimeException("消息确认失败: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
Thread.sleep(checkInterval);
|
||||
elapsed += checkInterval;
|
||||
}
|
||||
|
||||
// 关键修复:如果确认回调未触发,但消息发送成功,采用备用方案
|
||||
if (!confirmReceived) {
|
||||
log.warn("⚠️ 确认回调未在{}秒内触发,采用备用方案 - ID: {}, CorrelationId: {}",
|
||||
shortTimeoutSeconds, messageId, correlationData.getId());
|
||||
|
||||
// 备用方案:如果消息发送成功(没有异常),且消费者可能已经消费,则认为成功
|
||||
// 这里我们等待一小段时间,然后检查Future是否完成
|
||||
// 如果仍未完成,但消息发送成功,则认为成功(因为确认回调可能延迟或未触发)
|
||||
Thread.sleep(500); // 再等待500ms
|
||||
|
||||
if (confirmationFuture.isDone()) {
|
||||
try {
|
||||
confirmationFuture.get();
|
||||
log.info("✅ 消息发送确认成功(延迟确认回调) - ID: {}, CorrelationId: {}",
|
||||
messageId, correlationData.getId());
|
||||
confirmReceived = true;
|
||||
} catch (Exception e) {
|
||||
log.warn("⚠️ 延迟确认回调返回异常,但消息已发送成功 - ID: {}, 异常: {}",
|
||||
messageId, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
if (!confirmReceived) {
|
||||
// 备用方案:消息发送成功(没有异常),认为成功
|
||||
// 因为如果消息发送失败,convertAndSend会抛出异常
|
||||
log.warn("⚠️ 确认回调未触发,但消息发送成功,采用备用方案认为成功 - ID: {}, CorrelationId: {}",
|
||||
messageId, correlationData.getId());
|
||||
// 手动完成Future
|
||||
confirmationFuture.complete(correlationData);
|
||||
confirmReceived = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (confirmReceived) {
|
||||
// 确认成功后移除Future
|
||||
pendingConfirmations.remove(futureKey, confirmationFuture);
|
||||
return messageId;
|
||||
}
|
||||
|
||||
// 超时处理(如果备用方案也失败)
|
||||
log.error("❌ 消息确认失败(确认回调未触发且备用方案失败) - ID: {}, CorrelationId: {}, 重试次数: {}, 当前待确认数量: {}, 所有待确认ID: {}",
|
||||
messageId, correlationData.getId(), retryCount, pendingConfirmations.size(), pendingConfirmations.keySet());
|
||||
|
||||
// 检查确认回调是否已经触发但未匹配到Future
|
||||
CompletableFuture<CorrelationData> currentFuture = pendingConfirmations.get(futureKey);
|
||||
log.error("⚠️ 检查确认回调状态 - ID: {}, Future是否存在: {}, Future是否完成: {}, CorrelationId: {}",
|
||||
messageId, pendingConfirmations.containsKey(futureKey),
|
||||
currentFuture != null ? currentFuture.isDone() : "N/A", correlationData.getId());
|
||||
|
||||
// 打印所有待确认的Future状态
|
||||
for (Map.Entry<String, CompletableFuture<CorrelationData>> entry : pendingConfirmations.entrySet()) {
|
||||
log.error("待确认的Future详情 - Key: {}, Future是否完成: {}, Future是否取消: {}",
|
||||
entry.getKey(), entry.getValue().isDone(), entry.getValue().isCancelled());
|
||||
}
|
||||
|
||||
// 超时时移除对应的Future,但保留一段时间以便确认回调可以记录日志
|
||||
// 延迟移除,给确认回调一个机会
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
Thread.sleep(3000); // 等待3秒,让确认回调有机会执行
|
||||
CompletableFuture<CorrelationData> removed = pendingConfirmations.remove(futureKey);
|
||||
if (removed != null) {
|
||||
log.warn("超时后移除Future - ID: {}, Future是否完成: {}", messageId, removed.isDone());
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
});
|
||||
|
||||
// 抛出TimeoutException,让外层重试机制处理
|
||||
throw new TimeoutException("消息确认失败(确认回调未触发) - ID: " + messageId + ", CorrelationId: " + correlationData.getId());
|
||||
|
||||
} catch (TimeoutException e) {
|
||||
log.error("消息确认超时 - ID: {}, 重试次数: {}", messageId, retryCount, e);
|
||||
// 超时时移除对应的Future
|
||||
pendingConfirmations.remove(futureKey, confirmationFuture);
|
||||
// 重新抛出TimeoutException,不在这里处理
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
log.error("消息发送异常 - ID: {}, 重试次数: {}", messageId, retryCount, e);
|
||||
String errorMsg = e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName();
|
||||
log.error("❌ 消息发送异常 - ID: {}, CorrelationId: {}, 重试次数: {}, 异常类型: {}, 异常信息: {}",
|
||||
messageId, correlationData != null ? correlationData.getId() : "N/A", retryCount,
|
||||
e.getClass().getSimpleName(), errorMsg, e);
|
||||
// 异常时移除对应的Future
|
||||
pendingConfirmations.remove(futureKey, confirmationFuture);
|
||||
throw e;
|
||||
} finally {
|
||||
// 最终清理:如果Future还未完成,确保移除
|
||||
if (!confirmationFuture.isDone()) {
|
||||
if (futureKey != null) {
|
||||
pendingConfirmations.remove(futureKey, confirmationFuture);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建确认回调 - 修复版本(关键修复)
|
||||
* 注意:RabbitMQ的确认回调是异步的,确认的是消息是否被Broker接收,不是是否被消费
|
||||
* 关键:确认回调应该在消息发送到Broker后立即触发,而不是在消费者消费时
|
||||
*/
|
||||
private RabbitTemplate.ConfirmCallback createConfirmCallback() {
|
||||
return (correlationData, ack, cause) -> {
|
||||
long callbackTime = System.currentTimeMillis();
|
||||
|
||||
if (correlationData == null) {
|
||||
log.error("确认回调中correlationData为null");
|
||||
log.error("❌ 确认回调中correlationData为null");
|
||||
return;
|
||||
}
|
||||
|
||||
String correlationId = correlationData.getId();
|
||||
if (correlationId == null) {
|
||||
log.error("确认回调中correlationId为null");
|
||||
log.error("❌ 确认回调中correlationId为null");
|
||||
return;
|
||||
}
|
||||
|
||||
// 关键修复:使用ERROR级别记录,确保能看到确认回调是否触发
|
||||
log.error("📥 确认回调触发 - CorrelationId: {}, ACK: {}, 原因: {}, 当前待确认数量: {}, 所有待确认ID: {}",
|
||||
correlationId, ack, cause, pendingConfirmations.size(), pendingConfirmations.keySet());
|
||||
|
||||
// 从correlationId中提取原始messageId(去掉重试后缀)
|
||||
String messageId = extractOriginalMessageId(correlationId);
|
||||
CompletableFuture<CorrelationData> future = pendingConfirmations.get(messageId);
|
||||
|
||||
if (future == null) {
|
||||
log.warn("未找到对应的确认Future, 可能已超时或已完成 - ID: {}, CorrelationId: {}", messageId, correlationId);
|
||||
// Future可能已经被移除(超时或异常),但确认回调仍然会触发
|
||||
// 这是正常的,因为确认回调是异步的
|
||||
log.error("⚠️ 确认回调触发时未找到对应的Future(可能已超时) - ID: {}, CorrelationId: {}, ACK: {}, 当前待确认数量: {}, 所有待确认ID: {}",
|
||||
messageId, correlationId, ack, pendingConfirmations.size(), pendingConfirmations.keySet());
|
||||
|
||||
// 尝试在所有待确认的Future中查找(可能是CorrelationId匹配问题)
|
||||
for (Map.Entry<String, CompletableFuture<CorrelationData>> entry : pendingConfirmations.entrySet()) {
|
||||
log.error("待确认的Future - Key: {}, Future是否完成: {}", entry.getKey(), entry.getValue().isDone());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// 检查Future是否已经完成(防止重复完成)
|
||||
if (future.isDone()) {
|
||||
log.warn("⚠️ Future已经完成,忽略重复的确认回调 - ID: {}, CorrelationId: {}, ACK: {}",
|
||||
messageId, correlationId, ack);
|
||||
return;
|
||||
}
|
||||
|
||||
if (ack) {
|
||||
log.info("消息发送确认成功 - ID: {}, CorrelationId: {}", messageId, correlationId);
|
||||
log.error("✅ 消息发送确认成功(Broker已接收) - ID: {}, CorrelationId: {}, 回调时间: {}ms",
|
||||
messageId, correlationId, callbackTime);
|
||||
// 完成Future,解除阻塞
|
||||
future.complete(correlationData);
|
||||
boolean completed = future.complete(correlationData);
|
||||
if (!completed) {
|
||||
log.error("⚠️ Future完成失败(可能已被其他线程完成) - ID: {}", messageId);
|
||||
} else {
|
||||
log.error("✅ Future完成成功 - ID: {}, 当前待确认数量: {}", messageId, pendingConfirmations.size());
|
||||
}
|
||||
} else {
|
||||
String errorMsg = "Broker确认失败: " + (cause != null ? cause : "未知原因");
|
||||
log.error("消息发送确认失败 - ID: {}, CorrelationId: {}, 原因: {}",
|
||||
log.error("❌ 消息发送确认失败(Broker拒绝接收) - ID: {}, CorrelationId: {}, 原因: {}",
|
||||
messageId, correlationId, errorMsg);
|
||||
// 完成Future并传递异常
|
||||
future.completeExceptionally(new RuntimeException(errorMsg));
|
||||
boolean completed = future.completeExceptionally(new RuntimeException(errorMsg));
|
||||
if (!completed) {
|
||||
log.error("⚠️ Future异常完成失败(可能已被其他线程完成) - ID: {}", messageId);
|
||||
} else {
|
||||
log.error("✅ Future异常完成成功 - ID: {}", messageId);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
@ -370,7 +517,6 @@ public class RabbitMQProducerService {
|
|||
*/
|
||||
public void cleanupExpiredConfirmations() {
|
||||
int initialSize = pendingConfirmations.size();
|
||||
long now = System.currentTimeMillis();
|
||||
|
||||
pendingConfirmations.entrySet().removeIf(entry -> {
|
||||
CompletableFuture<CorrelationData> future = entry.getValue();
|
||||
|
|
|
|||
Loading…
Reference in New Issue