前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RxJava源码浅析(二): 深入操作符解析

RxJava源码浅析(二): 深入操作符解析

原创
作者头像
笔头
修改2022-03-21 10:46:27
4160
修改2022-03-21 10:46:27
举报
文章被收录于专栏:Android记忆

上篇文章我们通过源码了解了RxJava基本流程,RxJava源码浅析(一): 基础流程 这里我们研究下操作符的源码是怎么实现的。有了上篇文章的基础,这里讲起来会轻松很多。操作符很多,我们随机挑了几个操作符来看看。

还是基于上篇文章来看

代码语言:javascript
复制
//上游-被观察者
Observable<Integer> myobservable=Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
        Log.e("observable","Thread ID:"+Thread.currentThread().getId());
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
    }
});
//下游-观察者
Observer myobserver=new Observer<Integer>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Log.e("onSubscribe","Thread ID:"+Thread.currentThread().getId());
    }
    @Override
    public void onNext(Integer integer) {
        Log.e("observer","Thread ID:"+Thread.currentThread().getId()+"  "+"value:"+integer);
    }
    @Override
    public void onError(Throwable t) {
    }

    @Override
    public void onComplete() {
    }
};

一、map源码解析

代码语言:javascript
复制
//关联上游和下游
myobservable.map(new Function<Integer, Object>() {
    @Override
    public Object apply(Integer integer) throws Throwable {
        return integer+10;
    }
}).subscribe(myobserver);

用map操作符,我们可以得知,onNext返回的数字都是加10。我们来看下map源码吧。

代码语言:javascript
复制
public final <@NonNull R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
    Objects.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
}

看到onAssembly方法,根据上篇文章我们知道,他是直接返回new ObservableMap<>(this, mapper),那我们直接扒开ObservableMap看看。

