什么是 Subscription? Subscription 是一个表示一次性资源的对象,通常是 Observable
的执行。Subscription
有一个重要的方法 unsubscribe
,不接受任何参数,只是释放 Subcription
持有的资源。在之前的 RxJS 中,Subscription
被称为 Disposable
。
import { interval } from 'rxjs';
const observable = interval(1000);
const subscription = observable.subscribe(x => console.log(x));
subscription.unsubscribe();
Subscription
本质上只有一个unsubscribe()
函数来释放资源或取消Observable
执行。
Subscription
也可以放在一起,这样调用一个 Subscription
的 unsubscribe()
能取消多个 Subscription
。
import { interval } from 'rxjs';
const observable1 = interval(400);
const observable2 = interval(300);
const subscription = observable1.subscribe(x => console.log('first: ' + x));
const childSubsciption = observable2.subscribe(x => console.log('second: ' + x));
subscription.add(childSubsciption);
setTimeout(() => {
subscription.unsubscribe();
}, 1000);
// second: 0
// first: 0
// second: 1
// first: 1
// second: 2
Subscription
还有个 remove(otherSubscription)
方法,用于撤销添加到 Subscription
的子 Subscription
。
什么是 Subject ? RxJS 中的 Subject
是一种特殊类型的 Observable
,它允许将值多播到多个 Observer
。虽然普通的 Observable
是单播的(每个订阅的 Observer
都拥有 Observable
的独立执行),但 Subject
可以多播。
Subject
类似Observable
,但是它可以多播给多个Observer
。Subject
有点像EventEmitter
:他们都维护多个监听这的注册。
每个 Subject
都是一个 Observable
。 给定一个 Subject
,可以订阅它,使用 Observer
开始正常接收值。从 Observer
角度来看,它无法判断 Observable
的执行时来自普通的单播 Observable
还是 Subject
。
在 Subject
内部,订阅不会调用传递至的新执行。它只是在一个 Observer
列表中注册给定的 Observer
,类似于其他库或语言中 addListener
的工作方式。
每个 Subject
都是一个 Observer
。 它是一个对象,有 next(v)
,error(e)
和 complete()
方法。要为 Subject
提供一个新值,只需调用 next(v)
,它将被多播到注册监听 Subject
的 Observer
。
import { Subject } from 'rxjs';
const subject = new Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(1);
subject.next(2);
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
因为 Subject
是一个 Observer
,也就是说可以使用 Subject
作为参数来订阅任何 Observable
。
import { Subject, from } from 'rxjs';
const subject = new Subject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
const observable = from([1, 2, 3]);
observable.subscribe(subject);
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
通过上面的方法,我们基本上只是通过 Subject
将单播 Observable
执行转换为多播。这是 Subject
如何使任何 Observable
执行共享给多个 Observer
的唯一方法。
也有一些特殊的 Subject
:BehaviorSubject
,ReplaySubject
和 AsyncSubject
。
“多播 Observable” 通过可能有许多订阅者的 Subject
传递通知,而普通的 “单播 Observable” 仅向单个 Observer
发送通知。
多播的
Observable
在底层使用Subject
来让多个Observer
看到相同的Observable
执行。
多播操作符底层工作原理:Observer
订阅底层 Subject
,Subject
订阅源 Observable
。
import { from, Subject, multicast } from 'rxjs';
const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
multicasted.connect();
multicast
返回一个看起来像正常 Observable
的 Observable
,但是它在订阅时像 Subject
一样。multicast
返回一个 ConnectableObservable
,它是个有 connect()
方法的 Observable
。
connect()
方法决定共享的 Observable
具体什么时候开始执行。connect()
本质上是执行 source.subscribe(subject)
,coonect()
返回一个 Subscription
,它可以用来取消订阅。
BehaviorSubject
是 Subject
的变体之一,具有“当前值”的概念。它存储发送给其消费者最新的值,并且每当有新的 Observer
订阅时,它将立即接收来自 BehaviorSubject
的 “当前值”。
BehaviorSubject
对于表示 “随时间变化的值” 很有用。如,生日的事件流是一个Subject
,但一个人的年龄是BehaviorSubject
。
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 初始值为 0
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(3);
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
ReplaySubject
和 BehaviorSubject
类似,但它可以给新的订阅者发送旧的值,可以记录 Observable
执行。
ReplaySubject
记录Observable
执行的一些值,并对新的订阅者进行重放。
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // 缓存 3 个值
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5
除了缓冲个数外,还可以定义毫秒级的窗口时间,来决定缓存记录可以保留多久。
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(100, 500); // 缓存 100 个值,每 500 毫秒清除一次
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
let i = 1;
const interval = setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 1000);
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...
AsyncSubject
也是一种变体,它只将 Observable
执行的最后一个值发送给它的观察者,并且仅在执行完成时发送。
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
subject.complete();
// observerA: 5
// observerB: 5
import { Subject } from 'rxjs';
const subject = new Subject();
subject.subscribe({
next: () => console.log('One second has passed')
});
setTimeout(() => {
subject.next();
}, 1000);
// One second has passed