首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Rxjava2和Retrofit2对不同线程的多个/并行/同时api(post)调用

Rxjava2和Retrofit2对不同线程的多个/并行/同时api(post)调用
EN

Stack Overflow用户
提问于 2017-04-03 08:35:14
回答 3查看 1.4K关注 0票数 0

下面有下面的代码,我试图使用Retrofit2 + RxJava2实现多个连续的api调用

代码语言:javascript
运行
复制
@Override
        public void onClick(View v) {

            count++;
            request.setName("rober");
            request.setVarryingValue(count);

            mApiService.apiService()
                    .getAccessToken(<params>)
                    .subscribeOn(Schedulers.newThread())
                    .flatMap(new Function<Auth, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(Auth authentication) throws Exception {

                            Observable<Void> postObservable = mApiService.apiService().postCall(request, authentication.getAuth())
                                    .subscribeOn(Schedulers.io());
                            postObservable.subscribe(new Observer<Void>() {

                                @Override
                                public void onSubscribe(Disposable d) {}

                                @Override
                                public void onNext(Void value) {
                                    Log.e("Thread", " Thread : " + Thread.currentThread());
                                }

                                @Override
                                public void onError(Throwable e) {
                                    e.printStackTrace();
                                }

                                @Override
                                public void onComplete() {}
                            });

                            return postObservable;
                        }
                    }).subscribe(new Observer<Object>() {
                @Override
                public void onSubscribe(Disposable d) {}

                @Override
                public void onNext(Object value) {}

                @Override
                public void onError(Throwable e) {}

                @Override
                public void onComplete() {}
            });
        }
    });

我的期望是,对于每一次单击,都向一个新的/不同的线程发出一个调用,以执行一个特定的api POST调用,但是im具有来自内部api调用的这些post值。

代码语言:javascript
运行
复制
{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte 
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST 
http://mydomain.test.post.server http/1.1

{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte 
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST 
http://mydomain.test.post.server http/1.1


{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte 
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST 
http://mydomain.test.post.server http/1.1


{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte 
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST 
http://mydomain.test.post.server http/1.1

似乎它只执行最后一次调用,我了解到在Rx中没有确切的“并行”调用,因为如果这样做,它将违反所有的反应原则,但是有很多解决办法,他们说,现在我试图用我发布的代码实现“并行”调用,但没有运气:(我需要一些帮助,

任何帮助都将不胜感激。谢谢!

编辑:这个过程应该是这样的: 1.先获得auth令牌,2.在一个成功的auth令牌之后,继续到api POST调用

数字2依赖于数字1

1&2总是在单击事件上执行。

编辑:我发布了一张图片来清楚地说明它

  • 我有两个来源(这可以是3个或更多)
  • 这些源来自单击事件。
  • 每个源将处理不同的网络呼叫。
  • 我试图在不同的线程中执行它(使它是异步的)
  • 每个文件都会在服务器上发布不同的值。

  • 但它似乎只处理最后一个事件(单击)。
  • 使用上面的图像,我期待的职位A和B将并行/不同地发生。
  • 但它只执行B项(实际上是2项B员额)
  • 我希望它能以不同的方式执行A&B
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2017-04-04 09:37:28

在发送第一个POST请求时,request的变化值已经是4,这是因为在您单击4次之后,第一个POST请求在那个时候将接受request (与4),而不是在设置其变化值时的request

解决方案是使request对象在onClick()方法中是本地的。

代码语言:javascript
运行
复制
public void onClick(View v) {
    RequestClass request = new RequestClass();
    request.setName("rober");
    request.setVarryingValue(count);

    //Your code
}

或者将count值赋值给一个临时变量,并在发送POST请求之前将其设置为您的request,但是要注意线程安全性。

代码语言:javascript
运行
复制
public void onClick(View v) {
    //Saving the value
    int temp = count++;
    request.setName("rober");

    mApiService.apiService()
            .getAccessToken(<params>)
            .subscribeOn(Schedulers.newThread())
            .flatMap(new Function<Auth, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Auth authentication) throws Exception {
                    //Set it to the request
                    //BE CAREFUL  because the `request` object is now being accessed from multiple threads
                    request.setVarryingValue(temp);

                    Observable<Void> postObservable = 
                        mApiService
                            .apiService()
                            .postCall(request, authentication.getAuth())
                            .subscribeOn(Schedulers.io())
                            .subscribe();

                    return postObservable;
                }
            })
            .subscribe();
}
票数 2
EN

Stack Overflow用户

发布于 2017-04-04 08:38:59

我不太清楚为什么所有的post请求都是同时完成的,包含onClick()定时和getAccessToken()请求的完整日志,以及onNext()排放可能会有所帮助。

您的代码似乎做得很好,您将为每个请求打开一个新线程。

但是,无论如何,由于日志上有4个打印,但所有打印的计数都是相同的,问题可能是因为request param是一个字段,因此在 getAccessToken()发出项后共享和访问了getAccessToken(),因此您基本上执行了4个请求,但数据相同。

您应该为每个新创建的flatMap()分配一个专用计数变量。

此外,在postObservable操作符中订阅flatMap()是错误的,您应该只返回它,流就会订阅它,并将它合并为onNext()排放。

使用您的代码,您将执行每个post请求两次,一个显式地由您执行,另一个由flatMap()操作符执行(这也可能解释相同请求的多个日志,但是如果不看到日志中的所有数据也很难判断)。

票数 1
EN

Stack Overflow用户

发布于 2017-04-05 03:17:53

基于你提出的两个答案,我想出了下面的代码,

  • 请求成为局部变量,而不是字段。
  • 删除在平面图内的订阅
  • 把它链接在外部可观测的onNext上 @覆盖公共无效onClick(视图v) { count++;请求请求=新请求();request.setName("rober");request.setVarryingValue(计数);mApiService.apiService() .getAccessToken() .subscribeOn(Schedulers.io()) .flatMap(新Function() {@覆盖公共ObservableSource apply(Auth身份验证))抛出异常{返回mApiService.apiService().postCall(request,authentication.getAuth());}) .subscribe(新的Observer() {@覆盖公共空onSubscribe(Disposable d) ) {} @覆盖公共空onNext(ObservableSource sourceFromFlatMap) {sourceFromFlatMap.subscribe(新的观察者(){@重写)公共无效onSubscribe(Disposable d) {} @覆盖公共空onNext(Void值) {} @覆盖公共空onError(Throwable ) {} @覆盖公共空onComplete() {}};}@覆盖公共空onError(Throwable ) {} @覆盖公共空onComplete(} });});

虽然我仍然不确定我是否以正确的方式使用Rx

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/43179976

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档