diff --git a/src/main/java/com/bonus/aqd/SpringBootSecurityApplication.java b/src/main/java/com/bonus/aqd/SpringBootSecurityApplication.java index 960ddba..51f1229 100644 --- a/src/main/java/com/bonus/aqd/SpringBootSecurityApplication.java +++ b/src/main/java/com/bonus/aqd/SpringBootSecurityApplication.java @@ -1,15 +1,33 @@ package com.bonus.aqd; +import com.bonus.aqd.tcpservice.BootNettyServer; import org.mybatis.spring.annotation.MapperScan; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration; +import org.springframework.scheduling.annotation.Async; +/** + * @author 黑子 + */ @MapperScan({"com.bonus.aqd.*.*.dao", "com.bonus.aqd.*.dao"}) @SpringBootApplication(exclude={MongoAutoConfiguration.class}) -public class SpringBootSecurityApplication { +public class SpringBootSecurityApplication implements CommandLineRunner { + + + @Value("${netty.port}") + public int port; public static void main(String[] args) { SpringApplication.run(SpringBootSecurityApplication.class, args); System.err.println("智能安全带实时监测启动成功!"); } + + @Async + @Override + public void run(String... args) throws Exception { + System.err.println(port); + new BootNettyServer().bind(port); + } } diff --git a/src/main/java/com/bonus/aqd/tcpservice/BootNettyChannel.java b/src/main/java/com/bonus/aqd/tcpservice/BootNettyChannel.java new file mode 100644 index 0000000..7e246ce --- /dev/null +++ b/src/main/java/com/bonus/aqd/tcpservice/BootNettyChannel.java @@ -0,0 +1,28 @@ +package com.bonus.aqd.tcpservice; + +import io.netty.channel.Channel; +import lombok.Data; + +/** + * 蚂蚁舞 + */ +@Data +public class BootNettyChannel { + + /** + * 连接客户端唯一的code + */ + private String code; + /** + * 客户端最新发送的消息内容 + */ + private String report_last_data; + + /** + * 通道 + */ + private transient volatile Channel channel; + + +} + diff --git a/src/main/java/com/bonus/aqd/tcpservice/BootNettyChannelCache.java b/src/main/java/com/bonus/aqd/tcpservice/BootNettyChannelCache.java new file mode 100644 index 0000000..6c47a17 --- /dev/null +++ b/src/main/java/com/bonus/aqd/tcpservice/BootNettyChannelCache.java @@ -0,0 +1,52 @@ +package com.bonus.aqd.tcpservice; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 蚂蚁舞 + * @author 黑子 + */ +public class BootNettyChannelCache { + /** + * 保证可先性。adj.易变的,动荡不定的,反复无常的;(情绪)易变的,易怒的,突然发作的; + */ + public static volatile Map channelMapCache = new ConcurrentHashMap(); + + /** + * 增加 + * @param code + * @param channel + */ + public static void add(String code, BootNettyChannel channel) { + channelMapCache.put(code, channel); + } + + /** + * 获取 + * @param code + * @return + */ + public static BootNettyChannel get(String code) { + return channelMapCache.get(code); + } + + /** + * 移除 + * @param code + */ + public static void remove(String code) { + channelMapCache.remove(code); + } + + /** + * 保存:如果不存在,才保存 + * @param code + * @param channel + */ + public static void save(String code, BootNettyChannel channel) { + if (channelMapCache.get(code) == null) { + add(code, channel); + } + } +} diff --git a/src/main/java/com/bonus/aqd/tcpservice/BootNettyChannelInboundHandlerAdapter.java b/src/main/java/com/bonus/aqd/tcpservice/BootNettyChannelInboundHandlerAdapter.java new file mode 100644 index 0000000..3b8f5ad --- /dev/null +++ b/src/main/java/com/bonus/aqd/tcpservice/BootNettyChannelInboundHandlerAdapter.java @@ -0,0 +1,161 @@ +package com.bonus.aqd.tcpservice; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +import java.io.IOException; +import java.net.InetSocketAddress; + +/** + * I/O数据读写处理类 + * @author 黑子 + */ +@ChannelHandler.Sharable +public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter { + + /** + * 注册时执行 + */ + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + //注册通道 + super.channelRegistered(ctx); + System.out.println("--channelRegistered--" + ctx.channel().id().toString()); + } + + /** + * 离线时执行 + */ + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + //通道离线 + super.channelUnregistered(ctx); + System.out.println("--channelUnregistered--" + ctx.channel().id().toString()); + } + + /** + * 从客户端收到新的数据时,这个方法会在收到消息时被调用 + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + try { + if (msg == null) { + return; + } + //转成String + String data = (String) msg; + //替换掉 换行 和 空格 + data = data.replaceAll("\r|\n", ""); + //获取通道ID 并打印 + String channelId = ctx.channel().id().toString(); + System.out.println("channelId=" + channelId + "data=" + data); + + // 这里我将通道id作为code来使用,实际是需要msg里来摘取的客户端数据里的唯一值的 + // 如果没有则创建 如果有,更新data值 + BootNettyChannel b = BootNettyChannelCache.get("server:" + channelId); + if (b == null) { + //创建通道 + BootNettyChannel bnc = new BootNettyChannel(); + bnc.setChannel(ctx.channel()); + //设置通道编码 + bnc.setCode("server:" + channelId); + bnc.setReport_last_data(data); + //保存通道 + BootNettyChannelCache.save("server:" + channelId, bnc); + } else { + //更新data值什么意思呢? + b.setReport_last_data(data); + } + //回写给客户端 + ctx.writeAndFlush(Unpooled.buffer().writeBytes(("server:" + channelId).getBytes())); + // netty的编码已经指定,因此可以不需要再次确认编码 + // ctx.writeAndFlush(Unpooled.buffer().writeBytes(channelId.getBytes(CharsetUtil.UTF_8))); + + //直接这样写入也可以 + //ctx.write("我是服务端,我收到你的消息了!"); + //ctx.flush(); + + } catch (Exception e) { + System.out.println("channelRead--" + e.toString()); + } + } + + /** + * 从客户端收到新的数据、读取完成时调用 + */ + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws IOException { + System.out.println("channelReadComplete"); + ctx.flush(); + } + + /** + * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时 + */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException { + System.out.println("exceptionCaught"); + //打印错误内容 + cause.printStackTrace(); + //获取到这个通道 + BootNettyChannel bnc = BootNettyChannelCache.get("server:" + ctx.channel().id().toString()); + if (bnc != null) { + //移除掉 + BootNettyChannelCache.remove("server:" + ctx.channel().id().toString()); + } + ctx.close();//抛出异常,断开与客户端的连接 + } + + /** + * 客户端与服务端第一次建立连接时 执行 + */ + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException { + //调用原始方法 + super.channelActive(ctx); + ctx.channel().read(); + //获取到IP地址 + InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress(); + String clientIp = inSocket.getAddress().getHostAddress(); + //此处不能使用ctx.close(),否则客户端始终无法与服务端建立连接 + System.out.println("channelActive:" + clientIp + ctx.name()); + } + + /** + * 客户端与服务端 断连时 执行 + */ + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException { + super.channelInactive(ctx); + //获取到IP + InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress(); + String clientIp = inSocket.getAddress().getHostAddress(); + System.out.println("channelInactive:" + clientIp); + + //获取到通道 + BootNettyChannel bnc = BootNettyChannelCache.get("server:" + ctx.channel().id().toString()); + if (bnc != null) { + //移除 + BootNettyChannelCache.remove("server:" + ctx.channel().id().toString()); + } + ctx.close(); //断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机 + } + + /** + * 服务端当read超时, 会调用这个方法 + */ + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException { + super.userEventTriggered(ctx, evt); + //获取到通道 + InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress(); + //获取到IP + String clientIp = inSocket.getAddress().getHostAddress(); + + ctx.close();//超时时断开连接 + System.out.println("userEventTriggered:" + clientIp); + } + +} diff --git a/src/main/java/com/bonus/aqd/tcpservice/BootNettyChannelInitializer.java b/src/main/java/com/bonus/aqd/tcpservice/BootNettyChannelInitializer.java new file mode 100644 index 0000000..1821668 --- /dev/null +++ b/src/main/java/com/bonus/aqd/tcpservice/BootNettyChannelInitializer.java @@ -0,0 +1,46 @@ +package com.bonus.aqd.tcpservice; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.CharsetUtil; + +import java.util.concurrent.TimeUnit; + +/** + * 通道初始化 + * + * @author 黑子 + */ +@ChannelHandler.Sharable +public class BootNettyChannelInitializer extends ChannelInitializer { + + public static long READ_TIME_OUT = 120; + + public static long WRITE_TIME_OUT = 60; + + public static long ALL_TIME_OUT = 60; + + @Override + protected void initChannel(Channel ch) throws Exception { + + ch.pipeline().addLast(new IdleStateHandler(READ_TIME_OUT, WRITE_TIME_OUT, ALL_TIME_OUT, TimeUnit.SECONDS)); + // 带编码 + ch.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); + ch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); + +// // ChannelOutboundHandler,依照逆序执行 +// ch.pipeline().addLast("encoder", new StringEncoder()); +// +// // 属于ChannelInboundHandler,依照顺序执行 +// ch.pipeline().addLast("decoder", new StringDecoder()); + + //自定义ChannelInboundHandlerAdapter + ch.pipeline().addLast(new BootNettyChannelInboundHandlerAdapter()); + + } + +} diff --git a/src/main/java/com/bonus/aqd/tcpservice/BootNettyServer.java b/src/main/java/com/bonus/aqd/tcpservice/BootNettyServer.java new file mode 100644 index 0000000..c95cb40 --- /dev/null +++ b/src/main/java/com/bonus/aqd/tcpservice/BootNettyServer.java @@ -0,0 +1,95 @@ +package com.bonus.aqd.tcpservice; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; + +/** + * + * @author 黑子 + */ +public class BootNettyServer { + + public void bind(int port) throws Exception { + + /** + * 配置服务端的 NIO线程组 + * NioEventLoopGroup 是用来处理I/O操作的Reactor线程组 + * * + * bossGroup:用来接收进来的连接,workerGroup:用来处理已经被接收的连接,进行socketChannel的网络读写, + * * + * bossGroup接收到连接后就会把连接信息注册到workerGroup + * * + * workerGroup的 EventLoopGroup默认的线程数是 CPU核数的二倍 + */ + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + /** + * 1.ServerBootstrap 是一个启动NIO服务的辅助启动类 + */ + ServerBootstrap sb = new ServerBootstrap(); + /** + * 2.设置group,将bossGroup, workerGroup线程组传递到 ServerBootstrap + */ + sb = sb.group(bossGroup, workerGroup); + /** + * 3.ServerSocketChannel是以NIO的selector为基础进行实现的,用来接收新的连接, + * * 这里告诉Channel通过NioServerSocketChannel获取新的连接 + */ + sb = sb.channel(NioServerSocketChannel.class); + + // option是设置 bossGroup,childOption是设置workerGroup + + /** + * 4.服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝 + * * (队列被接收后,拒绝的客户端下次连接上来只要队列有空余就能连上) + */ + sb = sb.option(ChannelOption.SO_BACKLOG, 128); + /** + * 5.立即发送数据,默认值为Ture(Netty默认为True而操作系统默认为False)。 + * 该值设置Nagle算法的启用,改算法将小的碎片数据连接成更大的报文来最小化所发送的报文的数量, + * * 如果需要发送一些较小的报文,则需要禁用该算法。 + * Netty默认禁用该算法,从而最小化报文传输延时。 + * 不延迟,就是立刻发送* + */ + sb = sb.childOption(ChannelOption.TCP_NODELAY, true); + /** + * 6.连接保活,默认值为False。启用该功能时,TCP会主动探测空闲连接的有效性。 + * 可以将此功能视为TCP的心跳机制,默认的心跳间隔是7200s即2小时, Netty默认关闭该功能。 + */ + sb = sb.childOption(ChannelOption.SO_KEEPALIVE, true); + + /** + * 7.设置 I/O处理类,主要用于网络I/O事件,记录日志,编码、解码消息 + */ + sb = sb.childHandler(new BootNettyChannelInitializer()); + + /** + * 8.绑定端口,同步等待成功 + */ + ChannelFuture f = sb.bind(port).sync(); + if (f.isSuccess()) { + System.out.println("netty server start success!"); + /** + * 9.等待服务器监听端口关闭 + */ + f.channel().closeFuture().sync(); + } + } catch (InterruptedException e) { + System.out.println(e.toString()); + } finally { + /** + * 退出,释放线程池资源 + */ + bossGroup.shutdownGracefully().sync(); + workerGroup.shutdownGracefully().sync(); + } + + } +} diff --git a/src/main/java/com/bonus/aqd/tcpservice/NettyServerHandler.java b/src/main/java/com/bonus/aqd/tcpservice/NettyServerHandler.java new file mode 100644 index 0000000..54c29a2 --- /dev/null +++ b/src/main/java/com/bonus/aqd/tcpservice/NettyServerHandler.java @@ -0,0 +1,38 @@ +package com.bonus.aqd.tcpservice; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; + +/** + * @author 黑子 + */ +@Slf4j +public class NettyServerHandler extends ChannelInboundHandlerAdapter { + /** + * 客户端连接会触发 + */ + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + log.info("Channel active......"); + } + + /** + * 客户端发消息会触发 + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + log.info("服务器收到消息: {}", msg.toString()); + ctx.write("我是服务端,我收到你的消息了!"); + ctx.flush(); + } + + /** + * 发生异常触发 + */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 3246123..ce9adb7 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -8,9 +8,9 @@ spring: # 配置数据源 datasource: driver-class-name: com.mysql.cj.jdbc.Driver - url: jdbc:mysql://127.0.0.1:3306/sz_aqd?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false - username: root - password: ccw1998@yyt1999 + url: jdbc:mysql://127.0.0.1:3306/aqdsb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false + username: mroot + password: bonus@admin123 type: com.alibaba.druid.pool.DruidDataSource redis: host: 127.0.0.1 @@ -36,3 +36,6 @@ zhly: aq: enable: false +netty: + port: 6665 +