前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入浅出 RxJS 之 合并数据流

深入浅出 RxJS 之 合并数据流

作者头像
Cellinlab
发布2023-05-17 20:10:13
1.6K0
发布2023-05-17 20:10:13
举报
文章被收录于专栏:Cellinlab's Blog

功能需求

适用的操作符

将多个数据流以首尾相连方式合并

concat 和 concatAll

将多个数据流中数据以先到先得方式合并

merge 和 mergeAll

将多个数据流中的数据以一一对应方式合并

zip 和 zipAll

持续合并多个数据流中最新产生的数据

combineLatest 和 combineAll 和 widthLatestFrom

从多个数据流中选出第一个产生内容的数据流

race

在数据流前面添加一个指定数据

startWith

只获取多个数据流最后产生的那个数据

forkJoin

从高阶数据流中切换数据源

switch 和 exhaust

合并类操作符

RxJS 提供了一系列可以完成 Observable 组合操作的操作符,这一类操作符称为合并类(combination)操作符,这类操作符都有多个 Observable 对象作为数据来源,把不同来源的数据根据不同的规则合并到一个 Observable 对象中。

不少合并类操作符都有两种形式,既提供静态操作符,又提供实例操作符。当合并两个数据流,假设分别称为 source1$source2$ ,也就可以说 source2$ 汇入了 source1$ ,这时候用一个 source1$ 的实例操作符语义上比较合适;在某些场景下,两者没有什么主次关系,只是两个平等关系的数据流合并在一起,这时候用一个静态操作符更加合适。

# concat:首尾相连

concat 是 concatenate 的缩写,意思就是“连锁”,各种语言各种库中都支持名为 concat 方法。在 JavaScript 中,数组就有 concat 方法,能够把多个数组中的元素依次合并到一个数组中:

代码语言:javascript
复制
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/concat';

const source1$ = Observable.of(1, 2, 3);
const source2$ = Observable.of(4, 5, 6);
const concated$ = source1$.concat(source2$);
代码语言:javascript
复制
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/concat';

const source1 = Observable.of(1, 2, 3);
const source2 = Observable.of(4, 5, 6);
const concated$ = Observable.concat(source1, source2);

concat 没有限制参数的个数,可以把任意数量的 Observable 对象合并。

因为 concat 开始从下一个 Observable 对象抽取数据只能在前一个 Observable 对象完结之后,所以参与到这个 concat 之中的 Observable 对象应该都能完结,如果一个 Observable 对象不会完结,那排在后面的 Observable 对象永远没有上场的机会。

代码语言:javascript
复制
const source1$ = Observable.interval(1000);
const source2$ = Observable.of(1);
const source3$ = source1$.concat(source2$);

# merge:先到先得快速通过

  1. 数据汇流
代码语言:javascript
复制
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/map';
import 'rxjs/add/observable/merge';

const source1$ = Observable.timer(0, 1000).map(i => `A: ${i}`);
const source2$ = Observable.timer(500, 1000).map(i => `B: ${i}`);
const merged$ = Observable.merge(source1$, source2$);

merged$.subscribe(
  console.log,
  null,
  () => console.log('completed')
);

// A: 0
// B: 0
// A: 1
// B: 1
// A: 2
// B: 2

因为 merge 在第一时刻就订阅上游的所有 Observable 对象,所以,如果某个上游 Observable 对象不能完结,并不影响其他 Observable 对象的数据传给 merge 的下游。 merge 只有在所有的上游 Observable 都完结的时候,才会完结自己产生的 Observable 对象。

一般来说, merge 只对产生异步数据的 Observable 才有意义,用 merge 来合并同步产生数据的 Observable 对象没什么意义:

代码语言:javascript
复制
const source1$ = Observable.of(1, 2, 3);
const source2$ = Observable.of(4, 5, 6);
const merged$ = Observable.merge(source1$, source2$);
// 同步产生的数据不会被合并
// 1 2 3 4 5 6

