前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava源码浅析(四): observeOn线程切换和多次切换

RxJava源码浅析(四): observeOn线程切换和多次切换

原创
作者头像
笔头
发布2022-03-20 18:49:41
1.1K0
发布2022-03-20 18:49:41
举报
文章被收录于专栏:Android记忆

上篇文章RxJava源码浅析(三): subscribeOn线程切换和多次切换 我们清楚了subscribeOn线程切换,对于Rxjava线程切换原理有了大致的理解。subscribeOn线程切换,是整个订阅流程线程切换,而observeOn只是针对下游线程切换。

这篇我们来看下observeOn切换线程以及他多次切换的影响。

一、observeOn

先来个demo

代码语言:javascript
复制
 //上游-被观察者
    Observable<Integer> myobservable=Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
            Log.e("subscribe",Thread.currentThread().getName()+"");
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();

        }
    });
    //下游-观察者
    Observer myobserver=new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            Disposable dd=d;

            Log.e("onSubscribe",Thread.currentThread().getName()+"");
        }
        @Override
        public void onNext(Integer integer) {
            Log.e("onNext",Thread.currentThread().getName()+"--"+integer+"");
        }
        @Override
        public void onError(Throwable t) {
        }

        @Override
        public void onComplete() {
        }
    };
    //关联上游和下游
    myobservable.observeOn(Schedulers.newThread()).subscribe(myobserver);

有了前面文章的基础,Schedulers.newThread()是创建了一个线程池,我们直接看observeOn。经过不断跳转我们知道这个方法最终是创建了ObservableObserveOn这个类,也是个Observable。我们直接来看subscribeActual吧。

代码语言:javascript
复制
@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
    }
}

scheduler.createWorker()就是创建了一个线程,当前是Schedulers.newThread(),接着调用source.subscribe(newObserveOnObserver<>(observer, w, delayError, bufferSize));这句话我们读出信息:订阅流程的上游没有切换线程,下游ObserveOnObserver切换了线程。

继续看下ObserveOnObserver源码

我们主要看下onNext方法,他调用了schedule,继续调用了worker.schedule(this);启动线程任务。此时的this就是当前ObserveOnObserver,他是个Runnable,那我们就直接看他的run方法。

代码语言:javascript
复制
@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

继续看drainNormal();

代码语言:javascript
复制
void drainNormal() {
    int missed = 1;

    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = downstream;

    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }

        for (;;) {
            boolean d = done;
            T v;

            try {
                v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                disposed = true;
                upstream.dispose();
                q.clear();
                a.onError(ex);
                worker.dispose();
                return;
            }
            boolean empty = v == null;

            if (checkTerminated(d, empty, a)) {
                return;
            }

            if (empty) {
                break;
            }

            a.onNext(v);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

这里主要两个方法a.onNext(v)这个a,就是下游myobserver。所以我们知道下游的onNext在新线程中执行。

我们发现ObserveOnObserver中的onComplete、onError也是调用schedule();他主要是看checkTerminated,这个方法根据error、done来处理调用onComplete还是onError,还是其他。

observeOn就讲这么多了,有了前几篇文章,我们就很快能理解observeOn是怎么做的了。

总结下,observeOn就是把下游切换线程,相比subscribeOn好理解些。

二、多次observeOn

如果我们多次调用observeOn呢?是以哪个为准呢?假如两次调用observeOn,第一次是线程1,第二次是线程2。

上游

一开始,调用subscribe(myobserver)会调用上游ObservableObserveOn中的subscribeActual方法,进行订阅,创建新线程2,创建ObserveOnObserver(命名为AObserver),AObserver onNext、onComplete等运行在线程2中。

上上游

调用source.subscribe(newObserveOnObserver<>(observer, w, delayError, bufferSize));则调用上上游。同样,会调用ObservableObserveOn中的subscribeActual方法,进行订阅,创建新线程1,创建ObserveOnObserver(命名为BObserver,此时BObserver中的downstream是下游的ObserveOnObserver,也就是AObserver),AObserver onNext、onComplete等运行在线程1中。

最上游

在上上游中调用source.subscribe(newObserveOnObserver<>(observer, w, delayError, bufferSize));会调用ObservableCreate

中的subscribeActual方法,此时的observer是谁?对是BObserver,ObservableCreate的源头下发消息执行onNext的时候会调用BObserver的onNext方法。源码我们看过,BObserver中的onNext会开启新线程执行他的onNext方法,同时我们也发现,这个时候onNext方法会调用下游也就是downstream的onNext,也就是AObserver的onNext,同样AObserver的onNext也会调用myobserver的onNext,最终是执行了最后一次observeOn对应线程中的myobserver的onNext方法。

所有,不管多少次observeOn,都是调用最后一次observeOn。

我们发现RxJava是逆向向上调用的,然后不断向下一级一级的下发消息,最后一个observer来处理消息。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、observeOn
  • 二、多次observeOn
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档