RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences 翻译下来就是 “是一个使用可观测序列来组建异步、基于事件的程序的库”。说人话,看了这句话,初学者一脸懵逼。
RxJava如何使用,我这边就不细说了,网上有很多文章写的不错。
RxJava源码有点庞大,我先从基础订阅流程下手。注:RxJava源码是 io.reactivex.rxjava3:rxjava:3.1.3版本。
demo示例来看看吧
//上游-被观察者
Observable<Integer> myobservable=Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
//下游-观察者
Observer myobserver=new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
};
//关联上游和下游
myobservable.subscribe(myobserver);
这个很基础,没有切换线程,没有其他操作符。后面文章会不断增加操作符来学习源码。
我们先从myobservable.subscribe(myobserver)开始吧。
我们可以看到,myobservable是个抽象类,具体实例是通过Observable.create()获得,具体我们通过源码看下
public static <@NonNull T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
我们看下,最终调用RxJavaPlugins.onAssembly(new ObservableCreate<>(source))
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
return f != null ? (Observable)apply(f, source) : source;
}
onObservableAssembly默认是null,所以RxJavaPlugins.onAssembly(new ObservableCreate<>(source))就是返回ObservableCreate对象。
好了,myobservable具体实现类就是ObservableCreate对象,接下来看下subscribe方法,subscribe在Observable有具体的实现,我们来看下
public final void subscribe(@NonNull Observer<? super T> observer) {
Objects.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer); //步骤1
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.");
subscribeActual(observer);//步骤二
}
..............................
}
先看步骤1
@NonNull
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
return f != null ? (Observer)apply(f, source, observer) : observer;
}
这里主要判断 f 也就是onObservableSubscribe是不是null,这里默认是null,我们也没在其他地方设置onObservableSubscribe,那就直接返回observer,也就是下游myobserver。
接下来步骤2,调用当前subscribeActual方法,当前类是Observable,subscribeActual方法没有具体的实现,具体的实现在他的实现类中现实。那我们直接来看ObservableCreate的subscribeActual方法
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
1.这个里面创建了CreateEmitter,是一个分发器,封装了当前的myobserver对象。具体结构后面看。
然后调用了observer.onSubscribe(parent)接口,这里的observer具体现实类是myobserver,具体实现如下
2.接下来执行source.subscribe(parent);这个source是哪里来的呢?我们仔细排查,source就是ObservableOnSubscribe
这里source.subscribe(parent),就是调用上图的subscribe方法,这里parent看下代码,就是CreateEmitter对象。这里ObservableEmitter是个接口,具体实现就是CreateEmitter。
这里调用了emitter.onNext(1);这样的方法,现在我们看下CreateEmitter源码。
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = ExceptionHelper.createNullPointerException("onError called with a null Throwable.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
..............................
}
CreateEmitter 利用装饰模式封装了observer,增加了一系列控制Disposable。
我们看下emitter.onNext(1);源码,最终是调用了 observer.onNext(t);这里的observer就是myobserver对象,那就是调用
同理,emitter.onComplete();可以得知,就是调用myobserver的onComplete回调。
在此,Rxjava基本流程完成。
主要有四个主体
ObservableCreate、ObservableOnSubscribe、ObservableEmitter、Observer
个人理解打个比喻,ObservableCreate就是一个包裹发货地,ObservableOnSubscribe是收发员,ObservableEmitter就是包裹,Observer就是收件人地址。
ObservableCreate创建一个场地的同时创建ObservableOnSubscribe(发货员),在subscribeActual中创建(ObservableEmitter)把收件人和包裹绑定后,source.subscribe(parent)发货员把包裹onNext发货,收件人在myobserver中onNext接收。
当然RxJava里面还有其他类型,比如FlowableCreate、ObservableJust等等,但是原理都差不多。
这个就是个人对RxJava订阅流程大概理解,欢迎拍板!
下一篇文章我们加一些操作符,个人理解就是发货员把包裹经过包装后发给收件人。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。