merge 做的事情很简单:依次订阅上游 Observable 对象,把接收到的数据转给下游,等待所有上游对象 Observable 完结。因为 of 产生的是同步数据流,当 merge 订阅 source1$ 之后,还没来得及去订阅 source2$source1$ 就一口气把自己的数据全吐出来了,所以实际上产生了 concat 的效果。

应该避免用 merge 去合并同步数据流, merge 应该用于合并产生异步数据的 Observable 对象,一个常用场景就是合并 DOM 事件。

  1. 同步限流

merge 可以有一个可选参数 concurrent ,用于指定可以同时合并的 Observable 对象个数。

代码语言:javascript
复制
const source1$ = Observable.timer(0, 1000).map(i => `A: ${i}`);
const source2$ = Observable.timer(500, 1000).map(i => `B: ${i}`);
const source3$ = Observable.timer(1000, 1000).map(i => `C: ${i}`);

const merged$ = source1$.merge(source2$, source3$, 2);

source3$ 中的数据永远不会获得进入 merged$ 的机会,因为 merge 最后一个参数是 2 ,也就限定了同时只能同步合并两个 Observable 对象的数据, source1$source2$ 排在前面,所以优先合并它们两个,只有 source1$source2$ 其中之一完结的时候,才能空出一个名额来给 source3$ ,可是 source1$source2$ 又不会完结,所以 source3$ 没有出头之日。

  1. merge 的应用场景
代码语言:javascript
复制
const click$ = Rx.Observable.fromEvent(element, 'click');
const touchend$ = Rx.Observable.fromEvent(element, 'touchend');

Rx.Observable.merge(click$, touchend$).subscribe(eventHandler);

fromEvent 分别获得给定 DOM 元素的 clicktouchend 事件数据流,然后用 merge 合并,这之后,无论是 click 事件发生还是 touchend 事件发生,都会流到 merge 产生的 Observable 对象中,这样就可以统一用一个事件处理函数 eventHandler 来处理。

# zip:拉链式组合

zip 就像是一个拉条,上游的 Observable 对象就像是拉链的链齿,通过拉条合并,数据一定是一一对应的。

  1. 一对一的合并
代码语言:javascript
复制
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/zip';

const source1$ = Observable.of(1, 2, 3);
const source2$ = Observable.of('a', 'b', 'c');
const zipped$ = Observable.zip(source1$, source2$);

zipped$.subscribe(
  console.log,
  null,
  () => console.log('complete')
);
// [1, 'a']
// [2, 'b']
// [3, 'c']
// complete

在产生的数据形式上,zipconcatmerge 很不同,concatmerge 会保留原有的数据传给下游,但是 zip 会把上游的数据转化为数组形式,每一个上游 Observable 贡献的数据会在对应数组中占一席之地。

代码语言:javascript
复制
const source1$ = Observable.interval(1000);
const source2$ = Observable.of('a', 'b', 'c');
const zipped$ = Observable.zip(source1$, source2$);

zipped$.subscribe(
  console.log,
  null,
  () => console.log('complete')
);
// [0, 'a']
// [1, 'b']
// [2, 'c']
// complete

虽然 source2$ 第一时间就吐出了字符串 a ,但是 source1$ 并没有吐出任何数据,所以字符串 a 只能等着,直到 1 秒钟的时候, source1$ 吐出了 0zip 就把两个数据合并为一个数据传给下游。

source1$ 是由 interval 产生的数据流,是不会完结的,但是 zip 产生的 Observable 对象却在 source2$ 吐完所有数据之后也调用了 complete ,也就是说,只要任何一个上游的 Observable 完结。 zip 只要给这个完结的 Observable 对象吐出的所有数据找到配对的数据,那么 zip 就会给下游一个 complete 信号。

  1. 数据积压问题

