diff --git a/bonus-admin/src/main/java/com/bonus/web/service/analysis/AnalysisService.java b/bonus-admin/src/main/java/com/bonus/web/service/analysis/AnalysisService.java index f0db0cf..37a2c85 100644 --- a/bonus-admin/src/main/java/com/bonus/web/service/analysis/AnalysisService.java +++ b/bonus-admin/src/main/java/com/bonus/web/service/analysis/AnalysisService.java @@ -178,6 +178,7 @@ public class AnalysisService { msg.setProId(dto.getProId()); msg.setTemplateId(dto.getTemplateId()); msg.setAnalysisLabelId(dto.getAnalysisLabelId()); + msg.setCompositionType(1); msg.setTaskName("OCR_PROCESS"); asyncTaskList.add(msg); } @@ -381,6 +382,7 @@ public class AnalysisService { msg.setUploadPath(dto.getFiles().get(i).getFilePath()); msg.setProId(dto.getProId()); msg.setBidId(dto.getBidId()); + msg.setCompositionType(2); msg.setTaskName("OCR_PROCESS"); asyncTaskList.add(msg); } diff --git a/bonus-analysis/src/main/java/com/bonus/analysis/mapper/IASAnalysisMapper.java b/bonus-analysis/src/main/java/com/bonus/analysis/mapper/IASAnalysisMapper.java index f30fbe8..3f5da9e 100644 --- a/bonus-analysis/src/main/java/com/bonus/analysis/mapper/IASAnalysisMapper.java +++ b/bonus-analysis/src/main/java/com/bonus/analysis/mapper/IASAnalysisMapper.java @@ -7,6 +7,7 @@ import com.bonus.common.domain.analysis.po.ProComposition; import com.bonus.common.domain.analysis.vo.AnalysisBidVo; import com.bonus.common.domain.analysis.vo.AnalysisLabelItemOcrVo; import com.bonus.common.domain.analysis.vo.AnalysisVo; +import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; import org.apache.ibatis.annotations.Param; import org.springframework.stereotype.Repository; @@ -146,4 +147,31 @@ public interface IASAnalysisMapper { * @date 2025/11/30 13:29 */ List getBidCompositionByBid(AnalysisVo analysisVo); + + /** + * 更新项目或者标段的文件组成的解析状态 + * @param message + * @return void + * @author cwchen + * @date 2025/11/30 14:37 + */ + void updateProCompositionState(RabbitMqMessage message); + + /** + * 更新项目或者标段的解析状态 + * @param message + * @return void + * @author cwchen + * @date 2025/11/30 14:52 + */ + void updateProOrBidState(RabbitMqMessage message); + + /** + * 查询项目或标段的组成文件的解析状态 + * @param message + * @return void + * @author cwchen + * @date 2025/11/30 15:03 + */ + List getProCompositionState(RabbitMqMessage message); } diff --git a/bonus-analysis/src/main/java/com/bonus/analysis/service/IASAnalysisService.java b/bonus-analysis/src/main/java/com/bonus/analysis/service/IASAnalysisService.java index 326f016..7801b6e 100644 --- a/bonus-analysis/src/main/java/com/bonus/analysis/service/IASAnalysisService.java +++ b/bonus-analysis/src/main/java/com/bonus/analysis/service/IASAnalysisService.java @@ -7,6 +7,7 @@ import com.bonus.common.domain.analysis.po.ProComposition; import com.bonus.common.domain.analysis.vo.AnalysisBidVo; import com.bonus.common.domain.analysis.vo.AnalysisLabelItemOcrVo; import com.bonus.common.domain.analysis.vo.AnalysisVo; +import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; import java.util.List; @@ -144,4 +145,31 @@ public interface IASAnalysisService { * @date 2025/11/30 13:28 */ List getBidCompositionByBid(AnalysisVo analysisVo); + + /** + * 更新项目或者标段的文件组成的解析状态 + * @param message + * @return void + * @author cwchen + * @date 2025/11/30 14:36 + */ + void updateProCompositionState(RabbitMqMessage message); + + /** + * 更新项目或者标段的解析状态 + * @param message + * @return void + * @author cwchen + * @date 2025/11/30 14:51 + */ + void updateProOrBidState(RabbitMqMessage message); + + /** + * 查询项目或标段的组成文件的解析状态 + * @param message + * @return List + * @author cwchen + * @date 2025/11/30 15:02 + */ + List getProCompositionState(RabbitMqMessage message); } diff --git a/bonus-analysis/src/main/java/com/bonus/analysis/service/impl/ASAnalysisServiceImpl.java b/bonus-analysis/src/main/java/com/bonus/analysis/service/impl/ASAnalysisServiceImpl.java index 072c948..3964ee3 100644 --- a/bonus-analysis/src/main/java/com/bonus/analysis/service/impl/ASAnalysisServiceImpl.java +++ b/bonus-analysis/src/main/java/com/bonus/analysis/service/impl/ASAnalysisServiceImpl.java @@ -9,6 +9,7 @@ import com.bonus.common.domain.analysis.po.ProComposition; import com.bonus.common.domain.analysis.vo.AnalysisBidVo; import com.bonus.common.domain.analysis.vo.AnalysisLabelItemOcrVo; import com.bonus.common.domain.analysis.vo.AnalysisVo; +import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -119,4 +120,19 @@ public class ASAnalysisServiceImpl implements IASAnalysisService { return new ArrayList<>(); } } + + @Override + public void updateProCompositionState(RabbitMqMessage message) { + analysisMapper.updateProCompositionState(message); + } + + @Override + public void updateProOrBidState(RabbitMqMessage message) { + analysisMapper.updateProOrBidState(message); + } + + @Override + public List getProCompositionState(RabbitMqMessage message) { + return analysisMapper.getProCompositionState(message); + } } diff --git a/bonus-analysis/src/main/resources/mapper/AnalysisMapper.xml b/bonus-analysis/src/main/resources/mapper/AnalysisMapper.xml index e0e6417..d90a7b8 100644 --- a/bonus-analysis/src/main/resources/mapper/AnalysisMapper.xml +++ b/bonus-analysis/src/main/resources/mapper/AnalysisMapper.xml @@ -139,6 +139,20 @@ FROM tb_template_composition WHERE template_id = #{templateId} AND composition_type = '2' + + + + @@ -159,4 +173,22 @@ DELETE FROM tb_pro_bid_analysis_result WHERE pro_id = #{proId} + + + + UPDATE tb_pro_composition SET analysis_state = #{analysisState} WHERE pro_id = #{proId} AND composition_type = '1' + + + UPDATE tb_pro_composition SET analysis_state = #{analysisState} WHERE pro_id = #{bidId} AND composition_type = '2' + + + + + + UPDATE tb_pro SET analysis_status = #{analysisState} WHERE pro_id = #{proId} + + + UPDATE tb_pro_bid SET parsing_state = #{analysisState} WHERE bid_id = #{bidId} + + diff --git a/bonus-common/src/main/java/com/bonus/common/domain/analysis/po/ProComposition.java b/bonus-common/src/main/java/com/bonus/common/domain/analysis/po/ProComposition.java index b00fb2f..74d8d49 100644 --- a/bonus-common/src/main/java/com/bonus/common/domain/analysis/po/ProComposition.java +++ b/bonus-common/src/main/java/com/bonus/common/domain/analysis/po/ProComposition.java @@ -27,6 +27,9 @@ public class ProComposition { /**模板组成类型 1.项目文件 2.标段/标包文件*/ private String compositionType; + /**解析状态 0.解析中 1.解析成功 2.解析失败*/ + private String analysisState; + /**文件*/ List fileVoList; } diff --git a/bonus-common/src/main/java/com/bonus/common/domain/rabbitmq/dto/RabbitMqMessage.java b/bonus-common/src/main/java/com/bonus/common/domain/rabbitmq/dto/RabbitMqMessage.java index b15c135..1155d00 100644 --- a/bonus-common/src/main/java/com/bonus/common/domain/rabbitmq/dto/RabbitMqMessage.java +++ b/bonus-common/src/main/java/com/bonus/common/domain/rabbitmq/dto/RabbitMqMessage.java @@ -60,4 +60,14 @@ public class RabbitMqMessage implements Serializable { */ private Map businessData = new HashMap<>(); + /** + * 模板组成类型 1.项目文件 2.标段文件 + * */ + private int compositionType; + + /** + * 解析状态 0.待解析 1.解析成功 2.解析失败 + * */ + private String analysisState; + } diff --git a/bonus-ocr/src/main/java/com/bonus/ocr/service/AnalysisOcrService.java b/bonus-ocr/src/main/java/com/bonus/ocr/service/AnalysisOcrService.java index 11ae4b3..5287520 100644 --- a/bonus-ocr/src/main/java/com/bonus/ocr/service/AnalysisOcrService.java +++ b/bonus-ocr/src/main/java/com/bonus/ocr/service/AnalysisOcrService.java @@ -146,27 +146,6 @@ public class AnalysisOcrService { return httpPost; } - /** - * 创建HTTP POST请求2 - */ - /*private HttpPost createHttpPost2(AnalysisOcrRequest analysisOcrRequest) { - HttpPost httpPost = new HttpPost(extractInfoUrl); - - try { - // 将请求对象转换为JSON字符串 - String jsonRequest = convertToJson(analysisOcrRequest); - // 设置请求体为JSON - StringEntity entity = new StringEntity(jsonRequest, ContentType.APPLICATION_JSON); - httpPost.setEntity(entity); - // 设置请求头 - httpPost.setHeader("Accept", "application/json"); - httpPost.setHeader("Content-Type", "application/json; charset=UTF-8"); - } catch (Exception e) { - throw new RuntimeException("创建HTTP POST请求失败", e); - } - return httpPost; - }*/ - private HttpPost createHttpPost2(AnalysisOcrRequest analysisOcrRequest) { HttpPost httpPost = new HttpPost(extractInfoUrl); diff --git a/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/consumer/RabbitMQConsumer.java b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/consumer/RabbitMQConsumer.java deleted file mode 100644 index 79ca9fe..0000000 --- a/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/consumer/RabbitMQConsumer.java +++ /dev/null @@ -1,367 +0,0 @@ -/* -package com.bonus.web.rabbitmq.consumer; - -import com.bonus.common.domain.ocr.dto.OcrRequest; -import com.bonus.common.domain.ocr.vo.OcrResponse; -import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; -import com.bonus.file.config.MinioConfig; -import com.bonus.file.util.MinioUtil; -import com.bonus.ocr.service.OcrService; -import com.bonus.web.rabbitmq.config.RabbitMQConfig; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.rabbitmq.client.Channel; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.springframework.amqp.AmqpRejectAndDontRequeueException; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ThreadLocalRandom; - -*/ -/** - * @className:RabbitMQConsumer - * @author:cwchen - * @date:2025-11-13-13:32 - * @version:1.0 - * @description:mq消费者 - *//* - -@Component(value = "RabbitMQConsumer") -@Slf4j -public class RabbitMQConsumer { - - @Resource - private OcrService ocrService; - - @Resource - private MinioConfig minioConfig; - - @Resource - private MinioUtil minioUtil; - - private final ObjectMapper objectMapper = new ObjectMapper(); - - */ -/** - * 主队列消息处理 - * 优化:确保异常处理逻辑正确,消息确认机制清晰 - * 关键:在手动确认模式下,成功时手动ACK,失败时抛出异常让重试机制处理 - *//* - - @RabbitListener(queues = "myQueue", containerFactory = "multiConsumerFactory") - public void handleMessage(org.springframework.amqp.core.Message amqpMessage, - Channel channel) throws IOException { - - RabbitMqMessage rabbitMq = null; - long deliveryTag = 0; - String messageId = "unknown"; - - try { - // 解析消息 - rabbitMq = parseMessage(amqpMessage); - deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag(); - messageId = rabbitMq.getMessageId(); - - log.info("开始处理消息 - ID: {}, 任务: {}", messageId, rabbitMq.getTaskName()); - - // 业务处理 - processBusiness(rabbitMq); - - // 处理成功 - 确认消息 - channel.basicAck(deliveryTag, false); - log.info("✅ 消息处理完成并确认 - ID: {}", messageId); - - } catch (Exception e) { - log.error("❌ 消息处理异常 - ID: {}, deliveryTag: {}, 异常信息: {}", - messageId, deliveryTag, e.getMessage(), e); - - // 检查是否是 AmqpRejectAndDontRequeueException(重试次数耗尽,消息已发送到死信队列) - AmqpRejectAndDontRequeueException rejectException = findRejectException(e); - - if (rejectException != null) { - // 关键修复:消息已发送到死信队列,需要手动拒绝原消息 - log.error("🚫 重试次数耗尽,消息已发送到死信队列,准备拒绝原消息 - ID: {}, deliveryTag: {}", - messageId, deliveryTag); - - try { - if (deliveryTag > 0) { - // 手动拒绝消息(NACK,不重新入队) - // requeue=false 确保消息不会重新入队,服务重启后不会重新处理 - channel.basicNack(deliveryTag, false, false); - log.error("✅ 原消息已永久拒绝并从主队列中删除 - ID: {}, deliveryTag: {}。服务重启后不会重新处理此消息。", - messageId, deliveryTag); - // 消息已经被处理(发送到死信队列并永久拒绝),不重新抛出异常 - return; - } else { - log.error("❌ deliveryTag 无效,无法拒绝消息 - ID: {}, deliveryTag: {}", - messageId, deliveryTag); - } - } catch (IOException ioException) { - log.error("❌ 拒绝消息失败 - ID: {}, deliveryTag: {}, 异常: {}", - messageId, deliveryTag, ioException.getMessage(), ioException); - } - // 即使拒绝失败,也不重新抛出异常,避免重复处理 - return; - } - - // 其他异常:抛出异常触发重试机制 - throw new RuntimeException("消息处理失败: " + e.getMessage(), e); - } - } - - */ -/** - * 递归查找 AmqpRejectAndDontRequeueException - *//* - - private AmqpRejectAndDontRequeueException findRejectException(Throwable e) { - if (e == null) { - return null; - } - - if (e instanceof AmqpRejectAndDontRequeueException) { - return (AmqpRejectAndDontRequeueException) e; - } - - // 检查异常消息中是否包含死信队列相关的关键字 - String errorMessage = e.getMessage(); - if (errorMessage != null && - (errorMessage.contains("死信队列") || - errorMessage.contains("重试次数耗尽") || - errorMessage.contains("消息已发送到死信队列"))) { - // 递归查找异常链 - Throwable cause = e.getCause(); - int depth = 0; - while (cause != null && depth < 10) { - if (cause instanceof AmqpRejectAndDontRequeueException) { - return (AmqpRejectAndDontRequeueException) cause; - } - cause = cause.getCause(); - depth++; - } - } - - // 递归查找 cause - return findRejectException(e.getCause()); - } - - */ -/** - * 死信队列消息处理 - 监听重试次数耗尽的消息 - *//* - - @RabbitListener(queues = RabbitMQConfig.DLX_QUEUE, containerFactory = "deadLetterContainerFactory") - public void handleDeadLetterMessage(org.springframework.amqp.core.Message amqpMessage, - Channel channel) throws IOException { - - RabbitMqMessage rabbitMq = parseMessage(amqpMessage); - long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag(); - String messageId = rabbitMq.getMessageId(); - - try { - log.error("🚨 收到死信消息 - ID: {}, 任务: {}", messageId, rabbitMq.getTaskName()); - - // 处理死信消息:记录日志、发送告警、人工干预等 - processDeadLetterMessage(rabbitMq, amqpMessage); - - // 确认死信消息 - channel.basicAck(deliveryTag, false); - log.info("死信消息处理完成 - ID: {}", messageId); - - } catch (Exception e) { - log.error("死信消息处理异常 - ID: {}", messageId, e); - // 死信消息处理失败,可以选择重新入队或丢弃 - channel.basicNack(deliveryTag, false, false); - } - } - - */ -/** - * 处理死信消息 - *//* - - private void processDeadLetterMessage(RabbitMqMessage message, org.springframework.amqp.core.Message amqpMessage) { - String messageId = message.getMessageId(); - - try { - // 1. 记录死信消息到数据库或日志文件 - logDeadLetterToDatabase(message); - - // 2. 发送告警通知 - sendDeadLetterAlert(message); - - // 3. 记录详细错误信息 - Map deadLetterInfo = collectDeadLetterInfo(message, amqpMessage); - log.error("死信消息详情 - ID: {}, 信息: {}", messageId, deadLetterInfo); - - } catch (Exception e) { - log.error("处理死信消息时发生异常 - ID: {}", messageId, e); - } - } - - */ -/** - * 记录死信消息到数据库(示例) - *//* - - private void logDeadLetterToDatabase(RabbitMqMessage message) { - // 这里可以集成数据库操作,记录死信消息 - // 例如:messageId, taskName, uploadPath, 失败时间, 失败原因等 - log.warn("记录死信消息到数据库 - ID: {}, 任务: {}", - message.getMessageId(), message.getTaskName()); - } - - */ -/** - * 发送死信告警 - *//* - - private void sendDeadLetterAlert(RabbitMqMessage message) { - try { - String alertMessage = String.format( - "🚨 OCR处理死信告警\n" + - "消息ID: %s\n" + - "任务名称: %s\n" + - "文件路径: %s\n" + - "时间: %s\n" + - "请及时处理!", - message.getMessageId(), - message.getTaskName(), - message.getUploadPath(), - new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) - ); - - log.error("死信告警: {}", alertMessage); - - // 这里可以集成实际的告警系统 - // 例如:发送邮件、短信、钉钉、企业微信等 - // alertService.send(alertMessage); - - } catch (Exception e) { - log.error("发送死信告警失败", e); - } - } - - */ -/** - * 收集死信消息信息 - *//* - - private Map collectDeadLetterInfo(RabbitMqMessage message, - org.springframework.amqp.core.Message amqpMessage) { - Map info = new HashMap<>(); - - info.put("messageId", message.getMessageId()); - info.put("taskName", message.getTaskName()); - info.put("uploadPath", message.getUploadPath()); - info.put("deadLetterTime", new Date()); - info.put("originalHeaders", amqpMessage.getMessageProperties().getHeaders()); - - return info; - } - - private RabbitMqMessage parseMessage(org.springframework.amqp.core.Message message) { - try { - String messageBody = new String(message.getBody(), StandardCharsets.UTF_8); - RabbitMqMessage rabbitMq = objectMapper.readValue(messageBody, RabbitMqMessage.class); - - if (rabbitMq.getMessageId() == null) { - String messageId = message.getMessageProperties().getMessageId(); - rabbitMq.setMessageId(messageId != null ? messageId : generateMessageId()); - } - return rabbitMq; - - } catch (Exception e) { - log.error("消息解析失败", e); - throw new RuntimeException("消息解析失败", e); - } - } - - private void processBusiness(RabbitMqMessage message) { - String uploadPath = message.getUploadPath(); - validateUploadPath(uploadPath); - File fileFromMinio = getFileFromMinio(uploadPath); - OcrResponse ocrResponse = performOcrRecognition(fileFromMinio); - processOcrResult(message, ocrResponse); - } - - private void validateUploadPath(String uploadPath) { - if (StringUtils.isBlank(uploadPath)) { - throw new RuntimeException("文件路径不能为空"); - } - } - - private File getFileFromMinio(String uploadPath) { - try { - File file = minioUtil.getFileFromMinio(minioConfig.getBucketName(), uploadPath); - if (file == null || !file.exists()) { - throw new RuntimeException("Minio文件不存在: " + uploadPath); - } - return file; - } catch (Exception e) { - throw new RuntimeException("获取Minio文件失败: " + uploadPath, e); - } - } - - private OcrResponse performOcrRecognition(File file) { - try { -// OcrRequest ocrRequest = buildOcrRequest(file); -// OcrResponse ocrResponse = ocrService.callOcrService(ocrRequest); - OcrResponse ocrResponse = null; - // 修复:检查 OCR 响应是否为 null - if (Objects.isNull(ocrResponse)) { - throw new RuntimeException("OCR服务返回结果为空"); - } - - if (!isOcrResponseValid(ocrResponse)) { - throw new RuntimeException("OCR识别结果无效"); - } - - log.info("OCR识别成功 - 数据: {}", ocrResponse.getData()); - return ocrResponse; - - } catch (Exception e) { - log.error("OCR识别失败", e); - throw new RuntimeException("OCR识别失败: " + e.getMessage(), e); - } - } - - private OcrRequest buildOcrRequest(File file) { - OcrRequest ocrRequest = new OcrRequest(); - ocrRequest.setFile(file); - ocrRequest.setType("image/png"); - ocrRequest.setFields_json("姓名,公民身份号码"); - return ocrRequest; - } - - private boolean isOcrResponseValid(OcrResponse ocrResponse) { - return ocrResponse.getData() != null && !ocrResponse.getData().toString().isEmpty(); - } - - public void processOcrResult(RabbitMqMessage originalMessage, OcrResponse ocrResponse) { - try { - log.info("处理OCR结果 - 消息ID: {}, 任务: {}", - originalMessage.getMessageId(), originalMessage.getTaskName()); - // 业务逻辑... - log.info("OCR结果处理完成 - 消息ID: {}", originalMessage.getMessageId()); - } catch (Exception e) { - log.error("OCR结果处理失败 - 消息ID: {}", originalMessage.getMessageId(), e); - throw new RuntimeException("OCR结果处理失败", e); - } - } - - private String generateMessageId() { - return "gen_" + System.currentTimeMillis() + "_" + ThreadLocalRandom.current().nextInt(1000); - } -} -*/ diff --git a/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/consumer/RabbitMQConsumerService.java b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/consumer/RabbitMQConsumerService.java index cab6c6a..63e5d48 100644 --- a/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/consumer/RabbitMQConsumerService.java +++ b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/consumer/RabbitMQConsumerService.java @@ -1,6 +1,7 @@ package com.bonus.rabbitmq.consumer; import com.bonus.analysis.service.IASAnalysisService; +import com.bonus.common.domain.analysis.po.ProComposition; import com.bonus.common.domain.analysis.vo.AnalysisLabelItemOcrVo; import com.bonus.common.domain.ocr.dto.AnalysisOcrRequest; import com.bonus.common.domain.ocr.vo.AnalysisResponse; @@ -107,6 +108,15 @@ public class RabbitMQConsumerService { * @date 2025/11/29 13:25 */ private void processBusiness(RabbitMqMessage message) { + // 更新项目或者标段的解析状态 + message.setAnalysisState("0"); + analysisService.updateProOrBidState(message); + // 更新项目或者标段的文件组成的解析状态 + message.setAnalysisState("0"); + analysisService.updateProCompositionState(message); + // 查询项目或标段的组成文件的解析状态 + List proCompositionList = analysisService.getProCompositionState(message); + // 招标解析执行一次处理 String uploadPath = message.getUploadPath(); File fileFromMinio = getFileFromMinio(uploadPath); AnalysisResponse ocrResponse = performAnalysisRecognition(fileFromMinio); @@ -117,10 +127,11 @@ public class RabbitMQConsumerService { .map(Object::toString) .orElse(null); if(StringUtils.isNotBlank(folderPath)) { - // 一次处理解析成功后执行二次处理 + // 招标解析执行二次处理 performAnalysisRecognition2(message, folderPath); }else{ // 解析失败 + message.setAnalysisState("2"); } } diff --git a/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/producer/RabbitMQProducer.java b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/producer/RabbitMQProducer.java deleted file mode 100644 index 5d44b8d..0000000 --- a/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/producer/RabbitMQProducer.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - -package com.bonus.web.rabbitmq.producer; - -import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.core.*; -import org.springframework.amqp.rabbit.connection.CorrelationData; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import java.nio.charset.StandardCharsets; -import java.util.Date; -import java.util.Map; -import java.util.concurrent.*; - - -*/ -/** - * @className:RabbitMQProducer - * @author:cwchen - * @date:2025-11-13-13:32 - * @version:1.0 - * @description:mq生产者 - *//* - - -@Component(value = "RabbitMQProducer") -@Slf4j -public class RabbitMQProducer { - - @Autowired - private RabbitTemplate rabbitTemplate; - - private final ObjectMapper objectMapper = new ObjectMapper(); - private final Map> pendingConfirmations = new ConcurrentHashMap<>(); - private final ScheduledExecutorService timeoutScheduler = Executors.newSingleThreadScheduledExecutor(); - - - */ -/** - * 发送消息 - 主入口,默认使用异步确认 - *//* - - - public CompletableFuture send(String exchange, String routingKey, RabbitMqMessage message) { - return sendAsync(exchange, routingKey, message); - } - - - */ -/** - * 异步发送 - *//* - - - public CompletableFuture sendAsync(String exchange, String routingKey, RabbitMqMessage message) { - String messageId = generateMessageId(); - message.setMessageId(messageId); - - CompletableFuture future = new CompletableFuture<>(); - pendingConfirmations.put(messageId, future); - - CorrelationData correlationData = new CorrelationData(messageId); - - try { - org.springframework.amqp.core.Message amqpMessage = buildMessage(message, messageId); - rabbitTemplate.convertAndSend(exchange, routingKey, amqpMessage, correlationData); - log.debug("消息发送完成,等待异步确认 - ID: {}", messageId); - // 30分钟超时 - scheduleTimeout(messageId, future, 30, TimeUnit.MINUTES); - return future; - } catch (Exception e) { - pendingConfirmations.remove(messageId); - future.completeExceptionally(e); - log.error("消息发送异常 - ID: {}", messageId, e); - return future; - } - } - - - */ -/** - * 初始化确认回调 - *//* - - - @PostConstruct - public void init() { - rabbitTemplate.setConfirmCallback(this::handleConfirm); - // 优雅关闭 - Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown)); - } - - - */ -/** - * 处理确认回调 - *//* - - - private void handleConfirm(CorrelationData correlationData, boolean ack, String cause) { - if (correlationData == null || correlationData.getId() == null) { - return; - } - String messageId = correlationData.getId(); - CompletableFuture future = pendingConfirmations.remove(messageId); - - if (future != null) { - if (ack) { - future.complete(true); - } else { - log.error("Broker确认失败 - ID: {}, 原因: {}", messageId, cause); - future.complete(false); - } - } - } - - - */ -/** - * 调度超时 - *//* - - - private void scheduleTimeout(String messageId, CompletableFuture future, - long timeout, TimeUnit unit) { - timeoutScheduler.schedule(() -> { - if (pendingConfirmations.remove(messageId) != null && !future.isDone()) { - log.warn("消息确认超时 - ID: {}", messageId); - future.complete(false); - } - }, timeout, unit); - } - - - */ -/** - * 资源清理 - *//* - - - @PreDestroy - public void shutdown() { - timeoutScheduler.shutdown(); - try { - if (!timeoutScheduler.awaitTermination(5, TimeUnit.SECONDS)) { - timeoutScheduler.shutdownNow(); - } - } catch (InterruptedException e) { - timeoutScheduler.shutdownNow(); - Thread.currentThread().interrupt(); - } - - // 处理未完成的消息 - pendingConfirmations.forEach((id, future) -> { - if (!future.isDone()) { - future.complete(false); - } - }); - pendingConfirmations.clear(); - } - - */ -/** - * 生成消息ID - *//* - - - private String generateMessageId() { - return "rabbitmq_msg" + System.currentTimeMillis() + "_" + - ThreadLocalRandom.current().nextInt(100000, 999999); - } - - - */ -/** - * 构建消息对象 - *//* - - - private org.springframework.amqp.core.Message buildMessage(RabbitMqMessage message, String messageId) { - try { - String jsonStr = objectMapper.writeValueAsString(message); - return MessageBuilder - .withBody(jsonStr.getBytes(StandardCharsets.UTF_8)) - .setContentType(MessageProperties.CONTENT_TYPE_JSON) - .setContentEncoding("UTF-8") - .setMessageId(messageId) - .setTimestamp(new Date()) - .setDeliveryMode(MessageDeliveryMode.PERSISTENT) - .setHeader("source", "ocr-service") - .setHeader("businessType", extractBusinessType(message.getUploadPath())) - .build(); - } catch (Exception e) { - throw new RuntimeException("消息构建失败: " + messageId, e); - } - } - - private String extractBusinessType(String uploadPath) { - if (uploadPath == null) return "unknown"; - if (uploadPath.contains("personnelDatabase")) return "personnel"; - if (uploadPath.contains("toolsDatabase")) return "tool"; - return "other"; - } -} - -*/ diff --git a/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/service/SendRabbitMqService.java b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/service/SendRabbitMqService.java index 27f8e81..0f242ad 100644 --- a/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/service/SendRabbitMqService.java +++ b/bonus-rabbitmq/src/main/java/com/bonus/rabbitmq/service/SendRabbitMqService.java @@ -6,7 +6,6 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.List; -import java.util.Map; /** * @className:SendRabbitMqService