假设我们有这个全局常量:
const isSignedIn = fromPromise(fetch('/api/is-signed-in'))
.pipe(throttleTime(1000), shareReply(1));页面加载后,多个组件将同时订阅此内容:
isSignedIn.subscribe(() => console.log('do 1st'));
isSignedIn.subscribe(() => console.log('do 2nd'));
isSignedIn.subscribe(() => console.log('do 3rd'));上面只会调用一次API,但是如果另一个组件订阅了它,我需要它再次调用API (即1秒后)。
isSignedIn.subscribe(() => console.log('button press'));如何使用RxJS实现这一点?
发布于 2019-01-10 10:59:37
我想这就是你想要的:
一个pipeable操作符(在某个地方全局声明并导入)
export const refreshAfter = (duration: number) => (source: Observable<any>) =>
source.pipe(
repeatWhen(obs => obs.pipe(delay(duration))),
publishReplay(1),
refCount());然后像这样使用它:
data$ = fetch('/api/is-signed-in').pipe(refreshAfter(5000)); // refresh after 5000 ms注意:您实际上要求这样做:
如果另一个组件订阅了它,我需要它再次调用
(即1秒后)。
我不太确定这是你真正想说的。我想你真正的意思是--你希望在一个过期时间之后,当前订阅的所有组件的数据都被刷新。无论如何,我的回答都会将新值发送给所有的监听器。如果你真的想要你最初说的东西,你需要添加一些替代的重复触发器。
但如果这是一个全局常量-上面就是我在相同场景中使用的。
注意:我还没有实际测试重复项时对错误条件的处理,但我认为错误将传播到所有侦听器。
发布于 2018-08-31 16:07:29
编辑:答案是错误的。BufferSize是重放最后N个事件的时长。在此之后,流完成。
signature: shareReplay(
bufferSize?: number,
windowTime?: number,
scheduler?: IIScheduler
):Observable
@param {Number} [bufferSize=Number.POSITIVE_INFINITY] Maximum element count of the replay buffer.
@param {Number} [windowTime=Number.MAX_VALUE] Maximum time length of the replay buffer in milliseconds.尝试将1000作为第二个参数添加到shareReply:
const isSignedIn = fromPromise(fetch('/api/is-signed-in'))
.pipe(throttleTime(1000), shareReplay(1, 1000));shareReplay.ts -注意refCount-- on unsubcribe,因为它可以触发额外的请求。
发布于 2018-09-01 00:48:49
如果我们重新实现ShareReplay,那么它:
subscribe到源经过的时间。import {Subject, of, Observable, ReplaySubject, Subscriber} from 'rxjs';
import {pluck, shareReplay, tap, delay} from 'rxjs/operators';
function shareForeverReplayRerun<T>(bufferSize: number, rerunAfter: number) {
let subject;
let subscription;
let hasError = false;
let isComplete = false;
let lastSubTime = 0;
return source => Observable.create((observer: Subscriber<T>) => {
if (!subject || hasError || (Date.now() - lastSubTime) >= rerunAfter) {
lastSubTime = Date.now();
hasError = false;
subject = new ReplaySubject<T>(bufferSize);
subscription = source.subscribe({
next(value) { subject.next(value); },
error(err) {
hasError = true;
subject.error(err);
},
complete() {
isComplete = true;
subject.complete();
},
});
}
const innerSub = subject.subscribe(observer);
// never unsubscribe from source
return () => {
innerSub.unsubscribe();
};
})
}
const source = of('Initial').pipe(
tap(()=>console.log('COMPUTE')),
delay(200),
shareReplayRerun(1, 1000),
);
source.subscribe(console.log.bind(null, 'syncI:'));
source.subscribe(console.log.bind(null, 'syncII:'));
setTimeout(()=>source.subscribe(console.log.bind(null, 'after500:')), 500);
setTimeout(()=>source.subscribe(console.log.bind(null, 'after900:')), 900);
setTimeout(()=>source.subscribe(console.log.bind(null, 'after1500:')), 1500);作为输出,我们有:
COMPUTE
syncI: Initial
syncII: Initial
after500: Initial
after900: Initial
COMPUTE
after1500:Initialhttps://stackoverflow.com/questions/52108244
复制相似问题