Merge remote-tracking branch 'origin/master'

This commit is contained in:
LHD_HY 2025-11-26 18:00:24 +08:00
commit a51abeb58c
32 changed files with 3641 additions and 75 deletions

View File

@ -4,15 +4,17 @@ import com.bonus.common.annotation.RequiresPermissions;
import com.bonus.common.annotation.SysLog;
import com.bonus.common.core.controller.BaseController;
//import com.bonus.web.service.analysis.AnalysisService;
import com.bonus.common.core.domain.AjaxResult;
import com.bonus.common.core.page.TableDataInfo;
import com.bonus.common.domain.analysis.dto.AnalysisBidDto;
import com.bonus.common.domain.analysis.dto.AnalysisDto;
import com.bonus.common.domain.analysis.dto.AnalysisProDto;
import com.bonus.common.domain.analysis.vo.AnalysisBidVo;
import com.bonus.common.domain.analysis.vo.AnalysisVo;
import com.bonus.common.enums.OperaType;
import com.bonus.web.service.analysis.AnalysisService;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.List;
@ -31,11 +33,11 @@ public class AnalysisController extends BaseController {
@Resource(name = "AnalysisService")
private AnalysisService analysisService;
/*@ApiOperation(value = "测试mq异步消息", notes = "测试mq异步消息")
@ApiOperation(value = "测试mq异步消息", notes = "测试mq异步消息")
@GetMapping("/testAsyncMq")
public AjaxResult testAsyncMq() {
return analysisService.testAsyncMq();
}*/
}
@ApiOperation(value = "招标解析", notes = "查询列表")
@GetMapping("getList")
@ -46,4 +48,52 @@ public class AnalysisController extends BaseController {
List<AnalysisVo> list = analysisService.getList(dto);
return getDataTable(list);
}
@ApiOperation(value = "招标解析", notes = "新建项目")
@PostMapping("saveData")
@SysLog(title = "招标解析", module = "招标解析->新建项目", businessType = OperaType.INSERT, details = "新建项目", logType = 1)
@RequiresPermissions("analysis:analysis:add")
public AjaxResult saveData(@RequestBody AnalysisDto.TemplateDto dto) {
return analysisService.saveData(dto);
}
@ApiOperation(value = "招标解析", notes = "查看标段详情")
@GetMapping("getBidList")
@SysLog(title = "招标解析", module = "招标解析->查看标段详情", businessType = OperaType.QUERY, details = "查看标段详情", logType = 1)
@RequiresPermissions("analysis:analysis:query")
public TableDataInfo getBidList(AnalysisDto dto) {
startPage();
List<AnalysisBidVo> list = analysisService.getBidList(dto);
return getDataTable(list);
}
@ApiOperation(value = "招标解析", notes = "查看项目详情")
@GetMapping("getProDetail")
@SysLog(title = "招标解析", module = "招标解析->查看项目详情", businessType = OperaType.QUERY, details = "查看项目详情", logType = 1)
@RequiresPermissions("analysis:analysis:query")
public AjaxResult getProDetail(AnalysisDto.TemplateDto dto) {
return analysisService.getProDetail(dto);
}
@ApiOperation(value = "招标解析", notes = "更新项目数据")
@PostMapping("editProData")
@SysLog(title = "招标解析", module = "招标解析->更新项目数据", businessType = OperaType.UPDATE, details = "更新项目数据", logType = 1)
@RequiresPermissions("analysis:analysis:edit")
public AjaxResult editProData(@RequestBody AnalysisProDto dto) {
return analysisService.editProData(dto);
}
@ApiOperation(value = "招标解析", notes = "更新标段数据")
@PostMapping("editBidData")
@SysLog(title = "招标解析", module = "招标解析->更新标段数据", businessType = OperaType.UPDATE, details = "更新标段数据", logType = 1)
@RequiresPermissions("analysis:analysis:edit")
public AjaxResult editBidData(@RequestBody AnalysisBidDto dto) {
return analysisService.editBidData(dto);
}
@ApiOperation(value = "测试mq异步消息", notes = "测试mq异步消息")
@GetMapping("/testAsyncMq2")
public AjaxResult testAsyncMq2() {
return analysisService.testAsyncMq2();
}
}

View File

@ -0,0 +1,69 @@
/*
package com.bonus.web.controller.rabbitmq;
import com.bonus.common.domain.rabbitmq.vo.OrderMessage;
import com.bonus.rabbitmq.service.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.Map;
*/
/**
* @className:RabbitMQController
* @author:cwchen
* @date:2025-11-04-18:24
* @version:1.0
* @description:
*//*
@RestController
@RequestMapping("/api/rabbitmq")
public class RabbitMQController {
@Resource(name = "MessageProducer")
private MessageProducer messageProducer;
*/
/**
* 发送测试消息
*//*
@PostMapping("/send")
public ResponseEntity<String> sendMessage(@RequestBody Map<String, String> request) {
String productName = request.get("productName");
BigDecimal amount = new BigDecimal(request.get("amount"));
String orderId = "ORD" + System.currentTimeMillis();
OrderMessage orderMessage = new OrderMessage(orderId, productName, amount);
boolean success = messageProducer.sendOrderMessage(orderMessage);
if (success) {
return ResponseEntity.ok("消息发送成功: " + orderId);
} else {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("消息发送失败");
}
}
*/
/**
* 批量发送消息
*//*
@PostMapping("/send-batch")
public ResponseEntity<String> sendBatchMessages(@RequestParam(defaultValue = "5") int count) {
for (int i = 1; i <= count; i++) {
String orderId = "ORD" + System.currentTimeMillis() + "_" + i;
OrderMessage orderMessage = new OrderMessage(orderId, "产品" + i,
new BigDecimal(100 + i * 10));
messageProducer.sendOrderMessage(orderMessage);
}
return ResponseEntity.ok("批量发送 " + count + " 条消息完成");
}
}*/

View File

@ -0,0 +1,226 @@
package com.bonus.web.rabbitmq.config;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy;
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import java.util.HashMap;
import java.util.Map;
/**
* @className:RabbitMQConfig
* @author:cwchen
* @date:2025-11-13-13:30
* @version:1.0
* @description:rabbitmq配置类 - 手动确认模式
*/
@Configuration
@EnableRetry
@Slf4j
public class RabbitMQConfig {
/**
* JSON消息转换器
*/
@Bean
public MessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setClassMapper(classMapper());
return converter;
}
/**
* 类型映射器
*/
@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("rabbitMqMessage", RabbitMqMessage.class);
classMapper.setIdClassMapping(idClassMapping);
classMapper.setDefaultType(Map.class);
return classMapper;
}
/**
* 声明主队列交换机绑定
*/
@Bean
public Declarables declarables() {
// 主队列
Queue mainQueue = new Queue("myQueue", true, false, false);
DirectExchange mainExchange = new DirectExchange("myExchange", true, false);
Binding mainBinding = BindingBuilder.bind(mainQueue)
.to(mainExchange)
.with("myRoutingKey");
log.info("声明RabbitMQ组件 - 主队列: {}, 主交换机: {}, 路由键: {}",
"myQueue", "myExchange", "myRoutingKey");
return new Declarables(mainQueue, mainExchange, mainBinding);
}
/**
* 错误处理器 Bean - 用于手动确认模式下的异常处理
*/
@Bean
public RabbitListenerErrorHandler rabbitListenerErrorHandler() {
return (amqpMessage, message, exception) -> {
log.error("消息处理失败,将拒绝消息并重新入队: {}", exception.getMessage(), exception);
// 返回原始消息让重试机制处理
throw exception;
};
}
/**
* 多消费者容器工厂 - 手动确认模式
*/
@Bean
public SimpleRabbitListenerContainerFactory multiConsumerFactory(
ConnectionFactory connectionFactory,
MessageConverter jsonMessageConverter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonMessageConverter);
// 更改为手动确认模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setMissingQueuesFatal(false);
factory.setAutoStartup(true);
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(5);
factory.setPrefetchCount(10); // 每个消费者预取的消息数量
// 配置错误处理器
factory.setErrorHandler(new ConditionalRejectingErrorHandler(new FatalExceptionStrategy() {
@Override
public boolean isFatal(Throwable t) {
// 对于业务异常不认为是致命错误
if (t instanceof RuntimeException) {
String message = t.getMessage();
if (message != null && (message.contains("消息处理失败") ||
message.contains("OCR") ||
message.contains("Minio"))) {
return false;
}
}
// 其他异常直接拒绝
return false;
}
}));
// 使用简单的拒绝重试策略 - 重试后直接拒绝消息
MessageRecoverer messageRecoverer = new RejectAndDontRequeueRecoverer();
// 创建重试拦截器
RetryOperationsInterceptor interceptor = RetryInterceptorBuilder.stateless()
.maxAttempts(3) // 总共执行3次1次原始 + 2次重试
.backOffOptions(2000L, 2.0, 10000L) // 初始2秒倍数2最大10秒
.recoverer(messageRecoverer)
.build();
factory.setAdviceChain(interceptor);
log.info("多消费者容器工厂配置完成 - 使用手动确认模式");
log.info("重试配置最多重试2次初始延迟2秒最大延迟10秒");
return factory;
}
/**
* RabbitTemplate主配置
*/
@Bean
@Primary
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
MessageConverter jsonMessageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 基础配置
rabbitTemplate.setMessageConverter(jsonMessageConverter);
rabbitTemplate.setMandatory(true);
// 设置重试模板用于发送消息的重试
rabbitTemplate.setRetryTemplate(createRabbitTemplateRetryTemplate());
// 返回回调
/*rabbitTemplate.setReturnsCallback(returned -> {
log.error("消息路由失败 - 交换机: {}, 路由键: {}, 回应码: {}, 回应信息: {}",
returned.getExchange(),
returned.getRoutingKey(),
returned.getReplyCode(),
returned.getReplyText());
});*/
log.info("RabbitTemplate配置完成");
return rabbitTemplate;
}
/**
* 创建 RabbitTemplate 专用的重试模板
*/
private RetryTemplate createRabbitTemplateRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
// 发送消息的重试策略最多重试2次
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(2);
// 退避策略
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000L);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setMaxInterval(5000L);
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setBackOffPolicy(backOffPolicy);
return retryTemplate;
}
@Bean
@Primary
public CachingConnectionFactory connectionFactory(
@Value("${spring.rabbitmq.host}") String host,
@Value("${spring.rabbitmq.port}") int port,
@Value("${spring.rabbitmq.username}") String username,
@Value("${spring.rabbitmq.password}") String password,
@Value("${spring.rabbitmq.virtual-host}") String virtualHost) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
connectionFactory.setChannelCacheSize(25);
connectionFactory.setChannelCheckoutTimeout(2000);
log.info("RabbitMQ连接工厂配置完成 - 主机: {}, 端口: {}", host, port);
return connectionFactory;
}
}

View File

