侧边栏壁纸
博主头像
落叶人生博主等级

走进秋风,寻找秋天的落叶

  • 累计撰写 130562 篇文章
  • 累计创建 28 个标签
  • 累计收到 9 条评论
标签搜索

目 录CONTENT

文章目录

初识Netty原理 (二)——ByteBuf缓冲区

2022-07-02 星期六 / 0 评论 / 0 点赞 / 38 阅读 / 22688 字

上文已经了解到了Netty中的启动器、反应器、通道、处理器、流水线,下面来了解一下Netty中较为重要的ByteBuf缓冲区。ByteBuf原理优势Pooling池化,减少了内存复制和GC,提升了效率

上文已经了解到了Netty中的启动器、反应器、通道、处理器、流水线,下面来了解一下Netty中较为重要的ByteBuf缓冲区。

ByteBuf原理

优势

  • Pooling池化,减少了内存复制和GC,提升了效率。
  • 读写分开存储,索引也分开了,不需要切换读写模式。
  • 方法可链式调用,引入了引用计数法,方便了池化与内存回收。

重要属性

  • readerIndex(读指针):读取的起始位置,每读取一个字节,就加1,当它等于writerIndex时,说明已经读完了。
  • writerIndex(写指针):写入的起始位置,每写入一个字节,就加1,当它等于capacity()时,说明当前容量满了。此时可扩容,如果不能继续扩容,则不能写了。
  • maxCapacity(最大容量):可以扩容的最大容量,当前容量等于这个值时,说明不能再扩容了。

引用计数

Netty采用“计数器”来追踪ByteBuf的生命周期,主要是用于对池化的支持。(池化就是当ByteBuf的引用为0时,就会放到对象缓存池中,当需要用缓冲区时,可以直接从这个池里面取出来用,而不用重新创建一个了)。通过源码可以发现ByteBuf实现了一个类ReferenceCounted。这个类就是用于引用计数的。

当创建完ByteBuf时,引用数为1, 通过refCnt()方法可以获取当前缓冲区的引用数,调用retain()方法可以使引用数加1,调用release()方法可以使引用数减1,当引用数为0时,缓冲区就会被完全释放。如果池化了就放到缓冲池中。如果没池化就分两种情况,如果是分配在堆内存上的,就通过JVM的垃圾回收机制把它回收,如果分配在堆外直接内存上,就通过本地方法来释放堆外内存。在Handler处理器中,Netty会自动给流水线在最后加一个处理器用来调用release()去释放缓冲区,如果要在中间中断流水线,则需要自己调用release()释放缓冲区。

Allocator分配器

Netty提供了ByteAllocator的两种实现:PoolByteAllocator(池化)和UnpooledByteAllocator(未池化)。Netty默认使用的是PoolByteAllocator,默认使用的内存是堆外直接内存(写入速度比堆内存更快,池化分配器配合堆外直接内存,可将堆外缓冲区复用(弥补了堆外分配和释放空间的代价较高的缺点),从来大大提升了性能)。

浅层复制

浅层复制有两个方法,切片浅层复制和整体浅层复制

  • slice切片浅层复制 :切片只复制了原缓冲区的可读部分,不会复制底层数组(引用同一个),也不会增加引用数。
  • duplicate整体浅层复制 :这个是将整体都复制了,可读可写,但是引用还是一样的(同slice)。

ByteBuf使用

来写个小例子,分析一下。这里的Logger使用的是自己封装的静态日志类。分析堆栈信息封装一个SLF4J的静态类

