前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >CompletableFuture在RocketMQ中的使用实战!

CompletableFuture在RocketMQ中的使用实战!

原创
作者头像
程序员蜗牛
发布2024-02-18 09:12:24
发布2024-02-18 09:12:24
15300
代码可运行
举报
运行总次数:0
代码可运行

今天想跟大家来聊一聊JDK1.8提供的异步神器CompletableFuture,

最后呢我会结合RocketMQ源码分析一下CompletableFuture的使用。

Future接口以及它的局限性

代码语言:javascript
代码运行次数:0
复制
 FutureTask<String> futureTask = new FutureTask<>(() -> "三友");
 new Thread(futureTask).start();
 System.out.println(futureTask.get());

或者使用线程池的方式

代码语言:javascript
代码运行次数:0
复制
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "三友");
System.out.println(future.get());
executorService.shutdown();

线程池底层也是将提交的Callable的实现先封装成FutureTask,然后通过execute方法来提交任务,执行异步逻辑。

Future接口的局限性
图片
图片
代码语言:javascript
代码运行次数:0
复制
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "三友");
while (!future.isDone()) {
  //任务有没有完成,没有就继续循环判断
}
System.out.println(future.get());
executorService.shutdown();

什么是CompletableFuture?

CompletableFuture常见api详解

CompletableFuture的方法api多,但主要可以分为以下几类。

1、实例化CompletableFuture

构造方法创建
代码语言:javascript
代码运行次数:0
复制
CompletableFuture<String> completableFuture = new CompletableFuture<>();
System.out.println(completableFuture.get());

此时如果有其它线程执行如下代码,就能执行打印出 三友

代码语言:javascript
代码运行次数:0
复制
completableFuture.complete("三友")
静态方法创建

除了使用构造方法构造,CompletableFuture还提供了静态方法来创建

代码语言:javascript
代码运行次数:0
复制
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
图片
图片
代码语言:javascript
代码运行次数:0
复制
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "三友");
System.out.println(completableFuture.get());

2、获取任务执行结果

代码语言:javascript
代码运行次数:0
复制
public T get();
public T get(long timeout, TimeUnit unit);
public T getNow(T valueIfAbsent);
public T join();

3、主动触发任务完成

代码语言:javascript
代码运行次数:0
复制
public boolean complete(T value);
public boolean completeExceptionally(Throwable ex);

4、对任务执行结果进行下一步处理

只能接收任务正常执行后的回调
代码语言:javascript
代码运行次数:0
复制
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public CompletableFuture<Void> thenRun(Runnable action);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
代码语言:javascript
代码运行次数:0
复制
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> 10)
                .thenApply(v -> ("上一步的执行的结果为:" + v));
System.out.println(completableFuture.join());

执行结果:

代码语言:javascript
代码运行次数:0
复制
上一步的执行的结果为:10

thenRun示例:

代码语言:javascript
代码运行次数:0
复制
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 10)
      .thenRun(() -> System.out.println("上一步执行完成"));

执行结果:

代码语言:javascript
代码运行次数:0
复制
上一步执行完成

thenAccept示例:

代码语言:javascript
代码运行次数:0
复制
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 10)
      .thenAccept(v -> System.out.println("上一步执行完成,结果为:" + v));

执行结果:

代码语言:javascript
代码运行次数:0
复制
上一步执行完成,结果为:10

thenApply有异常示例:

代码语言:javascript
代码运行次数:0
复制
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
    //模拟异常
    int i = 1 / 0;
    return 10;
}).thenApply(v -> ("上一步的执行的结果为:" + v));
System.out.println(completableFuture.join());

执行结果:

代码语言:javascript
代码运行次数:0
复制
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
 at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
 at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
 at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)

当有异常时是不会回调的

只能接收任务处理异常后的回调
代码语言:javascript
代码运行次数:0
复制
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
代码语言:javascript
代码运行次数:0
复制
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    return 100;
}).exceptionally(e -> {
    System.out.println("出现异常了,返回默认值");
    return 110;
});
System.out.println(completableFuture.join());