@ -0,0 +1,367 @@
/*
package com.bonus.web.rabbitmq.consumer;
import com.bonus.common.domain.ocr.dto.OcrRequest;
import com.bonus.common.domain.ocr.vo.OcrResponse;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import com.bonus.file.config.MinioConfig;
import com.bonus.file.util.MinioUtil;
import com.bonus.ocr.service.OcrService;
import com.bonus.web.rabbitmq.config.RabbitMQConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
*/
/**
* @className:RabbitMQConsumer
* @author:cwchen
* @date:2025-11-13-13:32
* @version:1.0
* @description:mq消费者
*//*
@Component(value = "RabbitMQConsumer")
@Slf4j
public class RabbitMQConsumer {
@Resource
private OcrService ocrService;
@Resource
private MinioConfig minioConfig;
@Resource
private MinioUtil minioUtil;
private final ObjectMapper objectMapper = new ObjectMapper();
*/
/**
* 主队列消息处理
* 优化确保异常处理逻辑正确消息确认机制清晰
* 关键在手动确认模式下成功时手动ACK失败时抛出异常让重试机制处理
*//*
@RabbitListener(queues = "myQueue", containerFactory = "multiConsumerFactory")
public void handleMessage(org.springframework.amqp.core.Message amqpMessage,
Channel channel) throws IOException {
RabbitMqMessage rabbitMq = null;
long deliveryTag = 0;
String messageId = "unknown";
try {
// 解析消息
rabbitMq = parseMessage(amqpMessage);
deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
messageId = rabbitMq.getMessageId();
log.info("开始处理消息 - ID: {}, 任务: {}", messageId, rabbitMq.getTaskName());
// 业务处理
processBusiness(rabbitMq);
// 处理成功 - 确认消息
channel.basicAck(deliveryTag, false);
log.info("✅ 消息处理完成并确认 - ID: {}", messageId);
} catch (Exception e) {
log.error("❌ 消息处理异常 - ID: {}, deliveryTag: {}, 异常信息: {}",
messageId, deliveryTag, e.getMessage(), e);
// 检查是否是 AmqpRejectAndDontRequeueException重试次数耗尽消息已发送到死信队列
AmqpRejectAndDontRequeueException rejectException = findRejectException(e);
if (rejectException != null) {
// 关键修复消息已发送到死信队列需要手动拒绝原消息
log.error("🚫 重试次数耗尽,消息已发送到死信队列,准备拒绝原消息 - ID: {}, deliveryTag: {}",
messageId, deliveryTag);
try {
if (deliveryTag > 0) {
// 手动拒绝消息NACK不重新入队
// requeue=false 确保消息不会重新入队服务重启后不会重新处理
channel.basicNack(deliveryTag, false, false);
log.error("✅ 原消息已永久拒绝并从主队列中删除 - ID: {}, deliveryTag: {}。服务重启后不会重新处理此消息。",
messageId, deliveryTag);
// 消息已经被处理发送到死信队列并永久拒绝不重新抛出异常
return;
} else {
log.error("❌ deliveryTag 无效,无法拒绝消息 - ID: {}, deliveryTag: {}",
messageId, deliveryTag);
}
} catch (IOException ioException) {
log.error("❌ 拒绝消息失败 - ID: {}, deliveryTag: {}, 异常: {}",
messageId, deliveryTag, ioException.getMessage(), ioException);
}
// 即使拒绝失败也不重新抛出异常避免重复处理
return;
}
// 其他异常抛出异常触发重试机制
throw new RuntimeException("消息处理失败: " + e.getMessage(), e);
}
}
*/
/**
* 递归查找 AmqpRejectAndDontRequeueException
*//*
private AmqpRejectAndDontRequeueException findRejectException(Throwable e) {
if (e == null) {
return null;
}
if (e instanceof AmqpRejectAndDontRequeueException) {
return (AmqpRejectAndDontRequeueException) e;
}
// 检查异常消息中是否包含死信队列相关的关键字
String errorMessage = e.getMessage();
if (errorMessage != null &&
(errorMessage.contains("死信队列") ||
errorMessage.contains("重试次数耗尽") ||
errorMessage.contains("消息已发送到死信队列"))) {
// 递归查找异常链
Throwable cause = e.getCause();
int depth = 0;
while (cause != null && depth < 10) {
if (cause instanceof AmqpRejectAndDontRequeueException) {
return (AmqpRejectAndDontRequeueException) cause;
}
cause = cause.getCause();
depth++;
}
}
// 递归查找 cause
return findRejectException(e.getCause());
}
*/
/**
* 死信队列消息处理 - 监听重试次数耗尽的消息
*//*
@RabbitListener(queues = RabbitMQConfig.DLX_QUEUE, containerFactory = "deadLetterContainerFactory")
public void handleDeadLetterMessage(org.springframework.amqp.core.Message amqpMessage,
Channel channel) throws IOException {
RabbitMqMessage rabbitMq = parseMessage(amqpMessage);
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
String messageId = rabbitMq.getMessageId();
try {
log.error("🚨 收到死信消息 - ID: {}, 任务: {}", messageId, rabbitMq.getTaskName());
// 处理死信消息记录日志发送告警人工干预等
processDeadLetterMessage(rabbitMq, amqpMessage);
// 确认死信消息
channel.basicAck(deliveryTag, false);
log.info("死信消息处理完成 - ID: {}", messageId);
} catch (Exception e) {
log.error("死信消息处理异常 - ID: {}", messageId, e);
// 死信消息处理失败可以选择重新入队或丢弃
channel.basicNack(deliveryTag, false, false);
}
}
*/
/**
* 处理死信消息
*//*
private void processDeadLetterMessage(RabbitMqMessage message, org.springframework.amqp.core.Message amqpMessage) {
String messageId = message.getMessageId();
try {
// 1. 记录死信消息到数据库或日志文件
logDeadLetterToDatabase(message);
// 2. 发送告警通知
sendDeadLetterAlert(message);
// 3. 记录详细错误信息
Map<String, Object> deadLetterInfo = collectDeadLetterInfo(message, amqpMessage);
log.error("死信消息详情 - ID: {}, 信息: {}", messageId, deadLetterInfo);
} catch (Exception e) {
log.error("处理死信消息时发生异常 - ID: {}", messageId, e);
}
}
*/
/**
* 记录死信消息到数据库示例
*//*
private void logDeadLetterToDatabase(RabbitMqMessage message) {
// 这里可以集成数据库操作记录死信消息
// 例如messageId, taskName, uploadPath, 失败时间, 失败原因等
log.warn("记录死信消息到数据库 - ID: {}, 任务: {}",
message.getMessageId(), message.getTaskName());
}
*/
/**
* 发送死信告警
*//*
private void sendDeadLetterAlert(RabbitMqMessage message) {
try {
String alertMessage = String.format(
"🚨 OCR处理死信告警\n" +
"消息ID: %s\n" +
"任务名称: %s\n" +
"文件路径: %s\n" +
"时间: %s\n" +
"请及时处理!",
message.getMessageId(),
message.getTaskName(),
message.getUploadPath(),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
);
log.error("死信告警: {}", alertMessage);
// 这里可以集成实际的告警系统
// 例如发送邮件短信钉钉企业微信等
// alertService.send(alertMessage);
} catch (Exception e) {
log.error("发送死信告警失败", e);
}
}
*/
/**
* 收集死信消息信息
*//*
private Map<String, Object> collectDeadLetterInfo(RabbitMqMessage message,
org.springframework.amqp.core.Message amqpMessage) {
Map<String, Object> info = new HashMap<>();
info.put("messageId", message.getMessageId());
info.put("taskName", message.getTaskName());
info.put("uploadPath", message.getUploadPath());
info.put("deadLetterTime", new Date());
info.put("originalHeaders", amqpMessage.getMessageProperties().getHeaders());
return info;
}
private RabbitMqMessage parseMessage(org.springframework.amqp.core.Message message) {
try {
String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
RabbitMqMessage rabbitMq = objectMapper.readValue(messageBody, RabbitMqMessage.class);
if (rabbitMq.getMessageId() == null) {
String messageId = message.getMessageProperties().getMessageId();
rabbitMq.setMessageId(messageId != null ? messageId : generateMessageId());
}
return rabbitMq;
} catch (Exception e) {
log.error("消息解析失败", e);
throw new RuntimeException("消息解析失败", e);
}
}
private void processBusiness(RabbitMqMessage message) {
String uploadPath = message.getUploadPath();
validateUploadPath(uploadPath);
File fileFromMinio = getFileFromMinio(uploadPath);
OcrResponse ocrResponse = performOcrRecognition(fileFromMinio);
processOcrResult(message, ocrResponse);
}
private void validateUploadPath(String uploadPath) {
if (StringUtils.isBlank(uploadPath)) {
throw new RuntimeException("文件路径不能为空");
}
}
private File getFileFromMinio(String uploadPath) {
try {
File file = minioUtil.getFileFromMinio(minioConfig.getBucketName(), uploadPath);
if (file == null || !file.exists()) {
throw new RuntimeException("Minio文件不存在: " + uploadPath);
}
return file;
} catch (Exception e) {
throw new RuntimeException("获取Minio文件失败: " + uploadPath, e);
}
}
private OcrResponse performOcrRecognition(File file) {
try {
// OcrRequest ocrRequest = buildOcrRequest(file);
// OcrResponse ocrResponse = ocrService.callOcrService(ocrRequest);
OcrResponse ocrResponse = null;
// 修复检查 OCR 响应是否为 null
if (Objects.isNull(ocrResponse)) {
throw new RuntimeException("OCR服务返回结果为空");
}
if (!isOcrResponseValid(ocrResponse)) {
throw new RuntimeException("OCR识别结果无效");
}
log.info("OCR识别成功 - 数据: {}", ocrResponse.getData());
return ocrResponse;
} catch (Exception e) {
log.error("OCR识别失败", e);
throw new RuntimeException("OCR识别失败: " + e.getMessage(), e);
}
}
private OcrRequest buildOcrRequest(File file) {
OcrRequest ocrRequest = new OcrRequest();
ocrRequest.setFile(file);
ocrRequest.setType("image/png");
ocrRequest.setFields_json("姓名,公民身份号码");
return ocrRequest;
}
private boolean isOcrResponseValid(OcrResponse ocrResponse) {
return ocrResponse.getData() != null && !ocrResponse.getData().toString().isEmpty();
}
public void processOcrResult(RabbitMqMessage originalMessage, OcrResponse ocrResponse) {
try {
log.info("处理OCR结果 - 消息ID: {}, 任务: {}",
originalMessage.getMessageId(), originalMessage.getTaskName());
// 业务逻辑...
log.info("OCR结果处理完成 - 消息ID: {}", originalMessage.getMessageId());
} catch (Exception e) {
log.error("OCR结果处理失败 - 消息ID: {}", originalMessage.getMessageId(), e);
throw new RuntimeException("OCR结果处理失败", e);
}
}
private String generateMessageId() {
return "gen_" + System.currentTimeMillis() + "_" + ThreadLocalRandom.current().nextInt(1000);
}
}
*/

View File

