smart-bid-service/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/consumer/RabbitMQConsumerService.java

215 lines
8.1 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.bonus.rabbitmq.consumer;
import com.bonus.analysis.service.IASAnalysisService;
import com.bonus.common.domain.analysis.vo.AnalysisLabelItemOcrVo;
import com.bonus.common.domain.ocr.dto.AnalysisOcrRequest;
import com.bonus.common.domain.ocr.vo.AnalysisResponse;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import com.bonus.common.utils.FileUtil;
import com.bonus.file.config.MinioConfig;
import com.bonus.file.util.MinioUtil;
import com.bonus.ocr.service.AnalysisOcrService;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
 * Simplified RabbitMQ consumer that parses the raw message body by hand and
 * acknowledges or rejects every delivery explicitly.
 *
 * <p>Each delivery is deserialized into a {@link RabbitMqMessage}, run through the
 * two-pass tender-document analysis, and then acked on success or rejected
 * (without requeue) on any failure.
 *
 * @author cwchen
 * @version 1.0
 * @since 2025-11-24
 */
@Component
@Slf4j
public class RabbitMQConsumerService {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Resource
    private MinioConfig minioConfig;

    @Resource
    private MinioUtil minioUtil;

    @Resource(name = "AnalysisOcrService")
    private AnalysisOcrService analysisOcrService;

    @Resource(name = "IASAnalysisService")
    private IASAnalysisService analysisService;

    /**
     * Entry point for deliveries from {@code myQueue}.
     *
     * <p>The body is decoded as UTF-8 JSON. A body that cannot be deserialized
     * (including one that deserializes to {@code null}) is rejected immediately.
     * Otherwise the business pipeline runs; the delivery is acked when it completes
     * and rejected without requeue when it throws.
     *
     * @param amqpMessage raw AMQP message as delivered by the listener container
     * @param channel     channel used for the manual ack / reject
     */
    @RabbitListener(
            queues = "myQueue",
            // Container factory is configured elsewhere; per the original author it is
            // intended to keep consumption in order — NOTE(review): confirm in its config.
            containerFactory = "multiConsumerFactory"
    )
    public void handleMessage(Message amqpMessage, Channel channel) {
        long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
        String rawBody = new String(amqpMessage.getBody(), StandardCharsets.UTF_8);
        log.info("🎯 RabbitMQConsumerService 收到原始消息 - deliveryTag: {}, body: {}",
                deliveryTag, rawBody);

        RabbitMqMessage message;
        try {
            // Deserialize manually so a type-mapping problem cannot prevent the
            // listener method from running at all.
            message = parseMessage(rawBody);
        } catch (Exception e) {
            log.error("❌ 消息反序列化失败,直接拒绝 - deliveryTag: {}, body: {}",
                    deliveryTag, rawBody, e);
            try {
                channel.basicReject(deliveryTag, false);
            } catch (IOException ioException) {
                log.error("basicReject 失败 - deliveryTag: {}", deliveryTag, ioException);
            }
            return;
        }

        String messageId = message.getMessageId();
        String taskName = message.getTaskName();
        try {
            log.info("🛠 开始处理消息内容 - ID: {}, 任务: {}, 业务数据: {}",
                    messageId, taskName, message.getBusinessData());
            processBusiness(message);
            // Processing succeeded: acknowledge this single delivery.
            channel.basicAck(deliveryTag, false);
            log.info("✅ 消息处理完成并确认 - ID: {}, 投递标签: {}", messageId, deliveryTag);
        } catch (Exception e) {
            log.error("❌ 处理消息异常,拒绝消息并不重新入队 - ID: {}, 投递标签: {}",
                    messageId, deliveryTag, e);
            try {
                channel.basicReject(deliveryTag, false);
            } catch (IOException ioException) {
                log.error("basicReject 失败 - ID: {}, deliveryTag: {}", messageId, deliveryTag, ioException);
            }
        }
    }

    /**
     * Deserializes the raw JSON body into a {@link RabbitMqMessage}.
     *
     * <p>A JSON literal {@code null} makes the mapper return {@code null}; that is
     * reported as a failure here so the caller never dereferences a null message.
     *
     * @param rawBody UTF-8 decoded message body
     * @return the parsed message, never {@code null}
     * @throws IOException if the body is not valid JSON for the target type or
     *                     deserializes to {@code null}
     */
    private RabbitMqMessage parseMessage(String rawBody) throws IOException {
        RabbitMqMessage parsed = objectMapper.readValue(rawBody, RabbitMqMessage.class);
        if (parsed == null) {
            throw new IOException("消息体反序列化结果为空");
        }
        return parsed;
    }

