From 2abf460e5bde96f755b22618de3eb65c6e8177a1 Mon Sep 17 00:00:00 2001 From: zhangtq <2452618307@qq.com> Date: Thu, 13 Feb 2025 10:58:39 +0800 Subject: [PATCH] =?UTF-8?q?mq=E6=B6=88=E6=81=AF=E6=8E=A8=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../customer/mq/DelayedMessageSender.java | 5 +- .../core/order/mq/MqDataChangeMessageDto.java | 111 +++++++ .../com/bonus/core/order/mq/MqPayload.java | 68 ++++ .../java/com/bonus/core/order/mq/MqUtil.java | 297 +++++++++--------- 4 files changed, 331 insertions(+), 150 deletions(-) create mode 100644 bonus-modules/bonus-smart-canteen/src/main/java/com/bonus/core/order/mq/MqDataChangeMessageDto.java create mode 100644 bonus-modules/bonus-smart-canteen/src/main/java/com/bonus/core/order/mq/MqPayload.java diff --git a/bonus-modules/bonus-smart-canteen/src/main/java/com/bonus/core/customer/mq/DelayedMessageSender.java b/bonus-modules/bonus-smart-canteen/src/main/java/com/bonus/core/customer/mq/DelayedMessageSender.java index 1c4119cc..f3b236a3 100644 --- a/bonus-modules/bonus-smart-canteen/src/main/java/com/bonus/core/customer/mq/DelayedMessageSender.java +++ b/bonus-modules/bonus-smart-canteen/src/main/java/com/bonus/core/customer/mq/DelayedMessageSender.java @@ -3,6 +3,7 @@ package com.bonus.core.customer.mq; import com.bonus.core.common.constant.LeMqConstant; import com.bonus.core.common.redis.RedisUtil; import com.bonus.core.customer.constants.CustCacheKey; +import com.bonus.core.order.mq.MqUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @@ -21,9 +22,9 @@ public class DelayedMessageSender { String delayTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")); customerMessageDTO.setDelayTime(delayTime); if (customerMessageDTO.isDelay()) { -// MqUtil.sendDelay(customerMessageDTO, LeMqConstant.Topic.CUSTOMER_CHANGE_DELAY, DELAY.intValue() * 1000); + MqUtil.sendDelay(customerMessageDTO, LeMqConstant.Topic.CUSTOMER_CHANGE_DELAY, DELAY.intValue() * 1000); } else { -// MqUtil.sendDelay(customerMessageDTO, LeMqConstant.Topic.CUSTOMER_CHANGE_DELAY, 0); + MqUtil.sendDelay(customerMessageDTO, LeMqConstant.Topic.CUSTOMER_CHANGE_DELAY, 0); } RedisUtil.setString(CustCacheKey.getTenantKey(customerMessageDTO.getMessageType().name()), delayTime, 7200L); diff --git a/bonus-modules/bonus-smart-canteen/src/main/java/com/bonus/core/order/mq/MqDataChangeMessageDto.java b/bonus-modules/bonus-smart-canteen/src/main/java/com/bonus/core/order/mq/MqDataChangeMessageDto.java new file mode 100644 index 00000000..8b3b0437 --- /dev/null +++ b/bonus-modules/bonus-smart-canteen/src/main/java/com/bonus/core/order/mq/MqDataChangeMessageDto.java @@ -0,0 +1,111 @@ +package com.bonus.core.order.mq; + +import com.bonus.core.common.constant.LeMqConstant; +import com.bonus.framework.config.deserializer.DateTimeDeserializer; +import com.bonus.framework.config.serializer.DateTimeSerializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import io.swagger.annotations.ApiModelProperty; + +import java.time.LocalDateTime; + +public class MqDataChangeMessageDto { + @ApiModelProperty("商户") + private Long tenantId; + @ApiModelProperty("变更数据") + private JsonNode data; + @ApiModelProperty("数据更新类型") + private LeMqConstant.DataChangeType changeType; + @ApiModelProperty("数据更新主题") + private LeMqConstant.Topic changeTopic; + @JsonSerialize( + using = DateTimeSerializer.YYYY_MM_DD_HH_MM_SS.class + ) + @JsonDeserialize( + using = DateTimeDeserializer.class + ) + @ApiModelProperty("发送时间") + private LocalDateTime sendTime; + @ApiModelProperty("用户") + private String userName; + @ApiModelProperty("操作员id") + private Long userId; + + public Long getTenantId() { + return this.tenantId; + } + + public JsonNode getData() { + return this.data; + } + + public LeMqConstant.DataChangeType getChangeType() { + return this.changeType; + } + + public LeMqConstant.Topic getChangeTopic() { + return this.changeTopic; + } + + public LocalDateTime getSendTime() { + return this.sendTime; + } + + public String getUserName() { + return this.userName; + } + + public Long getUserId() { + return this.userId; + } + + public void setTenantId(final Long tenantId) { + this.tenantId = tenantId; + } + + public void setData(final JsonNode data) { + this.data = data; + } + + public void setChangeType(final LeMqConstant.DataChangeType changeType) { + this.changeType = changeType; + } + + public void setChangeTopic(final LeMqConstant.Topic changeTopic) { + this.changeTopic = changeTopic; + } + + @JsonDeserialize( + using = DateTimeDeserializer.class + ) + public void setSendTime(final LocalDateTime sendTime) { + this.sendTime = sendTime; + } + + public void setUserName(final String userName) { + this.userName = userName; + } + + public void setUserId(final Long userId) { + this.userId = userId; + } + + + public MqDataChangeMessageDto() { + } + + private MqDataChangeMessageDto(final Long tenantId, final JsonNode data, final LeMqConstant.DataChangeType changeType, final LeMqConstant.Topic changeTopic, final LocalDateTime sendTime, final String userName, final Long userId) { + this.tenantId = tenantId; + this.data = data; + this.changeType = changeType; + this.changeTopic = changeTopic; + this.sendTime = sendTime; + this.userName = userName; + this.userId = userId; + } + + public static MqDataChangeMessageDto of(final Long tenantId, final JsonNode data, final LeMqConstant.DataChangeType changeType, final LeMqConstant.Topic changeTopic, final LocalDateTime sendTime, final String userName, final Long userId) { + return new MqDataChangeMessageDto(tenantId, data, changeType, changeTopic, sendTime, userName, userId); + } +} diff --git a/bonus-modules/bonus-smart-canteen/src/main/java/com/bonus/core/order/mq/MqPayload.java b/bonus-modules/bonus-smart-canteen/src/main/java/com/bonus/core/order/mq/MqPayload.java new file mode 100644 index 00000000..809a17cb --- /dev/null +++ b/bonus-modules/bonus-smart-canteen/src/main/java/com/bonus/core/order/mq/MqPayload.java @@ -0,0 +1,68 @@ +package com.bonus.core.order.mq; + +import com.bonus.core.common.constant.LeMqConstant; +import com.bonus.core.common.utils.LogUtil; +import com.bonus.core.common.utils.TenantContextHolder; + +public class MqPayload { + private Long tenantId; + private String traceId; + private LeMqConstant.Topic topic; + private String routingKey; + private T data; + + public String destination() { + String var10000 = this.topic.getKey(); + return var10000 + "." + this.tenantId; + } + + public static MqPayload of(T data, LeMqConstant.Topic topic, String routingKey) { + MqPayload payload = new MqPayload(); + payload.setData(data); + payload.setTopic(topic); + payload.setRoutingKey(routingKey); + payload.setTenantId(TenantContextHolder.getTenantId()); + payload.setTraceId(LogUtil.getCurrentTraceId()); + return payload; + } + + public Long getTenantId() { + return this.tenantId; + } + + public String getTraceId() { + return this.traceId; + } + + public LeMqConstant.Topic getTopic() { + return this.topic; + } + + public String getRoutingKey() { + return this.routingKey; + } + + public T getData() { + return this.data; + } + + public void setTenantId(final Long tenantId) { + this.tenantId = tenantId; + } + + public void setTraceId(final String traceId) { + this.traceId = traceId; + } + + public void setTopic(final LeMqConstant.Topic topic) { + this.topic = topic; + } + + public void setRoutingKey(final String routingKey) { + this.routingKey = routingKey; + } + + public void setData(final T data) { + this.data = data; + } +} diff --git a/bonus-modules/bonus-smart-canteen/src/main/java/com/bonus/core/order/mq/MqUtil.java b/bonus-modules/bonus-smart-canteen/src/main/java/com/bonus/core/order/mq/MqUtil.java index da9a4578..9c884c4f 100644 --- a/bonus-modules/bonus-smart-canteen/src/main/java/com/bonus/core/order/mq/MqUtil.java +++ b/bonus-modules/bonus-smart-canteen/src/main/java/com/bonus/core/order/mq/MqUtil.java @@ -1,148 +1,149 @@ -//package com.bonus.core.order.mq; -// -//import com.bonus.common.security.utils.SecurityUtils; -//import com.bonus.core.common.constant.LeMqConstant; -//import com.bonus.core.common.utils.JacksonUtil; -//import com.bonus.core.common.utils.LogUtil; -//import com.bonus.core.common.utils.SpringContextHolder; -//import com.bonus.core.common.utils.TenantContextHolder; -//import com.bonus.core.mq.MQTemplate; -//import com.bonus.core.mq.dto.MqDataChangeMessageDto; -//import com.bonus.core.mq.MqPayload; -//import com.bonus.core.mq.tx.TxHolder; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -//import org.springframework.context.annotation.Lazy; -// -//import java.time.LocalDateTime; -// -//public class MqUtil { -// private static final Logger log = LoggerFactory.getLogger(MqUtil.class); -// @Lazy -// public static final MQTemplate mqTemplate = (MQTemplate) SpringContextHolder.getBean(MQTemplate.class); -// -// public static void send(T data, LeMqConstant.Topic topic) { -// try { -// log.info("发送消息,topic:{}", topic.getKey()); -// LogUtil.printArgs("消息体", data); -// String routing = topic.getKey(); -// mqTemplate.send(routing, MqPayload.of(data, topic, routing)); -// } catch (Exception var3) { -// log.error("发送MQ消息失败", var3); -// } -// -// } -// -// public static void send(T data, LeMqConstant.Topic topic, String routing) { -// try { -// log.info("发送消息,topic:{},routing:{}", topic.getKey(), routing); -// LogUtil.printArgs("消息体", data); -// mqTemplate.send(routing, MqPayload.of(data, topic, routing)); -// } catch (Exception var4) { -// log.error("发送MQ消息失败", var4); -// } -// -// } -// -// public static TxHolder sendByTx(T data, LeMqConstant.Topic topic) { -// String routing = topic.getKey(); -// log.info("发送事务消息,topic:{}", topic.getKey()); -// LogUtil.printArgs("消息体", data); -// return mqTemplate.txBegin(routing, MqPayload.of(data, topic, routing)); -// } -// -// public static void sendByTxEnd(T data, LeMqConstant.Topic topic) { -// String routing = topic.getKey(); -// log.info("发送事务消息,topic:{}", topic.getKey()); -// LogUtil.printArgs("消息体", data); -// mqTemplate.txBegin(routing, MqPayload.of(data, topic, routing)).end(); -// } -// -// public static void sendByTxCommit(T data, LeMqConstant.Topic topic) { -// log.info("发送事务消息,topic:{}", topic.getKey()); -// LogUtil.printArgs("消息体", data); -// String routing = topic.getKey(); -// mqTemplate.txBegin(routing, MqPayload.of(data, topic, routing)).commit(); -// } -// -// public static void sendDelay(T data, LeMqConstant.Topic topic, int delayMileSecond) { -// try { -// log.info("发送延迟消息,topic:{},delayMileSecond:{}", topic.getKey(), delayMileSecond); -// LogUtil.printArgs("消息体", data); -// String routing = topic.getKey(); -// mqTemplate.sendDelay(routing, MqPayload.of(data, topic, routing), (long)delayMileSecond); -// } catch (Exception var4) { -// log.error("发送事务MQ消息失败", var4); -// } -// -// } -// -// public static void sendDataChange(T data, LeMqConstant.DataChangeType changeType, LeMqConstant.Topic topic) { -// Long userId = null; -// -// try { -// userId = SecurityUtils.getUserId(); -// } catch (Exception var5) { -// } -// -// MqDataChangeMessageDto dto = MqDataChangeMessageDto.of(TenantContextHolder.getTenantId(), JacksonUtil.valueToTree(data), changeType, topic, LocalDateTime.now(), SecurityUtils.getUsername(), userId); -// send(dto, topic); -// } -// -// public static void pushToSingleDevice(T data, LeMqConstant.Topic topic, String deviceSn) { -// String var10000 = topic.getKey(); -// String routing = var10000 + "/" + TenantContextHolder.getTenantId() + "/" + deviceSn; -// log.info("推送给单条设备,routing:{}", routing); -// LogUtil.printArgs("消息体", data); -// mqTemplate.sendMqtt(routing, data); -// } -// -// public static void pushToSingleDevice(T data, LeMqConstant.Topic topic, String deviceSn, Long merchantId) { -// String routing = topic.getKey() + "/" + merchantId + "/" + deviceSn; -// log.info("推送给单条设备,routing:{}", routing); -// LogUtil.printArgs("消息体", data); -// mqTemplate.sendMqtt(routing, data); -// } -// -// public static void pushToSingleTxDevice(T data, LeMqConstant.Topic topic, String deviceSn) { -// String var10000 = topic.getKey(); -// String routing = var10000 + "/" + TenantContextHolder.getTenantId() + "/" + deviceSn; -// log.info("推送给单条设备,routing:{}", routing); -// LogUtil.printArgs("消息体", data); -// mqTemplate.mqttTxBegin(routing, data).end(); -// } -// -// public static void pushToSingleDevice(T data, String routing, String deviceSn) { -// routing = routing + "/" + TenantContextHolder.getTenantId(); -// if (deviceSn != null) { -// routing = routing + "/" + deviceSn; -// } -// -// log.info("推送给单条设备,routing:{}", routing); -// LogUtil.printArgs("消息体", data); -// mqTemplate.sendMqtt(routing, data); -// } -// -// public static void pushToTenantAllDevice(T data, LeMqConstant.Topic topic) { -// String var10000 = topic.getKey(); -// String routing = var10000 + "/" + TenantContextHolder.getTenantId(); -// log.info("推送给商户下所有设备,routing:{}", routing); -// mqTemplate.sendMqtt(routing, data); -// } -// -// public static void pushToTenantAllDeviceWithTenantId(T data, LeMqConstant.Topic topic, Long tenantId) { -// String var10000 = topic.getKey(); -// String routing = var10000 + "/" + tenantId; -// log.info("推送给商户下所有设备,routing:{}", routing); -// LogUtil.printArgs("消息体", data); -// mqTemplate.sendMqtt(routing, data); -// } -// -// public static void pushToAllDevice(T data, LeMqConstant.Topic topic) { -// mqTemplate.sendMqtt(topic.getKey(), data); -// } -// -// public static void pushToSingleDeviceCustomed(T data, String routing) { -// mqTemplate.sendMqtt(routing, data); -// } -//} +package com.bonus.core.order.mq; + +import com.bonus.common.security.utils.SecurityUtils; +import com.bonus.core.common.constant.LeMqConstant; +import com.bonus.core.common.utils.JacksonUtil; +import com.bonus.core.common.utils.LogUtil; +import com.bonus.core.common.utils.SpringContextHolder; +import com.bonus.core.common.utils.TenantContextHolder; +import com.bonus.mq.MQTemplate; +import com.bonus.mq.tx.TxHolder; +import lombok.SneakyThrows; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Lazy; + +import java.time.LocalDateTime; + +public class MqUtil { + private static final Logger log = LoggerFactory.getLogger(MqUtil.class); + @Lazy + public static final MQTemplate mqTemplate = (MQTemplate) SpringContextHolder.getBean(MQTemplate.class); + + public static void send(T data, LeMqConstant.Topic topic) { + try { + log.info("发送消息,topic:{}", topic.getKey()); + LogUtil.printArgs("消息体", data); + String routing = topic.getKey(); + mqTemplate.send(routing, MqPayload.of(data, topic, routing)); + } catch (Exception var3) { + log.error("发送MQ消息失败", var3); + } + + } + + public static void send(T data, LeMqConstant.Topic topic, String routing) { + try { + log.info("发送消息,topic:{},routing:{}", topic.getKey(), routing); + LogUtil.printArgs("消息体", data); + mqTemplate.send(routing, MqPayload.of(data, topic, routing)); + } catch (Exception var4) { + log.error("发送MQ消息失败", var4); + } + + } + + @SneakyThrows + public static TxHolder sendByTx(T data, LeMqConstant.Topic topic) { + String routing = topic.getKey(); + log.info("发送事务消息,topic:{}", topic.getKey()); + LogUtil.printArgs("消息体", data); + return mqTemplate.txBegin(routing, MqPayload.of(data, topic, routing)); + } + @SneakyThrows + public static void sendByTxEnd(T data, LeMqConstant.Topic topic) { + String routing = topic.getKey(); + log.info("发送事务消息,topic:{}", topic.getKey()); + LogUtil.printArgs("消息体", data); + mqTemplate.txBegin(routing, MqPayload.of(data, topic, routing)).end(); + } + @SneakyThrows + public static void sendByTxCommit(T data, LeMqConstant.Topic topic) { + log.info("发送事务消息,topic:{}", topic.getKey()); + LogUtil.printArgs("消息体", data); + String routing = topic.getKey(); + mqTemplate.txBegin(routing, MqPayload.of(data, topic, routing)).commit(); + } + + public static void sendDelay(T data, LeMqConstant.Topic topic, int delayMileSecond) { + try { + log.info("发送延迟消息,topic:{},delayMileSecond:{}", topic.getKey(), delayMileSecond); + LogUtil.printArgs("消息体", data); + String routing = topic.getKey(); + mqTemplate.sendDelay(routing, MqPayload.of(data, topic, routing), (long)delayMileSecond); + } catch (Exception var4) { + log.error("发送事务MQ消息失败", var4); + } + + } + + public static void sendDataChange(T data, LeMqConstant.DataChangeType changeType, LeMqConstant.Topic topic) { + Long userId = null; + + try { + userId = SecurityUtils.getUserId(); + } catch (Exception var5) { + } + + MqDataChangeMessageDto dto = MqDataChangeMessageDto.of(TenantContextHolder.getTenantId(), JacksonUtil.valueToTree(data), changeType, topic, LocalDateTime.now(), SecurityUtils.getUsername(), userId); + send(dto, topic); + } + + @SneakyThrows + public static void pushToSingleDevice(T data, LeMqConstant.Topic topic, String deviceSn) { + String var10000 = topic.getKey(); + String routing = var10000 + "/" + TenantContextHolder.getTenantId() + "/" + deviceSn; + log.info("推送给单条设备,routing:{}", routing); + LogUtil.printArgs("消息体", data); + mqTemplate.sendMqtt(routing, data); + } + @SneakyThrows + public static void pushToSingleDevice(T data, LeMqConstant.Topic topic, String deviceSn, Long merchantId) { + String routing = topic.getKey() + "/" + merchantId + "/" + deviceSn; + log.info("推送给单条设备,routing:{}", routing); + LogUtil.printArgs("消息体", data); + mqTemplate.sendMqtt(routing, data); + } + @SneakyThrows + public static void pushToSingleTxDevice(T data, LeMqConstant.Topic topic, String deviceSn) { + String var10000 = topic.getKey(); + String routing = var10000 + "/" + TenantContextHolder.getTenantId() + "/" + deviceSn; + log.info("推送给单条设备,routing:{}", routing); + LogUtil.printArgs("消息体", data); + mqTemplate.mqttTxBegin(routing, data).end(); + } + @SneakyThrows + public static void pushToSingleDevice(T data, String routing, String deviceSn) { + routing = routing + "/" + TenantContextHolder.getTenantId(); + if (deviceSn != null) { + routing = routing + "/" + deviceSn; + } + + log.info("推送给单条设备,routing:{}", routing); + LogUtil.printArgs("消息体", data); + mqTemplate.sendMqtt(routing, data); + } + @SneakyThrows + public static void pushToTenantAllDevice(T data, LeMqConstant.Topic topic) { + String var10000 = topic.getKey(); + String routing = var10000 + "/" + TenantContextHolder.getTenantId(); + log.info("推送给商户下所有设备,routing:{}", routing); + mqTemplate.sendMqtt(routing, data); + } + @SneakyThrows + public static void pushToTenantAllDeviceWithTenantId(T data, LeMqConstant.Topic topic, Long tenantId) { + String var10000 = topic.getKey(); + String routing = var10000 + "/" + tenantId; + log.info("推送给商户下所有设备,routing:{}", routing); + LogUtil.printArgs("消息体", data); + mqTemplate.sendMqtt(routing, data); + } + @SneakyThrows + public static void pushToAllDevice(T data, LeMqConstant.Topic topic) { + mqTemplate.sendMqtt(topic.getKey(), data); + } + @SneakyThrows + public static void pushToSingleDeviceCustomed(T data, String routing) { + mqTemplate.sendMqtt(routing, data); + } +}