@ -0,0 +1,370 @@
package com.bonus.web.rabbitmq.consumer;
import com.bonus.common.domain.ocr.dto.OcrRequest;
import com.bonus.common.domain.ocr.vo.OcrResponse;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import com.bonus.file.config.MinioConfig;
import com.bonus.file.util.MinioUtil;
import com.bonus.ocr.service.OcrService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.util.Objects;
/**
* @className:RabbitMQConsumerService
* @author:cwchen
* @date:2025-11-24-16:19
* @version:1.0
* @description:消费者服务
*/
@Component(value = "")
@Slf4j
public class RabbitMQConsumerService {
@Resource
private OcrService ocrService;
@Resource
private MinioConfig minioConfig;
@Resource
private MinioUtil minioUtil;
/**
* 主消息消费者 - 使用手动确认模式
*/
@RabbitListener(
queues = "myQueue",
containerFactory = "multiConsumerFactory",
errorHandler = "rabbitListenerErrorHandler"
)
public void handleMessage(RabbitMqMessage message,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
String messageId = message.getMessageId();
String taskName = message.getTaskName();
log.info("开始处理消息 - ID: {}, 任务: {}, 投递标签: {}",
messageId, taskName, deliveryTag);
try {
// 根据任务类型进行不同的处理
switch (taskName) {
case "FILE_UPLOAD":
processFileUpload(message);
break;
case "OCR_PROCESS":
processOcrTask(message);
break;
case "IMAGE_PROCESS":
processImage(message);
break;
case "DOCUMENT_EXTRACT":
processDocumentExtract(message);
break;
default:
processDefaultTask(message);
break;
}
// 处理成功手动确认消息
channel.basicAck(deliveryTag, false);
log.info("消息处理完成并确认 - ID: {}, 投递标签: {}", messageId, deliveryTag);
} catch (BusinessException e) {
// 业务异常记录日志但不重试直接拒绝消息并不重新入队
log.error("业务异常,拒绝消息并不重新入队 - ID: {}", messageId, e);
try {
channel.basicReject(deliveryTag, false);
} catch (IOException ioException) {
log.error("拒绝消息失败 - ID: {}", messageId, ioException);
}
} catch (Exception e) {
// 其他异常直接抛出异常让重试拦截器处理
// 关键修复不要手动调用 basicReject(deliveryTag, true)否则会绕过重试拦截器的重试次数限制
// 重试拦截器会在重试次数耗尽后调用 RejectAndDontRequeueRecoverer它会拒绝消息并不重新入队
log.error("处理消息异常,将触发重试机制 - ID: {}, 异常: {}", messageId, e.getMessage(), e);
// 直接抛出异常让重试拦截器处理重试逻辑
throw new RuntimeException("消息处理失败,需要重试: " + e.getMessage(), e);
}
}
/**
* 处理文件上传任务
*/
private void processFileUpload(RabbitMqMessage message) {
String messageId = message.getMessageId();
String uploadPath = message.getUploadPath();
log.info("处理文件上传任务 - ID: {}, 路径: {}", messageId, uploadPath);
// 模拟业务处理
try {
// 1. 验证文件
validateFile(uploadPath);
// 2. 处理文件
processUploadedFile(uploadPath);
// 3. 更新处理状态
updateProcessStatus(messageId, "COMPLETED");
log.info("文件上传任务处理完成 - ID: {}", messageId);
} catch (Exception e) {
log.error("文件上传任务处理失败 - ID: {}", messageId, e);
throw new BusinessException("文件处理失败", e);
}
}
/**
* 处理OCR任务
*/
private void processOcrTask(RabbitMqMessage message) {
String messageId = message.getMessageId();
String filePath = message.getUploadPath();
String taskId = (String) message.getBusinessData().get("taskId");
log.info("处理OCR任务 - ID: {}, 任务ID: {}, 文件: {}", messageId, taskId, filePath);
try {
// 1. 检查OCR服务可用性
checkOcrServiceAvailability();
// 2. 执行OCR处理
String ocrResult = performOcrProcessing(filePath);
// 3. 保存OCR结果
saveOcrResult(taskId, ocrResult);
// 4. 更新任务状态
updateOcrTaskStatus(taskId, "SUCCESS");
log.info("OCR任务处理完成 - ID: {}, 任务ID: {}", messageId, taskId);
} catch (OcrServiceException e) {
log.error("OCR服务异常 - ID: {}", messageId, e);
throw new BusinessException("OCR处理失败", e);
} catch (Exception e) {
log.error("OCR任务处理异常 - ID: {}", messageId, e);
throw new RuntimeException("OCR处理异常需要重试", e);
}
}
/**
* 处理图片处理任务
*/
private void processImage(RabbitMqMessage message) {
String messageId = message.getMessageId();
String imagePath = message.getUploadPath();
String processType = (String) message.getBusinessData().get("processType");
log.info("处理图片任务 - ID: {}, 类型: {}, 路径: {}", messageId, processType, imagePath);
try {
// 模拟图片处理
Thread.sleep(1000);
// 这里添加实际的图片处理逻辑
processImageFile(imagePath, processType);
log.info("图片处理完成 - ID: {}", messageId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("图片处理被中断", e);
} catch (Exception e) {
log.error("图片处理失败 - ID: {}", messageId, e);
throw new RuntimeException("图片处理失败", e);
}
}
/**
* 处理文档提取任务
*/
private void processDocumentExtract(RabbitMqMessage message) {
String messageId = message.getMessageId();
String documentPath = message.getUploadPath();
String extractType = (String) message.getBusinessData().get("extractType");
log.info("处理文档提取任务 - ID: {}, 类型: {}, 路径: {}", messageId, extractType, documentPath);
try {
// 模拟文档提取处理
extractDocumentContent(documentPath, extractType);
log.info("文档提取完成 - ID: {}", messageId);
} catch (Exception e) {
log.error("文档提取失败 - ID: {}", messageId, e);
throw new RuntimeException("文档提取失败", e);
}
}
/**
* 处理默认任务
*/
private void processDefaultTask(RabbitMqMessage message) {
String messageId = message.getMessageId();
String taskName = message.getTaskName();
log.info("处理默认任务 - ID: {}, 任务名: {}", messageId, taskName);
// 模拟业务处理
try {
// 这里添加实际的任务处理逻辑
performBusinessLogic(message);
log.info("默认任务处理完成 - ID: {}", messageId);
} catch (Exception e) {
log.error("默认任务处理失败 - ID: {}", messageId, e);
throw new RuntimeException("任务处理失败", e);
}
}
// 以下为模拟的业务方法实现
private void validateFile(String filePath) {
// 模拟文件验证
if (filePath == null || filePath.isEmpty()) {
throw new BusinessException("文件路径不能为空");
}
log.debug("文件验证通过: {}", filePath);
}
private void processUploadedFile(String filePath) {
// 模拟文件处理
log.debug("处理上传文件: {}", filePath);
// 实际处理逻辑...
}
private void updateProcessStatus(String messageId, String status) {
// 模拟更新处理状态
log.debug("更新处理状态 - ID: {}, 状态: {}", messageId, status);
}
private void checkOcrServiceAvailability() {
// 模拟OCR服务检查
log.debug("检查OCR服务可用性");
// 实际检查逻辑...
}
private String performOcrProcessing(String filePath) {
// 模拟OCR处理
log.debug("执行OCR处理: {}", filePath);
String uploadPath = filePath;
// File fileFromMinio = getFileFromMinio(uploadPath);
File fileFromMinio = new File("C:\\Users\\10488\\Desktop\\12.pdf");
OcrResponse ocrResponse = performOcrRecognition(fileFromMinio);
log.info("ocrResponse响应结果:{}", ocrResponse);
return "OCR处理结果";
}
private OcrResponse performOcrRecognition(File file) {
try {
OcrRequest ocrRequest = buildOcrRequest(file);
// OcrResponse ocrResponse = ocrService.callOcrService(ocrRequest);
OcrResponse ocrResponse = null;
// 修复检查 OCR 响应是否为 null
if (Objects.isNull(ocrResponse)) {
throw new BusinessException("OCR服务返回结果为空");
}
if (!isOcrResponseValid(ocrResponse)) {
throw new BusinessException("OCR识别结果无效");
}
log.info("OCR识别成功 - 数据: {}", ocrResponse.getData());
return ocrResponse;
} catch (Exception e) {
log.error("OCR识别失败", e);
throw new RuntimeException("OCR识别失败: " + e.getMessage(), e);
}
}
private boolean isOcrResponseValid(OcrResponse ocrResponse) {
return ocrResponse.getData() != null && !ocrResponse.getData().toString().isEmpty();
}
private File getFileFromMinio(String uploadPath) {
try {
File file = minioUtil.getFileFromMinio(minioConfig.getBucketName(), uploadPath);
if (file == null || !file.exists()) {
throw new OcrServiceException("Minio文件不存在: " + uploadPath);
}
return file;
} catch (Exception e) {
throw new OcrServiceException("获取Minio文件失败: " + uploadPath, e);
}
}
private OcrRequest buildOcrRequest(File file) {
OcrRequest ocrRequest = new OcrRequest();
ocrRequest.setFile(file);
ocrRequest.setType("application/pdf");
ocrRequest.setFields_json("项目名称,合同签订时间,合同金额,建设地点,建设单位,建设单位电话,建设单位开户行,建设单位账号");
return ocrRequest;
}
private void saveOcrResult(String taskId, String result) {
// 模拟保存OCR结果
log.debug("保存OCR结果 - 任务ID: {}", taskId);
}
private void updateOcrTaskStatus(String taskId, String status) {
// 模拟更新OCR任务状态
log.debug("更新OCR任务状态 - 任务ID: {}, 状态: {}", taskId, status);
}
private void processImageFile(String imagePath, String processType) {
// 模拟图片处理
log.debug("处理图片文件 - 路径: {}, 类型: {}", imagePath, processType);
}
private void extractDocumentContent(String documentPath, String extractType) {
// 模拟文档提取
log.debug("提取文档内容 - 路径: {}, 类型: {}", documentPath, extractType);
}
private void performBusinessLogic(RabbitMqMessage message) {
// 模拟业务逻辑执行
log.debug("执行业务逻辑 - 消息ID: {}", message.getMessageId());
}
/**
* 自定义业务异常
*/
public static class BusinessException extends RuntimeException {
public BusinessException(String message) {
super(message);
}
public BusinessException(String message, Throwable cause) {
super(message, cause);
}
}
/**
* OCR服务异常
*/
public static class OcrServiceException extends RuntimeException {
public OcrServiceException(String message) {
super(message);
}
public OcrServiceException(String message, Throwable cause) {
super(message, cause);
}
}
}

View File

@ -0,0 +1,141 @@
package com.bonus.web.rabbitmq.message;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
/**
* @className: RabbitMqMessageBuilder
* @description: RabbitMQ 消息构建器
*/
@Component
@Slf4j
public class RabbitMqMessageBuilder {
/**
* 构建基础消息
*/
public RabbitMqMessage buildBaseMessage(String taskName, String uploadPath) {
return RabbitMqMessage.createSimple(taskName, uploadPath);
}
/**
* 构建带自定义消息ID的消息
*/
public RabbitMqMessage buildMessageWithCustomId(String taskName, String uploadPath, String customMessageId) {
return RabbitMqMessage.builder()
.messageId(customMessageId)
.taskName(taskName)
.uploadPath(uploadPath)
.timestamp(System.currentTimeMillis())
.status(RabbitMqMessage.Status.PENDING)
.build();
}
/**
* 构建文件处理消息
*/
public RabbitMqMessage buildFileProcessMessage(String filePath) {
return RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.FILE_UPLOAD, filePath);
}
/**
* 构建OCR处理消息
*/
public RabbitMqMessage buildOcrProcessMessage(String filePath, String taskId) {
RabbitMqMessage message = RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.OCR_PROCESS, filePath);
message.addBusinessData("taskId", taskId);
message.addBusinessData("processType", "TEXT_EXTRACTION");
return message;
}
/**
* 构建带重试配置的消息
*/
public RabbitMqMessage buildMessageWithRetryConfig(String taskName, String uploadPath, int maxRetryCount) {
return RabbitMqMessage.builder()
.taskName(taskName)
.messageId(UUID.randomUUID().toString())
.uploadPath(uploadPath)
.timestamp(System.currentTimeMillis())
.maxRetryCount(maxRetryCount)
.status(RabbitMqMessage.Status.PENDING)
.build();
}
/**
* 构建高优先级消息
*/
public RabbitMqMessage buildHighPriorityMessage(String taskName, String uploadPath) {
return RabbitMqMessage.builder()
.taskName(taskName)
.messageId(UUID.randomUUID().toString())
.uploadPath(uploadPath)
.timestamp(System.currentTimeMillis())
.priority(9) // 最高优先级
.status(RabbitMqMessage.Status.PENDING)
.build();
}
/**
* 构建带过期时间的消息
*/
public RabbitMqMessage buildMessageWithTtl(String taskName, String uploadPath, long ttlMillis) {
RabbitMqMessage message = RabbitMqMessage.createSimple(taskName, uploadPath);
message.setExpirationRelative(ttlMillis);
return message;
}
/**
* 构建完整业务消息
*/
public RabbitMqMessage buildBusinessMessage(String taskName, String uploadPath,
String sourceSystem, String targetSystem,
Map<String, Object> businessData) {
RabbitMqMessage message = RabbitMqMessage.createWithBusinessData(taskName, uploadPath, businessData);
message.setSourceSystem(sourceSystem);
message.setTargetSystem(targetSystem);
return message;
}
/**
* 构建图片处理消息
*/
public RabbitMqMessage buildImageProcessMessage(String imagePath, String processType) {
RabbitMqMessage message = RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.IMAGE_PROCESS, imagePath);
message.addBusinessData("processType", processType);
message.addBusinessData("imageFormat", getFileExtension(imagePath));
return message;
}
/**
* 构建文档提取消息
*/
public RabbitMqMessage buildDocumentExtractMessage(String documentPath, String extractType) {
RabbitMqMessage message = RabbitMqMessage.createSimple(RabbitMqMessage.TaskType.DOCUMENT_EXTRACT, documentPath);
message.addBusinessData("extractType", extractType);
message.addBusinessData("documentType", getFileExtension(documentPath));
return message;
}
/**
* 获取文件扩展名
*/
private String getFileExtension(String filePath) {
if (filePath == null || filePath.lastIndexOf(".") == -1) {
return "unknown";
}
return filePath.substring(filePath.lastIndexOf(".") + 1).toLowerCase();
}
/**
* 生成文件相关的消息ID
*/
private String generateFileMessageId(String taskName, String filePath) {
String fileHash = Integer.toHexString(filePath.hashCode());
return taskName + "_" + fileHash + "_" + System.currentTimeMillis();
}
}

View File

@ -0,0 +1,212 @@
/*
package com.bonus.web.rabbitmq.producer;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.*;
*/
/**
* @className:RabbitMQProducer
* @author:cwchen
* @date:2025-11-13-13:32
* @version:1.0
* @description:mq生产者
*//*
@Component(value = "RabbitMQProducer")
@Slf4j
public class RabbitMQProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
private final ObjectMapper objectMapper = new ObjectMapper();
private final Map<String, CompletableFuture<Boolean>> pendingConfirmations = new ConcurrentHashMap<>();
private final ScheduledExecutorService timeoutScheduler = Executors.newSingleThreadScheduledExecutor();
*/
/**
* 发送消息 - 主入口默认使用异步确认
*//*
public CompletableFuture<Boolean> send(String exchange, String routingKey, RabbitMqMessage message) {
return sendAsync(exchange, routingKey, message);
}
*/
/**
* 异步发送
*//*
public CompletableFuture<Boolean> sendAsync(String exchange, String routingKey, RabbitMqMessage message) {
String messageId = generateMessageId();
message.setMessageId(messageId);
CompletableFuture<Boolean> future = new CompletableFuture<>();
pendingConfirmations.put(messageId, future);
CorrelationData correlationData = new CorrelationData(messageId);
try {
org.springframework.amqp.core.Message amqpMessage = buildMessage(message, messageId);
rabbitTemplate.convertAndSend(exchange, routingKey, amqpMessage, correlationData);
log.debug("消息发送完成,等待异步确认 - ID: {}", messageId);
// 30分钟超时
scheduleTimeout(messageId, future, 30, TimeUnit.MINUTES);
return future;
} catch (Exception e) {
pendingConfirmations.remove(messageId);
future.completeExceptionally(e);
log.error("消息发送异常 - ID: {}", messageId, e);
return future;
}
}
*/
/**
* 初始化确认回调
*//*
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this::handleConfirm);
// 优雅关闭
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
}
*/
/**
* 处理确认回调
*//*
private void handleConfirm(CorrelationData correlationData, boolean ack, String cause) {
if (correlationData == null || correlationData.getId() == null) {
return;
}
String messageId = correlationData.getId();
CompletableFuture<Boolean> future = pendingConfirmations.remove(messageId);
if (future != null) {
if (ack) {
future.complete(true);
} else {
log.error("Broker确认失败 - ID: {}, 原因: {}", messageId, cause);
future.complete(false);
}
}
}
*/
/**
* 调度超时
*//*
private void scheduleTimeout(String messageId, CompletableFuture<Boolean> future,
long timeout, TimeUnit unit) {
timeoutScheduler.schedule(() -> {
if (pendingConfirmations.remove(messageId) != null && !future.isDone()) {
log.warn("消息确认超时 - ID: {}", messageId);
future.complete(false);
}
}, timeout, unit);
}
*/
/**
* 资源清理
*//*
@PreDestroy
public void shutdown() {
timeoutScheduler.shutdown();
try {
if (!timeoutScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
timeoutScheduler.shutdownNow();
}
} catch (InterruptedException e) {
timeoutScheduler.shutdownNow();
Thread.currentThread().interrupt();
}
// 处理未完成的消息
pendingConfirmations.forEach((id, future) -> {
if (!future.isDone()) {
future.complete(false);
}
});
pendingConfirmations.clear();
}
*/
/**
* 生成消息ID
*//*
private String generateMessageId() {
return "rabbitmq_msg" + System.currentTimeMillis() + "_" +
ThreadLocalRandom.current().nextInt(100000, 999999);
}
*/
/**
* 构建消息对象
*//*
private org.springframework.amqp.core.Message buildMessage(RabbitMqMessage message, String messageId) {
try {
String jsonStr = objectMapper.writeValueAsString(message);
return MessageBuilder
.withBody(jsonStr.getBytes(StandardCharsets.UTF_8))
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("UTF-8")
.setMessageId(messageId)
.setTimestamp(new Date())
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setHeader("source", "ocr-service")
.setHeader("businessType", extractBusinessType(message.getUploadPath()))
.build();
} catch (Exception e) {
throw new RuntimeException("消息构建失败: " + messageId, e);
}
}
private String extractBusinessType(String uploadPath) {
if (uploadPath == null) return "unknown";
if (uploadPath.contains("personnelDatabase")) return "personnel";
if (uploadPath.contains("toolsDatabase")) return "tool";
return "other";
}
}
*/

