其它一些主流的分布式计算引擎,如StarRocks,实现的数据处理模型也是基于时间片。
JAVA库的线程池中的调度&执行模型就是典型地、基于任务的。
一般地,通过实现Runnable
接口,我们可以定义一个任务的执行逻辑,然后通过ExecutorService::submit(FutureTask)
方法提交执行,一旦Runnable::run()
方法执行结束,也就意味着与之绑定的线程的结束。
沿用Trino中的Split概念,在此模型下,run()
方法负责处理一个完整的Split,才会返回处理结果,然后线程由执行态
进入就绪态
,等待新的任务。
一个简单的任务调度模型(So easy, but out of control)样例代码如下:
ExecutorService executor = Executors.newFixedThreadPool(1);
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> processing(split)), executor);
future.join();
优点:设计实现简单。
缺点:线程执行时间无法预估,例如执行过程中的某个操作被阻塞,则整个线程阻塞
;线程中断逻辑复杂,想要实现中止线程的功能,需要考虑许多情况,并且中断时间延迟无法估计;线程状态的切换开销不可忽略。
类似操作系统中的CPU时间片的调度和执行模型。
沿用Trino中的Split概念,在此模型下,run()
方法可以负责处理一个完整的Split,并且为这个线程分配一个时间片,例如1s,则如果1秒后这个Split没有处理完,当前线程就主动挂起这个Split,转而处理另外一个优先级更高的Split,而不会切换自己的状态为就绪态或阻塞态
;同样地,如果在处理Split的过程中的某个操作被阻塞,则这个线程可以及时地挂起Split,转而去处理其它的Split。
优点:没有线程状态的切换开销;能够充分利用CPU资源,避免阻塞或等待;能够更好地基于权重处理Split;
缺点:实现复杂,不仅需要考虑如何释放任务,还需要考虑如何恢复任务。
由Coordinator调度分发给Worker执行,是对要读取的数据的描述
是对Split的更细粒度的切分,是数据处理的最小单元,是Operator一次处理的数据集合。
作用于Split之上的执行单元,可以认为就是一个方法,对Page处理后产生一个新的Page。
绑定唯一一个Split及一组Operators,负责Split的数据上能够正确地流经Operators。
Worker结点上执行Split的实体,它封装了Driver和Split。
Worker结点上调度Split时的调度对象,它封装了DriverSplitRunner实例,同时带有权重指标。
Coordinator ---分发---> Split ---> Worker
Worker ---接收---> Split ---> DiverSplitRunner ---> PriorityDriverSplitRunner ---> WaitingQueue
TaskExecutor ---调度---> WaitingQueue ---> PriorityDriverSplitRunner
PriorityDriverSplitRunner ---运行---> Driver
Driver ---处理---> Operators ---> Page ---> BlockedFuture
BlockedFuture ---阻塞---> BlockingQueue
BlockedFuture ---不阻---> 时间片结束 ---> WaitingQueue
PriorityDriverSplitRunner ---完成--->
Driver ---完成--->
Operator ---完成--->
Worker结点启动时,会通过Google Inject库,初始化工作线程池,这个线程池就是处理Split的。
@ThreadSafe
public class TaskExecutor
{
/**
* 这个方法被标记为PostConstruct,意味着在TaskExecutor类的实例构造完成后,就调用此方法。
*/
@PostConstruct
public synchronized void start()
{
checkState(!closed, "TaskExecutor is closed");
// 创建runnerThreads个线程,如果没有显示配置这个参数,则其默认值为:
// Runtime.getRuntime().availableProcessors() * 2
for (int i = 0; i < runnerThreads; i++) {
addRunnerThread();
}
}
private synchronized void addRunnerThread()
{
try {
// executor是一个工作线程池,可以认为它有runnerThreads个线程,
// 而每一个线程会通过execute方法,绑定一个TaskRunner实例,它会一直Looping,负责处理Splits,见下文。
executor.execute(versionEmbedder.embedVersion(new TaskRunner()));
}
catch (RejectedExecutionException ignored) {
}
}
}
private class TaskRunner
implements Runnable
{
private final long runnerId = NEXT_RUNNER_ID.getAndIncrement();
@Override
public void run()
{
try (SetThreadName runnerName = new SetThreadName("SplitRunner-%s", runnerId)) {
while (!closed && !Thread.currentThread().isInterrupted()) { // 一直Looping,直到显示地中止此任务,或是被中断
// select next worker
PrioritizedSplitRunner split;
try {
// 从Split等待队列中,取出一个要被处理的Split。
// waitingSplits是一个MultilevelSplitQueue类型的实例对象,支持基于权重排列Splits。
split = waitingSplits.take();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
String threadId = split.getTaskHandle().getTaskId() + "-" + split.getSplitId();
try (SetThreadName splitName = new SetThreadName(threadId)) {
// 创建一个RunningSplitInfo实例对象,记录当前线程的计时器
RunningSplitInfo splitInfo = new RunningSplitInfo(ticker.read(), threadId, Thread.currentThread());
runningSplitInfos.add(splitInfo);
runningSplits.add(split);
ListenableFuture<Void> blocked; // 定义一个Future类型的变量,用于获取Split被Blocked或挂起的原因
try {
// split.process是一个异步
blocked = split.process();
}
finally {
// 当前的Split处理完成,或是被BLOCKED时,就从运行队列中移除相关信息
runningSplitInfos.remove(splitInfo);
runningSplits.remove(split);
}
if (split.isFinished()) {
log.debug("%s is finished", split.getInfo());
splitFinished(split);
}
else {
// 如果当前的Split由于某种原因被阻塞
if (blocked.isDone()) {
// 同时阻塞状态已经被重置,一般地是由于上一次时间片用完了,因此可以被放置到等队列中
// 等待被调度执行
waitingSplits.offer(split);
}
else {
// 当前Split依然被阻塞,则将这个Split添加到BLOCKED Map中,并添加一个回调方法,一旦发现BLOCKED状态被重围,
// 就重新将其放入等待队列中
blockedSplits.put(split, blocked);
blocked.addListener(() -> {
blockedSplits.remove(split);
// reset the level priority to prevent previously-blocked splits from starving existing splits
split.resetLevelPriority();
waitingSplits.offer(split);
}, executor);
}
}
}
catch (Throwable t) {
// ignore random errors due to driver thread interruption
if (!split.isDestroyed()) {
if (t instanceof TrinoException) {
TrinoException e = (TrinoException) t;
log.error(t, "Error processing %s: %s: %s", split.getInfo(), e.getErrorCode().getName(), e.getMessage());
}
else {
log.error(t, "Error processing %s", split.getInfo());
}
}
splitFinished(split);
}
}
}
finally {
// unless we have been closed, we need to replace this thread
if (!closed) {
addRunnerThread();
}
}
}
}
在之前的文章中,我们知道
SqlTaskExecution
是SqlTask的执行实现,负责接收并执行由Coordinator分发的Split。
下面以调度Source Splits的流程来分析如何为每一个新的Split创建执行实例。
public class SqlTaskExecution
{
private synchronized void schedulePartitionedSource(TaskSource sourceUpdate)
{
mergeIntoPendingSplits(sourceUpdate.getPlanNodeId(), sourceUpdate.getSplits(), sourceUpdate.getNoMoreSplitsForLifespan(), sourceUpdate.isNoMoreSplits());
while (true) {
Iterator<SchedulingLifespan> activeLifespans = schedulingLifespanManager.getActiveLifespans();
boolean madeProgress = false;
while (activeLifespans.hasNext()) {
SchedulingLifespan schedulingLifespan = activeLifespans.next();
Lifespan lifespan = schedulingLifespan.getLifespan();
while (true) {
Optional<PlanNodeId> optionalSchedulingPlanNode = schedulingLifespan.getSchedulingPlanNode();
if (optionalSchedulingPlanNode.isEmpty()) {
break;
}
PlanNodeId schedulingPlanNode = optionalSchedulingPlanNode.get();
DriverSplitRunnerFactory partitionedDriverRunnerFactory = driverRunnerFactoriesWithSplitLifeCycle.get(schedulingPlanNode);
PendingSplits pendingSplits = pendingSplitsByPlanNode.get(schedulingPlanNode).getLifespan(lifespan);
// Enqueue driver runners with driver group lifecycle for this driver life cycle, if not already enqueued.
if (!lifespan.isTaskWide() && !schedulingLifespan.getAndSetDriversForDriverGroupLifeCycleScheduled()) {
scheduleDriversForDriverGroupLifeCycle(lifespan);
}
// Enqueue driver runners with split lifecycle for this plan node and driver life cycle combination.
ImmutableList.Builder<DriverSplitRunner> runners = ImmutableList.builder();
// pendingSplits保存了所有来自Coordinator分发过来的、新的Splits,
for (ScheduledSplit scheduledSplit : pendingSplits.removeAllSplits()) {
// create a new driver for the split
runners.add(partitionedDriverRunnerFactory.createDriverRunner(scheduledSplit, lifespan));
}
// 将所有的DriverSplitRunner添加到等待队列中,内部会调用下面的方法将其添加到TaskExecutor对象中
// taskExecutor.enqueueSplits(taskHandle, forceRunSplit, runners);
enqueueDriverSplitRunner(false, runners.build());
// If all driver runners have been enqueued for this plan node and driver life cycle combination,
// move on to the next plan node.
if (pendingSplits.getState() != NO_MORE_SPLITS) {
break;
}
partitionedDriverRunnerFactory.noMoreDriverRunner(ImmutableList.of(lifespan));
pendingSplits.markAsCleanedUp();
schedulingLifespan.nextPlanNode();
madeProgress = true;
if (schedulingLifespan.isDone()) {
break;
}
}
}
if (!madeProgress) {
break;
}
}
if (sourceUpdate.isNoMoreSplits()) {
schedulingLifespanManager.noMoreSplits(sourceUpdate.getPlanNodeId());
}
}
}
public List<ListenableFuture<Void>> enqueueSplits(TaskHandle taskHandle, boolean intermediate, List<? extends SplitRunner> taskSplits)
{
List<PrioritizedSplitRunner> splitsToDestroy = new ArrayList<>();
List<ListenableFuture<Void>> finishedFutures = new ArrayList<>(taskSplits.size());
synchronized (this) {
for (SplitRunner taskSplit : taskSplits) {
// 封装所有新的SplitRunner实例为PrioritizedSplitRunner。
// PrioritizedSplitRunner顾名思义,带有权重,因此在被调度执行时,会按权重决定下一个要执行的Runner
PrioritizedSplitRunner prioritizedSplitRunner = new PrioritizedSplitRunner(
taskHandle,
taskSplit,
ticker,
globalCpuTimeMicros,
globalScheduledTimeMicros,
blockedQuantaWallTime,
unblockedQuantaWallTime);
if (taskHandle.isDestroyed()) {
// If the handle is destroyed, we destroy the task splits to complete the future
splitsToDestroy.add(prioritizedSplitRunner);
}
else if (intermediate) {
// Note: we do not record queued time for intermediate splits
startIntermediateSplit(prioritizedSplitRunner);
// add the runner to the handle so it can be destroyed if the task is canceled
taskHandle.recordIntermediateSplit(prioritizedSplitRunner);
}
else {
// 对于Source Splits来说,会走这里
// add this to the work queue for the task
// taskHandle负责维护所有的Source Splits与Itermediate Splits,只是为了调度时更方便
taskHandle.enqueueSplit(prioritizedSplitRunner);
// if task is under the limit for guaranteed splits, start one
// 这个方法,才会真正地调度Splits执行
scheduleTaskIfNecessary(taskHandle);
// if globally we have more resources, start more
addNewEntrants();
}
finishedFutures.add(prioritizedSplitRunner.getFinishedFuture());
}
}
for (PrioritizedSplitRunner split : splitsToDestroy) {
split.destroy();
}
return finishedFutures;
}
从前面的章节知道,一个新的Split到Worker接收,会被封装成一个PrioritizedSplitRunner实例,而PrioritizedSplitRunner就是处理Split的实体。
private synchronized void scheduleTaskIfNecessary(TaskHandle taskHandle)
{
// if task has less than the minimum guaranteed splits running,
// immediately schedule new splits for this task. This assures
// that a task gets its fair amount of consideration (you have to
// have splits to be considered for running on a thread).
// 获取当前能够调度的Splits的最大数量
int splitsToSchedule = min(guaranteedNumberOfDriversPerTask, taskHandle.getMaxDriversPerTask().orElse(Integer.MAX_VALUE)) - taskHandle.getRunningLeafSplits();
for (int i = 0; i < splitsToSchedule; ++i) {
// 从taskHandle里面拿到下一个PrioritizedSplitRunner的实例,就是简单从队列中的取出队首元素
PrioritizedSplitRunner split = taskHandle.pollNextSplit();
if (split == null) {
// no more splits to schedule
return;
}
// 执行PrioritizedSplitRunner实例
startSplit(split);
splitQueuedTime.add(Duration.nanosSince(split.getCreatedNanos()));
}
}
private synchronized void startSplit(PrioritizedSplitRunner split)
{
// 将待执行的split首先添加到等待队列中,即waitingSplits中
allSplits.add(split);
waitingSplits.offer(split);
}
从
Worker调度Split
章节知道,Split对应的PrioritizedSplitRunner
实例会被添加进TaskExecutor对象的waitingSplits
成员变量中,而waitingSplits
会在TaskExecution::run()
方法中被访问。
public ListenableFuture<Void> process()
{
try {
// 当前的Split被调度执行了,因此计时开始
long startNanos = ticker.read();
start.compareAndSet(0, startNanos);
lastReady.compareAndSet(0, startNanos);
processCalls.incrementAndGet();
waitNanos.getAndAdd(startNanos - lastReady.get());
// 由于Trino中的Split执行时基于时间片调度模型的,因此这里需要创建一个CpuTimer实例来计时
CpuTimer timer = new CpuTimer();
// SPLIT_RUN_QUANTA限定了当前Split允许的一个时间片的长度,其取值默认为
// SPLIT_RUN_QUANTA = new Duration(1, TimeUnit.SECONDS);
// split.processFor(SPLIT_RUN_QUANTA)方法返回一个Future实例,它记录了当前Split被阻塞时的上下文信息
ListenableFuture<Void> blocked = split.processFor(SPLIT_RUN_QUANTA);
CpuTimer.CpuDuration elapsed = timer.elapsedTime();
long quantaScheduledNanos = ticker.read() - startNanos;
scheduledNanos.addAndGet(quantaScheduledNanos);
priority.set(taskHandle.addScheduledNanos(quantaScheduledNanos));
lastRun.set(ticker.read());
if (blocked == NOT_BLOCKED) {
// 如果当前的Split是静态对象NOT_BLOCKED,意味着一个完整的时间片执行完了,因此累计非阻塞时长
unblockedQuantaWallTime.add(elapsed.getWall());
}
else {
// 否则意味着一个完整的时间片没有执行用完,因此累计阻塞时长
blockedQuantaWallTime.add(elapsed.getWall());
}
long quantaCpuNanos = elapsed.getCpu().roundTo(NANOSECONDS);
cpuTimeNanos.addAndGet(quantaCpuNanos);
globalCpuTimeMicros.update(quantaCpuNanos / 1000);
globalScheduledTimeMicros.update(quantaScheduledNanos / 1000);
// 返回当前Split的阻塞信息给上层
return blocked;
}
catch (Throwable e) {
finishedFuture.setException(e);
throw e;
}
}
@Override
public ListenableFuture<Void> processFor(Duration duration)
{
Driver driver;
synchronized (this) {
// if close() was called before we get here, there's not point in even creating the driver
if (closed) {
return immediateVoidFuture();
}
if (this.driver == null) {
// 如果是第一次执行当前的Split,则driver == null,因此需要创建一个Driver实例,负责将一组Operators应用到当前的Split上
this.driver = driverSplitRunnerFactory.createDriver(driverContext, partitionedSplit);
}
driver = this.driver;
}
// 最终由Driver实现处理逻辑
return driver.processFor(duration);
}
Driver的一次处理,最多会
占用1秒的CPU时间片
,在非阻塞的情况下,一旦时间片耗尽,就会主动退出当前Driver的执行,并返回NOT_BLOCKED
对象,期望能够被上层调度器继续调度执行。
另外一个需要注意的点是,Driver内部处理数据的最小单元是Page,除非这个Page被处理完成,否则当前线程/Driver不会由于执行时间超过时间片的规定,而强制中止处理
,故这里的1秒钟是一种软限制。
public ListenableFuture<Void> processFor(Duration duration)
{
checkLockNotHeld("Cannot process for a duration while holding the driver lock");
requireNonNull(duration, "duration is null");
// if the driver is blocked we don't need to continue
SettableFuture<Void> blockedFuture = driverBlockedFuture.get();
if (!blockedFuture.isDone()) {
return blockedFuture;
}
long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
Optional<ListenableFuture<Void>> result = tryWithLock(100, TimeUnit.MILLISECONDS, () -> {
// driverContext维护的当前Driver实例运行时的上下文信息
OperationTimer operationTimer = createTimer();
driverContext.startProcessTimer();
// DriverContext中有一个成员变量DriverYieldSignal,可以认为它是一个带有定时功能的监听器,因此在这里
// 告诉DriverYieldSignal,在经过最大运行时间之后(1秒),就触发一个Yield信号,主动放弃当前Driver的执行。
driverContext.getYieldSignal().setWithDelay(maxRuntime, driverContext.getYieldExecutor());
try {
long start = System.nanoTime();
do {
// processInternal(operationTimer)方法的一次调用,只会处理绑定的Split中的一个数据页Page,
// 因此Page可以认为是数据处理的最小单元。
ListenableFuture<Void> future = processInternal(operationTimer);
if (!future.isDone()) {
// 走到这里意味着,当前的Driver执行被阻塞/挂起了,因此强制限制循环
// 被挂起的原因可能是:内存不足、等待RPC返回等。
return updateDriverBlockedFuture(future);
}
}
// 循环执行当前的Driver,直到完成,但如果当前Driver执行完成时,会退出;或者是最大的时间片被用完,也会退出
while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
}
finally {
// 方法完成退出,总是会重置DriverYieldSignal的状态,
driverContext.getYieldSignal().reset();
driverContext.recordProcessed(operationTimer);
}
// 只有当前Driver没有被阻塞/挂起时,才会走这里,返回静态常量
return NOT_BLOCKED;
});
return result.orElse(NOT_BLOCKED);
}
@GuardedBy("exclusiveLock")
private ListenableFuture<Void> processInternal(OperationTimer operationTimer)
{
checkLockHeld("Lock must be held to call processInternal");
// 尝试回收可以被强扭回收的内存资源
handleMemoryRevoke();
try {
// 此方法会尝试辨别出新接收的Splits
processNewSources();
// If there is only one operator, finish it
// Some operators (LookupJoinOperator and HashBuildOperator) are broken and requires finish to be called continuously
// TODO remove the second part of the if statement, when these operators are fixed
// Note: finish should not be called on the natural source of the pipeline as this could cause the task to finish early
// 如果只有一个Operator时,直接标记其为完成状态
if (!activeOperators.isEmpty() && activeOperators.size() != allOperators.size()) {
Operator rootOperator = activeOperators.get(0);
rootOperator.finish();
rootOperator.getOperatorContext().recordFinish(operationTimer);
}
boolean movedPage = false; // 标记,记录是否处理过数据页
for (int i = 0; i < activeOperators.size() - 1 && !driverContext.isDone(); i++) { // 遍历所有还没有完成的Operator实例
Operator current = activeOperators.get(i);
// 获取下游Operator实例
Operator next = activeOperators.get(i + 1);
// skip blocked operator
if (getBlockedFuture(current).isPresent()) {
// 如果当前Operator实例处理BLOCKED状态,则后续的处理,遍历下一个Operator
continue;
}
// if the current operator is not finished and next operator isn't blocked and needs input...
if (!current.isFinished() && getBlockedFuture(next).isEmpty() && next.needsInput()) {
// 如果当前的Operator还没有完成,同时下游的Operator没有被阻塞,那么就将通过current.getOutput()方法计算并产出一个数据页,
// 并传递给下游的Operator实例
// get an output page from current operator
Page page = current.getOutput();
current.getOperatorContext().recordGetOutput(operationTimer, page);
// if we got an output page, add it to the next operator
if (page != null && page.getPositionCount() != 0) {
// 如果当前Operator实例产出的数据页,且有数据时,才会传递全下游Operator实例,并更新标记
next.addInput(page);
next.getOperatorContext().recordAddInput(operationTimer, page);
movedPage = true;
}
if (current instanceof SourceOperator) {
// 如果当前的Operator实例,是一个SourceOperator,即从remote source/table/file读取了数据时,总是更新标记
movedPage = true;
}
}
// if current operator is finished...
if (current.isFinished()) {
// let next operator know there will be no more data
next.finish();
next.getOperatorContext().recordFinish(operationTimer);
}
}
for (int index = activeOperators.size() - 1; index >= 0; index--) {
if (activeOperators.get(index).isFinished()) {
// 从后向前遍历还在运行的Operator实例,如果发现了一个处理完成状态的Operator实例时,需要保证在其之前(上游)的Operator实例,也都已经完成了
// close and remove this operator and all source operators
List<Operator> finishedOperators = this.activeOperators.subList(0, index + 1);
Throwable throwable = closeAndDestroyOperators(finishedOperators);
finishedOperators.clear();
if (throwable != null) {
throwIfUnchecked(throwable);
throw new RuntimeException(throwable);
}
// Finish the next operator, which is now the first operator.
if (!activeOperators.isEmpty()) {
Operator newRootOperator = activeOperators.get(0);
newRootOperator.finish();
newRootOperator.getOperatorContext().recordFinish(operationTimer);
}
break;
}
}
// if we did not move any pages, check if we are blocked
if (!movedPage) {
// 走到这里,说明没有新的数据生成,因此需要判断是由于阻塞导致的,还是由其它原因导致的
List<Operator> blockedOperators = new ArrayList<>();
List<ListenableFuture<Void>> blockedFutures = new ArrayList<>();
// 收集所有可能被阻塞的Operator的信息
for (Operator operator : activeOperators) {
Optional<ListenableFuture<Void>> blocked = getBlockedFuture(operator);
if (blocked.isPresent()) {
blockedOperators.add(operator);
blockedFutures.add(blocked.get());
}
}
if (!blockedFutures.isEmpty()) {
// 如果发现了确实有某些Operators被阻塞了,那么就返回给上层一个新的ListenableFuture实例,
// 用于监听被阻塞的Operators,可以交由上层决定一旦有一个Operator的状态被重置时的行为。
// unblock when the first future is complete
ListenableFuture<Void> blocked = firstFinishedFuture(blockedFutures);
// driver records serial blocked time
driverContext.recordBlocked(blocked);
// each blocked operator is responsible for blocking the execution
// until one of the operators can continue
for (Operator operator : blockedOperators) {
operator.getOperatorContext().recordBlocked(blocked);
}
return blocked;
}
}
// 到这里,说明没有数据页生成,那就是所有的数据都被过滤掉了,没有BLOCKING行为
return NOT_BLOCKED;
}
catch (Throwable t) {
// 如果当前Driver的执行过程中产生了非预期的错误时,就标记当前Driver实例的状态为失败
List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
if (interrupterStack == null) {
driverContext.failed(t);
throw t;
}
// Driver thread was interrupted which should only happen if the task is already finished.
// If this becomes the actual cause of a failed query there is a bug in the task state machine.
Exception exception = new Exception("Interrupted By");
exception.setStackTrace(interrupterStack.stream().toArray(StackTraceElement[]::new));
TrinoException newException = new TrinoException(GENERIC_INTERNAL_ERROR, "Driver was interrupted", exception);
newException.addSuppressed(t);
driverContext.failed(newException);
throw newException;
}
}
通过
TaskRunner的创建及运行
章节的分析可以知道,一个TaskRunner同一时间,只会执行一个PrioritizedSplitRunner实例;而一个PrioritizedSplitRunner在执行期间会由于时间片的耗尽,而被主动挂起,将自己重新添加进等待队列(一个MultilevelSplitQueue类型的实例对象)中。
因此为了能够更加合理地选择,哪些重新入进等待队列的SplitRunner能够首先被调度执行,就需要一定的策略。
MultilevelSplitQueue::take方法返回优先级最高的SplitRunner时,会考虑两个层面的因素,即等待队列权重、SplitRunner(任务)权重。
等待队列的优先级定义:
MultilevelSplitQueue定义了五个不同的权重队列,它们各自绑定了一个固定的时间阈值,即[0, 1, 10, 60, 300]。
例如第一个权重队列,它对应的时间阈值为0,即0秒,表示如果此队列中的SplitRunner被调度过,且调度&执行时间为T时,那么这个队列被选择调度的权重ratio = T / 0 = 0。因此在Trino的调度策略里,简单来说,ration的值越大,说明这个队列被选择调度的次数越少/时间越少,因此应该越被优先选择。
SplitRunner的优先级定义:
当确认了一个最高优先级的等待队列调度时,还需要从这个队列中选择一个优先级最高的SplitRunner返回给上层调度器。而每一个SplitRunner拥有一个成员变量,AtomicReference<Priority> priority;
,它存储了当前SplitRunner相对于当前队列的权重,可以简单地看成是执行时间的长短,如果SplitRunner的已执行时间越短,那么它的权重越大,越被优先选择,但并不意味着它就是最终被调度的,还需要看它是否被提升到更高的等待队列中,因此还需要注意result.updateLevelPriority()
的逻辑。
public PrioritizedSplitRunner take()
throws InterruptedException
{
while (true) {
lock.lockInterruptibly();
try {
PrioritizedSplitRunner result;
// 通过pollSplit()方法,返回一个权重(Priority)最低,即优先级最高的SplitRunner实例
while ((result = pollSplit()) == null) {
notEmpty.await();
}
// 如果这个SplitRunner发生了队列变更,需要将其添加到正确的等待队列,并重新选择新的SplitRunner实例
if (result.updateLevelPriority()) {
offer(result);
continue;
}
//到这里,result就是要被选择执行的SplitRunner,那么就重置相关的标记变量,并更新计数
int selectedLevel = result.getPriority().getLevel();
levelMinPriority[selectedLevel].set(result.getPriority().getLevelPriority());
selectedLevelCounters.get(selectedLevel).update(1);
// 返回给调用者
return result;
}
finally {
// 总是是释放本地锁
lock.unlock();
}
}
}
SplitRunner的挂起,可以认为就是某个Operator被BLOCKED导致的,因此只需要分析Operator被BLOCKED的情况即可。
在SplitRunner执行的过程中,会存在两类BLOCKED的情况:
Revoking Memory(Operator的预计算过程):执行一个Operator执行之前,需要检查当前Operator正在回收内存,如果正处于这个状态,意味着不能触发它的计算,否则会导致内存混乱,这个过程不细说了,需要结合内存管理
相关的内容进行分析。
Operator的计算过程:一个Operator实例的计算过程,就是对Input Page的处理过程,通过getOutput()
方法触发。因此在这个过程中可能会导致链路上的某个Operator实例处理Blocked的状态,表示Page数据还有没准备好。
@Override
public void addInput(Page page)
{
throw new UnsupportedOperationException(getClass().getName() + " cannot take input");
}
@Override
public Page getOutput()
{
// 这个算子是一个intermediate stage的最上游操作子,负责从无端拉取数据。
// 因此它的addInput(Page)方法是不需要实现的,并且通过exhcangeClient拉取数据。
// 从下面的代码可以看到,exchangeClient.pollPage()肯定是异步的,它返回null,也就意味着此实例处于BLOCKED的状态。
SerializedPage page = exchangeClient.pollPage();
if (page == null) {
return null;
}
operatorContext.recordNetworkInput(page.getSizeInBytes(), page.getPositionCount());
// 反序列化数据页的内容,数据交互时,数据面是被压缩的
Page deserializedPage = serde.deserialize(page);
operatorContext.recordProcessedInput(deserializedPage.getSizeInBytes(), page.getPositionCount());
return deserializedPage;
}
ExchangeOperator的BLOCKED状态就是ExchangeClient是否拉到一个Page的数据的状态。
public class ExchangeOperator
implements SourceOperator
{
private ListenableFuture<Void> isBlocked = NOT_BLOCKED;
@Override
public ListenableFuture<Void> isBlocked()
{
// Avoid registering a new callback in the ExchangeClient when one is already pending
if (isBlocked.isDone()) {
// 可以看到,Operator是不是处于BBLOCKED的状态,实际上取决于exchangeClient是否拉取完成一个Page的数据。
isBlocked = exchangeClient.isBlocked();
if (isBlocked.isDone()) {
isBlocked = NOT_BLOCKED;
}
}
return isBlocked;
}
}