如果某个上游 source1$ 吐出数据的速度很快,而另一个上游 source2$ 吐出数据的速度很慢,那 zip 就不得不先存储 source1$ 吐出的数据,因为 RxJS 的工作方式是“推”, Observable 把数据推给下游之后自己就没有责任保存数据了。被 source1$ 推送了数据之后, zip 就有责任保存这些数据,等着和 source2$ 未来吐出的数据配对。假如 source2$ 迟迟不吐出数据,那么 zip 就会一直保存 source1$ 没有配对的数据,然而这时候 source1$ 可能会持续地产生数据,最后 zip 积压的数据就会越来越多,占用的内存也就越来越多。

对于数据量比较小的 Observable 对象,这样的数据积压还可以忍受,但是对于超大量的数据流,使用 zip 就不得不考虑潜在的内存压力问题, zip 这个操作符自身是解决不了这个问题的。

  1. zip 多个数据流

如果用 zip 组合超过两个 Observable 对象,游戏规则依然一样,组合而成的 Observable 吐出的每个数据依然是数组,数组元素个数和上游 Observable 对象数量相同,每个上游 Observable 对象都要贡献一个元素,如果某个 Observable 对象没有及时吐出数据,那么 zip 会等,等到它吐出匹配的数据,或者等到它完结。

# combineLatest:合并最后一个数据

combineLatest 合并数据流的方式是当任何一个上游 Observable 产生数据时,从所有输入 Observable 对象中拿最后一次产生的数据(最新数据),然后把这些数据组合起来传给下游。

代码语言:javascript
复制
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/combineLatest';
import 'rxjs/add/operator/map';

const source1$ = Observable.timer(500, 1000);
const source2$ = Observable.timer(1000, 1000);
const result$ = source1$.combineLatest(source2$);

result$.subscribe(
  console.log,
  null,
  () => console.log('completed')
);
// [0, 0]
// [1, 0]
// [1, 1]
// [2, 1]
// [2, 2]
// [3, 2]

值得注意的是,并不是说上游产生任何一个数据都会引发 combineLatest 给下游传一个数据,只要有一个上游数据源还没有产生数据,那么 combineLatest 也没有数据输出,因为凑不齐完整的数据集合,只能等待。

单独某个上游 Observable 完结不会让 combineLatest 产生的 Observable 对象完结,因为当一个 Observable 对象完结之后,它依然有“最新数据”啊,就是它在完结之前产生的最后一个数据, combineLatest 记着呢,还可以继续使用这个“最新数据”。只有当所有上游 Observable 对象都完结之后, combineLatest 才会给下游一个 complete 信号,表示不会有任何数据更新了。

代码语言:javascript
复制
const source1$ = Observable.timer(500, 1000);
const source2$ = Observable.of('a');
const result$ = source1$.combineLatest(source2$);

result$.subscribe(
  console.log,
  null,
  () => console.log('completed')
);
// [0, 'a']
// [1, 'a']
// [2, 'a']
// [3, 'a']

只要 source1$ 持续产生数据,那 combineLatest 就会持续拿 source2$ 的最后一个数据字符串 a 去和 source1$ 产生的新数据组合传给下游。

代码语言:javascript
复制
const source1$ = Observable.of('a', 'b', 'c');
const source2$ = Observable.of(1, 2, 3);
const result$ = source1$.combineLatest(source2$);

result$.subscribe(
  console.log,
  null,
  () => console.log('completed')
);
// 由于 source1$ 是同步数据流,在被订阅时吐出所有数据,最后吐出的是 c
// source2$ 吐出 1 时,和 source1$ 吐出的 c 去和 1 做组合
// ['c', 1]
// ['c', 2]
// ['c', 3]
// complete

combineLatest 会顺序订阅所有上游的 Observable 对象,只有所有上游 Observable 对象都已经吐出数据了,才会给下游传递所有上游“最新数据”组合的数据。

代码语言:javascript
复制
const source1$ = Observable.of('a', 'b', 'c');
const source2$ = Observable.of(1, 2, 3);
const source3$ = Observable.of('x', 'y');
const result$ = source1$.combineLatest(source2$, source3$);

