本节接着 ObjectPool 的设计脉络,具体看看其具体实现 RecyclerObjectPool 中引用的 Recycler 究竟是怎么实现的
这一张图基本已经说明白了,我再做个总结,对细节感兴趣的可以看看我下面带源码的注释。
对于 Recycler 回收器:它实现了很多方法和类,但实际上除了持有一些静态参数配置外,它仅持有了一个 FastThreadLocal<LocalPool<T>>
那实际上就是每个线程有一个自己的 LocalPool ,真正分配也是通过调用 LocalPool 的方法。
那从设计的角度来说,在思考要设计一个对象池的时候就要考虑到多线程分配的问题,使用线程本地变量可能就是要优先考虑的问题。
然后实际上我们的对象池就是 LocalPool 对象中实例化的一个队列 Queue,这个 Queue 也比较有意思后续章节会详细讨论一下,这里只需要记住它是一个可以支持多线程入队,单线程消费的队列。
这个队列存储了我们分配出去然后空闲释放的句柄 handle,在之后的分配中会首先尝试在对象池中获取可以分配的句柄,如果没有则尝试创建池化句柄,如果没有达到可创建的间隔会直接创建非池化对象
根据解析篇 1 ObjectPool 和 本篇 Recycler ,已经大致对池的设计有了一个大概的了解
package io.netty.util;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.ObjectPool;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import static java.lang.Math.max;
import static java.lang.Math.min;
/**
* Light-weight object pool based on a thread-local stack.
* 这里源码注释说这就是一个 轻量级的对象池,基于本地线程栈实现。
以下代码我省去了其中废弃的方法
* @param <T> the type of the pooled object
*/
public abstract class Recycler<T> {
// 这是一个日志工具
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
/**
* 定义一个空的 HANDLE 实现,实现 Handle 接口,实现方法不做任何事
*/
private static final Handle<?> NOOP_HANDLE = new Handle<Object>() {
@Override
public void recycle(Object object) {
// NOOP
}
@Override
public String toString() {
return "NOOP_HANDLE";
}
};
// 这里定义了每个线程的初始 最大 容量是 4k
private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
// 默认每个线程的最大容量 默认等于上面这个东西 ↑ 4k
private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
// 一个比率
private static final int RATIO;
// 默认的每个线程的队列块的大小 默认 32
private static final int DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD;
static {
// In the future, we might have different maxCapacity for different object types.
// e.g. io.netty.recycler.maxCapacity.writeTask
// io.netty.recycler.maxCapacity.outboundBuffer
// 在未来的版本,我们可能会为不同的对象设置不同的 maxCapacity
// 这里先取 maxCapacity 再尝试取 maxCapacityPerThread 优先读 maxCapacityPerThread 的设置
int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
// 如果你的配置小于 0 ,会使用默认的 4k
if (maxCapacityPerThread < 0) {
maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
}
// 这里初始化了 默认的每个线程额最大容量,如果不配置的话就是 4k 大小
DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
// 获取每个线程的线程块大小,默认 32
DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD = SystemPropertyUtil.getInt("io.netty.recycler.chunkSize", 32);
// By default we allow one push to a Recycler for each 8th try on handles that were never recycled before.
// This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
// bursts.
// 这里写的很抽象,我看了半天没搞懂,大概的感觉是 RATIO 是默认为 8 ,也就是说每 8 次的分配中有 1 次是尝试重用的,
// 剩余 7 次都是进行扩容,也就是说,在一定时间的预热后这个池化资源池可能会逼近最大分配值。
// 这种缓慢的增长有助于在热点场景不会那么猛烈的申请空间 (大概……缓慢吧)
// 仅字面理解,后续代码核实后会订正 (问了 GPT3.5 和 通义千问)
// ok 我回来了,这里应该是说这个方法,就是指每 8 次申请才生成一个新的
// DefaultHandle<T> newHandle() {
// // 看这里创建的逻辑就明白了,这里是每 ratioInterval 次才实际执行一次新建,其他的时候都直接返回 null
// // 这里默认每 8 次才初始化一个新的句柄,因为 ratioCounter 初始化为 ratioInterval ,所以第一次会执行新建
// if (++ratioCounter >= ratioInterval) {
// ratioCounter = 0;
// return new DefaultHandle<T>(this);
// }
// return null;
// }
RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
// 如果可以打 debug 日志,就打一下配置日志
if (logger.isDebugEnabled()) {
if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
logger.debug("-Dio.netty.recycler.ratio: disabled");
logger.debug("-Dio.netty.recycler.chunkSize: disabled");
} else {
logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
logger.debug("-Dio.netty.recycler.chunkSize: {}", DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
}
}
}
// 每个线程最大的容量
private final int maxCapacityPerThread;
// 间隔
private final int interval;
// 块大小
private final int chunkSize;
// 这里声明初始化了一个 Netty 自己实现的 FastThreadLocal 是为了在特定情况下提供更高性能的线程局部变量实现。
// 它通过使用数组索引而非哈希表来加速变量的访问。但要注意,要充分发挥其优势,需要确保线程是 FastThreadLocalThread 或其子类。
// 这里声明了一个 泛型是 LoaclPool 的 FastThreadLoal,保证每个线程都只有一个自己的 LoaclPool
private final FastThreadLocal<LocalPool<T>> threadLocal = new FastThreadLocal<LocalPool<T>>() {
// 每个线程在首次访问 threadLocal 的时候会调用此方法进行初始化
@Override
protected LocalPool<T> initialValue() {
return new LocalPool<T>(maxCapacityPerThread, interval, chunkSize);
}
// 当线程结束或者 FastThreadLocal 被移除时会调用此方法
@Override
protected void onRemoval(LocalPool<T> value) throws Exception {
super.onRemoval(value);
value.pooledHandles.clear();
}
};
// 无参构造函数,是一个构造函数的重载
protected Recycler() {
// 4k
this(DEFAULT_MAX_CAPACITY_PER_THREAD);
}
// 有参构造,传入 maxCapacityPerThread
protected Recycler(int maxCapacityPerThread) {
// 默认的 4k 8 32
this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
}
// 实际实现 3 参数,这里初始化了上述几个变量
protected Recycler(int maxCapacityPerThread, int ratio, int chunkSize) {
// 间隔初始化为 ratio 默认 8
interval = max(0, ratio);
if (maxCapacityPerThread <= 0) {
this.maxCapacityPerThread = 0;
this.chunkSize = 0;
} else {
// 默认 4k
this.maxCapacityPerThread = max(4, maxCapacityPerThread);
// 默认 32
this.chunkSize = max(2, min(chunkSize, this.maxCapacityPerThread >> 1));
}
}
/**
* 这个方法就是真正的分配方法了*
*/
@SuppressWarnings("unchecked")
public final T get() {
// 如果你设置的每个线程的最大容量是 0 ,则每次都会新创建对象,并且不需要 HANDLE 回收处理
if (maxCapacityPerThread == 0) {
// 所以这里大概猜想,为了保持代码的一致性,就声明了一个 NOOP_HANDLE 来做此类不需要入池回收的对象创建
return newObject((Handle<T>) NOOP_HANDLE);
}
// 本地池,从 threadLocal 中获取当前线程的 LocalPool
LocalPool<T> localPool = threadLocal.get();
// 从本地池获取一个 handle 这个方法有大逻辑,先仅表面瞅一眼
DefaultHandle<T> handle = localPool.claim();
T obj;
if (handle == null) {
// 如果 handle 是空则会通过 localPool 新建一个 handle, handle 会持有此 localPool 的引用
handle = localPool.newHandle();
if (handle != null) {
// 这里如果新建 handle 成功则通过 newObject 方法传入新建的 handle 新建池化对象,
// 也就是说这个新建的对象是可以通过 handle 回收池化利用的
obj = newObject(handle);
handle.set(obj);
} else {
// 新建不了一点的情况,跟没容量一样,直接新建一个不需要池化的对象
obj = newObject((Handle<T>) NOOP_HANDLE);
}
} else {
// 如果直接从池子里拿到可重用的 handle 对象了,则直接 get 获取,
// 这里 get 就直接返回了 handle 持有的 value 也就是之前新建分配出去过的对象
obj = handle.get();
}
return obj;
}
// 获取 threadLocal 的大小
final int threadLocalSize() {
// 这里很有意思,获取的但是 LocalPool 的 pooledHandles 的大小
// 池化 handles 从这里的调用看也可以了解到,LocalPool 是持有一个 handle 池的
// 那他实际上是个队列 Queue<DefaultHandle<T>>
return threadLocal.get().pooledHandles.size();
}
// 抽象方法 newObject ,这里联系一下 ObjectPool ,实际的实现是通过调用 ObjectCreator 的 newObject 方法,
// 所以这里看起来做了一个抽象的方法 newObject 做各类实现,
// 但实际上只用这个做区别化实现就需要有很多类继承 Recycler , Recycler 又是一个逻辑比较多的类,而且有时候继承不是很方便的使用方式
// 而组合更加灵活,所以他搞了一个 ObjectCreator 的接口抽象,在实例化 RecyclerObjectPool 的时候传入 ObjectCreator 来实现 newObject
// 这里实例化 ObjectPool 的时候隐式实现一下这个接口就可以,就会比较方便 (个人理解)
protected abstract T newObject(Handle<T> handle);
// 这里又定义了一个 Handle 继承自 ObjectPool 的 Handle ,还有个注释 不能改变这个定义 。 估计是有啥历史原因,先不妄加揣测
@SuppressWarnings("ClassNameSameAsAncestorName") // Can't change this due to compatibility.
public interface Handle<T> extends ObjectPool.Handle<T> { }
// 默认的 Handle 实现,基本上他池子里实例化的都是这个
private static final class DefaultHandle<T> implements Handle<T> {
// 状态常量,已被分配的
private static final int STATE_CLAIMED = 0;
// 状态常量,可用的
private static final int STATE_AVAILABLE = 1;
// 原子 Integer 字段更新器,泛型给到了 DefaultHandle 用于状态更新 CAS 保证多线程环境下的原子更新
private static final AtomicIntegerFieldUpdater<DefaultHandle<?>> STATE_UPDATER;
static {
// 静态代码块初始化了 STATE_UPDATER,指定对 DefaultHandle 类的 state 字段可进行原子更新
AtomicIntegerFieldUpdater<?> updater = AtomicIntegerFieldUpdater.newUpdater(DefaultHandle.class, "state");
//noinspection unchecked
STATE_UPDATER = (AtomicIntegerFieldUpdater<DefaultHandle<?>>) updater;
}
// Handle 的状态字段,是通过 STATE_UPDATER 来进行更新的,用于判断是否可以被分配
@SuppressWarnings({"FieldMayBeFinal", "unused"}) // Updated by STATE_UPDATER.
private volatile int state; // State is initialised to STATE_CLAIMED (aka. 0) so they can be released.
// 这里定义了一个本地线程池,从之前的逻辑看,在一个 Handle 实例创建的时候会引用当前线程的 LocalPool,这里应该是方便后续释放
private final LocalPool<T> localPool;
// value 实际被 new 出来的池化对象,也就是我们池化复用的实际对象,我们所聊的就是 ByteBuf 的实例化对象引用
private T value;
// 构造方法
DefaultHandle(LocalPool<T> localPool) {
this.localPool = localPool;
}
// 回收方法
@Override
public void recycle(Object object) {
// 校验你要回收的对象是不是你持有的
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
// 直接调用了 LocalPool 的 release 方法,并把当前的 Handle 对象传了进去
localPool.release(this);
}
// get 方法返回实例化对象
T get() {
return value;
}
void set(T value) {
this.value = value;
}
// 判断是否可以分配,如果可以分配会更新状态 为已分配
boolean availableToClaim() {
if (state != STATE_AVAILABLE) {
return false;
}
return STATE_UPDATER.compareAndSet(this, STATE_AVAILABLE, STATE_CLAIMED);
}
// 更新状态为可用,如果本身已经可用会抛出异常
void toAvailable() {
int prev = STATE_UPDATER.getAndSet(this, STATE_AVAILABLE);
if (prev == STATE_AVAILABLE) {
throw new IllegalStateException("Object has been recycled already.");
}
}
}
// 关键的 本地的池子 LocalPool ,每个线程持有一个,再次提醒 T 泛型是你池化资源的类型,如我们这里就是 ByteBuf
private static final class LocalPool<T> {
// 比率间隔,比率间隔?? 又是啥东西,奇怪的命名 同 ratio 和 interval
private final int ratioInterval;
// Handles 队列池,从这个接口的设计上看,这个 Queue 应该就是池化的本质了,就是池本池,
// 所以你如果问我池化实现的本质是什么,我应该会回答,本质就是一个 Handle 队列
private final Queue<DefaultHandle<T>> pooledHandles;
// 比率计数器?? 先打个问号,后面懂了来补充 应该是控制生产速率的
private int ratioCounter;
// 构造器,向调用方追溯在 FastThreadLocal 的初始化方法中是这样传入的 maxCapacityPerThread 4k, interval 8, chunkSize 32
LocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
this.ratioInterval = ratioInterval;
// 你看这里,队列是通过 PlatformDependent.newMpscQueue ,这个东西也没见过,有意思了
// 我查了查,这个东西叫 MPSC 队列,是一个多生产者单消费者队列的一种实现(这看起来又是一种设计思想),
// 那在这种场景很明显,Netty 的意图是想构建一个适用于一个或多个线程生产数据,而只有一个线程消费数据的情况。
// 在对象池的场景中,生产者线程负责将句柄放入队列,而消费者线程负责从队列中取出句柄进行对象的分配和回收。
// 这样的设计能够有效地在多线程环境中管理对象的生命周期。
pooledHandles = PlatformDependent.newMpscQueue(chunkSize, maxCapacity);
ratioCounter = ratioInterval; // Start at interval so the first one will be recycled.
}
// claim 这个词这里应该是用作索取的意思,索取一个 DefaultHandle
DefaultHandle<T> claim() {
// 这里给临时变量赋值为 pooledHandles
Queue<DefaultHandle<T>> pooledHandles = this.pooledHandles;
DefaultHandle<T> handle;
do {
handle = pooledHandles.poll();
// 这里会先分配,如果为 null 则直接跳出循环返回 null
// 如果不为空,且当前的 handle 不是可分配的状态则进行循环继续取下一个
} while (handle != null && !handle.availableToClaim());
return handle;
}
// 释放方法,这里实际是对 handle 进行操作
void release(DefaultHandle<T> handle) {
// 首先修改传入的 handle 为可用状态
handle.toAvailable();
// 将 handle 入队
pooledHandles.offer(handle);
}
// 新建一个 handle
DefaultHandle<T> newHandle() {
// 看这里创建的逻辑就明白了,这里是每 ratioInterval 次才实际执行一次新建,其他的时候都直接返回 null
// 这里默认每 8 次才初始化一个新的句柄,因为 ratioCounter 初始化为 ratioInterval ,所以第一次会执行新建
if (++ratioCounter >= ratioInterval) {
ratioCounter = 0;
return new DefaultHandle<T>(this);
}
return null;
}
}
}
到了 Recycler 的设计上看起来就没有那么抽象了,主要的就是一个线程本地变量的控制,以及 LocalPool 对象的池化资源管理。
其实到此为止,并没有实现任何的创建对象的逻辑,所有创建对象逻辑均由 ObjectCreator 接口的实现来控制,这个是灵活控制的,它的体现会在梳理 PooledByteBufAllocator 的整体脉络时得到体现。