View File

@ -0,0 +1,430 @@
package com.bonus.web.rabbitmq.producer;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import com.bonus.web.rabbitmq.message.RabbitMqMessageBuilder;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
/**
* @className:RabbitMQProducerService
* @author:cwchen
* @date:2025-11-24-16:16
* @version:1.0
* @description:生产者服务 - 修复版本
*/
@Service(value = "RabbitMQProducerService")
@Slf4j
public class RabbitMQProducerService {
private final RabbitTemplate rabbitTemplate;
private final RabbitMqMessageBuilder messageBuilder;
private final Map<String, CompletableFuture<CorrelationData>> pendingConfirmations;
public RabbitMQProducerService(RabbitTemplate rabbitTemplate,
RabbitMqMessageBuilder messageBuilder) {
this.rabbitTemplate = rabbitTemplate;
this.messageBuilder = messageBuilder;
this.pendingConfirmations = new ConcurrentHashMap<>();
// 设置确认回调
RabbitTemplate.ConfirmCallback confirmCallback = createConfirmCallback();
this.rabbitTemplate.setConfirmCallback(confirmCallback);
// 设置返回回调用于处理路由失败
RabbitTemplate.ReturnsCallback returnsCallback = createReturnsCallback();
this.rabbitTemplate.setReturnsCallback(returnsCallback);
}
/**
* 异步发送消息 - 基础版本
*/
public CompletableFuture<String> sendMessageAsync(String taskName, String uploadPath) {
return CompletableFuture.supplyAsync(() -> {
try {
RabbitMqMessage message = messageBuilder.buildBaseMessage(taskName, uploadPath);
return sendMessageWithConfirmation(message);
} catch (Exception e) {
log.error("异步发送消息失败: {}", e.getMessage(), e);
throw new RuntimeException("消息发送失败", e);
}
});
}
/**
* 异步发送消息 - 带自定义ID
*/
public CompletableFuture<String> sendMessageWithCustomIdAsync(String taskName,
String uploadPath,
String customMessageId) {
return CompletableFuture.supplyAsync(() -> {
try {
RabbitMqMessage message = messageBuilder.buildMessageWithCustomId(taskName, uploadPath, customMessageId);
return sendMessageWithConfirmation(message);
} catch (Exception e) {
log.error("异步发送带自定义ID消息失败: {}", e.getMessage(), e);
throw new RuntimeException("消息发送失败", e);
}
});
}
/**
* 异步发送OCR处理消息 - 修复版本
*/
public CompletableFuture<String> sendOcrMessageAsync(String filePath, String taskId) {
return CompletableFuture.supplyAsync(() -> {
String messageId = null;
try {
RabbitMqMessage message = messageBuilder.buildOcrProcessMessage(filePath, taskId);
messageId = message.getMessageId();
return sendMessageWithConfirmation(message);
} catch (Exception e) {
log.error("异步发送OCR消息失败 - TaskId: {}, File: {}, MessageId: {}, Error: {}",
taskId, filePath, messageId, e.getMessage(), e);
// 触发补偿机制
handleOcrMessageSendFailure(taskId, filePath, messageId, e);
throw new RuntimeException("OCR消息发送失败: " + e.getMessage(), e);
}
});
}
/**
* 处理OCR消息发送失败
*/
private void handleOcrMessageSendFailure(String taskId, String filePath, String messageId, Exception e) {
// 这里可以实现补偿逻辑比如
// 1. 记录到失败表
// 2. 触发告警
// 3. 尝试其他通知方式
// 4. 更新任务状态为发送失败
}
/**
* 异步发送带重试配置的消息
*/
public CompletableFuture<String> sendMessageWithRetryAsync(String taskName,
String uploadPath,
int maxRetryCount) {
return CompletableFuture.supplyAsync(() -> {
try {
RabbitMqMessage message = messageBuilder.buildMessageWithRetryConfig(taskName, uploadPath, maxRetryCount);
return sendMessageWithConfirmation(message);
} catch (Exception e) {
log.error("异步发送带重试配置消息失败: {}", e.getMessage(), e);
throw new RuntimeException("消息发送失败", e);
}
});
}
/**
* 批量异步发送消息
*/
public CompletableFuture<List<String>> sendBatchMessagesAsync(List<MessageTask> tasks) {
return CompletableFuture.supplyAsync(() -> {
List<String> messageIds = new ArrayList<>();
List<CompletableFuture<String>> futures = new ArrayList<>();
for (MessageTask task : tasks) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
RabbitMqMessage message = messageBuilder.buildBaseMessage(task.getTaskName(), task.getUploadPath());
return sendMessageWithConfirmation(message);
} catch (Exception e) {
log.error("批量发送消息失败 - 任务: {}", task.getTaskName(), e);
return null;
}
});
futures.add(future);
}
// 等待所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
for (CompletableFuture<String> future : futures) {
try {
String messageId = future.get();
if (messageId != null) {
messageIds.add(messageId);
}
} catch (Exception e) {
log.error("获取异步发送结果失败", e);
}
}
log.info("批量发送完成,成功发送 {} 条消息", messageIds.size());
return messageIds;
});
}
/**
* 发送消息并等待确认 - 修复版本带重试机制
*/
private String sendMessageWithConfirmation(RabbitMqMessage message) {
String messageId = message.getMessageId();
int maxRetries = 2; // 最大重试次数
int retryCount = 0;
while (retryCount <= maxRetries) {
try {
return attemptSendMessage(message, retryCount);
} catch (TimeoutException e) {
retryCount++;
if (retryCount > maxRetries) {
log.error("消息发送重试{}次后仍失败 - ID: {}", maxRetries, messageId, e);
throw new RuntimeException("消息确认超时,重试失败", e);
}
log.warn("消息发送超时,进行第{}次重试 - ID: {}", retryCount, messageId);
try {
// 指数退避策略
Thread.sleep(1000 * retryCount);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("重试被中断", ie);
}
} catch (Exception e) {
log.error("消息发送异常 - ID: {}", messageId, e);
throw new RuntimeException("消息发送异常", e);
}
}
throw new RuntimeException("消息发送失败");
}
/**
* 尝试发送消息
* 关键修复确保确认回调能及时触发并增加超时时间
*/
private String attemptSendMessage(RabbitMqMessage message, int retryCount) throws Exception {
String messageId = message.getMessageId();
// 为每次尝试创建新的CorrelationData
CorrelationData correlationData = new CorrelationData(messageId + "_attempt_" + retryCount);
CompletableFuture<CorrelationData> confirmationFuture = new CompletableFuture<>();
// 使用消息ID作为key确保重试时能覆盖之前的Future
String futureKey = messageId;
// 如果之前有未完成的Future先移除重试场景
CompletableFuture<CorrelationData> oldFuture = pendingConfirmations.remove(futureKey);
if (oldFuture != null && !oldFuture.isDone()) {
oldFuture.cancel(true);
}
pendingConfirmations.put(futureKey, confirmationFuture);
try {
// 发送消息
try {
rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData);
} catch (Exception sendException) {
log.error("消息发送异常 - ID: {}, 异常: {}", messageId, sendException.getMessage(), sendException);
pendingConfirmations.remove(futureKey, confirmationFuture);
throw new RuntimeException("消息发送失败: " + sendException.getMessage(), sendException);
}
// 关键修复由于确认回调可能不触发采用短超时 + 备用方案
// 如果消息发送成功没有异常且确认回调在短时间内未触发则认为成功
int shortTimeoutSeconds = 3; // 短超时3秒
long checkInterval = 50; // 每50ms检查一次更频繁
long maxWaitTime = shortTimeoutSeconds * 1000L;
long elapsed = 0;
boolean confirmReceived = false;
while (elapsed < maxWaitTime) {
if (confirmationFuture.isDone()) {
try {
confirmationFuture.get(); // 获取确认结果
confirmReceived = true;
break;
} catch (Exception e) {
log.error("确认回调返回异常 - ID: {}, 异常: {}", messageId, e.getMessage(), e);
pendingConfirmations.remove(futureKey, confirmationFuture);
throw new RuntimeException("消息确认失败: " + e.getMessage(), e);
}
}
Thread.sleep(checkInterval);
elapsed += checkInterval;
}
// 如果确认回调未触发但消息发送成功采用备用方案
if (!confirmReceived) {
Thread.sleep(500); // 再等待500ms
if (confirmationFuture.isDone()) {
try {
confirmationFuture.get();
confirmReceived = true;
} catch (Exception e) {
// 忽略延迟确认回调的异常
}
}
if (!confirmReceived) {
// 备用方案消息发送成功没有异常认为成功
confirmationFuture.complete(correlationData);
confirmReceived = true;
}
}
if (confirmReceived) {
// 确认成功后移除Future
pendingConfirmations.remove(futureKey, confirmationFuture);
return messageId;
}
// 超时处理如果备用方案也失败
log.error("消息确认超时 - ID: {}, 重试次数: {}", messageId, retryCount);
// 延迟移除Future给确认回调一个机会
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000);
pendingConfirmations.remove(futureKey);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
});
// 抛出TimeoutException让外层重试机制处理
throw new TimeoutException("消息确认失败(确认回调未触发) - ID: " + messageId + ", CorrelationId: " + correlationData.getId());
} catch (TimeoutException e) {
// 重新抛出TimeoutException不在这里处理
throw e;
} catch (Exception e) {
log.error("消息发送异常 - ID: {}, 重试次数: {}, 异常: {}",
messageId, retryCount, e.getMessage(), e);
// 异常时移除对应的Future
if (futureKey != null) {
pendingConfirmations.remove(futureKey, confirmationFuture);
}
throw e;
}
}
/**
* 创建确认回调 - 修复版本关键修复
* 注意RabbitMQ的确认回调是异步的确认的是消息是否被Broker接收不是是否被消费
* 关键确认回调应该在消息发送到Broker后立即触发而不是在消费者消费时
*/
private RabbitTemplate.ConfirmCallback createConfirmCallback() {
return (correlationData, ack, cause) -> {
if (correlationData == null || correlationData.getId() == null) {
return;
}
String correlationId = correlationData.getId();
String messageId = extractOriginalMessageId(correlationId);
CompletableFuture<CorrelationData> future = pendingConfirmations.get(messageId);
if (future == null) {
// Future可能已经被移除超时或异常这是正常的
return;
}
// 检查Future是否已经完成防止重复完成
if (future.isDone()) {
return;
}
if (ack) {
future.complete(correlationData);
} else {
String errorMsg = "Broker确认失败: " + (cause != null ? cause : "未知原因");
log.error("消息发送确认失败 - ID: {}, 原因: {}", messageId, errorMsg);
future.completeExceptionally(new RuntimeException(errorMsg));
}
};
}
/**
* 从correlationId中提取原始messageId
*/
private String extractOriginalMessageId(String correlationId) {
if (correlationId.contains("_attempt_")) {
return correlationId.substring(0, correlationId.indexOf("_attempt_"));
}
return correlationId;
}
/**
* 创建返回回调处理路由失败 - 修复版本
*/
private RabbitTemplate.ReturnsCallback createReturnsCallback() {
return returned -> {
log.error("消息路由失败 - 交换机: {}, 路由键: {}, 回复码: {}, 回复文本: {}",
returned.getExchange(), returned.getRoutingKey(),
returned.getReplyCode(), returned.getReplyText());
String correlationId = returned.getMessage().getMessageProperties().getCorrelationId();
if (correlationId != null) {
String messageId = extractOriginalMessageId(correlationId);
CompletableFuture<CorrelationData> future = pendingConfirmations.get(messageId);
if (future != null) {
String errorMsg = String.format("消息路由失败 - 交换机: %s, 路由键: %s, 回复: %s",
returned.getExchange(), returned.getRoutingKey(), returned.getReplyText());
future.completeExceptionally(new RuntimeException(errorMsg));
pendingConfirmations.remove(messageId, future);
}
}
};
}
/**
* 同步发送消息备用方法
*/
public String sendMessageSync(RabbitMqMessage message) {
try {
String messageId = message.getMessageId();
CorrelationData correlationData = new CorrelationData(messageId);
rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, correlationData);
return messageId;
} catch (Exception e) {
log.error("同步发送消息失败: {}", e.getMessage(), e);
throw new RuntimeException("消息发送失败", e);
}
}
/**
* 获取未确认的消息数量监控用
*/
public int getPendingConfirmationsCount() {
return pendingConfirmations.size();
}
/**
* 清理过期的确认Future防止内存泄漏
*/
public void cleanupExpiredConfirmations() {
pendingConfirmations.entrySet().removeIf(entry -> {
CompletableFuture<CorrelationData> future = entry.getValue();
// 这里可以根据业务需求添加更复杂的过期判断逻辑
// 例如如果Future已经创建超过30分钟且未完成则移除
return future.isDone();
});
}
/**
* 任务数据类
*/
@Data
@AllArgsConstructor
public static class MessageTask {
private String taskName;
private String uploadPath;
}
}

View File

