上一篇我们说了Android-RxJava(上)主要包括组合操作符,变换操作符,创建操作符,我们再接再厉,继续下半部分内容,也就是剩余的操作符:
含义:过滤/筛选 被观察者发送的事件。
过滤操作符filter(),通过一定逻辑来过滤被观察者发送的事件,如果返回 true 则会发送事件,否则不会发送。
相关代码:
private void rxJavaFilter(){
Observable.just(1,2,3,4).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 3;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: "+integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
结果:
image.png
可看到我们要求打印integer小于3的,所以打印了1和2.
过滤操作符ofType(),可以过滤不符合该类型事件。
private void rxJavaOfType(){
Observable.just(1,2,3,4,"薛之涛").ofType(String.class).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String integer) {
Log.d(TAG, "onNext: "+integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
结果:
image.png
我们要求过滤Integer数据类型,留下String类型,打印结果正确!
过滤操作符skip,跳过正序部分事件,参数为跳过前多少个事件。
Observable.just(1,2,3,4).skip(2).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: "+integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
结果:
image.png
skipLast和skip操作符相反,它是跳过后多少个事件打印其之前的事件
过滤操作符distinct,过滤事件序列中的重复事件
代码:
private void rxJavadistinct(){
Observable.just(1,2,3,3,3,4,2).distinct().subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: "+integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
结果:
image.png
Observable.just(1,2,3,3,3,2,1).distinctUntilChanged().subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: "+integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
结果:
image.png
过滤操作符take(),控制观察者接收的事件的数量。
代码:
Observable.just(1,2,3,3,3,2,1).take(3).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: "+integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
结果:
image.png
Observable.just(1,2,3,3,3,2,1).takeLast(3).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: "+integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
结果:
image.png
过滤操作符 elementAt(),可以指定取出事件序列中事件,下标从0开始,但如果指定的index大于总的事件序列数,则无反应
代码:
/**
* 过滤操作符elementAt,指定队列中的事件下标,取出该事件
*/
private void rxjavaElementAt(){
Observable.just(1,2,3,4).elementAt(3).subscribe(new Consumer<Integer>(){
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: "+integer);
}
});
}
结果:
image.png
过滤操作符throttleFirst (),可以和rxbinding2结合使用和绑定view的点击事件,防止在规定时间内多次点击,也就是防止规定事件防止重复点击。先加入rxbinding2依赖。
implementation 'com.jakewharton.rxbinding2:rxbinding:2.0.0'
代码:
/**
* throttleFirst操作符绑定view的点击事件,防止在规定时间内多次点击
*/
int clickNum =1;
private void rxjavathrottleFirst(){
TextView tv=findViewById(R.id.tv);
RxView.clicks(tv).throttleFirst(3,TimeUnit.SECONDS).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.d(TAG, "accept: "+"第"+clickNum+"次点击了TextView");
clickNum ++ ;
}
});
}
结果:
image.png
我在程序运行期间不断点击TextView,结果只打印了3秒间隔之后的第一次点击。
-throttleWithTimeout
throttleWithTimeout() 操作符,如果两件事件发送的时间间隔小于设定的时间间隔则前一件事件就不会发送给观察者。
含义:通过指定条件,判断是否接收被观察者发送的事件。
条件操作符all(),主要用来判断所有事件是否满足.如果都满足则返回 true,反之则返回 false
代码:
/**
* 条件操作符,all如果都满足则返回 true,反之则返回 false
*/
private void rxJavaAll(){
Observable.just(1,2,3,4).all(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 5;
}
}).subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, "accept: "+aBoolean);
}
});
}
结果:
image.png
条件操作符takeWhile(),当判断发送的事件不满足条件时,就会终止后续事件的接受
代码:
/**
* 某个数据满足条件时就会发送该数据,反之则不发送
*/
private void rxJavaTakeWhile(){
Observable.just(3,2,1,4).takeWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer >= 2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: "+integer);
}
});
}
结果:
image.png
条件操作符skipWhile(),当判断发送的事件不满足条件时,才接受后续事件,反之亦然。
代码:
/**
* 满足条件的事件不发送,不满足时发送其及其之后的事件,注意其之后的数据是不判断的
*/
private void rxJavaSkipWhile(){
Observable.just(1,2,4,3).skipWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
Log.d(TAG, "test: "+integer);
return integer >= 3 ;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: "+integer);
}
});
}
结果:
image.png
条件操作符takeUntil(),满足条件时,其之后的事件不会被发送
代码:
private void rxJavaTakeUntil(){
Observable.just(1,2,4,3).takeUntil(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
Log.d(TAG, "test: "+integer);
return integer >= 3 ;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: "+integer);
}
});
}
结果:
image.png
条件操作符sequenceEqual(),判断两个非观察者发送的事件是否一样
代码:
private void rxJavaSequenceEqual(){
Observable.sequenceEqual(Observable.just(1, 2, 3),
Observable.just(1,2,3))
.subscribe(new Consumer < Boolean > () {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, "accept====" + aBoolean);
}
});
}
结果:
注意如果是:Observable.just(1, 2, 3) 和Observable.just(3,2,1)比较返回结果为false,是有顺序之分的
条件操作符contains(),判断是否包含指定数据
代码:
private void rxJavaContains(){
Observable.just(1, 2, 3)
.contains(4)
.subscribe(new Consumer < Boolean > () {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, "accept====" + aBoolean);
}
});
}
结果:
image.png
条件操作符sEmpty(),判断发送的数据是否为空,如果事件序列中元素为空则返回true
代码:
private void rxJavaIsEmpty() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onComplete();
}
})
.isEmpty()
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
Log.d(TAG, "accept===" + aBoolean);
}
});
}
结果:
image.png
含义:被观察者发送事件时,进行功能性拓展。
/**
*doOnEach(),Observable 每发送一件事件之前都会先回调这个方法。
*/
private void rxjavaDoOnEach(){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe: ");
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).doOnEach(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
Log.d(TAG, "accept: "+"执行了doOnEach获取的元素值为:"+integerNotification.getValue());
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: "+integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
结果:
image.png
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
Log.d(TAG, "subscribe: ");
}
}).doOnLifecycle(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG, "doOnLifecycle ===accept: ");
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "doOnLifecycle ===Action: ");
}
}).doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "doOnDispose === run: ");
}
}).subscribe(new Observer<Integer>() {
Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
mDisposable =d;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: "+integer);
//此处取消订阅
mDisposable.dispose();
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
结果:
image.png
可以看到当在 onNext() 方法进行取消订阅操作后,doOnDispose() 和 doOnLifecycle() 都会被回调。那我们如果使用 doOnLifecycle 进行取消订阅,来看看结果:
代码:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe: ");
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).doOnLifecycle(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG, "doOnLifecycle ===accept: ");
disposable.dispose();
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "doOnLifecycle ===Action: ");
}
}).doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "doOnDispose === run: ");
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: "+integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
结果:
image.png
可以发现 doOnDispose Action 和 doOnLifecycle Action 都没有被回调。
其余的我就不写代码了,大家也都能明白.
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe: ");
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new NullPointerException());
}
}).onErrorReturn(new Function<Throwable,Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
Log.d(TAG, "onErrorReturn ==== apply: "+throwable);
return 500;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: "+integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
结果:
image.png
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe: ");
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new NullPointerException());
}
}).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
Log.d(TAG, "onErrorResumeNext ==== apply: "+throwable);
return Observable.just(4, 5, 6);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: "+integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
结果:
image.png
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe: ");
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new NullPointerException());
}
}).retry(2).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: "+integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: "+e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
结果:
image.png
bservable.create(new ObservableOnSubscribe < Integer > () {
@Override
public void subscribe(ObservableEmitter < Integer > e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
.repeat(2)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "===================onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "===================onNext " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "===================onComplete ");
}
});
结果:
image.png
timeout(long timeout, TimeUnit timeUnit):每当原始Observable发射了一项数据,computation调度器就启动一个计时器,如果计时器超过了指定指定的时长而原始Observable没有发射另一项数据,timeout就抛出 TimeoutException,以一个错误通知终止Observable。 timeout(long timeout, TimeUnit timeUnit, ObservableSource<? extends T> other):每当原始Observable发射了一项数据,computation调度器就启动一个计时器,如果计时器超过了指定指定的时长而原始Observable没有发射另一项数据,timeout 在超时时会切换到使用一个你指定的备用的 Observable。 timeout(Function<> itemTimeoutIndicator):timeout使用一个Function对原始Observable发射的每一项进行观察,如果当这个Function执行完但原始Observable还没有发射下一个数据时,系统就会认为是超时了,timeout 就抛出 TimeoutException,以一个错误通知终止原始Observable。
就先说这么多吧,告辞!