下面有下面的代码,我试图使用Retrofit2 + RxJava2实现多个连续的api调用
@Override
public void onClick(View v) {
count++;
request.setName("rober");
request.setVarryingValue(count);
mApiService.apiService()
.getAccessToken(<params>)
.subscribeOn(Schedulers.newThread())
.flatMap(new Function<Auth, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Auth authentication) throws Exception {
Observable<Void> postObservable = mApiService.apiService().postCall(request, authentication.getAuth())
.subscribeOn(Schedulers.io());
postObservable.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(Void value) {
Log.e("Thread", " Thread : " + Thread.currentThread());
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {}
});
return postObservable;
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(Object value) {}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
});
}
});
我的期望是,对于每一次单击,都向一个新的/不同的线程发出一个调用,以执行一个特定的api POST调用,但是im具有来自内部api调用的这些post值。
{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST
http://mydomain.test.post.server http/1.1
{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST
http://mydomain.test.post.server http/1.1
{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST
http://mydomain.test.post.server http/1.1
{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST
http://mydomain.test.post.server http/1.1
似乎它只执行最后一次调用,我了解到在Rx中没有确切的“并行”调用,因为如果这样做,它将违反所有的反应原则,但是有很多解决办法,他们说,现在我试图用我发布的代码实现“并行”调用,但没有运气:(我需要一些帮助,
任何帮助都将不胜感激。谢谢!
编辑:这个过程应该是这样的: 1.先获得auth令牌,2.在一个成功的auth令牌之后,继续到api POST调用
数字2依赖于数字1
1&2总是在单击事件上执行。
编辑:我发布了一张图片来清楚地说明它
发布于 2017-04-04 09:37:28
在发送第一个POST请求时,request
的变化值已经是4,这是因为在您单击4次之后,第一个POST请求在那个时候将接受request
(与4),而不是在设置其变化值时的request
。
解决方案是使request
对象在onClick()
方法中是本地的。
public void onClick(View v) {
RequestClass request = new RequestClass();
request.setName("rober");
request.setVarryingValue(count);
//Your code
}
或者将count
值赋值给一个临时变量,并在发送POST请求之前将其设置为您的request
,但是要注意线程安全性。
public void onClick(View v) {
//Saving the value
int temp = count++;
request.setName("rober");
mApiService.apiService()
.getAccessToken(<params>)
.subscribeOn(Schedulers.newThread())
.flatMap(new Function<Auth, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Auth authentication) throws Exception {
//Set it to the request
//BE CAREFUL because the `request` object is now being accessed from multiple threads
request.setVarryingValue(temp);
Observable<Void> postObservable =
mApiService
.apiService()
.postCall(request, authentication.getAuth())
.subscribeOn(Schedulers.io())
.subscribe();
return postObservable;
}
})
.subscribe();
}
发布于 2017-04-04 08:38:59
我不太清楚为什么所有的post请求都是同时完成的,包含onClick()
定时和getAccessToken()
请求的完整日志,以及onNext()
排放可能会有所帮助。
您的代码似乎做得很好,您将为每个请求打开一个新线程。
但是,无论如何,由于日志上有4个打印,但所有打印的计数都是相同的,问题可能是因为request
param是一个字段,因此在 getAccessToken()
发出项后共享和访问了getAccessToken()
,因此您基本上执行了4个请求,但数据相同。
您应该为每个新创建的flatMap()
分配一个专用计数变量。
此外,在postObservable
操作符中订阅flatMap()
是错误的,您应该只返回它,流就会订阅它,并将它合并为onNext()
排放。
使用您的代码,您将执行每个post请求两次,一个显式地由您执行,另一个由flatMap()
操作符执行(这也可能解释相同请求的多个日志,但是如果不看到日志中的所有数据也很难判断)。
发布于 2017-04-05 03:17:53
基于你提出的两个答案,我想出了下面的代码,
虽然我仍然不确定我是否以正确的方式使用Rx
https://stackoverflow.com/questions/43179976
复制相似问题