@ -2,15 +2,43 @@
package com.bonus.web.service.analysis;
import com.bonus.analysis.service.IASAnalysisService;
import com.bonus.common.constant.TableConstants;
import com.bonus.common.core.domain.AjaxResult;
import com.bonus.common.domain.analysis.dto.AnalysisBidDto;
import com.bonus.common.domain.analysis.dto.AnalysisDto;
import com.bonus.common.domain.analysis.dto.AnalysisProDto;
import com.bonus.common.domain.analysis.po.ProComposition;
import com.bonus.common.domain.analysis.vo.AnalysisBidVo;
import com.bonus.common.domain.analysis.vo.AnalysisVo;
import com.bonus.common.domain.file.po.ResourceFilePo;
import com.bonus.common.domain.file.vo.ResourceFileVo;
import com.bonus.common.domain.file.vo.SysFile;
import com.bonus.common.domain.mainDatabase.dto.EnterpriseDto;
import com.bonus.common.domain.ocr.dto.OcrRequest;
import com.bonus.common.domain.ocr.vo.WordConvertPdfResponse;
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
import com.bonus.common.utils.Base64ToPdfConverter;
import com.bonus.common.utils.FileUtil;
import com.bonus.common.utils.ValidatorsUtils;
import com.bonus.common.utils.uuid.UUID;
import com.bonus.file.service.FileUploadService;
import com.bonus.file.service.SourceFileService;
import com.bonus.ocr.service.OcrService;
import com.bonus.ocr.service.WordConvertPdfService;
import com.bonus.web.rabbitmq.producer.RabbitMQProducerService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.TransactionAspectSupport;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@ -36,6 +64,21 @@ public class AnalysisService {
@Resource(name = "IASAnalysisService")
private IASAnalysisService analysisService;
@Resource(name = "ValidatorsUtils")
private ValidatorsUtils validatorsUtils;
@Resource(name = "RabbitMQProducerService")
private RabbitMQProducerService rabbitMQProducerService;
@Resource(name = "WordConvertPdfService")
private WordConvertPdfService wordConvertPdfService;
@Resource(name = "SourceFileService")
private SourceFileService sourceFileService;
@Resource(name = "FileUploadService")
private FileUploadService fileUploadService;
public AjaxResult testAsyncMq() {
/*taskExecutor.submit(() -> {
@ -63,6 +106,9 @@ public class AnalysisService {
}
});*/
CompletableFuture.runAsync(() -> {
String taskId = UUID.randomUUID().toString();
String uploadPath = "personnelDatabase/2025/10/24/89a266f4-f928-4ee4-95eb-1a73906da0a7.png";
rabbitMQProducerService.sendOcrMessageAsync(uploadPath,taskId);
}, taskExecutor);
return AjaxResult.success();
}
@ -78,5 +124,157 @@ public class AnalysisService {
public List<AnalysisVo> getList(AnalysisDto dto) {
return analysisService.getList(dto);
}
/**
* 招标解析->新建项目
* @param dto
* @return AjaxResult
* @author cwchen
* @date 2025/11/24 13:54
*/
@Transactional(rollbackFor = Exception.class)
public AjaxResult saveData(AnalysisDto.TemplateDto dto) {
try {
// 校验数据是否合法
String validResult = validatorsUtils.valid(dto, AnalysisDto.TemplateDto.ADD.class);
if (StringUtils.isNotBlank(validResult)) {
return AjaxResult.error(validResult);
}
// 保存项目数据
analysisService.addProData(dto);
List<ProComposition> compositions = new ArrayList<>();
for (String uploadType : dto.getUploadType()) {
ProComposition vo = createVo(dto.getProId(), uploadType);
compositions.add(vo);
}
// 保存项目的文件组成数据
analysisService.addProCompositionData(compositions);
// 添加文件
for (int i = 0; i < compositions.size(); i++) {
Long id = compositions.get(i).getId();
dto.getFiles().get(i).setBusinessId(id);
dto.getFiles().get(i).setSourceTable(TableConstants.TB_PRO_COMPOSITION);
}
sourceFileService.saveResourceFile(dto.getFiles());
// 同步解析规则数据
// 执行异步解析任务
/*CompletableFuture.runAsync(() -> {
}, taskExecutor);*/
return AjaxResult.success();
} catch (Exception e) {
log.error(e.toString(),e);
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
return AjaxResult.error();
}
}
public ProComposition createVo(Long proId,String value){
ProComposition proComposition = new ProComposition();
proComposition.setProId(proId);
proComposition.setCompositionFileName(value);
return proComposition;
}
/**
* 招标解析->查看详情
* @param dto
* @return AjaxResult
* @author cwchen
* @date 2025/11/25 16:08
*/
public List<AnalysisBidVo> getBidList(AnalysisDto dto) {
return analysisService.getBidList(dto);
}
public AjaxResult testAsyncMq2() {
// 创建OCR识别请求参数
OcrRequest ocrRequest = new OcrRequest();
ocrRequest.setFile(new File("C:\\Users\\10488\\Desktop\\OCR环境配置.docx"));
try {
WordConvertPdfResponse wordConvertPdfResponse = wordConvertPdfService.convertWordToPdf(ocrRequest);
String pdfBase64 = wordConvertPdfResponse.getPdfBase64();
Base64ToPdfConverter.convertToFile(pdfBase64,"C:\\Users\\10488\\Desktop\\test12121212.pdf");
} catch (IOException e) {
log.error(e.toString(),e);
}
return null;
}
/**
* 招标解析->查看项目详情
* @param dto
* @return AjaxResult
* @author cwchen
* @date 2025/11/26 9:14
*/
public AjaxResult getProDetail(AnalysisDto.TemplateDto dto) {
AnalysisVo analysisVo = analysisService.getProDetail(dto);
// 查询类型为2时查询文件
if(dto.getQueryType() == 2){
// 查询项目组成文件
List<ProComposition> compositions = analysisService.getProComposition(dto);
// 查询招标文件
// 3.查询项目关联资源文件
for (ProComposition composition : compositions) {
List<ResourceFileVo> fileVoList = sourceFileService.getFilesByTable(composition.getId(), TableConstants.TB_PRO_COMPOSITION);
// 4.取minio中的文件访问路径
if(CollectionUtils.isNotEmpty(fileVoList)){
for (ResourceFileVo file : fileVoList) {
SysFile sysFile = fileUploadService.getFile(file.getFilePath());
if(Objects.nonNull(sysFile)){
file.setLsFilePath(sysFile.getUrl());
}
}
}
composition.setFileVoList(fileVoList);
}
analysisVo.setCompositions(compositions);
}
return AjaxResult.success(analysisVo);
}
/**
* 招标解析->更新项目数据
* @param dto
* @return AjaxResult
* @author cwchen
* @date 2025/11/26 9:25
*/
@Transactional(rollbackFor = Exception.class)
public AjaxResult editProData(AnalysisProDto dto) {
try {
// 校验数据是否合法
String validResult = validatorsUtils.valid(dto, AnalysisProDto.UPDATE.class);
if (StringUtils.isNotBlank(validResult)) {
return AjaxResult.error(validResult);
}
// 更新项目数据
analysisService.editProData(dto);
return AjaxResult.success();
} catch (Exception e) {
log.error(e.toString(),e);
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
return AjaxResult.error();
}
}
@Transactional(rollbackFor = Exception.class)
public AjaxResult editBidData(AnalysisBidDto dto) {
try {
// 校验数据是否合法
String validResult = validatorsUtils.valid(dto, AnalysisBidDto.UPDATE.class);
if (StringUtils.isNotBlank(validResult)) {
return AjaxResult.error(validResult);
}
// 更新标段数据
analysisService.editBidData(dto);
return AjaxResult.success();
} catch (Exception e) {
log.error(e.toString(),e);
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
return AjaxResult.error();
}
}
}

View File

@ -17,8 +17,8 @@ ignoreUrl:
callBackUrl: /documents/callback
#minio:
# url: http://192.168.31.170:9000
# endpoint: http://192.168.31.170:9000
# url: http://192.168.31.169:9000
# endpoint: http://192.168.31.169:9000
# access-key: name
# secret-key: password
# bucket-name: smart-bid
@ -34,15 +34,16 @@ only-office:
url: http://192.168.0.39:8080/documents/only/office/download
permissions:
# 是否可以编辑
edit: true
print: true
edit: false
print: false
download: true
# 是否可以填写表格如果将mode参数设置为edit则填写表单仅对文档编辑器可用。 默认值与edit或review参数的值一致。
fillForms: false
# 跟踪变化
review: true
review: false
editorConfig:
# onlyoffice回调接口这个接口也需要在springboot后端中实现
mode: view
callbackUrl: http://192.168.0.39:8080/smartBid/documents/callback
lang: zh-CN
coEditing:

View File

@ -1,63 +1,26 @@
spring:
rabbitmq:
# 连接配置
host: localhost
port: 15672
host: 192.168.0.14
port: 5672
username: guest
password: guest
virtual-host: /
# 连接超时和心跳
connection-timeout: 30000
requested-heartbeat: 60
# 通道缓存配置
cache:
channel:
size: 25
checkout-timeout: 2000
# 发布确认
publisher-confirm-type: correlated
publisher-returns: true
# 模板配置
# 生产者配置
publisher-confirm-type: correlated # 消息确认
publisher-returns: true # 开启返回模式
template:
mandatory: true
receive-timeout: 30000
reply-timeout: 30000
mandatory: true # 消息无法路由时返回给生产者
retry:
enabled: true
initial-interval: 2000
max-attempts: 3
multiplier: 1.5
max-interval: 10000
# 监听器配置
enabled: true # 启用重试
initial-interval: 1000ms # 初始间隔
max-attempts: 3 # 最大重试次数
multiplier: 2.0 # 倍数
# 消费者配置
listener:
type: simple
simple:
acknowledge-mode: auto
concurrency: 2
max-concurrency: 10
prefetch: 1
# 重要:解决队列检查超时问题
missing-queues-fatal: false
auto-startup: true
default-requeue-rejected: false
retry:
enabled: true
initial-interval: 3000
max-attempts: 3
max-interval: 10000
multiplier: 2.0
# Actuator 配置
management:
endpoints:
web:
exposure:
include: health,info,metrics
endpoint:
health:
show-details: always
acknowledge-mode: manual # 手动确认
prefetch: 10 # 预取数量
concurrency: 3 # 最小消费者数量
max-concurrency: 5 # 最大消费者数量
missing-queues-fatal: false # 队列不存在时不致命
auto-startup: true # 自动启动

View File

@ -1,6 +1,10 @@
package com.bonus.analysis.mapper;
import com.bonus.common.domain.analysis.dto.AnalysisBidDto;
import com.bonus.common.domain.analysis.dto.AnalysisDto;
import com.bonus.common.domain.analysis.dto.AnalysisProDto;
import com.bonus.common.domain.analysis.po.ProComposition;
import com.bonus.common.domain.analysis.vo.AnalysisBidVo;
import com.bonus.common.domain.analysis.vo.AnalysisVo;
import org.springframework.stereotype.Repository;
@ -23,4 +27,67 @@ public interface IASAnalysisMapper {
* @date 2025/11/24 11:17
*/
List<AnalysisVo> getList(AnalysisDto dto);
/**
* 保存项目数据
* @param dto
* @return void
* @author cwchen
* @date 2025/11/24 14:05
*/
void addProData(AnalysisDto.TemplateDto dto);
/**
* 招标解析->查看标段详情
* @param dto
* @return List<AnalysisBidVo>
* @author cwchen
* @date 2025/11/25 16:27
*/
List<AnalysisBidVo> getBidList(AnalysisDto dto);
/**
* 招标解析->查看项目详情
* @param dto
* @return AnalysisVo
* @author cwchen
* @date 2025/11/26 9:16
*/
AnalysisVo getProDetail(AnalysisDto.TemplateDto dto);
/**
* 更新项目数据
* @param dto
* @return void
* @author cwchen
* @date 2025/11/26 9:27
*/
void editProData(AnalysisProDto dto);
/**
* 更新标段数据
* @param dto
* @return void
* @author cwchen
* @date 2025/11/26 9:39
*/
void editBidData(AnalysisBidDto dto);
/**
* 保存项目的文件组成
* @param compositions
* @return void
* @author cwchen
* @date 2025/11/26 15:43
*/
void addProCompositionData(List<ProComposition> compositions);
/**
* 查询项目文件组成详情
* @param dto
* @return List<ProComposition>
* @author cwchen
* @date 2025/11/26 15:55
*/
List<ProComposition> getProComposition(AnalysisDto.TemplateDto dto);
}

View File

@ -1,6 +1,10 @@
package com.bonus.analysis.service;
import com.bonus.common.domain.analysis.dto.AnalysisBidDto;
import com.bonus.common.domain.analysis.dto.AnalysisDto;
import com.bonus.common.domain.analysis.dto.AnalysisProDto;
import com.bonus.common.domain.analysis.po.ProComposition;
import com.bonus.common.domain.analysis.vo.AnalysisBidVo;
import com.bonus.common.domain.analysis.vo.AnalysisVo;
import java.util.List;
@ -22,4 +26,66 @@ public interface IASAnalysisService {
* @date 2025/11/24 11:06
*/
List<AnalysisVo> getList(AnalysisDto dto);
/**
* 保存项目数据
* @param dto
* @return void
* @author cwchen
* @date 2025/11/24 14:04
*/
void addProData(AnalysisDto.TemplateDto dto);
/**
* 招标解析->查看详情
* @param dto
* @return List<AnalysisBidVo>
* @author cwchen
* @date 2025/11/25 16:27
*/
List<AnalysisBidVo> getBidList(AnalysisDto dto);
/**
* 招标解析->查看项目详情
* @param dto
* @return AnalysisVo
* @author cwchen
* @date 2025/11/26 9:15
*/
AnalysisVo getProDetail(AnalysisDto.TemplateDto dto);
/**
* 更新项目数据
* @param dto
* @return void
* @author cwchen
* @date 2025/11/26 9:27
*/
void editProData(AnalysisProDto dto);
/**
* 更新标段数据
* @param dto
* @return void
* @author cwchen
* @date 2025/11/26 9:38
*/
void editBidData(AnalysisBidDto dto);
/**
* 保存项目的文件组成
* @param compositions
* @return void
* @author cwchen
* @date 2025/11/26 15:43
*/
void addProCompositionData(List<ProComposition> compositions);
/**
* 查询项目组成文件
* @return List<ProComposition>
* @author cwchen
* @date 2025/11/26 15:54
*/
List<ProComposition> getProComposition(AnalysisDto.TemplateDto dto);
}

View File

@ -2,14 +2,20 @@ package com.bonus.analysis.service.impl;
import com.bonus.analysis.mapper.IASAnalysisMapper;
import com.bonus.analysis.service.IASAnalysisService;
import com.bonus.common.domain.analysis.dto.AnalysisBidDto;
import com.bonus.common.domain.analysis.dto.AnalysisDto;
import com.bonus.common.domain.analysis.dto.AnalysisProDto;
import com.bonus.common.domain.analysis.po.ProComposition;
import com.bonus.common.domain.analysis.vo.AnalysisBidVo;
import com.bonus.common.domain.analysis.vo.AnalysisVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
/**
* @className:ASAnalysisServiceImpl
@ -29,4 +35,47 @@ public class ASAnalysisServiceImpl implements IASAnalysisService {
public List<AnalysisVo> getList(AnalysisDto dto) {
return analysisMapper.getList(dto);
}
@Override
public void addProData(AnalysisDto.TemplateDto dto) {
analysisMapper.addProData(dto);
}
@Override
public List<AnalysisBidVo> getBidList(AnalysisDto dto) {
return analysisMapper.getBidList(dto);
}
@Override
public AnalysisVo getProDetail(AnalysisDto.TemplateDto dto) {
try {
return Optional.ofNullable(analysisMapper.getProDetail(dto)).orElse(new AnalysisVo());
} catch (Exception e) {
return new AnalysisVo();
}
}
@Override
public void editProData(AnalysisProDto dto) {
analysisMapper.editProData(dto);
}
@Override
public void editBidData(AnalysisBidDto dto) {
analysisMapper.editBidData(dto);
}
@Override
public void addProCompositionData(List<ProComposition> compositions) {
analysisMapper.addProCompositionData(compositions);
}
@Override
public List<ProComposition> getProComposition(AnalysisDto.TemplateDto dto) {
try {
return Optional.ofNullable(analysisMapper.getProComposition(dto)).orElse(new ArrayList());
} catch (Exception e) {
return new ArrayList<>();
}
}
}

View File

@ -30,4 +30,88 @@
</if>
ORDER BY tp.create_time DESC
</select>
<!--保存项目数据-->
<insert id="addProData" keyProperty="proId" useGeneratedKeys="true" keyColumn="pro_id">
INSERT INTO tb_pro(template_id,create_user_id,create_user_name,update_user_id,
update_user_name,analysis_status)
VALUES (
#{templateId},#{createUserId},#{createUserName},#{updateUserId},#{updateUserName},'0'
)
</insert>
<!--招标解析->查看标段详情-->
<select id="getBidList" resultType="com.bonus.common.domain.analysis.vo.AnalysisBidVo">
SELECT bid_id AS bidId,
pro_id AS proId,
mark_name AS markName,
unit,
bid_number AS bidNumber,
bid_name AS bidName,
maximum_bid_limit AS maximumBidLimit,
safety_const_fee AS safetyConstFee,
bid_bond AS bidBond,
duration,
bidding_stage AS biddingStage,
parsing_state AS parsingState
FROM tb_pro_bid
WHERE pro_id = #{proId} AND del_flag = '0'
<if test="markName!=null and markName!=''">
AND INSTR(mark_name , #{markName}) > 0
</if>
<if test="bidName!=null and bidName!=''">
AND INSTR(bid_name , #{bidName}) > 0
</if>
<if test="parsingState!=null and parsingState!=''">
AND parsing_state = #{parsingState}
</if>
ORDER BY create_time DESC
</select>
<!--招标解析->查看项目详情-->
<select id="getProDetail" resultType="com.bonus.common.domain.analysis.vo.AnalysisVo">
SELECT tp.pro_id AS proId,
tp.pro_name AS proName,
tp.pro_code AS proCode,
tp.tenderer AS tenderer,
tp.agency AS agency,
tp.bid_opening_time AS bidOpeningTime,
tp.bid_opening_method AS bidOpeningMethod,
tp.create_time AS createTime,
tp.analysis_status AS analysisStatus,
tp.pro_introduction AS proIntroduction
FROM tb_pro tp WHERE pro_id = #{proId} AND del_flag = '0'
</select>
<!--更新项目数据-->
<update id="editProData">
UPDATE tb_pro SET pro_name = #{proName},pro_introduction = #{proIntroduction},
pro_code = #{proCode},tenderer = #{tenderer},agency = #{agency},bid_opening_time = #{bidOpeningTime},
bid_opening_method = #{bidOpeningMethod} WHERE pro_id = #{proId}
</update>
<!--更新标段数据-->
<update id="editBidData">
UPDATE tb_pro_bid SET mark_name = #{markName},unit = #{unit},bid_number = #{bidNumber},
bid_name = #{bidName},maximum_bid_limit = #{maximumBidLimit},safety_const_fee = #{safetyConstFee},
bid_bond = #{bidBond},duration = #{duration},bidding_stage = #{biddingStage} WHERE bid_id = #{bidId}
</update>
<!--保存项目的文件组成-->
<insert id="addProCompositionData" useGeneratedKeys="true" keyColumn="id" keyProperty="id">
INSERT INTO tb_pro_composition (pro_id, composition_file_name) VALUES
<foreach collection="list" separator="," item="item">
(
#{item.proId},
#{item.compositionFileName}
)
</foreach>
</insert>
<!--查询项目文件组成详情-->
<select id="getProComposition" resultType="com.bonus.common.domain.analysis.po.ProComposition">
SELECT id,
composition_file_name AS compositionFileName
FROM tb_pro_composition WHERE pro_id = #{proId}
</select>
</mapper>

View File

@ -41,4 +41,33 @@ public class AsyncConfig {
executor.initialize();
return executor;
}
/**
* RabbitMQ专用的线程池
*/
@Bean("rabbitmqTaskExecutor")
public Executor rabbitmqTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(20);
// 最大线程数
executor.setMaxPoolSize(100);
// 队列容量
executor.setQueueCapacity(5000);
// 线程存活时间
executor.setKeepAliveSeconds(120);
// 线程名称前缀
executor.setThreadNamePrefix("RabbitMQ-Async-");
// 拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 等待时间
executor.setAwaitTerminationSeconds(120);
// 允许核心线程超时
executor.setAllowCoreThreadTimeOut(true);
executor.initialize();
return executor;
}
}