public class testBuffer {	public static void main(String[] args) {		// 默认使用池化缓冲区,分配的是堆外直接内存		ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(9, 100);		// 使用堆内存来分配缓冲区内存		ByteBuf buf1 = ByteBufAllocator.DEFAULT.heapBuffer(9, 100);		// 写入一个字节数组		buf.writeBytes(new byte[] { 1, 2, 3, 4 });		Logger.info("引用次数--[{}]", buf.refCnt());		// 依次读出来		int i = 0;		while (buf.isReadable() && i < buf.readableBytes()) {			// 读字节,不改变指针(读完后,readIndex还是0)			Logger.info("取一个字节--[{}]", buf.getByte(i++));		}		Logger.info("buf是否使用的堆内存--[{}]", buf.hasArray());		Logger.info("buf1是否使用的堆内存--[{}]", buf1.hasArray());		int len = buf.readableBytes();		byte[] array = new byte[len];		// 把数据读取到堆内存中		buf.getBytes(buf.readerIndex(), array);		Logger.info("buf读出的数据--[{}]", array);		// 与原缓冲区buf的底层引用一样		ByteBuf slice = buf.slice();        // 增加一次浅层复制的引用		slice.retain();        // 减少一次原缓冲区的引用		buf.release();        // 会发现两个引用是同一个		Logger.info("引用次数--[{}]", buf.refCnt());		Logger.info("切片结果--{}", slice);		Logger.info("引用次数--[{}]", slice.refCnt());	}}

运行结果:

21:02:59.693 [main] INFO byteBuf.test1 - 引用次数--[1]21:02:59.695 [main] INFO byteBuf.test1 - 取一个字节--[1]21:02:59.696 [main] INFO byteBuf.test1 - 取一个字节--[2]21:02:59.696 [main] INFO byteBuf.test1 - 取一个字节--[3]21:02:59.696 [main] INFO byteBuf.test1 - 取一个字节--[4]21:02:59.696 [main] INFO byteBuf.test1 - buf是否使用的堆内存--[false]21:02:59.696 [main] INFO byteBuf.test1 - buf1是否使用的堆内存--[true]21:02:59.696 [main] INFO byteBuf.test1 - buf读出的数据--[[1, 2, 3, 4]]21:02:59.698 [main] INFO byteBuf.test1 - 引用次数--[1]21:02:59.698 [main] INFO byteBuf.test1 - 切片结果--UnpooledSlicedByteBuf(ridx: 0, widx: 4, cap: 4/4, unwrapped: PooledUnsafeDirectByteBuf(ridx: 0, widx: 4, cap: 9/100))21:02:59.698 [main] INFO byteBuf.test1 - 引用次数--[1]

回显服务器实战

回显服务器的服务器端如下。这里的业务处理器使用了单例模式创建,因为这里的业务处理是多线程安全的,可以在多个通道间共享使用,所以使用单例模式让多个通道共享同一个实例,从而减少实例的创建,从而减少内存空间的浪费。
@ChannelHandler.Sharable这个注解是Netty中的注解,它是用于标注一个Handler实例可以被多个通道安全地共享,如果不加这个注解,直接共享的话将会抛出异常。

public class NettyEchoServer {    private final static Logger log = LoggerFactory.getLogger(NettyEchoServer.class);    private final int port;    private ServerBootstrap serverBootstrap = new ServerBootstrap();    private NettyEchoServer(int port){        this.port = port;    }    private void runServer(){        // 创建父通道反应器线程组        EventLoopGroup boss = new NioEventLoopGroup(1);        // 创建子通道反应器线程组        EventLoopGroup workers = new NioEventLoopGroup();        try {            serverBootstrap.group(boss, workers)                    .channel(NioServerSocketChannel.class)                    .localAddress("127.0.0.1", port)                    .option(ChannelOption.SO_KEEPALIVE, true)                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) {                            ch.pipeline().addLast(NettyEchoServerHandler.getInstance());                        }                    });            ChannelFuture future = serverBootstrap.bind();            future.addListener((channelFuture) -> {                if (channelFuture.isSuccess()){                    log.info("服务器启动成功,监听地址: [{}]", future.channel().localAddress());                }else {                    log.info("服务器启动失败");                }            });            // 阻塞直到启动成功            future.sync();            ChannelFuture close = future.channel().closeFuture();            // 阻塞直到服务器关闭            close.sync();        }catch (Exception e){            e.printStackTrace();        }finally {            boss.shutdownGracefully();            workers.shutdownGracefully();        }    }    /**     * 使用单例模式实现处理器,使得多个通道可以使用同一个处理器实例     * ChannelHandler.Sharable这个注解表示处理器可以共享     */    @ChannelHandler.Sharable    static class NettyEchoServerHandler extends ChannelInboundHandlerAdapter{        // 使用volatile修饰变量,保证instance在多线程下的可见性        volatile static NettyEchoServerHandler instance;        // 双重检查锁实现单例模式        static NettyEchoServerHandler getInstance(){            if (instance == null){                synchronized (NettyEchoServerHandler.class){                    if (instance == null){                        instance = new NettyEchoServerHandler();                    }                }            }            return instance;        }        @Override        public void channelRead(ChannelHandlerContext ctx, Object msg) {            ByteBuf buf = (ByteBuf)msg;            log.info("msg type: [{}]" ,(buf.hasArray() ? "堆内存" : "直接内存"));            int len = buf.readableBytes();            // 用取得的字节的长度来初始化字节数组            byte[] array = new byte[len];            // 将堆外直接内存上的数据读到堆内存上的字节数组中            buf.getBytes(buf.readerIndex(), array);            log.info("server received:[{}]", new String(array, StandardCharsets.UTF_8));            log.info("写回前的引用计数:[{}]", buf.refCnt());            // 写回数据,异步任务(写完后会释放引用)            ChannelFuture cl = ctx.writeAndFlush(msg);            // I/O操作完成后的操作            cl.addListener((ChannelFuture future) -> log.info("写回后的引用计数:[{}]", ((ByteBuf) msg).refCnt()));        }    }    public static void main(String[] args){        new NettyEchoServer(66).runServer();    }}

回显服务器的客户端:

public class NettyEchoClient {    private final static Logger log = LoggerFactory.getLogger(NettyEchoClient.class);    private int serverPort;    private String serverIp;    private Bootstrap bootstrap = new Bootstrap();    private NettyEchoClient(String ip, int port){        serverIp = ip;        serverPort = port;    }    private void runClient(){        // 创建反应器线程组        EventLoopGroup worker = new NioEventLoopGroup();        try{            bootstrap.group(worker)                    .channel(NioSocketChannel.class)                    .remoteAddress(serverIp, serverPort)                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)                    .handler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) {                            ch.pipeline().addLast(NettyEchoClientHandler.getInstance());                        }                    });            ChannelFuture future = bootstrap.connect();            // 客户端连接的异步通知            future.addListener((channelFuture) -> {               if (channelFuture.isSuccess()){                   log.info("客户端连接成功");               }else {                   log.info("客户端连接失败");               }            });            // 阻塞直到连接成功            future.sync();            Channel channel = future.channel();            Scanner scanner = new Scanner(System.in);            System.out.println("请输入要发送的内容: ");            while (scanner.hasNext()){                // 获取输入的内容                String text = scanner.next();                byte[] bytes = text.getBytes(StandardCharsets.UTF_8);                // 分配一个直接内存的缓冲区                ByteBuf buf = channel.alloc().buffer();                // 将字节数组写入缓冲区                buf.writeBytes(bytes);                // 将缓冲区的数据写入到通道中并刷新通道                channel.writeAndFlush(buf);                System.out.println("请输入要发送的内容: ");            }        }catch (Exception e){            e.printStackTrace();        }finally {            worker.shutdownGracefully();        }    }    @ChannelHandler.Sharable    private static class NettyEchoClientHandler extends ChannelInboundHandlerAdapter{        static volatile NettyEchoClientHandler instance;        static NettyEchoClientHandler getInstance(){            if (instance == null){                synchronized (NettyEchoClientHandler.class){                    if (instance == null){                        instance = new NettyEchoClientHandler();                    }                }            }            return instance;        }        @Override        public void channelRead(ChannelHandlerContext ctx, Object msg) {            ByteBuf buf = (ByteBuf)msg;            int len = buf.readableBytes();            byte[] array = new byte[len];            // 将数据读到字节数组中,这中读不会改变读指针            buf.getBytes(buf.readerIndex(), array);            log.info("客户端回显的数据:[{}]", new String(array, StandardCharsets.UTF_8));            // 手动释放引用            buf.release();        }    }    public static void main(String[] args){        new NettyEchoClient("127.0.0.1", 66).runClient();    }}

运行结果:
客户端:

15:12:49.677 [nioEventLoopGroup-2-1] INFO echoserver.NettyEchoClient - 客户端连接成功请输入要发送的内容: 哈哈哈,服务器你好15:12:59.821 [nioEventLoopGroup-2-1] INFO echoserver.NettyEchoClient - 客户端回显的数据:[哈哈哈,服务器你好]

服务器端:

15:12:40.747 [nioEventLoopGroup-2-1] INFO echoserver.NettyEchoServer - 服务器启动成功,监听地址: [/127.0.0.1:66]15:12:59.816 [nioEventLoopGroup-3-1] INFO echoserver.NettyEchoServer - msg type: [直接内存]15:12:59.818 [nioEventLoopGroup-3-1] INFO echoserver.NettyEchoServer - server received:[哈哈哈,服务器你好]15:12:59.818 [nioEventLoopGroup-3-1] INFO echoserver.NettyEchoServer - 写回前的引用计数:[1]15:12:59.821 [nioEventLoopGroup-3-1] INFO echoserver.NettyEchoServer - 写回后的引用计数:[0]
.
.

广告 广告

评论区