功能需求 | 适用的操作符 |
---|---|
将多个数据流以首尾相连方式合并 | concat 和 concatAll |
将多个数据流中数据以先到先得方式合并 | merge 和 mergeAll |
将多个数据流中的数据以一一对应方式合并 | zip 和 zipAll |
持续合并多个数据流中最新产生的数据 | combineLatest 和 combineAll 和 widthLatestFrom |
从多个数据流中选出第一个产生内容的数据流 | race |
在数据流前面添加一个指定数据 | startWith |
只获取多个数据流最后产生的那个数据 | forkJoin |
从高阶数据流中切换数据源 | switch 和 exhaust |
RxJS 提供了一系列可以完成 Observable
组合操作的操作符,这一类操作符称为合并类(combination)操作符,这类操作符都有多个 Observable
对象作为数据来源,把不同来源的数据根据不同的规则合并到一个 Observable
对象中。
不少合并类操作符都有两种形式,既提供静态操作符,又提供实例操作符。当合并两个数据流,假设分别称为 source1$
和 source2$
,也就可以说 source2$
汇入了 source1$
,这时候用一个 source1$
的实例操作符语义上比较合适;在某些场景下,两者没有什么主次关系,只是两个平等关系的数据流合并在一起,这时候用一个静态操作符更加合适。
concat
是 concatenate 的缩写,意思就是“连锁”,各种语言各种库中都支持名为 concat
方法。在 JavaScript 中,数组就有 concat
方法,能够把多个数组中的元素依次合并到一个数组中:
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/concat';
const source1$ = Observable.of(1, 2, 3);
const source2$ = Observable.of(4, 5, 6);
const concated$ = source1$.concat(source2$);
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/concat';
const source1 = Observable.of(1, 2, 3);
const source2 = Observable.of(4, 5, 6);
const concated$ = Observable.concat(source1, source2);
concat
没有限制参数的个数,可以把任意数量的 Observable
对象合并。
因为 concat
开始从下一个 Observable
对象抽取数据只能在前一个 Observable
对象完结之后,所以参与到这个 concat
之中的 Observable
对象应该都能完结,如果一个 Observable
对象不会完结,那排在后面的 Observable
对象永远没有上场的机会。
const source1$ = Observable.interval(1000);
const source2$ = Observable.of(1);
const source3$ = source1$.concat(source2$);
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/map';
import 'rxjs/add/observable/merge';
const source1$ = Observable.timer(0, 1000).map(i => `A: ${i}`);
const source2$ = Observable.timer(500, 1000).map(i => `B: ${i}`);
const merged$ = Observable.merge(source1$, source2$);
merged$.subscribe(
console.log,
null,
() => console.log('completed')
);
// A: 0
// B: 0
// A: 1
// B: 1
// A: 2
// B: 2
因为 merge
在第一时刻就订阅上游的所有 Observable
对象,所以,如果某个上游 Observable
对象不能完结,并不影响其他 Observable
对象的数据传给 merge
的下游。 merge
只有在所有的上游 Observable
都完结的时候,才会完结自己产生的 Observable
对象。
一般来说, merge
只对产生异步数据的 Observable
才有意义,用 merge
来合并同步产生数据的 Observable
对象没什么意义:
const source1$ = Observable.of(1, 2, 3);
const source2$ = Observable.of(4, 5, 6);
const merged$ = Observable.merge(source1$, source2$);
// 同步产生的数据不会被合并
// 1 2 3 4 5 6
merge
做的事情很简单:依次订阅上游 Observable
对象,把接收到的数据转给下游,等待所有上游对象 Observable
完结。因为 of
产生的是同步数据流,当 merge
订阅 source1$
之后,还没来得及去订阅 source2$
, source1$
就一口气把自己的数据全吐出来了,所以实际上产生了 concat
的效果。
应该避免用 merge
去合并同步数据流, merge
应该用于合并产生异步数据的 Observable
对象,一个常用场景就是合并 DOM 事件。
merge
可以有一个可选参数 concurrent
,用于指定可以同时合并的 Observable
对象个数。
const source1$ = Observable.timer(0, 1000).map(i => `A: ${i}`);
const source2$ = Observable.timer(500, 1000).map(i => `B: ${i}`);
const source3$ = Observable.timer(1000, 1000).map(i => `C: ${i}`);
const merged$ = source1$.merge(source2$, source3$, 2);
source3$
中的数据永远不会获得进入 merged$
的机会,因为 merge
最后一个参数是 2 ,也就限定了同时只能同步合并两个 Observable
对象的数据, source1$
和 source2$
排在前面,所以优先合并它们两个,只有 source1$
和 source2$
其中之一完结的时候,才能空出一个名额来给 source3$
,可是 source1$
和 source2$
又不会完结,所以 source3$
没有出头之日。
const click$ = Rx.Observable.fromEvent(element, 'click');
const touchend$ = Rx.Observable.fromEvent(element, 'touchend');
Rx.Observable.merge(click$, touchend$).subscribe(eventHandler);
用 fromEvent
分别获得给定 DOM 元素的 click
和 touchend
事件数据流,然后用 merge
合并,这之后,无论是 click
事件发生还是 touchend
事件发生,都会流到 merge
产生的 Observable
对象中,这样就可以统一用一个事件处理函数 eventHandler
来处理。
zip
就像是一个拉条,上游的 Observable
对象就像是拉链的链齿,通过拉条合并,数据一定是一一对应的。
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/zip';
const source1$ = Observable.of(1, 2, 3);
const source2$ = Observable.of('a', 'b', 'c');
const zipped$ = Observable.zip(source1$, source2$);
zipped$.subscribe(
console.log,
null,
() => console.log('complete')
);
// [1, 'a']
// [2, 'b']
// [3, 'c']
// complete
在产生的数据形式上,zip
和 concat
、merge
很不同,concat
、 merge
会保留原有的数据传给下游,但是 zip
会把上游的数据转化为数组形式,每一个上游 Observable
贡献的数据会在对应数组中占一席之地。
const source1$ = Observable.interval(1000);
const source2$ = Observable.of('a', 'b', 'c');
const zipped$ = Observable.zip(source1$, source2$);
zipped$.subscribe(
console.log,
null,
() => console.log('complete')
);
// [0, 'a']
// [1, 'b']
// [2, 'c']
// complete
虽然 source2$
第一时间就吐出了字符串 a
,但是 source1$
并没有吐出任何数据,所以字符串 a
只能等着,直到 1
秒钟的时候, source1$
吐出了 0
时 zip
就把两个数据合并为一个数据传给下游。
source1$
是由 interval
产生的数据流,是不会完结的,但是 zip
产生的 Observable
对象却在 source2$
吐完所有数据之后也调用了 complete
,也就是说,只要任何一个上游的 Observable
完结。 zip
只要给这个完结的 Observable
对象吐出的所有数据找到配对的数据,那么 zip
就会给下游一个 complete
信号。
如果某个上游 source1$
吐出数据的速度很快,而另一个上游 source2$
吐出数据的速度很慢,那 zip
就不得不先存储 source1$
吐出的数据,因为 RxJS 的工作方式是“推”, Observable
把数据推给下游之后自己就没有责任保存数据了。被 source1$
推送了数据之后, zip
就有责任保存这些数据,等着和 source2$
未来吐出的数据配对。假如 source2$
迟迟不吐出数据,那么 zip
就会一直保存 source1$
没有配对的数据,然而这时候 source1$
可能会持续地产生数据,最后 zip
积压的数据就会越来越多,占用的内存也就越来越多。
对于数据量比较小的 Observable
对象,这样的数据积压还可以忍受,但是对于超大量的数据流,使用 zip
就不得不考虑潜在的内存压力问题, zip
这个操作符自身是解决不了这个问题的。
如果用 zip
组合超过两个 Observable
对象,游戏规则依然一样,组合而成的 Observable
吐出的每个数据依然是数组,数组元素个数和上游 Observable
对象数量相同,每个上游 Observable
对象都要贡献一个元素,如果某个 Observable
对象没有及时吐出数据,那么 zip
会等,等到它吐出匹配的数据,或者等到它完结。
combineLatest
合并数据流的方式是当任何一个上游 Observable
产生数据时,从所有输入 Observable
对象中拿最后一次产生的数据(最新数据),然后把这些数据组合起来传给下游。
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/combineLatest';
import 'rxjs/add/operator/map';
const source1$ = Observable.timer(500, 1000);
const source2$ = Observable.timer(1000, 1000);
const result$ = source1$.combineLatest(source2$);
result$.subscribe(
console.log,
null,
() => console.log('completed')
);
// [0, 0]
// [1, 0]
// [1, 1]
// [2, 1]
// [2, 2]
// [3, 2]
值得注意的是,并不是说上游产生任何一个数据都会引发 combineLatest
给下游传一个数据,只要有一个上游数据源还没有产生数据,那么 combineLatest
也没有数据输出,因为凑不齐完整的数据集合,只能等待。
单独某个上游 Observable
完结不会让 combineLatest
产生的 Observable
对象完结,因为当一个 Observable
对象完结之后,它依然有“最新数据”啊,就是它在完结之前产生的最后一个数据, combineLatest
记着呢,还可以继续使用这个“最新数据”。只有当所有上游 Observable
对象都完结之后, combineLatest
才会给下游一个 complete
信号,表示不会有任何数据更新了。
const source1$ = Observable.timer(500, 1000);
const source2$ = Observable.of('a');
const result$ = source1$.combineLatest(source2$);
result$.subscribe(
console.log,
null,
() => console.log('completed')
);
// [0, 'a']
// [1, 'a']
// [2, 'a']
// [3, 'a']
只要 source1$
持续产生数据,那 combineLatest
就会持续拿 source2$
的最后一个数据字符串 a
去和 source1$
产生的新数据组合传给下游。
const source1$ = Observable.of('a', 'b', 'c');
const source2$ = Observable.of(1, 2, 3);
const result$ = source1$.combineLatest(source2$);
result$.subscribe(
console.log,
null,
() => console.log('completed')
);
// 由于 source1$ 是同步数据流,在被订阅时吐出所有数据,最后吐出的是 c
// source2$ 吐出 1 时,和 source1$ 吐出的 c 去和 1 做组合
// ['c', 1]
// ['c', 2]
// ['c', 3]
// complete
combineLatest
会顺序订阅所有上游的 Observable
对象,只有所有上游 Observable
对象都已经吐出数据了,才会给下游传递所有上游“最新数据”组合的数据。
const source1$ = Observable.of('a', 'b', 'c');
const source2$ = Observable.of(1, 2, 3);
const source3$ = Observable.of('x', 'y');
const result$ = source1$.combineLatest(source2$, source3$);
result$.subscribe(
console.log,
null,
() => console.log('completed')
);
// ['c', 3, 'x']
// ['c', 3, 'y']
// complete
combineLatest
工作原理:
如果 combineLatest
的输入只有 Observable
对象,那么传递给下游的数据就是一个包含所有上游“最新数据”的数组。
combineLatest
的最后一个参数可以是一个函数,称为 project
, project
的作用是让 combineLatest
把所有上游的“最新数据”扔给下游之前做一下组合处理,这样就可以不用传递一个数组下去,可以传递任何由“最新数据”产生的对象。 project
可以包含多个参数,每一个参数对应的是上游 Observable
的最新数据, project
返回的结果就是 combineLatest
塞给下游的结果。
const source1$ = Observable.timer(500, 1000);
const source2$ = Observable.timer(1000, 1000);
const project = (a, b) => `${a} and ${b}`;
const result$ = source1$.combineLatest(source2$, project);
result$.subscribe(
console.log,
null,
() => console.log('completed')
);
// '0 and 0'
// '1 and 0'
// '1 and 1'
// '2 and 1'
实际上等同于下面的代码:
const source1$ = Observable.timer(500, 1000);
const source2$ = Observable.timer(1000, 1000);
const project = (a, b) => `${a} and ${b}`;
const result$ = source1$.combineLatest(source2$)
.map(arr => project(...arr));
combineLatest
产生的 Observable
对象数据依赖于上游的多个 Observable
对象,如果上游的多个 Observable
对象又共同依赖于另一个 Observable
对象,这就是多重依赖问题。
多重继承可能会导致一些很反常识的问题,因为一个属性很难说清楚是从哪条关系继承下来的,所以在其他编程语言中往往放弃多重继承的功能。
withLatestFrom
的功能类似于 combineLatest
,但是给下游推送数据只能由一个上游 Observable
对象驱动。
withLatestFrom
只有实例操作符的形式,而且所有输入 Observable
的地位并不相同,调用 withLatestFrom
的那个 Observable
对象起到主导数据产生节奏的作用,作为参数的 Observable
对象只能贡献数据,不能控制产生数据的时机。
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/withLatestFrom';
import 'rxjs/add/operator/map';
const source1$ = Observable.timer(0, 2000).map(x => 100 * x);
const source2$ = Observable.timer(500, 1000);
const result$ = source1$.withLatestFrom(source2$, (a, b) => a + b);
result$.subscribe(
console.log,
null,
() => console.log('completed')
);
// 101
// 203
// 305
// 407
如果用 withLatestFrom
,那么对应的多重依赖问题可以得到解决,因为产生的下游 Observable
对象中数据生成节奏只由一个输入 Observable
对象决定。
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/withLatestFrom';
import 'rxjs/add/operator/map';
const original$ = Observable.timer(0, 1000);
const source1$ = original$.map(x => x + 'a');
const source2$ = original$.map(x => x + 'b');
const result$ = source1$.withLatestFrom(source2$);
result$.subscribe(
console.log,
null,
() => console.log('completed')
);
// [ '0a', '0b' ]
// [ '1a', '1b' ]
// [ '2a', '2b' ]
// [ '3a', '3b' ]
一般来说,当要合并多个 Observable
的“最新数据”,要从 combineLatest
和 withLatestFrom
中选一个操作符来操作,根据下面的原则来选择:
Observable
对象,使用 combineLatest
Observable
对象“映射”成新的数据流,同时要从其他 Observable
对象获取“最新数据”,就是用 withLatestFrom
第一个吐出数据的 Observable
对象就是胜者, race
产生的 Observable
就会完全采用胜者 Observable
对象的数据,其余的输入 Observable
对象则会被退订而抛弃。
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/race';
import 'rxjs/add/operator/map';
const source1$ = Observable.timer(0, 2000).map(x => x + 'a');
const source2$ = Observable.timer(500, 1000).map(x => x + 'b');
const winner$ = source1$.race(source2$);
winner$.subscribe(
console.log,
null,
() => console.log('complete')
);
// 0a
// 1a
// 2a
startWith
只有实例操作符的形式,其功能是让一个 Observable
对象在被订阅的时候,总是先吐出指定的若干个数据。
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/startWith';
const original$ = Observable.timer(0, 1000);
const result$ = original$.startWith('start');
result$.subscribe(
console.log,
null,
() => console.log('completed')
);
// start
// 0
// 1
startWith
的功能完全可以通过 concat
来实现,但如果使用 concat
,那无论用静态操作符或者实例操作符的形式, original$
都只能放在参数列表里,不能调用 original$
的 concat
函数,这样一来,也就没有办法形成连续的链式调用。 startWith
满足了需要连续链式调用的要求。
forkJoin
只有静态操作符的形式,可以接受多个 Observable
对象作为参数, forkJoin
产生的 Observable
对象也很有特点,它只会产生一个数据,因为它会等待所有参数 Observable
对象的最后一个数据,也就是说,只有当所有 Observable
对象都完结,确定不会有新的数据产生的时候, forkJoin
就会把所有输入 Observable
对象产生的最后一个数据合并成给下游唯一的数据。
所以说, forkJoin
就是 RxJS 界的 Promise.all
, Promise.all
等待所有输入的 Promise
对象成功之后把结果合并, forkJoin
等待所有输入的 Observable
对象完结之后把最后一个数据合并。
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/forkJoin';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/take';
const source1$ = Observable.interval(1000).map(x => x + 'a').take(1);
const source2$ = Observable.interval(1000).map(x => x + 'b').take(3);
const concated$ = Observable.forkJoin(source1$, source2$);
concated$.subscribe(
console.log,
null,
() => console.log('completed')
);
// [0a, 2b]
// completed
数据流虽然管理的是数据,数据流自身也可以认为是一种数据,既然数据可以用 Observable
来管理,那么数据流本身也可以用 Observable
来管理,让需要被管理的 Observable
对象成为其他 Observable
对象的数据,用现成的管理 Observable
对象的方法来管理 Observable
对象,这就是高阶 Observable
的意义。
RxJS 提供对应的处理高阶 Observable
的合并类操作符,名称就是在原有操作符名称的结尾加上 All
,如下所示:
concatAll
mergeAll
zipAll
combineAll
concatAll
concat
是把所有输入的 Observable
首尾相连组合在一起, concatAll
做的事情也一样,只不过 concatAll
只有一个上游 Observable
对象,这个 Observable
对象预期是一个高阶 Observable
对象, concatAll
会对其中的内部 Observable
对象做 concat
的操作。
const ho$ = Observable.interval(1000)
.take(2)
.map(x => Observable.interval(1500).map(y => x + ':' + y).take(2));
const concated$ = ho$.concatAll();
concated$.subscribe(console.log);
// 0:0
// 0:1
// 1:0
// 1:1
concatAll
首先会订阅上游产生的第一个内部 Observable
对象,抽取其中的数据,然后,只有当第一个 Observable
对象完结的时候,才会去订阅第二个内部 Observable
对象。
mergeAll
const ho$ = Observable.interval(1000)
.take(2)
.map(x => Observable.interval(1500).map(y => x + ':' + y).take(2));
const concated$ = ho$.mergeAll();
mergeAll
对内部 Observable
的订阅策略和 concatAll
不同, mergeAll
只要发现上游产生一个内部 Observable
就会立刻订阅,并从中抽取收据。
zipAll
const ho$ = Observable.interval(1000)
.take(2)
.map(x => Observable.interval(1500).map(y => x + ':' + y).take(2));
const concated$ = ho$.zipAll();
concated$.subscribe(
console.log,
null,
() => console.log('completed')
);
// ['0:0', '1:0']
// ['0:1', '1:1']
// completed
还可以给 zipAll
一个函数类型的参数,就和 zip
的 project
参数一样定制产出的数据形式。
combineAll
const ho$ = Observable.interval(1000)
.take(2)
.map(x => Observable.interval(1500).map(y => x + ':' + y).take(2));
const concated$ = ho$.combineAll();
concated$.subscribe(
console.log,
null,
() => console.log('completed')
);
// ['0:0', '1:0']
// ['0:1', '1:0']
// ['0:1', '1:1']
// completed
combineAll
和 zipAll
一样,必须上游高阶 Observable
完结之后才能开始给下游产生数据,因为只有确定了作为输入的内部 Observable
对象的个数,才能拼凑出第一个传给下游的数据。
很多场景下并不需要无损的数据流连接,也就是说,可以舍弃掉一些数据,至于怎么舍弃,就涉及另外两个合并类操作符,分别是 switch
和 exhaust
,这两个操作符是 concatAll
的进化版本。
switch
的含义就是“切换”,总是切换到最新的内部 Observable
对象获取数据。每当 switch
的上游高阶 Observable
产生一个内部 Observable
对象, switch
都会立刻订阅最新的内部 Observable
对象上,如果已经订阅了之前的内部 Observable
对象,就会退订那个过时的内部 Observable
对象,这个“用上新的,舍弃旧的”动作,就是切换。
const ho$ = Observable.interval(1000)
.take(2)
.map(x => Observable.interval(1500).map(y => x + ':' + y).take(2));
const result$ = ho$.switch();
result$.subscribe(
console.log,
null,
() => console.log('completed')
);
// 1:0
// 1:1
// completed
switch
首先订阅了第一个内部 Observable
对象,但是这个内部对象还没来得及产生第一个数据 0:0
,第二个内部 Observable
对象就产生了,这时候 switch
就会做切换动作,切换到第二个内部 Observable
上,因为之后没有新的内部 Observable
对象产生, switch
就会一直从第二个内部 Observable
对象获取数据,于是最后得到的数据就是 1:0
和 1:1
。
exhaust
的含义就是“耗尽”,在耗尽当前内部 Observable
的数据之前不会切换到下一个内部 Observable
对象。
const ho$ = Observable.interval(1000)
.take(3)
.map(x => Observable.interval(700).map(y => x + ':' + y).take(2));
const result$ = ho$.exhaust();
result$.subscribe(
console.log,
null,
() => console.log('completed')
);
// 0:0
// 0:1
// 2:0
// 2:1
// completed