欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 创投人物 > netty中黏包,半包

netty中黏包,半包

2025/3/19 22:26:25 来源:https://blog.csdn.net/qq_36022463/article/details/146301088  浏览:    关键词:netty中黏包,半包
什么是黏包,半包

黏包:packet stick,,,接收端一次性接收了很多条完整的消息
半包 : packet fragment ,,, 接收端一次只读到了一部分消息,不是完整的

滑动窗口: 一个tcp协议的请求,是要等服务器的ack回应的,,而滑动窗口允许在窗口内的请求不用等到ack回应,也能继续往后开新的请求发送数据,提高了数据传输效率。。滑动窗口是自适应的,不用自己设置

黏包
  • netty使用ByteBuf作为缓冲区来存储tcp传递的数据,,当数据到达netty的Channel时,会被读入ByteBuf中,,这个缓冲区的大小,会产生黏包,半包问题

  • nagle算法传输层ip层都会对数据加一个报头,,ip层的报头最少占20字节,传输层的报头也最少20字节,,就算发送一个字节的数据,也要带上这么多字节的报头,,nagle算法会尽可能多的去发送数据,,或者等攒够了一批才会发送数据,,,这会产生黏包问题

半包
  • 接收方的滑动窗口比较小,一次性接收不了那么大的数据,要等到ack回应了之后,才能接收下面的数据,,就会半包
  • MSS限制:maximum segment size, 不同的网卡,对数据包的大小是有限制的,一般的网卡传输限制是1500字节(MTU),除开tcp/ip的报文头,,还有1460字节(MSS),,如果超过了这个限制,就会把数据拆分…。。回环地址localhost没有对mss的限制
  • 接收方的ByteBuf特别小,被迫拆分
netty处理黏包半包
  • FixedLengthFrameDecoder定长消息解码器, : 凑够这个长度才会解码,,把最长消息的长度设置到这个解码器中,,其他不够长的消息,补充字节数,凑够这个固定长度,,再发送,,
    只有凑够了这个长度,,才会去解码,,,这样虽然解决了黏包,半包问题,,但是需要补充很多空余的字符,,这些字符原本是不需要的,就造成了空间浪费

  • LineBasedFrameDecoder : 以换行符作为分割标志,,\n ,\r\n, 这里面会设置一个最大长度,,如果超过这个长度,还没找到换行符就会报错

  • DelimiterBasedFrameEncoder: 自定义分割符
    上面两种都要去遍历每个字符,找是不是分割符,,效率很低

  • LengthFieldBasedFrameDecoder : 前面有一部分来约定好传输的字节长度,版本,等信息,后面根据这个长度解析对应长度的字节
    这个类的构造函数有四个参数:

    • lengthFieldOffset : 前面有多少个多余的字节,才到描述数据长度的字节
    • lengthFieldLength : 有多少个字节用来表述 数据的长度
    • lengthAdjustment: 长度调整: 距离多少字节到真实的数据,,,描述数据长度的字节 和 存数据的字节 不一定是挨在一起的
    • initialBytesToStrip: 从头剥离几个字节,,这些描述内容的信息(长度,版本)等,如果不需要可以剥离,如果设置之后就会被剥离掉

LengthFieldBasedFrameDecoder : 是用的最多的

