前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >KafkaConsumer RequestFuture异步流程控制

KafkaConsumer RequestFuture异步流程控制

作者头像
平凡的学生族
发布2020-07-02 11:17:00
5080
发布2020-07-02 11:17:00
举报
文章被收录于专栏:后端技术

org.apache.kafka.clients.consumer.internals包内的RequestFuture类可用来定义异步流程,常用的addListener、compose作用如图所示:

addListener

addListener可以使一个流程添加到RequestFuture后

compose

compose利用addListener,使其挂在RequestFuture<T>完成后的流程上。同时返回一个新创建的RequestFuture<S>。

  • 用户需要实现RequestFutureAdapter的接口onSucess/onFailure,进一步加工这个异步流程。当onSucess/onFailure被调用时,上游流程已经完成,那么在你实现的方法中:
    • 既可以选择完成RequestFuture<S>
    • 又可以在RequestFuture<S>前增添其它的异步流程。
    • 我们将在下文举例说明这两种用法。
代码语言:javascript
复制
    public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
        // 创建了一个RequestFuture<S>并在方法结束时返回,但并没有调用其complete或raise方法。
        final RequestFuture<S> adapted = new RequestFuture<>();
        addListener(new RequestFutureListener<T>() {
            @Override
            public void onSuccess(T value) {
                adapter.onSuccess(value, adapted);  // 在用户实现的onSuccess中,可以完成adapted,也可以为它添加前置流程
            }

            @Override
            public void onFailure(RuntimeException e) {
                adapter.onFailure(e, adapted);
            }
        });
        return adapted;
    }

compose的效果如下图所示,加了listener2是出于严谨考虑,因为compose调用了addListener方法。


什么叫"完成RequestFuture<S>"? 比如下面的实现,在onSuccess中可以调用future.complete


什么叫"也可以在RequestFuture<S>前增添其它的异步流程"?这是第二种用法。 我们先看CoordinatorResponseHandler,onSuccess会调用handle接口。

再看它的一个实现类JoinGroupResponseHandler,调用onJoinLeader新创建了一个RequestFuture,并调用chain,将handle方法参数中的future接在了新建RequestFuture的流程后面。这样,我们就为future添加了前置流程

以上两种用法图示就相当于下图:

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • addListener
  • compose
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档