diff --git a/bonus-admin/src/main/resources/application-rabbitmq.yml b/bonus-admin/src/main/resources/application-rabbitmq.yml index d09b34e..8320094 100644 --- a/bonus-admin/src/main/resources/application-rabbitmq.yml +++ b/bonus-admin/src/main/resources/application-rabbitmq.yml @@ -2,9 +2,9 @@ spring: rabbitmq: host: 192.168.0.14 port: 5672 - username: guest - password: guest - virtual-host: / + username: bidAdmin + password: bidAdmin + virtual-host: /smart_bid # 生产者配置 publisher-confirm-type: correlated # 消息确认 publisher-returns: true # 开启返回模式 diff --git a/bonus-common/src/main/java/com/bonus/common/domain/rabbitmq/dto/RabbitMqMessage.java b/bonus-common/src/main/java/com/bonus/common/domain/rabbitmq/dto/RabbitMqMessage.java index a1c6e46..9be76cf 100644 --- a/bonus-common/src/main/java/com/bonus/common/domain/rabbitmq/dto/RabbitMqMessage.java +++ b/bonus-common/src/main/java/com/bonus/common/domain/rabbitmq/dto/RabbitMqMessage.java @@ -2,10 +2,6 @@ package com.bonus.common.domain.rabbitmq.dto; import lombok.Data; -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.NoArgsConstructor; - import java.io.Serializable; import java.util.HashMap; import java.util.Map; @@ -15,9 +11,6 @@ import java.util.Map; * @description: RabbitMQ 消息实体类 */ @Data -@Builder -@NoArgsConstructor -@AllArgsConstructor public class RabbitMqMessage implements Serializable { private static final long serialVersionUID = 1L; @@ -40,220 +33,11 @@ public class RabbitMqMessage implements Serializable { /** * 消息创建时间戳 */ - @Builder.Default - private Long timestamp = System.currentTimeMillis(); - - /** - * 消息来源系统 - */ - private String sourceSystem; - - /** - * 消息目标系统 - */ - private String targetSystem; - - /** - * 消息优先级(0-9,数值越大优先级越高) - */ - @Builder.Default - private Integer priority = 5; - - /** - * 重试次数 - */ - @Builder.Default - private Integer retryCount = 0; - - /** - * 最大重试次数 - */ - @Builder.Default - private Integer maxRetryCount = 3; - - /** - * 消息过期时间(毫秒时间戳) - */ - private Long expirationTime; + private Long timestamp = System.nanoTime(); /** * 业务数据(扩展字段,存储额外的业务信息) */ - @Builder.Default private Map businessData = new HashMap<>(); - /** - * 消息状态 - */ - private String status; - - /** - * 错误信息(处理失败时记录) - */ - private String errorMessage; - - /** - * 处理开始时间 - */ - private Long processStartTime; - - /** - * 处理结束时间 - */ - private Long processEndTime; - - /** - * 版本号(用于乐观锁) - */ - @Builder.Default - private Integer version = 1; - - // 常用任务类型常量 - public static class TaskType { - public static final String OCR_PROCESS = "OCR_PROCESS"; - public static final String FILE_UPLOAD = "FILE_UPLOAD"; - public static final String IMAGE_PROCESS = "IMAGE_PROCESS"; - public static final String DOCUMENT_EXTRACT = "DOCUMENT_EXTRACT"; - public static final String DATA_SYNC = "DATA_SYNC"; - public static final String NOTIFICATION = "NOTIFICATION"; - } - - // 消息状态常量 - public static class Status { - public static final String PENDING = "PENDING"; - public static final String PROCESSING = "PROCESSING"; - public static final String SUCCESS = "SUCCESS"; - public static final String FAILED = "FAILED"; - public static final String RETRYING = "RETRYING"; - public static final String EXPIRED = "EXPIRED"; - } - - /** - * 添加业务数据 - */ - public void addBusinessData(String key, Object value) { - if (this.businessData == null) { - this.businessData = new HashMap<>(); - } - this.businessData.put(key, value); - } - - /** - * 获取业务数据 - */ - public Object getBusinessData(String key) { - return this.businessData != null ? this.businessData.get(key) : null; - } - - /** - * 增加重试次数 - */ - public void incrementRetryCount() { - this.retryCount++; - } - - /** - * 检查是否达到最大重试次数 - */ - public boolean isMaxRetryReached() { - return this.retryCount >= this.maxRetryCount; - } - - /** - * 检查消息是否过期 - */ - public boolean isExpired() { - return this.expirationTime != null && System.currentTimeMillis() > this.expirationTime; - } - - /** - * 设置过期时间(相对时间,单位:毫秒) - */ - public void setExpirationRelative(long ttlMillis) { - this.expirationTime = System.currentTimeMillis() + ttlMillis; - } - - /** - * 开始处理 - */ - public void startProcess() { - this.status = Status.PROCESSING; - this.processStartTime = System.currentTimeMillis(); - } - - /** - * 处理成功 - */ - public void processSuccess() { - this.status = Status.SUCCESS; - this.processEndTime = System.currentTimeMillis(); - } - - /** - * 处理失败 - */ - public void processFailed(String errorMessage) { - this.status = Status.FAILED; - this.errorMessage = errorMessage; - this.processEndTime = System.currentTimeMillis(); - } - - /** - * 重试处理 - */ - public void retryProcess() { - this.status = Status.RETRYING; - incrementRetryCount(); - } - - /** - * 计算处理耗时(毫秒) - */ - public Long getProcessDuration() { - if (processStartTime != null && processEndTime != null) { - return processEndTime - processStartTime; - } - return null; - } - - /** - * 创建简单的消息(快速构建) - */ - public static RabbitMqMessage createSimple(String taskName, String uploadPath) { - return RabbitMqMessage.builder() - .taskName(taskName) - .messageId(java.util.UUID.randomUUID().toString()) - .uploadPath(uploadPath) - .timestamp(System.currentTimeMillis()) - .status(Status.PENDING) - .build(); - } - - /** - * 创建带业务数据的消息 - */ - public static RabbitMqMessage createWithBusinessData(String taskName, String uploadPath, - Map businessData) { - return RabbitMqMessage.builder() - .taskName(taskName) - .messageId(java.util.UUID.randomUUID().toString()) - .uploadPath(uploadPath) - .timestamp(System.currentTimeMillis()) - .businessData(businessData != null ? new HashMap<>(businessData) : new HashMap<>()) - .status(Status.PENDING) - .build(); - } - - @Override - public String toString() { - return "RabbitMqMessage{" + - "taskName='" + taskName + '\'' + - ", messageId='" + messageId + '\'' + - ", uploadPath='" + uploadPath + '\'' + - ", timestamp=" + timestamp + - ", status='" + status + '\'' + - ", retryCount=" + retryCount + - ", maxRetryCount=" + maxRetryCount + - '}'; - } } diff --git a/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/config/RabbitMQConfig.java b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/config/RabbitMQConfig.java index 3950bee..3cfddbb 100644 --- a/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/config/RabbitMQConfig.java +++ b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/config/RabbitMQConfig.java @@ -3,16 +3,11 @@ package com.bonus.rabbitmq.config; import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; -import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler; -import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy; import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler; -import org.springframework.amqp.rabbit.retry.MessageRecoverer; -import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer; import org.springframework.amqp.support.converter.DefaultClassMapper; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; @@ -22,7 +17,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.retry.annotation.EnableRetry; import org.springframework.retry.backoff.ExponentialBackOffPolicy; -import org.springframework.retry.interceptor.RetryOperationsInterceptor; import org.springframework.retry.policy.SimpleRetryPolicy; import org.springframework.retry.support.RetryTemplate; @@ -107,15 +101,18 @@ public class RabbitMQConfig { factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(jsonMessageConverter); - // 更改为手动确认模式,确保按顺序一个一个处理 + // 手动确认模式 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); + // 当监听方法抛异常时,不重新入队,防止单条“坏消息”一直卡住后续消息 + factory.setDefaultRequeueRejected(false); factory.setMissingQueuesFatal(false); factory.setAutoStartup(true); - factory.setConcurrentConsumers(1); // 设置为1,确保只有一个消费者处理消息 - factory.setMaxConcurrentConsumers(1); // 最大消费者数为1 - factory.setPrefetchCount(1); // 每次只取一条消息 + // 为了便于排查问题,先强制顺序单线程消费;确认一条消息后再取下一条 + factory.setConcurrentConsumers(1); + factory.setMaxConcurrentConsumers(1); + factory.setPrefetchCount(1); - log.info("多消费者容器工厂配置完成 - 使用手动确认模式"); + log.info("多消费者容器工厂配置完成 - 手动确认模式,单线程顺序消费,defaultRequeueRejected=false"); return factory; } diff --git a/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/consumer/RabbitMQConsumerService.java b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/consumer/RabbitMQConsumerService.java index b3be051..d176c8d 100644 --- a/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/consumer/RabbitMQConsumerService.java +++ b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/consumer/RabbitMQConsumerService.java @@ -1,249 +1,81 @@ package com.bonus.rabbitmq.consumer; -import com.bonus.common.domain.ocr.dto.OcrRequest; -import com.bonus.common.domain.ocr.vo.OcrResponse; import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; -import com.bonus.file.config.MinioConfig; -import com.bonus.file.util.MinioUtil; -import com.bonus.ocr.service.OcrService; +import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.amqp.support.AmqpHeaders; -import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; -import javax.annotation.Resource; -import java.io.File; import java.io.IOException; -import java.util.Objects; +import java.nio.charset.StandardCharsets; /** * @className:RabbitMQConsumerService * @author:cwchen * @date:2025-11-24-16:19 * @version:1.0 - * @description:消费者服务 + * @description:简化版消费者服务 - 手动解析原始消息,保证队列中的每条消息都能被尝试消费 */ -@Component(value = "") +@Component @Slf4j public class RabbitMQConsumerService { - @Resource - private OcrService ocrService; - - @Resource - private MinioConfig minioConfig; - - @Resource - private MinioUtil minioUtil; + private final ObjectMapper objectMapper = new ObjectMapper(); @RabbitListener( queues = "myQueue", - containerFactory = "multiConsumerFactory", // 使用上面配置的工厂,保证按顺序消费 - errorHandler = "rabbitListenerErrorHandler" + containerFactory = "multiConsumerFactory" // 使用上面配置的工厂,保证按顺序消费 ) - public void handleMessage(RabbitMqMessage message, - Channel channel, - @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { + public void handleMessage(Message amqpMessage, Channel channel) { + long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag(); + String rawBody = new String(amqpMessage.getBody(), StandardCharsets.UTF_8); + + log.info("🎯 RabbitMQConsumerService 收到原始消息 - deliveryTag: {}, body: {}", + deliveryTag, rawBody); + + RabbitMqMessage message; + try { + // 手动反序列化 JSON 为 RabbitMqMessage,避免类型映射问题导致监听方法不执行 + message = objectMapper.readValue(rawBody, RabbitMqMessage.class); + } catch (Exception e) { + log.error("❌ 消息反序列化失败,直接拒绝 - deliveryTag: {}, body: {}", + deliveryTag, rawBody, e); + try { + channel.basicReject(deliveryTag, false); + } catch (IOException ioException) { + log.error("basicReject 失败 - deliveryTag: {}", deliveryTag, ioException); + } + return; + } String messageId = message.getMessageId(); String taskName = message.getTaskName(); - log.info("开始处理消息 - ID: {}, 任务: {}, 投递标签: {}", - messageId, taskName, deliveryTag); - try { - // 根据任务类型进行不同的处理 - switch (taskName) { - case "OCR_PROCESS": - processOcrTask(message); - break; - default: - processDefaultTask(message); - break; - } + log.info("🛠 开始处理消息内容 - ID: {}, 任务: {}, 业务数据: {}", + messageId, taskName, message.getBusinessData()); + + // 模拟少量业务处理耗时,避免长时间阻塞 + /*try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + }*/ // 处理成功,手动确认消息 - channel.basicAck(deliveryTag, false); // 确认消息已处理 - log.info("消息处理完成并确认 - ID: {}, 投递标签: {}", messageId, deliveryTag); + channel.basicAck(deliveryTag, false); + log.info("✅ 消息处理完成并确认 - ID: {}, 投递标签: {}", messageId, deliveryTag); - } catch (BusinessException e) { - // 业务异常,记录日志但不重试,直接拒绝消息并不重新入队 - log.error("业务异常,拒绝消息并不重新入队 - ID: {}", messageId, e); + } catch (Exception e) { + log.error("❌ 处理消息异常,拒绝消息并不重新入队 - ID: {}, 投递标签: {}", + messageId, deliveryTag, e); try { - channel.basicReject(deliveryTag, false); // 拒绝消息 + channel.basicReject(deliveryTag, false); } catch (IOException ioException) { - log.error("拒绝消息失败 - ID: {}", messageId, ioException); + log.error("basicReject 失败 - ID: {}, deliveryTag: {}", messageId, deliveryTag, ioException); } - } catch (Exception e) { - // 其他异常,触发重试 - log.error("处理消息异常,将触发重试机制 - ID: {}, 异常: {}", messageId, e.getMessage(), e); - // 抛出异常,重试拦截器会处理重试 - throw new RuntimeException("消息处理失败,需要重试: " + e.getMessage(), e); - } - } - - - - /** - * 处理OCR任务 - */ - private void processOcrTask(RabbitMqMessage message) { - String messageId = message.getMessageId(); - String filePath = message.getUploadPath(); - String taskId = (String) message.getBusinessData().get("taskId"); - - log.info("处理OCR任务 - ID: {}, 任务ID: {}, 文件: {}", messageId, taskId, filePath); - - try { - // 1. 检查OCR服务可用性 - checkOcrServiceAvailability(); - - // 2. 执行OCR处理 - String ocrResult = performOcrProcessing(filePath); - - // 3. 保存OCR结果 - saveOcrResult(taskId, ocrResult); - - // 4. 更新任务状态 - updateOcrTaskStatus(taskId, "SUCCESS"); - Thread.sleep(1000 * 10); - log.info("OCR任务处理完成 - ID: {}, 任务ID: {}", messageId, taskId); - - } catch (OcrServiceException e) { - log.error("OCR服务异常 - ID: {}", messageId, e); - throw new BusinessException("OCR处理失败", e); - } catch (Exception e) { - log.error("OCR任务处理异常 - ID: {}", messageId, e); - throw new RuntimeException("OCR处理异常,需要重试", e); - } - } - - /** - * 处理默认任务 - */ - private void processDefaultTask(RabbitMqMessage message) { - String messageId = message.getMessageId(); - String taskName = message.getTaskName(); - - log.info("处理默认任务 - ID: {}, 任务名: {}", messageId, taskName); - - // 模拟业务处理 - try { - // 这里添加实际的任务处理逻辑 - performBusinessLogic(message); - - log.info("默认任务处理完成 - ID: {}", messageId); - - } catch (Exception e) { - log.error("默认任务处理失败 - ID: {}", messageId, e); - throw new RuntimeException("任务处理失败", e); - } - } - - private void checkOcrServiceAvailability() { - // 模拟OCR服务检查 - log.debug("检查OCR服务可用性"); - // 实际检查逻辑... - } - - private String performOcrProcessing(String filePath) { - // 模拟OCR处理 - log.debug("执行OCR处理: {}", filePath); -// String uploadPath = filePath; -// File fileFromMinio = getFileFromMinio(uploadPath); -// File fileFromMinio = new File("C:\\Users\\10488\\Desktop\\12.pdf"); -// OcrResponse ocrResponse = performOcrRecognition(fileFromMinio); -// log.info("ocrResponse响应结果:{}", ocrResponse); - return "OCR处理结果"; - } - - private OcrResponse performOcrRecognition(File file) { - try { - OcrRequest ocrRequest = buildOcrRequest(file); -// OcrResponse ocrResponse = ocrService.callOcrService(ocrRequest); - OcrResponse ocrResponse = null; - // 修复:检查 OCR 响应是否为 null - if (Objects.isNull(ocrResponse)) { - throw new BusinessException("OCR服务返回结果为空"); - } - - if (!isOcrResponseValid(ocrResponse)) { - throw new BusinessException("OCR识别结果无效"); - } - - log.info("OCR识别成功 - 数据: {}", ocrResponse.getData()); - return ocrResponse; - - } catch (Exception e) { - log.error("OCR识别失败", e); - throw new RuntimeException("OCR识别失败: " + e.getMessage(), e); - } - } - - private boolean isOcrResponseValid(OcrResponse ocrResponse) { - return ocrResponse.getData() != null && !ocrResponse.getData().toString().isEmpty(); - } - - private File getFileFromMinio(String uploadPath) { - try { - File file = minioUtil.getFileFromMinio(minioConfig.getBucketName(), uploadPath); - if (file == null || !file.exists()) { - throw new OcrServiceException("Minio文件不存在: " + uploadPath); - } - return file; - } catch (Exception e) { - throw new OcrServiceException("获取Minio文件失败: " + uploadPath, e); - } - } - - private OcrRequest buildOcrRequest(File file) { - OcrRequest ocrRequest = new OcrRequest(); - ocrRequest.setFile(file); - ocrRequest.setType("application/pdf"); - ocrRequest.setFields_json("项目名称,合同签订时间,合同金额,建设地点,建设单位,建设单位电话,建设单位开户行,建设单位账号"); - return ocrRequest; - } - - private void saveOcrResult(String taskId, String result) { - // 模拟保存OCR结果 - log.debug("保存OCR结果 - 任务ID: {}", taskId); - } - - private void updateOcrTaskStatus(String taskId, String status) { - // 模拟更新OCR任务状态 - log.debug("更新OCR任务状态 - 任务ID: {}, 状态: {}", taskId, status); - } - - private void performBusinessLogic(RabbitMqMessage message) { - // 模拟业务逻辑执行 - log.debug("执行业务逻辑 - 消息ID: {}", message.getMessageId()); - } - - /** - * 自定义业务异常 - */ - public static class BusinessException extends RuntimeException { - public BusinessException(String message) { - super(message); - } - - public BusinessException(String message, Throwable cause) { - super(message, cause); - } - } - - /** - * OCR服务异常 - */ - public static class OcrServiceException extends RuntimeException { - public OcrServiceException(String message) { - super(message); - } - - public OcrServiceException(String message, Throwable cause) { - super(message, cause); } } } \ No newline at end of file diff --git a/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/message/RabbitMqMessageBuilder.java b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/message/RabbitMqMessageBuilder.java deleted file mode 100644 index 557b4f6..0000000 --- a/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/message/RabbitMqMessageBuilder.java +++ /dev/null @@ -1,141 +0,0 @@ -package com.bonus.rabbitmq.message; - -import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import java.util.Map; -import java.util.UUID; - -/** - * @className: RabbitMqMessageBuilder - * @description: RabbitMQ 消息构建器 - */ -@Component -@Slf4j -public class RabbitMqMessageBuilder { - - /** - * 构建基础消息 - */ - public RabbitMqMessage buildBaseMessage(String taskName, String uploadPath) { - return RabbitMqMessage.createSimple(taskName, uploadPath); - } - - /** - * 构建带自定义消息ID的消息 - */ - public RabbitMqMessage buildMessageWithCustomId(String taskName, String uploadPath, String customMessageId) { - return RabbitMqMessage.builder() - .messageId(customMessageId) - .taskName(taskName) - .uploadPath(uploadPath) - .timestamp(System.currentTimeMillis()) - .status(RabbitMqMessage.Status.PENDING) - .build(); - } - - /** - * 构建文件处理消息 - */ - public RabbitMqMessage buildFileProcessMessage(String filePath) { - return RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.FILE_UPLOAD, filePath); - } - - /** - * 构建OCR处理消息 - */ - public RabbitMqMessage buildOcrProcessMessage(String filePath, String taskId) { - RabbitMqMessage message = RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.OCR_PROCESS, filePath); - message.addBusinessData("taskId", taskId); - message.addBusinessData("processType", "OCR_PROCESS"); - return message; - } - - /** - * 构建带重试配置的消息 - */ - public RabbitMqMessage buildMessageWithRetryConfig(String taskName, String uploadPath, int maxRetryCount) { - return RabbitMqMessage.builder() - .taskName(taskName) - .messageId(UUID.randomUUID().toString()) - .uploadPath(uploadPath) - .timestamp(System.currentTimeMillis()) - .maxRetryCount(maxRetryCount) - .status(RabbitMqMessage.Status.PENDING) - .build(); - } - - /** - * 构建高优先级消息 - */ - public RabbitMqMessage buildHighPriorityMessage(String taskName, String uploadPath) { - return RabbitMqMessage.builder() - .taskName(taskName) - .messageId(UUID.randomUUID().toString()) - .uploadPath(uploadPath) - .timestamp(System.currentTimeMillis()) - .priority(9) // 最高优先级 - .status(RabbitMqMessage.Status.PENDING) - .build(); - } - - /** - * 构建带过期时间的消息 - */ - public RabbitMqMessage buildMessageWithTtl(String taskName, String uploadPath, long ttlMillis) { - RabbitMqMessage message = RabbitMqMessage.createSimple(taskName, uploadPath); - message.setExpirationRelative(ttlMillis); - return message; - } - - /** - * 构建完整业务消息 - */ - public RabbitMqMessage buildBusinessMessage(String taskName, String uploadPath, - String sourceSystem, String targetSystem, - Map businessData) { - RabbitMqMessage message = RabbitMqMessage.createWithBusinessData(taskName, uploadPath, businessData); - message.setSourceSystem(sourceSystem); - message.setTargetSystem(targetSystem); - return message; - } - - /** - * 构建图片处理消息 - */ - public RabbitMqMessage buildImageProcessMessage(String imagePath, String processType) { - RabbitMqMessage message = RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.IMAGE_PROCESS, imagePath); - message.addBusinessData("processType", processType); - message.addBusinessData("imageFormat", getFileExtension(imagePath)); - return message; - } - - /** - * 构建文档提取消息 - */ - public RabbitMqMessage buildDocumentExtractMessage(String documentPath, String extractType) { - RabbitMqMessage message = RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.DOCUMENT_EXTRACT, documentPath); - message.addBusinessData("extractType", extractType); - message.addBusinessData("documentType", getFileExtension(documentPath)); - return message; - } - - /** - * 获取文件扩展名 - */ - private String getFileExtension(String filePath) { - if (filePath == null || filePath.lastIndexOf(".") == -1) { - return "unknown"; - } - return filePath.substring(filePath.lastIndexOf(".") + 1).toLowerCase(); - } - - /** - * 生成文件相关的消息ID - */ - private String generateFileMessageId(String taskName, String filePath) { - String fileHash = Integer.toHexString(filePath.hashCode()); - return taskName + "_" + fileHash + "_" + System.currentTimeMillis(); - } -} \ No newline at end of file diff --git a/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/producer/RabbitMQProducerService.java b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/producer/RabbitMQProducerService.java index 599f358..f0ead74 100644 --- a/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/producer/RabbitMQProducerService.java +++ b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/producer/RabbitMQProducerService.java @@ -1,7 +1,6 @@ package com.bonus.rabbitmq.producer; import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; -import com.bonus.rabbitmq.message.RabbitMqMessageBuilder; import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -14,6 +13,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** @@ -21,326 +21,58 @@ import java.util.concurrent.TimeoutException; * @author:cwchen * @date:2025-11-24-16:16 * @version:1.0 - * @description:生产者服务 - 修复版本 + * @description:生产者服务 - 简化版本(无重试机制) */ @Service(value = "RabbitMQProducerService") @Slf4j public class RabbitMQProducerService { private final RabbitTemplate rabbitTemplate; - private final RabbitMqMessageBuilder messageBuilder; private final Map> pendingConfirmations; - public RabbitMQProducerService(RabbitTemplate rabbitTemplate, - RabbitMqMessageBuilder messageBuilder) { + // 配置常量 + private static final int CONFIRMATION_TIMEOUT_SECONDS = 60; + private static final String EXCHANGE_NAME = "myExchange"; + private static final String ROUTING_KEY = "myRoutingKey"; + + public RabbitMQProducerService(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; - this.messageBuilder = messageBuilder; this.pendingConfirmations = new ConcurrentHashMap<>(); // 设置确认回调 - RabbitTemplate.ConfirmCallback confirmCallback = createConfirmCallback(); - this.rabbitTemplate.setConfirmCallback(confirmCallback); - + this.rabbitTemplate.setConfirmCallback(createConfirmCallback()); // 设置返回回调(用于处理路由失败) - RabbitTemplate.ReturnsCallback returnsCallback = createReturnsCallback(); - this.rabbitTemplate.setReturnsCallback(returnsCallback); + this.rabbitTemplate.setReturnsCallback(createReturnsCallback()); } /** - * 异步发送消息 - 基础版本 - */ - public CompletableFuture sendMessageAsync(String taskName, String uploadPath) { - return CompletableFuture.supplyAsync(() -> { - try { - RabbitMqMessage message = messageBuilder.buildBaseMessage(taskName, uploadPath); - return sendMessageWithConfirmation(message); - } catch (Exception e) { - log.error("异步发送消息失败: {}", e.getMessage(), e); - throw new RuntimeException("消息发送失败", e); - } - }); - } - - /** - * 异步发送消息 - 带自定义ID - */ - public CompletableFuture sendMessageWithCustomIdAsync(String taskName, - String uploadPath, - String customMessageId) { - return CompletableFuture.supplyAsync(() -> { - try { - RabbitMqMessage message = messageBuilder.buildMessageWithCustomId(taskName, uploadPath, customMessageId); - return sendMessageWithConfirmation(message); - } catch (Exception e) { - log.error("异步发送带自定义ID消息失败: {}", e.getMessage(), e); - throw new RuntimeException("消息发送失败", e); - } - }); - } - - /** - * 异步发送OCR处理消息 - 修复版本 - */ - public CompletableFuture sendOcrMessageAsync(String filePath, String taskId) { - return CompletableFuture.supplyAsync(() -> { - String messageId = null; - try { - RabbitMqMessage message = messageBuilder.buildOcrProcessMessage(filePath, taskId); - messageId = message.getMessageId(); - return sendMessageWithConfirmation(message); - } catch (Exception e) { - log.error("异步发送OCR消息失败 - TaskId: {}, File: {}, MessageId: {}, Error: {}", - taskId, filePath, messageId, e.getMessage(), e); - // 触发补偿机制 - handleOcrMessageSendFailure(taskId, filePath, messageId, e); - throw new RuntimeException("OCR消息发送失败: " + e.getMessage(), e); - } - }); - } - - /** - * 处理OCR消息发送失败 - */ - private void handleOcrMessageSendFailure(String taskId, String filePath, String messageId, Exception e) { - // 这里可以实现补偿逻辑,比如: - // 1. 记录到失败表 - // 2. 触发告警 - // 3. 尝试其他通知方式 - // 4. 更新任务状态为发送失败 - } - - /** - * 异步发送带重试配置的消息 - */ - public CompletableFuture sendMessageWithRetryAsync(String taskName, - String uploadPath, - int maxRetryCount) { - return CompletableFuture.supplyAsync(() -> { - try { - RabbitMqMessage message = messageBuilder.buildMessageWithRetryConfig(taskName, uploadPath, maxRetryCount); - return sendMessageWithConfirmation(message); - } catch (Exception e) { - log.error("异步发送带重试配置消息失败: {}", e.getMessage(), e); - throw new RuntimeException("消息发送失败", e); - } - }); - } - - /** - * 批量异步发送消息 - */ - public CompletableFuture> sendBatchMessagesAsync(List tasks) { - return CompletableFuture.supplyAsync(() -> { - List messageIds = new ArrayList<>(); - List> futures = new ArrayList<>(); - - for (MessageTask task : tasks) { - CompletableFuture future = CompletableFuture.supplyAsync(() -> { - try { - RabbitMqMessage message = messageBuilder.buildBaseMessage(task.getTaskName(), task.getUploadPath()); - return sendMessageWithConfirmation(message); - } catch (Exception e) { - log.error("批量发送消息失败 - 任务: {}", task.getTaskName(), e); - return null; - } - }); - futures.add(future); - } - - // 等待所有任务完成 - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); - - for (CompletableFuture future : futures) { - try { - String messageId = future.get(); - if (messageId != null) { - messageIds.add(messageId); - } - } catch (Exception e) { - log.error("获取异步发送结果失败", e); - } - } - - log.info("批量发送完成,成功发送 {} 条消息", messageIds.size()); - return messageIds; - }); - } - - /** - * 发送消息并等待确认 - 修复版本(带重试机制) - */ - private String sendMessageWithConfirmation(RabbitMqMessage message) { - String messageId = message.getMessageId(); - int maxRetries = 2; // 最大重试次数 - int retryCount = 0; - - while (retryCount <= maxRetries) { - try { - return attemptSendMessage(message, retryCount); - } catch (TimeoutException e) { - retryCount++; - if (retryCount > maxRetries) { - log.error("消息发送重试{}次后仍失败 - ID: {}", maxRetries, messageId, e); - throw new RuntimeException("消息确认超时,重试失败", e); - } - log.warn("消息发送超时,进行第{}次重试 - ID: {}", retryCount, messageId); - try { - // 指数退避策略 - Thread.sleep(1000 * retryCount); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException("重试被中断", ie); - } - } catch (Exception e) { - log.error("消息发送异常 - ID: {}", messageId, e); - throw new RuntimeException("消息发送异常", e); - } - } - - throw new RuntimeException("消息发送失败"); - } - - /** - * 尝试发送消息 - * 关键修复:确保确认回调能及时触发,并增加超时时间 - */ - private String attemptSendMessage(RabbitMqMessage message, int retryCount) throws Exception { - String messageId = message.getMessageId(); - - // 为每次尝试创建新的CorrelationData - CorrelationData correlationData = new CorrelationData(messageId + "_attempt_" + retryCount); - - CompletableFuture confirmationFuture = new CompletableFuture<>(); - - // 使用消息ID作为key,确保重试时能覆盖之前的Future - String futureKey = messageId; - - // 如果之前有未完成的Future,先移除(重试场景) - CompletableFuture oldFuture = pendingConfirmations.remove(futureKey); - if (oldFuture != null && !oldFuture.isDone()) { - oldFuture.cancel(true); - } - - pendingConfirmations.put(futureKey, confirmationFuture); - - try { - // 发送消息 - try { - rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData); - } catch (Exception sendException) { - log.error("消息发送异常 - ID: {}, 异常: {}", messageId, sendException.getMessage(), sendException); - pendingConfirmations.remove(futureKey, confirmationFuture); - throw new RuntimeException("消息发送失败: " + sendException.getMessage(), sendException); - } - - // 关键修复:由于确认回调可能不触发,采用短超时 + 备用方案 - // 如果消息发送成功(没有异常),且确认回调在短时间内未触发,则认为成功 - int shortTimeoutSeconds = 3; // 短超时:3秒 - long checkInterval = 50; // 每50ms检查一次,更频繁 - long maxWaitTime = shortTimeoutSeconds * 1000L; - long elapsed = 0; - boolean confirmReceived = false; - - while (elapsed < maxWaitTime) { - if (confirmationFuture.isDone()) { - try { - confirmationFuture.get(); // 获取确认结果 - confirmReceived = true; - break; - } catch (Exception e) { - log.error("确认回调返回异常 - ID: {}, 异常: {}", messageId, e.getMessage(), e); - pendingConfirmations.remove(futureKey, confirmationFuture); - throw new RuntimeException("消息确认失败: " + e.getMessage(), e); - } - } - - Thread.sleep(checkInterval); - elapsed += checkInterval; - } - - // 如果确认回调未触发,但消息发送成功,采用备用方案 - if (!confirmReceived) { - Thread.sleep(500); // 再等待500ms - - if (confirmationFuture.isDone()) { - try { - confirmationFuture.get(); - confirmReceived = true; - } catch (Exception e) { - // 忽略延迟确认回调的异常 - } - } - - if (!confirmReceived) { - // 备用方案:消息发送成功(没有异常),认为成功 - confirmationFuture.complete(correlationData); - confirmReceived = true; - } - } - - if (confirmReceived) { - // 确认成功后移除Future - pendingConfirmations.remove(futureKey, confirmationFuture); - return messageId; - } - - // 超时处理(如果备用方案也失败) - log.error("消息确认超时 - ID: {}, 重试次数: {}", messageId, retryCount); - - // 延迟移除Future,给确认回调一个机会 - CompletableFuture.runAsync(() -> { - try { - Thread.sleep(3000); - pendingConfirmations.remove(futureKey); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - }); - - // 抛出TimeoutException,让外层重试机制处理 - throw new TimeoutException("消息确认失败(确认回调未触发) - ID: " + messageId + ", CorrelationId: " + correlationData.getId()); - - } catch (TimeoutException e) { - // 重新抛出TimeoutException,不在这里处理 - throw e; - } catch (Exception e) { - log.error("消息发送异常 - ID: {}, 重试次数: {}, 异常: {}", - messageId, retryCount, e.getMessage(), e); - // 异常时移除对应的Future - if (futureKey != null) { - pendingConfirmations.remove(futureKey, confirmationFuture); - } - throw e; - } - } - - /** - * 创建确认回调 - 修复版本(关键修复) - * 注意:RabbitMQ的确认回调是异步的,确认的是消息是否被Broker接收,不是是否被消费 - * 关键:确认回调应该在消息发送到Broker后立即触发,而不是在消费者消费时 + * 创建确认回调 */ private RabbitTemplate.ConfirmCallback createConfirmCallback() { return (correlationData, ack, cause) -> { if (correlationData == null || correlationData.getId() == null) { + log.warn("收到空的确认回调数据"); return; } - String correlationId = correlationData.getId(); - String messageId = extractOriginalMessageId(correlationId); + String messageId = correlationData.getId(); CompletableFuture future = pendingConfirmations.get(messageId); if (future == null) { // Future可能已经被移除(超时或异常),这是正常的 + log.debug("确认回调对应的Future不存在 - ID: {}", messageId); return; } // 检查Future是否已经完成(防止重复完成) if (future.isDone()) { + log.debug("确认回调对应的Future已完成 - ID: {}", messageId); return; } if (ack) { future.complete(correlationData); + log.debug("消息确认成功 - ID: {}", messageId); } else { String errorMsg = "Broker确认失败: " + (cause != null ? cause : "未知原因"); log.error("消息发送确认失败 - ID: {}, 原因: {}", messageId, errorMsg); @@ -350,17 +82,7 @@ public class RabbitMQProducerService { } /** - * 从correlationId中提取原始messageId - */ - private String extractOriginalMessageId(String correlationId) { - if (correlationId.contains("_attempt_")) { - return correlationId.substring(0, correlationId.indexOf("_attempt_")); - } - return correlationId; - } - - /** - * 创建返回回调(处理路由失败) - 修复版本 + * 创建返回回调(处理路由失败) */ private RabbitTemplate.ReturnsCallback createReturnsCallback() { return returned -> { @@ -370,28 +92,26 @@ public class RabbitMQProducerService { String correlationId = returned.getMessage().getMessageProperties().getCorrelationId(); if (correlationId != null) { - String messageId = extractOriginalMessageId(correlationId); - CompletableFuture future = pendingConfirmations.get(messageId); + CompletableFuture future = pendingConfirmations.get(correlationId); if (future != null) { String errorMsg = String.format("消息路由失败 - 交换机: %s, 路由键: %s, 回复: %s", returned.getExchange(), returned.getRoutingKey(), returned.getReplyText()); future.completeExceptionally(new RuntimeException(errorMsg)); - pendingConfirmations.remove(messageId, future); + pendingConfirmations.remove(correlationId, future); } } }; } /** - * 同步发送消息(备用方法) + * 同步发送消息 */ public String sendMessageSync(RabbitMqMessage message) { try { String messageId = message.getMessageId(); CorrelationData correlationData = new CorrelationData(messageId); - - rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData); - + log.info("Sending message: {}", message); + rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message, correlationData); return messageId; } catch (Exception e) { log.error("同步发送消息失败: {}", e.getMessage(), e); @@ -399,32 +119,5 @@ public class RabbitMQProducerService { } } - /** - * 获取未确认的消息数量(监控用) - */ - public int getPendingConfirmationsCount() { - return pendingConfirmations.size(); - } - /** - * 清理过期的确认Future(防止内存泄漏) - */ - public void cleanupExpiredConfirmations() { - pendingConfirmations.entrySet().removeIf(entry -> { - CompletableFuture future = entry.getValue(); - // 这里可以根据业务需求添加更复杂的过期判断逻辑 - // 例如:如果Future已经创建超过30分钟且未完成,则移除 - return future.isDone(); - }); - } - - /** - * 任务数据类 - */ - @Data - @AllArgsConstructor - public static class MessageTask { - private String taskName; - private String uploadPath; - } } \ No newline at end of file diff --git a/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/service/SendRabbitMqService.java b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/service/SendRabbitMqService.java index 723ff39..bf1a113 100644 --- a/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/service/SendRabbitMqService.java +++ b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/service/SendRabbitMqService.java @@ -1,5 +1,6 @@ package com.bonus.rabbitmq.service; +import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; import com.bonus.rabbitmq.producer.RabbitMQProducerService; import org.springframework.stereotype.Service; @@ -24,7 +25,11 @@ public class SendRabbitMqService { for (Map map : list) { String taskId = map.get("taskId").toString(); String uploadPath = map.get("uploadPath").toString(); - rabbitMQProducerService.sendOcrMessageAsync(taskId, uploadPath); + RabbitMqMessage mqMessage = new RabbitMqMessage(); + mqMessage.setMessageId(taskId); + mqMessage.setUploadPath(uploadPath); + mqMessage.setTaskName("OCR_PROCESS"); + rabbitMQProducerService.sendMessageSync(mqMessage); } } }