    /**
     * Runs the tender-analysis pipeline for one message: fetch the uploaded file,
     * run the first analysis pass, then run the second pass on the folder the first
     * pass reports under the {@code folder_path} key.
     *
     * @param message the parsed queue message
     * @throws RuntimeException if any step fails, including a first pass that
     *                          returns no usable {@code folder_path}
     */
    private void processBusiness(RabbitMqMessage message) {
        String uploadPath = message.getUploadPath();
        // NOTE(review): the File returned here is never deleted in this class; if
        // getFileFromMinio2 writes a local temporary copy, it should be cleaned up.
        File fileFromMinio = getFileFromMinio(uploadPath);
        AnalysisResponse ocrResponse = performAnalysisRecognition(fileFromMinio);

        // Optional.map already yields empty when a step returns null.
        String folderPath = Optional.ofNullable(ocrResponse)
                .map(AnalysisResponse::getData)
                .map(data -> data.get("folder_path"))
                .map(Object::toString)
                .orElse(null);

        if (StringUtils.isBlank(folderPath)) {
            // Without a folder path the second pass cannot run. Fail explicitly so
            // the handler rejects the message instead of acking it as a success.
            throw new IllegalStateException(
                    "招标解析一次处理未返回 folder_path,解析失败 - ID: " + message.getMessageId());
        }
        // First pass succeeded: run the second pass.
        performAnalysisRecognition2(message, folderPath);
    }

    /**
     * Fetches the uploaded file from the configured Minio bucket.
     *
     * @param uploadPath object path inside the bucket
     * @return the fetched file, guaranteed to exist
     * @throws IllegalArgumentException if {@code uploadPath} is blank
     * @throws RuntimeException         if the fetch fails or the resulting file
     *                                  does not exist
     */
    private File getFileFromMinio(String uploadPath) {
        if (StringUtils.isBlank(uploadPath)) {
            throw new IllegalArgumentException("Minio文件路径为空");
        }
        File file;
        try {
            file = minioUtil.getFileFromMinio2(minioConfig.getBucketName(), uploadPath);
        } catch (Exception e) {
            throw new RuntimeException("获取Minio文件失败: " + uploadPath, e);
        }
        // Checked outside the try so this error is not re-wrapped by the catch above.
        if (file == null || !file.exists()) {
            throw new RuntimeException("Minio文件不存在: " + uploadPath);
        }
        return file;
    }

    /**
     * First analysis pass: sends the file to the analysis service.
     *
     * @param file the file fetched from Minio
     * @return the service response, never {@code null}
     * @throws RuntimeException if the call fails or the service returns {@code null}
     */
    private AnalysisResponse performAnalysisRecognition(File file) {
        try {
            AnalysisOcrRequest analysisOcrRequest = buildOcrRequest(file);
            AnalysisResponse ocrResponse = analysisOcrService.callOcrService(analysisOcrRequest);
            // Guard against a null response from the analysis service.
            if (Objects.isNull(ocrResponse)) {
                throw new RuntimeException("招标解析算法服务返回结果为空");
            }
            log.info("OCR识别成功 - 数据: {}", ocrResponse.getData());
            return ocrResponse;
        } catch (Exception e) {
            log.error("OCR识别失败", e);
            throw new RuntimeException("OCR识别失败: " + e.getMessage(), e);
        }
    }

    /**
     * Second analysis pass: looks up the label items for the message and sends them,
     * together with the folder produced by the first pass, to the analysis service.
     *
     * @param message    the queue message supplying the analysis label id and template id
     * @param folderPath folder path reported by the first pass
     * @throws RuntimeException if the call fails with an I/O error or the service
     *                          returns {@code null}
     */
    public void performAnalysisRecognition2(RabbitMqMessage message, String folderPath) {
        try {
            List<AnalysisLabelItemOcrVo> labelItemVoList =
                    analysisService.getAnalysisLabels(message.getAnalysisLabelId(), message.getTemplateId());
            AnalysisOcrRequest analysisOcrRequest = buildOcrRequest2(folderPath, labelItemVoList);
            AnalysisResponse ocrResponse2 = analysisOcrService.callOcrService(analysisOcrRequest);
            // Guard against a null response from the analysis service.
            if (Objects.isNull(ocrResponse2)) {
                throw new RuntimeException("招标解析算法服务返回结果为空");
            }
            log.info("OCR识别成功 - 数据: {}", ocrResponse2.getData());
        } catch (IOException e) {
            log.error("OCR识别失败", e);
            throw new RuntimeException("OCR识别失败: " + e.getMessage(), e);
        }
    }

    /**
     * Builds the first-pass request: the file itself, its MIME type derived from the
     * file name, and analysis type {@code "1"}.
     *
     * @param file the file to analyse
     * @return the populated request
     */
    private AnalysisOcrRequest buildOcrRequest(File file) {
        AnalysisOcrRequest ocrRequest = new AnalysisOcrRequest();
        ocrRequest.setFile(file);
        ocrRequest.setType(FileUtil.getMimeTypeByFilename(file.getName()));
        ocrRequest.setAnalysisType("1");
        return ocrRequest;
    }

    /**
     * Builds the second-pass request: empty cover keys, the extraction items, the
     * document folder path, and analysis type {@code "2"}.
     *
     * @param filePath folder path reported by the first pass
     * @param list     label items to extract
     * @return the populated request
     */
    private AnalysisOcrRequest buildOcrRequest2(String filePath, List<AnalysisLabelItemOcrVo> list) {
        AnalysisOcrRequest ocrRequest = new AnalysisOcrRequest();
        ocrRequest.setCover_keys("");
        ocrRequest.setExtraction_items(list);
        ocrRequest.setDoc_folder_path(filePath);
        ocrRequest.setAnalysisType("2");
        return ocrRequest;
    }
}