RxJava是通过事件传递,并且在传递过程中对事件内部数据进行修改,最终发送给接收者的响应式框架。
借助某个同学的一张图可以更直观的了解:

RxJava事件流向
上图只是在同个线程中,可以让事件携带数据按顺序从上层流转到下层。而在事件流转的过程中,RxJava提供了很多操作符可以对源头事件进行处理再往下传递。
在Module中引入即可:
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'Observable:一个可被subscribe的对象,也可以理解成被监听的对象,而该对象中保存着一个名为ObservableEmitter的对象,ObservableEmitter对象就是上面提到的发送事件的对象。
Observer:接收Observable发送事件的对象。
Consumer: 只接收onNext事件的对象
本质RxJava就是一套非常强大的Observer框架
在Observable对象中调用onNext发射(Emitter)了1,2,以及Error、Complete这四个事件。
而在Observer对应的几个响应函数中打印日志(为了方便,把Log.e替换成了System.out.println)。
Observable.create<Int> { emitter ->
Log.e(TAG, "Emitter onNext1...${Thread.currentThread().name}")
emitter.onNext(1)
Log.e(TAG, "Emitter onNext2...${Thread.currentThread().name}")
emitter.onNext(2)
Log.e(TAG, "Emitter onError...${Thread.currentThread().name}")
emitter.onError(NullPointerException())
Log.e(TAG, "Emitter onComplete...${Thread.currentThread().name}")
emitter.onComplete()
}.subscribe(object : Observer<Int> {
override fun onComplete() = System.out.println("onComplete...${Thread.currentThread().name}")
override fun onSubscribe(d: Disposable?) = System.out.println("onSubscribe...${Thread.currentThread().name}...Disposable:$d")
override fun onNext(value: Int?) = System.out.println("onNext...${Thread.currentThread().name}...Value:$value")
override fun onError(e: Throwable?) = System.out.println("onError...${Thread.currentThread().name}...Throwable:$e")
})而在同一个线程中,输出结果如下:
E/SelectImageActivity: onSubscribe...main...Disposable:null
E/SelectImageActivity: Emitter onNext1...main
E/SelectImageActivity: onNext...main...Value:1
E/SelectImageActivity: Emitter onNext2...main
E/SelectImageActivity: onNext...main...Value:2
E/SelectImageActivity: Emitter onError...main
E/SelectImageActivity: onError...main...Throwable:java.lang.NullPointerException
E/SelectImageActivity: Emitter onComplete...main可以发现:
1. 在同一个线程中,发送一个事件,就会接收一个事件,再发送下一个事件
2. 在发送完onError事件后,即使发送了onComplete事件,也无法接收
3. 在发送完onComplete事件后,再发送了onError事件,则会将该Throwable对象抛出,出现crash
4. 在发送完onComplete与onError事件后,再发送onNext事件,则无法接收
在大多数情况下,我们只用关心onNext或者onError单独的事件,而对于其他的事件均不关心,这种情况下,我们就可以使用Consumer对象
对于subscribe函数的重载函数有这些:
public final void subscribe(Observer<? super T> observer)
public final Disposable subscribe(Consumer<? super T> onNext)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe)举例,我们只关心onNext事件,则可以这样来表示:
Observable.create<Int> { emitter ->
Log.e(TAG, "Emitter onNext1...${Thread.currentThread().name}")
emitter.onNext(1)
Log.e(TAG, "Emitter onNext2...${Thread.currentThread().name}")
emitter.onNext(2)
Log.e(TAG, "Emitter onComplete...${Thread.currentThread().name}")
emitter.onComplete()
Log.e(TAG, "Emitter onNext3...${Thread.currentThread().name}")
emitter.onNext(3)
}.subscribe { data ->
Log.e(TAG, "onNext...$data")
}在接收端,仅仅只接收了onNext事件。
E/SelectImageActivity: Emitter onNext1...main
E/SelectImageActivity: onNext...1
E/SelectImageActivity: Emitter onNext2...main
E/SelectImageActivity: onNext...2
E/SelectImageActivity: Emitter onComplete...main
E/SelectImageActivity: Emitter onNext3...main