执行结果:

代码语言:javascript
代码运行次数:0
复制
100

有异常情况下:

代码语言:javascript
代码运行次数:0
复制
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
    int i = 1 / 0;
    return 100;
}).exceptionally(e -> {
    System.out.println("出现异常了,返回默认值");
    return 110;
});
System.out.println(completableFuture.join());

执行结果:

代码语言:javascript
代码运行次数:0
复制
出现异常了,返回默认值
110
能同时接收任务执行正常和异常的回调
代码语言:javascript
代码运行次数:0
复制
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> actin);
代码语言:javascript
代码运行次数:0
复制
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
      int i = 1 / 0;
      return 10;
}).whenComplete((r, e) -> {
      System.out.println("whenComplete被调用了");
});
System.out.println(completableFuture.join());

执行结果:

代码语言:javascript
代码运行次数:0
复制
whenComplete被调用了
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
 at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
 at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
 at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)

5、对任务结果进行合并

代码语言:javascript
代码运行次数:0
复制
public <U,V> CompletionStage<V> thenCombine
        (CompletionStage<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn);

这个方法的意思是,当前任务和other任务都执行结束后,拿到这两个任务的执行结果,回调 BiFunction ,然后返回新的结果。

thenCombine的例子请往下继续看。

6、以Async结尾的方法

上面说的一些方法,比如说thenAccept方法,他有两个对应的Async结尾的方法,如下:

代码语言:javascript
代码运行次数:0
复制
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

CompletableFuture在RocketMQ中的使用

CompletableFuture在RocketMQ中的使用场景比较多,这里我举一个消息存储的场景。

在RocketMQ中,Broker接收到生产者产生的消息的时候,会将消息持久化到磁盘和同步到从节点中。

持久化到磁盘和消息同步到从节点是两个独立的任务,互不干扰,可以相互独立执行。

当消息持久化到磁盘和同步到从节点中任务完成之后,需要统计整个存储消息消耗的时间,所以统计整个存储消息消耗的时间是依赖前面两个任务的完成。

实现代码如下

消息存储刷盘任务和主从复制任务:

代码语言:javascript
代码运行次数:0
复制
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// 提交刷盘的请求
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
//提交主从复制的请求
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);

//刷盘 和 主从复制 两个异步任务通过thenCombine联合
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
    // 当两个刷盘和主从复制任务都完成的时候,就会回调
    // 如果刷盘没有成功,那么就将消息存储的状态设置为失败
    if (flushStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(flushStatus);
    }
    // 如果主从复制没有成功,那么就将消息存储的状态设置为失败
    if (replicaStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(replicaStatus);
    }
    // 最终返回消息存储的结果
    return putMessageResult;
});

对上面两个合并的任务执行结果通过thenAccept方法进行监听,统计消息存储的耗时:

代码语言:javascript
代码运行次数:0
复制
//消息存储的开始时间
long beginTime = this.getSystemClock().now();
// 存储消息,然后返回 CompletableFuture,也就是上面一段代码得返回值‍
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

//监听消息存储的结果
putResultFuture.thenAccept((result) -> {
    // 消息存储完成之后会回调
    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) {
        log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

    if (null == result || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().add(1);
    }
});

最后说一句(求关注!别白嫖!)

如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、转发、在看。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Future接口以及它的局限性
    • Future接口的局限性
  • 什么是CompletableFuture?
  • CompletableFuture常见api详解
    • 1、实例化CompletableFuture
      • 构造方法创建
      • 静态方法创建
    • 2、获取任务执行结果
    • 3、主动触发任务完成
    • 4、对任务执行结果进行下一步处理
      • 只能接收任务正常执行后的回调
      • 只能接收任务处理异常后的回调
      • 能同时接收任务执行正常和异常的回调
    • 5、对任务结果进行合并
    • 6、以Async结尾的方法
  • CompletableFuture在RocketMQ中的使用
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档