Merge remote-tracking branch 'origin/main'

This commit is contained in:
cwchen 2024-07-29 18:10:55 +08:00
commit 20d3b877e1
8 changed files with 445 additions and 4 deletions

View File

@ -1,15 +1,33 @@
package com.bonus.aqd; package com.bonus.aqd;
import com.bonus.aqd.tcpservice.BootNettyServer;
import org.mybatis.spring.annotation.MapperScan; import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration; import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
import org.springframework.scheduling.annotation.Async;
/**
* @author 黑子
*/
@MapperScan({"com.bonus.aqd.*.*.dao", "com.bonus.aqd.*.dao"}) @MapperScan({"com.bonus.aqd.*.*.dao", "com.bonus.aqd.*.dao"})
@SpringBootApplication(exclude={MongoAutoConfiguration.class}) @SpringBootApplication(exclude={MongoAutoConfiguration.class})
public class SpringBootSecurityApplication { public class SpringBootSecurityApplication implements CommandLineRunner {
@Value("${netty.port}")
public int port;
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(SpringBootSecurityApplication.class, args); SpringApplication.run(SpringBootSecurityApplication.class, args);
System.err.println("智能安全带实时监测启动成功!"); System.err.println("智能安全带实时监测启动成功!");
} }
@Async
@Override
public void run(String... args) throws Exception {
System.err.println(port);
new BootNettyServer().bind(port);
}
} }

View File

@ -0,0 +1,28 @@
package com.bonus.aqd.tcpservice;
import io.netty.channel.Channel;
import lombok.Data;
/**
* 蚂蚁舞
*/
@Data
public class BootNettyChannel {
/**
* 连接客户端唯一的code
*/
private String code;
/**
* 客户端最新发送的消息内容
*/
private String report_last_data;
/**
* 通道
*/
private transient volatile Channel channel;
}

View File

@ -0,0 +1,52 @@
package com.bonus.aqd.tcpservice;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 蚂蚁舞
* @author 黑子
*/
public class BootNettyChannelCache {
/**
* 保证可先性adj.易变的动荡不定的反复无常的情绪易变的易怒的突然发作的
*/
public static volatile Map<String, BootNettyChannel> channelMapCache = new ConcurrentHashMap<String, BootNettyChannel>();
/**
* 增加
* @param code
* @param channel
*/
public static void add(String code, BootNettyChannel channel) {
channelMapCache.put(code, channel);
}
/**
* 获取
* @param code
* @return
*/
public static BootNettyChannel get(String code) {
return channelMapCache.get(code);
}
/**
* 移除
* @param code
*/
public static void remove(String code) {
channelMapCache.remove(code);
}
/**
* 保存如果不存在才保存
* @param code
* @param channel
*/
public static void save(String code, BootNettyChannel channel) {
if (channelMapCache.get(code) == null) {
add(code, channel);
}
}
}

View File

@ -0,0 +1,161 @@
package com.bonus.aqd.tcpservice;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.IOException;
import java.net.InetSocketAddress;
/**
* I/O数据读写处理类
* @author 黑子
*/
@ChannelHandler.Sharable
public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter {
/**
* 注册时执行
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
//注册通道
super.channelRegistered(ctx);
System.out.println("--channelRegistered--" + ctx.channel().id().toString());
}
/**
* 离线时执行
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
//通道离线
super.channelUnregistered(ctx);
System.out.println("--channelUnregistered--" + ctx.channel().id().toString());
}
/**
* 从客户端收到新的数据时这个方法会在收到消息时被调用
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
if (msg == null) {
return;
}
//转成String
String data = (String) msg;
//替换掉 换行 空格
data = data.replaceAll("\r|\n", "");
//获取通道ID 并打印
String channelId = ctx.channel().id().toString();
System.out.println("channelId=" + channelId + "data=" + data);
// 这里我将通道id作为code来使用实际是需要msg里来摘取的客户端数据里的唯一值的
// 如果没有则创建 如果有更新data值
BootNettyChannel b = BootNettyChannelCache.get("server:" + channelId);
if (b == null) {
//创建通道
BootNettyChannel bnc = new BootNettyChannel();
bnc.setChannel(ctx.channel());
//设置通道编码
bnc.setCode("server:" + channelId);
bnc.setReport_last_data(data);
//保存通道
BootNettyChannelCache.save("server:" + channelId, bnc);
} else {
//更新data值什么意思呢
b.setReport_last_data(data);
}
//回写给客户端
ctx.writeAndFlush(Unpooled.buffer().writeBytes(("server:" + channelId).getBytes()));
// netty的编码已经指定因此可以不需要再次确认编码
// ctx.writeAndFlush(Unpooled.buffer().writeBytes(channelId.getBytes(CharsetUtil.UTF_8)));
//直接这样写入也可以
//ctx.write("我是服务端,我收到你的消息了!");
//ctx.flush();
} catch (Exception e) {
System.out.println("channelRead--" + e.toString());
}
}
/**
* 从客户端收到新的数据读取完成时调用
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
System.out.println("channelReadComplete");
ctx.flush();
}
/**
* 当出现 Throwable 对象才会被调用即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException {
System.out.println("exceptionCaught");
//打印错误内容
cause.printStackTrace();
//获取到这个通道
BootNettyChannel bnc = BootNettyChannelCache.get("server:" + ctx.channel().id().toString());
if (bnc != null) {
//移除掉
BootNettyChannelCache.remove("server:" + ctx.channel().id().toString());
}
ctx.close();//抛出异常断开与客户端的连接
}
/**
* 客户端与服务端第一次建立连接时 执行
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException {
//调用原始方法
super.channelActive(ctx);
ctx.channel().read();
//获取到IP地址
InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = inSocket.getAddress().getHostAddress();
//此处不能使用ctx.close()否则客户端始终无法与服务端建立连接
System.out.println("channelActive:" + clientIp + ctx.name());
}
/**
* 客户端与服务端 断连时 执行
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
super.channelInactive(ctx);
//获取到IP
InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = inSocket.getAddress().getHostAddress();
System.out.println("channelInactive:" + clientIp);
//获取到通道
BootNettyChannel bnc = BootNettyChannelCache.get("server:" + ctx.channel().id().toString());
if (bnc != null) {
//移除
BootNettyChannelCache.remove("server:" + ctx.channel().id().toString());
}
ctx.close(); //断开连接时必须关闭否则造成资源浪费并发量很大情况下可能造成宕机
}
/**
* 服务端当read超时, 会调用这个方法
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
super.userEventTriggered(ctx, evt);
//获取到通道
InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
//获取到IP
String clientIp = inSocket.getAddress().getHostAddress();
ctx.close();//超时时断开连接
System.out.println("userEventTriggered:" + clientIp);
}
}

View File

@ -0,0 +1,46 @@
package com.bonus.aqd.tcpservice;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import java.util.concurrent.TimeUnit;
/**
* 通道初始化
*
* @author 黑子
*/
@ChannelHandler.Sharable
public class BootNettyChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> {
public static long READ_TIME_OUT = 120;
public static long WRITE_TIME_OUT = 60;
public static long ALL_TIME_OUT = 60;
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(READ_TIME_OUT, WRITE_TIME_OUT, ALL_TIME_OUT, TimeUnit.SECONDS));
// 带编码
ch.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
// // ChannelOutboundHandler依照逆序执行
// ch.pipeline().addLast("encoder", new StringEncoder());
//
// // 属于ChannelInboundHandler依照顺序执行
// ch.pipeline().addLast("decoder", new StringDecoder());
//自定义ChannelInboundHandlerAdapter
ch.pipeline().addLast(new BootNettyChannelInboundHandlerAdapter());
}
}

View File

@ -0,0 +1,95 @@
package com.bonus.aqd.tcpservice;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
*
* @author 黑子
*/
public class BootNettyServer {
public void bind(int port) throws Exception {
/**
* 配置服务端的 NIO线程组
* NioEventLoopGroup 是用来处理I/O操作的Reactor线程组
* *
* bossGroup用来接收进来的连接workerGroup用来处理已经被接收的连接,进行socketChannel的网络读写
* *
* bossGroup接收到连接后就会把连接信息注册到workerGroup
* *
* workerGroup的 EventLoopGroup默认的线程数是 CPU核数的二倍
*/
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
/**
* 1.ServerBootstrap 是一个启动NIO服务的辅助启动类
*/
ServerBootstrap sb = new ServerBootstrap();
/**
* 2.设置group将bossGroup workerGroup线程组传递到 ServerBootstrap
*/
sb = sb.group(bossGroup, workerGroup);
/**
* 3.ServerSocketChannel是以NIO的selector为基础进行实现的用来接收新的连接
* * 这里告诉Channel通过NioServerSocketChannel获取新的连接
*/
sb = sb.channel(NioServerSocketChannel.class);
// option是设置 bossGroupchildOption是设置workerGroup
/**
* 4.服务端接受连接的队列长度如果队列已满客户端连接将被拒绝
* * (队列被接收后拒绝的客户端下次连接上来只要队列有空余就能连上)
*/
sb = sb.option(ChannelOption.SO_BACKLOG, 128);
/**
* 5.立即发送数据默认值为TureNetty默认为True而操作系统默认为False
* 该值设置Nagle算法的启用改算法将小的碎片数据连接成更大的报文来最小化所发送的报文的数量
* * 如果需要发送一些较小的报文则需要禁用该算法
* Netty默认禁用该算法从而最小化报文传输延时
* 不延迟就是立刻发送*
*/
sb = sb.childOption(ChannelOption.TCP_NODELAY, true);
/**
* 6.连接保活默认值为False启用该功能时TCP会主动探测空闲连接的有效性
* 可以将此功能视为TCP的心跳机制默认的心跳间隔是7200s即2小时, Netty默认关闭该功能
*/
sb = sb.childOption(ChannelOption.SO_KEEPALIVE, true);
/**
* 7.设置 I/O处理类,主要用于网络I/O事件记录日志编码解码消息
*/
sb = sb.childHandler(new BootNettyChannelInitializer<SocketChannel>());
/**
* 8.绑定端口同步等待成功
*/
ChannelFuture f = sb.bind(port).sync();
if (f.isSuccess()) {
System.out.println("netty server start success!");
/**
* 9.等待服务器监听端口关闭
*/
f.channel().closeFuture().sync();
}
} catch (InterruptedException e) {
System.out.println(e.toString());
} finally {
/**
* 退出释放线程池资源
*/
bossGroup.shutdownGracefully().sync();
workerGroup.shutdownGracefully().sync();
}
}
}

View File

@ -0,0 +1,38 @@
package com.bonus.aqd.tcpservice;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
/**
* @author 黑子
*/
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 客户端连接会触发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("Channel active......");
}
/**
* 客户端发消息会触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("服务器收到消息: {}", msg.toString());
ctx.write("我是服务端,我收到你的消息了!");
ctx.flush();
}
/**
* 发生异常触发
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

View File

@ -8,9 +8,9 @@ spring:
# 配置数据源 # 配置数据源
datasource: datasource:
driver-class-name: com.mysql.cj.jdbc.Driver driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/sz_aqd?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false url: jdbc:mysql://127.0.0.1:3306/aqdsb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false
username: root username: mroot
password: ccw1998@yyt1999 password: bonus@admin123
type: com.alibaba.druid.pool.DruidDataSource type: com.alibaba.druid.pool.DruidDataSource
redis: redis:
host: 127.0.0.1 host: 127.0.0.1
@ -36,3 +36,6 @@ zhly:
aq: aq:
enable: false enable: false
netty:
port: 6665