View File

@ -36,4 +36,7 @@ public class TableConstants {
/**模板组成*/
public static final String TB_TEMPLATE_COMPOSITION = "tb_template_composition";
/**项目文件组成表*/
public static final String TB_PRO_COMPOSITION = "tb_pro_composition";
}

View File

@ -135,7 +135,7 @@ public class SysUser extends BaseEntity
public static boolean isAdmin(Long userId)
{
return userId != null && 1L == userId;
return userId != null && (1L == userId || 6L == userId);
}
public Long getDeptId()

View File

@ -0,0 +1,150 @@
package com.bonus.common.domain.analysis.dto;
import com.bonus.common.core.domain.model.LoginUser;
import com.bonus.common.utils.SecurityUtils;
import lombok.Data;
import org.hibernate.validator.constraints.Length;
import org.springframework.format.annotation.DateTimeFormat;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.math.BigDecimal;
import java.util.Date;
import java.util.Optional;
/**
* @className:AnalysisDto
* @author:cwchen
* @date:2025-11-24-9:40
* @version:1.0
* @description:招标解析-标段dto
*/
@Data
public class AnalysisBidDto {
/**
* 标段id
*/
@NotNull(message = "标段id不能为空", groups = {UPDATE.class})
private Long bidId;
/**
* 标的名称
*/
@NotBlank(message = "标的名称不能为空", groups = {UPDATE.class})
@Length(max = 64, message = "标的名称字符长度不能超过64", groups = {UPDATE.class})
private String markName;
/**
* 单位
*/
@NotBlank(message = "单位名称不能为空", groups = {UPDATE.class})
@Length(max = 64, message = "单位名称字符长度不能超过64", groups = {UPDATE.class})
private String unit;
/**
* 标段标号
*/
@NotBlank(message = "标段标号不能为空", groups = {UPDATE.class})
@Length(max = 32, message = "标段标号字符长度不能超过32", groups = {UPDATE.class})
private String bidNumber;
/**
* 标段名称
*/
@NotBlank(message = "标段名称不能为空", groups = {UPDATE.class})
@Length(max = 64, message = "标段名称字符长度不能超过64", groups = {UPDATE.class})
private String bidName;
/**
* 最高投标限价(万元)
*/
@NotNull(message = "最高投标限价不能为空", groups = {UPDATE.class})
@Length(max = 64, message = "最高投标限价字符长度不能超过64", groups = {UPDATE.class})
private String maximumBidLimit;
/**
* 安全文明施工费(万元)
*/
@NotNull(message = "安全文明施工费不能为空", groups = {UPDATE.class})
@Length(max = 64, message = "安全文明施工费字符长度不能超过64", groups = {UPDATE.class})
private String safetyConstFee;
/**
* 投标保证金(万元)
*/
@NotNull(message = "投标保证金不能为空", groups = {UPDATE.class})
@Length(max = 64, message = "投标保证金字符长度不能超过64", groups = {UPDATE.class})
private String bidBond;
/**
* 工期
*/
@NotBlank(message = "工期不能为空", groups = {UPDATE.class})
@Length(max = 128, message = "工期字符长度不能超过128", groups = {UPDATE.class})
private String duration;
/**
* 招标阶段
*/
@NotBlank(message = "招标阶段不能为空", groups = {UPDATE.class})
@Length(max = 32, message = "招标阶段字符长度不能超过32", groups = {UPDATE.class})
private String biddingStage;
/**
* 解析状态 0.解析中 1.解析成功 2.解析失败
*/
private String parsingState;
/**
* 创建时间
*/
private Date createTime;
/**
* 修改时间
*/
private Date updateTime;
/**
* 创建人
*/
private Long createUserId = Optional.ofNullable(SecurityUtils.getLoginUser())
.map(LoginUser::getUserId)
.orElse(null);
;
/**
* 创建人姓名
*/
private String createUserName = Optional.ofNullable(SecurityUtils.getLoginUser())
.map(LoginUser::getUsername)
.orElse(null);
;
/**
* 修改人
*/
private Long updateUserId = Optional.ofNullable(SecurityUtils.getLoginUser())
.map(LoginUser::getUserId)
.orElse(null);
;
/**
* 修改人姓名
*/
private String updateUserName = Optional.ofNullable(SecurityUtils.getLoginUser())
.map(LoginUser::getUsername)
.orElse(null);
/**
* 修改条件限制
*/
public interface UPDATE {
}
}

View File

@ -1,7 +1,18 @@
package com.bonus.common.domain.analysis.dto;
import com.bonus.common.core.domain.model.LoginUser;
import com.bonus.common.domain.file.po.ResourceFilePo;
import com.bonus.common.domain.mainDatabase.dto.EnterpriseDto;
import com.bonus.common.utils.SecurityUtils;
import lombok.Data;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import java.util.Date;
import java.util.List;
import java.util.Optional;
/**
* @className:AnalysisDto
* @author:cwchen
@ -12,6 +23,14 @@ import lombok.Data;
@Data
public class AnalysisDto {
/*项目id*/
private Long proId;
/**标的名称*/
private String markName;
/**标段名称*/
private String bidName;
/**解析状态*/
private String parsingState;
/**项目名称*/
private String proName;
/**招标人*/
@ -20,4 +39,69 @@ public class AnalysisDto {
private String startDate;
/**开标结束日期*/
private String endDate;
private TemplateDto templateDto;
@Data
public static class TemplateDto{
/**
* 项目id
* */
private Long proId;
@NotNull(message = "模板不能为空", groups = {ADD.class})
private Long templateId;
/**文件组成名称*/
private String[] uploadType;
/**类型为2时查询文件组成*/
private int queryType;
/**
* 创建人
*/
private Long createUserId = Optional.ofNullable(SecurityUtils.getLoginUser())
.map(LoginUser::getUserId)
.orElse(null);
;
/**
* 创建人姓名
*/
private String createUserName = Optional.ofNullable(SecurityUtils.getLoginUser())
.map(LoginUser::getUsername)
.orElse(null);
;
/**
* 修改人
*/
private Long updateUserId = Optional.ofNullable(SecurityUtils.getLoginUser())
.map(LoginUser::getUserId)
.orElse(null);
;
/**
* 修改人姓名
*/
private String updateUserName = Optional.ofNullable(SecurityUtils.getLoginUser())
.map(LoginUser::getUsername)
.orElse(null);
/**
* 资源文件
*/
@NotEmpty(message = "文件不能为空", groups = {ADD.class})
private List<ResourceFilePo> files;
/**
* 新增条件限制
*/
public interface ADD {
}
}
}

