社区首页 >问答首页 >用棱角和rxjs重新连接websocket?

用棱角和rxjs重新连接websocket?
EN

Stack Overflow用户
提问于 2017-05-18 18:06:42
回答 3查看 30K关注 0票数 22

我有一个基于ngrx/store (v2.2.2)和rxjs (v5.1.0)的应用程序,它使用可观察的方法侦听web套接字输入的数据。当我启动应用程序时,我会完美地接收到传入的数据。

然而,过了一段时间(更新非常少见),连接似乎丢失了,我再也得不到传入的数据了。我的代码:

服务

代码语言:javascript
代码运行次数:0
复制
import { Injectable, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

@Injectable()
export class MemberService implements OnInit {

  private websocket: any;
  private destination: string = "wss://notessensei.mybluemix.net/ws/time";

  constructor() { }

  ngOnInit() { }

  listenToTheSocket(): Observable<any> {

    this.websocket = new WebSocket(this.destination);

    this.websocket.onopen = () => {
      console.log("WebService Connected to " + this.destination);
    }

    return Observable.create(observer => {
      this.websocket.onmessage = (evt) => {
        observer.next(evt);
      };
    })
      .map(res => res.data)
      .share();
  }
}

订户

代码语言:javascript
代码运行次数:0
复制
  export class AppComponent implements OnInit {

  constructor(/*private store: Store<fromRoot.State>,*/ private memberService: MemberService) {}

  ngOnInit() {
    this.memberService.listenToTheSocket().subscribe((result: any) => {
      try {
        console.log(result);
        // const member: any = JSON.parse(result);
        // this.store.dispatch(new MemberActions.AddMember(member));
      } catch (ex) {
        console.log(JSON.stringify(ex));
      }
    })
  }
}

当网络套接字超时时,我需要做什么才能重新连接它,这样可观察到的节点就会继续发出传入值?

我看了一下一些Q&A hereherehere,它似乎没有解决这个问题(以我能理解的方式)。

注意:wss://notessensei.mybluemix.net/ws/time的websocket是实时的,每分钟发出一次时间戳(以防有人想要测试)。

非常感谢您的建议!

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2017-05-19 02:42:00

实际上,现在在rxjs中有一个WebsocketSubject!

代码语言:javascript
代码运行次数:0
复制
 import { webSocket } from 'rxjs/webSocket' // for RxJS 6, for v5 use Observable.webSocket

 let subject = webSocket('ws://localhost:8081');
 subject.subscribe(
    (msg) => console.log('message received: ' + msg),
    (err) => console.log(err),
    () => console.log('complete')
  );
 subject.next(JSON.stringify({ op: 'hello' }));

当您重新订阅断开的连接时,它确实会处理重连接。因此,例如,编写这个来重新连接:

代码语言:javascript
代码运行次数:0
复制
subject.retry().subscribe(...)

有关更多信息,请参阅文档。不幸的是,搜索框没有显示该方法,但您可以在这里找到它:

http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-webSocket

那个#-导航在我的浏览器中不起作用,所以在那个页面上搜索"webSocket“。

来源:http://reactivex.io/rxjs/file/es6/observable/dom/WebSocketSubject.js.html#lineNumber15

票数 36
EN

Stack Overflow用户

发布于 2021-07-06 18:49:09

对于rxjs 6实现

代码语言:javascript
代码运行次数:0
复制
import { webSocket } from 'rxjs/webSocket'
import { retry, RetryConfig } from "rxjs/operators";

const retryConfig: RetryConfig = {
  delay: 3000,
};

let subject = webSocket('ws://localhost:8081');
subject.pipe(
   retry(retryConfig) //support auto reconnect
).subscribe(...)
票数 4
EN

Stack Overflow用户

发布于 2017-05-18 22:45:01

这可能不是一个好的答案,但它的方式太多了,不能发表评论。

问题可能来自于您的服务:

代码语言:javascript
代码运行次数:0
复制
listenToTheSocket(): Observable<any> {
  this.websocket = new WebSocket(this.destination);

  this.websocket.onopen = () => {
    console.log("WebService Connected to " + this.destination);
  }

  return Observable.create(observer => {
    this.websocket.onmessage = (evt) => {
      observer.next(evt);
    };
  })
  .map(res => res.data)
  .share();
}

您认为您在组件中多次使用ngOnInit方法吗?

您应该尝试将一个console.log放入ngOnInit中以确保。

因为如果这样做,在您的服务中,您将使用一个新的this.websocket

你应该试试这样的方法:

代码语言:javascript
代码运行次数:0
复制
@Injectable()
export class MemberService implements OnInit {

  private websocket: any;
  private websocketSubject$ = new BehaviorSubject<any>();
  private websocket$ = this.websocketSubject$.asObservable();

  private destination = 'wss://notessensei.mybluemix.net/ws/time';

  constructor() { }

  ngOnInit() { }

  listenToTheSocket(): Observable<any> {
    if (this.websocket) {
      return this.websocket$;
    }

    this.websocket = new WebSocket(this.destination);

    this.websocket.onopen = () => console.log(`WebService Connected to ${this.destination}`);

    this.websocket.onmessage = (res) => this.websocketSubject$.next(res => res.data);
  }
}

如果BehaviorSubject在订阅事件之前收到一个事件,它将发送它的最后一个值。此外,由于它是一个主题,不需要使用share运算符。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/44060315

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文