【深入理解 ByteBuf 之三 接口&类拆解】2. Recycler 接口设计真正的回收机制

发布时间:2024年01月15日

Recycler 回收器接口设计

本节接着 ObjectPool 的设计脉络,具体看看其具体实现 RecyclerObjectPool 中引用的 Recycler 究竟是怎么实现的

在这里插入图片描述

这一张图基本已经说明白了,我再做个总结,对细节感兴趣的可以看看我下面带源码的注释。

对于 Recycler 回收器:它实现了很多方法和类,但实际上除了持有一些静态参数配置外,它仅持有了一个 FastThreadLocal<LocalPool<T>> 那实际上就是每个线程有一个自己的 LocalPool ,真正分配也是通过调用 LocalPool 的方法。

那从设计的角度来说,在思考要设计一个对象池的时候就要考虑到多线程分配的问题,使用线程本地变量可能就是要优先考虑的问题。

然后实际上我们的对象池就是 LocalPool 对象中实例化的一个队列 Queue,这个 Queue 也比较有意思后续章节会详细讨论一下,这里只需要记住它是一个可以支持多线程入队,单线程消费的队列。

这个队列存储了我们分配出去然后空闲释放的句柄 handle,在之后的分配中会首先尝试在对象池中获取可以分配的句柄,如果没有则尝试创建池化句柄,如果没有达到可创建的间隔会直接创建非池化对象

根据解析篇 1 ObjectPool 和 本篇 Recycler ,已经大致对池的设计有了一个大概的了解

package io.netty.util;

import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.ObjectPool;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static java.lang.Math.max;
import static java.lang.Math.min;

/**
 * Light-weight object pool based on a thread-local stack.
 * 这里源码注释说这就是一个 轻量级的对象池,基于本地线程栈实现。
 以下代码我省去了其中废弃的方法
 * @param <T> the type of the pooled object
 */
public abstract class Recycler<T> {
  // 这是一个日志工具
  private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);

  /**
     *  定义一个空的 HANDLE 实现,实现 Handle 接口,实现方法不做任何事
     */
  private static final Handle<?> NOOP_HANDLE = new Handle<Object>() {
    @Override
    public void recycle(Object object) {
      // NOOP
    }

    @Override
    public String toString() {
      return "NOOP_HANDLE";
    }
  };
  // 这里定义了每个线程的初始 最大 容量是 4k
  private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
  // 默认每个线程的最大容量 默认等于上面这个东西 ↑ 4k
  private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
  // 一个比率
  private static final int RATIO;
  // 默认的每个线程的队列块的大小 默认 32
  private static final int DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD;

  static {
    // In the future, we might have different maxCapacity for different object types.
    // e.g. io.netty.recycler.maxCapacity.writeTask
    //      io.netty.recycler.maxCapacity.outboundBuffer
    // 在未来的版本,我们可能会为不同的对象设置不同的 maxCapacity
    // 这里先取 maxCapacity 再尝试取 maxCapacityPerThread 优先读 maxCapacityPerThread 的设置
    int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
                                                         SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
    // 如果你的配置小于 0 ,会使用默认的 4k
    if (maxCapacityPerThread < 0) {
      maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
    }

    // 这里初始化了 默认的每个线程额最大容量,如果不配置的话就是 4k 大小
    DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
    // 获取每个线程的线程块大小,默认 32
    DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD = SystemPropertyUtil.getInt("io.netty.recycler.chunkSize", 32);


    // By default we allow one push to a Recycler for each 8th try on handles that were never recycled before.
    // This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
    // bursts.
    // 这里写的很抽象,我看了半天没搞懂,大概的感觉是 RATIO 是默认为 8 ,也就是说每 8 次的分配中有 1 次是尝试重用的,
    // 剩余 7 次都是进行扩容,也就是说,在一定时间的预热后这个池化资源池可能会逼近最大分配值。
    // 这种缓慢的增长有助于在热点场景不会那么猛烈的申请空间 (大概……缓慢吧)
    // 仅字面理解,后续代码核实后会订正 (问了 GPT3.5 和 通义千问)
    // ok 我回来了,这里应该是说这个方法,就是指每 8 次申请才生成一个新的
    //        DefaultHandle<T> newHandle() {
    //            // 看这里创建的逻辑就明白了,这里是每 ratioInterval 次才实际执行一次新建,其他的时候都直接返回 null
    //            // 这里默认每 8 次才初始化一个新的句柄,因为 ratioCounter 初始化为 ratioInterval ,所以第一次会执行新建
    //            if (++ratioCounter >= ratioInterval) {
    //                ratioCounter = 0;
    //                return new DefaultHandle<T>(this);
    //            }
    //            return null;
    //        }
    RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));

    // 如果可以打 debug 日志,就打一下配置日志
    if (logger.isDebugEnabled()) {
      if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
        logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
        logger.debug("-Dio.netty.recycler.ratio: disabled");
        logger.debug("-Dio.netty.recycler.chunkSize: disabled");
      } else {
        logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
        logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
        logger.debug("-Dio.netty.recycler.chunkSize: {}", DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
      }
    }
  }

  // 每个线程最大的容量
  private final int maxCapacityPerThread;
  // 间隔
  private final int interval;
  // 块大小
  private final int chunkSize;

  // 这里声明初始化了一个 Netty 自己实现的 FastThreadLocal 是为了在特定情况下提供更高性能的线程局部变量实现。
  // 它通过使用数组索引而非哈希表来加速变量的访问。但要注意,要充分发挥其优势,需要确保线程是 FastThreadLocalThread 或其子类。
  // 这里声明了一个 泛型是 LoaclPool 的 FastThreadLoal,保证每个线程都只有一个自己的 LoaclPool
  private final FastThreadLocal<LocalPool<T>> threadLocal = new FastThreadLocal<LocalPool<T>>() {
    // 每个线程在首次访问 threadLocal 的时候会调用此方法进行初始化
    @Override
    protected LocalPool<T> initialValue() {
      return new LocalPool<T>(maxCapacityPerThread, interval, chunkSize);
    }

    // 当线程结束或者 FastThreadLocal 被移除时会调用此方法
    @Override
    protected void onRemoval(LocalPool<T> value) throws Exception {
      super.onRemoval(value);
      value.pooledHandles.clear();
    }
  };

  // 无参构造函数,是一个构造函数的重载
  protected Recycler() {
    // 4k
    this(DEFAULT_MAX_CAPACITY_PER_THREAD);
  }

  // 有参构造,传入 maxCapacityPerThread
  protected Recycler(int maxCapacityPerThread) {
    // 默认的 4k  8 32
    this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
  }


  // 实际实现 3 参数,这里初始化了上述几个变量
  protected Recycler(int maxCapacityPerThread, int ratio, int chunkSize) {
    // 间隔初始化为 ratio 默认 8
    interval = max(0, ratio);
    if (maxCapacityPerThread <= 0) {
      this.maxCapacityPerThread = 0;
      this.chunkSize = 0;
    } else {
      // 默认 4k
      this.maxCapacityPerThread = max(4, maxCapacityPerThread);
      // 默认 32
      this.chunkSize = max(2, min(chunkSize, this.maxCapacityPerThread >> 1));
    }
  }

  /**
     * 这个方法就是真正的分配方法了*
     */
  @SuppressWarnings("unchecked")
  public final T get() {
    // 如果你设置的每个线程的最大容量是 0 ,则每次都会新创建对象,并且不需要 HANDLE 回收处理
    if (maxCapacityPerThread == 0) {
      // 所以这里大概猜想,为了保持代码的一致性,就声明了一个 NOOP_HANDLE 来做此类不需要入池回收的对象创建
      return newObject((Handle<T>) NOOP_HANDLE);
    }
    // 本地池,从 threadLocal 中获取当前线程的 LocalPool
    LocalPool<T> localPool = threadLocal.get();
    // 从本地池获取一个 handle 这个方法有大逻辑,先仅表面瞅一眼
    DefaultHandle<T> handle = localPool.claim();
    T obj;
    if (handle == null) {
      // 如果 handle 是空则会通过 localPool 新建一个 handle, handle 会持有此 localPool 的引用
      handle = localPool.newHandle();
      if (handle != null) {
        // 这里如果新建 handle 成功则通过 newObject 方法传入新建的 handle 新建池化对象,
        // 也就是说这个新建的对象是可以通过 handle 回收池化利用的
        obj = newObject(handle);
        handle.set(obj);
      } else {
        // 新建不了一点的情况,跟没容量一样,直接新建一个不需要池化的对象
        obj = newObject((Handle<T>) NOOP_HANDLE);
      }
    } else {
      // 如果直接从池子里拿到可重用的 handle 对象了,则直接 get 获取,
      // 这里 get 就直接返回了 handle 持有的 value 也就是之前新建分配出去过的对象
      obj = handle.get();
    }

    return obj;
  }


  // 获取 threadLocal 的大小
  final int threadLocalSize() {
    // 这里很有意思,获取的但是 LocalPool 的 pooledHandles 的大小
    // 池化 handles 从这里的调用看也可以了解到,LocalPool 是持有一个 handle 池的
    // 那他实际上是个队列 Queue<DefaultHandle<T>>
    return threadLocal.get().pooledHandles.size();
  }

  // 抽象方法 newObject ,这里联系一下 ObjectPool ,实际的实现是通过调用 ObjectCreator 的 newObject 方法,
  // 所以这里看起来做了一个抽象的方法 newObject 做各类实现,
  // 但实际上只用这个做区别化实现就需要有很多类继承 Recycler , Recycler 又是一个逻辑比较多的类,而且有时候继承不是很方便的使用方式
  // 而组合更加灵活,所以他搞了一个 ObjectCreator 的接口抽象,在实例化 RecyclerObjectPool 的时候传入 ObjectCreator 来实现 newObject
  // 这里实例化 ObjectPool 的时候隐式实现一下这个接口就可以,就会比较方便 (个人理解)
  protected abstract T newObject(Handle<T> handle);

  // 这里又定义了一个 Handle 继承自 ObjectPool 的 Handle ,还有个注释 不能改变这个定义 。 估计是有啥历史原因,先不妄加揣测
  @SuppressWarnings("ClassNameSameAsAncestorName") // Can't change this due to compatibility.
  public interface Handle<T> extends ObjectPool.Handle<T>  { }

  // 默认的 Handle 实现,基本上他池子里实例化的都是这个
  private static final class DefaultHandle<T> implements Handle<T> {
    // 状态常量,已被分配的
    private static final int STATE_CLAIMED = 0;
    // 状态常量,可用的
    private static final int STATE_AVAILABLE = 1;
    // 原子 Integer 字段更新器,泛型给到了 DefaultHandle 用于状态更新 CAS 保证多线程环境下的原子更新
    private static final AtomicIntegerFieldUpdater<DefaultHandle<?>> STATE_UPDATER;
    static {
      // 静态代码块初始化了 STATE_UPDATER,指定对 DefaultHandle 类的 state 字段可进行原子更新
      AtomicIntegerFieldUpdater<?> updater = AtomicIntegerFieldUpdater.newUpdater(DefaultHandle.class, "state");
      //noinspection unchecked
      STATE_UPDATER = (AtomicIntegerFieldUpdater<DefaultHandle<?>>) updater;
    }

    // Handle 的状态字段,是通过 STATE_UPDATER 来进行更新的,用于判断是否可以被分配
    @SuppressWarnings({"FieldMayBeFinal", "unused"}) // Updated by STATE_UPDATER.
    private volatile int state; // State is initialised to STATE_CLAIMED (aka. 0) so they can be released.
    // 这里定义了一个本地线程池,从之前的逻辑看,在一个 Handle 实例创建的时候会引用当前线程的 LocalPool,这里应该是方便后续释放
    private final LocalPool<T> localPool;
    // value 实际被 new 出来的池化对象,也就是我们池化复用的实际对象,我们所聊的就是 ByteBuf 的实例化对象引用
    private T value;

    // 构造方法
    DefaultHandle(LocalPool<T> localPool) {
      this.localPool = localPool;
    }

    // 回收方法
    @Override
    public void recycle(Object object) {
      // 校验你要回收的对象是不是你持有的
      if (object != value) {
        throw new IllegalArgumentException("object does not belong to handle");
      }
      // 直接调用了 LocalPool 的 release 方法,并把当前的 Handle 对象传了进去
      localPool.release(this);
    }

    // get 方法返回实例化对象
    T get() {
      return value;
    }

    void set(T value) {
      this.value = value;
    }

    // 判断是否可以分配,如果可以分配会更新状态 为已分配
    boolean availableToClaim() {
      if (state != STATE_AVAILABLE) {
        return false;
      }
      return STATE_UPDATER.compareAndSet(this, STATE_AVAILABLE, STATE_CLAIMED);
    }

    // 更新状态为可用,如果本身已经可用会抛出异常
    void toAvailable() {
      int prev = STATE_UPDATER.getAndSet(this, STATE_AVAILABLE);
      if (prev == STATE_AVAILABLE) {
        throw new IllegalStateException("Object has been recycled already.");
      }
    }
  }

  // 关键的 本地的池子 LocalPool ,每个线程持有一个,再次提醒 T 泛型是你池化资源的类型,如我们这里就是 ByteBuf
  private static final class LocalPool<T> {
    // 比率间隔,比率间隔?? 又是啥东西,奇怪的命名 同 ratio 和 interval
    private final int ratioInterval;
    // Handles 队列池,从这个接口的设计上看,这个 Queue 应该就是池化的本质了,就是池本池,
    // 所以你如果问我池化实现的本质是什么,我应该会回答,本质就是一个 Handle 队列
    private final Queue<DefaultHandle<T>> pooledHandles;
    // 比率计数器?? 先打个问号,后面懂了来补充 应该是控制生产速率的
    private int ratioCounter;

    // 构造器,向调用方追溯在 FastThreadLocal 的初始化方法中是这样传入的 maxCapacityPerThread 4k, interval 8, chunkSize 32
    LocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
      this.ratioInterval = ratioInterval;
      // 你看这里,队列是通过 PlatformDependent.newMpscQueue ,这个东西也没见过,有意思了
      // 我查了查,这个东西叫 MPSC 队列,是一个多生产者单消费者队列的一种实现(这看起来又是一种设计思想),
      // 那在这种场景很明显,Netty 的意图是想构建一个适用于一个或多个线程生产数据,而只有一个线程消费数据的情况。
      // 在对象池的场景中,生产者线程负责将句柄放入队列,而消费者线程负责从队列中取出句柄进行对象的分配和回收。
      // 这样的设计能够有效地在多线程环境中管理对象的生命周期。
      pooledHandles = PlatformDependent.newMpscQueue(chunkSize, maxCapacity);
      ratioCounter = ratioInterval; // Start at interval so the first one will be recycled.
    }

    // claim 这个词这里应该是用作索取的意思,索取一个 DefaultHandle
    DefaultHandle<T> claim() {
      // 这里给临时变量赋值为 pooledHandles
      Queue<DefaultHandle<T>> pooledHandles = this.pooledHandles;
      DefaultHandle<T> handle;
      do {
        handle = pooledHandles.poll();
        // 这里会先分配,如果为 null 则直接跳出循环返回 null
        // 如果不为空,且当前的 handle 不是可分配的状态则进行循环继续取下一个
      } while (handle != null && !handle.availableToClaim());
      return handle;
    }

    // 释放方法,这里实际是对 handle 进行操作
    void release(DefaultHandle<T> handle) {
      // 首先修改传入的 handle 为可用状态
      handle.toAvailable();
      // 将 handle 入队
      pooledHandles.offer(handle);
    }

    // 新建一个 handle
    DefaultHandle<T> newHandle() {
      // 看这里创建的逻辑就明白了,这里是每 ratioInterval 次才实际执行一次新建,其他的时候都直接返回 null
      // 这里默认每 8 次才初始化一个新的句柄,因为 ratioCounter 初始化为 ratioInterval ,所以第一次会执行新建
      if (++ratioCounter >= ratioInterval) {
        ratioCounter = 0;
        return new DefaultHandle<T>(this);
      }
      return null;
    }
  }
}

到了 Recycler 的设计上看起来就没有那么抽象了,主要的就是一个线程本地变量的控制,以及 LocalPool 对象的池化资源管理。

其实到此为止,并没有实现任何的创建对象的逻辑,所有创建对象逻辑均由 ObjectCreator 接口的实现来控制,这个是灵活控制的,它的体现会在梳理 PooledByteBufAllocator 的整体脉络时得到体现。

文章来源:https://blog.csdn.net/w903328615/article/details/135602345
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。