result$.subscribe(
  console.log,
  null,
  () => console.log('completed')
);

// ['c', 3, 'x']
// ['c', 3, 'y']
// complete

combineLatest 工作原理:

  1. 定制下游数据

如果 combineLatest 的输入只有 Observable 对象,那么传递给下游的数据就是一个包含所有上游“最新数据”的数组。

combineLatest 的最后一个参数可以是一个函数,称为 projectproject 的作用是让 combineLatest 把所有上游的“最新数据”扔给下游之前做一下组合处理,这样就可以不用传递一个数组下去,可以传递任何由“最新数据”产生的对象。 project 可以包含多个参数,每一个参数对应的是上游 Observable 的最新数据, project 返回的结果就是 combineLatest 塞给下游的结果。

代码语言:javascript
复制
const source1$ = Observable.timer(500, 1000);
const source2$ = Observable.timer(1000, 1000);
const project = (a, b) => `${a} and ${b}`;
const result$ = source1$.combineLatest(source2$, project);

result$.subscribe(
  console.log,
  null,
  () => console.log('completed')
);

// '0 and 0'
// '1 and 0'
// '1 and 1'
// '2 and 1'

实际上等同于下面的代码:

代码语言:javascript
复制
const source1$ = Observable.timer(500, 1000);
const source2$ = Observable.timer(1000, 1000);
const project = (a, b) => `${a} and ${b}`;
const result$ = source1$.combineLatest(source2$)
  .map(arr => project(...arr));
  1. 多重依赖问题

combineLatest 产生的 Observable 对象数据依赖于上游的多个 Observable 对象,如果上游的多个 Observable 对象又共同依赖于另一个 Observable 对象,这就是多重依赖问题。

多重继承可能会导致一些很反常识的问题,因为一个属性很难说清楚是从哪条关系继承下来的,所以在其他编程语言中往往放弃多重继承的功能。

# withLatestFrom

withLatestFrom 的功能类似于 combineLatest ,但是给下游推送数据只能由一个上游 Observable 对象驱动。

withLatestFrom 只有实例操作符的形式,而且所有输入 Observable 的地位并不相同,调用 withLatestFrom 的那个 Observable 对象起到主导数据产生节奏的作用,作为参数的 Observable 对象只能贡献数据,不能控制产生数据的时机。

代码语言:javascript
复制
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/withLatestFrom';
import 'rxjs/add/operator/map';

const source1$ = Observable.timer(0, 2000).map(x => 100 * x);
const source2$ = Observable.timer(500, 1000);
const result$ = source1$.withLatestFrom(source2$, (a, b) => a + b);

result$.subscribe(
  console.log,
  null,
  () => console.log('completed')
);

// 101
// 203
// 305
// 407

# 解决 glitch

如果用 withLatestFrom ,那么对应的多重依赖问题可以得到解决,因为产生的下游 Observable 对象中数据生成节奏只由一个输入 Observable 对象决定。

代码语言:javascript
复制
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/withLatestFrom';
import 'rxjs/add/operator/map';

const original$ = Observable.timer(0, 1000);
const source1$ = original$.map(x => x + 'a');
const source2$ = original$.map(x => x + 'b');
const result$ = source1$.withLatestFrom(source2$);

result$.subscribe(
  console.log,
  null,
  () => console.log('completed')
);

// [ '0a', '0b' ]
// [ '1a', '1b' ]
// [ '2a', '2b' ]
// [ '3a', '3b' ]

一般来说,当要合并多个 Observable 的“最新数据”,要从 combineLatestwithLatestFrom 中选一个操作符来操作,根据下面的原则来选择:

  • 如果要合并完全独立的 Observable 对象,使用 combineLatest
  • 如何要把一个 Observable 对象“映射”成新的数据流,同时要从其他 Observable 对象获取“最新数据”,就是用 withLatestFrom

# race:胜者通吃

