Netty Advanced
Sticky packets
Example
Server code
public static void main(String[] args) {
    NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
    NioEventLoopGroup workerGroup = new NioEventLoopGroup(2);
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.group(bossGroup, workerGroup);
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
            }
        });
        ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("Server error", e);
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
Client code: the intent is to send 10 messages of 16 bytes each
public static void main(String[] args) {
    NioEventLoopGroup workerGroup = new NioEventLoopGroup(2);
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(workerGroup);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    // channelActive fires once the connection has been established
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        for (int i = 0; i < 10; i++) {
                            ByteBuf buffer = ctx.alloc().buffer(16);
                            buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 'a', 'b', 'c', 'd', 'e', 'f'});
                            ctx.writeAndFlush(buffer);
                        }
                    }
                });
            }
        });
        ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8080)).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("Client error", e);
    } finally {
        workerGroup.shutdownGracefully();
    }
}
Server output: all 160 bytes arrive in a single read instead of 10 separate reads
09:54:51 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0xc88b198b, L:/127.0.0.1:8080 - R:/127.0.0.1:55393] REGISTERED
09:54:51 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0xc88b198b, L:/127.0.0.1:8080 - R:/127.0.0.1:55393] ACTIVE
09:54:51 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0xc88b198b, L:/127.0.0.1:8080 - R:/127.0.0.1:55393] READ: 160B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 00 61 62 63 64 65 66 |..........abcdef|
|00000010| 01 02 03 04 05 06 07 08 09 00 61 62 63 64 65 66 |..........abcdef|
|00000020| 01 02 03 04 05 06 07 08 09 00 61 62 63 64 65 66 |..........abcdef|
|00000030| 01 02 03 04 05 06 07 08 09 00 61 62 63 64 65 66 |..........abcdef|
|00000040| 01 02 03 04 05 06 07 08 09 00 61 62 63 64 65 66 |..........abcdef|
|00000050| 01 02 03 04 05 06 07 08 09 00 61 62 63 64 65 66 |..........abcdef|
|00000060| 01 02 03 04 05 06 07 08 09 00 61 62 63 64 65 66 |..........abcdef|
|00000070| 01 02 03 04 05 06 07 08 09 00 61 62 63 64 65 66 |..........abcdef|
|00000080| 01 02 03 04 05 06 07 08 09 00 61 62 63 64 65 66 |..........abcdef|
|00000090| 01 02 03 04 05 06 07 08 09 00 61 62 63 64 65 66 |..........abcdef|
+--------+-------------------------------------------------+----------------+
09:54:51 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0xc88b198b, L:/127.0.0.1:8080 - R:/127.0.0.1:55393] READ COMPLETE
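Half packets
Server
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10): sets the size of the operating system's receive buffer (that is, the sliding window). It only determines the smallest unit Netty reads; what Netty actually reads each time is usually a whole multiple of it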
public static void main(String[] args) {
    NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
    NioEventLoopGroup workerGroup = new NioEventLoopGroup(2);
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // Adjust the operating system's receive buffer (that is, the sliding window)
        serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.group(bossGroup, workerGroup);
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
            }
        });
        ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("Server error", e);
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
Client: the intent is to send 1 message of 160 bytes
public static void main(String[] args) {
    NioEventLoopGroup workerGroup = new NioEventLoopGroup(2);
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(workerGroup);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        ByteBuf buffer = ctx.alloc().buffer();
                        for (int i = 0; i < 10; i++) {
                            buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 'a', 'b', 'c', 'd', 'e', 'f'});
                        }
                        ctx.writeAndFlush(buffer);
                    }
                });
            }
        });
        ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8080)).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("Client error", e);
    } finally {
        workerGroup.shutdownGracefully();
    }
}
Observed: the message is received in two parts, 20 bytes first and then 140 bytes
10:06:06 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0xba1ffac6, L:/127.0.0.1:8080 - R:/127.0.0.1:55530] REGISTERED
10:06:06 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0xba1ffac6, L:/127.0.0.1:8080 - R:/127.0.0.1:55530] ACTIVE
10:06:06 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0xba1ffac6, L:/127.0.0.1:8080 - R:/127.0.0.1:55530] READ: 20B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 00 61 62 63 64 65 66 |..........abcdef|
|00000010| 01 02 03 04 |.... |
+--------+-------------------------------------------------+----------------+
10:06:06 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0xba1ffac6, L:/127.0.0.1:8080 - R:/127.0.0.1:55530] READ COMPLETE
10:06:06 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0xba1ffac6, L:/127.0.0.1:8080 - R:/127.0.0.1:55530] READ: 140B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 05 06 07 08 09 00 61 62 63 64 65 66 01 02 03 04 |......abcdef....|
|00000010| 05 06 07 08 09 00 61 62 63 64 65 66 01 02 03 04 |......abcdef....|
|00000020| 05 06 07 08 09 00 61 62 63 64 65 66 01 02 03 04 |......abcdef....|
|00000030| 05 06 07 08 09 00 61 62 63 64 65 66 01 02 03 04 |......abcdef....|
|00000040| 05 06 07 08 09 00 61 62 63 64 65 66 01 02 03 04 |......abcdef....|
|00000050| 05 06 07 08 09 00 61 62 63 64 65 66 01 02 03 04 |......abcdef....|
|00000060| 05 06 07 08 09 00 61 62 63 64 65 66 01 02 03 04 |......abcdef....|
|00000070| 05 06 07 08 09 00 61 62 63 64 65 66 01 02 03 04 |......abcdef....|
|00000080| 05 06 07 08 09 00 61 62 63 64 65 66 |......abcdef |
+--------+-------------------------------------------------+----------------+
10:06:06 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0xba1ffac6, L:/127.0.0.1:8080 - R:/127.0.0.1:55530] READ COMPLETE
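TCP sends data in segments, and if the sender waited for an acknowledgment (ack) after every segment, throughput would drop as the round-trip time grows. To solve this, TCP introduces the window: the window size is the maximum amount of data that can be sent without waiting for an acknowledgment
The window effectively acts as a buffer and also provides flow control
- In the figure, the dark part is the data to be sent and the highlighted part is the window
- Only data inside the window may be sent; until an acknowledgment arrives, the window cannot slide
- Once the ack for the segment 1001~2000 comes back, the window can slide forward
- The receiver maintains a window as well; only data that falls inside it is accepted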
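The window rules listed above can be sketched as a toy sender. This is only an illustration under simplifying assumptions: the class `SlidingWindowSender` and its fields are hypothetical names, the window is counted in whole segments, and real TCP additionally tracks byte ranges, retransmissions and the window advertised by the receiver.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a sender-side sliding window; not TCP's real data structures. */
final class SlidingWindowSender {
    private final int total;        // total number of segments to send
    private final int windowSize;   // max number of unacknowledged segments
    private int base = 0;           // oldest unacknowledged segment
    private int next = 0;           // next segment to send

    SlidingWindowSender(int total, int windowSize) {
        this.total = total;
        this.windowSize = windowSize;
    }

    /** Sends every segment that currently fits in the window and returns their indexes. */
    List<Integer> sendAvailable() {
        List<Integer> sent = new ArrayList<>();
        while (next < total && next < base + windowSize) {
            sent.add(next++);
        }
        return sent;
    }

    /** A cumulative ack for all segments before ackedUpTo slides the window forward. */
    void onAck(int ackedUpTo) {
        if (ackedUpTo > base) {
            base = Math.min(ackedUpTo, next);
        }
    }

    public static void main(String[] args) {
        SlidingWindowSender sender = new SlidingWindowSender(10, 4);
        System.out.println(sender.sendAvailable()); // segments 0..3 go out without waiting
        System.out.println(sender.sendAvailable()); // window is full, nothing is sent
        sender.onAck(2);                            // segments 0 and 1 acknowledged
        System.out.println(sender.sendAvailable()); // window slid by two: 4 and 5
    }
}
```

The second call returns an empty list because the window is full; only the ack lets the sender move on, which is the flow-control effect described above.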
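Analysis of sticky and half packets
Nagle's algorithm in detail
MSS limit: half packets
The link layer limits how much data can be sent at once. This limit is called the MTU (maximum transmission unit), and it differs between link types, for example
Ethernet has an MTU of 1500
FDDI (Fiber Distributed Data Interface) has an MTU of 4352
The loopback interface has an MTU of 65535; local tests do not go through the network card
MSS (maximum segment size) is the number of bytes left for payload after the TCP header and the IP header are subtracted from the MTU
With IPv4, the TCP header takes 20 bytes and the IP header takes 20 bytes, so the Ethernet MSS is 1500 - 40 = 1460
When TCP transfers a large amount of data, it splits the data into segments of MSS size
Each side announces its MSS during the three-way handshake, and the smaller of the two values is used as the MSS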
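The arithmetic above can be written out as a short sketch. The class `MssCalc` and its method names are hypothetical, and the header sizes assume IPv4 and a TCP header without options, as in the text.

```java
/** Worked MTU/MSS arithmetic; header sizes assume IPv4 and TCP without options. */
final class MssCalc {
    static final int IPV4_HEADER = 20;
    static final int TCP_HEADER = 20;

    /** MSS is what remains of the MTU after both headers. */
    static int mss(int mtu) {
        return mtu - IPV4_HEADER - TCP_HEADER;
    }

    /** Both sides announce an MSS during the handshake; the smaller one is used. */
    static int negotiated(int localMss, int remoteMss) {
        return Math.min(localMss, remoteMss);
    }

    /** Number of segments needed to carry payloadBytes (ceiling division). */
    static int segments(int payloadBytes, int mss) {
        return (payloadBytes + mss - 1) / mss;
    }

    public static void main(String[] args) {
        int ethernet = mss(1500);               // 1460
        int fddi = mss(4352);                   // 4312
        int used = negotiated(ethernet, fddi);  // 1460
        System.out.println(used + " bytes per segment, "
                + segments(4000, used) + " segments for a 4000-byte message");
    }
}
```

A 4000-byte message therefore leaves the sender as three segments, and the receiver may see any of them before the others arrive, which is one source of half packets.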
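Solutions
Short-lived connections (solves sticky packets)
Open a new connection for every message, so that the span from connection setup to connection close marks the message boundary
Drawback: very inefficient
Drawback: half packets can still occur
Client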
public static void main(String[] args) {
    // Open 10 connections and send one message on each
    for (int i = 0; i < 10; i++) {
        connect();
    }
}

private static void connect() {
    NioEventLoopGroup group = new NioEventLoopGroup(2);
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                log.debug("Connection established!");
                ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        ByteBuf buffer = ctx.alloc().buffer();
                        buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18});
                        ctx.writeAndFlush(buffer);
                        // Close the connection once the message has been sent
                        ctx.close();
                    }
                });
            }
        });
        ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8080)).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("Client error", e);
    } finally {
        group.shutdownGracefully();
    }
}
Server
public static void main(String[] args) {
    NioEventLoopGroup boss = new NioEventLoopGroup(1);
    NioEventLoopGroup worker = new NioEventLoopGroup(2);
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.group(boss, worker);
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
            }
        });
        ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("Server error", e);
    } finally {
        boss.shutdownGracefully();
        worker.shutdownGracefully();
    }
}
Observed
13:24:21 [DEBUG] [nioEventLoopGroup-3-2] i.n.h.l.LoggingHandler : [id: 0x045cf95e, L:/127.0.0.1:8080 - R:/127.0.0.1:57708] READ: 18B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 10 |................|
|00000010| 11 12 |.. |
+--------+-------------------------------------------------+----------------+
**Half packets still occur**
The receiver's buffer has a limited size
Server: change the size of the receive buffer
public static void main(String[] args) {
    NioEventLoopGroup boss = new NioEventLoopGroup(1);
    NioEventLoopGroup worker = new NioEventLoopGroup(2);
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // Adjust the operating system's receive buffer (that is, the sliding window)
        // serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
        // Adjust the size of Netty's own receive buffer; 16 is the smallest size it will use
        serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16));
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.group(boss, worker);
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
            }
        });
        ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("Server error", e);
    } finally {
        boss.shutdownGracefully();
        worker.shutdownGracefully();
    }
}
Observed: the output below appears 10 times, showing the half-packet problem
13:25:58 [DEBUG] [nioEventLoopGroup-3-2] i.n.h.l.LoggingHandler : [id: 0x816efb49, L:/127.0.0.1:8080 - R:/127.0.0.1:57782] READ: 16B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 10 |................|
+--------+-------------------------------------------------+----------------+
13:25:58 [DEBUG] [nioEventLoopGroup-3-2] i.n.h.l.LoggingHandler : [id: 0x816efb49, L:/127.0.0.1:8080 - R:/127.0.0.1:57782] READ: 2B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 11 12 |.. |
+--------+-------------------------------------------------+----------------+
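The 16B + 2B split in the log can be reproduced without Netty. The sketch below is a plain-JDK model, not Netty's read loop: the class `SmallBufferRead` is a hypothetical name, and an in-memory stream stands in for the socket.

```java
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;

/** Models a receiver whose read buffer is smaller than the message. */
final class SmallBufferRead {
    /** Reads the incoming bytes with a fixed-size buffer and records the size of each read. */
    static List<Integer> readSizes(byte[] incoming, int bufferSize) {
        ByteArrayInputStream in = new ByteArrayInputStream(incoming);
        byte[] buf = new byte[bufferSize];
        List<Integer> sizes = new ArrayList<>();
        int n;
        while ((n = in.read(buf, 0, buf.length)) != -1) {
            sizes.add(n);
        }
        return sizes;
    }

    public static void main(String[] args) {
        byte[] message = new byte[18];                  // one 18-byte message, as sent by the client
        System.out.println(readSizes(message, 16));     // prints [16, 2]
    }
}
```

Each read can return at most as many bytes as the buffer holds, so one message surfaces as two reads and the application sees a half packet.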
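Agreed fixed length
Fixed length: choose it based on the longest message that can occur in the business scenario
Drawback: the right size is hard to pick
Too large: space is wasted
Too small: some messages will not fit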
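The idea behind fixed-length framing can be sketched in plain Java. This is a minimal model of the technique, not the implementation of Netty's `FixedLengthFrameDecoder`; the class `FixedLengthFramer` and its `feed` method are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Accumulates incoming chunks and emits frames of exactly frameLength bytes. */
final class FixedLengthFramer {
    private final int frameLength;
    private byte[] pending = new byte[0];   // bytes received but not yet framed

    FixedLengthFramer(int frameLength) {
        this.frameLength = frameLength;
    }

    List<byte[]> feed(byte[] chunk) {
        // Append the new chunk to whatever was left over from earlier reads.
        byte[] all = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, all, pending.length, chunk.length);
        List<byte[]> frames = new ArrayList<>();
        int pos = 0;
        while (all.length - pos >= frameLength) {
            frames.add(Arrays.copyOfRange(all, pos, pos + frameLength));
            pos += frameLength;
        }
        pending = Arrays.copyOfRange(all, pos, all.length);  // keep the incomplete tail
        return frames;
    }

    public static void main(String[] args) {
        FixedLengthFramer framer = new FixedLengthFramer(10);
        // One and a half frames arrive first, the remainder arrives later.
        for (byte[] f : framer.feed("a---------bbbbb".getBytes(StandardCharsets.US_ASCII))) {
            System.out.println(new String(f, StandardCharsets.US_ASCII));
        }
        for (byte[] f : framer.feed("bbbb-".getBytes(StandardCharsets.US_ASCII))) {
            System.out.println(new String(f, StandardCharsets.US_ASCII));
        }
    }
}
```

Because the framer only counts bytes, it does not matter how the stream was split or merged on the way; the cost is the padding carried by every short message.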
Client: sends 8 messages at once, each with a fixed length of 10; shorter messages are padded with the '-' character
The client may flush at any point
public static void main(String[] args) {
    NioEventLoopGroup group = new NioEventLoopGroup(2);
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                log.debug("Connection established!");
                ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        ByteBuf buffer = ctx.alloc().buffer();
                        char c = 'a';
                        for (int i = 0; i < 8; i++) {
                            buffer.writeBytes(fillBuf(c++, (int) (Math.random() * 10)));
                        }
                        // Send all 8 messages in one write
                        ctx.writeAndFlush(buffer);
                    }
                });
            }
        });
        ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8080)).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("Client error", e);
    } finally {
        group.shutdownGracefully();
    }
}

// Returns a 10-byte ByteBuf: `time` copies of c, padded with '-' up to length 10
private static ByteBuf fillBuf(char c, int time) {
    byte[] buf = new byte[10];
    if (time > 10) {
        for (int i = 0; i < 10; i++) {
            buf[i] = (byte) c;
        }
    } else {
        for (int i = 0; i < time; i++) {
            buf[i] = (byte) c;
        }
        for (int i = 9; i >= time; i--) {
            buf[i] = '-';
        }
    }
    return ByteBufAllocator.DEFAULT.buffer().writeBytes(buf);
}
Server: takes exactly 10 bytes from the channel at a time
public static void main(String[] args) {
    NioEventLoopGroup boss = new NioEventLoopGroup(1);
    NioEventLoopGroup worker = new NioEventLoopGroup(2);
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.group(boss, worker);
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                // Every frame is exactly 10 bytes
                ch.pipeline().addLast(new FixedLengthFrameDecoder(10));
                ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
            }
        });
        ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("Server error", e);
    } finally {
        boss.shutdownGracefully();
        worker.shutdownGracefully();
    }
}
Output: each read delivers exactly 10 bytes
14:13:59 [DEBUG] [nioEventLoopGroup-3-2] i.n.h.l.LoggingHandler : [id: 0xf3308d0c, L:/127.0.0.1:8080 - R:/127.0.0.1:58830] READ: 10B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 2d 2d 2d 2d 2d 2d 2d 2d 2d |a--------- |
+--------+-------------------------------------------------+----------------+
14:13:59 [DEBUG] [nioEventLoopGroup-3-2] i.n.h.l.LoggingHandler : [id: 0xf3308d0c, L:/127.0.0.1:8080 - R:/127.0.0.1:58830] READ: 10B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 62 62 62 62 62 62 62 62 2d |bbbbbbbbb- |
+--------+-------------------------------------------------+----------------+
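Agreed delimiter
LineBasedFrameDecoder: uses \n or \r\n as the delimiter and throws an exception if no delimiter appears within the specified maximum length
DelimiterBasedFrameDecoder: besides the maximum length, it accepts a custom delimiter given as a ByteBuf
Drawbacks
Works well for character data. If the content itself contains the delimiter (which is common with binary data), frames are parsed incorrectly.
The delimiter then has to be escaped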
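A line-based splitter with a length limit can be sketched as follows. This is an assumption-laden model, not Netty's `LineBasedFrameDecoder`: the class `LineFramer` is a hypothetical name, it works on characters rather than bytes, and it signals an oversized frame with `IllegalStateException` instead of Netty's own exception type.

```java
import java.util.ArrayList;
import java.util.List;

/** Splits a character stream into lines ending in \n or \r\n, with a maximum line length. */
final class LineFramer {
    private final int maxLength;
    private final StringBuilder pending = new StringBuilder();  // incomplete line so far

    LineFramer(int maxLength) {
        this.maxLength = maxLength;
    }

    List<String> feed(String chunk) {
        List<String> lines = new ArrayList<>();
        for (char ch : chunk.toCharArray()) {
            if (ch == '\n') {
                int end = pending.length();
                if (end > 0 && pending.charAt(end - 1) == '\r') {
                    end--;                                      // accept \r\n as well
                }
                lines.add(pending.substring(0, end));
                pending.setLength(0);
            } else {
                pending.append(ch);
                if (pending.length() > maxLength) {
                    throw new IllegalStateException("no delimiter within " + maxLength + " characters");
                }
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        LineFramer framer = new LineFramer(1024);
        System.out.println(framer.feed("aaaaaa\nbbb"));   // prints [aaaaaa]
        System.out.println(framer.feed("b\r\ncc\n"));     // prints [bbbb, cc]
    }
}
```

The drawback named above is visible here: a message that legitimately contains `\n` would be cut in two, so such content has to be escaped before sending.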
The client appends the \n delimiter to every message
public static void main(String[] args) {
    NioEventLoopGroup group = new NioEventLoopGroup(2);
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                log.debug("Connection established!");
                ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        ByteBuf buffer = ctx.alloc().buffer();
                        char c = 'a';
                        for (int i = 0; i < 3; i++) {
                            // Append a message of random length: 0 to 31 characters plus the trailing \n
                            buffer.writeBytes(prodStr(c++, (int) (Math.random() * 32)).toString().getBytes());
                        }
                        // Send all 3 messages in one write
                        ctx.writeAndFlush(buffer);
                    }
                });
            }
        });
        ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8080)).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("Client error", e);
    } finally {
        group.shutdownGracefully();
    }
}

// Returns a message made of `length` copies of c, terminated by \n
private static StringBuilder prodStr(char c, int length) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < length; i++) {
        sb.append(c);
    }
    sb.append("\n");
    return sb;
}
The server uses the \n decoder LineBasedFrameDecoder
public static void main(String[] args) {
    NioEventLoopGroup boss = new NioEventLoopGroup(1);
    NioEventLoopGroup worker = new NioEventLoopGroup(2);
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.group(boss, worker);
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                // Split on \n or \r\n; throw if no delimiter appears within 1024 bytes
                ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
            }
        });
        ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("Server error", e);
    } finally {
        boss.shutdownGracefully();
        worker.shutdownGracefully();
    }
}
Output
Data sent by the client
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 61 61 0a 62 62 62 62 62 62 62 62 62 |aaaaaa.bbbbbbbbb|
|00000010| 62 62 62 62 62 62 62 62 62 62 62 0a 63 63 63 63 |bbbbbbbbbbb.cccc|
|00000020| 63 63 63 63 63 63 63 63 63 63 63 63 63 63 0a |cccccccccccccc. |
+--------+-------------------------------------------------+----------------+
Messages received by the server
14:58:33 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0x8da58f26, L:/127.0.0.1:8080 - R:/127.0.0.1:59463] READ: 6B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 61 61 |aaaaaa |
+--------+-------------------------------------------------+----------------+
14:58:33 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0x8da58f26, L:/127.0.0.1:8080 - R:/127.0.0.1:59463] READ: 20B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 62 62 62 62 62 62 62 62 62 62 62 62 62 62 62 |bbbbbbbbbbbbbbbb|
|00000010| 62 62 62 62 |bbbb |
+--------+-------------------------------------------------+----------------+
14:58:33 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0x8da58f26, L:/127.0.0.1:8080 - R:/127.0.0.1:59463] READ: 18B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 63 63 63 63 63 63 63 63 63 63 63 63 63 63 63 63 |cccccccccccccccc|
|00000010| 63 63 |cc |
+--------+-------------------------------------------------+----------------+
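Agreed length field
Before sending, both sides agree on which position in the message, and how many bytes, hold the length of the data that follows
LengthFieldBasedFrameDecoder: maximum frame length, length field offset, length field size, length adjustment, number of leading bytes to strip
Each message consists of a head and a body; the head contains the length of the body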
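The head-plus-body layout can be sketched with `java.nio.ByteBuffer`. This is a simplified model of length-field framing, not Netty's `LengthFieldBasedFrameDecoder`; the class `LengthPrefixed` is a hypothetical name, the head is assumed to be a 4-byte big-endian length, and the sketch drops the head from each decoded frame.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Frames are a 4-byte big-endian body length followed by the body. */
final class LengthPrefixed {
    static byte[] encode(String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + bytes.length).putInt(bytes.length).put(bytes).array();
    }

    /** Splits concatenated frames back into bodies and stops at an incomplete frame. */
    static List<String> decode(byte[] stream) {
        ByteBuffer in = ByteBuffer.wrap(stream);
        List<String> bodies = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int length = in.getInt();
            if (in.remaining() < length) {   // half packet: the rest has not arrived yet
                in.reset();
                break;
            }
            byte[] body = new byte[length];
            in.get(body);
            bodies.add(new String(body, StandardCharsets.UTF_8));
        }
        return bodies;
    }

    public static void main(String[] args) {
        byte[] b = encode("b");
        byte[] ccc = encode("ccc");
        byte[] stream = new byte[b.length + ccc.length];
        System.arraycopy(b, 0, stream, 0, b.length);
        System.arraycopy(ccc, 0, stream, b.length, ccc.length);
        System.out.println(decode(stream));   // prints [b, ccc]
    }
}
```

The decoder never inspects the body, so arbitrary binary content is safe, which is the main advantage over a delimiter.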
Client
public static void main(String[] args) {
    NioEventLoopGroup group = new NioEventLoopGroup(2);
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                log.debug("Connection established!");
                ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        ByteBuf buffer = ctx.alloc().buffer();
                        char c = 'a';
                        for (int i = 0; i < 3; i++) {
                            ByteBuf buf = ctx.alloc().buffer();
                            byte[] bytes = prodStr(c++, (int) (Math.random() * 31) + 1).toString().getBytes();
                            // Put the message length in the head
                            buf.writeInt(bytes.length);
                            buf.writeBytes(bytes);
                            buffer.writeBytes(buf);
                            // The content has been copied into buffer, so release the temporary ByteBuf
                            buf.release();
                        }
                        // Send all 3 messages in one write
                        ctx.writeAndFlush(buffer);
                    }
                });
            }
        });
        ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8080)).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("Client error", e);
    } finally {
        group.shutdownGracefully();
    }
}

// Returns a message made of `length` copies of c
private static StringBuilder prodStr(char c, int length) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < length; i++) {
        sb.append(c);
    }
    return sb;
}
Server
public static void main(String[] args) {
    NioEventLoopGroup boss = new NioEventLoopGroup(1);
    NioEventLoopGroup worker = new NioEventLoopGroup(2);
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.group(boss, worker);
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                // The message carries its own length, in the agreed format:
                // maximum frame length, length field offset, length field size, length adjustment, bytes to strip
                ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 0));
                ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
            }
        });
        ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("Server error", e);
    } finally {
        boss.shutdownGracefully();
        worker.shutdownGracefully();
    }
}
Output
Data sent by the client
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 1c 61 61 61 61 61 61 61 61 61 61 61 61 |....aaaaaaaaaaaa|
|00000010| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000020| 00 00 00 01 62 00 00 00 03 63 63 63 |....b....ccc |
+--------+-------------------------------------------------+----------------+
Messages received by the server
15:07:06 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0xdd9a24d1, L:/127.0.0.1:8080 - R:/127.0.0.1:59595] READ: 32B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 1c 61 61 61 61 61 61 61 61 61 61 61 61 |....aaaaaaaaaaaa|
|00000010| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
+--------+-------------------------------------------------+----------------+
15:07:06 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0xdd9a24d1, L:/127.0.0.1:8080 - R:/127.0.0.1:59595] READ: 5B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 01 62 |....b |
+--------+-------------------------------------------------+----------------+
15:07:06 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0xdd9a24d1, L:/127.0.0.1:8080 - R:/127.0.0.1:59595] READ: 7B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 03 63 63 63 |....ccc |
+--------+-------------------------------------------------+----------------+
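Protocol design and parsing
Why is a protocol needed?
Messages in TCP/IP are transmitted as a stream, with no boundaries.
The purpose of a protocol is to define message boundaries and to set the communication rules that both sides must follow
**Redis protocol example**
Imitating a Redis client, send: auth 123456 + set name xiaohong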
public static void main(String[] args) {
    Bootstrap bootstrap = new Bootstrap();                 // bootstrap
    NioEventLoopGroup worker = new NioEventLoopGroup(2);   // worker
    // Carriage return, line feed
    byte[] LINE = {13, 10};
    try {
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(worker);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    // 1. When the connection is established, send to Redis: auth 123456, then set name xiaohong
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        // Password authentication; "*2" is the number of parts in the command
                        ByteBuf buf = ctx.alloc().buffer();
                        buf.writeBytes("*2".getBytes());
                        buf.writeBytes(LINE);
                        buf.writeBytes("$4".getBytes());
                        buf.writeBytes(LINE);
                        buf.writeBytes("auth".getBytes());
                        buf.writeBytes(LINE);
                        buf.writeBytes("$6".getBytes());
                        buf.writeBytes(LINE);
                        buf.writeBytes("123456".getBytes());
                        buf.writeBytes(LINE);
                        // set name xiaohong; "*3" means the command has 3 parts
                        buf.writeBytes("*3".getBytes());
                        buf.writeBytes(LINE);
                        // "$3" means the next part is 3 bytes long
                        buf.writeBytes("$3".getBytes());
                        buf.writeBytes(LINE);
                        buf.writeBytes("set".getBytes());
                        buf.writeBytes(LINE);
                        buf.writeBytes("$4".getBytes());
                        buf.writeBytes(LINE);
                        buf.writeBytes("name".getBytes());
                        buf.writeBytes(LINE);
                        buf.writeBytes("$8".getBytes());
                        buf.writeBytes(LINE);
                        buf.writeBytes("xiaohong".getBytes());
                        buf.writeBytes(LINE);
                        ctx.writeAndFlush(buf);
                    }

                    // 2. Receive the response from Redis
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        ByteBuf buf = (ByteBuf) msg;
                        System.out.println(buf.toString(Charset.forName("UTF-8")));
                    }
                });
            }
        });
        ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 6379)).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("Client error", e);
    } finally {
        worker.shutdownGracefully();
    }
}
Response received by the client
+OK
+OK
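The byte sequence written by hand in the client follows one rule, which a small helper can express. This is an illustrative sketch: the class `RespCommand` is a hypothetical name, and it covers only the request form used here (an array of bulk strings).

```java
import java.nio.charset.StandardCharsets;

/** Encodes a Redis command as an array of bulk strings. */
final class RespCommand {
    /** Layout: *<number of parts>\r\n, then $<byte length>\r\n<part>\r\n for every part. */
    static String encode(String... parts) {
        StringBuilder sb = new StringBuilder();
        sb.append('*').append(parts.length).append("\r\n");
        for (String part : parts) {
            int byteLength = part.getBytes(StandardCharsets.UTF_8).length;
            sb.append('$').append(byteLength).append("\r\n").append(part).append("\r\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Make the line endings visible when printing.
        System.out.println(encode("auth", "123456").replace("\r\n", "\\r\\n"));
        System.out.println(encode("set", "name", "xiaohong").replace("\r\n", "\\r\\n"));
    }
}
```

Every part announces its own length, so the server knows where each part ends even when two commands arrive in a single read, as they do in this example.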
Querying Redis
# Before sending
127.0.0.1:6379> keys *
(empty list or set)
# After sending
127.0.0.1:6379> keys *
1) "name"
127.0.0.1:6379> get name
"xiaohong"
HTTP protocol example
Imitating a server: receive HTTP requests
public static void main(String[] args) {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    NioEventLoopGroup boss = new NioEventLoopGroup(1);
    NioEventLoopGroup worker = new NioEventLoopGroup(2);
    try {
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.group(boss, worker);
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ch.pipeline().addLast(new LoggingHandler());
                // 1. Add the HTTP codec
                ch.pipeline().addLast(new HttpServerCodec());
                // 2. Handle HTTP requests
                ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
                        // Log the requested URI
                        log.debug(msg.uri());
                        // Build the response
                        DefaultFullHttpResponse response =
                                new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
                        byte[] bytes = "<h1>Hello, world!</h1>".getBytes();
                        response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, bytes.length);
                        response.content().writeBytes(bytes);
                        // Write the response back
                        ctx.writeAndFlush(response);
                    }
                });
            }
        });
        ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("Server error", e);
    } finally {
        boss.shutdownGracefully();
        worker.shutdownGracefully();
    }
}
Server output: the browser requests http://localhost:8080
// 1. First HTTP request received: request line + request headers
10:41:49 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0x783f8f12, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:58325] READ: 753B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 20 48 54 54 50 2f 31 2e 31 0d 0a |GET / HTTP/1.1..|
|00000010| 48 6f 73 74 3a 20 6c 6f 63 61 6c 68 6f 73 74 3a |Host: localhost:|
|00000020| 38 30 38 30 0d 0a 43 6f 6e 6e 65 63 74 69 6f 6e |8080..Connection|
|00000030| 3a 20 6b 65 65 70 2d 61 6c 69 76 65 0d 0a 43 61 |: keep-alive..Ca|
|00000040| 63 68 65 2d 43 6f 6e 74 72 6f 6c 3a 20 6d 61 78 |che-Control: max|
|00000050| 2d 61 67 65 3d 30 0d 0a 73 65 63 2d 63 68 2d 75 |-age=0..sec-ch-u|
|00000060| 61 3a 20 22 43 68 72 6f 6d 69 75 6d 22 3b 76 3d |a: "Chromium";v=|
|00000070| 22 31 30 36 22 2c 20 22 47 6f 6f 67 6c 65 20 43 |"106", "Google C|
......
+--------+-------------------------------------------------+----------------+
10:41:50 [DEBUG] [nioEventLoopGroup-3-1] o.e.j.a.p.protocolHttp : /
// Response written back
10:41:50 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0x783f8f12, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:58325] WRITE: 61B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
|00000020| 20 32 32 0d 0a 0d 0a 3c 68 31 3e 48 65 6c 6c 6f | 22....<h1>Hello|
|00000030| 2c 20 77 6f 72 6c 64 21 3c 2f 68 31 3e |, world!</h1> |
+--------+-------------------------------------------------+----------------+
// 2. Second HTTP request received: the browser asks for /favicon.ico (request line + request headers)
10:41:50 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0x783f8f12, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:58325] READ: 653B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 66 61 76 69 63 6f 6e 2e 69 63 6f |GET /favicon.ico|
|00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 48 6f 73 74 3a | HTTP/1.1..Host:|
|00000020| 20 6c 6f 63 61 6c 68 6f 73 74 3a 38 30 38 30 0d | localhost:8080.|
|00000030| 0a 43 6f 6e 6e 65 63 74 69 6f 6e 3a 20 6b 65 65 |.Connection: kee|
|00000040| 70 2d 61 6c 69 76 65 0d 0a 73 65 63 2d 63 68 2d |p-alive..sec-ch-|
|00000050| 75 61 3a 20 22 43 68 72 6f 6d 69 75 6d 22 3b 76 |ua: "Chromium";v|
|00000060| 3d 22 31 30 36 22 2c 20 22 47 6f 6f 67 6c 65 20 |="106", "Google |
|00000070| 43 68 72 6f 6d 65 22 3b 76 3d 22 31 30 36 22 2c |Chrome";v="106",|
|00000080| 20 22 4e 6f 74 3b 41 3d 42 72 61 6e 64 22 3b 76 | "Not;A=Brand";v|
......
+--------+-------------------------------------------------+----------------+
10:41:50 [DEBUG] [nioEventLoopGroup-3-1] o.e.j.a.p.protocolHttp : /favicon.ico
// Response written back
10:41:50 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0x783f8f12, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:58325] WRITE: 61B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
|00000020| 20 32 32 0d 0a 0d 0a 3c 68 31 3e 48 65 6c 6c 6f | 22....<h1>Hello|
|00000030| 2c 20 77 6f 72 6c 64 21 3c 2f 68 31 3e |, world!</h1> |
+--------+-------------------------------------------------+----------------+
10:41:50 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0x783f8f12, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:58325] FLUSH
10:41:50 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler : [id: 0x783f8f12, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:58325] READ COMPLETE
10:42:52 [DEBUG] [nioEventLoopGroup-3-2] i.n.h.l.LoggingHandler : [id: 0xf43e53bc, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:58326] READ COMPLETE
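The 61 bytes in the WRITE log can be accounted for by building the same response by hand. The sketch below is plain Java and does not use Netty's HTTP codec; the class `MinimalHttpResponse` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;

/** Builds the bytes of a minimal HTTP/1.1 response with a content-length header. */
final class MinimalHttpResponse {
    static byte[] build(String body) {
        byte[] content = body.getBytes(StandardCharsets.UTF_8);
        String head = "HTTP/1.1 200 OK\r\n"
                + "content-length: " + content.length + "\r\n"
                + "\r\n";                                    // the empty line ends the headers
        byte[] headBytes = head.getBytes(StandardCharsets.US_ASCII);
        byte[] out = new byte[headBytes.length + content.length];
        System.arraycopy(headBytes, 0, out, 0, headBytes.length);
        System.arraycopy(content, 0, out, headBytes.length, content.length);
        return out;
    }

    public static void main(String[] args) {
        // 17 (status line) + 20 (header line) + 2 (empty line) + 22 (body) = 61
        System.out.println(build("<h1>Hello, world!</h1>").length);
    }
}
```

HTTP combines two of the framing techniques from earlier sections: the headers are delimited by `\r\n` lines, and the body is delimited by the length announced in `content-length`.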
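**Custom protocol**
Codec
Custom implementation
Using the protocol elements that the codec below writes (magic number, version, serialization algorithm, command type, request sequence number, body length, body), design a login request message and a login response message, and send and receive them with Netty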
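import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;

@Slf4j
public class MessageCode extends ByteToMessageCodec<Message> {

    // Encode: write the message into the ByteBuf that Netty has already allocated
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Message msg, ByteBuf byteBuf) throws Exception {
        // 1. Magic number: 4 bytes
        byteBuf.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. Version: 1 byte
        byteBuf.writeByte(1);
        // 3. Serialization algorithm: 1 byte (0 = JDK, 1 = JSON)
        byteBuf.writeByte(0);
        // 4. Message type: 1 byte, taken from the message
        byteBuf.writeByte(msg.getMessageType());
        // 5. Sequence id: 4 bytes, taken from the message
        byteBuf.writeInt(msg.getSequenceId());
        // Padding: 1 meaningless byte, so that the length field starts at offset 12
        byteBuf.writeByte(-1);
        // Serialize the body
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        oos.writeObject(msg);
        // Raw serialized bytes (going through a String would corrupt binary data)
        byte[] bytes = baos.toByteArray();
        // 6. Body length: 4 bytes
        byteBuf.writeInt(bytes.length);
        // 7. Body
        byteBuf.writeBytes(bytes);
    }

    // Decode: read a Message back out of the ByteBuf
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // 1. Magic number: 4 bytes
        int magicNum = byteBuf.readInt();
        // 2. Version: 1 byte
        byte version = byteBuf.readByte();
        // 3. Serialization algorithm: 0 = JDK, 1 = JSON
        byte serializerType = byteBuf.readByte();
        // 4. Message type
        byte messageType = byteBuf.readByte();
        // 5. Sequence id
        int sequenceId = byteBuf.readInt();
        // Skip the padding byte
        byteBuf.readByte();
        // 6. Body length
        int length = byteBuf.readInt();
        // 7. Body
        byte[] bytes = new byte[length];
        byteBuf.readBytes(bytes, 0, length);
        // Deserialize into a Message object
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        Message message = (Message) ois.readObject();
        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);
        list.add(message);
    }
}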
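To make the byte layout concrete, here is a minimal sketch that uses only the JDK's ByteBuffer instead of Netty. It assumes the 16-byte header implied by LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0): magic (4), version (1), serializer (1), message type (1), sequence id (4), padding (1), body length (4). The names FrameLayoutSketch, encode and decodeBody are hypothetical, and the body is treated as raw bytes rather than a serialized Message.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

class FrameLayoutSketch {
    static final byte[] MAGIC = {1, 2, 3, 4};
    // 4 (magic) + 1 (version) + 1 (serializer) + 1 (type) + 4 (sequence id) + 1 (padding) + 4 (length)
    static final int HEADER_LENGTH = 16;

    // Builds one frame: fixed header followed by the body bytes
    static byte[] encode(byte messageType, int sequenceId, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        buf.put(MAGIC);
        buf.put((byte) 1);       // version
        buf.put((byte) 0);       // serializer: 0 = JDK, 1 = JSON
        buf.put(messageType);
        buf.putInt(sequenceId);
        buf.put((byte) 0xFF);    // padding, so the length field sits at offset 12
        buf.putInt(body.length); // length field at offset 12
        buf.put(body);
        return buf.array();
    }

    // Reads the header fields in the same order and returns the body
    static byte[] decodeBody(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        byte[] magic = new byte[4];
        buf.get(magic);
        if (!Arrays.equals(magic, MAGIC)) {
            throw new IllegalArgumentException("unexpected magic number");
        }
        buf.get();    // version
        buf.get();    // serializer
        buf.get();    // message type
        buf.getInt(); // sequence id
        buf.get();    // padding
        int length = buf.getInt();
        byte[] body = new byte[length];
        buf.get(body);
        return body;
    }
}
```

With this layout the length field always starts at byte 12, which is what the second constructor argument of the frame decoder has to match.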
Usage test
Note: the codec must be used together with LengthFieldBasedFrameDecoder to solve the half-packet problem.
Note: use MessageCodeSharable when the codec needs to be shared.
Encoding
@Test
public void test1() {
    LoggingHandler loggingHandler = new LoggingHandler();
    EmbeddedChannel embeddedChannel = new EmbeddedChannel(
            loggingHandler,
            new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
            new MessageCode());
    LoginRequestMessage message = new LoginRequestMessage("lisi", "123");
    // Outbound: the message passes through MessageCode#encode
    embeddedChannel.writeOutbound(message);
}
Decoding
@Test
public void test2() throws Exception {
    LoggingHandler loggingHandler = new LoggingHandler();
    EmbeddedChannel embeddedChannel = new EmbeddedChannel(
            loggingHandler,
            new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
            new MessageCode());
    // Simulate inbound binary data by encoding a message into a buffer
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    LoginRequestMessage message = new LoginRequestMessage("lisi", "123");
    new MessageCode().encode(null, message, buffer);
    // Inbound: the buffer passes through the frame decoder and MessageCode#decode
    embeddedChannel.writeInbound(buffer);
}
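Half-packet problem
Solution: use the codec together with LengthFieldBasedFrameDecoder.
// Half-packet decoding
@Test
public void test3() throws Exception {
    LoggingHandler loggingHandler = new LoggingHandler();
    EmbeddedChannel embeddedChannel = new EmbeddedChannel(
            loggingHandler,
            // Commented out to reproduce the problem; restore this line to handle half packets
            // new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
            new MessageCode());
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    LoginRequestMessage message = new LoginRequestMessage("lisi", "123");
    new MessageCode().encode(null, message, buffer);
    // Split the encoded frame into two slices to simulate a half packet
    ByteBuf slice1 = buffer.slice(0, 100);
    // writeInbound releases the buffer, so retain it to keep the second slice usable
    buffer.retain();
    ByteBuf slice2 = buffer.slice(100, buffer.readableBytes() - 100);
    // Inbound: only the first 100 bytes arrive
    embeddedChannel.writeInbound(slice1);
}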
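The idea behind length-field framing can be sketched without Netty. The following is a simplified illustration, not Netty's implementation: the hypothetical class LengthFieldFramerSketch buffers incoming chunks and only emits a frame once the whole header and the body announced by the 4-byte length field have arrived. It assumes a big-endian length field that counts only the body, placed at a configurable offset.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LengthFieldFramerSketch {
    private final int lengthFieldOffset;
    // Bytes received so far that do not yet form a complete frame
    private byte[] pending = new byte[0];

    LengthFieldFramerSketch(int lengthFieldOffset) {
        this.lengthFieldOffset = lengthFieldOffset;
    }

    // Accepts the next chunk from the wire and returns every frame that is now complete
    List<byte[]> feed(byte[] chunk) {
        byte[] merged = new byte[pending.length + chunk.length];
        System.arraycopy(pending, 0, merged, 0, pending.length);
        System.arraycopy(chunk, 0, merged, pending.length, chunk.length);

        List<byte[]> frames = new ArrayList<>();
        int headerLength = lengthFieldOffset + 4;
        int pos = 0;
        while (merged.length - pos >= headerLength) {
            int bodyLength = ByteBuffer.wrap(merged, pos + lengthFieldOffset, 4).getInt();
            if (bodyLength < 0) {
                throw new IllegalStateException("negative body length");
            }
            int frameLength = headerLength + bodyLength;
            if (merged.length - pos < frameLength) {
                break; // half packet: wait for more bytes
            }
            frames.add(Arrays.copyOfRange(merged, pos, pos + frameLength));
            pos += frameLength; // sticky packet: keep looking for further frames
        }
        pending = Arrays.copyOfRange(merged, pos, merged.length);
        return frames;
    }
}
```

Feeding the first 100 bytes of a longer frame returns an empty list, and the frame is emitted only after the remaining bytes are fed. Because the framer keeps the leftover bytes in a field, one instance is needed per connection.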
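Sharing the codec
- Whether a handler is shared depends on business requirements.
- A handler that keeps no state can be safely shared across threads.
- Codec classes, however, must not extend ByteToMessageCodec or CombinedChannelDuplexHandler, because the constructors of these parent classes restrict the use of @Sharable.
- If you can guarantee that the codec keeps no state, extend MessageToMessageCodec instead.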
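Why state matters for sharing can be shown with a small JDK-only sketch. The classes below are hypothetical and unrelated to Netty's API: StatefulLineSplitter remembers a partial line between calls, so using one instance for two connections mixes their data, while StatelessUpperCaser only works on the complete message it is given and can be shared freely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

class SharedStateSketch {
    // Stateful: keeps an unfinished line between calls, so it needs one instance per connection
    static class StatefulLineSplitter {
        private final StringBuilder partial = new StringBuilder();

        List<String> feed(String chunk) {
            partial.append(chunk);
            List<String> lines = new ArrayList<>();
            int newline;
            while ((newline = partial.indexOf("\n")) >= 0) {
                lines.add(partial.substring(0, newline));
                partial.delete(0, newline + 1);
            }
            return lines;
        }
    }

    // Stateless: the result depends only on the argument, so a single instance can be shared
    static class StatelessUpperCaser {
        String apply(String completeMessage) {
            return completeMessage.toUpperCase(Locale.ROOT);
        }
    }
}
```

If connection A feeds "hel" and connection B then feeds "wor\n" into the same StatefulLineSplitter, the result is the corrupted line "helwor". With one splitter per connection, B gets "wor" and A later completes "hello".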
/**
 * Must be used together with LengthFieldBasedFrameDecoder; only then does the codec
 * not need to keep state, which makes it safe to share
 */
@Slf4j
@ChannelHandler.Sharable
public class MessageCodeSharable extends MessageToMessageCodec<ByteBuf, Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> list) throws Exception {
        ByteBuf byteBuf = ctx.alloc().buffer();
        // 1. Magic number: 4 bytes
        byteBuf.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. Version: 1 byte
        byteBuf.writeByte(1);
        // 3. Serialization algorithm: 1 byte (0 = JDK, 1 = JSON)
        byteBuf.writeByte(0);
        // 4. Message type: 1 byte, taken from the message
        byteBuf.writeByte(msg.getMessageType());
        // 5. Sequence id: 4 bytes, taken from the message
        byteBuf.writeInt(msg.getSequenceId());
        // Padding: 1 meaningless byte
        byteBuf.writeByte(-1);
        // Serialize the body
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        oos.writeObject(msg);
        byte[] bytes = baos.toByteArray();
        // 6. Body length: 4 bytes
        byteBuf.writeInt(bytes.length);
        // 7. Body
        byteBuf.writeBytes(bytes);
        list.add(byteBuf);
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // 1. Magic number: 4 bytes
        int magicNum = byteBuf.readInt();
        // 2. Version: 1 byte
        byte version = byteBuf.readByte();
        // 3. Serialization algorithm: 0 = JDK, 1 = JSON
        byte serializerType = byteBuf.readByte();
        // 4. Message type
        byte messageType = byteBuf.readByte();
        // 5. Sequence id
        int sequenceId = byteBuf.readInt();
        // Skip the padding byte
        byteBuf.readByte();
        // 6. Body length
        int length = byteBuf.readInt();
        // 7. Body
        byte[] bytes = new byte[length];
        byteBuf.readBytes(bytes, 0, length);
        // Deserialize into a Message object
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        Message message = (Message) ois.readObject();
        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);
        list.add(message);
    }
}
Chat room example
/**
 * User management interface
 */
public interface UserService {
    /**
     * Log in
     * @param username user name
     * @param password password
     * @return true if the login succeeds, false otherwise
     */
    boolean login(String username, String password);
}
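/**
 * Chat group session management interface
 */
public interface GroupSession {
    /**
     * Create a chat group. Creation succeeds only if the group does not exist yet; otherwise null is returned
     * @param name group name
     * @param members members
     * @return the group object on success, null on failure
     */
    Group createGroup(String name, Set<String> members);

    /**
     * Join a chat group
     * @param name group name
     * @param member member name
     * @return null if the group does not exist, otherwise the group object
     */
    Group joinMember(String name, String member);

    /**
     * Remove a member from a group
     * @param name group name
     * @param member member name
     * @return null if the group does not exist, otherwise the group object
     */
    Group removeMember(String name, String member);

    /**
     * Remove a chat group
     * @param name group name
     * @return null if the group does not exist, otherwise the group object
     */
    Group removeGroup(String name);

    /**
     * Get the members of a group
     * @param name group name
     * @return the member set; an empty set if there are no members
     */
    Set<String> getMembers(String name);

    /**
     * Get the channels of the group members; only channels of online members are returned
     * @param name group name
     * @return the list of member channels
     */
    List<Channel> getMembersChannel(String name);
}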
Chat room: login
@Slf4j
public class ChatServer {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    // Handles login requests only
                    ch.pipeline().addLast(new SimpleChannelInboundHandler<LoginRequestMessage>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
                            String username = msg.getUsername();
                            String password = msg.getPassword();
                            boolean login = UserServiceFactory.getUserService().login(username, password);
                            LoginResponseMessage message;
                            if (login) {
                                message = new LoginResponseMessage(true, "Login successful");
                            } else {
                                message = new LoginResponseMessage(false, "Incorrect username or password");
                            }
                            ctx.writeAndFlush(message);
                        }
                    });
                }
            });
            Channel channel = serverBootstrap.bind(8080).sync().channel();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
@Slf4j
public class ChatClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        // Lets the "system in" thread wait until the login response arrives
        CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
        // Records whether the login succeeded
        AtomicBoolean LOGIN = new AtomicBoolean(false);
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    // ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
                        // Receive response messages
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            log.debug("msg: {}", msg);
                            if ((msg instanceof LoginResponseMessage)) {
                                LoginResponseMessage response = (LoginResponseMessage) msg;
                                if (response.isSuccess()) {
                                    // Login succeeded
                                    LOGIN.set(true);
                                }
                                // Wake up the "system in" thread
                                WAIT_FOR_LOGIN.countDown();
                            }
                        }

                        // The active event fires once the connection is established
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            // Reads the user's console input and sends the corresponding messages to the server
                            new Thread(() -> {
                                Scanner scanner = new Scanner(System.in);
                                System.out.println("Enter username:");
                                String username = scanner.nextLine();
                                System.out.println("Enter password:");
                                String password = scanner.nextLine();
                                // Build the message object
                                LoginRequestMessage message = new LoginRequestMessage(username, password);
                                // Send the message
                                ctx.writeAndFlush(message);
                                System.out.println("Waiting for further operations...");
                                try {
                                    WAIT_FOR_LOGIN.await();
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                // Login failed
                                if (!LOGIN.get()) {
                                    ctx.channel().close();
                                    return;
                                }
                                while (true) {
                                    System.out.println("==================================");
                                    System.out.println("send [username] [content]");
                                    System.out.println("gsend [group name] [content]");
                                    System.out.println("gcreate [group name] [m1,m2,m3...]");
                                    System.out.println("gmembers [group name]");
                                    System.out.println("gjoin [group name]");
                                    System.out.println("gquit [group name]");
                                    System.out.println("quit");
                                    System.out.println("==================================");
                                    String command = scanner.nextLine();
                                    String[] s = command.split(" ");
                                    switch (s[0]) {
                                        case "send":
                                            ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));
                                            break;
                                        case "gsend":
                                            ctx.writeAndFlush(new GroupChatRequestMessage(username, s[1], s[2]));
                                            break;
                                        case "gcreate":
                                            Set<String> set = new HashSet<>(Arrays.asList(s[2].split(",")));
                                            set.add(username); // add the creator as well
                                            ctx.writeAndFlush(new GroupCreateRequestMessage(s[1], set));
                                            break;
                                        case "gmembers":
                                            ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
                                            break;
                                        case "gjoin":
                                            ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));
                                            break;
                                        case "gquit":
                                            ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));
                                            break;
                                        case "quit":
                                            ctx.channel().close();
                                            return;
                                    }
                                }
                            }, "system in").start();
                        }
                    });
                }
            });
            Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("client error", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}
Chat room: one-to-one chat
On the server side, each handler is moved into its own class
Login handler
@ChannelHandler.Sharable
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
        String username = msg.getUsername();
        String password = msg.getPassword();
        boolean login = UserServiceFactory.getUserService().login(username, password);
        LoginResponseMessage message;
        if (login) {
            // Bind the channel to the user name so that other users can reach it
            SessionFactory.getSession().bind(ctx.channel(), username);
            message = new LoginResponseMessage(true, "Login successful");
        } else {
            message = new LoginResponseMessage(false, "Incorrect username or password");
        }
        ctx.writeAndFlush(message);
    }
}
One-to-one chat handler
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {
        String to = msg.getTo();
        Channel channel = SessionFactory.getSession().getChannel(to);
        if (channel != null) {
            // Recipient is online: forward the message
            channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));
        } else {
            // Recipient is offline: report back to the sender
            ctx.writeAndFlush(new ChatResponseMessage(false, "The recipient does not exist or is offline"));
        }
    }
}
Chat room: group chat
Creating a group
@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {
        String groupName = msg.getGroupName();
        Set<String> members = msg.getMembers();
        // Group manager
        GroupSession groupSession = GroupSessionFactory.getGroupSession();
        Group group = groupSession.createGroup(groupName, members);
        // This handler treats a null return value as success, whereas the GroupSession
        // Javadoc describes null as failure; check which one the implementation in use follows
        if (group == null) {
            // Send the success message
            ctx.writeAndFlush(new GroupCreateResponseMessage(true, groupName + " created successfully"));
            // Notify every online member that they were added to the group
            List<Channel> channels = groupSession.getMembersChannel(groupName);
            for (Channel channel : channels) {
                channel.writeAndFlush(new GroupCreateResponseMessage(true, "You have been added to " + groupName));
            }
        } else {
            ctx.writeAndFlush(new GroupCreateResponseMessage(false, groupName + " already exists"));
        }
    }
}
Group chat
@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {
        // Forward the message to every online member of the group
        List<Channel> channels = GroupSessionFactory.getGroupSession().getMembersChannel(msg.getGroupName());
        for (Channel channel : channels) {
            channel.writeAndFlush(new GroupChatResponseMessage(msg.getFrom(), msg.getContent()));
        }
    }
}
Joining a group
@ChannelHandler.Sharable
public class GroupJoinRequestMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception {
        Group group = GroupSessionFactory.getGroupSession().joinMember(msg.getGroupName(), msg.getUsername());
        if (group != null) {
            ctx.writeAndFlush(new GroupJoinResponseMessage(true, "Joined group " + msg.getGroupName()));
        } else {
            // The group does not exist, so report a failure
            ctx.writeAndFlush(new GroupJoinResponseMessage(false, "Group " + msg.getGroupName() + " does not exist"));
        }
    }
}
Leaving a group
@ChannelHandler.Sharable
public class GroupQuitRequestMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception {
        Group group = GroupSessionFactory.getGroupSession().removeMember(msg.getGroupName(), msg.getUsername());
        if (group != null) {
            ctx.writeAndFlush(new GroupJoinResponseMessage(true, "Left group " + msg.getGroupName()));
        } else {
            // The group does not exist, so report a failure
            ctx.writeAndFlush(new GroupJoinResponseMessage(false, "Group " + msg.getGroupName() + " does not exist"));
        }
    }
}
Listing group members
@ChannelHandler.Sharable
public class GroupMembersRequestMessageHandler extends SimpleChannelInboundHandler<GroupMembersRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception {
        Set<String> members = GroupSessionFactory.getGroupSession().getMembers(msg.getGroupName());
        ctx.writeAndFlush(new GroupMembersResponseMessage(members));
    }
}
Chat room: quitting
@Slf4j
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {
    // The inactive event fires when the connection is closed
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        SessionFactory.getSession().unbind(ctx.channel());
        log.debug("{} disconnected", ctx.channel());
    }

    // Fires when an exception occurs
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        SessionFactory.getSession().unbind(ctx.channel());
        log.debug("{} disconnected abnormally, exception: {}", ctx.channel(), cause.getMessage());
    }
}
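Chat room: idle detection
Server-side solution
How can the server tell whether a client connection is dead? If it still receives data from the client, the connection is alive. The strategy is therefore to check at regular intervals whether any client data arrived during that interval; if none did, the connection is considered dead.
// Detects whether the read-idle or write-idle time is too long
// If no data is received from the channel within 5 s, an IdleState#READER_IDLE event is fired
ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
// ChannelDuplexHandler can act as both an inbound and an outbound handler
ch.pipeline().addLast(new ChannelDuplexHandler() {
    // Handles user-defined (special) events
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // Other user events may pass through here as well, so check the type before casting
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            // A read-idle event was fired
            if (event.state() == IdleState.READER_IDLE) {
                log.debug("No data read for 5 s");
                ctx.channel().close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
});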
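The checking strategy itself is simple enough to sketch without Netty. The class below is a hypothetical illustration, not how IdleStateHandler is implemented: it remembers the time of the last read and reports the connection as read-idle once the configured timeout has passed. Time is passed in as a parameter, which is an assumption made here to keep the sketch deterministic.

```java
class ReadIdleDetectorSketch {
    private final long timeoutMillis;
    private long lastReadMillis;

    ReadIdleDetectorSketch(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastReadMillis = nowMillis;
    }

    // Call whenever data arrives from the client
    void onRead(long nowMillis) {
        lastReadMillis = nowMillis;
    }

    // Call periodically; true means nothing was read for at least the timeout
    boolean isReadIdle(long nowMillis) {
        return nowMillis - lastReadMillis >= timeoutMillis;
    }
}
```

A server would call onRead from its read path and close the connection when a periodic isReadIdle check returns true.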
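Client-side periodic heartbeat
The client can send data to the server periodically. As long as this interval is shorter than the server's idle-detection interval, the false positives mentioned earlier are avoided. The client can define a heartbeat handler as follows:
// Detects whether the read-idle or write-idle time is too long
// If nothing is written to the server within 3 s, an IdleState#WRITER_IDLE event is fired
ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
// ChannelDuplexHandler can act as both an inbound and an outbound handler
ch.pipeline().addLast(new ChannelDuplexHandler() {
    // Handles user-defined (special) events
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // Other user events may pass through here as well, so check the type before casting
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            // A write-idle event was fired
            if (event.state() == IdleState.WRITER_IDLE) {
                // log.debug("Nothing written for 3 s, sending a heartbeat");
                ctx.writeAndFlush(new PingMessage());
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
});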
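The relationship between the two intervals can be checked with a tiny simulation. This is an idealized sketch under stated assumptions: time advances in whole seconds, heartbeats arrive instantly, and the client sends nothing except heartbeats. The class and method names are hypothetical.

```java
class HeartbeatTimingSketch {
    /**
     * Returns the first second at which the server would observe a read-idle timeout,
     * or -1 if no timeout occurs within the simulated horizon.
     */
    static int firstReadIdleSecond(int heartbeatIntervalSec, int readTimeoutSec, int horizonSec) {
        int lastRead = 0; // the connection is established at second 0
        for (int now = 1; now <= horizonSec; now++) {
            if (now % heartbeatIntervalSec == 0) {
                lastRead = now; // a heartbeat arrives (network delay ignored)
            }
            if (now - lastRead >= readTimeoutSec) {
                return now; // the server would close the connection here
            }
        }
        return -1;
    }
}
```

With a 3 s heartbeat and a 5 s read timeout, as in the handlers above, firstReadIdleSecond(3, 5, 60) returns -1. With a 6 s heartbeat, firstReadIdleSecond(6, 5, 60) returns 5, meaning the server gives up before the first heartbeat is sent. In practice network delay eats into the margin, so the heartbeat interval should be comfortably shorter than the timeout.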