Observable
是多个值的惰性 Push 集合。他填补了下表中的缺失点:
SINGLE | MULTIPLEXED | |
---|---|---|
Pull | Function | Iterator |
Push | Promise | Observable |
如,下面是一个 Observable
,它在订阅时立即(同步)推送值 1
、2
、3
,并且从 subscribe
调用开始后过 1 s 再推送值 4
,然后结束。
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});
要调用 Observable
并查看这些值,我们需要订阅它:
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});
console.log('Before subscribe');
observable.subscribe({
next(x) { console.log('Next: ' + x); },
error(err) { console.error('Error: ' + err); },
complete() { console.log('Complete'); }
});
console.log('After subscribe');
// Before subscribe
// Next: 1
// Next: 2
// Next: 3
// After subscribe
// Next: 4
// Complete
Pull
和 Push
是两种不同的协议,描述了数据生产者和数据消费者如何进通信。
什么是 Pull? 在 Pull 系统中,消费者决定什么时候从数据生产者中接收数据。数据生产者自己对什么时候数据被传递到消费者没有感知。
每个 JavaScript 函数都是一个 Pull 系统。函数是数据的生产者,调用函数的代码通过从其调用中 pull
出单个返回值来使用它。
ES 2015 中介绍了生成器函数和迭代器 (opens new window)(function *
),也属于 Pull 系统。调用 iterator.next()
的代码是消费者,从迭代器(生产者)中拉出多个值。
PRODUCER | CONSUMER | |
---|---|---|
Pull | Passive:produces data when requested | Active:decides when data is requested |
Push | Active:produces data at its own pace | Passive:reacts to received data |
什么是 Push ? 在 Push 系统中,生产者决定什么时候推送数据给消费者。数据消费者自己对什么时候数据被接收到没有感知。
Promise
是目前 JavaScript 中最常见的 Push 系统类型。Promise
(生产者)传递一个 resolved 的值给注册的回调(消费者),不过和函数不一样,Promise 自己负责精准确定该值何时 push 到回调。
RxJS 引入了 Observable
,一个新的 JavaScript Push 系统。Observable
是一个多值生产者,推送数据给 Observer
(消费者)。
Observable
不像 EventEmitter
也不像 Promise
用于多个值。在一些情况下 Observable
会表现地像 EventEmitter
,如当使用 RxJS 的 Subject
进行多播时,但通常它们的行为不像 EventEmitter
。
Observable 类似于零参数的函数,但将它们泛化为允许多个值。
function foo () {
console.log('Hello');
return 42;
}
const x = foo.call(); // same as foo()
console.log(x);
// Hello
// 42
const y = foo.call(); // same as foo()
console.log(y);
// Hello
// 42
使用 Observable
改写上面的代码:
import { Observable } from 'rxjs';
const foo = new Observable(subscriber => {
console.log('Hello');
subscriber.next(42);
});
foo.subscribe(x => {
console.log(x);
});
// Hello
// 42
foo.subscribe(y => {
console.log(y);
});
// Hello
// 42
因为 函数 和 Observable
都是惰性计算。如果你不调用函数,console.log('Hello')
就不会被执行。同样对于 Observable
,如果你不“调用”它(使用 subscribe
), console.log('Hello')
也不会被执行。另外,“调用”和“订阅”是一个孤立的操作:两个函数调用触发两个单独的副作用,两个 Observable
订阅触发两个单独的副作用。和 EventEmitter
共享副作用并且无论订阅者是否存在都立即触发相反,Observable
没有共享执行并且是惰性计算。
订阅一个 Observable 就是调用一个函数。
部分人觉得 Observable
是异步的,这并不是真的。
console.log('before');
console.log(foo.call());
console.log('after');
// before
// Hello
// 42
// after
使用 Observable
会观察到和函数一样的输出:
console.log('before');
foo.subscribe(x => {
console.log(x);
});
console.log('after');
// before
// Hello
// 42
// after
这说明,对 foo
的订阅完全是同步的,就像一个函数一样。
Observable
既能同步也可以异步地传递值。
那 Observable
和函数之间的区别是什么?Observable
可以随着时间推移“返回”多个值,这是函数无法做到的。
function foo () {
console.log('Hello');
return 42;
return 100; // dead code, will never happen
}
函数只能返回一个值,而 Observable
可以返回多个值:
import { Observable } from 'rxjs';
const foo = new Observable(subscriber => {
console.log('Hello');
subscriber.next(42);
subscriber.next(100);
subscriber.next(200);
});
console.log('Before');
foo.subscribe(x => {
console.log(x);
});
console.log('After');
// Before
// Hello
// 42
// 100
// 200
// After
也可以异步地返回值:
import { Observable } from 'rxjs';
const foo = new Observable(subscriber => {
console.log('Hello');
subscriber.next(42);
subscriber.next(100);
subscriber.next(200);
setTimeout(() => {
subscriber.next(300);
}, 1000);
});
console.log('Before');
foo.subscribe(x => {
console.log(x);
});
console.log('After');
// Before
// Hello
// 42
// 100
// 200
// After
// 300
结论:
func.call()
表示同步地返回一个值observable.subscribe()
表示同步或异步地返回 0 或多个值Observable
使用 new Observable
或一个创建操作符来 created,会被 Observer
subscribed,execute 来向 Observer
传递 next
/ error
/ complete
通知,并且他们的执行可能会被 disposed。这四个方面都编码字在 Observable
实例中,当其中一些与其他类型相关,如 Observer
和 Subscription
。
Observable
核心关注点:
Observable
构造函数接受一个参数:subscribe
函数
import {Observable} from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
const id = setInterval(() => {
subscriber.next('hi');
}, 1000);
});
Observable 可以使用
new Observable
来创建。通常,Observable
使用创建函数如of
、from
、interval
等来创建。
observable.subscribe(x => {
console.log(x);
});
这不是个巧合,observable.subscribe
和 new Observable(function subscribe(subscriber) {})
的 subscribe
有相同的名字。在库中,它们是不一样的,不过在实践中可以认为它们在概念上是一样的。
这表示订阅调用不会在同一个 Observable
的多个 Observer
之间共享。当使用 Observer
调用 observable.subscribe
时,new Observable(function subscribe(subscriber) {})
中的 subscribe
函数为给定的 subscriber
运行。对 observable.subscribe
的每次调用都会为给定的 subscriber
触发其对应的设置。
对于
Observable
的订阅就像调用一个函数,提供了可以传递数据的回调。
这和 addEventListener
/ removeEventListener
等事件处理程序 API 完全不同。使用 observable.subscribe
,给定的 Observer
不会在 Observable
中注册为监听器。Observable
甚至不维护一个 Observer
列表。
订阅调用只是一种启动 Observable
执行并将值或时间传递给该执行的 Observer
的方法。
new Observable(function subscribe(subscriber) {})
里面的代码表示 Observable
的执行,只发生在每个订阅的 Observer
上的惰性计算。执行会随着时间的推移,同步或异步地产生多个值。
Observable
执行可以传递的值类型:
Next
通知:发送一个值,如 Number
、String
、Object
等Error
通知:发送一个错误,如 Error
Complete
通知:不发送值Next
通知时最重要也是最常见的类型:它表示发送给订阅者的实际数据。Error
和 Complete
通知在 Observable
执行过程中只可能执行一次,并且只能有一个发生。
import {Observable} from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
Observable
严格遵守协议,在 Complete
通知之后的 Next
通知将不会被发送:
import {Observable} from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
subscriber.next(4); // Is not delivered to subscribers
});
可以在 subscribe
代码外包一层 try/catch
块,以捕获错误:
import {Observable} from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
try {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
} catch (err) {
subscriber.error(err);
}
});
因为 Observable
执行可能是无限的,但是对于 Observer
来说在有限时间内结束执行时常见的需求,因此需要有取消执行的 API。因为每次执行只针对一个 Observer
,一旦 Observer
接收到数据,它需要有方法去停止执行,不然会造成计算资源和内存的浪费。
当 observable.subscribe
被调用时,Observer
被附加到新创建的 Observable
执行中,该调用还会返回 Subscription
对象。
const subscription = observable.subscribe(x => console.log(x));
Subscription
(opens new window) 表示正在进行的执行,它具有允许你取消该执行的最小 API。
import { from } from 'rxjs';
const observable = from([1, 2, 3]);
const subscription = observable.subscribe(x => console.log(x));
// Later
subscription.unsubscribe();
当我们使用 create()
创建 Observable
时,每个 Observable
都必须定义如何处理该执行的资源,如可以在函数 subscribe()
中返回自定义取消订阅函数来实现。
const observable = new Observable(function subscribe (subscriber) {
const intervalId = setInterval(() => {
subscriber.next(Math.random());
}, 1000);
return function unsubscribe () {
clearInterval(intervalId);
};
});
就像 observable.subscribe
类似于 new Observable(function subscribe (subscriber) {})
, 我们从 subscribe
返回的 unsubscribe
在概念上等同于 subscription.unsubscribe
。如果移除围绕在这些概念周围的 ReactiveX 类型,留下的就是原生的 JavaScript。
function subscribe (subscriber) {
const intervalId = setInterval(() => {
subscriber.next(Math.random());
}, 1000);
return function unsubscribe () {
clearInterval(intervalId);
};
}
const unsubscribe = subscribe({
next: x => console.log(x),
error: err => console.error(err),
complete: () => console.log('completed')
});
// Later
unsubscribe();
之所以使用像 Observable
、Observer
和 Subscription
的 Rx 类型,是为了安全考虑和 Operator
的可组合性。
什么是 Observer
? Observer
作为消费者消费 Observable
派发的值。Observer
只是一组回调,用于 Observable
派发的每种类型的通知:next
, error
和 complete
。
const observer = {
next: value => console.log(`Observer got a next value: ${value}`),
error: error => console.error(`Observer got an error: ${error}`),
complete: () => console.log('Observer got a complete notification')
};
// 通过将 observer 对象传递给 `subscribe`,来订阅 observable
observable.subscribe(observer);
Observer
只是有三个回调的对象,用于Observable
可能派发每种类型的通知。
RxJS 中的 Observer
也可能是部分的。如果没有提供某种回调,Observable
也会正常执行,只不过一些类型的通知会被忽略,因为他们在 Observer
中找不到对应的回调。
const observer = {
next: value => console.log(`Observer got a next value: ${value}`),
error: error => console.error(`Observer got an error: ${error}`)
};
在订阅 Observable
时,也可以不用将回调放在一个 Observer
对象中,只传一个 next
回调函数作为参数就可以。
observable.subscribe(value => console.log(`Observer got a next value: ${value}`));
在 observable.subscribe
内部,将使用参数中的回调函数作为下一个处理程序创建一个 Observer
对象。