我正在尝试实验RxJava可观测和观察者代码。我的目标是检查底层源接收新数据值时是如何工作的。我的代码是:
List<Integer> numbers = new ArrayList<>();
Runnable r = new Runnable() {
@Override
public void run() {
int i = 100;
while(i < 110) {
numbers.add(i);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
i++;
}
}
};
numbers.add(0);
numbers.add(1);
numbers.add(2);
Observable.fromIterable(numbers)
.observeOn(Schedulers.io())
.subscribe(i -> System.out.println("Received "+i+ " on "+ Thread.currentThread().getName()),
e -> e.printStackTrace());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread t = new Thread(r);
t.start();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
所以我有一个号码列表。然后,我有一个runnable,它在这个列表中添加了新的数字,在添加之间有一个时间间隔。我还没开始织线呢。我将0,1,2添加到列表中,然后用它创建一个可观察的,从池中调度线程上的观察者,最后订阅可观察到的线程。当订阅发生时,可观察的发出值0,1,2和观察者被调用(执行传递给订阅的lambda)。然后在主线程上引入1秒的延迟,然后使用前面创建的runnable生成一个新线程,并添加最后一个延迟,这样应用程序就不会立即退出。
我期望的是,当新的数字被添加到列表中时,必须调用观察者,从而打印消息。但这是不可能的。在我的理解上,我肯定弄错了。我是否也需要将可观察性放在调度程序上?
发布于 2021-01-09 04:20:55
Observable.fromIterable()
方法是每次构建订阅时可观察到的值的“一次”加载。在构建订阅之后发生的事情不再有影响了。当您使用带有subscribe(onNext, onError, onComplete)
参数的onComplete
方法时,您将看到订阅已经完全消耗,三个初始值已经打印出来。
您可以使用Subject
(类似于PublishSubject
),其中使用onNext()
方法添加“新值”,而之前构建的订阅仍然处于活动状态(尚未完成)。这样,您就可以先构建订阅,然后继续为主题中的新值调用onNext()
,直到完成并调用onCompleted()
。
https://stackoverflow.com/questions/65640038
复制