上篇文章我们通过源码了解了RxJava基本流程,RxJava源码浅析(一): 基础流程 这里我们研究下操作符的源码是怎么实现的。有了上篇文章的基础,这里讲起来会轻松很多。操作符很多,我们随机挑了几个操作符来看看。
还是基于上篇文章来看
//上游-被观察者
Observable<Integer> myobservable=Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
Log.e("observable","Thread ID:"+Thread.currentThread().getId());
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
//下游-观察者
Observer myobserver=new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e("onSubscribe","Thread ID:"+Thread.currentThread().getId());
}
@Override
public void onNext(Integer integer) {
Log.e("observer","Thread ID:"+Thread.currentThread().getId()+" "+"value:"+integer);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
};
//关联上游和下游
myobservable.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer integer) throws Throwable {
return integer+10;
}
}).subscribe(myobserver);
用map操作符,我们可以得知,onNext返回的数字都是加10。我们来看下map源码吧。
public final <@NonNull R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
}
看到onAssembly方法,根据上篇文章我们知道,他是直接返回new ObservableMap<>(this, mapper),那我们直接扒开ObservableMap看看。
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
..................................
try {
v = Objects.requireNonNull(mapper.apply(t), "returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
.....................................
}
ObservableMap也是Observable具体现实。这里面创建了MapObserver,包装了myobserver。
有了上一篇文章基础,我们知道基础流程里面是Observable的具体实现通过subscribe方法,把数据传给Observer的具体实现。里面两个主体是一对Observable和Observer。
现在map操作符,我们可以知道。里面创建了一对ObservableMap和MapObserver。
我们知道.subscribe(myobserver)会走到ObservableMap中的subscribeActual方法,这里的source就是myobservable,那source.subscribe(new MapObserver<T,U>(t,function))就会走到Observable中的subscribe方法,最终调用ObservableOnSubscribe.subscribe方法。
这里ObservableEmitter上篇文章讲过了,是Observer的一个包装,此时的Observer是谁?对是MapObserver,emitter.onNext()会调用ObservableMap的onNext()。看下ObservableMap的onNext()源码关键片段
try {
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
其中mapper.apply(t),他的具体实现是
@Override
public Object apply(Integer integer) throws Throwable {
return integer+10;
}
这里的downstream,我们看下他的源头,他的具体实现就是myobserver。那大概的意思就是onNext数据会经过new Function加工,加工后再交给myobserver的onNext。
总结:大概流程就是这样。和基础流程差不多,只是把myobserver包装成MapObserver,MapObserver会对数据进行处理以下再传给myobserver。
myobservable.flatMap(new Function<Integer, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Integer integer) throws Throwable {
return Observable.just(1,2,4);
}
}).subscribe(myobserver);
这个稍微有点复杂了,flatMap是返回一个ObservableFlatMap,我们不怕,继续看看他们内部。
public final <@NonNull R> Observable<R> flatMap(@NonNull Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return flatMap(mapper, false);
}
public final <@NonNull R> Observable<R> flatMap(@NonNull Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
Objects.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
if (this instanceof ScalarSupplier) {
@SuppressWarnings("unchecked")
T v = ((ScalarSupplier<T>)this).get();
if (v == null) {
return empty();
}
return ObservableScalarXMap.scalarXMap(v, mapper);
}
return RxJavaPlugins.onAssembly(new ObservableFlatMap<>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
看完这个我们应该知道,flatMap直接返回ObservableFlatMap,有了上一篇文章,我们也清楚流程了,直接看subscribeActual
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
有了经验,看到这里source.subscribe,直接看MergeObserver onNext方法。
public void onNext(T t) {
// safeguard against misbehaving sources
if (done) {
return;
}
ObservableSource<? extends U> p;
try {
p = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
upstream.dispose();
onError(e);
return;
}
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
if (wip == maxConcurrency) {
sources.offer(p);
return;
}
wip++;
}
}
subscribeInner(p);
}
这里面核心代码p = Objects.requireNonNull(mapper.apply(t)创建了一个新ObservableSource。上级每次onNext都要生成对应的一个ObservableSource。
mapper.apply(t)返回的就是新生成的Observable
继续看下subscribeInner
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
if (p instanceof Supplier) {
if (tryEmitScalar(((Supplier<? extends U>)p)) && maxConcurrency != Integer.MAX_VALUE) {
boolean empty = false;
synchronized (this) {
p = sources.poll();
if (p == null) {
wip--;
empty = true;
}
}
if (empty) {
drain();
break;
}
} else {
break;
}
} else {
InnerObserver<T, U> inner = new InnerObserver<>(this, uniqueId++);
if (addInner(inner)) {
p.subscribe(inner);
}
break;
}
}
}
正常情况,ObservableSource不是Supplier类型,我们当前ObservableSource是Observable,我们直接走else 。
p.subscribe(inner);直接走InnerObserver的onNext方法。
public void onNext(U t) {
if (fusionMode == QueueDisposable.NONE) {
parent.tryEmit(t, this);
} else {
parent.drain();
}
}
fusionMode默认是QueueDisposable.NONE,那我们继续看parent.tryEmit(t,this);
void tryEmit(U value, InnerObserver<T, U> inner) {
if (get() == 0 && compareAndSet(0, 1)) {
downstream.onNext(value);
if (decrementAndGet() == 0) {
return;
}
} else {
SimpleQueue<U> q = inner.queue;
if (q == null) {
q = new SpscLinkedArrayQueue<>(bufferSize);
inner.queue = q;
}
q.offer(value);
if (getAndIncrement() != 0) {
return;
}
}
drainLoop();
}
MergeObserver 继承了 AtomicInteger,所以这里的tryEmit方法就利用了 AtomicInteger 的同步机制,所以同时只会有一个 value 被 actual Observer 发射。由于 AtomicInteger CAS锁只能保证操作的原子性,并不保证锁的获取顺序,是抢占式的,所以最终数据的发射顺序并不是固定的(同一个Observable发出的数据是有序的)
如果没有获取到锁,就会将要发射的数据放入 队列中,drainLoop 方法会循环去获取队列中的 数据,然后发射。
void drainLoop() {
final Observer<? super U> child = this.downstream;
int missed = 1;
for (;;) {
if (checkTerminate()) {
return;
}
int innerCompleted = 0;
SimplePlainQueue<U> svq = queue;
if (svq != null) {
for (;;) {
if (checkTerminate()) {
return;
}
U o = svq.poll();
if (o == null) {
break;
}
child.onNext(o);
innerCompleted++;
}
drainLoop就是不断遍历SimplePlainQueue队列,(这个SimplePlainQueue就是存放的next数据),不断child.onNext(o);
总结下: faltMap返回一个新的被观察者ObservableB【ObservableFlatMap】,重写ObservableB的subscribeActual方法,在原始的观察者ObserverA【myobserver】对其进行订阅时,新建一个观察者ObserverB对原始的ObservableA进行订阅。新的观察者ObserverB【MergeObserver】持有原始的ObserverA和faltMap接收的匿名对象实例function。当ObserverB监听到原始的被观察者ObservableA的事件时,ObserverB调用function的apply方法获得新新的被观察者ObservableC【Observable.just(1,2,4)】,再创建一个新的观察者ObserverC【InnerObserver】对ObservableC进行订阅,ObserverC持有原始的观察者ObserverA,在ObserverC观察到被观察者ObservableC的时间时,调用原始的观察者ObserverA的方法。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。