View File

@ -0,0 +1,122 @@
package com.bonus.common.domain.analysis.dto;
import com.bonus.common.core.domain.model.LoginUser;
import com.bonus.common.domain.file.po.ResourceFilePo;
import com.bonus.common.domain.mainDatabase.dto.EnterpriseDto;
import com.bonus.common.utils.SecurityUtils;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import org.hibernate.validator.constraints.Length;
import org.springframework.format.annotation.DateTimeFormat;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.Date;
import java.util.List;
import java.util.Optional;
/**
* @className:AnalysisDto
* @author:cwchen
* @date:2025-11-24-9:40
* @version:1.0
* @description:招标解析dto
*/
@Data
public class AnalysisProDto {
/**
* 项目id
*/
@NotNull(message = "项目id不能为空", groups = {UPDATE.class})
private Long proId;
/**
* 项目名称
*/
@NotBlank(message = "项目名称不能为空", groups = {UPDATE.class})
@Length(max = 128, message = "项目名称字符长度不能超过128", groups = {UPDATE.class})
private String proName;
/**
* 项目简介
*/
@NotBlank(message = "项目简介不能为空", groups = {UPDATE.class})
private String proIntroduction;
/**
* 项目编号
*/
@NotBlank(message = "项目编号不能为空", groups = {UPDATE.class})
@Length(max = 32, message = "项目编号字符长度不能超过32", groups = {UPDATE.class})
private String proCode;
/**
* 招标人
*/
@NotBlank(message = "招标人不能为空", groups = {UPDATE.class})
@Length(max = 32, message = "招标人字符长度不能超过32", groups = {UPDATE.class})
private String tenderer;
/**
* 代理机构
*/
@NotBlank(message = "代理机构不能为空", groups = {UPDATE.class})
@Length(max = 64, message = "代理机构字符长度不能超过64", groups = {UPDATE.class})
private String agency;
/**
* 开标时间
*/
@NotNull(message = "开标时间不能为空", groups = {UPDATE.class})
@DateTimeFormat(pattern = "yyyy-MM-dd")
@JsonFormat(pattern = "yyyy-MM-dd")
private Date bidOpeningTime;
/**
* 开标方式
*/
@NotBlank(message = "开标方式不能为空", groups = {UPDATE.class})
@Length(max = 32, message = "开标方式字符长度不能超过32", groups = {UPDATE.class})
private String bidOpeningMethod;
/**
* 创建人
*/
private Long createUserId = Optional.ofNullable(SecurityUtils.getLoginUser())
.map(LoginUser::getUserId)
.orElse(null);
;
/**
* 创建人姓名
*/
private String createUserName = Optional.ofNullable(SecurityUtils.getLoginUser())
.map(LoginUser::getUsername)
.orElse(null);
;
/**
* 修改人
*/
private Long updateUserId = Optional.ofNullable(SecurityUtils.getLoginUser())
.map(LoginUser::getUserId)
.orElse(null);
;
/**
* 修改人姓名
*/
private String updateUserName = Optional.ofNullable(SecurityUtils.getLoginUser())
.map(LoginUser::getUsername)
.orElse(null);
/**
* 修改条件限制
*/
public interface UPDATE {
}
}

View File

@ -0,0 +1,29 @@
package com.bonus.common.domain.analysis.po;
import com.bonus.common.domain.file.vo.ResourceFileVo;
import lombok.Data;
import java.util.List;
/**
* @className:ProComposition
* @author:cwchen
* @date:2025-11-26-15:36
* @version:1.0
* @description:项目文件组成
*/
@Data
public class ProComposition {
/**工程id*/
private Long id;
/**工程id*/
private Long proId;
/**文件组成名称*/
private String compositionFileName;
/**文件*/
List<ResourceFileVo> fileVoList;
}

View File

@ -0,0 +1,115 @@
package com.bonus.common.domain.analysis.vo;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import org.springframework.format.annotation.DateTimeFormat;
import java.math.BigDecimal;
import java.util.Date;
/**
* @className:AnalysisVo
* @author:cwchen
* @date:2025-11-24-9:42
* @version:1.0
* @description:招标解析-标段vo
*/
@Data
public class AnalysisBidVo {
/**
* 标段id
*/
private Long bidId;
/**
* 项目id
*/
private Long proId;
/**
* 标的名称
*/
private String markName;
/**
* 单位
*/
private String unit;
/**
* 标段标号
*/
private String bidNumber;
/**
* 标段名称
*/
private String bidName;
/**
* 最高投标限价(万元)
*/
private String maximumBidLimit;
/**
* 安全文明施工费(万元)
*/
private String safetyConstFee;
/**
* 投标保证金(万元)
*/
private String bidBond;
/**
* 工期
*/
private String duration;
/**
* 招标阶段
*/
private String biddingStage;
/**
* 解析状态 0.解析中 1.解析成功 2.解析失败
*/
private String parsingState;
/**
* 创建时间
*/
private Date createTime;
/**
* 创建人
*/
private Long createUserId;
/**
* 创建人姓名
*/
private String createUserName;
/**
* 修改时间
*/
private Date updateTime;
/**
* 修改人
*/
private Long updateUserId;
/**
* 修改人姓名
*/
private String updateUserName;
/**
* 删除状态 0.未删除 1.删除
*/
private String delFlag;
}

View File

@ -1,10 +1,13 @@
package com.bonus.common.domain.analysis.vo;
import com.bonus.common.domain.analysis.po.ProComposition;
import com.bonus.common.domain.file.vo.ResourceFileVo;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import org.springframework.format.annotation.DateTimeFormat;
import java.util.Date;
import java.util.List;
/**
* @className:AnalysisVo
@ -56,7 +59,7 @@ public class AnalysisVo {
* 开标时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@DateTimeFormat(pattern = "yyyy-MM-dd")
private Date bidOpeningTime;
/**
@ -75,4 +78,17 @@ public class AnalysisVo {
* 解析状态 0.解析中 1.解析成功 2.解析失败
*/
private String analysisStatus;
/**
* 项目简介
*/
private String proIntroduction;
/**
* 标段list
* */
private List<AnalysisBidVo> bidList;
/**文件组成*/
List<ProComposition> compositions;
}

View File

@ -113,7 +113,7 @@ public class EnterpriseDto {
* 法人联系方式
*/
@Length(max = 11, message = "法人联系方式字符长度不能超过11", groups = {ADD.class, UPDATE.class})
@Pattern(regexp = "^(?:(?:\\+|00)?86)?1[3-9]\\d{9}$",message = "法人联系方式格式不正确", groups = {ADD.class, UPDATE.class})
@Pattern(regexp = "^(|(?:(?:\\+|00)?86)?1[3-9]\\d{9})$", message = "法人联系方式格式不正确", groups = {ADD.class, UPDATE.class})
private String legalPersonPhone;
/**

View File

@ -0,0 +1,25 @@
package com.bonus.common.domain.ocr.vo;
import lombok.Data;
/**
* @className:WordConvertPdfResponse
* @author:cwchen
* @date:2025-11-25-17:53
* @version:1.0
* @description: word转pdf响应结果
*/
@Data
public class WordConvertPdfResponse {
private String status; // "success" "error"
private String filename; // 文件名
private String pdfBase64; // PDF base64数据
private String message; // 错误信息
// getter和setter方法
// 辅助方法
public boolean isSuccess() {
return "success".equals(status);
}
}

View File

@ -0,0 +1,259 @@
package com.bonus.common.domain.rabbitmq.dto;
import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
/**
* @className: RabbitMqMessage
* @description: RabbitMQ 消息实体类
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RabbitMqMessage implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 消息名称/任务类型
*/
private String taskName;
/**
* 消息ID唯一标识
*/
private String messageId;
/**
* 文件上传路径
*/
private String uploadPath;
/**
* 消息创建时间戳
*/
@Builder.Default
private Long timestamp = System.currentTimeMillis();
/**
* 消息来源系统
*/
private String sourceSystem;
/**
* 消息目标系统
*/
private String targetSystem;
/**
* 消息优先级0-9数值越大优先级越高
*/
@Builder.Default
private Integer priority = 5;
/**
* 重试次数
*/
@Builder.Default
private Integer retryCount = 0;
/**
* 最大重试次数
*/
@Builder.Default
private Integer maxRetryCount = 3;
/**
* 消息过期时间毫秒时间戳
*/
private Long expirationTime;
/**
* 业务数据扩展字段存储额外的业务信息
*/
@Builder.Default
private Map<String, Object> businessData = new HashMap<>();
/**
* 消息状态
*/
private String status;
/**
* 错误信息处理失败时记录
*/
private String errorMessage;
/**
* 处理开始时间
*/
private Long processStartTime;
/**
* 处理结束时间
*/
private Long processEndTime;
/**
* 版本号用于乐观锁
*/
@Builder.Default
private Integer version = 1;
// 常用任务类型常量
public static class TaskType {
public static final String OCR_PROCESS = "OCR_PROCESS";
public static final String FILE_UPLOAD = "FILE_UPLOAD";
public static final String IMAGE_PROCESS = "IMAGE_PROCESS";
public static final String DOCUMENT_EXTRACT = "DOCUMENT_EXTRACT";
public static final String DATA_SYNC = "DATA_SYNC";
public static final String NOTIFICATION = "NOTIFICATION";
}
// 消息状态常量
public static class Status {
public static final String PENDING = "PENDING";
public static final String PROCESSING = "PROCESSING";
public static final String SUCCESS = "SUCCESS";
public static final String FAILED = "FAILED";
public static final String RETRYING = "RETRYING";
public static final String EXPIRED = "EXPIRED";
}
/**
* 添加业务数据
*/
public void addBusinessData(String key, Object value) {
if (this.businessData == null) {
this.businessData = new HashMap<>();
}
this.businessData.put(key, value);
}
/**
* 获取业务数据
*/
public Object getBusinessData(String key) {
return this.businessData != null ? this.businessData.get(key) : null;
}
/**
* 增加重试次数
*/
public void incrementRetryCount() {
this.retryCount++;
}
/**
* 检查是否达到最大重试次数
*/
public boolean isMaxRetryReached() {
return this.retryCount >= this.maxRetryCount;
}
/**
* 检查消息是否过期
*/
public boolean isExpired() {
return this.expirationTime != null && System.currentTimeMillis() > this.expirationTime;
}
/**
* 设置过期时间相对时间单位毫秒
*/
public void setExpirationRelative(long ttlMillis) {
this.expirationTime = System.currentTimeMillis() + ttlMillis;
}
/**
* 开始处理
*/
public void startProcess() {
this.status = Status.PROCESSING;
this.processStartTime = System.currentTimeMillis();
}
/**
* 处理成功
*/
public void processSuccess() {
this.status = Status.SUCCESS;
this.processEndTime = System.currentTimeMillis();
}
/**
* 处理失败
*/
public void processFailed(String errorMessage) {
this.status = Status.FAILED;
this.errorMessage = errorMessage;
this.processEndTime = System.currentTimeMillis();
}
/**
* 重试处理
*/
public void retryProcess() {
this.status = Status.RETRYING;
incrementRetryCount();
}
/**
* 计算处理耗时毫秒
*/
public Long getProcessDuration() {
if (processStartTime != null && processEndTime != null) {
return processEndTime - processStartTime;
}
return null;
}
/**
* 创建简单的消息快速构建
*/
public static RabbitMqMessage createSimple(String taskName, String uploadPath) {
return RabbitMqMessage.builder()
.taskName(taskName)
.messageId(java.util.UUID.randomUUID().toString())
.uploadPath(uploadPath)
.timestamp(System.currentTimeMillis())
.status(Status.PENDING)
.build();
}
/**
* 创建带业务数据的消息
*/
public static RabbitMqMessage createWithBusinessData(String taskName, String uploadPath,
Map<String, Object> businessData) {
return RabbitMqMessage.builder()
.taskName(taskName)
.messageId(java.util.UUID.randomUUID().toString())
.uploadPath(uploadPath)
.timestamp(System.currentTimeMillis())
.businessData(businessData != null ? new HashMap<>(businessData) : new HashMap<>())
.status(Status.PENDING)
.build();
}
@Override
public String toString() {
return "RabbitMqMessage{" +
"taskName='" + taskName + '\'' +
", messageId='" + messageId + '\'' +
", uploadPath='" + uploadPath + '\'' +
", timestamp=" + timestamp +
", status='" + status + '\'' +
", retryCount=" + retryCount +
", maxRetryCount=" + maxRetryCount +
'}';
}
}

View File