第一个吐出数据的 Observable 对象就是胜者, race 产生的 Observable 就会完全采用胜者 Observable 对象的数据,其余的输入 Observable 对象则会被退订而抛弃。

代码语言:javascript
复制
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/race';
import 'rxjs/add/operator/map';

const source1$ = Observable.timer(0, 2000).map(x => x + 'a');
const source2$ = Observable.timer(500, 1000).map(x => x + 'b');
const winner$ = source1$.race(source2$);

winner$.subscribe(
  console.log,
  null,
  () => console.log('complete')
);
// 0a
// 1a
// 2a

# startWith

startWith 只有实例操作符的形式,其功能是让一个 Observable 对象在被订阅的时候,总是先吐出指定的若干个数据。

代码语言:javascript
复制
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/startWith';

const original$ = Observable.timer(0, 1000);
const result$ = original$.startWith('start');

result$.subscribe(
  console.log,
  null,
  () => console.log('completed')
);
// start
// 0
// 1

startWith 的功能完全可以通过 concat 来实现,但如果使用 concat ,那无论用静态操作符或者实例操作符的形式, original$ 都只能放在参数列表里,不能调用 original$concat 函数,这样一来,也就没有办法形成连续的链式调用。 startWith 满足了需要连续链式调用的要求。

# forkJoin

forkJoin 只有静态操作符的形式,可以接受多个 Observable 对象作为参数, forkJoin 产生的 Observable 对象也很有特点,它只会产生一个数据,因为它会等待所有参数 Observable 对象的最后一个数据,也就是说,只有当所有 Observable 对象都完结,确定不会有新的数据产生的时候, forkJoin 就会把所有输入 Observable 对象产生的最后一个数据合并成给下游唯一的数据。

所以说, forkJoin 就是 RxJS 界的 Promise.allPromise.all 等待所有输入的 Promise 对象成功之后把结果合并, forkJoin 等待所有输入的 Observable 对象完结之后把最后一个数据合并。

代码语言:javascript
复制
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/forkJoin';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/interval';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/take';

const source1$ = Observable.interval(1000).map(x => x + 'a').take(1);
const source2$ = Observable.interval(1000).map(x => x + 'b').take(3);
const concated$ = Observable.forkJoin(source1$, source2$);

concated$.subscribe(
  console.log,
  null,
  () => console.log('completed')
);

// [0a, 2b]
// completed

# 高阶 Observable

# 高阶 Observable 的意义

数据流虽然管理的是数据,数据流自身也可以认为是一种数据,既然数据可以用 Observable 来管理,那么数据流本身也可以用 Observable 来管理,让需要被管理的 Observable 对象成为其他 Observable 对象的数据,用现成的管理 Observable 对象的方法来管理 Observable 对象,这就是高阶 Observable 的意义。

# 操作高阶 Observable 的合并类操作符

RxJS 提供对应的处理高阶 Observable 的合并类操作符,名称就是在原有操作符名称的结尾加上 All ,如下所示:

  • concatAll
  • mergeAll
  • zipAll
  • combineAll
  1. concatAll

concat 是把所有输入的 Observable 首尾相连组合在一起, concatAll 做的事情也一样,只不过 concatAll 只有一个上游 Observable 对象,这个 Observable 对象预期是一个高阶 Observable 对象, concatAll 会对其中的内部 Observable 对象做 concat 的操作。

代码语言:javascript
复制
const ho$ = Observable.interval(1000)
  .take(2)
  .map(x => Observable.interval(1500).map(y => x + ':' + y).take(2));

const concated$ = ho$.concatAll();

concated$.subscribe(console.log);
// 0:0
// 0:1
// 1:0
// 1:1

concatAll 首先会订阅上游产生的第一个内部 Observable 对象,抽取其中的数据,然后,只有当第一个 Observable 对象完结的时候,才会去订阅第二个内部 Observable 对象。

  1. mergeAll
