在org.apache.kafka.clients.consumer.internals
包内的RequestFuture类可用来定义异步流程,常用的addListener、compose作用如图所示:
addListener可以使一个流程添加到RequestFuture后
compose利用addListener,使其挂在RequestFuture<T>完成后的流程上。同时返回一个新创建的RequestFuture<S>。
public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
// 创建了一个RequestFuture<S>并在方法结束时返回,但并没有调用其complete或raise方法。
final RequestFuture<S> adapted = new RequestFuture<>();
addListener(new RequestFutureListener<T>() {
@Override
public void onSuccess(T value) {
adapter.onSuccess(value, adapted); // 在用户实现的onSuccess中,可以完成adapted,也可以为它添加前置流程
}
@Override
public void onFailure(RuntimeException e) {
adapter.onFailure(e, adapted);
}
});
return adapted;
}
compose的效果如下图所示,加了listener2是出于严谨考虑,因为compose调用了addListener方法。
什么叫"完成RequestFuture<S>"? 比如下面的实现,在onSuccess中可以调用future.complete
什么叫"也可以在RequestFuture<S>前增添其它的异步流程"?这是第二种用法。 我们先看CoordinatorResponseHandler,onSuccess会调用handle接口。
再看它的一个实现类JoinGroupResponseHandler,调用onJoinLeader新创建了一个RequestFuture,并调用chain,将handle方法参数中的future接在了新建RequestFuture的流程后面。这样,我们就为future添加了前置流程
以上两种用法图示就相当于下图: