Kotlin协程的JVM实现源码分析(上)

发布时间:2024年01月18日

本文从协程的启动launch源码入手分析,协程JVM实现分为两篇:

  1. 协程启动和执行源码分析
  2. 无栈协程 和 Continuation

基本环境:

  • IntelliJ IDEA 2023.3.2
  • Kotlin 1.8.20
  • kotlinx-coroutines-core 1.7.3
  • gradle 8.2

一、协程的启动和执行

GlobalScope.launch 启动协程分析:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

调用关系:CoroutineScope.launch -> StandaloneCoroutine.start ->
CoroutineStart.invoke -> block.startCoroutineCancellable

1. startCoroutineCancellable 启动流程

启动协程,默认 执行 block.startCoroutineCancellable 扩展方法:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }

说明:

  1. createCoroutineUnintercepted, 创建 Continuation;
  2. intercepted,拦截生成 DispatchedContinuation;
  3. resumeCancellableWith,调度器 派发 执行协程。
intercepted()

关键在 intercepted() 方法:

  • 获取 ContinuationInterceptor,默认值是 Dispatchers.Default,
  • 使用 CoroutineDispatcher.interceptContinuation 生成 DispatchedContinuation
// ContinuationImpl 拦截方法
@Transient
private var intercepted: Continuation<Any?>? = null

public fun intercepted(): Continuation<Any?> =
    intercepted
        ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
            .also { intercepted = it }

// Dispatchers.Default 源码
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
    DispatchedContinuation(this, continuation)
resumeCancellableWith

DispatchedContinuation.resumeCancellableWith 实现了 调度器(线程池)的任务 派发,也就是 Runnable

inline fun resumeCancellableWith(
    result: Result<T>,
    noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
    val state = result.toState(onCancellation)
    if (dispatcher.isDispatchNeeded(context)) {
        _state = state
        resumeMode = MODE_CANCELLABLE
        dispatcher.dispatch(context, this)
    } else {
        executeUnconfined(state, MODE_CANCELLABLE) {
            if (!resumeCancelled(state)) {
                resumeUndispatchedWith(result)
            }
        }
    }
}

默认执行到
dispatcher.dispatch(context, this),此时调度器 派发 执行。
DispatchedContinuation 实现了 Runnable 接口, run() 调用即开始执行阶段,接下来分析。

2. DispatchedContinuation

DispatchedContinuation 继承 DispatchedTask,委托了 Contanuation

协程的最终执行:

DispatchedContinuation.resumeCancellableWith -> dispatcher.dispatch()
-> DispatchedTask.run() -> DispatchedContinuation.continuation.resumeWith

最终 DispatchedContinuation.continuation 也就是我们 launch {} 块生成的 SuspendLambda 类对象。

ContinuationImpl

无论是 launch {} 块生成的 SuspendLambda类,还是 suspend fun 函数 生成 ContinuationImpl 匿名类。
它们都 继承 BaseContinuationImpl

基类 BaseContinuationImpl,实现了 resumeWith

try {
    val outcome = invokeSuspend(param)
    if (outcome === COROUTINE_SUSPENDED) return
    Result.success(outcome)
} catch (exception: Throwable) {
    Result.failure(exception)
}

调用了 抽象 invokeSuspend,也就是 launch {} 块编译后的代码。

执行完成后,会执行 协程 completion.resumeWith(outcome),最终完成。

resumeWith -> onCompletionInternal -> onCompleted 或 onCancelled

launch {} 块编译代码分析

launch {}async {} 编译后,都继承 SuspendLambda

反编译.class,通过 jadx-gui 看到 块 代码一般是下面 形式:

public static final class AnonymousClass1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
    int label;

    @NotNull
    public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<?> continuation) {
        return new AnonymousClass1(continuation);
    }

    @Nullable
    public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation<? super Unit> continuation) {
        return create(p1, continuation).invokeSuspend(Unit.INSTANCE);
    }

    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        // launch {} 执行代码块 逻辑,会编译到这里
        ...
    }
}

小结

通过launch启动协程源码分析,了解了:

  • Dispatchers 调度器何时派发任务
  • 协程 Continuation 挂起、执行 和 恢复

补充类图

Continuation
Continuation
context
resumeWith(result)
BaseContinuationImpl
resumeWith(result)
invokeSuspend(result)
ContinuationImpl
SuspendLambda
RestrictedContinuationImpl
RestrictedSuspendLambda

DispatchedContinuation

DispatchedContinuation
dispatcher
continuation
resumeWith(result)
resumeCancellableWith(result)
DispatchedTask
run()
Runnable

关键类和文件

  1. kotlin.coroutines.Continuation.kt
  2. kotlin.coroutines.CoroutineContext
  3. kotlin.coroutines.jvm.internal.BaseContinuationImpl.kt 对应 Continuation 默认实现。
  4. kotlinx.coroutines.CoroutineStart 启动线程方式、调用

文档

文章来源:https://blog.csdn.net/w709835509/article/details/135673829
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。