上文已经了解到了Netty中的启动器、反应器、通道、处理器、流水线,下面来了解一下Netty中较为重要的ByteBuf缓冲区。ByteBuf原理优势Pooling池化,减少了内存复制和GC,提升了效率
上文已经了解到了Netty中的启动器、反应器、通道、处理器、流水线,下面来了解一下Netty中较为重要的ByteBuf缓冲区。
ByteBuf原理
优势
- Pooling池化,减少了内存复制和GC,提升了效率。
- 读写分开存储,索引也分开了,不需要切换读写模式。
- 方法可链式调用,引入了引用计数法,方便了池化与内存回收。
重要属性
- readerIndex(读指针):读取的起始位置,每读取一个字节,就加1,当它等于writerIndex时,说明已经读完了。
- writerIndex(写指针):写入的起始位置,每写入一个字节,就加1,当它等于capacity()时,说明当前容量满了。此时可扩容,如果不能继续扩容,则不能写了。
- maxCapacity(最大容量):可以扩容的最大容量,当前容量等于这个值时,说明不能再扩容了。
引用计数
Netty采用“计数器”来追踪ByteBuf的生命周期,主要是用于对池化的支持。(池化就是当ByteBuf的引用为0时,就会放到对象缓存池中,当需要用缓冲区时,可以直接从这个池里面取出来用,而不用重新创建一个了)。通过源码可以发现ByteBuf实现了一个类ReferenceCounted。这个类就是用于引用计数的。
当创建完ByteBuf时,引用数为1, 通过refCnt()方法可以获取当前缓冲区的引用数,调用retain()方法可以使引用数加1,调用release()方法可以使引用数减1,当引用数为0时,缓冲区就会被完全释放。如果池化了就放到缓冲池中。如果没池化就分两种情况,如果是分配在堆内存上的,就通过JVM的垃圾回收机制把它回收,如果分配在堆外直接内存上,就通过本地方法来释放堆外内存。在Handler处理器中,Netty会自动给流水线在最后加一个处理器用来调用release()去释放缓冲区,如果要在中间中断流水线,则需要自己调用release()释放缓冲区。
Allocator分配器
Netty提供了ByteAllocator的两种实现:PoolByteAllocator(池化)和UnpooledByteAllocator(未池化)。Netty默认使用的是PoolByteAllocator,默认使用的内存是堆外直接内存(写入速度比堆内存更快,池化分配器配合堆外直接内存,可将堆外缓冲区复用(弥补了堆外分配和释放空间的代价较高的缺点),从来大大提升了性能)。
浅层复制
浅层复制有两个方法,切片浅层复制和整体浅层复制
- slice切片浅层复制 :切片只复制了原缓冲区的可读部分,不会复制底层数组(引用同一个),也不会增加引用数。
- duplicate整体浅层复制 :这个是将整体都复制了,可读可写,但是引用还是一样的(同slice)。
ByteBuf使用
来写个小例子,分析一下。这里的Logger使用的是自己封装的静态日志类。分析堆栈信息封装一个SLF4J的静态类
public class testBuffer { public static void main(String[] args) { // 默认使用池化缓冲区,分配的是堆外直接内存 ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(9, 100); // 使用堆内存来分配缓冲区内存 ByteBuf buf1 = ByteBufAllocator.DEFAULT.heapBuffer(9, 100); // 写入一个字节数组 buf.writeBytes(new byte[] { 1, 2, 3, 4 }); Logger.info("引用次数--[{}]", buf.refCnt()); // 依次读出来 int i = 0; while (buf.isReadable() && i < buf.readableBytes()) { // 读字节,不改变指针(读完后,readIndex还是0) Logger.info("取一个字节--[{}]", buf.getByte(i++)); } Logger.info("buf是否使用的堆内存--[{}]", buf.hasArray()); Logger.info("buf1是否使用的堆内存--[{}]", buf1.hasArray()); int len = buf.readableBytes(); byte[] array = new byte[len]; // 把数据读取到堆内存中 buf.getBytes(buf.readerIndex(), array); Logger.info("buf读出的数据--[{}]", array); // 与原缓冲区buf的底层引用一样 ByteBuf slice = buf.slice(); // 增加一次浅层复制的引用 slice.retain(); // 减少一次原缓冲区的引用 buf.release(); // 会发现两个引用是同一个 Logger.info("引用次数--[{}]", buf.refCnt()); Logger.info("切片结果--{}", slice); Logger.info("引用次数--[{}]", slice.refCnt()); }}
运行结果:
21:02:59.693 [main] INFO byteBuf.test1 - 引用次数--[1]21:02:59.695 [main] INFO byteBuf.test1 - 取一个字节--[1]21:02:59.696 [main] INFO byteBuf.test1 - 取一个字节--[2]21:02:59.696 [main] INFO byteBuf.test1 - 取一个字节--[3]21:02:59.696 [main] INFO byteBuf.test1 - 取一个字节--[4]21:02:59.696 [main] INFO byteBuf.test1 - buf是否使用的堆内存--[false]21:02:59.696 [main] INFO byteBuf.test1 - buf1是否使用的堆内存--[true]21:02:59.696 [main] INFO byteBuf.test1 - buf读出的数据--[[1, 2, 3, 4]]21:02:59.698 [main] INFO byteBuf.test1 - 引用次数--[1]21:02:59.698 [main] INFO byteBuf.test1 - 切片结果--UnpooledSlicedByteBuf(ridx: 0, widx: 4, cap: 4/4, unwrapped: PooledUnsafeDirectByteBuf(ridx: 0, widx: 4, cap: 9/100))21:02:59.698 [main] INFO byteBuf.test1 - 引用次数--[1]
回显服务器实战
回显服务器的服务器端如下。这里的业务处理器使用了单例模式创建,因为这里的业务处理是多线程安全的,可以在多个通道间共享使用,所以使用单例模式让多个通道共享同一个实例,从而减少实例的创建,从而减少内存空间的浪费。
@ChannelHandler.Sharable这个注解是Netty中的注解,它是用于标注一个Handler实例可以被多个通道安全地共享,如果不加这个注解,直接共享的话将会抛出异常。
public class NettyEchoServer { private final static Logger log = LoggerFactory.getLogger(NettyEchoServer.class); private final int port; private ServerBootstrap serverBootstrap = new ServerBootstrap(); private NettyEchoServer(int port){ this.port = port; } private void runServer(){ // 创建父通道反应器线程组 EventLoopGroup boss = new NioEventLoopGroup(1); // 创建子通道反应器线程组 EventLoopGroup workers = new NioEventLoopGroup(); try { serverBootstrap.group(boss, workers) .channel(NioServerSocketChannel.class) .localAddress("127.0.0.1", port) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(NettyEchoServerHandler.getInstance()); } }); ChannelFuture future = serverBootstrap.bind(); future.addListener((channelFuture) -> { if (channelFuture.isSuccess()){ log.info("服务器启动成功,监听地址: [{}]", future.channel().localAddress()); }else { log.info("服务器启动失败"); } }); // 阻塞直到启动成功 future.sync(); ChannelFuture close = future.channel().closeFuture(); // 阻塞直到服务器关闭 close.sync(); }catch (Exception e){ e.printStackTrace(); }finally { boss.shutdownGracefully(); workers.shutdownGracefully(); } } /** * 使用单例模式实现处理器,使得多个通道可以使用同一个处理器实例 * ChannelHandler.Sharable这个注解表示处理器可以共享 */ @ChannelHandler.Sharable static class NettyEchoServerHandler extends ChannelInboundHandlerAdapter{ // 使用volatile修饰变量,保证instance在多线程下的可见性 volatile static NettyEchoServerHandler instance; // 双重检查锁实现单例模式 static NettyEchoServerHandler getInstance(){ if (instance == null){ synchronized (NettyEchoServerHandler.class){ if (instance == null){ instance = new NettyEchoServerHandler(); } } } return instance; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf buf = (ByteBuf)msg; log.info("msg type: [{}]" ,(buf.hasArray() ? "堆内存" : "直接内存")); int len = buf.readableBytes(); // 用取得的字节的长度来初始化字节数组 byte[] array = new byte[len]; // 将堆外直接内存上的数据读到堆内存上的字节数组中 buf.getBytes(buf.readerIndex(), array); log.info("server received:[{}]", new String(array, StandardCharsets.UTF_8)); log.info("写回前的引用计数:[{}]", buf.refCnt()); // 写回数据,异步任务(写完后会释放引用) ChannelFuture cl = ctx.writeAndFlush(msg); // I/O操作完成后的操作 cl.addListener((ChannelFuture future) -> log.info("写回后的引用计数:[{}]", ((ByteBuf) msg).refCnt())); } } public static void main(String[] args){ new NettyEchoServer(66).runServer(); }}
回显服务器的客户端:
public class NettyEchoClient { private final static Logger log = LoggerFactory.getLogger(NettyEchoClient.class); private int serverPort; private String serverIp; private Bootstrap bootstrap = new Bootstrap(); private NettyEchoClient(String ip, int port){ serverIp = ip; serverPort = port; } private void runClient(){ // 创建反应器线程组 EventLoopGroup worker = new NioEventLoopGroup(); try{ bootstrap.group(worker) .channel(NioSocketChannel.class) .remoteAddress(serverIp, serverPort) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(NettyEchoClientHandler.getInstance()); } }); ChannelFuture future = bootstrap.connect(); // 客户端连接的异步通知 future.addListener((channelFuture) -> { if (channelFuture.isSuccess()){ log.info("客户端连接成功"); }else { log.info("客户端连接失败"); } }); // 阻塞直到连接成功 future.sync(); Channel channel = future.channel(); Scanner scanner = new Scanner(System.in); System.out.println("请输入要发送的内容: "); while (scanner.hasNext()){ // 获取输入的内容 String text = scanner.next(); byte[] bytes = text.getBytes(StandardCharsets.UTF_8); // 分配一个直接内存的缓冲区 ByteBuf buf = channel.alloc().buffer(); // 将字节数组写入缓冲区 buf.writeBytes(bytes); // 将缓冲区的数据写入到通道中并刷新通道 channel.writeAndFlush(buf); System.out.println("请输入要发送的内容: "); } }catch (Exception e){ e.printStackTrace(); }finally { worker.shutdownGracefully(); } } @ChannelHandler.Sharable private static class NettyEchoClientHandler extends ChannelInboundHandlerAdapter{ static volatile NettyEchoClientHandler instance; static NettyEchoClientHandler getInstance(){ if (instance == null){ synchronized (NettyEchoClientHandler.class){ if (instance == null){ instance = new NettyEchoClientHandler(); } } } return instance; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf buf = (ByteBuf)msg; int len = buf.readableBytes(); byte[] array = new byte[len]; // 将数据读到字节数组中,这中读不会改变读指针 buf.getBytes(buf.readerIndex(), array); log.info("客户端回显的数据:[{}]", new String(array, StandardCharsets.UTF_8)); // 手动释放引用 buf.release(); } } public static void main(String[] args){ new NettyEchoClient("127.0.0.1", 66).runClient(); }}
运行结果:
客户端:
15:12:49.677 [nioEventLoopGroup-2-1] INFO echoserver.NettyEchoClient - 客户端连接成功请输入要发送的内容: 哈哈哈,服务器你好15:12:59.821 [nioEventLoopGroup-2-1] INFO echoserver.NettyEchoClient - 客户端回显的数据:[哈哈哈,服务器你好]
服务器端:
15:12:40.747 [nioEventLoopGroup-2-1] INFO echoserver.NettyEchoServer - 服务器启动成功,监听地址: [/127.0.0.1:66]15:12:59.816 [nioEventLoopGroup-3-1] INFO echoserver.NettyEchoServer - msg type: [直接内存]15:12:59.818 [nioEventLoopGroup-3-1] INFO echoserver.NettyEchoServer - server received:[哈哈哈,服务器你好]15:12:59.818 [nioEventLoopGroup-3-1] INFO echoserver.NettyEchoServer - 写回前的引用计数:[1]15:12:59.821 [nioEventLoopGroup-3-1] INFO echoserver.NettyEchoServer - 写回后的引用计数:[0]
. 
- 0