本文主要介绍Java 8 中的异步处理的方式,主要是 CompletableFuture类的一些特性。 为了展示CompletableFuture的强大特性,我们会创建一个名为“最佳价格查询器” (best-price-finder)的应用,它会查询多个在线商店,依据给定的产品或服务找出最低的价格。这个过程中,你会学到几个重要的技能。
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
public class Shop {
private final String name;
private final Random random;
public Shop(String name) {
this.name = name;
random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2));
}
/**
* 获取产品价格的同步方法
* @param product 产品名称
* @return 产品价格
*/
public double getPrice(String product) {
return calculatePrice(product);
}
private double calculatePrice(String product) {
//一个模拟的延迟方法
delay();
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
public static void delay() {
int delay = 1000;
//int delay = 500 + RANDOM.nextInt(2000);
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public String getName() {
return name;
}
}
很明显,这个API的使用者(这个例子中为最佳价格查询器)调用该方法时,它依旧会被阻塞。为等待同步事件完成而等待1秒钟,这是无法接受的,尤其是考虑到最佳价格查询器对 网络中的所有商店都要重复这种操作。在本文的下个小节中,你会了解如何以异步方式使用同 步API解决这个问题。
我们使用新的CompletableFuture类来将getPrice
方法转换为异步的getPriceAsync
方法。
/**
* 异步的获取产品价格
*
* @param product 产品名
* @return 最终价格
*/
public Future<Double> getPriceAsync(String product) {
//创建CompletableFuture 对象,它会包含计算的结果
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
//在另一个线程中以异步方式执行计算
new Thread(() -> {
double price = calculatePrice(product);
//需长时间计算的任务结 束并得出结果时,设置 Future的返回值
futurePrice.complete(price);
}).start();
// 无需等待还没结束的计算,直接返回Future对象
return futurePrice;
}
在这段代码中,你创建了一个代表异步计算的CompletableFuture对象实例,它在计算完 成时会包含计算的结果。 使用这个API的客户端,可以通过下面的这段 代码对其进行调用。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class ShopMain {
public static void main(String[] args) {
Shop shop = new Shop("BestShop");
long start = System.nanoTime();
//查询商店,试图 取得商品的价格
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
long invocationTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Invocation returned after " + invocationTime
+ " msecs");
// 执行更多任务,比如查询其他商店
doSomethingElse();
// 在计算商品价格的同时
try {
//从Future对象中读 取价格,如果价格 未知,会发生阻塞
double price = futurePrice.get();
System.out.printf("Price is %.2f%n", price);
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Price returned after " + retrievalTime + " msecs");
}
private static void doSomethingElse() {
System.out.println("Doing something else...");
}
}
Output: Invocation returned after 43 msecs Price is 123.26 Price returned after 1045 msecs
你会发现getPriceAsync方法的调用返回远远早于最终价格计算完成的时间。接下来我们看看如何正确地管理 异步任务执行过程中可能出现的错误。
如果没有意外,我们目前开发的代码工作得很正常。但是,如果价格计算过程中产生了错误 会怎样呢?非常不幸,这种情况下你会得到一个相当糟糕的结果:用于提示错误的异常会被限制 在试图计算商品价格的当前线程的范围内,最终会杀死该线程,而这会导致等待get方法返回结 果的客户端永久地被阻塞。 解决这种问题的方法有两种:
为了让客户端能了解商店无法提供请求商品价格的原因,你需要使用 CompletableFuture的completeExceptionally方法将导致CompletableFuture内发生问 题的异常抛出。代码如下所示:
/**
* 抛出CompletableFuture内的异常版本的getPriceAsyncForException方法
*
* @param product 产品名
* @return 最终价格
*/
public Future<Double> getPriceAsyncForException(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
try {
double price = calculatePrice(product);
//如果价格计算正常结束,完成Future操作并设置商品价格
futurePrice.complete(price);
} catch (Exception ex) {
//否则就抛出导致失败的异常,完成这 次Future操作
futurePrice.completeExceptionally(ex);
}
}).start();
return futurePrice;
}
如果该方法抛出了一个运 行时异常“product not available”,客户端就会得到像下面这样一段ExecutionException:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: product not available
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2237)
at lambdasinaction.chap11.AsyncShopClient.main(AsyncShopClient.java:14)
... 5 more
Caused by: java.lang.RuntimeException: product not available
at lambdasinaction.chap11.AsyncShop.calculatePrice(AsyncShop.java:36)
at lambdasinaction.chap11.AsyncShop.lambda$getPrice$0(AsyncShop.java:23) at lambdasinaction.chap11.AsyncShop$$Lambda$1/24071475.run(Unknown Source) at java.lang.Thread.run(Thread.java:744)
目前为止我们已经了解了如何通过编程创建CompletableFuture对象以及如何获取返回值了。