首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RxJS -如何共享昂贵的可观察对象的输出,但如果在N秒后再次请求该可观察对象,如何重新运行该可观察对象?

RxJS -如何共享昂贵的可观察对象的输出,但如果在N秒后再次请求该可观察对象,如何重新运行该可观察对象?
EN

Stack Overflow用户
提问于 2018-08-31 11:57:13
回答 3查看 1.3K关注 0票数 6

假设我们有这个全局常量:

代码语言:javascript
复制
const isSignedIn = fromPromise(fetch('/api/is-signed-in'))
    .pipe(throttleTime(1000), shareReply(1));

页面加载后,多个组件将同时订阅此内容:

代码语言:javascript
复制
isSignedIn.subscribe(() => console.log('do 1st'));
isSignedIn.subscribe(() => console.log('do 2nd'));
isSignedIn.subscribe(() => console.log('do 3rd'));

上面只会调用一次API,但是如果另一个组件订阅了它,我需要它再次调用API (即1秒后)。

代码语言:javascript
复制
isSignedIn.subscribe(() => console.log('button press'));

如何使用RxJS实现这一点?

EN

回答 3

Stack Overflow用户

发布于 2019-01-10 10:59:37

我想这就是你想要的:

一个pipeable操作符(在某个地方全局声明并导入)

代码语言:javascript
复制
 export const refreshAfter = (duration: number) => (source: Observable<any>) =>
                             source.pipe(
                                   repeatWhen(obs => obs.pipe(delay(duration))),
                                   publishReplay(1), 
                                   refCount());

然后像这样使用它:

代码语言:javascript
复制
data$ = fetch('/api/is-signed-in').pipe(refreshAfter(5000));   // refresh after 5000 ms

注意:您实际上要求这样做:

如果另一个组件订阅了它,我需要它再次调用

(即1秒后)。

我不太确定这是你真正想说的。我想你真正的意思是--你希望在一个过期时间之后,当前订阅的所有组件的数据都被刷新。无论如何,我的回答都会将新值发送给所有的监听器。如果你真的想要你最初说的东西,你需要添加一些替代的重复触发器。

但如果这是一个全局常量-上面就是我在相同场景中使用的。

注意:我还没有实际测试重复项时对错误条件的处理,但我认为错误将传播到所有侦听器。

票数 1
EN

Stack Overflow用户

发布于 2018-08-31 16:07:29

编辑:答案是错误的。BufferSize是重放最后N个事件的时长。在此之后,流完成。

代码语言:javascript
复制
signature: shareReplay(
  bufferSize?: number,
  windowTime?: number,
  scheduler?: IIScheduler
):Observable

@param {Number} [bufferSize=Number.POSITIVE_INFINITY] Maximum element count of the replay buffer.
@param {Number} [windowTime=Number.MAX_VALUE] Maximum time length of the replay buffer in milliseconds.

尝试将1000作为第二个参数添加到shareReply:

代码语言:javascript
复制
const isSignedIn = fromPromise(fetch('/api/is-signed-in'))
    .pipe(throttleTime(1000), shareReplay(1, 1000));

shareReplay.ts -注意refCount-- on unsubcribe,因为它可以触发额外的请求。

票数 0
EN

Stack Overflow用户

发布于 2018-09-01 00:48:49

如果我们重新实现ShareReplay,那么它:

  • 永远不会取消对源的订阅,即使它没有更多的订阅者(删除refCount,潜在的内存泄漏)。
  • 接受rerunAfter参数,即从上次subscribe到源经过的时间。

代码语言:javascript
复制
import {Subject, of, Observable, ReplaySubject, Subscriber} from 'rxjs';
import {pluck, shareReplay, tap, delay} from 'rxjs/operators';

function shareForeverReplayRerun<T>(bufferSize: number, rerunAfter: number) {
  let subject;
  let subscription;
  let hasError = false;
  let isComplete = false;
  let lastSubTime = 0;

  return source => Observable.create((observer: Subscriber<T>) => {
    if (!subject || hasError || (Date.now() - lastSubTime) >= rerunAfter) {

      lastSubTime = Date.now();
      hasError = false;
      subject = new ReplaySubject<T>(bufferSize);
      subscription = source.subscribe({
        next(value) { subject.next(value); },
        error(err) {
          hasError = true;
          subject.error(err);
        },
        complete() {
          isComplete = true;
          subject.complete();
        },
      });
    }

    const innerSub = subject.subscribe(observer);
    // never unsubscribe from source
    return () => {
      innerSub.unsubscribe();
    };
  })
}





const source = of('Initial').pipe(
  tap(()=>console.log('COMPUTE')),
  delay(200),
  shareReplayRerun(1, 1000),
);

source.subscribe(console.log.bind(null, 'syncI:'));
source.subscribe(console.log.bind(null, 'syncII:'));

setTimeout(()=>source.subscribe(console.log.bind(null, 'after500:')), 500);

setTimeout(()=>source.subscribe(console.log.bind(null, 'after900:')), 900);

setTimeout(()=>source.subscribe(console.log.bind(null, 'after1500:')), 1500);

作为输出,我们有:

代码语言:javascript
复制
COMPUTE
syncI:    Initial
syncII:   Initial
after500: Initial
after900: Initial
COMPUTE
after1500:Initial
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52108244

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档