在上一篇blogs中,我详细的解释了CompletionService
的使用方法和ExecutorCompletionService
的详细实现,这篇blogs中,我就介绍使用它的一个小技巧,算是对上一篇blogs的一个补完。在开始之前我们先回顾一下它的实现。
首先,在初始化ExecutorCompletionService
的时候我们需要传入一个Executor
,作为ExecutorCompletionService
执行任务的容器。
public ExecutorCompletionService(Executor executor) { [......]}public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { [......]} |
---|
然后,调用submit
方法,向它提交任务。submit
方法会将我们提交的任务包装成一个QueueingFuture
并提交给Executor
来执行。
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } |
---|
接着,QueueingFuture
会在任务执行完成后把执行结果放到队列中。
private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task;} |
---|
最后,我们通过take
或者poll
方法就能拿到任务执行的结果。
下面让我们设想一个场景,我需要从网络上下载几张图片和视频并最后把它们渲染到页面上去,由于下载图片和视频都比较耗时,所以我希望能以多线程的形式进行下载。但是由于资源有限,下载的并发度不能太大,所以需要限制线程池的并发线程大小。但如果将可用线程数平均分给下载图片和下载视频的线程池,当某线程池的所有任务执行完成后,另外一个线程池也无法获取到它所释放的资源。那怎么办呢?我们可以创建一个统一的线程池,然后把两个CompletionService绑定上去,让CompletionService作为一个句柄来使用。
final ExecutorService pool = Executors.newFixedThreadPool(5);final ExecutorCompletionService<Image> imageCompletionService = new ExecutorCompletionService<>(pool);for (final String site : imageSites) { completionService.submit(new Callable<Image>() { @Override public String call() throws Exception { return IOUtils.toString(new URL("http://" + site), StandardCharsets.UTF_8); } });}final ExecutorCompletionService<Video> vidoeCompletionService = new ExecutorCompletionService<>(pool);for (final String site : videoSites) { completionService.submit(new Callable<Video>() { @Override public String call() throws Exception { return IOUtils.toString(new URL("http://" + site), StandardCharsets.UTF_8); } });}List<Image> images = new ArrayList<>();for(int i = 0; i < topSites.size(); ++i) { final Future<String> future = completionService.take(); try { images.add(future.get()); } catch (ExecutionException e) { log.warn("Error while downloading", e.getCause()); }}List<Video> videos = new ArrayList<>();for(int i = 0; i < topSites.size(); ++i) { final Future<String> future = completionService.take(); try { videos.add(future.get()); } catch (ExecutionException e) { log.warn("Error while downloading", e.getCause()); }}// ... do process content |
---|