Rxjava出来已经好久了,但始终存在于会使用的阶段,今天来做个总结。 参考: https://mp.weixin.qq.com/s/7sKjc5ahHI4fmadXW-uP_w https://mp.weixin.qq.com/s/V0hEyiZwC7z5PiC3Uz0wyA https://mp.weixin.qq.com/s/RkGHpVSpngfHDXo4Es-a9w https://mp.weixin.qq.com/s/elA3Gib57YGWnXOEcFOPUQ https://mp.weixin.qq.com/s/WaWEtFjmajalISwAkJyuKw
RxJava是观察者模式的扩展,是响应式函数的扩展库,在观察者模式上实现了发送者(observable)和接受者(observer)解耦;链式调用降低业务之间的依赖,使得代码很简介;支持泛型,减少冗余代码,增强代码可读性;支持设置同步异步切换,简单实现异步回调;观察者与被观察者的继承,多态,更好解决复杂逻辑的嵌套。
首先RxJava一般有三要素
//RxJava的相关依赖
implementation 'io.reactivex.rxjava2:rxjava:2.1.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
注意:RxAndroid是RxJava的一个针对Android平台的扩展,主要用于 Android 开发。
/**
* 执行RxJava相关代码
*/
private void doRxJava() {
//创建被观察者
Observable observable =Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
//通过ObservableEmitter发射器发送事件
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
//创建观察者
Observer observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "======================onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "======================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "======================onError");
}
@Override
public void onComplete() {
Log.e(TAG, "======================onComplete");
}
};
//被观察者通过subscribe()订阅观察者
observable.subscribe(observer);
}
打印信息如下:
image.png
通过打印信息可以看到: 在被观察者通过订阅绑定观察者之后,他们先后执行了观察者的onSubscribe()->被观察者的subscribe(),然后是观察者的onNext() -> onComplete().当然onError()方法没有执行,因为我们的请求时成功的,那么这些方法都有什么含义呢:
当然也可以换一种写法,通过链式调用的方式,如下:
/**
* 执行RxJava相关代码
*/
private void doRxJava2() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "======================onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "======================onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "======================onError");
}
@Override
public void onComplete() {
Log.e(TAG, "======================onComplete");
}
});
}
总结: 被观察者(Observable):通过订阅行为(subscribe())把事件按顺序发送到 观察者(Observer)。 观察者(Observer):按顺序接收到事件&做出响应反馈。
你可能看到过这种写法:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1)
}
})..subscribe(
new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
}
);
这是RxJava的时候的写法,我们说说Action1或者Action是什么意思呢,为什么不用new Observer了呢? 如果我们到的Observer不想实现 OnComplete ()和 OnError()方法,只需要在 onNext 时做一些处理,可以用 Action1 或Action类,明白了吧。 subscribe 方法有一个重载版本,接受1~3个 Action1 类型的参数,分别对应 OnNext,OnError,OnComplete然后我们现在只需要 onNext,就只需要传入一个参数。 同样的我们来看看如果你升级到RxJava2,改如果写呢?
//创建被观察者
Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.e(TAG, "======================" + Thread.currentThread().getName());
//通过ObservableEmitter发射器发送事件
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
//好比onNext
Consumer<Integer> onNextConsumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
Log.e(TAG, "======================onNext " + integer);
}
};
//好比onError
Consumer<Integer> onErroyCusumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) {
Log.e(TAG, "======================onError");
}
};
//好比onComplete
Action completeAction = new Action() {
@Override
public void run() throws Exception {
Log.e(TAG, "======================onComplete");
}
};
//被观察者通过subscribe()订阅观察者
observable.subscribe(onNextConsumer, onErroyCusumer, completeAction);
RxJava和RxJava2关于Acton的的最大区别是:Action -> Consumer 这就是RxJava2的写法,同志们需要了解一下RxJava和RxJava2升级的区别,网上有很多,就不推荐了。
RxJava操作符是RxJava中重要的部分 ,操作符实质上就是RxJava函数式编程模式的体现,而操作符的种类包括创建操作符,变换操作符,合拼操作符,过滤操作符,条件操作符,其他操作符,今天我们来逐一讲解:
含义:被观察者把事件发送至观察者。 具体应用:
应用种类 | ||||
---|---|---|---|---|
数据遍历 | just() | fromArray() | fromlterable() | range() |
定时任务 | interval() | intervalRange() | ||
异步嵌套回调 | create() | |||
延迟任务 | defer() | time() |
这么多,我们每个类别说一个就好了,其余自行百度。
Observable.just("123456789").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "======================accept"+s);
}
});
这里不得不提一下fromArray(),from和fromArray的区别是:fromArray用于转换多个数据,比如 ArrayList等包含多个数据的数组或者列表,可以传入多于10个的变量,并且可以传入一个数组;,而 just 只用于处理单个的数据。 代码:
List<String> list = Arrays.asList(
"1", "2", "3", "4", "5", "6"
);
Observable.fromArray(list).subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Exception {
Log.e(TAG, "======================accept"+strings);
}
});
打印结果:
image.png
还忍不住想说一下fromlterable(),fromlterable和fromArray的区别是fromIterable发送一个 List 集合数据给观察者,并依此打印集合中的元素。
Observable.interval(3,2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "======================accept"+aLong);
}
});
结果:
image.png
intervalRange()可以指定发送事件的开始值和数量,其他与 interval() 的功能一样。
int num = 0;
/**
* 创建操作符Defer
*/
private void rxJavaDefer(){
num=1;
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> call() throws Exception {
return Observable.just(num);
}
});
num = 2;
Consumer<Integer> consumer =new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "================onNext " + integer);
}
};
observable.subscribe(consumer);
}
结果:
获取的是最后赋值的 num = 2;也就是Observable.defer()并没有在创建的时候执行。如果这么说还不明白,我们看一下直接用just操作符执行的结果
int a = 0;
private void rxJavaDefer(){
a =1;
Observable<Integer> observable = Observable.just(a);
a = 2;
Consumer<Integer> consumer =new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "================onNext " + integer);
}
};
observable.subscribe(consumer);
}
结果打印:
image.png
这下明白了吧!
含义:被观察者把序列事件加工为其他序列事件(变换)。
应用种类 | ||
---|---|---|
变换 | map() | flatmap()/concatmap() |
Observable.just(1,2,3).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer+"";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept:Integer转化为String后的数据"+s );
}
});
结果:
image.png
private class User{
//姓名
String name;
//任务列表
List<Plan> listPlan;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public List<Plan> getListPlan() {
return listPlan;
}
public void setListPlan(List<Plan> listPlan) {
this.listPlan = listPlan;
}
}
//详细计划实体类
public class Plan{
//姓名
String name;
//时间
String time;
//工作内容
String content;
public String getTime() {
return time;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public void setTime(String time) {
this.time = time;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public Plan(String name ,String time, String content) {
this.time = time;
this.content = content;
this.name=name;
}
}
执行代码:
/**
* concatMap,有序将事件序列中的元素进行整合加工,返回一个新的被观察者,对应flatMap是无序的
*/
private void rxJavaconcatMap(){
LinkedList<Plan> zhangsan =new LinkedList<>();
Plan plan =new Plan("张三","上午","打扫卫生");
Plan plan2 =new Plan("张三","下午","学习");
zhangsan.add(plan);
zhangsan.add(plan2);
LinkedList<Plan> lisi =new LinkedList<>();
Plan plan3 =new Plan("李四","上午","打扫卫生");
Plan plan4 =new Plan("李四","下午","参加体育活动");
lisi.add(plan3);
lisi.add(plan4);
User user =new User();
User user2 =new User();
user.setName("张三");
user.setListPlan(zhangsan);
user2.setName("李四");
user2.setListPlan(lisi);
List<User> link =new LinkedList<>();
link.add(user);
link.add(user2);
//我们上有提到fromIterable操作符是将集合中的元素输出
Observable.fromIterable(link).concatMap(new Function<User, Observable<Plan>>(){
@Override
public Observable<Plan> apply(User user) throws Exception {
//注意这里是返回一个新的被观察者
return Observable.fromIterable(user.getListPlan());
}
}).subscribe(new Consumer<Plan>() {
@Override
public void accept(Plan plan) throws Exception {
Log.e(TAG, "accept() returned: " +plan.getName()+plan.getTime()+""+plan.getContent() );
}
});
}
结果:
image.png
含义:将多个被观察组合 & 将它们需要发送的事件合拼。
应用种类 | ||||
---|---|---|---|---|
组合多个被观察者,合并事件 | concatArray()(发送事件--串行) | concatDelayError() | mergeArray()(发送事件--并发) | mergeArrayDelayError() |
组合多个被观察者,组合为一个被观察者 | zip() | combinelatest() | ||
发送事件前追加其他事件 | startWithArray() | startWith() | ||
组合多个事件为一个事件 | reduce | collect() | ||
汇总发送事件数量 | count() |
Observable.concatArray(Observable.just(1,2),
Observable.just(3,4),
Observable.just(5,6),
Observable.just(7,8),
Observable.just(9,10)).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: "+integer);
}
});
结果:
image.png
而mergeArray则和concatArray相反这里就不说了。
/**
* 组合操作符zip()
*/
private void rxJavaZip() {
//intervalRange()参数依此是:从那个数开始,发送次数,首次延迟,发送间隔
Observable.zip(Observable.intervalRange(1, 3, 2, 1, TimeUnit.SECONDS)
.map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
String s = "A" + aLong;
Log.e(TAG, "===================A 发送的事件 " + s);
return s;
}
}),
Observable.intervalRange(1, 4, 2, 1, TimeUnit.SECONDS)
.map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
String s2 = "B" + aLong;
Log.e(TAG, "===================B 发送的事件 " + s2);
return s2;
}
}),
new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) throws Exception {
String res = s + s2;
return res;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "===================组合A和B发送的事件 " + s);
}
});
}
打印结果:
image.png
上面代码中有两个 Observable,第一个发送事件的数量为3个,第二个发送事件的数量为4个,可以发现最终接收到的事件数量是3,那么为什么第二个 Observable 没有发送第4个事件呢?因为在这之前第一个 Observable 已经发送了 onComplete 事件,所以第二个 Observable 不会再发送事件。
-startWithArray & startWith 在发送事件之前追加事件,startWith() 追加一个事件,startWithArray() 可以追加多个事件。追加的事件会先发出。
Observable.just(5, 6, 7)
.startWithArray(2, 3, 4)
.startWith(1)
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "================accept " + integer);
}
});
打印:
image.png
/**
* 组合操作符reduce
*/
private void reduce(){
Observable.just(0, 1, 2)
.reduce(new BiFunction < Integer, Integer, Integer > () {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
int resullt = integer + integer2;
Log.e(TAG, "====================integer " + integer);
Log.e(TAG, "====================integer2 " + integer2);
Log.e(TAG, "====================resullt " + resullt);
return resullt;
}
})
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "==================accept " + integer);
}
});
}
打印:
image.png
看到没得,其实就是前2个数据聚合之后,然后再与后1个数据进行聚合,一直到没有数据为止。
/**
* 组合操作符count
*/
private void count (){
Observable.just(1, 2, 3,4,5)
.count()
.subscribe(new Consumer < Long > () {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG, "accept: "+aLong );
}
});
}
结果打印
:
先整这么多吧,剩下的下一篇内容再说。