websocket 数据对接

This commit is contained in:
haozq 2024-08-19 14:06:28 +08:00
parent fc3ecd9f20
commit b4c296d3b8
8 changed files with 65 additions and 20 deletions

View File

@ -15,7 +15,7 @@ public interface DevDataMapper {
* 更新双沟状态 * 更新双沟状态
* @param devVO * @param devVO
*/ */
void updateData(DevVO devVO); int updateData(DevVO devVO);
/** /**
* 更新在线状态 * 更新在线状态
@ -37,4 +37,10 @@ public interface DevDataMapper {
void insertWarnInfo(WarnInfoVo warnInfoVo); void insertWarnInfo(WarnInfoVo warnInfoVo);
void updateStatus(DevVO vo); void updateStatus(DevVO vo);
/**
* 查询告警设备 的通道
* @return
*/
String getWarnChann(String devCode);
} }

View File

@ -1,6 +1,7 @@
package com.bonus.aqd.tcpservice; package com.bonus.aqd.tcpservice;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.Data; import lombok.Data;
/** /**
@ -23,6 +24,7 @@ public class BootNettyChannel {
*/ */
private transient volatile Channel channel; private transient volatile Channel channel;
private transient volatile ChannelHandlerContext context;
} }

View File

@ -82,6 +82,7 @@ public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandler
//创建通道 //创建通道
BootNettyChannel bnc = new BootNettyChannel(); BootNettyChannel bnc = new BootNettyChannel();
bnc.setChannel(ctx.channel()); bnc.setChannel(ctx.channel());
bnc.setContext(ctx);
//设置通道编码 //设置通道编码
bnc.setCode("server:" + channelId); bnc.setCode("server:" + channelId);
bnc.setReport_last_data(data); bnc.setReport_last_data(data);
@ -91,14 +92,16 @@ public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandler
//更新data值什么意思呢 //更新data值什么意思呢
b.setReport_last_data(data); b.setReport_last_data(data);
} }
// ctx.channel().writeAndFlush("你好");
// ctx.channel().flush();
//回写给客户端 //回写给客户端
ctx.writeAndFlush(Unpooled.buffer().writeBytes(("server:" + channelId).getBytes())); ctx.writeAndFlush(Unpooled.buffer().writeBytes(("55aa78865ccd000103000000000d").getBytes()));
// netty的编码已经指定因此可以不需要再次确认编码 // netty的编码已经指定因此可以不需要再次确认编码
// ctx.writeAndFlush(Unpooled.buffer().writeBytes(channelId.getBytes(CharsetUtil.UTF_8))); // ctx.writeAndFlush(Unpooled.buffer().writeBytes(channelId.getBytes(CharsetUtil.UTF_8)));
//直接这样写入也可以 // //直接这样写入也可以
//ctx.write("我是服务端,我收到你的消息了!"); // ctx.write("我是服务端,我收到你的消息了!");
//ctx.flush(); // ctx.flush();
} catch (Exception e) { } catch (Exception e) {
System.out.println("channelRead--" + e.toString()); System.out.println("channelRead--" + e.toString());

View File

@ -1,19 +1,29 @@
package com.bonus.aqd.tcpservice; package com.bonus.aqd.tcpservice;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* 蚂蚁舞 * 蚂蚁舞
* @author 黑子 * @author 黑子
*/ */
@Component
public class BootNettyServer { public class BootNettyServer {
public void bind(int port) throws Exception { public void bind(int port) throws Exception {
/** /**
@ -90,6 +100,12 @@ public class BootNettyServer {
bossGroup.shutdownGracefully().sync(); bossGroup.shutdownGracefully().sync();
workerGroup.shutdownGracefully().sync(); workerGroup.shutdownGracefully().sync();
} }
}
public void sendMessage(String clientId, Object message) {
BootNettyChannel bnc = BootNettyChannelCache.get("server:" + clientId);
if (bnc.getChannel() != null && bnc.getChannel().isActive()) {
System.err.println("发送了消息:" +message);
bnc.getContext().writeAndFlush( Unpooled.buffer().writeBytes((message.toString()).getBytes()));
}
} }
} }

View File

@ -41,7 +41,8 @@ public class SaveDataService {
@Autowired @Autowired
private AqdMapper aqdMapper; private AqdMapper aqdMapper;
@Autowired
private BootNettyServer nettyTcpServer;
/** /**
* 更新数据 * 更新数据
* @param msg * @param msg
@ -95,6 +96,9 @@ public class SaveDataService {
warnInfoVo.setDevModule("单钩"); warnInfoVo.setDevModule("单钩");
VO.setWarnInfo("2"); VO.setWarnInfo("2");
} }
// 查询告警 通道
String warnChann=mapper.getWarnChann(imei);
sendMessageToClient(warnChann,"55aa78865ccd000103000000000d");
mapper.insertWarnInfo(warnInfoVo); mapper.insertWarnInfo(warnInfoVo);
} }
//蓝牙终端 //蓝牙终端
@ -117,9 +121,8 @@ public class SaveDataService {
} }
mapper.insertWarnInfo(warnInfoVo); mapper.insertWarnInfo(warnInfoVo);
} }
mapper.updateData(VO); int num= mapper.updateData(VO);
System.err.println("操作成数据=="+num);
System.err.println(msg); System.err.println(msg);
}catch (Exception e){ }catch (Exception e){
log.error(e.toString(),e); log.error(e.toString(),e);
@ -187,6 +190,18 @@ public class SaveDataService {
log.error(e.toString()); log.error(e.toString());
} }
} }
/**
* 给指定通道发消息
* @param clientId
* @param message
*/
public void sendMessageToClient(String clientId, Object message) {
nettyTcpServer.sendMessage(clientId, message);
}
@Async @Async
public void insertData(String msg){ public void insertData(String msg){
try { try {

View File

@ -22,7 +22,7 @@ public class AqdService {
public static String ws_url="ws://36.33.26.201:21995"; public static String ws_url="ws://36.33.26.201:21995";
public static String api_url="http://36.33.26.201:21995/config/getDevConfig"; public static String api_url="http://36.33.26.201:21995/config/getDevConfig?device_id=";
@Autowired @Autowired
private AqdMapper mapper; private AqdMapper mapper;
@ -53,6 +53,8 @@ public class AqdService {
data.put("model_switch",model_switch); data.put("model_switch",model_switch);
JSONObject configData=data.getJSONObject("config"); JSONObject configData=data.getJSONObject("config");
configData.put("ws_url",ws_url); configData.put("ws_url",ws_url);
api_url= api_url+ddvId+"&ca_ver="+vo.getCa_ver();
System.err.println(api_url);
configData.put("api_url",api_url); configData.put("api_url",api_url);
json.getJSONObject("data").put("config",configData); json.getJSONObject("data").put("config",configData);
return json.toJSONString(); return json.toJSONString();

View File

@ -1,6 +1,6 @@
# 配置端口 # 配置端口
server: server:
port: 21995 port: 21994
# servlet: # servlet:
# context-path: /aqd_screen # context-path: /aqd_screen
max-http-header-size: 10240 max-http-header-size: 10240
@ -39,5 +39,5 @@ zhly:
enable: false enable: false
netty: netty:
port: 6666 port: 21995

View File

@ -25,13 +25,8 @@
</update> </update>
<update id="updateDevStatus" parameterType="com.bonus.aqd.base.entity.DevVO"> <update id="updateDevStatus" parameterType="com.bonus.aqd.base.entity.DevVO">
update tb_device update tb_device
set dev_status=#{devStatus}, set warn_chann= #{channId}
<if test="channId!=null and channId!=''"> where warn_code=#{devCode}
chann_id= #{channId},
</if>
state_time=now(),
dev_time=now()
where dev_code=#{devCode}
</update> </update>
<update id="downDevStatus" parameterType="com.bonus.aqd.base.entity.DevVO"> <update id="downDevStatus" parameterType="com.bonus.aqd.base.entity.DevVO">
update tb_device update tb_device
@ -43,4 +38,10 @@
set dev_status=0 set dev_status=0
where dev_code=#{devCode} where dev_code=#{devCode}
</update> </update>
<!--查询告警通道-->
<select id="getWarnChann" resultType="java.lang.String">
select warn_chann
from tb_device
where dev_code=#{devCode}
</select>
</mapper> </mapper>