websocket数据对接

This commit is contained in:
haozq 2024-08-16 19:44:18 +08:00
parent 7f22375f93
commit fc3ecd9f20
18 changed files with 373 additions and 56 deletions

View File

@ -35,4 +35,6 @@ public interface DevDataMapper {
* @param warnInfoVo
*/
void insertWarnInfo(WarnInfoVo warnInfoVo);
void updateStatus(DevVO vo);
}

View File

@ -101,7 +101,7 @@ public class SecurityConfig extends WebSecurityConfigurerAdapter {
protected void configure(HttpSecurity http) throws Exception {
http.authorizeRequests()
// 不进行权限验证的请求或资源(从配置文件中读取)
.antMatchers("/", "/login/**","/favicon.ico","/websocket/**","/ws/**").permitAll()
.antMatchers("/", "/login/**","/favicon.ico","/websocket/**","/ws/**","/config/**").permitAll()
// 其他的需要登陆后才能访问
.anyRequest().authenticated()
.and()

View File

@ -2,6 +2,7 @@ package com.bonus.aqd.tcpservice;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@ -13,6 +14,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
/**
* I/O数据读写处理类
@ -23,7 +25,7 @@ import java.net.InetSocketAddress;
@Slf4j
public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter {
public static Map<String, Channel> channelMap;
public SaveDataService service = SpringUtils.getBean(SaveDataService.class);

View File

@ -1,35 +0,0 @@
package com.bonus.aqd.tcpservice;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 客户端连接会触发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("Channel active......");
}
/**
* 客户端发消息会触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("服务器收到消息: {}", msg.toString());
ctx.write("我是服务端,我收到你的消息了!");
ctx.flush();
}
/**
* 发生异常触发
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

View File

@ -1,5 +1,6 @@
package com.bonus.aqd.tcpservice;
import com.alibaba.fastjson.JSONObject;
import com.bonus.aqd.base.dao.DevDataMapper;
import com.bonus.aqd.base.dao.IndexMapper;
import com.bonus.aqd.base.entity.DevVO;
@ -10,6 +11,11 @@ import com.bonus.aqd.manager.common.util.DateTimeHelper;
import com.bonus.aqd.manager.common.util.StringHelper;
import com.bonus.aqd.manager.core.dao.SysUserDao;
import com.bonus.aqd.manager.core.entity.SysUserEntity;
import com.bonus.aqd.websocket.config.WsConfig;
import com.bonus.aqd.websocket.dao.AqdMapper;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
@ -32,6 +38,10 @@ public class SaveDataService {
@Autowired
private DevDataMapper mapper;
@Autowired
private AqdMapper aqdMapper;
/**
* 更新数据
* @param msg
@ -51,7 +61,71 @@ public class SaveDataService {
}
}
@Async
public void addWsSocket(String msg){
try{
JSONObject obj=JSONObject.parseObject(msg);
String imei=obj.getString("imei");
String hanger_a_status=obj.getString("hanger_a_status");
String hanger_b_status=obj.getString("hanger_b_status");
DevVO VO = new DevVO();
VO.setDevCode(imei);
VO.setDevA(hanger_a_status);
VO.setDevB(hanger_b_status);
WarnInfoVo warnInfoVo=new WarnInfoVo();
warnInfoVo.setDevCode(imei);
String time=DateTimeHelper.getNowTime();
VO.setDevAtime(time);
VO.setDevBtime(time);
//有告警
if(WsConfig.warn.equals(hanger_a_status) || WsConfig.warn.equals(hanger_b_status)){
if(WsConfig.warn.equals(hanger_a_status) && WsConfig.warn.equals(hanger_b_status)){
warnInfoVo.setWarnContent("双钩脱落");
warnInfoVo.setWarnReason("双钩-双钩脱落");
warnInfoVo.setDevModule("双钩");
VO.setWarnInfo("3");
}else if(WsConfig.warn.equals(hanger_a_status) ){
warnInfoVo.setWarnContent("A钩脱落");
warnInfoVo.setWarnReason("单钩-A钩脱落");
warnInfoVo.setDevModule("单钩");
VO.setWarnInfo("1");
}else if(WsConfig.warn.equals(hanger_b_status) ){
warnInfoVo.setWarnContent("B钩脱落");
warnInfoVo.setWarnReason("单钩-B钩脱落");
warnInfoVo.setDevModule("单钩");
VO.setWarnInfo("2");
}
mapper.insertWarnInfo(warnInfoVo);
}
//蓝牙终端
if(WsConfig.err.equals(hanger_a_status) || WsConfig.err.equals(hanger_b_status)){
if(WsConfig.err.equals(hanger_a_status) && WsConfig.err.equals(hanger_b_status)){
warnInfoVo.setWarnContent("双钩蓝牙通讯中断");
warnInfoVo.setWarnReason("双钩-双钩蓝牙通讯中断");
warnInfoVo.setDevModule("双钩");
VO.setWarnInfo("33");
}else if(WsConfig.warn.equals(hanger_a_status) ){
warnInfoVo.setWarnContent("A钩蓝牙通讯中断");
warnInfoVo.setWarnReason("单钩-A钩蓝牙通讯中断");
warnInfoVo.setDevModule("单钩");
VO.setWarnInfo("11");
}else if(WsConfig.warn.equals(hanger_b_status) ){
warnInfoVo.setWarnContent("B钩蓝牙通讯中断");
warnInfoVo.setWarnReason("单钩-B钩蓝牙通讯中断");
warnInfoVo.setDevModule("单钩");
VO.setWarnInfo("22");
}
mapper.insertWarnInfo(warnInfoVo);
}
mapper.updateData(VO);
System.err.println(msg);
}catch (Exception e){
log.error(e.toString(),e);
}
}
@Async
public void updateDataStatus(String msg,String channId){
try{
@ -66,14 +140,25 @@ public class SaveDataService {
}catch (Exception e){
log.error(e.toString(),e);
}
}
/**
* 更新状态
* @param chinnId
* 更新状态 ->上线
* @param keyId
*/
@Async
public void updateStatus(String keyId,String channId){
try {
DevVO VO = new DevVO();
VO.setDevCode(keyId);
VO.setChannId(channId);
VO.setDevStatus("1");
mapper.updateDevStatus(VO);
}catch (Exception e){
log.error(e.toString());
}
}
@Async
public void downDevStatus(String chinnId){
try {
DevVO VO = new DevVO();
@ -175,4 +260,9 @@ public class SaveDataService {
System.err.println(decimal3);
System.err.println(decimal);
}
public String getResult(int i) {
String result= aqdMapper.getConfigData(i);
return result;
}
}

View File

@ -9,17 +9,14 @@ import javax.annotation.PostConstruct;
/**
* @author 黑子
*/
@Component
public class ServerHandler extends IoHandlerAdapter {
@Autowired
protected SaveDataService service;
private static ServerHandler serverHandler ;
@PostConstruct //通过@PostConstruct实现初始化bean之前进行的操作
public void init() {
serverHandler = this;
serverHandler.service = this.service;
// 初使化时将已静态化的testService实例化
}
//
// protected SaveDataService service;
// private static ServerHandler serverHandler ;
// public void init() {
// serverHandler = this;
// serverHandler.service = this.service;
// // 初使化时将已静态化的testService实例化
// }
}

View File

@ -25,7 +25,7 @@ public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 注册 socket处理器
registry.addHandler(AqdSocketHandler (), "/ws/aqd").setAllowedOrigins("*");
registry.addHandler(AqdSocketHandler (), "").setAllowedOrigins("*");
}
@Bean

View File

@ -0,0 +1,18 @@
package com.bonus.aqd.websocket.config;
/**
* @author 黑子
*/
public class WsConfig {
public static String login="ca_login";
public static String location="ca_report_location";
public static String warn="0";
public static String err="2";
}

View File

@ -0,0 +1,47 @@
package com.bonus.aqd.websocket.config;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ModelAttribute;
import org.springframework.web.socket.WebSocketSession;
import java.util.HashMap;
/**
* /**
* * ws 全局数据
* * @author 黑子
* */
//@ControllerAdvice
public class WsGlobal {
public static HashMap<String,Object> map =new HashMap<>();
@ModelAttribute(value="getMap")
public static WebSocketSession getMap(String key){
WebSocketSession channelContext= (WebSocketSession) map.get(key);
return channelContext;
}
@ModelAttribute(value="getKey")
public static String getKey(String key){
String channelContext= (String) map.get(key);
return channelContext;
}
@ModelAttribute(value="addKey")
public static void addKey(String key, String obj){
map.put(key,obj);
}
@ModelAttribute(value="add")
public static void addMap(String key, WebSocketSession obj){
map.put(key,obj);
}
@ModelAttribute(value="remove")
public static void remove(String key){
map.remove(key);
}
}

View File

@ -0,0 +1,41 @@
package com.bonus.aqd.websocket.controller;
import com.bonus.aqd.base.entity.dto.ParamsDto;
import com.bonus.aqd.manager.annotation.DecryptAndVerify;
import com.bonus.aqd.manager.core.entity.EncryptedReq;
import com.bonus.aqd.manager.webResult.ServerResponse;
import com.bonus.aqd.websocket.service.AqdService;
import com.bonus.aqd.websocket.vo.AqdVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 安全带配置请求路径
* @author 黑子
*/
@RestController
@RequestMapping("/config/")
@Slf4j
public class AqdController {
@Autowired
private AqdService service;
/**
* 安全带设备列表
*
* @param
* @return ServerResponse
* @author cwchen
* @date 2024/7/29 14:32
*/
@GetMapping("getDevConfig")
public String getDevConfig(AqdVo vo) {
return service.getDevConfig(vo);
}
}

View File

@ -0,0 +1,16 @@
package com.bonus.aqd.websocket.dao;
import org.apache.ibatis.annotations.Mapper;
/**
* @author 黑子
*/
@Mapper
public interface AqdMapper {
/**
* 查询数据
* @param type
* @return
*/
String getConfigData(int type);
}

View File

@ -1,20 +1,32 @@
package com.bonus.aqd.websocket.handle;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bonus.aqd.tcpservice.SaveDataService;
import com.bonus.aqd.tcpservice.SpringUtils;
import com.bonus.aqd.websocket.config.WsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* 安全带文本socket
* @author 黑子
*/
@Component
public class AqdSocketHandler implements org.springframework.web.socket.WebSocketHandler {
private static final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
public SaveDataService service = SpringUtils.getBean(SaveDataService.class);
private final static Logger LOGGER = LoggerFactory.getLogger(AqdSocketHandler.class);
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
sessions.put(session.getId(),session);
LOGGER.info("Socket 连接成功sessionId{}", session.getId());
}
@ -27,9 +39,22 @@ public class AqdSocketHandler implements org.springframework.web.socket.WebSocke
}
}
public void handlerTextMessage(WebSocketSession session, TextMessage message) throws Exception {
final String msg = message.getPayload();
String result=service.getResult(1);
JSONObject json= JSON.parseObject(msg);
String act=json.getString("act");
JSONObject res= JSON.parseObject(result);
if(WsConfig.location.equals(act)){
service.addWsSocket(msg);
res.put("msg","数据上传成功!");
}else if(WsConfig.login.equals(act)){
String devId=json.getString("device_id");
service.updateStatus(devId,session.getId());
}
res.put("cmd",act);
session.sendMessage(new TextMessage(res.toJSONString()));
LOGGER.info(" Socket 收到消息:{}", msg);
}
@ -40,9 +65,11 @@ public class AqdSocketHandler implements org.springframework.web.socket.WebSocke
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
service.downDevStatus(session.getId());
LOGGER.info(" Socket 关闭sessionId{}", session.getId());
}
@Override
public boolean supportsPartialMessages() {
return false;

View File

@ -0,0 +1,61 @@
package com.bonus.aqd.websocket.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.bonus.aqd.base.dao.DevDataMapper;
import com.bonus.aqd.base.entity.DevVO;
import com.bonus.aqd.manager.common.util.StringHelper;
import com.bonus.aqd.manager.webResult.ServerResponse;
import com.bonus.aqd.websocket.dao.AqdMapper;
import com.bonus.aqd.websocket.vo.AqdVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author 黑子
*/
@Service
@Slf4j
public class AqdService {
public static String ws_url="ws://36.33.26.201:21995";
public static String api_url="http://36.33.26.201:21995/config/getDevConfig";
@Autowired
private AqdMapper mapper;
@Autowired
private DevDataMapper devDataMapper;
/**
* 查询安全带数据
* @param vo
* @return
*/
public String getDevConfig(AqdVo vo) {
log.info("数据->>>{}",vo);
String ddvId=vo.getDevice_id();
if(StringHelper.isNotEmpty(ddvId)){
DevVO VO = new DevVO();
VO.setDevCode(ddvId);
VO.setDevStatus("1");
devDataMapper.updateDevStatus(VO);
}
String config= mapper.getConfigData(0);
JSONObject json= JSON.parseObject(config);
JSONObject data=json.getJSONObject("data");
JSONArray ai_broadcast=json.getJSONArray("ai_broadcast");
data.put("ai_broadcast",ai_broadcast);
JSONObject notice_switch=json.getJSONObject("notice_switch");
data.put("notice_switch",notice_switch);
JSONObject model_switch=json.getJSONObject("model_switch");
data.put("model_switch",model_switch);
JSONObject configData=data.getJSONObject("config");
configData.put("ws_url",ws_url);
configData.put("api_url",api_url);
json.getJSONObject("data").put("config",configData);
return json.toJSONString();
}
}

View File

@ -0,0 +1,26 @@
package com.bonus.aqd.websocket.vo;
import lombok.Data;
/**
* 安全带实体类
* @author 黑子
*/
@Data
public class AqdVo {
private String device_id;
private String ca_ver;
private String app_version;
@Override
public String toString() {
return "AqdVo{" +
"device_id='" + device_id + '\'' +
", ca_ver='" + ca_ver + '\'' +
", app_version='" + app_version + '\'' +
'}';
}
}

View File

@ -0,0 +1,8 @@
package com.bonus.aqd.websocket.vo;
/**
* S
*/
public class Config {
}

View File

@ -1,8 +1,8 @@
# 配置端口
server:
port: 21995
servlet:
context-path: /aqd_screen
# servlet:
# context-path: /aqd_screen
max-http-header-size: 10240
spring:
# 配置数据源

View File

@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.bonus.aqd.websocket.dao.AqdMapper">
<select id="getConfigData" resultType="java.lang.String">
select config
from tb_config_data
where type=#{type}
</select>
</mapper>

View File

@ -38,4 +38,9 @@
set dev_status=0, chann_id= #{channId}
where chann_id=#{channId}
</update>
<update id="updateStatus">
update tb_device
set dev_status=0
where dev_code=#{devCode}
</update>
</mapper>