Rxjava链式调用解析

发布时间:2024年01月17日

本文以下面代码为例逐步解析

Observable.just("数据源")
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) throws Exception {
                        return 1;
                    }
                })
                .filter(integer -> {
                    return integer == 1;
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        
                    }

                    @Override
                    public void onNext(Object o) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }
 

从just开始

 public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "item is null");
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }

返回了一个将传入的参数封装成了一个 ObservableJust对象
其他的Rxjava创建操作符类似:比如create(), just(),fromArray(),fromIterable(),timer(),interval()等

ObservableJust类


public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {

    private final T value;
    public ObservableJust(final T value) {  //将传入的参数赋值给value
        this.value = value;
    }


   //重点方法  稍后看
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
        observer.onSubscribe(sd);
        sd.run();
    }

    @Override
    public T call() {
        return value;
    }
}

map方法

由于just方法返回了一个ObservableJust对象,所以调用链的map方法调用的ObservableJust对象的map方法
但是我们看到ObservableJust类中并没有map方法,所以去看他的父类Observable

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

在他的父类Observable中看到,map()依然是返回了一个ObservableMap对象,这个对象将当前对象(也就是上一步的ObservableJust对象)和map()传入的参数一起封装了起来 从上面的调用链来看就是这一段代码:

ObservableMap类

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source); //这里的source也就是上一步的ObservableJust对象
        this.function = function; //这里的function就是map就是map()传入的参数
    }
    
    
    //这个方法一样待会分析
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    
            @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }
    }
 这时候发现ObservableMap和上面的ObservableJust类一样,都实现了subscribeActual()

filter方法

接着继续分析调用链上的方法filter,一样我们去ObservableMap父类里去找这个方法,他的父类AbstractObservableWithUpstream里面没有这个方法,但是AbstractObservableWithUpstream跟ObservableJust一样继承自Observable

public final Observable<T> filter(Predicate<? super T> predicate) {
        ObjectHelper.requireNonNull(predicate, "predicate is null");
        return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
    }
  看到没 filter和前两个方法还是一样的套路,返回了一个ObservableFilter对象,不出意外这个ObservableFilter里面肯定也有一个subscribeActual方法,并且也是直接或者间接继承自Observable
public final class ObservableFilter<T> extends AbstractObservableWithUpstream<T, T> {
    final Predicate<? super T> predicate;
    public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) {
        super(source); 
        this.predicate = predicate;
    }

    @Override
    public void subscribeActual(Observer<? super T> observer) {
        source.subscribe(new FilterObserver<T>(observer, predicate));
    }

    static final class FilterObserver<T> extends BasicFuseableObserver<T, T> {
        final Predicate<? super T> filter;

        FilterObserver(Observer<? super T> actual, Predicate<? super T> filter) {
            super(actual);
            this.filter = filter;
        }

        @Override
        public void onNext(T t) {
            if (sourceMode == NONE) {
                boolean b;
                try {
                    b = filter.test(t);
                } catch (Throwable e) {
                    fail(e);
                    return;
                }
                if (b) {
                    downstream.onNext(t);
                }
            } else {
                downstream.onNext(null);
            }
        }
}
一模一样的套路

subscribeOn

 public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source); 
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> downstream;

        final AtomicReference<Disposable> upstream;

        SubscribeOnObserver(Observer<? super T> downstream) {
            this.downstream = downstream;
            this.upstream = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this.upstream, d);
        }

        @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            downstream.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(upstream);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
}

这个subscribeOn用于切换上游线程:
主要是这一句parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
scheduler就是我们传入的Schedulers.io(),上面代码可以看到SubscribeTask是一个Runnable,run()里调用的sourcesource.subscribe(parent),还记得source吗,source就是调用链上一步返回的对象,也就是上一步的
ObservableFilter;
去看看Schedulers.io()返回的是个什么类

    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
     看到他返回的是一个Scheduler,去Scheduler中找scheduleDirect
  @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }
    继续往下追踪会发现最终将这个Runable经过各种封装,最后提交到一个线程池(ScheduledExecutorService)中去执行任务,这样就实现了SubscribeOn上游数据源代码的线程切换

至于下游代码线程切换来看ObserveOn

  public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
    

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
}

ObserveOnObserver类:

mplements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> downstream;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

        Disposable upstream;

        Throwable error;
        volatile boolean done;

        volatile boolean disposed;

        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;
                if (d instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) d;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        downstream.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        downstream.onSubscribe(this);
                        return;
                    }
                }

                queue = new SpscLinkedArrayQueue<T>(bufferSize);

                downstream.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            schedule();
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            schedule();
        }

        @Override
        public void dispose() {
            if (!disposed) {
                disposed = true;
                upstream.dispose();
                worker.dispose();
                if (!outputFused && getAndIncrement() == 0) {
                    queue.clear();
                }
            }
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

        void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = downstream;

            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        void drainFused() {
            int missed = 1;

            for (;;) {
                if (disposed) {
                    return;
                }

                boolean d = done;
                Throwable ex = error;

                if (!delayError && d && ex != null) {
                    disposed = true;
                    downstream.onError(error);
                    worker.dispose();
                    return;
                }

                downstream.onNext(null);

                if (d) {
                    disposed = true;
                    ex = error;
                    if (ex != null) {
                        downstream.onError(ex);
                    } else {
                        downstream.onComplete();
                    }
                    worker.dispose();
                    return;
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

        boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            if (disposed) {
                queue.clear();
                return true;
            }
            if (d) {
                Throwable e = error;
                if (delayError) {
                    if (empty) {
                        disposed = true;
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        worker.dispose();
                        return true;
                    }
                } else {
                    if (e != null) {
                        disposed = true;
                        queue.clear();
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    if (empty) {
                        disposed = true;
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }

        @Override
        public int requestFusion(int mode) {
            if ((mode & ASYNC) != 0) {
                outputFused = true;
                return ASYNC;
            }
            return NONE;
        }

        @Nullable
        @Override
        public T poll() throws Exception {
            return queue.poll();
        }

        @Override
        public void clear() {
            queue.clear();
        }

        @Override
        public boolean isEmpty() {
            return queue.isEmpty();
        }
    }
    

不想看代码直接总结,从ObserveOnObserver类中发现他的onSubscribe,onNext,onError,OnNext方法都调用了schedule(),追踪schedule()发现,最终同样是把任务交给了线程池处理,在本例子中由于传递的是AndroidSchedulers.mainThread(),所以下游是切换到主线程执行,这里是用了Handler将任务提交给主线程

final class HandlerScheduler extends Scheduler {
    private final Handler handler;
    private final boolean async;

    HandlerScheduler(Handler handler, boolean async) {
        this.handler = handler;
        this.async = async;
    }

    @Override
    @SuppressLint("NewApi") // Async will only be true when the API is available to call.
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        Message message = Message.obtain(handler, scheduled);
        if (async) {
            message.setAsynchronous(true);
        }
        handler.sendMessageDelayed(message, unit.toMillis(delay));
        return scheduled;
    }
}

在这里插入图片描述

终于到了最后一步suscribe

这里调用的是Observable的subscribe

 public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);//重点看这里
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

看到subscribeActual()方法没,原来subscribe()里会调用subscribeActual;

现在往回追溯:

在subscribe方法中会调用当前对象的subscribeActual(),所以往回追溯他首先会去调ObservableObserveOn的subscribeActual(),参数就是最终传入的Observer

回忆一下ObservableObserveOn的subscribeActual()

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

继续将Observer封装成ObserveOnObserver,然后调用source.subcribe(),source还记得吧,就是调用链上一步的返回的对象,也就是ObservableSubscribeOn,这个类没有实现subscribe,但是他的父类有这个方法,那不就是Observable的subcribe()吗?是的,也就是跟调用链最后一步调用的subcribe()是同一个方法,只不过他的参数是基于下游的参数的进一步封装,那么同样我他会调用到susscribeActual()

 @Override
    public void subscribeActual(final Observer<? super T> observer) {  //这里的Observer就是将下游封装后的Observer
    
        //将oberser继续封装
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

//经过刚才的分析 这里是将任务交给线程池处理,所以去看SubscribeTask的run()
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

// SubscribeTask的run()
        @Override
        public void run() {
            source.subscribe(parent); //同样继续调用source.subscribe,那么他也是同意是调用到调用链上一步返回对象的subscribeActual(),,也就是ObservableFilter对象对象
        }

不出意外ObservableFilter对象里也是将Observer继续封装,然后调用source.subscribe

  @Override
    public void subscribeActual(Observer<? super T> observer) {
        source.subscribe(new FilterObserver<T>(observer, predicate));
    }

现在来到了第一步ObservableJust的subscribeActual():

  @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value); //将Observer和value进行封装,value就是我们第一步传入的数据源了
        observer.onSubscribe(sd);
        sd.run();
    }
    
    //ScalarDisposable的run方法
        @Override
        public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
                observer.onNext(value);   //这里开始把数据源往下游传, value指数据源  observer就是下游一步一步封装的Observer啦
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    observer.onComplete();
                }
            }
        }

还记得回溯时封装的那些Observer吗?分别是MapObserver,FilterObserver,SubscribeOnObserver,ObserveOnObserver以及调用链上最后一步我们自己自定义的Observer
分别再看他们的onNext(),其他方法套路一致

MapObserver:

 
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
            //处理数据源,将数据源转换成想要的类型
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            
            // 继续调用下游Observer的onNext
            downstream.onNext(v);
        }

FilterObserver

 @Override
        public void onNext(T t) {
            if (sourceMode == NONE) {
                boolean b;
                try {
                // 数据判断
                    b = filter.test(t);
                } catch (Throwable e) {
                    fail(e);
                    return;
                }
                
                //满足过滤条件继续调用下游onNext
                if (b) {
                    downstream.onNext(t);
                }
            } else {
                downstream.onNext(null);
            }
        }

SubscribeOnObserver

由于subscribeOn只是起到切换上游线程的作用,所以对下游他不做任何操作,继续调用下游的onNext

     @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }
       

ObserveOnObserver:

    @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();  //切换下游线程,将任务交给线程池或者主线程handler,然后调用下游onNext
           
        }

最终就调到我们自定义的onNext()啦,整个流程就结束了

总结一下

Rxjava的链式调用整个流程就是从下到上,由上而下
每一步的操作符都是将上游对象作为source封装成新的Observable,然后继续往下传递,直到最后的subsribe方法反向开始调用source.subscribe然后调用到每个soource对象的subscriActual(),每一步的subscribActual()又会将下游传递来的Observer一步步封装,直到传递到最上游,在最上游开始再一步步调用封装好的Observe的相关方法,这样就实现了将数据源传递到下游。

切换上游线程:
创建一个Task,继承自Runable,在Runable的run()里调用source.subscribe(),然后将这个Runable进一步封装,根据传递的参数创建对应的线程池或者主线程Handler,将Runable提交给线程池或者Handler去执行

切换下游线程:
封装的Observer的onSubscribe,onNext,onError,OnNext方法都调用了schedule(),追踪schedule()发现,最终同样是把任务交给了线程池处理,在本例子中由于传递的是AndroidSchedulers.mainThread(),所以下游是切换到主线程执行

文章来源:https://blog.csdn.net/weixin_43243916/article/details/135652683
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。