Observable.just("数据源")
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return 1;
}
})
.filter(integer -> {
return integer == 1;
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
返回了一个将传入的参数封装成了一个 ObservableJust对象
其他的Rxjava创建操作符类似:比如create(), just(),fromArray(),fromIterable(),timer(),interval()等
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) { //将传入的参数赋值给value
this.value = value;
}
//重点方法 稍后看
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
observer.onSubscribe(sd);
sd.run();
}
@Override
public T call() {
return value;
}
}
由于just方法返回了一个ObservableJust对象,所以调用链的map方法调用的ObservableJust对象的map方法
但是我们看到ObservableJust类中并没有map方法,所以去看他的父类Observable
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
在他的父类Observable中看到,map()依然是返回了一个ObservableMap对象,这个对象将当前对象(也就是上一步的ObservableJust对象)和map()传入的参数一起封装了起来 从上面的调用链来看就是这一段代码:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source); //这里的source也就是上一步的ObservableJust对象
this.function = function; //这里的function就是map就是map()传入的参数
}
//这个方法一样待会分析
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
}
这时候发现ObservableMap和上面的ObservableJust类一样,都实现了subscribeActual()
接着继续分析调用链上的方法filter,一样我们去ObservableMap父类里去找这个方法,他的父类AbstractObservableWithUpstream里面没有这个方法,但是AbstractObservableWithUpstream跟ObservableJust一样继承自Observable
public final Observable<T> filter(Predicate<? super T> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}
看到没 filter和前两个方法还是一样的套路,返回了一个ObservableFilter对象,不出意外这个ObservableFilter里面肯定也有一个subscribeActual方法,并且也是直接或者间接继承自Observable
public final class ObservableFilter<T> extends AbstractObservableWithUpstream<T, T> {
final Predicate<? super T> predicate;
public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) {
super(source);
this.predicate = predicate;
}
@Override
public void subscribeActual(Observer<? super T> observer) {
source.subscribe(new FilterObserver<T>(observer, predicate));
}
static final class FilterObserver<T> extends BasicFuseableObserver<T, T> {
final Predicate<? super T> filter;
FilterObserver(Observer<? super T> actual, Predicate<? super T> filter) {
super(actual);
this.filter = filter;
}
@Override
public void onNext(T t) {
if (sourceMode == NONE) {
boolean b;
try {
b = filter.test(t);
} catch (Throwable e) {
fail(e);
return;
}
if (b) {
downstream.onNext(t);
}
} else {
downstream.onNext(null);
}
}
}
一模一样的套路
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}
@Override
public void onNext(T t) {
downstream.onNext(t);
}
@Override
public void onError(Throwable t) {
downstream.onError(t);
}
@Override
public void onComplete() {
downstream.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(upstream);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
}
这个subscribeOn用于切换上游线程:
主要是这一句parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
scheduler就是我们传入的Schedulers.io(),上面代码可以看到SubscribeTask是一个Runnable,run()里调用的sourcesource.subscribe(parent),还记得source吗,source就是调用链上一步返回的对象,也就是上一步的
ObservableFilter;
去看看Schedulers.io()返回的是个什么类
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
看到他返回的是一个Scheduler,去Scheduler中找scheduleDirect
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
继续往下追踪会发现最终将这个Runable经过各种封装,最后提交到一个线程池(ScheduledExecutorService)中去执行任务,这样就实现了SubscribeOn上游数据源代码的线程切换
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
}
mplements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> downstream;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
Disposable upstream;
Throwable error;
volatile boolean done;
volatile boolean disposed;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) d;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
downstream.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
downstream.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
downstream.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
@Override
public void dispose() {
if (!disposed) {
disposed = true;
upstream.dispose();
worker.dispose();
if (!outputFused && getAndIncrement() == 0) {
queue.clear();
}
}
}
@Override
public boolean isDisposed() {
return disposed;
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
void drainFused() {
int missed = 1;
for (;;) {
if (disposed) {
return;
}
boolean d = done;
Throwable ex = error;
if (!delayError && d && ex != null) {
disposed = true;
downstream.onError(error);
worker.dispose();
return;
}
downstream.onNext(null);
if (d) {
disposed = true;
ex = error;
if (ex != null) {
downstream.onError(ex);
} else {
downstream.onComplete();
}
worker.dispose();
return;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (disposed) {
queue.clear();
return true;
}
if (d) {
Throwable e = error;
if (delayError) {
if (empty) {
disposed = true;
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
if (e != null) {
disposed = true;
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
disposed = true;
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
outputFused = true;
return ASYNC;
}
return NONE;
}
@Nullable
@Override
public T poll() throws Exception {
return queue.poll();
}
@Override
public void clear() {
queue.clear();
}
@Override
public boolean isEmpty() {
return queue.isEmpty();
}
}
不想看代码直接总结,从ObserveOnObserver类中发现他的onSubscribe,onNext,onError,OnNext方法都调用了schedule(),追踪schedule()发现,最终同样是把任务交给了线程池处理,在本例子中由于传递的是AndroidSchedulers.mainThread(),所以下游是切换到主线程执行,这里是用了Handler将任务提交给主线程
final class HandlerScheduler extends Scheduler {
private final Handler handler;
private final boolean async;
HandlerScheduler(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
@Override
@SuppressLint("NewApi") // Async will only be true when the API is available to call.
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
if (async) {
message.setAsynchronous(true);
}
handler.sendMessageDelayed(message, unit.toMillis(delay));
return scheduled;
}
}
这里调用的是Observable的subscribe
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);//重点看这里
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
看到subscribeActual()方法没,原来subscribe()里会调用subscribeActual;
在subscribe方法中会调用当前对象的subscribeActual(),所以往回追溯他首先会去调ObservableObserveOn的subscribeActual(),参数就是最终传入的Observer
回忆一下ObservableObserveOn的subscribeActual()
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
继续将Observer封装成ObserveOnObserver,然后调用source.subcribe(),source还记得吧,就是调用链上一步的返回的对象,也就是ObservableSubscribeOn,这个类没有实现subscribe,但是他的父类有这个方法,那不就是Observable的subcribe()吗?是的,也就是跟调用链最后一步调用的subcribe()是同一个方法,只不过他的参数是基于下游的参数的进一步封装,那么同样我他会调用到susscribeActual()
@Override
public void subscribeActual(final Observer<? super T> observer) { //这里的Observer就是将下游封装后的Observer
//将oberser继续封装
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
//经过刚才的分析 这里是将任务交给线程池处理,所以去看SubscribeTask的run()
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
// SubscribeTask的run()
@Override
public void run() {
source.subscribe(parent); //同样继续调用source.subscribe,那么他也是同意是调用到调用链上一步返回对象的subscribeActual(),,也就是ObservableFilter对象对象
}
不出意外ObservableFilter对象里也是将Observer继续封装,然后调用source.subscribe
@Override
public void subscribeActual(Observer<? super T> observer) {
source.subscribe(new FilterObserver<T>(observer, predicate));
}
现在来到了第一步ObservableJust的subscribeActual():
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value); //将Observer和value进行封装,value就是我们第一步传入的数据源了
observer.onSubscribe(sd);
sd.run();
}
//ScalarDisposable的run方法
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
observer.onNext(value); //这里开始把数据源往下游传, value指数据源 observer就是下游一步一步封装的Observer啦
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
还记得回溯时封装的那些Observer吗?分别是MapObserver,FilterObserver,SubscribeOnObserver,ObserveOnObserver以及调用链上最后一步我们自己自定义的Observer
分别再看他们的onNext(),其他方法套路一致
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
//处理数据源,将数据源转换成想要的类型
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
// 继续调用下游Observer的onNext
downstream.onNext(v);
}
@Override
public void onNext(T t) {
if (sourceMode == NONE) {
boolean b;
try {
// 数据判断
b = filter.test(t);
} catch (Throwable e) {
fail(e);
return;
}
//满足过滤条件继续调用下游onNext
if (b) {
downstream.onNext(t);
}
} else {
downstream.onNext(null);
}
}
由于subscribeOn只是起到切换上游线程的作用,所以对下游他不做任何操作,继续调用下游的onNext
@Override
public void onNext(T t) {
downstream.onNext(t);
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule(); //切换下游线程,将任务交给线程池或者主线程handler,然后调用下游onNext
}
最终就调到我们自定义的onNext()啦,整个流程就结束了
Rxjava的链式调用整个流程就是从下到上,由上而下
每一步的操作符都是将上游对象作为source封装成新的Observable,然后继续往下传递,直到最后的subsribe方法反向开始调用source.subscribe然后调用到每个soource对象的subscriActual(),每一步的subscribActual()又会将下游传递来的Observer一步步封装,直到传递到最上游,在最上游开始再一步步调用封装好的Observe的相关方法,这样就实现了将数据源传递到下游。
切换上游线程:
创建一个Task,继承自Runable,在Runable的run()里调用source.subscribe(),然后将这个Runable进一步封装,根据传递的参数创建对应的线程池或者主线程Handler,将Runable提交给线程池或者Handler去执行
切换下游线程:
封装的Observer的onSubscribe,onNext,onError,OnNext方法都调用了schedule(),追踪schedule()发现,最终同样是把任务交给了线程池处理,在本例子中由于传递的是AndroidSchedulers.mainThread(),所以下游是切换到主线程执行