Merge remote-tracking branch 'origin/master'
This commit is contained in:
commit
783490fa2e
|
|
@ -173,10 +173,12 @@ public class AnalysisService {
|
|||
RabbitMqMessage msg = new RabbitMqMessage();
|
||||
String taskId = UUID.randomUUID().toString();
|
||||
msg.setMessageId(taskId);
|
||||
msg.setUploadPath(dto.getFiles().get(i).getFilePath());
|
||||
// msg.setUploadPath(dto.getFiles().get(i).getFilePath());
|
||||
msg.setUploadPath("analysisDatabase/2025/11/29/2055f27b7a8c4fb883b1f9d12d95872a.pdf");
|
||||
msg.setProId(dto.getProId());
|
||||
msg.setTemplateId(dto.getTemplateId());
|
||||
msg.setAnalysisLabelId(dto.getAnalysisLabelId());
|
||||
msg.setCompositionType(1);
|
||||
msg.setTaskName("OCR_PROCESS");
|
||||
asyncTaskList.add(msg);
|
||||
}
|
||||
|
|
@ -233,6 +235,9 @@ public class AnalysisService {
|
|||
*/
|
||||
public AjaxResult getProDetail(AnalysisDto.TemplateDto dto) {
|
||||
AnalysisVo analysisVo = analysisService.getProDetail(dto);
|
||||
// 查询标段文件的文件组成
|
||||
List<String> bidCompositions = analysisService.getBidCompositionByBid(analysisVo);
|
||||
analysisVo.setBidCompositions(bidCompositions);
|
||||
// 查询类型为2时查询文件
|
||||
if(dto.getQueryType() == 2){
|
||||
// 查询项目组成文件
|
||||
|
|
@ -377,6 +382,7 @@ public class AnalysisService {
|
|||
msg.setUploadPath(dto.getFiles().get(i).getFilePath());
|
||||
msg.setProId(dto.getProId());
|
||||
msg.setBidId(dto.getBidId());
|
||||
msg.setCompositionType(2);
|
||||
msg.setTaskName("OCR_PROCESS");
|
||||
asyncTaskList.add(msg);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import com.bonus.common.domain.analysis.po.ProComposition;
|
|||
import com.bonus.common.domain.analysis.vo.AnalysisBidVo;
|
||||
import com.bonus.common.domain.analysis.vo.AnalysisLabelItemOcrVo;
|
||||
import com.bonus.common.domain.analysis.vo.AnalysisVo;
|
||||
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
|
|
@ -137,4 +138,40 @@ public interface IASAnalysisMapper {
|
|||
* @date 2025/11/29 15:51
|
||||
*/
|
||||
void delProBidAnalysisResult(AnalysisDto.TemplateDto dto);
|
||||
|
||||
/**
|
||||
* 查询标段文件的文件组成
|
||||
* @param analysisVo
|
||||
* @return List<ProComposition>
|
||||
* @author cwchen
|
||||
* @date 2025/11/30 13:29
|
||||
*/
|
||||
List<String> getBidCompositionByBid(AnalysisVo analysisVo);
|
||||
|
||||
/**
|
||||
* 更新项目或者标段的文件组成的解析状态
|
||||
* @param message
|
||||
* @return void
|
||||
* @author cwchen
|
||||
* @date 2025/11/30 14:37
|
||||
*/
|
||||
void updateProCompositionState(RabbitMqMessage message);
|
||||
|
||||
/**
|
||||
* 更新项目或者标段的解析状态
|
||||
* @param message
|
||||
* @return void
|
||||
* @author cwchen
|
||||
* @date 2025/11/30 14:52
|
||||
*/
|
||||
void updateProOrBidState(RabbitMqMessage message);
|
||||
|
||||
/**
|
||||
* 查询项目或标段的组成文件的解析状态
|
||||
* @param message
|
||||
* @return void
|
||||
* @author cwchen
|
||||
* @date 2025/11/30 15:03
|
||||
*/
|
||||
List<ProComposition> getProCompositionState(RabbitMqMessage message);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import com.bonus.common.domain.analysis.po.ProComposition;
|
|||
import com.bonus.common.domain.analysis.vo.AnalysisBidVo;
|
||||
import com.bonus.common.domain.analysis.vo.AnalysisLabelItemOcrVo;
|
||||
import com.bonus.common.domain.analysis.vo.AnalysisVo;
|
||||
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
|
|
@ -135,4 +136,40 @@ public interface IASAnalysisService {
|
|||
* @date 2025/11/29 15:48
|
||||
*/
|
||||
void delProBidAnalysisResult(AnalysisDto.TemplateDto dto);
|
||||
|
||||
/**
|
||||
* 查询标段文件的文件组成
|
||||
* @param analysisVo
|
||||
* @return List<ProComposition>
|
||||
* @author cwchen
|
||||
* @date 2025/11/30 13:28
|
||||
*/
|
||||
List<String> getBidCompositionByBid(AnalysisVo analysisVo);
|
||||
|
||||
/**
|
||||
* 更新项目或者标段的文件组成的解析状态
|
||||
* @param message
|
||||
* @return void
|
||||
* @author cwchen
|
||||
* @date 2025/11/30 14:36
|
||||
*/
|
||||
void updateProCompositionState(RabbitMqMessage message);
|
||||
|
||||
/**
|
||||
* 更新项目或者标段的解析状态
|
||||
* @param message
|
||||
* @return void
|
||||
* @author cwchen
|
||||
* @date 2025/11/30 14:51
|
||||
*/
|
||||
void updateProOrBidState(RabbitMqMessage message);
|
||||
|
||||
/**
|
||||
* 查询项目或标段的组成文件的解析状态
|
||||
* @param message
|
||||
* @return List<ProComposition>
|
||||
* @author cwchen
|
||||
* @date 2025/11/30 15:02
|
||||
*/
|
||||
List<ProComposition> getProCompositionState(RabbitMqMessage message);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,11 +9,13 @@ import com.bonus.common.domain.analysis.po.ProComposition;
|
|||
import com.bonus.common.domain.analysis.vo.AnalysisBidVo;
|
||||
import com.bonus.common.domain.analysis.vo.AnalysisLabelItemOcrVo;
|
||||
import com.bonus.common.domain.analysis.vo.AnalysisVo;
|
||||
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
|
|
@ -107,4 +109,30 @@ public class ASAnalysisServiceImpl implements IASAnalysisService {
|
|||
public void delProBidAnalysisResult(AnalysisDto.TemplateDto dto) {
|
||||
analysisMapper.delProBidAnalysisResult(dto);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getBidCompositionByBid(AnalysisVo analysisVo) {
|
||||
try {
|
||||
List<String> list = Optional.ofNullable(analysisMapper.getBidCompositionByBid(analysisVo)).orElse(new ArrayList<>());
|
||||
return list;
|
||||
} catch (Exception e) {
|
||||
log.error(e.getMessage());
|
||||
return new ArrayList<>();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateProCompositionState(RabbitMqMessage message) {
|
||||
analysisMapper.updateProCompositionState(message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateProOrBidState(RabbitMqMessage message) {
|
||||
analysisMapper.updateProOrBidState(message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ProComposition> getProCompositionState(RabbitMqMessage message) {
|
||||
return analysisMapper.getProCompositionState(message);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,9 +34,9 @@
|
|||
<!--保存项目数据-->
|
||||
<insert id="addProData" keyProperty="proId" useGeneratedKeys="true" keyColumn="pro_id">
|
||||
INSERT INTO tb_pro(template_id,create_user_id,create_user_name,update_user_id,
|
||||
update_user_name,analysis_status)
|
||||
update_user_name)
|
||||
VALUES (
|
||||
#{templateId},#{createUserId},#{createUserName},#{updateUserId},#{updateUserName},'0'
|
||||
#{templateId},#{createUserId},#{createUserName},#{updateUserId},#{updateUserName}
|
||||
)
|
||||
</insert>
|
||||
|
||||
|
|
@ -82,7 +82,8 @@
|
|||
tp.bid_opening_method AS bidOpeningMethod,
|
||||
tp.create_time AS createTime,
|
||||
tp.analysis_status AS analysisStatus,
|
||||
tp.pro_introduction AS proIntroduction
|
||||
tp.pro_introduction AS proIntroduction,
|
||||
tp.template_id AS templateId
|
||||
FROM tb_pro tp WHERE pro_id = #{proId} AND del_flag = '0'
|
||||
</select>
|
||||
|
||||
|
|
@ -122,8 +123,8 @@
|
|||
<!--获取解析标签项-->
|
||||
<select id="getAnalysisLabels" resultType="com.bonus.common.domain.analysis.vo.AnalysisLabelItemOcrVo">
|
||||
SELECT tali.analysis_label_item_id AS id,
|
||||
tali.analysis_name AS search_keyword,
|
||||
tali.analysis_code AS target_field,
|
||||
tali.analysis_name AS searchKeyword,
|
||||
tali.analysis_code AS targetField,
|
||||
tali.parent_id AS parentId,
|
||||
tali.analysis_level AS analysisLevel,
|
||||
tali.analysis_sort AS analysisSort,
|
||||
|
|
@ -132,6 +133,26 @@
|
|||
LEFT JOIN tb_analysis_rule_set tars ON tali.analysis_label_item_id = tars.analysis_label_item_id AND tars.template_id = #{templateId}
|
||||
WHERE tali.analysis_label_id = #{analysisLabelId} AND tali.del_flag = '0'
|
||||
</select>
|
||||
<!--查询标段文件的文件组成-->
|
||||
<select id="getBidCompositionByBid" resultType="java.lang.String">
|
||||
SELECT file_name AS fileName
|
||||
FROM tb_template_composition
|
||||
WHERE template_id = #{templateId} AND composition_type = '2'
|
||||
</select>
|
||||
<!--查询项目或标段的组成文件的解析状态-->
|
||||
<select id="getProCompositionState" resultType="com.bonus.common.domain.analysis.po.ProComposition">
|
||||
SELECT id,analysis_state AS analysisState
|
||||
FROM tb_pro_composition
|
||||
WHERE composition_type = #{compositionType}
|
||||
<if test="compositionType == 1">
|
||||
AND pro_id = #{proId}
|
||||
</if>
|
||||
<if test="compositionType == 2">
|
||||
AND pro_id = #{bidId}
|
||||
</if>
|
||||
</select>
|
||||
|
||||
|
||||
|
||||
<!--删除项目数据-->
|
||||
<update id="delProData">
|
||||
|
|
@ -152,4 +173,22 @@
|
|||
<update id="delProBidAnalysisResult">
|
||||
DELETE FROM tb_pro_bid_analysis_result WHERE pro_id = #{proId}
|
||||
</update>
|
||||
<!--更新项目或者标段的文件组成的解析状态-->
|
||||
<update id="updateProCompositionState">
|
||||
<if test="compositionType == 1">
|
||||
UPDATE tb_pro_composition SET analysis_state = #{analysisState} WHERE pro_id = #{proId} AND composition_type = '1'
|
||||
</if>
|
||||
<if test="compositionType == 2">
|
||||
UPDATE tb_pro_composition SET analysis_state = #{analysisState} WHERE pro_id = #{bidId} AND composition_type = '2'
|
||||
</if>
|
||||
</update>
|
||||
<!--更新项目或者标段的解析状态-->
|
||||
<update id="updateProOrBidState">
|
||||
<if test="compositionType == 1">
|
||||
UPDATE tb_pro SET analysis_status = #{analysisState} WHERE pro_id = #{proId}
|
||||
</if>
|
||||
<if test="compositionType == 2">
|
||||
UPDATE tb_pro_bid SET parsing_state = #{analysisState} WHERE bid_id = #{bidId}
|
||||
</if>
|
||||
</update>
|
||||
</mapper>
|
||||
|
|
|
|||
|
|
@ -27,6 +27,9 @@ public class ProComposition {
|
|||
/**模板组成类型 1.项目文件 2.标段/标包文件*/
|
||||
private String compositionType;
|
||||
|
||||
/**解析状态 0.解析中 1.解析成功 2.解析失败*/
|
||||
private String analysisState;
|
||||
|
||||
/**文件*/
|
||||
List<ResourceFileVo> fileVoList;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,12 +23,12 @@ public class AnalysisLabelItemOcrVo {
|
|||
/**
|
||||
* 解析名称
|
||||
*/
|
||||
private String search_keyword;
|
||||
private String searchKeyword;
|
||||
|
||||
/**
|
||||
* 解析编码
|
||||
*/
|
||||
private String target_field;
|
||||
private String targetField;
|
||||
|
||||
/**
|
||||
* 父节点
|
||||
|
|
|
|||
|
|
@ -89,6 +89,9 @@ public class AnalysisVo {
|
|||
* */
|
||||
private List<AnalysisBidVo> bidList;
|
||||
|
||||
/**文件组成*/
|
||||
/**项目文件组成*/
|
||||
List<ProComposition> compositions;
|
||||
|
||||
/**标段文件组成*/
|
||||
private List<String> bidCompositions;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
|||
import lombok.Data;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
|
|
@ -32,12 +33,12 @@ public class AnalysisOcrRequest {
|
|||
private String gpus = "2";
|
||||
|
||||
private String doc_folder_path;
|
||||
private String cover_keys;
|
||||
// private List<String> cover_keys = new ArrayList<>();
|
||||
private List<AnalysisLabelItemOcrVo> extraction_items;
|
||||
|
||||
public AnalysisOcrRequest(String doc_folder_path, String cover_keys, List<AnalysisLabelItemOcrVo> extraction_items) {
|
||||
public AnalysisOcrRequest(String doc_folder_path, /*List<String> cover_keys,*/ List<AnalysisLabelItemOcrVo> extraction_items) {
|
||||
this.doc_folder_path = doc_folder_path;
|
||||
this.cover_keys = cover_keys;
|
||||
// this.cover_keys = cover_keys;
|
||||
this.extraction_items = extraction_items;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ public class AnalysisResponse {
|
|||
private String message; // 消息
|
||||
|
||||
@JsonProperty("data")
|
||||
private Map<String, String> data; // 识别数据
|
||||
private Map<String, Object> data; // 识别数据
|
||||
|
||||
public boolean isSuccess() {
|
||||
return Objects.equals(status, "success");
|
||||
|
|
|
|||
|
|
@ -60,4 +60,14 @@ public class RabbitMqMessage implements Serializable {
|
|||
*/
|
||||
private Map<String, Object> businessData = new HashMap<>();
|
||||
|
||||
/**
|
||||
* 模板组成类型 1.项目文件 2.标段文件
|
||||
* */
|
||||
private int compositionType;
|
||||
|
||||
/**
|
||||
* 解析状态 0.待解析 1.解析成功 2.解析失败
|
||||
* */
|
||||
private String analysisState;
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -146,27 +146,6 @@ public class AnalysisOcrService {
|
|||
return httpPost;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建HTTP POST请求2
|
||||
*/
|
||||
/*private HttpPost createHttpPost2(AnalysisOcrRequest analysisOcrRequest) {
|
||||
HttpPost httpPost = new HttpPost(extractInfoUrl);
|
||||
|
||||
try {
|
||||
// 将请求对象转换为JSON字符串
|
||||
String jsonRequest = convertToJson(analysisOcrRequest);
|
||||
// 设置请求体为JSON
|
||||
StringEntity entity = new StringEntity(jsonRequest, ContentType.APPLICATION_JSON);
|
||||
httpPost.setEntity(entity);
|
||||
// 设置请求头
|
||||
httpPost.setHeader("Accept", "application/json");
|
||||
httpPost.setHeader("Content-Type", "application/json; charset=UTF-8");
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("创建HTTP POST请求失败", e);
|
||||
}
|
||||
return httpPost;
|
||||
}*/
|
||||
|
||||
private HttpPost createHttpPost2(AnalysisOcrRequest analysisOcrRequest) {
|
||||
HttpPost httpPost = new HttpPost(extractInfoUrl);
|
||||
|
||||
|
|
|
|||
|
|
@ -1,367 +0,0 @@
|
|||
/*
|
||||
package com.bonus.web.rabbitmq.consumer;
|
||||
|
||||
import com.bonus.common.domain.ocr.dto.OcrRequest;
|
||||
import com.bonus.common.domain.ocr.vo.OcrResponse;
|
||||
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
|
||||
import com.bonus.file.config.MinioConfig;
|
||||
import com.bonus.file.util.MinioUtil;
|
||||
import com.bonus.ocr.service.OcrService;
|
||||
import com.bonus.web.rabbitmq.config.RabbitMQConfig;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
*/
|
||||
/**
|
||||
* @className:RabbitMQConsumer
|
||||
* @author:cwchen
|
||||
* @date:2025-11-13-13:32
|
||||
* @version:1.0
|
||||
* @description:mq消费者
|
||||
*//*
|
||||
|
||||
@Component(value = "RabbitMQConsumer")
|
||||
@Slf4j
|
||||
public class RabbitMQConsumer {
|
||||
|
||||
@Resource
|
||||
private OcrService ocrService;
|
||||
|
||||
@Resource
|
||||
private MinioConfig minioConfig;
|
||||
|
||||
@Resource
|
||||
private MinioUtil minioUtil;
|
||||
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
*/
|
||||
/**
|
||||
* 主队列消息处理
|
||||
* 优化:确保异常处理逻辑正确,消息确认机制清晰
|
||||
* 关键:在手动确认模式下,成功时手动ACK,失败时抛出异常让重试机制处理
|
||||
*//*
|
||||
|
||||
@RabbitListener(queues = "myQueue", containerFactory = "multiConsumerFactory")
|
||||
public void handleMessage(org.springframework.amqp.core.Message amqpMessage,
|
||||
Channel channel) throws IOException {
|
||||
|
||||
RabbitMqMessage rabbitMq = null;
|
||||
long deliveryTag = 0;
|
||||
String messageId = "unknown";
|
||||
|
||||
try {
|
||||
// 解析消息
|
||||
rabbitMq = parseMessage(amqpMessage);
|
||||
deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
|
||||
messageId = rabbitMq.getMessageId();
|
||||
|
||||
log.info("开始处理消息 - ID: {}, 任务: {}", messageId, rabbitMq.getTaskName());
|
||||
|
||||
// 业务处理
|
||||
processBusiness(rabbitMq);
|
||||
|
||||
// 处理成功 - 确认消息
|
||||
channel.basicAck(deliveryTag, false);
|
||||
log.info("✅ 消息处理完成并确认 - ID: {}", messageId);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("❌ 消息处理异常 - ID: {}, deliveryTag: {}, 异常信息: {}",
|
||||
messageId, deliveryTag, e.getMessage(), e);
|
||||
|
||||
// 检查是否是 AmqpRejectAndDontRequeueException(重试次数耗尽,消息已发送到死信队列)
|
||||
AmqpRejectAndDontRequeueException rejectException = findRejectException(e);
|
||||
|
||||
if (rejectException != null) {
|
||||
// 关键修复:消息已发送到死信队列,需要手动拒绝原消息
|
||||
log.error("🚫 重试次数耗尽,消息已发送到死信队列,准备拒绝原消息 - ID: {}, deliveryTag: {}",
|
||||
messageId, deliveryTag);
|
||||
|
||||
try {
|
||||
if (deliveryTag > 0) {
|
||||
// 手动拒绝消息(NACK,不重新入队)
|
||||
// requeue=false 确保消息不会重新入队,服务重启后不会重新处理
|
||||
channel.basicNack(deliveryTag, false, false);
|
||||
log.error("✅ 原消息已永久拒绝并从主队列中删除 - ID: {}, deliveryTag: {}。服务重启后不会重新处理此消息。",
|
||||
messageId, deliveryTag);
|
||||
// 消息已经被处理(发送到死信队列并永久拒绝),不重新抛出异常
|
||||
return;
|
||||
} else {
|
||||
log.error("❌ deliveryTag 无效,无法拒绝消息 - ID: {}, deliveryTag: {}",
|
||||
messageId, deliveryTag);
|
||||
}
|
||||
} catch (IOException ioException) {
|
||||
log.error("❌ 拒绝消息失败 - ID: {}, deliveryTag: {}, 异常: {}",
|
||||
messageId, deliveryTag, ioException.getMessage(), ioException);
|
||||
}
|
||||
// 即使拒绝失败,也不重新抛出异常,避免重复处理
|
||||
return;
|
||||
}
|
||||
|
||||
// 其他异常:抛出异常触发重试机制
|
||||
throw new RuntimeException("消息处理失败: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
||||
/**
|
||||
* 递归查找 AmqpRejectAndDontRequeueException
|
||||
*//*
|
||||
|
||||
private AmqpRejectAndDontRequeueException findRejectException(Throwable e) {
|
||||
if (e == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (e instanceof AmqpRejectAndDontRequeueException) {
|
||||
return (AmqpRejectAndDontRequeueException) e;
|
||||
}
|
||||
|
||||
// 检查异常消息中是否包含死信队列相关的关键字
|
||||
String errorMessage = e.getMessage();
|
||||
if (errorMessage != null &&
|
||||
(errorMessage.contains("死信队列") ||
|
||||
errorMessage.contains("重试次数耗尽") ||
|
||||
errorMessage.contains("消息已发送到死信队列"))) {
|
||||
// 递归查找异常链
|
||||
Throwable cause = e.getCause();
|
||||
int depth = 0;
|
||||
while (cause != null && depth < 10) {
|
||||
if (cause instanceof AmqpRejectAndDontRequeueException) {
|
||||
return (AmqpRejectAndDontRequeueException) cause;
|
||||
}
|
||||
cause = cause.getCause();
|
||||
depth++;
|
||||
}
|
||||
}
|
||||
|
||||
// 递归查找 cause
|
||||
return findRejectException(e.getCause());
|
||||
}
|
||||
|
||||
*/
|
||||
/**
|
||||
* 死信队列消息处理 - 监听重试次数耗尽的消息
|
||||
*//*
|
||||
|
||||
@RabbitListener(queues = RabbitMQConfig.DLX_QUEUE, containerFactory = "deadLetterContainerFactory")
|
||||
public void handleDeadLetterMessage(org.springframework.amqp.core.Message amqpMessage,
|
||||
Channel channel) throws IOException {
|
||||
|
||||
RabbitMqMessage rabbitMq = parseMessage(amqpMessage);
|
||||
long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
|
||||
String messageId = rabbitMq.getMessageId();
|
||||
|
||||
try {
|
||||
log.error("🚨 收到死信消息 - ID: {}, 任务: {}", messageId, rabbitMq.getTaskName());
|
||||
|
||||
// 处理死信消息:记录日志、发送告警、人工干预等
|
||||
processDeadLetterMessage(rabbitMq, amqpMessage);
|
||||
|
||||
// 确认死信消息
|
||||
channel.basicAck(deliveryTag, false);
|
||||
log.info("死信消息处理完成 - ID: {}", messageId);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("死信消息处理异常 - ID: {}", messageId, e);
|
||||
// 死信消息处理失败,可以选择重新入队或丢弃
|
||||
channel.basicNack(deliveryTag, false, false);
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
||||
/**
|
||||
* 处理死信消息
|
||||
*//*
|
||||
|
||||
private void processDeadLetterMessage(RabbitMqMessage message, org.springframework.amqp.core.Message amqpMessage) {
|
||||
String messageId = message.getMessageId();
|
||||
|
||||
try {
|
||||
// 1. 记录死信消息到数据库或日志文件
|
||||
logDeadLetterToDatabase(message);
|
||||
|
||||
// 2. 发送告警通知
|
||||
sendDeadLetterAlert(message);
|
||||
|
||||
// 3. 记录详细错误信息
|
||||
Map<String, Object> deadLetterInfo = collectDeadLetterInfo(message, amqpMessage);
|
||||
log.error("死信消息详情 - ID: {}, 信息: {}", messageId, deadLetterInfo);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("处理死信消息时发生异常 - ID: {}", messageId, e);
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
||||
/**
|
||||
* 记录死信消息到数据库(示例)
|
||||
*//*
|
||||
|
||||
private void logDeadLetterToDatabase(RabbitMqMessage message) {
|
||||
// 这里可以集成数据库操作,记录死信消息
|
||||
// 例如:messageId, taskName, uploadPath, 失败时间, 失败原因等
|
||||
log.warn("记录死信消息到数据库 - ID: {}, 任务: {}",
|
||||
message.getMessageId(), message.getTaskName());
|
||||
}
|
||||
|
||||
*/
|
||||
/**
|
||||
* 发送死信告警
|
||||
*//*
|
||||
|
||||
private void sendDeadLetterAlert(RabbitMqMessage message) {
|
||||
try {
|
||||
String alertMessage = String.format(
|
||||
"🚨 OCR处理死信告警\n" +
|
||||
"消息ID: %s\n" +
|
||||
"任务名称: %s\n" +
|
||||
"文件路径: %s\n" +
|
||||
"时间: %s\n" +
|
||||
"请及时处理!",
|
||||
message.getMessageId(),
|
||||
message.getTaskName(),
|
||||
message.getUploadPath(),
|
||||
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
|
||||
);
|
||||
|
||||
log.error("死信告警: {}", alertMessage);
|
||||
|
||||
// 这里可以集成实际的告警系统
|
||||
// 例如:发送邮件、短信、钉钉、企业微信等
|
||||
// alertService.send(alertMessage);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("发送死信告警失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
||||
/**
|
||||
* 收集死信消息信息
|
||||
*//*
|
||||
|
||||
private Map<String, Object> collectDeadLetterInfo(RabbitMqMessage message,
|
||||
org.springframework.amqp.core.Message amqpMessage) {
|
||||
Map<String, Object> info = new HashMap<>();
|
||||
|
||||
info.put("messageId", message.getMessageId());
|
||||
info.put("taskName", message.getTaskName());
|
||||
info.put("uploadPath", message.getUploadPath());
|
||||
info.put("deadLetterTime", new Date());
|
||||
info.put("originalHeaders", amqpMessage.getMessageProperties().getHeaders());
|
||||
|
||||
return info;
|
||||
}
|
||||
|
||||
private RabbitMqMessage parseMessage(org.springframework.amqp.core.Message message) {
|
||||
try {
|
||||
String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
|
||||
RabbitMqMessage rabbitMq = objectMapper.readValue(messageBody, RabbitMqMessage.class);
|
||||
|
||||
if (rabbitMq.getMessageId() == null) {
|
||||
String messageId = message.getMessageProperties().getMessageId();
|
||||
rabbitMq.setMessageId(messageId != null ? messageId : generateMessageId());
|
||||
}
|
||||
return rabbitMq;
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("消息解析失败", e);
|
||||
throw new RuntimeException("消息解析失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void processBusiness(RabbitMqMessage message) {
|
||||
String uploadPath = message.getUploadPath();
|
||||
validateUploadPath(uploadPath);
|
||||
File fileFromMinio = getFileFromMinio(uploadPath);
|
||||
OcrResponse ocrResponse = performOcrRecognition(fileFromMinio);
|
||||
processOcrResult(message, ocrResponse);
|
||||
}
|
||||
|
||||
private void validateUploadPath(String uploadPath) {
|
||||
if (StringUtils.isBlank(uploadPath)) {
|
||||
throw new RuntimeException("文件路径不能为空");
|
||||
}
|
||||
}
|
||||
|
||||
private File getFileFromMinio(String uploadPath) {
|
||||
try {
|
||||
File file = minioUtil.getFileFromMinio(minioConfig.getBucketName(), uploadPath);
|
||||
if (file == null || !file.exists()) {
|
||||
throw new RuntimeException("Minio文件不存在: " + uploadPath);
|
||||
}
|
||||
return file;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("获取Minio文件失败: " + uploadPath, e);
|
||||
}
|
||||
}
|
||||
|
||||
private OcrResponse performOcrRecognition(File file) {
|
||||
try {
|
||||
// OcrRequest ocrRequest = buildOcrRequest(file);
|
||||
// OcrResponse ocrResponse = ocrService.callOcrService(ocrRequest);
|
||||
OcrResponse ocrResponse = null;
|
||||
// 修复:检查 OCR 响应是否为 null
|
||||
if (Objects.isNull(ocrResponse)) {
|
||||
throw new RuntimeException("OCR服务返回结果为空");
|
||||
}
|
||||
|
||||
if (!isOcrResponseValid(ocrResponse)) {
|
||||
throw new RuntimeException("OCR识别结果无效");
|
||||
}
|
||||
|
||||
log.info("OCR识别成功 - 数据: {}", ocrResponse.getData());
|
||||
return ocrResponse;
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("OCR识别失败", e);
|
||||
throw new RuntimeException("OCR识别失败: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private OcrRequest buildOcrRequest(File file) {
|
||||
OcrRequest ocrRequest = new OcrRequest();
|
||||
ocrRequest.setFile(file);
|
||||
ocrRequest.setType("image/png");
|
||||
ocrRequest.setFields_json("姓名,公民身份号码");
|
||||
return ocrRequest;
|
||||
}
|
||||
|
||||
private boolean isOcrResponseValid(OcrResponse ocrResponse) {
|
||||
return ocrResponse.getData() != null && !ocrResponse.getData().toString().isEmpty();
|
||||
}
|
||||
|
||||
public void processOcrResult(RabbitMqMessage originalMessage, OcrResponse ocrResponse) {
|
||||
try {
|
||||
log.info("处理OCR结果 - 消息ID: {}, 任务: {}",
|
||||
originalMessage.getMessageId(), originalMessage.getTaskName());
|
||||
// 业务逻辑...
|
||||
log.info("OCR结果处理完成 - 消息ID: {}", originalMessage.getMessageId());
|
||||
} catch (Exception e) {
|
||||
log.error("OCR结果处理失败 - 消息ID: {}", originalMessage.getMessageId(), e);
|
||||
throw new RuntimeException("OCR结果处理失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
private String generateMessageId() {
|
||||
return "gen_" + System.currentTimeMillis() + "_" + ThreadLocalRandom.current().nextInt(1000);
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
|
@ -1,6 +1,7 @@
|
|||
package com.bonus.rabbitmq.consumer;
|
||||
|
||||
import com.bonus.analysis.service.IASAnalysisService;
|
||||
import com.bonus.common.domain.analysis.po.ProComposition;
|
||||
import com.bonus.common.domain.analysis.vo.AnalysisLabelItemOcrVo;
|
||||
import com.bonus.common.domain.ocr.dto.AnalysisOcrRequest;
|
||||
import com.bonus.common.domain.ocr.vo.AnalysisResponse;
|
||||
|
|
@ -107,6 +108,15 @@ public class RabbitMQConsumerService {
|
|||
* @date 2025/11/29 13:25
|
||||
*/
|
||||
private void processBusiness(RabbitMqMessage message) {
|
||||
// 更新项目或者标段的解析状态
|
||||
message.setAnalysisState("0");
|
||||
analysisService.updateProOrBidState(message);
|
||||
// 更新项目或者标段的文件组成的解析状态
|
||||
message.setAnalysisState("0");
|
||||
analysisService.updateProCompositionState(message);
|
||||
// 查询项目或标段的组成文件的解析状态
|
||||
List<ProComposition> proCompositionList = analysisService.getProCompositionState(message);
|
||||
// 招标解析执行一次处理
|
||||
String uploadPath = message.getUploadPath();
|
||||
File fileFromMinio = getFileFromMinio(uploadPath);
|
||||
AnalysisResponse ocrResponse = performAnalysisRecognition(fileFromMinio);
|
||||
|
|
@ -117,10 +127,11 @@ public class RabbitMQConsumerService {
|
|||
.map(Object::toString)
|
||||
.orElse(null);
|
||||
if(StringUtils.isNotBlank(folderPath)) {
|
||||
// 一次处理解析成功后执行二次处理
|
||||
// 招标解析执行二次处理
|
||||
performAnalysisRecognition2(message, folderPath);
|
||||
}else{
|
||||
// 解析失败
|
||||
message.setAnalysisState("2");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -169,6 +180,14 @@ public class RabbitMQConsumerService {
|
|||
|
||||
public void performAnalysisRecognition2(RabbitMqMessage message,String folderPath){
|
||||
try {
|
||||
|
||||
if(message.getBidId() == null){
|
||||
// 1.项目解析
|
||||
|
||||
}else {
|
||||
// 2.标段解析
|
||||
}
|
||||
|
||||
List<AnalysisLabelItemOcrVo> labelItemVoList = analysisService.getAnalysisLabels(message.getAnalysisLabelId(),message.getTemplateId());
|
||||
AnalysisOcrRequest analysisOcrRequest = buildOcrRequest2(folderPath, labelItemVoList);
|
||||
AnalysisResponse ocrResponse2 = analysisOcrService.callOcrService(analysisOcrRequest);
|
||||
|
|
@ -206,7 +225,6 @@ public class RabbitMQConsumerService {
|
|||
*/
|
||||
private AnalysisOcrRequest buildOcrRequest2(String filePath,List<AnalysisLabelItemOcrVo> list) {
|
||||
AnalysisOcrRequest ocrRequest = new AnalysisOcrRequest();
|
||||
ocrRequest.setCover_keys("");
|
||||
ocrRequest.setExtraction_items(list);
|
||||
ocrRequest.setDoc_folder_path(filePath);
|
||||
ocrRequest.setAnalysisType("2");
|
||||
|
|
|
|||
|
|
@ -1,212 +0,0 @@
|
|||
/*
|
||||
|
||||
package com.bonus.web.rabbitmq.producer;
|
||||
|
||||
import com.bonus.common.domain.rabbitmq.dto.RabbitMqMessage;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.core.*;
|
||||
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
|
||||
*/
|
||||
/**
|
||||
* @className:RabbitMQProducer
|
||||
* @author:cwchen
|
||||
* @date:2025-11-13-13:32
|
||||
* @version:1.0
|
||||
* @description:mq生产者
|
||||
*//*
|
||||
|
||||
|
||||
@Component(value = "RabbitMQProducer")
|
||||
@Slf4j
|
||||
public class RabbitMQProducer {
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
private final Map<String, CompletableFuture<Boolean>> pendingConfirmations = new ConcurrentHashMap<>();
|
||||
private final ScheduledExecutorService timeoutScheduler = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
|
||||
*/
|
||||
/**
|
||||
* 发送消息 - 主入口,默认使用异步确认
|
||||
*//*
|
||||
|
||||
|
||||
public CompletableFuture<Boolean> send(String exchange, String routingKey, RabbitMqMessage message) {
|
||||
return sendAsync(exchange, routingKey, message);
|
||||
}
|
||||
|
||||
|
||||
*/
|
||||
/**
|
||||
* 异步发送
|
||||
*//*
|
||||
|
||||
|
||||
public CompletableFuture<Boolean> sendAsync(String exchange, String routingKey, RabbitMqMessage message) {
|
||||
String messageId = generateMessageId();
|
||||
message.setMessageId(messageId);
|
||||
|
||||
CompletableFuture<Boolean> future = new CompletableFuture<>();
|
||||
pendingConfirmations.put(messageId, future);
|
||||
|
||||
CorrelationData correlationData = new CorrelationData(messageId);
|
||||
|
||||
try {
|
||||
org.springframework.amqp.core.Message amqpMessage = buildMessage(message, messageId);
|
||||
rabbitTemplate.convertAndSend(exchange, routingKey, amqpMessage, correlationData);
|
||||
log.debug("消息发送完成,等待异步确认 - ID: {}", messageId);
|
||||
// 30分钟超时
|
||||
scheduleTimeout(messageId, future, 30, TimeUnit.MINUTES);
|
||||
return future;
|
||||
} catch (Exception e) {
|
||||
pendingConfirmations.remove(messageId);
|
||||
future.completeExceptionally(e);
|
||||
log.error("消息发送异常 - ID: {}", messageId, e);
|
||||
return future;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
*/
|
||||
/**
|
||||
* 初始化确认回调
|
||||
*//*
|
||||
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
rabbitTemplate.setConfirmCallback(this::handleConfirm);
|
||||
// 优雅关闭
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
|
||||
}
|
||||
|
||||
|
||||
*/
|
||||
/**
|
||||
* 处理确认回调
|
||||
*//*
|
||||
|
||||
|
||||
private void handleConfirm(CorrelationData correlationData, boolean ack, String cause) {
|
||||
if (correlationData == null || correlationData.getId() == null) {
|
||||
return;
|
||||
}
|
||||
String messageId = correlationData.getId();
|
||||
CompletableFuture<Boolean> future = pendingConfirmations.remove(messageId);
|
||||
|
||||
if (future != null) {
|
||||
if (ack) {
|
||||
future.complete(true);
|
||||
} else {
|
||||
log.error("Broker确认失败 - ID: {}, 原因: {}", messageId, cause);
|
||||
future.complete(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
*/
|
||||
/**
|
||||
* 调度超时
|
||||
*//*
|
||||
|
||||
|
||||
private void scheduleTimeout(String messageId, CompletableFuture<Boolean> future,
|
||||
long timeout, TimeUnit unit) {
|
||||
timeoutScheduler.schedule(() -> {
|
||||
if (pendingConfirmations.remove(messageId) != null && !future.isDone()) {
|
||||
log.warn("消息确认超时 - ID: {}", messageId);
|
||||
future.complete(false);
|
||||
}
|
||||
}, timeout, unit);
|
||||
}
|
||||
|
||||
|
||||
*/
|
||||
/**
|
||||
* 资源清理
|
||||
*//*
|
||||
|
||||
|
||||
@PreDestroy
|
||||
public void shutdown() {
|
||||
timeoutScheduler.shutdown();
|
||||
try {
|
||||
if (!timeoutScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
|
||||
timeoutScheduler.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
timeoutScheduler.shutdownNow();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
// 处理未完成的消息
|
||||
pendingConfirmations.forEach((id, future) -> {
|
||||
if (!future.isDone()) {
|
||||
future.complete(false);
|
||||
}
|
||||
});
|
||||
pendingConfirmations.clear();
|
||||
}
|
||||
|
||||
*/
|
||||
/**
|
||||
* 生成消息ID
|
||||
*//*
|
||||
|
||||
|
||||
private String generateMessageId() {
|
||||
return "rabbitmq_msg" + System.currentTimeMillis() + "_" +
|
||||
ThreadLocalRandom.current().nextInt(100000, 999999);
|
||||
}
|
||||
|
||||
|
||||
*/
|
||||
/**
|
||||
* 构建消息对象
|
||||
*//*
|
||||
|
||||
|
||||
private org.springframework.amqp.core.Message buildMessage(RabbitMqMessage message, String messageId) {
|
||||
try {
|
||||
String jsonStr = objectMapper.writeValueAsString(message);
|
||||
return MessageBuilder
|
||||
.withBody(jsonStr.getBytes(StandardCharsets.UTF_8))
|
||||
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
|
||||
.setContentEncoding("UTF-8")
|
||||
.setMessageId(messageId)
|
||||
.setTimestamp(new Date())
|
||||
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
|
||||
.setHeader("source", "ocr-service")
|
||||
.setHeader("businessType", extractBusinessType(message.getUploadPath()))
|
||||
.build();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("消息构建失败: " + messageId, e);
|
||||
}
|
||||
}
|
||||
|
||||
private String extractBusinessType(String uploadPath) {
|
||||
if (uploadPath == null) return "unknown";
|
||||
if (uploadPath.contains("personnelDatabase")) return "personnel";
|
||||
if (uploadPath.contains("toolsDatabase")) return "tool";
|
||||
return "other";
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
||||
|
|
@ -6,7 +6,6 @@ import org.springframework.stereotype.Service;
|
|||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @className:SendRabbitMqService
|
||||
|
|
|
|||
Loading…
Reference in New Issue