代码语言:javascript
复制
const ho$ = Observable.interval(1000)
  .take(2)
  .map(x => Observable.interval(1500).map(y => x + ':' + y).take(2));
const concated$ = ho$.mergeAll();

mergeAll 对内部 Observable 的订阅策略和 concatAll 不同, mergeAll 只要发现上游产生一个内部 Observable 就会立刻订阅,并从中抽取收据。

  1. zipAll
代码语言:javascript
复制
const ho$ = Observable.interval(1000)
  .take(2)
  .map(x => Observable.interval(1500).map(y => x + ':' + y).take(2));
const concated$ = ho$.zipAll();

concated$.subscribe(
  console.log,
  null,
  () => console.log('completed')
);

// ['0:0', '1:0']
// ['0:1', '1:1']
// completed

还可以给 zipAll 一个函数类型的参数,就和 zipproject 参数一样定制产出的数据形式。

  1. combineAll
代码语言:javascript
复制
const ho$ = Observable.interval(1000)
  .take(2)
  .map(x => Observable.interval(1500).map(y => x + ':' + y).take(2));
const concated$ = ho$.combineAll();

concated$.subscribe(
  console.log,
  null,
  () => console.log('completed')
);

// ['0:0', '1:0']
// ['0:1', '1:0']
// ['0:1', '1:1']
// completed

combineAllzipAll 一样,必须上游高阶 Observable 完结之后才能开始给下游产生数据,因为只有确定了作为输入的内部 Observable 对象的个数,才能拼凑出第一个传给下游的数据。

# 进化的高阶 Observable 处理

很多场景下并不需要无损的数据流连接,也就是说,可以舍弃掉一些数据,至于怎么舍弃,就涉及另外两个合并类操作符,分别是 switchexhaust ,这两个操作符是 concatAll 的进化版本。

  1. switch:切换输入 Observable

switch 的含义就是“切换”,总是切换到最新的内部 Observable 对象获取数据。每当 switch 的上游高阶 Observable 产生一个内部 Observable 对象, switch 都会立刻订阅最新的内部 Observable 对象上,如果已经订阅了之前的内部 Observable 对象,就会退订那个过时的内部 Observable 对象,这个“用上新的,舍弃旧的”动作,就是切换。

代码语言:javascript
复制
const ho$ = Observable.interval(1000)
  .take(2)
  .map(x => Observable.interval(1500).map(y => x + ':' + y).take(2));
const result$ = ho$.switch();

result$.subscribe(
  console.log,
  null,
  () => console.log('completed')
);

// 1:0
// 1:1
// completed

switch 首先订阅了第一个内部 Observable 对象,但是这个内部对象还没来得及产生第一个数据 0:0,第二个内部 Observable 对象就产生了,这时候 switch 就会做切换动作,切换到第二个内部 Observable 上,因为之后没有新的内部 Observable 对象产生, switch 就会一直从第二个内部 Observable 对象获取数据,于是最后得到的数据就是 1:01:1

  1. exhaust

exhaust 的含义就是“耗尽”,在耗尽当前内部 Observable 的数据之前不会切换到下一个内部 Observable 对象。

代码语言:javascript
复制
const ho$ = Observable.interval(1000)
  .take(3)
  .map(x => Observable.interval(700).map(y => x + ':' + y).take(2));
const result$ = ho$.exhaust();

result$.subscribe(
  console.log,
  null,
  () => console.log('completed')
);

// 0:0
// 0:1
// 2:0
// 2:1
// completed
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022/7/21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 合并类操作符
    • # concat:首尾相连
      • # merge:先到先得快速通过
        • # zip:拉链式组合
          • # combineLatest:合并最后一个数据
            • # withLatestFrom
              • # 解决 glitch
                • # race:胜者通吃
                  • # startWith
                    • # forkJoin
                    • # 高阶 Observable
                      • # 高阶 Observable 的意义
                        • # 操作高阶 Observable 的合并类操作符
                          • # 进化的高阶 Observable 处理
                          领券
                          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档