协议的设计和解析
redis协议
   public static void main(String[] args) {// 回车符是13 ,,换行符是10final byte[] LINE = {13,10};NioEventLoopGroup group = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){/*** 连接成功之后执行* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ByteBuf buf = ctx.alloc().buffer();// *3 发送命令的数组元素有几个buf.writeBytes("*3".getBytes());// 回车换行buf.writeBytes(LINE);// 每个命令的键的长度buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("set".getBytes());buf.writeBytes(LINE);// 四个字节buf.writeBytes("$4".getBytes());buf.writeBytes(LINE);buf.writeBytes("name".getBytes());buf.writeBytes(LINE);buf.writeBytes("$8".getBytes());buf.writeBytes(LINE);buf.writeBytes("waterkid".getBytes());buf.writeBytes(LINE);// 发送数据到redisctx.writeAndFlush(buf);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// netty接收redis返回的数据,变成ByteBufByteBuf byteBuf = (ByteBuf) msg;System.out.println(byteBuf.toString(Charset.defaultCharset()));}});}});ChannelFuture channelFuture = bootstrap.connect("localhost", 6379);
//        channelFuture.sync();
//        channelFuture.channel().closeFuture()}
netty处理http协议

netty中提供了很多现成的协议,,比如redis,http等,可以直接调用处理

  public static void main(String[] args) throws InterruptedException {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(bossGroup,workerGroup);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());// codec : 既能解码,,也能编码// 经过这个编解码器处理之后,会返回两种类型的对象// HttpRequest: 包含请求行和请求头// HttpContent: 包含请求体    ===》 get请求也会被解析成两个ch.pipeline().addLast(new HttpServerCodec());//                ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
//                    @Override
//                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//                        System.out.println("msg.getClass() = " + msg.getClass());
//
//                    }
//                });// SimpleChannelInboundHandler : 只会处理指定类型的数据,,指定的类型在范型中ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {System.out.println(msg.uri());// 返回响应DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.getProtocolVersion(), HttpResponseStatus.OK);byte[] bytes = "hello".getBytes();// 告诉浏览器,返回的内容有多长,,不然他会一直转圈等待response.headers().setInt("content-length",bytes.length);response.content().writeBytes(bytes);ctx.writeAndFlush(response);}});}});ChannelFuture channelFuture = serverBootstrap.bind(8080);
//        channelFuture.channel().closeFuture().sync();}
自定义协议

可以自己定义一个协议(共同遵守的约定),,
自定义协议需要约定的内容:

  • 魔数: 用来第一时间判断是否是无效的数据包,,类似java的cafe babe
  • 版本号: 支持协议的升级
  • 序列化算法: 消息正文采用哪种序列化和反序列化方式,,常用的是json,或者是二进制对象流
  • 指令类型: 这个指令是什么类型,登录,注册,单聊,还是群聊
  • 请求序号 : 用来异步操作的,,确保请求的唯一性,服务器可以按照序号恢复顺序
  • 正文长度: 正文有多少字节

Message类是自定义消息的父类,,下面有若干的子类:
在这里插入图片描述

@Slf4j
// 将来将ByteBuf  和  这个范型之间相互转换
public class MessageCodec extends ByteToMessageCodec<Message> {/*** 魔数:  第一时间判定是否是无效的数据包 。。。* 版本号* 序列化算法* 指令类型:  是登录,注册,单聊,群聊* 请求序号 : 为了双工通信,提供异步能力* 正文长度* 消息正文*/@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception {// 编码 ,, 将message 编码成  ByteBuf// 将消息写入到  Bytebuf中out.writeBytes(new byte[]{1,2,3,4}); // 魔数// 版本out.writeByte(1);// 序列化算法   0:表示jdk序列化   1:表示json序列化out.writeByte(0);// 写入一个字节的 指令类型out.writeByte(message.getMessageType());// 四个字节的 请求序号out.writeInt(message.getSequenceId());// 无意义,,对齐填充,,, 2的次幂out.writeByte(0xff);// 正文长度ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream outputStream = new ObjectOutputStream(bos);outputStream.writeObject(message);byte[] bytes = bos.toByteArray();out.writeInt(bytes.length);// 写入内容out.writeBytes(bytes);}@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {// 解码  ,,将ByteBuf变成  messageint magicNumber = in.readInt();byte version = in.readByte();byte serializerType = in.readByte();// 指令类型byte messageType = in.readByte();// 消息序号int sequenceId = in.readInt();// 无意义字符in.readByte();int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes,0,length);if (serializerType == 0){ObjectInputStream inputStream = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) inputStream.readObject();log.debug("{},{},{},{},{},{}",magicNumber,version,serializerType,messageType,sequenceId,length);System.out.println("message = " + message);// 解码出来的结果,,要存到参数里面去,,不然后面的handler无法拿到解码后的结果out.add(message);}}
}

测试:

   public static void main(String[] args) throws Exception {EmbeddedChannel channel = new EmbeddedChannel(// 解决半包,,半包读取会有问题new LengthFieldBasedFrameDecoder(1024,12,4,0,0),new LoggingHandler(),new MessageCodec());LoginRequestMessage loginRequestMessage = new LoginRequestMessage();loginRequestMessage.setUsername("waterkid");loginRequestMessage.setPassword("123");channel.writeOutbound(loginRequestMessage);ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();new MessageCodec().encode(null,loginRequestMessage,buf);channel.writeInbound(buf);  // writeInbound()  写出数据会将 ByteBuf 的引用计数变成0 ,,会被释放掉,,因为ByteBuf是堆外内存,是零拷贝,,如果共用一个内存,就会有问题// 需要 retain 将引用计数+1}
名词

流式协议: stream oriented protocol : 消息之间是没有边界的,需要自己去拆解

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词