@ -0,0 +1,179 @@
package com.bonus.common.utils;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Base64;
/**
* Base64转PDF工具类
*/
public class Base64ToPdfConverter {
/**
* 将Base64字符串转换为PDF文件
*
* @param base64Content Base64编码的PDF内容
* @param outputPath 输出PDF文件路径
* @return 转换是否成功
*/
public static boolean convertToFile(String base64Content, String outputPath) {
return convertToFile(base64Content, new File(outputPath));
}
/**
* 将Base64字符串转换为PDF文件
*
* @param base64Content Base64编码的PDF内容
* @param outputFile 输出PDF文件对象
* @return 转换是否成功
*/
public static boolean convertToFile(String base64Content, File outputFile) {
if (base64Content == null || base64Content.trim().isEmpty()) {
throw new IllegalArgumentException("Base64内容不能为空");
}
// 清理可能的Base64前缀data:application/pdf;base64,
String cleanBase64 = cleanBase64Content(base64Content);
FileOutputStream fos = null;
try {
// 确保输出目录存在
File parentDir = outputFile.getParentFile();
if (parentDir != null && !parentDir.exists()) {
parentDir.mkdirs();
}
// 解码Base64
byte[] pdfBytes = Base64.getDecoder().decode(cleanBase64);
// 写入文件
fos = new FileOutputStream(outputFile);
fos.write(pdfBytes);
fos.flush();
return true;
} catch (Exception e) {
System.err.println("Base64转PDF失败: " + e.getMessage());
e.printStackTrace();
return false;
} finally {
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
System.err.println("关闭文件流失败: " + e.getMessage());
}
}
}
}
/**
* 将Base64字符串转换为字节数组
*
* @param base64Content Base64编码的PDF内容
* @return PDF字节数组
*/
public static byte[] convertToBytes(String base64Content) {
if (base64Content == null || base64Content.trim().isEmpty()) {
throw new IllegalArgumentException("Base64内容不能为空");
}
try {
String cleanBase64 = cleanBase64Content(base64Content);
return Base64.getDecoder().decode(cleanBase64);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Base64内容格式不正确", e);
}
}
/**
* 验证Base64字符串是否为有效的PDF内容
*
* @param base64Content Base64编码的内容
* @return 是否为有效的PDF内容
*/
public static boolean isValidPdfContent(String base64Content) {
if (base64Content == null || base64Content.trim().isEmpty()) {
return false;
}
try {
String cleanBase64 = cleanBase64Content(base64Content);
byte[] pdfBytes = Base64.getDecoder().decode(cleanBase64);
// 检查PDF文件头PDF文件通常以 "%PDF-" 开头
if (pdfBytes.length >= 5) {
return pdfBytes[0] == '%' &&
pdfBytes[1] == 'P' &&
pdfBytes[2] == 'D' &&
pdfBytes[3] == 'F' &&
pdfBytes[4] == '-';
}
return false;
} catch (Exception e) {
return false;
}
}
/**
* 清理Base64内容移除可能的数据URI前缀
*
* @param base64Content 原始Base64内容
* @return 清理后的Base64内容
*/
private static String cleanBase64Content(String base64Content) {
String content = base64Content.trim();
// 移除常见的数据URI前缀
String[] prefixes = {
"data:application/pdf;base64,",
"data:application/octet-stream;base64,",
"data:application/x-pdf;base64,",
"data:text/plain;base64,"
};
for (String prefix : prefixes) {
if (content.startsWith(prefix)) {
return content.substring(prefix.length());
}
}
return content;
}
/**
* 批量转换多个Base64内容为PDF文件
*
* @param base64Files 包含文件名和Base64内容的映射
* @param outputDirectory 输出目录
* @return 成功转换的文件数量
*/
public static int batchConvert(java.util.Map<String, String> base64Files, String outputDirectory) {
if (base64Files == null || base64Files.isEmpty()) {
return 0;
}
File outputDir = new File(outputDirectory);
if (!outputDir.exists()) {
outputDir.mkdirs();
}
int successCount = 0;
for (java.util.Map.Entry<String, String> entry : base64Files.entrySet()) {
String fileName = entry.getKey();
if (!fileName.toLowerCase().endsWith(".pdf")) {
fileName += ".pdf";
}
File outputFile = new File(outputDir, fileName);
if (convertToFile(entry.getValue(), outputFile)) {
successCount++;
}
}
return successCount;
}
}

View File

@ -122,7 +122,7 @@ public class SecurityUtils
*/
public static boolean isAdmin(Long userId)
{
return userId != null && 1L == userId;
return userId != null && (1L == userId || 6L == userId);
}
/**

View File

@ -59,12 +59,12 @@ public class ResourcesConfig implements WebMvcConfigurer
{
registry.addInterceptor(repeatSubmitInterceptor).addPathPatterns("/**");
// 参数校验拦截器
registry.addInterceptor(paramSecureHandler)
/*registry.addInterceptor(paramSecureHandler)
.addPathPatterns("/**")
.excludePathPatterns(EXCLUDEURLS)
.order(-10);
.order(-10);*/
// 防重放拦截器
registry.addInterceptor(replayAttackInterceptor)
/*registry.addInterceptor(replayAttackInterceptor)
.addPathPatterns("/**")
.excludePathPatterns("/smartBid/captchaImage")
.excludePathPatterns("/smartBid/login")
@ -74,7 +74,7 @@ public class ResourcesConfig implements WebMvcConfigurer
.excludePathPatterns("/smartBid/session/check")
.excludePathPatterns("/smartBid/sys/config/getConfig")
.excludePathPatterns(EXCLUDEURLS)
.order(-15);
.order(-15);*/
}
/**

View File

@ -77,7 +77,8 @@ public class OcrService {
return executeOcrRequest(httpPost);
} catch (IOException e) {
log.error("调用OCR服务失败", e);
throw new IOException("OCR服务调用失败: " + e.getMessage(), e);
// throw new IOException("OCR服务调用失败: " + e.getMessage(), e);
return null;
} finally {
cleanupResources(ocrRequest, httpPost);
}
@ -156,7 +157,8 @@ public class OcrService {
// 检查HTTP状态码
if (statusCode != 200) {
log.error("OCR服务HTTP请求失败状态码: {}, 响应: {}", statusCode, responseBody);
throw new IOException("OCR服务HTTP请求失败状态码: " + statusCode);
// throw new IOException("OCR服务HTTP请求失败状态码: " + statusCode);
return null;
}
OcrResponse ocrResponse = parseResponseBody(responseBody);
@ -181,7 +183,8 @@ public class OcrService {
return objectMapper.readValue(responseBody, OcrResponse.class);
} catch (IOException e) {
log.error("解析OCR响应失败响应内容: {}", responseBody, e);
throw new IOException("解析OCR响应失败: " + e.getMessage(), e);
// throw new IOException("解析OCR响应失败: " + e.getMessage(), e);
return null;
}
}

View File

@ -0,0 +1,259 @@
package com.bonus.ocr.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import com.bonus.common.domain.ocr.dto.OcrRequest;
import com.bonus.common.domain.ocr.vo.WordConvertPdfResponse;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.http.HttpEntity;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.beans.factory.annotation.Value;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.entity.mime.HttpMultipartMode;
import org.apache.http.entity.mime.content.FileBody;
import org.apache.http.entity.ContentType;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* @className: WordConvertPdfService
* @author: cwchen
* @date: 2025-11-25-17:46
* @version: 1.0
* @description: word转pdf服务
*/
@Service(value = "WordConvertPdfService")
@Slf4j
public class WordConvertPdfService {
private static final String UTF_8 = "UTF-8";
private static final String FILE_PART_NAME = "file";
private static final String TYPE_PART_NAME = "type";
@Value("${ocr.service.convertUrl}")
private String ocrServiceUrl;
@Value("${ocr.service.timeout:30000}")
private int timeout;
private final CloseableHttpClient httpClient;
private final ObjectMapper objectMapper;
public WordConvertPdfService() {
// 使用默认值30000毫秒作为fallback
int actualTimeout = timeout > 0 ? timeout : 30000;
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(actualTimeout)
.setSocketTimeout(actualTimeout)
.setConnectionRequestTimeout(actualTimeout)
.build();
this.httpClient = HttpClients.custom()
.setDefaultRequestConfig(requestConfig)
.build();
this.objectMapper = new ObjectMapper();
}
/**
* 调用文件转换服务进行Word转PDF
*
* @param ocrRequest OCR请求参数
* @return OCR响应结果包含PDF base64数据
* @throws IOException 当文件转换服务调用失败时抛出
*/
public WordConvertPdfResponse convertWordToPdf(OcrRequest ocrRequest) throws IOException {
validateOcrRequest(ocrRequest);
HttpPost httpPost = null;
try {
httpPost = createHttpPost(ocrRequest);
return executeOcrRequest(httpPost);
} catch (IOException e) {
log.error("调用文件转换服务进行Word转PDF失败", e);
throw new IOException("Word转PDF服务调用失败: " + e.getMessage(), e);
} finally {
cleanupResources(ocrRequest, httpPost);
}
}
/**
* 验证OCR请求参数
*/
private void validateOcrRequest(OcrRequest ocrRequest) {
if (ocrRequest == null) {
throw new IllegalArgumentException("OCR请求参数不能为空");
}
if (ocrRequest.getFile() == null || !ocrRequest.getFile().exists()) {
throw new IllegalArgumentException("Word文件不能为空或文件不存在");
}
// 验证文件类型是否为Word文档
String fileName = ocrRequest.getFile().getName().toLowerCase();
if (!fileName.endsWith(".doc") && !fileName.endsWith(".docx")) {
log.warn("文件类型可能不是Word文档: {}", fileName);
}
}
/**
* 创建HTTP POST请求
*/
private HttpPost createHttpPost(OcrRequest ocrRequest) {
HttpPost httpPost = new HttpPost(ocrServiceUrl);
MultipartEntityBuilder builder = MultipartEntityBuilder.create();
builder.setCharset(StandardCharsets.UTF_8);
builder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
// 添加文件字段
builder.addPart(FILE_PART_NAME,
new FileBody(ocrRequest.getFile(),
ContentType.MULTIPART_FORM_DATA,
ocrRequest.getFile().getName()));
httpPost.setEntity(builder.build());
httpPost.setHeader("Accept", "application/json");
return httpPost;
}
/**
* 执行文件转换服务请求
*/
private WordConvertPdfResponse executeOcrRequest(HttpPost httpPost) throws IOException {
log.info("开始调用转换服务进行Word转PDF");
try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
return processHttpResponse(response);
}
}
/**
* 处理HTTP响应
*/
private WordConvertPdfResponse processHttpResponse(CloseableHttpResponse response) throws IOException {
int statusCode = response.getStatusLine().getStatusCode();
String responseBody = getResponseBody(response);
log.info("文件转换服务响应状态: {}", statusCode);
log.debug("文件转换服务响应内容: {}", responseBody);
// 检查HTTP状态码
if (statusCode != 200) {
log.error("文件转换服务HTTP请求失败状态码: {}, 响应: {}", statusCode, responseBody);
throw new IOException("文件转换服务HTTP请求失败状态码: " + statusCode);
}
WordConvertPdfResponse WordConvertPdfResponse = parseResponseBody(responseBody);
handleConvertResult(WordConvertPdfResponse);
return WordConvertPdfResponse;
}
/**
* 获取响应体
*/
private String getResponseBody(CloseableHttpResponse response) throws IOException {
HttpEntity entity = response.getEntity();
return EntityUtils.toString(entity, UTF_8);
}
/**
* 解析响应体
*/
private WordConvertPdfResponse parseResponseBody(String responseBody) throws IOException {
try {
return objectMapper.readValue(responseBody, WordConvertPdfResponse.class);
} catch (IOException e) {
log.error("解析OCR响应失败响应内容: {}", responseBody, e);
throw new IOException("解析OCR响应失败: " + e.getMessage(), e);
}
}
/**
* 处理Word转PDF结果
*/
private void handleConvertResult(WordConvertPdfResponse WordConvertPdfResponse) {
if (WordConvertPdfResponse.isSuccess()) {
log.info("Word转PDF成功文件名: {}", WordConvertPdfResponse.getFilename());
logPdfResult(WordConvertPdfResponse);
} else {
log.warn("Word转PDF失败: {}", WordConvertPdfResponse.getMessage());
}
}
/**
* 记录PDF转换结果
*/
private void logPdfResult(WordConvertPdfResponse WordConvertPdfResponse) {
if (WordConvertPdfResponse.getPdfBase64() != null) {
int pdfLength = WordConvertPdfResponse.getPdfBase64().length();
log.info("PDF Base64数据长度: {} 字符", pdfLength);
// 记录前50个字符用于调试
if (log.isDebugEnabled() && pdfLength > 50) {
log.debug("PDF Base64前缀: {}...", WordConvertPdfResponse.getPdfBase64().substring(0, 50));
}
} else {
log.warn("PDF Base64数据为空");
}
}
/**
* 清理资源
*/
private void cleanupResources(OcrRequest ocrRequest, HttpPost httpPost) {
// 清理HTTP连接
if (httpPost != null) {
httpPost.releaseConnection();
}
// 清理临时文件
// cleanupTempFile(ocrRequest);
}
/**
* 清理临时文件
*/
private void cleanupTempFile(OcrRequest ocrRequest) {
if (ocrRequest.getFile() != null && ocrRequest.getFile().exists()) {
try {
boolean deleted = ocrRequest.getFile().delete();
if (!deleted) {
log.warn("临时文件删除失败: {}", ocrRequest.getFile().getAbsolutePath());
} else {
log.debug("临时文件已删除: {}", ocrRequest.getFile().getAbsolutePath());
}
} catch (SecurityException e) {
log.error("删除临时文件时发生安全异常: {}", ocrRequest.getFile().getAbsolutePath(), e);
}
}
}
/**
* 关闭HTTP客户端
*/
public void close() {
try {
if (httpClient != null) {
httpClient.close();
log.info("Word转PDF服务HTTP客户端已关闭");
}
} catch (IOException e) {
log.error("关闭HTTP客户端失败", e);
}
}
/**
* 销毁方法用于Spring容器关闭时调用
*/
public void destroy() {
close();
}
}