diff --git a/bonus-admin/src/main/java/com/bonus/web/controller/analysis/AnalysisController.java b/bonus-admin/src/main/java/com/bonus/web/controller/analysis/AnalysisController.java index 68ee40f..c5ddd4e 100644 --- a/bonus-admin/src/main/java/com/bonus/web/controller/analysis/AnalysisController.java +++ b/bonus-admin/src/main/java/com/bonus/web/controller/analysis/AnalysisController.java @@ -4,15 +4,14 @@ import com.bonus.common.annotation.RequiresPermissions; import com.bonus.common.annotation.SysLog; import com.bonus.common.core.controller.BaseController; //import com.bonus.web.service.analysis.AnalysisService; +import com.bonus.common.core.domain.AjaxResult; import com.bonus.common.core.page.TableDataInfo; import com.bonus.common.domain.analysis.dto.AnalysisDto; import com.bonus.common.domain.analysis.vo.AnalysisVo; import com.bonus.common.enums.OperaType; import com.bonus.web.service.analysis.AnalysisService; import io.swagger.annotations.ApiOperation; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; import java.util.List; @@ -31,11 +30,11 @@ public class AnalysisController extends BaseController { @Resource(name = "AnalysisService") private AnalysisService analysisService; - /*@ApiOperation(value = "测试mq异步消息", notes = "测试mq异步消息") + @ApiOperation(value = "测试mq异步消息", notes = "测试mq异步消息") @GetMapping("/testAsyncMq") public AjaxResult testAsyncMq() { return analysisService.testAsyncMq(); - }*/ + } @ApiOperation(value = "招标解析", notes = "查询列表") @GetMapping("getList") @@ -46,4 +45,12 @@ public class AnalysisController extends BaseController { List list = analysisService.getList(dto); return getDataTable(list); } + + @ApiOperation(value = "招标解析", notes = "新建项目") + @PostMapping("saveData") + @SysLog(title = "招标解析", module = "招标解析->新建项目", businessType = OperaType.INSERT, details = "新建项目", logType = 1) + @RequiresPermissions("analysis:analysis:add") + public AjaxResult saveData(@RequestBody AnalysisDto.TemplateDto dto) { + return analysisService.saveData(dto); + } } diff --git a/bonus-admin/src/main/java/com/bonus/web/controller/rabbitmq/RabbitMQController.java b/bonus-admin/src/main/java/com/bonus/web/controller/rabbitmq/RabbitMQController.java new file mode 100644 index 0000000..ec72d13 --- /dev/null +++ b/bonus-admin/src/main/java/com/bonus/web/controller/rabbitmq/RabbitMQController.java @@ -0,0 +1,69 @@ +/* +package com.bonus.web.controller.rabbitmq; + +import com.bonus.common.domain.rabbitmq.vo.OrderMessage; +import com.bonus.rabbitmq.service.MessageProducer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + +import javax.annotation.Resource; +import java.math.BigDecimal; +import java.util.Map; + +*/ +/** + * @className:RabbitMQController + * @author:cwchen + * @date:2025-11-04-18:24 + * @version:1.0 + * @description: + *//* + +@RestController +@RequestMapping("/api/rabbitmq") +public class RabbitMQController { + + @Resource(name = "MessageProducer") + private MessageProducer messageProducer; + + */ +/** + * 发送测试消息 + *//* + + @PostMapping("/send") + public ResponseEntity sendMessage(@RequestBody Map request) { + String productName = request.get("productName"); + BigDecimal amount = new BigDecimal(request.get("amount")); + + String orderId = "ORD" + System.currentTimeMillis(); + OrderMessage orderMessage = new OrderMessage(orderId, productName, amount); + + boolean success = messageProducer.sendOrderMessage(orderMessage); + + if (success) { + return ResponseEntity.ok("消息发送成功: " + orderId); + } else { + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR) + .body("消息发送失败"); + } + } + + */ +/** + * 批量发送消息 + *//* + + @PostMapping("/send-batch") + public ResponseEntity sendBatchMessages(@RequestParam(defaultValue = "5") int count) { + for (int i = 1; i <= count; i++) { + String orderId = "ORD" + System.currentTimeMillis() + "_" + i; + OrderMessage orderMessage = new OrderMessage(orderId, "产品" + i, + new BigDecimal(100 + i * 10)); + messageProducer.sendOrderMessage(orderMessage); + } + return ResponseEntity.ok("批量发送 " + count + " 条消息完成"); + } +}*/ diff --git a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/config/RabbitMQConfig.java b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/config/RabbitMQConfig.java new file mode 100644 index 0000000..29a3373 --- /dev/null +++ b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/config/RabbitMQConfig.java @@ -0,0 +1,226 @@ +package com.bonus.web.rabbitmq.config; + +import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler; +import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy; +import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler; +import org.springframework.amqp.rabbit.retry.MessageRecoverer; +import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer; +import org.springframework.amqp.support.converter.DefaultClassMapper; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.retry.annotation.EnableRetry; +import org.springframework.retry.backoff.ExponentialBackOffPolicy; +import org.springframework.retry.interceptor.RetryOperationsInterceptor; +import org.springframework.retry.policy.SimpleRetryPolicy; +import org.springframework.retry.support.RetryTemplate; + +import java.util.HashMap; +import java.util.Map; + +/** + * @className:RabbitMQConfig + * @author:cwchen + * @date:2025-11-13-13:30 + * @version:1.0 + * @description:rabbitmq配置类 - 手动确认模式 + */ + +@Configuration +@EnableRetry +@Slf4j +public class RabbitMQConfig { + + /** + * JSON消息转换器 + */ + @Bean + public MessageConverter jsonMessageConverter() { + Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); + converter.setClassMapper(classMapper()); + return converter; + } + + /** + * 类型映射器 + */ + @Bean + public DefaultClassMapper classMapper() { + DefaultClassMapper classMapper = new DefaultClassMapper(); + Map> idClassMapping = new HashMap<>(); + idClassMapping.put("rabbitMqMessage", RabbitMqMessage.class); + classMapper.setIdClassMapping(idClassMapping); + classMapper.setDefaultType(Map.class); + return classMapper; + } + + /** + * 声明主队列、交换机、绑定 + */ + @Bean + public Declarables declarables() { + // 主队列 + Queue mainQueue = new Queue("myQueue", true, false, false); + DirectExchange mainExchange = new DirectExchange("myExchange", true, false); + Binding mainBinding = BindingBuilder.bind(mainQueue) + .to(mainExchange) + .with("myRoutingKey"); + + log.info("声明RabbitMQ组件 - 主队列: {}, 主交换机: {}, 路由键: {}", + "myQueue", "myExchange", "myRoutingKey"); + + return new Declarables(mainQueue, mainExchange, mainBinding); + } + + /** + * 错误处理器 Bean - 用于手动确认模式下的异常处理 + */ + @Bean + public RabbitListenerErrorHandler rabbitListenerErrorHandler() { + return (amqpMessage, message, exception) -> { + log.error("消息处理失败,将拒绝消息并重新入队: {}", exception.getMessage(), exception); + // 返回原始消息,让重试机制处理 + throw exception; + }; + } + + /** + * 多消费者容器工厂 - 手动确认模式 + */ + @Bean + public SimpleRabbitListenerContainerFactory multiConsumerFactory( + ConnectionFactory connectionFactory, + MessageConverter jsonMessageConverter) { + + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + factory.setConnectionFactory(connectionFactory); + factory.setMessageConverter(jsonMessageConverter); + + // 更改为手动确认模式 + factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); + factory.setMissingQueuesFatal(false); + factory.setAutoStartup(true); + factory.setConcurrentConsumers(3); + factory.setMaxConcurrentConsumers(5); + factory.setPrefetchCount(10); // 每个消费者预取的消息数量 + + // 配置错误处理器 + factory.setErrorHandler(new ConditionalRejectingErrorHandler(new FatalExceptionStrategy() { + @Override + public boolean isFatal(Throwable t) { + // 对于业务异常,不认为是致命错误 + if (t instanceof RuntimeException) { + String message = t.getMessage(); + if (message != null && (message.contains("消息处理失败") || + message.contains("OCR") || + message.contains("Minio"))) { + return false; + } + } + // 其他异常直接拒绝 + return false; + } + })); + + // 使用简单的拒绝重试策略 - 重试后直接拒绝消息 + MessageRecoverer messageRecoverer = new RejectAndDontRequeueRecoverer(); + + // 创建重试拦截器 + RetryOperationsInterceptor interceptor = RetryInterceptorBuilder.stateless() + .maxAttempts(3) // 总共执行3次(1次原始 + 2次重试) + .backOffOptions(2000L, 2.0, 10000L) // 初始2秒,倍数2,最大10秒 + .recoverer(messageRecoverer) + .build(); + + factory.setAdviceChain(interceptor); + log.info("多消费者容器工厂配置完成 - 使用手动确认模式"); + log.info("重试配置:最多重试2次,初始延迟2秒,最大延迟10秒"); + return factory; + } + + /** + * RabbitTemplate主配置 + */ + @Bean + @Primary + public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, + MessageConverter jsonMessageConverter) { + RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); + + // 基础配置 + rabbitTemplate.setMessageConverter(jsonMessageConverter); + rabbitTemplate.setMandatory(true); + + // 设置重试模板(用于发送消息的重试) + rabbitTemplate.setRetryTemplate(createRabbitTemplateRetryTemplate()); + + // 返回回调 + /*rabbitTemplate.setReturnsCallback(returned -> { + log.error("消息路由失败 - 交换机: {}, 路由键: {}, 回应码: {}, 回应信息: {}", + returned.getExchange(), + returned.getRoutingKey(), + returned.getReplyCode(), + returned.getReplyText()); + });*/ + + log.info("RabbitTemplate配置完成"); + return rabbitTemplate; + } + + /** + * 创建 RabbitTemplate 专用的重试模板 + */ + private RetryTemplate createRabbitTemplateRetryTemplate() { + RetryTemplate retryTemplate = new RetryTemplate(); + + // 发送消息的重试策略:最多重试2次 + SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); + retryPolicy.setMaxAttempts(2); + + // 退避策略 + ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); + backOffPolicy.setInitialInterval(1000L); + backOffPolicy.setMultiplier(2.0); + backOffPolicy.setMaxInterval(5000L); + + retryTemplate.setRetryPolicy(retryPolicy); + retryTemplate.setBackOffPolicy(backOffPolicy); + + return retryTemplate; + } + + @Bean + @Primary + public CachingConnectionFactory connectionFactory( + @Value("${spring.rabbitmq.host}") String host, + @Value("${spring.rabbitmq.port}") int port, + @Value("${spring.rabbitmq.username}") String username, + @Value("${spring.rabbitmq.password}") String password, + @Value("${spring.rabbitmq.virtual-host}") String virtualHost) { + + CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); + connectionFactory.setHost(host); + connectionFactory.setPort(port); + connectionFactory.setUsername(username); + connectionFactory.setPassword(password); + connectionFactory.setVirtualHost(virtualHost); + + connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL); + connectionFactory.setChannelCacheSize(25); + connectionFactory.setChannelCheckoutTimeout(2000); + + log.info("RabbitMQ连接工厂配置完成 - 主机: {}, 端口: {}", host, port); + return connectionFactory; + } +} \ No newline at end of file diff --git a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/consumer/RabbitMQConsumer.java b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/consumer/RabbitMQConsumer.java new file mode 100644 index 0000000..79ca9fe --- /dev/null +++ b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/consumer/RabbitMQConsumer.java @@ -0,0 +1,367 @@ +/* +package com.bonus.web.rabbitmq.consumer; + +import com.bonus.common.domain.ocr.dto.OcrRequest; +import com.bonus.common.domain.ocr.vo.OcrResponse; +import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; +import com.bonus.file.config.MinioConfig; +import com.bonus.file.util.MinioUtil; +import com.bonus.ocr.service.OcrService; +import com.bonus.web.rabbitmq.config.RabbitMQConfig; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.amqp.AmqpRejectAndDontRequeueException; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ThreadLocalRandom; + +*/ +/** + * @className:RabbitMQConsumer + * @author:cwchen + * @date:2025-11-13-13:32 + * @version:1.0 + * @description:mq消费者 + *//* + +@Component(value = "RabbitMQConsumer") +@Slf4j +public class RabbitMQConsumer { + + @Resource + private OcrService ocrService; + + @Resource + private MinioConfig minioConfig; + + @Resource + private MinioUtil minioUtil; + + private final ObjectMapper objectMapper = new ObjectMapper(); + + */ +/** + * 主队列消息处理 + * 优化:确保异常处理逻辑正确,消息确认机制清晰 + * 关键:在手动确认模式下,成功时手动ACK,失败时抛出异常让重试机制处理 + *//* + + @RabbitListener(queues = "myQueue", containerFactory = "multiConsumerFactory") + public void handleMessage(org.springframework.amqp.core.Message amqpMessage, + Channel channel) throws IOException { + + RabbitMqMessage rabbitMq = null; + long deliveryTag = 0; + String messageId = "unknown"; + + try { + // 解析消息 + rabbitMq = parseMessage(amqpMessage); + deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag(); + messageId = rabbitMq.getMessageId(); + + log.info("开始处理消息 - ID: {}, 任务: {}", messageId, rabbitMq.getTaskName()); + + // 业务处理 + processBusiness(rabbitMq); + + // 处理成功 - 确认消息 + channel.basicAck(deliveryTag, false); + log.info("✅ 消息处理完成并确认 - ID: {}", messageId); + + } catch (Exception e) { + log.error("❌ 消息处理异常 - ID: {}, deliveryTag: {}, 异常信息: {}", + messageId, deliveryTag, e.getMessage(), e); + + // 检查是否是 AmqpRejectAndDontRequeueException(重试次数耗尽,消息已发送到死信队列) + AmqpRejectAndDontRequeueException rejectException = findRejectException(e); + + if (rejectException != null) { + // 关键修复:消息已发送到死信队列,需要手动拒绝原消息 + log.error("🚫 重试次数耗尽,消息已发送到死信队列,准备拒绝原消息 - ID: {}, deliveryTag: {}", + messageId, deliveryTag); + + try { + if (deliveryTag > 0) { + // 手动拒绝消息(NACK,不重新入队) + // requeue=false 确保消息不会重新入队,服务重启后不会重新处理 + channel.basicNack(deliveryTag, false, false); + log.error("✅ 原消息已永久拒绝并从主队列中删除 - ID: {}, deliveryTag: {}。服务重启后不会重新处理此消息。", + messageId, deliveryTag); + // 消息已经被处理(发送到死信队列并永久拒绝),不重新抛出异常 + return; + } else { + log.error("❌ deliveryTag 无效,无法拒绝消息 - ID: {}, deliveryTag: {}", + messageId, deliveryTag); + } + } catch (IOException ioException) { + log.error("❌ 拒绝消息失败 - ID: {}, deliveryTag: {}, 异常: {}", + messageId, deliveryTag, ioException.getMessage(), ioException); + } + // 即使拒绝失败,也不重新抛出异常,避免重复处理 + return; + } + + // 其他异常:抛出异常触发重试机制 + throw new RuntimeException("消息处理失败: " + e.getMessage(), e); + } + } + + */ +/** + * 递归查找 AmqpRejectAndDontRequeueException + *//* + + private AmqpRejectAndDontRequeueException findRejectException(Throwable e) { + if (e == null) { + return null; + } + + if (e instanceof AmqpRejectAndDontRequeueException) { + return (AmqpRejectAndDontRequeueException) e; + } + + // 检查异常消息中是否包含死信队列相关的关键字 + String errorMessage = e.getMessage(); + if (errorMessage != null && + (errorMessage.contains("死信队列") || + errorMessage.contains("重试次数耗尽") || + errorMessage.contains("消息已发送到死信队列"))) { + // 递归查找异常链 + Throwable cause = e.getCause(); + int depth = 0; + while (cause != null && depth < 10) { + if (cause instanceof AmqpRejectAndDontRequeueException) { + return (AmqpRejectAndDontRequeueException) cause; + } + cause = cause.getCause(); + depth++; + } + } + + // 递归查找 cause + return findRejectException(e.getCause()); + } + + */ +/** + * 死信队列消息处理 - 监听重试次数耗尽的消息 + *//* + + @RabbitListener(queues = RabbitMQConfig.DLX_QUEUE, containerFactory = "deadLetterContainerFactory") + public void handleDeadLetterMessage(org.springframework.amqp.core.Message amqpMessage, + Channel channel) throws IOException { + + RabbitMqMessage rabbitMq = parseMessage(amqpMessage); + long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag(); + String messageId = rabbitMq.getMessageId(); + + try { + log.error("🚨 收到死信消息 - ID: {}, 任务: {}", messageId, rabbitMq.getTaskName()); + + // 处理死信消息:记录日志、发送告警、人工干预等 + processDeadLetterMessage(rabbitMq, amqpMessage); + + // 确认死信消息 + channel.basicAck(deliveryTag, false); + log.info("死信消息处理完成 - ID: {}", messageId); + + } catch (Exception e) { + log.error("死信消息处理异常 - ID: {}", messageId, e); + // 死信消息处理失败,可以选择重新入队或丢弃 + channel.basicNack(deliveryTag, false, false); + } + } + + */ +/** + * 处理死信消息 + *//* + + private void processDeadLetterMessage(RabbitMqMessage message, org.springframework.amqp.core.Message amqpMessage) { + String messageId = message.getMessageId(); + + try { + // 1. 记录死信消息到数据库或日志文件 + logDeadLetterToDatabase(message); + + // 2. 发送告警通知 + sendDeadLetterAlert(message); + + // 3. 记录详细错误信息 + Map deadLetterInfo = collectDeadLetterInfo(message, amqpMessage); + log.error("死信消息详情 - ID: {}, 信息: {}", messageId, deadLetterInfo); + + } catch (Exception e) { + log.error("处理死信消息时发生异常 - ID: {}", messageId, e); + } + } + + */ +/** + * 记录死信消息到数据库(示例) + *//* + + private void logDeadLetterToDatabase(RabbitMqMessage message) { + // 这里可以集成数据库操作,记录死信消息 + // 例如:messageId, taskName, uploadPath, 失败时间, 失败原因等 + log.warn("记录死信消息到数据库 - ID: {}, 任务: {}", + message.getMessageId(), message.getTaskName()); + } + + */ +/** + * 发送死信告警 + *//* + + private void sendDeadLetterAlert(RabbitMqMessage message) { + try { + String alertMessage = String.format( + "🚨 OCR处理死信告警\n" + + "消息ID: %s\n" + + "任务名称: %s\n" + + "文件路径: %s\n" + + "时间: %s\n" + + "请及时处理!", + message.getMessageId(), + message.getTaskName(), + message.getUploadPath(), + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ); + + log.error("死信告警: {}", alertMessage); + + // 这里可以集成实际的告警系统 + // 例如:发送邮件、短信、钉钉、企业微信等 + // alertService.send(alertMessage); + + } catch (Exception e) { + log.error("发送死信告警失败", e); + } + } + + */ +/** + * 收集死信消息信息 + *//* + + private Map collectDeadLetterInfo(RabbitMqMessage message, + org.springframework.amqp.core.Message amqpMessage) { + Map info = new HashMap<>(); + + info.put("messageId", message.getMessageId()); + info.put("taskName", message.getTaskName()); + info.put("uploadPath", message.getUploadPath()); + info.put("deadLetterTime", new Date()); + info.put("originalHeaders", amqpMessage.getMessageProperties().getHeaders()); + + return info; + } + + private RabbitMqMessage parseMessage(org.springframework.amqp.core.Message message) { + try { + String messageBody = new String(message.getBody(), StandardCharsets.UTF_8); + RabbitMqMessage rabbitMq = objectMapper.readValue(messageBody, RabbitMqMessage.class); + + if (rabbitMq.getMessageId() == null) { + String messageId = message.getMessageProperties().getMessageId(); + rabbitMq.setMessageId(messageId != null ? messageId : generateMessageId()); + } + return rabbitMq; + + } catch (Exception e) { + log.error("消息解析失败", e); + throw new RuntimeException("消息解析失败", e); + } + } + + private void processBusiness(RabbitMqMessage message) { + String uploadPath = message.getUploadPath(); + validateUploadPath(uploadPath); + File fileFromMinio = getFileFromMinio(uploadPath); + OcrResponse ocrResponse = performOcrRecognition(fileFromMinio); + processOcrResult(message, ocrResponse); + } + + private void validateUploadPath(String uploadPath) { + if (StringUtils.isBlank(uploadPath)) { + throw new RuntimeException("文件路径不能为空"); + } + } + + private File getFileFromMinio(String uploadPath) { + try { + File file = minioUtil.getFileFromMinio(minioConfig.getBucketName(), uploadPath); + if (file == null || !file.exists()) { + throw new RuntimeException("Minio文件不存在: " + uploadPath); + } + return file; + } catch (Exception e) { + throw new RuntimeException("获取Minio文件失败: " + uploadPath, e); + } + } + + private OcrResponse performOcrRecognition(File file) { + try { +// OcrRequest ocrRequest = buildOcrRequest(file); +// OcrResponse ocrResponse = ocrService.callOcrService(ocrRequest); + OcrResponse ocrResponse = null; + // 修复:检查 OCR 响应是否为 null + if (Objects.isNull(ocrResponse)) { + throw new RuntimeException("OCR服务返回结果为空"); + } + + if (!isOcrResponseValid(ocrResponse)) { + throw new RuntimeException("OCR识别结果无效"); + } + + log.info("OCR识别成功 - 数据: {}", ocrResponse.getData()); + return ocrResponse; + + } catch (Exception e) { + log.error("OCR识别失败", e); + throw new RuntimeException("OCR识别失败: " + e.getMessage(), e); + } + } + + private OcrRequest buildOcrRequest(File file) { + OcrRequest ocrRequest = new OcrRequest(); + ocrRequest.setFile(file); + ocrRequest.setType("image/png"); + ocrRequest.setFields_json("姓名,公民身份号码"); + return ocrRequest; + } + + private boolean isOcrResponseValid(OcrResponse ocrResponse) { + return ocrResponse.getData() != null && !ocrResponse.getData().toString().isEmpty(); + } + + public void processOcrResult(RabbitMqMessage originalMessage, OcrResponse ocrResponse) { + try { + log.info("处理OCR结果 - 消息ID: {}, 任务: {}", + originalMessage.getMessageId(), originalMessage.getTaskName()); + // 业务逻辑... + log.info("OCR结果处理完成 - 消息ID: {}", originalMessage.getMessageId()); + } catch (Exception e) { + log.error("OCR结果处理失败 - 消息ID: {}", originalMessage.getMessageId(), e); + throw new RuntimeException("OCR结果处理失败", e); + } + } + + private String generateMessageId() { + return "gen_" + System.currentTimeMillis() + "_" + ThreadLocalRandom.current().nextInt(1000); + } +} +*/ diff --git a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/consumer/RabbitMQConsumerService.java b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/consumer/RabbitMQConsumerService.java new file mode 100644 index 0000000..5352eca --- /dev/null +++ b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/consumer/RabbitMQConsumerService.java @@ -0,0 +1,304 @@ +package com.bonus.web.rabbitmq.consumer; + +import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; +import com.rabbitmq.client.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * @className:RabbitMQConsumerService + * @author:cwchen + * @date:2025-11-24-16:19 + * @version:1.0 + * @description:消费者服务 + */ +@Component(value = "") +@Slf4j +public class RabbitMQConsumerService { + + /** + * 主消息消费者 - 使用手动确认模式 + */ + @RabbitListener( + queues = "myQueue", + containerFactory = "multiConsumerFactory", + errorHandler = "rabbitListenerErrorHandler" + ) + public void handleMessage(RabbitMqMessage message, + Channel channel, + @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { + + String messageId = message.getMessageId(); + String taskName = message.getTaskName(); + + log.info("开始处理消息 - ID: {}, 任务: {}, 投递标签: {}", + messageId, taskName, deliveryTag); + + try { + // 根据任务类型进行不同的处理 + switch (taskName) { + case "FILE_UPLOAD": + processFileUpload(message); + break; + case "OCR_PROCESS": + processOcrTask(message); + break; + case "IMAGE_PROCESS": + processImage(message); + break; + case "DOCUMENT_EXTRACT": + processDocumentExtract(message); + break; + default: + processDefaultTask(message); + break; + } + + // 处理成功,手动确认消息 + channel.basicAck(deliveryTag, false); + log.info("消息处理完成并确认 - ID: {}, 投递标签: {}", messageId, deliveryTag); + + } catch (BusinessException e) { + // 业务异常,记录日志但不重试 + log.error("业务异常,拒绝消息并不重新入队 - ID: {}", messageId, e); + try { + channel.basicReject(deliveryTag, false); + } catch (IOException ioException) { + log.error("拒绝消息失败 - ID: {}", messageId, ioException); + } + } catch (Exception e) { + // 其他异常,记录日志并重新入队进行重试 + log.error("处理消息异常,将重新入队 - ID: {}", messageId, e); + try { + channel.basicReject(deliveryTag, true); + } catch (IOException ioException) { + log.error("拒绝消息失败 - ID: {}", messageId, ioException); + } + // 重新抛出异常,让重试拦截器处理 + throw new RuntimeException("消息处理失败,需要重试: " + e.getMessage(), e); + } + } + + /** + * 处理文件上传任务 + */ + private void processFileUpload(RabbitMqMessage message) { + String messageId = message.getMessageId(); + String uploadPath = message.getUploadPath(); + + log.info("处理文件上传任务 - ID: {}, 路径: {}", messageId, uploadPath); + + // 模拟业务处理 + try { + // 1. 验证文件 + validateFile(uploadPath); + + // 2. 处理文件 + processUploadedFile(uploadPath); + + // 3. 更新处理状态 + updateProcessStatus(messageId, "COMPLETED"); + + log.info("文件上传任务处理完成 - ID: {}", messageId); + + } catch (Exception e) { + log.error("文件上传任务处理失败 - ID: {}", messageId, e); + throw new BusinessException("文件处理失败", e); + } + } + + /** + * 处理OCR任务 + */ + private void processOcrTask(RabbitMqMessage message) { + String messageId = message.getMessageId(); + String filePath = message.getUploadPath(); + String taskId = (String) message.getBusinessData().get("taskId"); + + log.info("处理OCR任务 - ID: {}, 任务ID: {}, 文件: {}", messageId, taskId, filePath); + + try { + // 1. 检查OCR服务可用性 + checkOcrServiceAvailability(); + + // 2. 执行OCR处理 + String ocrResult = performOcrProcessing(filePath); + + // 3. 保存OCR结果 + saveOcrResult(taskId, ocrResult); + + // 4. 更新任务状态 + updateOcrTaskStatus(taskId, "SUCCESS"); + + log.info("OCR任务处理完成 - ID: {}, 任务ID: {}", messageId, taskId); + + } catch (OcrServiceException e) { + log.error("OCR服务异常 - ID: {}", messageId, e); + throw new BusinessException("OCR处理失败", e); + } catch (Exception e) { + log.error("OCR任务处理异常 - ID: {}", messageId, e); + throw new RuntimeException("OCR处理异常,需要重试", e); + } + } + + /** + * 处理图片处理任务 + */ + private void processImage(RabbitMqMessage message) { + String messageId = message.getMessageId(); + String imagePath = message.getUploadPath(); + String processType = (String) message.getBusinessData().get("processType"); + + log.info("处理图片任务 - ID: {}, 类型: {}, 路径: {}", messageId, processType, imagePath); + + try { + // 模拟图片处理 + Thread.sleep(1000); + + // 这里添加实际的图片处理逻辑 + processImageFile(imagePath, processType); + + log.info("图片处理完成 - ID: {}", messageId); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("图片处理被中断", e); + } catch (Exception e) { + log.error("图片处理失败 - ID: {}", messageId, e); + throw new RuntimeException("图片处理失败", e); + } + } + + /** + * 处理文档提取任务 + */ + private void processDocumentExtract(RabbitMqMessage message) { + String messageId = message.getMessageId(); + String documentPath = message.getUploadPath(); + String extractType = (String) message.getBusinessData().get("extractType"); + + log.info("处理文档提取任务 - ID: {}, 类型: {}, 路径: {}", messageId, extractType, documentPath); + + try { + // 模拟文档提取处理 + extractDocumentContent(documentPath, extractType); + + log.info("文档提取完成 - ID: {}", messageId); + + } catch (Exception e) { + log.error("文档提取失败 - ID: {}", messageId, e); + throw new RuntimeException("文档提取失败", e); + } + } + + /** + * 处理默认任务 + */ + private void processDefaultTask(RabbitMqMessage message) { + String messageId = message.getMessageId(); + String taskName = message.getTaskName(); + + log.info("处理默认任务 - ID: {}, 任务名: {}", messageId, taskName); + + // 模拟业务处理 + try { + // 这里添加实际的任务处理逻辑 + performBusinessLogic(message); + + log.info("默认任务处理完成 - ID: {}", messageId); + + } catch (Exception e) { + log.error("默认任务处理失败 - ID: {}", messageId, e); + throw new RuntimeException("任务处理失败", e); + } + } + + // 以下为模拟的业务方法实现 + + private void validateFile(String filePath) { + // 模拟文件验证 + if (filePath == null || filePath.isEmpty()) { + throw new BusinessException("文件路径不能为空"); + } + log.debug("文件验证通过: {}", filePath); + } + + private void processUploadedFile(String filePath) { + // 模拟文件处理 + log.debug("处理上传文件: {}", filePath); + // 实际处理逻辑... + } + + private void updateProcessStatus(String messageId, String status) { + // 模拟更新处理状态 + log.debug("更新处理状态 - ID: {}, 状态: {}", messageId, status); + } + + private void checkOcrServiceAvailability() { + // 模拟OCR服务检查 + log.debug("检查OCR服务可用性"); + // 实际检查逻辑... + } + + private String performOcrProcessing(String filePath) { + // 模拟OCR处理 + log.debug("执行OCR处理: {}", filePath); + return "OCR处理结果"; + } + + private void saveOcrResult(String taskId, String result) { + // 模拟保存OCR结果 + log.debug("保存OCR结果 - 任务ID: {}", taskId); + } + + private void updateOcrTaskStatus(String taskId, String status) { + // 模拟更新OCR任务状态 + log.debug("更新OCR任务状态 - 任务ID: {}, 状态: {}", taskId, status); + } + + private void processImageFile(String imagePath, String processType) { + // 模拟图片处理 + log.debug("处理图片文件 - 路径: {}, 类型: {}", imagePath, processType); + } + + private void extractDocumentContent(String documentPath, String extractType) { + // 模拟文档提取 + log.debug("提取文档内容 - 路径: {}, 类型: {}", documentPath, extractType); + } + + private void performBusinessLogic(RabbitMqMessage message) { + // 模拟业务逻辑执行 + log.debug("执行业务逻辑 - 消息ID: {}", message.getMessageId()); + } + + /** + * 自定义业务异常 + */ + public static class BusinessException extends RuntimeException { + public BusinessException(String message) { + super(message); + } + + public BusinessException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * OCR服务异常 + */ + public static class OcrServiceException extends RuntimeException { + public OcrServiceException(String message) { + super(message); + } + + public OcrServiceException(String message, Throwable cause) { + super(message, cause); + } + } +} \ No newline at end of file diff --git a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/message/RabbitMqMessageBuilder.java b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/message/RabbitMqMessageBuilder.java new file mode 100644 index 0000000..62a3407 --- /dev/null +++ b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/message/RabbitMqMessageBuilder.java @@ -0,0 +1,141 @@ +package com.bonus.web.rabbitmq.message; + +import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.UUID; + +/** + * @className: RabbitMqMessageBuilder + * @description: RabbitMQ 消息构建器 + */ +@Component +@Slf4j +public class RabbitMqMessageBuilder { + + /** + * 构建基础消息 + */ + public RabbitMqMessage buildBaseMessage(String taskName, String uploadPath) { + return RabbitMqMessage.createSimple(taskName, uploadPath); + } + + /** + * 构建带自定义消息ID的消息 + */ + public RabbitMqMessage buildMessageWithCustomId(String taskName, String uploadPath, String customMessageId) { + return RabbitMqMessage.builder() + .messageId(customMessageId) + .taskName(taskName) + .uploadPath(uploadPath) + .timestamp(System.currentTimeMillis()) + .status(RabbitMqMessage.Status.PENDING) + .build(); + } + + /** + * 构建文件处理消息 + */ + public RabbitMqMessage buildFileProcessMessage(String filePath) { + return RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.FILE_UPLOAD, filePath); + } + + /** + * 构建OCR处理消息 + */ + public RabbitMqMessage buildOcrProcessMessage(String filePath, String taskId) { + RabbitMqMessage message = RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.OCR_PROCESS, filePath); + message.addBusinessData("taskId", taskId); + message.addBusinessData("processType", "TEXT_EXTRACTION"); + return message; + } + + /** + * 构建带重试配置的消息 + */ + public RabbitMqMessage buildMessageWithRetryConfig(String taskName, String uploadPath, int maxRetryCount) { + return RabbitMqMessage.builder() + .taskName(taskName) + .messageId(UUID.randomUUID().toString()) + .uploadPath(uploadPath) + .timestamp(System.currentTimeMillis()) + .maxRetryCount(maxRetryCount) + .status(RabbitMqMessage.Status.PENDING) + .build(); + } + + /** + * 构建高优先级消息 + */ + public RabbitMqMessage buildHighPriorityMessage(String taskName, String uploadPath) { + return RabbitMqMessage.builder() + .taskName(taskName) + .messageId(UUID.randomUUID().toString()) + .uploadPath(uploadPath) + .timestamp(System.currentTimeMillis()) + .priority(9) // 最高优先级 + .status(RabbitMqMessage.Status.PENDING) + .build(); + } + + /** + * 构建带过期时间的消息 + */ + public RabbitMqMessage buildMessageWithTtl(String taskName, String uploadPath, long ttlMillis) { + RabbitMqMessage message = RabbitMqMessage.createSimple(taskName, uploadPath); + message.setExpirationRelative(ttlMillis); + return message; + } + + /** + * 构建完整业务消息 + */ + public RabbitMqMessage buildBusinessMessage(String taskName, String uploadPath, + String sourceSystem, String targetSystem, + Map businessData) { + RabbitMqMessage message = RabbitMqMessage.createWithBusinessData(taskName, uploadPath, businessData); + message.setSourceSystem(sourceSystem); + message.setTargetSystem(targetSystem); + return message; + } + + /** + * 构建图片处理消息 + */ + public RabbitMqMessage buildImageProcessMessage(String imagePath, String processType) { + RabbitMqMessage message = RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.IMAGE_PROCESS, imagePath); + message.addBusinessData("processType", processType); + message.addBusinessData("imageFormat", getFileExtension(imagePath)); + return message; + } + + /** + * 构建文档提取消息 + */ + public RabbitMqMessage buildDocumentExtractMessage(String documentPath, String extractType) { + RabbitMqMessage message = RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.DOCUMENT_EXTRACT, documentPath); + message.addBusinessData("extractType", extractType); + message.addBusinessData("documentType", getFileExtension(documentPath)); + return message; + } + + /** + * 获取文件扩展名 + */ + private String getFileExtension(String filePath) { + if (filePath == null || filePath.lastIndexOf(".") == -1) { + return "unknown"; + } + return filePath.substring(filePath.lastIndexOf(".") + 1).toLowerCase(); + } + + /** + * 生成文件相关的消息ID + */ + private String generateFileMessageId(String taskName, String filePath) { + String fileHash = Integer.toHexString(filePath.hashCode()); + return taskName + "_" + fileHash + "_" + System.currentTimeMillis(); + } +} \ No newline at end of file diff --git a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducer.java b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducer.java new file mode 100644 index 0000000..5d44b8d --- /dev/null +++ b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducer.java @@ -0,0 +1,212 @@ +/* + +package com.bonus.web.rabbitmq.producer; + +import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.nio.charset.StandardCharsets; +import java.util.Date; +import java.util.Map; +import java.util.concurrent.*; + + +*/ +/** + * @className:RabbitMQProducer + * @author:cwchen + * @date:2025-11-13-13:32 + * @version:1.0 + * @description:mq生产者 + *//* + + +@Component(value = "RabbitMQProducer") +@Slf4j +public class RabbitMQProducer { + + @Autowired + private RabbitTemplate rabbitTemplate; + + private final ObjectMapper objectMapper = new ObjectMapper(); + private final Map> pendingConfirmations = new ConcurrentHashMap<>(); + private final ScheduledExecutorService timeoutScheduler = Executors.newSingleThreadScheduledExecutor(); + + + */ +/** + * 发送消息 - 主入口,默认使用异步确认 + *//* + + + public CompletableFuture send(String exchange, String routingKey, RabbitMqMessage message) { + return sendAsync(exchange, routingKey, message); + } + + + */ +/** + * 异步发送 + *//* + + + public CompletableFuture sendAsync(String exchange, String routingKey, RabbitMqMessage message) { + String messageId = generateMessageId(); + message.setMessageId(messageId); + + CompletableFuture future = new CompletableFuture<>(); + pendingConfirmations.put(messageId, future); + + CorrelationData correlationData = new CorrelationData(messageId); + + try { + org.springframework.amqp.core.Message amqpMessage = buildMessage(message, messageId); + rabbitTemplate.convertAndSend(exchange, routingKey, amqpMessage, correlationData); + log.debug("消息发送完成,等待异步确认 - ID: {}", messageId); + // 30分钟超时 + scheduleTimeout(messageId, future, 30, TimeUnit.MINUTES); + return future; + } catch (Exception e) { + pendingConfirmations.remove(messageId); + future.completeExceptionally(e); + log.error("消息发送异常 - ID: {}", messageId, e); + return future; + } + } + + + */ +/** + * 初始化确认回调 + *//* + + + @PostConstruct + public void init() { + rabbitTemplate.setConfirmCallback(this::handleConfirm); + // 优雅关闭 + Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown)); + } + + + */ +/** + * 处理确认回调 + *//* + + + private void handleConfirm(CorrelationData correlationData, boolean ack, String cause) { + if (correlationData == null || correlationData.getId() == null) { + return; + } + String messageId = correlationData.getId(); + CompletableFuture future = pendingConfirmations.remove(messageId); + + if (future != null) { + if (ack) { + future.complete(true); + } else { + log.error("Broker确认失败 - ID: {}, 原因: {}", messageId, cause); + future.complete(false); + } + } + } + + + */ +/** + * 调度超时 + *//* + + + private void scheduleTimeout(String messageId, CompletableFuture future, + long timeout, TimeUnit unit) { + timeoutScheduler.schedule(() -> { + if (pendingConfirmations.remove(messageId) != null && !future.isDone()) { + log.warn("消息确认超时 - ID: {}", messageId); + future.complete(false); + } + }, timeout, unit); + } + + + */ +/** + * 资源清理 + *//* + + + @PreDestroy + public void shutdown() { + timeoutScheduler.shutdown(); + try { + if (!timeoutScheduler.awaitTermination(5, TimeUnit.SECONDS)) { + timeoutScheduler.shutdownNow(); + } + } catch (InterruptedException e) { + timeoutScheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + + // 处理未完成的消息 + pendingConfirmations.forEach((id, future) -> { + if (!future.isDone()) { + future.complete(false); + } + }); + pendingConfirmations.clear(); + } + + */ +/** + * 生成消息ID + *//* + + + private String generateMessageId() { + return "rabbitmq_msg" + System.currentTimeMillis() + "_" + + ThreadLocalRandom.current().nextInt(100000, 999999); + } + + + */ +/** + * 构建消息对象 + *//* + + + private org.springframework.amqp.core.Message buildMessage(RabbitMqMessage message, String messageId) { + try { + String jsonStr = objectMapper.writeValueAsString(message); + return MessageBuilder + .withBody(jsonStr.getBytes(StandardCharsets.UTF_8)) + .setContentType(MessageProperties.CONTENT_TYPE_JSON) + .setContentEncoding("UTF-8") + .setMessageId(messageId) + .setTimestamp(new Date()) + .setDeliveryMode(MessageDeliveryMode.PERSISTENT) + .setHeader("source", "ocr-service") + .setHeader("businessType", extractBusinessType(message.getUploadPath())) + .build(); + } catch (Exception e) { + throw new RuntimeException("消息构建失败: " + messageId, e); + } + } + + private String extractBusinessType(String uploadPath) { + if (uploadPath == null) return "unknown"; + if (uploadPath.contains("personnelDatabase")) return "personnel"; + if (uploadPath.contains("toolsDatabase")) return "tool"; + return "other"; + } +} + +*/ diff --git a/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java new file mode 100644 index 0000000..4e9f58a --- /dev/null +++ b/bonus-admin/src/main/java/com/bonus/web/rabbitmq/producer/RabbitMQProducerService.java @@ -0,0 +1,262 @@ +package com.bonus.web.rabbitmq.producer; + +import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; +import com.bonus.web.rabbitmq.message.RabbitMqMessageBuilder; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.ReturnedMessage; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * @className:RabbitMQProducerService + * @author:cwchen + * @date:2025-11-24-16:16 + * @version:1.0 + * @description:生产者服务 + */ +@Service(value = "RabbitMQProducerService") +@Slf4j +public class RabbitMQProducerService { + + private final RabbitTemplate rabbitTemplate; + private final RabbitMqMessageBuilder messageBuilder; + private final Map> pendingConfirmations; + + public RabbitMQProducerService(RabbitTemplate rabbitTemplate, + RabbitMqMessageBuilder messageBuilder) { + this.rabbitTemplate = rabbitTemplate; + this.messageBuilder = messageBuilder; + this.pendingConfirmations = new ConcurrentHashMap<>(); + + // 设置确认回调 + this.rabbitTemplate.setConfirmCallback(createConfirmCallback()); + // 设置返回回调(用于处理路由失败) + this.rabbitTemplate.setReturnsCallback(createReturnsCallback()); + } + + /** + * 异步发送消息 - 基础版本 + */ + public CompletableFuture sendMessageAsync(String taskName, String uploadPath) { + return CompletableFuture.supplyAsync(() -> { + try { + RabbitMqMessage message = messageBuilder.buildBaseMessage(taskName, uploadPath); + return sendMessageWithConfirmation(message); + } catch (Exception e) { + log.error("异步发送消息失败: {}", e.getMessage(), e); + throw new RuntimeException("消息发送失败", e); + } + }); + } + + /** + * 异步发送消息 - 带自定义ID + */ + public CompletableFuture sendMessageWithCustomIdAsync(String taskName, + String uploadPath, + String customMessageId) { + return CompletableFuture.supplyAsync(() -> { + try { + RabbitMqMessage message = messageBuilder.buildMessageWithCustomId(taskName, uploadPath, customMessageId); + return sendMessageWithConfirmation(message); + } catch (Exception e) { + log.error("异步发送带自定义ID消息失败: {}", e.getMessage(), e); + throw new RuntimeException("消息发送失败", e); + } + }); + } + + /** + * 异步发送OCR处理消息 + */ + public CompletableFuture sendOcrMessageAsync(String filePath, String taskId) { + return CompletableFuture.supplyAsync(() -> { + try { + RabbitMqMessage message = messageBuilder.buildOcrProcessMessage(filePath, taskId); + return sendMessageWithConfirmation(message); + } catch (Exception e) { + log.error("异步发送OCR消息失败: {}", e.getMessage(), e); + throw new RuntimeException("OCR消息发送失败", e); + } + }); + } + + /** + * 异步发送带重试配置的消息 + */ + public CompletableFuture sendMessageWithRetryAsync(String taskName, + String uploadPath, + int maxRetryCount) { + return CompletableFuture.supplyAsync(() -> { + try { + RabbitMqMessage message = messageBuilder.buildMessageWithRetryConfig(taskName, uploadPath, maxRetryCount); + return sendMessageWithConfirmation(message); + } catch (Exception e) { + log.error("异步发送带重试配置消息失败: {}", e.getMessage(), e); + throw new RuntimeException("消息发送失败", e); + } + }); + } + + /** + * 批量异步发送消息 + */ + public CompletableFuture> sendBatchMessagesAsync(List tasks) { + return CompletableFuture.supplyAsync(() -> { + List messageIds = new ArrayList<>(); + List> futures = new ArrayList<>(); + + for (MessageTask task : tasks) { + CompletableFuture future = CompletableFuture.supplyAsync(() -> { + try { + RabbitMqMessage message = messageBuilder.buildBaseMessage(task.getTaskName(), task.getUploadPath()); + return sendMessageWithConfirmation(message); + } catch (Exception e) { + log.error("批量发送消息失败 - 任务: {}", task.getTaskName(), e); + return null; + } + }); + futures.add(future); + } + + // 等待所有任务完成 + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + + for (CompletableFuture future : futures) { + try { + String messageId = future.get(); + if (messageId != null) { + messageIds.add(messageId); + } + } catch (Exception e) { + log.error("获取异步发送结果失败", e); + } + } + + log.info("批量发送完成,成功发送 {} 条消息", messageIds.size()); + return messageIds; + }); + } + + /** + * 发送消息并等待确认 + */ + private String sendMessageWithConfirmation(RabbitMqMessage message) { + String messageId = message.getMessageId(); + CorrelationData correlationData = new CorrelationData(messageId); + + // 创建确认Future + CompletableFuture confirmationFuture = new CompletableFuture<>(); + + // 保存到全局的Map中,确保确认回调可以访问 + pendingConfirmations.put(messageId, confirmationFuture); + + try { + log.info("准备发送消息 - ID: {}, 任务: {}, 路径: {}", + messageId, message.getTaskName(), message.getUploadPath()); + + // 发送消息 + rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData); + + log.info("消息已发送到Broker - ID: {}", messageId); + + // 等待确认(带超时) + CorrelationData result = confirmationFuture.get(10, TimeUnit.SECONDS); + + log.info("消息发送确认成功 - ID: {}", messageId); + return messageId; + + } catch (TimeoutException e) { + log.error("消息确认超时 - ID: {}", messageId, e); + // 超时时移除对应的Future + pendingConfirmations.remove(messageId); + throw new RuntimeException("消息确认超时", e); + } catch (Exception e) { + log.error("消息发送异常 - ID: {}", messageId, e); + pendingConfirmations.remove(messageId); + throw new RuntimeException("消息发送异常", e); + } + } + + /** + * 创建确认回调 + */ + private RabbitTemplate.ConfirmCallback createConfirmCallback() { + return (correlationData, ack, cause) -> { + if (correlationData == null) { + log.error("确认回调中correlationData为null"); + return; + } + + String messageId = correlationData.getId(); + if (ack) { + log.debug("消息发送确认成功 - ID: {}", messageId); + // 这里可以更新消息状态为已确认 + } else { + log.error("消息发送确认失败 - ID: {}, 原因: {}", messageId, cause); + // 这里可以处理发送失败的消息 + } + }; + } + + /** + * 创建返回回调(处理路由失败) + */ + private RabbitTemplate.ReturnsCallback createReturnsCallback() { + return returned -> { + log.error("消息路由失败 - 交换机: {}, 路由键: {}, 回复码: {}, 回复文本: {}", + returned.getExchange(), returned.getRoutingKey(), + returned.getReplyCode(), returned.getReplyText()); + + // 如果有CorrelationData,可以在这里处理 + if (returned.getMessage().getMessageProperties().getCorrelationId() != null) { + String messageId = returned.getMessage().getMessageProperties().getCorrelationId(); + CompletableFuture future = pendingConfirmations.remove(messageId); + if (future != null) { + future.completeExceptionally(new RuntimeException("消息路由失败: " + returned.getReplyText())); + } + } + }; + } + + /** + * 同步发送消息(备用方法) + */ + public String sendMessageSync(RabbitMqMessage message) { + try { + String messageId = message.getMessageId(); + CorrelationData correlationData = new CorrelationData(messageId); + + log.info("同步发送消息 - ID: {}, 任务: {}", messageId, message.getTaskName()); + + rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData); + + log.info("同步发送完成 - ID: {}", messageId); + return messageId; + } catch (Exception e) { + log.error("同步发送消息失败: {}", e.getMessage(), e); + throw new RuntimeException("消息发送失败", e); + } + } + + /** + * 任务数据类 + */ + @Data + @AllArgsConstructor + public static class MessageTask { + private String taskName; + private String uploadPath; + } +} diff --git a/bonus-admin/src/main/java/com/bonus/web/service/analysis/AnalysisService.java b/bonus-admin/src/main/java/com/bonus/web/service/analysis/AnalysisService.java index 6dcb7d0..39a15dd 100644 --- a/bonus-admin/src/main/java/com/bonus/web/service/analysis/AnalysisService.java +++ b/bonus-admin/src/main/java/com/bonus/web/service/analysis/AnalysisService.java @@ -5,9 +5,16 @@ import com.bonus.analysis.service.IASAnalysisService; import com.bonus.common.core.domain.AjaxResult; import com.bonus.common.domain.analysis.dto.AnalysisDto; import com.bonus.common.domain.analysis.vo.AnalysisVo; +import com.bonus.common.domain.mainDatabase.dto.EnterpriseDto; import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage; +import com.bonus.common.utils.ValidatorsUtils; +import com.bonus.common.utils.uuid.UUID; +import com.bonus.web.rabbitmq.producer.RabbitMQProducerService; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.interceptor.TransactionAspectSupport; import javax.annotation.Resource; import java.util.List; @@ -36,6 +43,12 @@ public class AnalysisService { @Resource(name = "IASAnalysisService") private IASAnalysisService analysisService; + @Resource(name = "ValidatorsUtils") + private ValidatorsUtils validatorsUtils; + + @Resource(name = "RabbitMQProducerService") + private RabbitMQProducerService rabbitMQProducerService; + public AjaxResult testAsyncMq() { /*taskExecutor.submit(() -> { @@ -63,6 +76,9 @@ public class AnalysisService { } });*/ CompletableFuture.runAsync(() -> { + String taskId = UUID.randomUUID().toString(); + String uploadPath = "personnelDatabase/2025/10/24/89a266f4-f928-4ee4-95eb-1a73906da0a7.png"; + rabbitMQProducerService.sendOcrMessageAsync(uploadPath,taskId); }, taskExecutor); return AjaxResult.success(); } @@ -78,5 +94,34 @@ public class AnalysisService { public List getList(AnalysisDto dto) { return analysisService.getList(dto); } + + /** + * 招标解析->新建项目 + * @param dto + * @return AjaxResult + * @author cwchen + * @date 2025/11/24 13:54 + */ + @Transactional(rollbackFor = Exception.class) + public AjaxResult saveData(AnalysisDto.TemplateDto dto) { + try { + // 校验数据是否合法 + String validResult = validatorsUtils.valid(dto, AnalysisDto.TemplateDto.ADD.class); + if (StringUtils.isNotBlank(validResult)) { + return AjaxResult.error(validResult); + } + // 保存项目数据 + analysisService.addProData(dto); + // 同步解析规则数据 + // 执行异步解析任务 + CompletableFuture.runAsync(() -> { + }, taskExecutor); + return AjaxResult.success(); + } catch (Exception e) { + log.error(e.toString(),e); + TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); + return AjaxResult.error(); + } + } } diff --git a/bonus-admin/src/main/resources/application-file.yml b/bonus-admin/src/main/resources/application-file.yml index b643632..14e2f9f 100644 --- a/bonus-admin/src/main/resources/application-file.yml +++ b/bonus-admin/src/main/resources/application-file.yml @@ -17,8 +17,8 @@ ignoreUrl: callBackUrl: /documents/callback #minio: -# url: http://192.168.31.170:9000 -# endpoint: http://192.168.31.170:9000 +# url: http://192.168.31.169:9000 +# endpoint: http://192.168.31.169:9000 # access-key: name # secret-key: password # bucket-name: smart-bid @@ -34,15 +34,16 @@ only-office: url: http://192.168.0.39:8080/documents/only/office/download permissions: # 是否可以编辑 - edit: true - print: true + edit: false + print: false download: true # 是否可以填写表格,如果将mode参数设置为edit,则填写表单仅对文档编辑器可用。 默认值与edit或review参数的值一致。 fillForms: false # 跟踪变化 - review: true + review: false editorConfig: # onlyoffice回调接口,这个接口也需要在springboot后端中实现 + mode: view callbackUrl: http://192.168.0.39:8080/smartBid/documents/callback lang: zh-CN coEditing: diff --git a/bonus-admin/src/main/resources/application-rabbitmq.yml b/bonus-admin/src/main/resources/application-rabbitmq.yml index 0c33ce8..d09b34e 100644 --- a/bonus-admin/src/main/resources/application-rabbitmq.yml +++ b/bonus-admin/src/main/resources/application-rabbitmq.yml @@ -1,63 +1,26 @@ spring: rabbitmq: - # 连接配置 - host: localhost - port: 15672 + host: 192.168.0.14 + port: 5672 username: guest password: guest virtual-host: / - - # 连接超时和心跳 - connection-timeout: 30000 - requested-heartbeat: 60 - - # 通道缓存配置 - cache: - channel: - size: 25 - checkout-timeout: 2000 - - # 发布确认 - publisher-confirm-type: correlated - publisher-returns: true - - # 模板配置 + # 生产者配置 + publisher-confirm-type: correlated # 消息确认 + publisher-returns: true # 开启返回模式 template: - mandatory: true - receive-timeout: 30000 - reply-timeout: 30000 + mandatory: true # 消息无法路由时返回给生产者 retry: - enabled: true - initial-interval: 2000 - max-attempts: 3 - multiplier: 1.5 - max-interval: 10000 - - # 监听器配置 + enabled: true # 启用重试 + initial-interval: 1000ms # 初始间隔 + max-attempts: 3 # 最大重试次数 + multiplier: 2.0 # 倍数 + # 消费者配置 listener: - type: simple simple: - acknowledge-mode: auto - concurrency: 2 - max-concurrency: 10 - prefetch: 1 - # 重要:解决队列检查超时问题 - missing-queues-fatal: false - auto-startup: true - default-requeue-rejected: false - retry: - enabled: true - initial-interval: 3000 - max-attempts: 3 - max-interval: 10000 - multiplier: 2.0 - -# Actuator 配置 -management: - endpoints: - web: - exposure: - include: health,info,metrics - endpoint: - health: - show-details: always \ No newline at end of file + acknowledge-mode: manual # 手动确认 + prefetch: 10 # 预取数量 + concurrency: 3 # 最小消费者数量 + max-concurrency: 5 # 最大消费者数量 + missing-queues-fatal: false # 队列不存在时不致命 + auto-startup: true # 自动启动 \ No newline at end of file diff --git a/bonus-analysis/src/main/java/com/bonus/analysis/mapper/IASAnalysisMapper.java b/bonus-analysis/src/main/java/com/bonus/analysis/mapper/IASAnalysisMapper.java index 9b1ad5e..1dd18e8 100644 --- a/bonus-analysis/src/main/java/com/bonus/analysis/mapper/IASAnalysisMapper.java +++ b/bonus-analysis/src/main/java/com/bonus/analysis/mapper/IASAnalysisMapper.java @@ -23,4 +23,13 @@ public interface IASAnalysisMapper { * @date 2025/11/24 11:17 */ List getList(AnalysisDto dto); + + /** + * 保存项目数据 + * @param dto + * @return void + * @author cwchen + * @date 2025/11/24 14:05 + */ + void addProData(AnalysisDto.TemplateDto dto); } diff --git a/bonus-analysis/src/main/java/com/bonus/analysis/service/IASAnalysisService.java b/bonus-analysis/src/main/java/com/bonus/analysis/service/IASAnalysisService.java index 7555379..5b773a7 100644 --- a/bonus-analysis/src/main/java/com/bonus/analysis/service/IASAnalysisService.java +++ b/bonus-analysis/src/main/java/com/bonus/analysis/service/IASAnalysisService.java @@ -22,4 +22,13 @@ public interface IASAnalysisService { * @date 2025/11/24 11:06 */ List getList(AnalysisDto dto); + + /** + * 保存项目数据 + * @param dto + * @return void + * @author cwchen + * @date 2025/11/24 14:04 + */ + void addProData(AnalysisDto.TemplateDto dto); } diff --git a/bonus-analysis/src/main/java/com/bonus/analysis/service/impl/ASAnalysisServiceImpl.java b/bonus-analysis/src/main/java/com/bonus/analysis/service/impl/ASAnalysisServiceImpl.java index e7a9f4a..073c137 100644 --- a/bonus-analysis/src/main/java/com/bonus/analysis/service/impl/ASAnalysisServiceImpl.java +++ b/bonus-analysis/src/main/java/com/bonus/analysis/service/impl/ASAnalysisServiceImpl.java @@ -29,4 +29,9 @@ public class ASAnalysisServiceImpl implements IASAnalysisService { public List getList(AnalysisDto dto) { return analysisMapper.getList(dto); } + + @Override + public void addProData(AnalysisDto.TemplateDto dto) { + analysisMapper.addProData(dto); + } } diff --git a/bonus-analysis/src/main/resources/mapper/AnalysisMapper.xml b/bonus-analysis/src/main/resources/mapper/AnalysisMapper.xml index acded39..580376c 100644 --- a/bonus-analysis/src/main/resources/mapper/AnalysisMapper.xml +++ b/bonus-analysis/src/main/resources/mapper/AnalysisMapper.xml @@ -30,4 +30,13 @@ ORDER BY tp.create_time DESC + + + + INSERT INTO tb_pro(template_id,create_user_id,create_user_name,update_user_id, + update_user_name,analysis_status) + VALUES ( + #{templateId},#{createUserId},#{createUserName},#{updateUserId},#{updateUserName},'0' + ) + diff --git a/bonus-common/src/main/java/com/bonus/common/config/AsyncConfig.java b/bonus-common/src/main/java/com/bonus/common/config/AsyncConfig.java index 250c56b..a892bb6 100644 --- a/bonus-common/src/main/java/com/bonus/common/config/AsyncConfig.java +++ b/bonus-common/src/main/java/com/bonus/common/config/AsyncConfig.java @@ -41,4 +41,33 @@ public class AsyncConfig { executor.initialize(); return executor; } + + /** + * RabbitMQ专用的线程池 + */ + @Bean("rabbitmqTaskExecutor") + public Executor rabbitmqTaskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // 核心线程数 + executor.setCorePoolSize(20); + // 最大线程数 + executor.setMaxPoolSize(100); + // 队列容量 + executor.setQueueCapacity(5000); + // 线程存活时间 + executor.setKeepAliveSeconds(120); + // 线程名称前缀 + executor.setThreadNamePrefix("RabbitMQ-Async-"); + // 拒绝策略 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + // 等待所有任务结束后再关闭线程池 + executor.setWaitForTasksToCompleteOnShutdown(true); + // 等待时间 + executor.setAwaitTerminationSeconds(120); + // 允许核心线程超时 + executor.setAllowCoreThreadTimeOut(true); + + executor.initialize(); + return executor; + } } diff --git a/bonus-common/src/main/java/com/bonus/common/domain/analysis/dto/AnalysisDto.java b/bonus-common/src/main/java/com/bonus/common/domain/analysis/dto/AnalysisDto.java index 71c8e73..85de904 100644 --- a/bonus-common/src/main/java/com/bonus/common/domain/analysis/dto/AnalysisDto.java +++ b/bonus-common/src/main/java/com/bonus/common/domain/analysis/dto/AnalysisDto.java @@ -1,7 +1,18 @@ package com.bonus.common.domain.analysis.dto; +import com.bonus.common.core.domain.model.LoginUser; +import com.bonus.common.domain.file.po.ResourceFilePo; +import com.bonus.common.domain.mainDatabase.dto.EnterpriseDto; +import com.bonus.common.utils.SecurityUtils; import lombok.Data; +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Size; +import java.util.Date; +import java.util.List; +import java.util.Optional; + /** * @className:AnalysisDto * @author:cwchen @@ -20,4 +31,63 @@ public class AnalysisDto { private String startDate; /**开标结束日期*/ private String endDate; + + private TemplateDto templateDto; + + @Data + public static class TemplateDto{ + + /** + * 项目id + * */ + private Long proId; + + @NotNull(message = "模板不能为空", groups = {ADD.class}) + private Long templateId; + + /** + * 创建人 + */ + private Long createUserId = Optional.ofNullable(SecurityUtils.getLoginUser()) + .map(LoginUser::getUserId) + .orElse(null); + ; + + /** + * 创建人姓名 + */ + private String createUserName = Optional.ofNullable(SecurityUtils.getLoginUser()) + .map(LoginUser::getUsername) + .orElse(null); + ; + + + /** + * 修改人 + */ + private Long updateUserId = Optional.ofNullable(SecurityUtils.getLoginUser()) + .map(LoginUser::getUserId) + .orElse(null); + ; + + /** + * 修改人姓名 + */ + private String updateUserName = Optional.ofNullable(SecurityUtils.getLoginUser()) + .map(LoginUser::getUsername) + .orElse(null); + + /** + * 资源文件 + */ + @NotEmpty(message = "文件不能为空", groups = {ADD.class}) + private List files; + + /** + * 新增条件限制 + */ + public interface ADD { + } + + } } diff --git a/bonus-common/src/main/java/com/bonus/common/domain/rabbitmq/dto/RabbitMqMessage.java b/bonus-common/src/main/java/com/bonus/common/domain/rabbitmq/dto/RabbitMqMessage.java new file mode 100644 index 0000000..a1c6e46 --- /dev/null +++ b/bonus-common/src/main/java/com/bonus/common/domain/rabbitmq/dto/RabbitMqMessage.java @@ -0,0 +1,259 @@ +package com.bonus.common.domain.rabbitmq.dto; + +import lombok.Data; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** + * @className: RabbitMqMessage + * @description: RabbitMQ 消息实体类 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class RabbitMqMessage implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * 消息名称/任务类型 + */ + private String taskName; + + /** + * 消息ID(唯一标识) + */ + private String messageId; + + /** + * 文件上传路径 + */ + private String uploadPath; + + /** + * 消息创建时间戳 + */ + @Builder.Default + private Long timestamp = System.currentTimeMillis(); + + /** + * 消息来源系统 + */ + private String sourceSystem; + + /** + * 消息目标系统 + */ + private String targetSystem; + + /** + * 消息优先级(0-9,数值越大优先级越高) + */ + @Builder.Default + private Integer priority = 5; + + /** + * 重试次数 + */ + @Builder.Default + private Integer retryCount = 0; + + /** + * 最大重试次数 + */ + @Builder.Default + private Integer maxRetryCount = 3; + + /** + * 消息过期时间(毫秒时间戳) + */ + private Long expirationTime; + + /** + * 业务数据(扩展字段,存储额外的业务信息) + */ + @Builder.Default + private Map businessData = new HashMap<>(); + + /** + * 消息状态 + */ + private String status; + + /** + * 错误信息(处理失败时记录) + */ + private String errorMessage; + + /** + * 处理开始时间 + */ + private Long processStartTime; + + /** + * 处理结束时间 + */ + private Long processEndTime; + + /** + * 版本号(用于乐观锁) + */ + @Builder.Default + private Integer version = 1; + + // 常用任务类型常量 + public static class TaskType { + public static final String OCR_PROCESS = "OCR_PROCESS"; + public static final String FILE_UPLOAD = "FILE_UPLOAD"; + public static final String IMAGE_PROCESS = "IMAGE_PROCESS"; + public static final String DOCUMENT_EXTRACT = "DOCUMENT_EXTRACT"; + public static final String DATA_SYNC = "DATA_SYNC"; + public static final String NOTIFICATION = "NOTIFICATION"; + } + + // 消息状态常量 + public static class Status { + public static final String PENDING = "PENDING"; + public static final String PROCESSING = "PROCESSING"; + public static final String SUCCESS = "SUCCESS"; + public static final String FAILED = "FAILED"; + public static final String RETRYING = "RETRYING"; + public static final String EXPIRED = "EXPIRED"; + } + + /** + * 添加业务数据 + */ + public void addBusinessData(String key, Object value) { + if (this.businessData == null) { + this.businessData = new HashMap<>(); + } + this.businessData.put(key, value); + } + + /** + * 获取业务数据 + */ + public Object getBusinessData(String key) { + return this.businessData != null ? this.businessData.get(key) : null; + } + + /** + * 增加重试次数 + */ + public void incrementRetryCount() { + this.retryCount++; + } + + /** + * 检查是否达到最大重试次数 + */ + public boolean isMaxRetryReached() { + return this.retryCount >= this.maxRetryCount; + } + + /** + * 检查消息是否过期 + */ + public boolean isExpired() { + return this.expirationTime != null && System.currentTimeMillis() > this.expirationTime; + } + + /** + * 设置过期时间(相对时间,单位:毫秒) + */ + public void setExpirationRelative(long ttlMillis) { + this.expirationTime = System.currentTimeMillis() + ttlMillis; + } + + /** + * 开始处理 + */ + public void startProcess() { + this.status = Status.PROCESSING; + this.processStartTime = System.currentTimeMillis(); + } + + /** + * 处理成功 + */ + public void processSuccess() { + this.status = Status.SUCCESS; + this.processEndTime = System.currentTimeMillis(); + } + + /** + * 处理失败 + */ + public void processFailed(String errorMessage) { + this.status = Status.FAILED; + this.errorMessage = errorMessage; + this.processEndTime = System.currentTimeMillis(); + } + + /** + * 重试处理 + */ + public void retryProcess() { + this.status = Status.RETRYING; + incrementRetryCount(); + } + + /** + * 计算处理耗时(毫秒) + */ + public Long getProcessDuration() { + if (processStartTime != null && processEndTime != null) { + return processEndTime - processStartTime; + } + return null; + } + + /** + * 创建简单的消息(快速构建) + */ + public static RabbitMqMessage createSimple(String taskName, String uploadPath) { + return RabbitMqMessage.builder() + .taskName(taskName) + .messageId(java.util.UUID.randomUUID().toString()) + .uploadPath(uploadPath) + .timestamp(System.currentTimeMillis()) + .status(Status.PENDING) + .build(); + } + + /** + * 创建带业务数据的消息 + */ + public static RabbitMqMessage createWithBusinessData(String taskName, String uploadPath, + Map businessData) { + return RabbitMqMessage.builder() + .taskName(taskName) + .messageId(java.util.UUID.randomUUID().toString()) + .uploadPath(uploadPath) + .timestamp(System.currentTimeMillis()) + .businessData(businessData != null ? new HashMap<>(businessData) : new HashMap<>()) + .status(Status.PENDING) + .build(); + } + + @Override + public String toString() { + return "RabbitMqMessage{" + + "taskName='" + taskName + '\'' + + ", messageId='" + messageId + '\'' + + ", uploadPath='" + uploadPath + '\'' + + ", timestamp=" + timestamp + + ", status='" + status + '\'' + + ", retryCount=" + retryCount + + ", maxRetryCount=" + maxRetryCount + + '}'; + } +}