代码语言:javascript
复制
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
           ..................................
            try {
                v = Objects.requireNonNull(mapper.apply(t), "returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }
         .....................................
    }

ObservableMap也是Observable具体现实。这里面创建了MapObserver,包装了myobserver。

有了上一篇文章基础,我们知道基础流程里面是Observable的具体实现通过subscribe方法,把数据传给Observer的具体实现。里面两个主体是一对Observable和Observer。

现在map操作符,我们可以知道。里面创建了一对ObservableMap和MapObserver。

我们知道.subscribe(myobserver)会走到ObservableMap中的subscribeActual方法,这里的source就是myobservable,那source.subscribe(new MapObserver<T,U>(t,function))就会走到Observable中的subscribe方法,最终调用ObservableOnSubscribe.subscribe方法。

这里ObservableEmitter上篇文章讲过了,是Observer的一个包装,此时的Observer是谁?对是MapObserver,emitter.onNext()会调用ObservableMap的onNext()。看下ObservableMap的onNext()源码关键片段

代码语言:javascript
复制
try {
    v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
    fail(ex);
    return;
}
downstream.onNext(v);

其中mapper.apply(t),他的具体实现是

代码语言:javascript
复制
@Override
public Object apply(Integer integer) throws Throwable {
    return integer+10;
}

这里的downstream,我们看下他的源头,他的具体实现就是myobserver。那大概的意思就是onNext数据会经过new Function加工,加工后再交给myobserver的onNext。

总结:大概流程就是这样。和基础流程差不多,只是把myobserver包装成MapObserver,MapObserver会对数据进行处理以下再传给myobserver。

二、flatMap源码解析

代码语言:javascript
复制
myobservable.flatMap(new Function<Integer, ObservableSource<?>>() {
    @Override
    public ObservableSource<?> apply(Integer integer) throws Throwable {
        return Observable.just(1,2,4);
    }
}).subscribe(myobserver);

这个稍微有点复杂了,flatMap是返回一个ObservableFlatMap,我们不怕,继续看看他们内部。

代码语言:javascript
复制
public final <@NonNull R> Observable<R> flatMap(@NonNull Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return flatMap(mapper, false);
}

    public final <@NonNull R> Observable<R> flatMap(@NonNull Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        Objects.requireNonNull(mapper, "mapper is null");
        ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        if (this instanceof ScalarSupplier) {
            @SuppressWarnings("unchecked")
            T v = ((ScalarSupplier<T>)this).get();
            if (v == null) {
                return empty();
            }
            return ObservableScalarXMap.scalarXMap(v, mapper);
        }
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<>(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }

看完这个我们应该知道,flatMap直接返回ObservableFlatMap,有了上一篇文章,我们也清楚流程了,直接看subscribeActual

代码语言:javascript
复制
@Override
public void subscribeActual(Observer<? super U> t) {
    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
        return;
    }
    source.subscribe(new MergeObserver<>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}

有了经验,看到这里source.subscribe,直接看MergeObserver onNext方法。

代码语言:javascript
复制
public void onNext(T t) {
    // safeguard against misbehaving sources
    if (done) {
        return;
    }
    ObservableSource<? extends U> p;
    try {
        p = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        upstream.dispose();
        onError(e);
        return;
    }

    if (maxConcurrency != Integer.MAX_VALUE) {
        synchronized (this) {
            if (wip == maxConcurrency) {
                sources.offer(p);
                return;
            }
            wip++;
        }
    }

    subscribeInner(p);
}

这里面核心代码p = Objects.requireNonNull(mapper.apply(t)创建了一个新ObservableSource。上级每次onNext都要生成对应的一个ObservableSource。

mapper.apply(t)返回的就是新生成的Observable

继续看下subscribeInner

代码语言:javascript
复制
void subscribeInner(ObservableSource<? extends U> p) {
    for (;;) {
        if (p instanceof Supplier) {
            if (tryEmitScalar(((Supplier<? extends U>)p)) && maxConcurrency != Integer.MAX_VALUE) {
                boolean empty = false;
                synchronized (this) {
                    p = sources.poll();
                    if (p == null) {
                        wip--;
                        empty = true;
                    }
                }
                if (empty) {
                    drain();
                    break;
                }
            } else {
                break;
            }
        } else {
            InnerObserver<T, U> inner = new InnerObserver<>(this, uniqueId++);
            if (addInner(inner)) {
                p.subscribe(inner);
            }
            break;
        }
    }
}

正常情况,ObservableSource不是Supplier类型,我们当前ObservableSource是Observable,我们直接走else 。

p.subscribe(inner);直接走InnerObserver的onNext方法。

代码语言:javascript
复制
public void onNext(U t) {
    if (fusionMode == QueueDisposable.NONE) {
        parent.tryEmit(t, this);
    } else {
        parent.drain();
    }
}

fusionMode默认是QueueDisposable.NONE,那我们继续看parent.tryEmit(t,this);

代码语言:javascript
复制
void tryEmit(U value, InnerObserver<T, U> inner) {
    if (get() == 0 && compareAndSet(0, 1)) {
        downstream.onNext(value);
        if (decrementAndGet() == 0) {
            return;
        }
    } else {
        SimpleQueue<U> q = inner.queue;
        if (q == null) {
            q = new SpscLinkedArrayQueue<>(bufferSize);
            inner.queue = q;
        }
        q.offer(value);
        if (getAndIncrement() != 0) {
            return;
        }
    }
    drainLoop();
}

MergeObserver 继承了 AtomicInteger,所以这里的tryEmit方法就利用了 AtomicInteger 的同步机制,所以同时只会有一个 value 被 actual Observer 发射。由于 AtomicInteger CAS锁只能保证操作的原子性,并不保证锁的获取顺序,是抢占式的,所以最终数据的发射顺序并不是固定的(同一个Observable发出的数据是有序的)

如果没有获取到锁,就会将要发射的数据放入 队列中,drainLoop 方法会循环去获取队列中的 数据,然后发射。

代码语言:javascript
复制
void drainLoop() {
    final Observer<? super U> child = this.downstream;
    int missed = 1;
    for (;;) {
        if (checkTerminate()) {
            return;
        }
        int innerCompleted = 0;
        SimplePlainQueue<U> svq = queue;

        if (svq != null) {
            for (;;) {
                if (checkTerminate()) {
                    return;
                }

                U o = svq.poll();

                if (o == null) {
                    break;
                }

                child.onNext(o);
                innerCompleted++;
            }

drainLoop就是不断遍历SimplePlainQueue队列,(这个SimplePlainQueue就是存放的next数据),不断child.onNext(o);

总结下: faltMap返回一个新的被观察者ObservableB【ObservableFlatMap】,重写ObservableB的subscribeActual方法,在原始的观察者ObserverA【myobserver】对其进行订阅时,新建一个观察者ObserverB对原始的ObservableA进行订阅。新的观察者ObserverB【MergeObserver持有原始的ObserverA和faltMap接收的匿名对象实例function。当ObserverB监听到原始的被观察者ObservableA的事件时,ObserverB调用function的apply方法获得新新的被观察者ObservableC【Observable.just(1,2,4)】,再创建一个新的观察者ObserverC【InnerObserver】对ObservableC进行订阅,ObserverC持有原始的观察者ObserverA,在ObserverC观察到被观察者ObservableC的时间时,调用原始的观察者ObserverA的方法。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、map源码解析
  • 二、flatMap源码解析
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档