mq消息推送

This commit is contained in:
tqzhang 2025-02-13 10:58:39 +08:00
parent bca90e0c44
commit 2abf460e5b
4 changed files with 331 additions and 150 deletions

View File

@ -3,6 +3,7 @@ package com.bonus.core.customer.mq;
import com.bonus.core.common.constant.LeMqConstant;
import com.bonus.core.common.redis.RedisUtil;
import com.bonus.core.customer.constants.CustCacheKey;
import com.bonus.core.order.mq.MqUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@ -21,9 +22,9 @@ public class DelayedMessageSender {
String delayTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
customerMessageDTO.setDelayTime(delayTime);
if (customerMessageDTO.isDelay()) {
// MqUtil.sendDelay(customerMessageDTO, LeMqConstant.Topic.CUSTOMER_CHANGE_DELAY, DELAY.intValue() * 1000);
MqUtil.sendDelay(customerMessageDTO, LeMqConstant.Topic.CUSTOMER_CHANGE_DELAY, DELAY.intValue() * 1000);
} else {
// MqUtil.sendDelay(customerMessageDTO, LeMqConstant.Topic.CUSTOMER_CHANGE_DELAY, 0);
MqUtil.sendDelay(customerMessageDTO, LeMqConstant.Topic.CUSTOMER_CHANGE_DELAY, 0);
}
RedisUtil.setString(CustCacheKey.getTenantKey(customerMessageDTO.getMessageType().name()), delayTime, 7200L);

View File

@ -0,0 +1,111 @@
package com.bonus.core.order.mq;
import com.bonus.core.common.constant.LeMqConstant;
import com.bonus.framework.config.deserializer.DateTimeDeserializer;
import com.bonus.framework.config.serializer.DateTimeSerializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import io.swagger.annotations.ApiModelProperty;
import java.time.LocalDateTime;
public class MqDataChangeMessageDto {
@ApiModelProperty("商户")
private Long tenantId;
@ApiModelProperty("变更数据")
private JsonNode data;
@ApiModelProperty("数据更新类型")
private LeMqConstant.DataChangeType changeType;
@ApiModelProperty("数据更新主题")
private LeMqConstant.Topic changeTopic;
@JsonSerialize(
using = DateTimeSerializer.YYYY_MM_DD_HH_MM_SS.class
)
@JsonDeserialize(
using = DateTimeDeserializer.class
)
@ApiModelProperty("发送时间")
private LocalDateTime sendTime;
@ApiModelProperty("用户")
private String userName;
@ApiModelProperty("操作员id")
private Long userId;
public Long getTenantId() {
return this.tenantId;
}
public JsonNode getData() {
return this.data;
}
public LeMqConstant.DataChangeType getChangeType() {
return this.changeType;
}
public LeMqConstant.Topic getChangeTopic() {
return this.changeTopic;
}
public LocalDateTime getSendTime() {
return this.sendTime;
}
public String getUserName() {
return this.userName;
}
public Long getUserId() {
return this.userId;
}
public void setTenantId(final Long tenantId) {
this.tenantId = tenantId;
}
public void setData(final JsonNode data) {
this.data = data;
}
public void setChangeType(final LeMqConstant.DataChangeType changeType) {
this.changeType = changeType;
}
public void setChangeTopic(final LeMqConstant.Topic changeTopic) {
this.changeTopic = changeTopic;
}
@JsonDeserialize(
using = DateTimeDeserializer.class
)
public void setSendTime(final LocalDateTime sendTime) {
this.sendTime = sendTime;
}
public void setUserName(final String userName) {
this.userName = userName;
}
public void setUserId(final Long userId) {
this.userId = userId;
}
public MqDataChangeMessageDto() {
}
private MqDataChangeMessageDto(final Long tenantId, final JsonNode data, final LeMqConstant.DataChangeType changeType, final LeMqConstant.Topic changeTopic, final LocalDateTime sendTime, final String userName, final Long userId) {
this.tenantId = tenantId;
this.data = data;
this.changeType = changeType;
this.changeTopic = changeTopic;
this.sendTime = sendTime;
this.userName = userName;
this.userId = userId;
}
public static MqDataChangeMessageDto of(final Long tenantId, final JsonNode data, final LeMqConstant.DataChangeType changeType, final LeMqConstant.Topic changeTopic, final LocalDateTime sendTime, final String userName, final Long userId) {
return new MqDataChangeMessageDto(tenantId, data, changeType, changeTopic, sendTime, userName, userId);
}
}

View File

@ -0,0 +1,68 @@
package com.bonus.core.order.mq;
import com.bonus.core.common.constant.LeMqConstant;
import com.bonus.core.common.utils.LogUtil;
import com.bonus.core.common.utils.TenantContextHolder;
public class MqPayload<T> {
private Long tenantId;
private String traceId;
private LeMqConstant.Topic topic;
private String routingKey;
private T data;
public String destination() {
String var10000 = this.topic.getKey();
return var10000 + "." + this.tenantId;
}
public static <T> MqPayload<T> of(T data, LeMqConstant.Topic topic, String routingKey) {
MqPayload<T> payload = new MqPayload();
payload.setData(data);
payload.setTopic(topic);
payload.setRoutingKey(routingKey);
payload.setTenantId(TenantContextHolder.getTenantId());
payload.setTraceId(LogUtil.getCurrentTraceId());
return payload;
}
public Long getTenantId() {
return this.tenantId;
}
public String getTraceId() {
return this.traceId;
}
public LeMqConstant.Topic getTopic() {
return this.topic;
}
public String getRoutingKey() {
return this.routingKey;
}
public T getData() {
return this.data;
}
public void setTenantId(final Long tenantId) {
this.tenantId = tenantId;
}
public void setTraceId(final String traceId) {
this.traceId = traceId;
}
public void setTopic(final LeMqConstant.Topic topic) {
this.topic = topic;
}
public void setRoutingKey(final String routingKey) {
this.routingKey = routingKey;
}
public void setData(final T data) {
this.data = data;
}
}

View File

@ -1,148 +1,149 @@
//package com.bonus.core.order.mq;
//
//import com.bonus.common.security.utils.SecurityUtils;
//import com.bonus.core.common.constant.LeMqConstant;
//import com.bonus.core.common.utils.JacksonUtil;
//import com.bonus.core.common.utils.LogUtil;
//import com.bonus.core.common.utils.SpringContextHolder;
//import com.bonus.core.common.utils.TenantContextHolder;
//import com.bonus.core.mq.MQTemplate;
//import com.bonus.core.mq.dto.MqDataChangeMessageDto;
//import com.bonus.core.mq.MqPayload;
//import com.bonus.core.mq.tx.TxHolder;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.context.annotation.Lazy;
//
//import java.time.LocalDateTime;
//
//public class MqUtil {
// private static final Logger log = LoggerFactory.getLogger(MqUtil.class);
// @Lazy
// public static final MQTemplate mqTemplate = (MQTemplate) SpringContextHolder.getBean(MQTemplate.class);
//
// public static <T> void send(T data, LeMqConstant.Topic topic) {
// try {
// log.info("发送消息,topic:{}", topic.getKey());
// LogUtil.printArgs("消息体", data);
// String routing = topic.getKey();
// mqTemplate.send(routing, MqPayload.of(data, topic, routing));
// } catch (Exception var3) {
// log.error("发送MQ消息失败", var3);
// }
//
// }
//
// public static <T> void send(T data, LeMqConstant.Topic topic, String routing) {
// try {
// log.info("发送消息,topic:{},routing:{}", topic.getKey(), routing);
// LogUtil.printArgs("消息体", data);
// mqTemplate.send(routing, MqPayload.of(data, topic, routing));
// } catch (Exception var4) {
// log.error("发送MQ消息失败", var4);
// }
//
// }
//
// public static <T> TxHolder sendByTx(T data, LeMqConstant.Topic topic) {
// String routing = topic.getKey();
// log.info("发送事务消息,topic:{}", topic.getKey());
// LogUtil.printArgs("消息体", data);
// return mqTemplate.txBegin(routing, MqPayload.of(data, topic, routing));
// }
//
// public static <T> void sendByTxEnd(T data, LeMqConstant.Topic topic) {
// String routing = topic.getKey();
// log.info("发送事务消息,topic:{}", topic.getKey());
// LogUtil.printArgs("消息体", data);
// mqTemplate.txBegin(routing, MqPayload.of(data, topic, routing)).end();
// }
//
// public static <T> void sendByTxCommit(T data, LeMqConstant.Topic topic) {
// log.info("发送事务消息,topic:{}", topic.getKey());
// LogUtil.printArgs("消息体", data);
// String routing = topic.getKey();
// mqTemplate.txBegin(routing, MqPayload.of(data, topic, routing)).commit();
// }
//
// public static <T> void sendDelay(T data, LeMqConstant.Topic topic, int delayMileSecond) {
// try {
// log.info("发送延迟消息,topic:{},delayMileSecond:{}", topic.getKey(), delayMileSecond);
// LogUtil.printArgs("消息体", data);
// String routing = topic.getKey();
// mqTemplate.sendDelay(routing, MqPayload.of(data, topic, routing), (long)delayMileSecond);
// } catch (Exception var4) {
// log.error("发送事务MQ消息失败", var4);
// }
//
// }
//
// public static <T> void sendDataChange(T data, LeMqConstant.DataChangeType changeType, LeMqConstant.Topic topic) {
// Long userId = null;
//
// try {
// userId = SecurityUtils.getUserId();
// } catch (Exception var5) {
// }
//
// MqDataChangeMessageDto dto = MqDataChangeMessageDto.of(TenantContextHolder.getTenantId(), JacksonUtil.valueToTree(data), changeType, topic, LocalDateTime.now(), SecurityUtils.getUsername(), userId);
// send(dto, topic);
// }
//
// public static <T> void pushToSingleDevice(T data, LeMqConstant.Topic topic, String deviceSn) {
// String var10000 = topic.getKey();
// String routing = var10000 + "/" + TenantContextHolder.getTenantId() + "/" + deviceSn;
// log.info("推送给单条设备,routing:{}", routing);
// LogUtil.printArgs("消息体", data);
// mqTemplate.sendMqtt(routing, data);
// }
//
// public static <T> void pushToSingleDevice(T data, LeMqConstant.Topic topic, String deviceSn, Long merchantId) {
// String routing = topic.getKey() + "/" + merchantId + "/" + deviceSn;
// log.info("推送给单条设备,routing:{}", routing);
// LogUtil.printArgs("消息体", data);
// mqTemplate.sendMqtt(routing, data);
// }
//
// public static <T> void pushToSingleTxDevice(T data, LeMqConstant.Topic topic, String deviceSn) {
// String var10000 = topic.getKey();
// String routing = var10000 + "/" + TenantContextHolder.getTenantId() + "/" + deviceSn;
// log.info("推送给单条设备,routing:{}", routing);
// LogUtil.printArgs("消息体", data);
// mqTemplate.mqttTxBegin(routing, data).end();
// }
//
// public static <T> void pushToSingleDevice(T data, String routing, String deviceSn) {
// routing = routing + "/" + TenantContextHolder.getTenantId();
// if (deviceSn != null) {
// routing = routing + "/" + deviceSn;
// }
//
// log.info("推送给单条设备,routing:{}", routing);
// LogUtil.printArgs("消息体", data);
// mqTemplate.sendMqtt(routing, data);
// }
//
// public static <T> void pushToTenantAllDevice(T data, LeMqConstant.Topic topic) {
// String var10000 = topic.getKey();
// String routing = var10000 + "/" + TenantContextHolder.getTenantId();
// log.info("推送给商户下所有设备,routing:{}", routing);
// mqTemplate.sendMqtt(routing, data);
// }
//
// public static <T> void pushToTenantAllDeviceWithTenantId(T data, LeMqConstant.Topic topic, Long tenantId) {
// String var10000 = topic.getKey();
// String routing = var10000 + "/" + tenantId;
// log.info("推送给商户下所有设备,routing:{}", routing);
// LogUtil.printArgs("消息体", data);
// mqTemplate.sendMqtt(routing, data);
// }
//
// public static <T> void pushToAllDevice(T data, LeMqConstant.Topic topic) {
// mqTemplate.sendMqtt(topic.getKey(), data);
// }
//
// public static <T> void pushToSingleDeviceCustomed(T data, String routing) {
// mqTemplate.sendMqtt(routing, data);
// }
//}
package com.bonus.core.order.mq;
import com.bonus.common.security.utils.SecurityUtils;
import com.bonus.core.common.constant.LeMqConstant;
import com.bonus.core.common.utils.JacksonUtil;
import com.bonus.core.common.utils.LogUtil;
import com.bonus.core.common.utils.SpringContextHolder;
import com.bonus.core.common.utils.TenantContextHolder;
import com.bonus.mq.MQTemplate;
import com.bonus.mq.tx.TxHolder;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Lazy;
import java.time.LocalDateTime;
public class MqUtil {
private static final Logger log = LoggerFactory.getLogger(MqUtil.class);
@Lazy
public static final MQTemplate mqTemplate = (MQTemplate) SpringContextHolder.getBean(MQTemplate.class);
public static <T> void send(T data, LeMqConstant.Topic topic) {
try {
log.info("发送消息,topic:{}", topic.getKey());
LogUtil.printArgs("消息体", data);
String routing = topic.getKey();
mqTemplate.send(routing, MqPayload.of(data, topic, routing));
} catch (Exception var3) {
log.error("发送MQ消息失败", var3);
}
}
public static <T> void send(T data, LeMqConstant.Topic topic, String routing) {
try {
log.info("发送消息,topic:{},routing:{}", topic.getKey(), routing);
LogUtil.printArgs("消息体", data);
mqTemplate.send(routing, MqPayload.of(data, topic, routing));
} catch (Exception var4) {
log.error("发送MQ消息失败", var4);
}
}
@SneakyThrows
public static <T> TxHolder sendByTx(T data, LeMqConstant.Topic topic) {
String routing = topic.getKey();
log.info("发送事务消息,topic:{}", topic.getKey());
LogUtil.printArgs("消息体", data);
return mqTemplate.txBegin(routing, MqPayload.of(data, topic, routing));
}
@SneakyThrows
public static <T> void sendByTxEnd(T data, LeMqConstant.Topic topic) {
String routing = topic.getKey();
log.info("发送事务消息,topic:{}", topic.getKey());
LogUtil.printArgs("消息体", data);
mqTemplate.txBegin(routing, MqPayload.of(data, topic, routing)).end();
}
@SneakyThrows
public static <T> void sendByTxCommit(T data, LeMqConstant.Topic topic) {
log.info("发送事务消息,topic:{}", topic.getKey());
LogUtil.printArgs("消息体", data);
String routing = topic.getKey();
mqTemplate.txBegin(routing, MqPayload.of(data, topic, routing)).commit();
}
public static <T> void sendDelay(T data, LeMqConstant.Topic topic, int delayMileSecond) {
try {
log.info("发送延迟消息,topic:{},delayMileSecond:{}", topic.getKey(), delayMileSecond);
LogUtil.printArgs("消息体", data);
String routing = topic.getKey();
mqTemplate.sendDelay(routing, MqPayload.of(data, topic, routing), (long)delayMileSecond);
} catch (Exception var4) {
log.error("发送事务MQ消息失败", var4);
}
}
public static <T> void sendDataChange(T data, LeMqConstant.DataChangeType changeType, LeMqConstant.Topic topic) {
Long userId = null;
try {
userId = SecurityUtils.getUserId();
} catch (Exception var5) {
}
MqDataChangeMessageDto dto = MqDataChangeMessageDto.of(TenantContextHolder.getTenantId(), JacksonUtil.valueToTree(data), changeType, topic, LocalDateTime.now(), SecurityUtils.getUsername(), userId);
send(dto, topic);
}
@SneakyThrows
public static <T> void pushToSingleDevice(T data, LeMqConstant.Topic topic, String deviceSn) {
String var10000 = topic.getKey();
String routing = var10000 + "/" + TenantContextHolder.getTenantId() + "/" + deviceSn;
log.info("推送给单条设备,routing:{}", routing);
LogUtil.printArgs("消息体", data);
mqTemplate.sendMqtt(routing, data);
}
@SneakyThrows
public static <T> void pushToSingleDevice(T data, LeMqConstant.Topic topic, String deviceSn, Long merchantId) {
String routing = topic.getKey() + "/" + merchantId + "/" + deviceSn;
log.info("推送给单条设备,routing:{}", routing);
LogUtil.printArgs("消息体", data);
mqTemplate.sendMqtt(routing, data);
}
@SneakyThrows
public static <T> void pushToSingleTxDevice(T data, LeMqConstant.Topic topic, String deviceSn) {
String var10000 = topic.getKey();
String routing = var10000 + "/" + TenantContextHolder.getTenantId() + "/" + deviceSn;
log.info("推送给单条设备,routing:{}", routing);
LogUtil.printArgs("消息体", data);
mqTemplate.mqttTxBegin(routing, data).end();
}
@SneakyThrows
public static <T> void pushToSingleDevice(T data, String routing, String deviceSn) {
routing = routing + "/" + TenantContextHolder.getTenantId();
if (deviceSn != null) {
routing = routing + "/" + deviceSn;
}
log.info("推送给单条设备,routing:{}", routing);
LogUtil.printArgs("消息体", data);
mqTemplate.sendMqtt(routing, data);
}
@SneakyThrows
public static <T> void pushToTenantAllDevice(T data, LeMqConstant.Topic topic) {
String var10000 = topic.getKey();
String routing = var10000 + "/" + TenantContextHolder.getTenantId();
log.info("推送给商户下所有设备,routing:{}", routing);
mqTemplate.sendMqtt(routing, data);
}
@SneakyThrows
public static <T> void pushToTenantAllDeviceWithTenantId(T data, LeMqConstant.Topic topic, Long tenantId) {
String var10000 = topic.getKey();
String routing = var10000 + "/" + tenantId;
log.info("推送给商户下所有设备,routing:{}", routing);
LogUtil.printArgs("消息体", data);
mqTemplate.sendMqtt(routing, data);
}
@SneakyThrows
public static <T> void pushToAllDevice(T data, LeMqConstant.Topic topic) {
mqTemplate.sendMqtt(topic.getKey(), data);
}
@SneakyThrows
public static <T> void pushToSingleDeviceCustomed(T data, String routing) {
mqTemplate.sendMqtt(routing, data);
}
}