日常中会有开启多个线程去并发执行任务,而主线程要等所有子线程执行完之后才能运行的需求。之前我们是使用Thread.join方法来实现的,过程如下:
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread( () -> {
try {
Thread.sleep(1000);
System.out.println("t1 over");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread t2 = new Thread( () -> {
try {
Thread.sleep(2000);
System.out.println("t2 over");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("mian over");
}
t1 over
t2 over
mian over
join()方法不够灵活,现在JDK提供了CountDownLatch这个类来实现所需功能
private static CountDownLatch countDownLatch = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
ExecutorService t = Executors.newCachedThreadPool();
Runnable r1 = () -> {
try {
System.out.println("r1 sleep");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
};
Runnable r2 = () -> {
try {
System.out.println("r2 sleep");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
};
t.submit(r1);
t.submit(r2);
System.out.println("main wait");
countDownLatch.await();
System.out.println("main over");
}
main wait
r1 sleep
r2 sleep
main over
CountDownLatch流程:
CountDownLatch优点:
CountDownLatch原理:
内部维护了一个计数器,当计数器为0就放行,源码就不放了,熟悉AQS的同学想想就知道怎么回事
满足多个线程都到达同一个位置后才全部开始运行的需求。CountDownLatch是一次性使用的,计数器为0后再次调用会直接返回,此时升级版的CyclicBarrier来了,其一可以满足计数器重置功能,且二还可以让一组线程达到一个状态后再全部同时执行
场景要求:假设一个任务分为3个阶段,每个线程要串行地从低阶段执行到高阶段
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2,
() -> System.out.println("一个阶段完成"));
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
Runnable r1 = () -> {
try {
System.out.println(Thread.currentThread() + "Step1");
cyclicBarrier.await();
System.out.println(Thread.currentThread() + "Step2");
cyclicBarrier.await();
System.out.println(Thread.currentThread() + "Step3");
} catch (Exception e) {
e.printStackTrace();
}
};
Runnable r2 = () -> {
try {
System.out.println(Thread.currentThread() + "Step1");
cyclicBarrier.await();
System.out.println(Thread.currentThread() + "Step2");
cyclicBarrier.await();
System.out.println(Thread.currentThread() + "Step3");
} catch (Exception e) {
e.printStackTrace();
}
};
service.submit(r1);
service.submit(r2);
service.shutdown();
}
Thread[pool-1-thread-1,5,main]Step1
Thread[pool-1-thread-2,5,main]Step1
一个阶段完成
Thread[pool-1-thread-1,5,main]Step2
Thread[pool-1-thread-2,5,main]Step2
一个阶段完成
Thread[pool-1-thread-1,5,main]Step3
Thread[pool-1-thread-2,5,main]Step3
CyclicBarrier的流程
CyclicBarrier的原理
不同与前两者,Semaphore信号量内部计数器是递增的,在需要同步的地方调用acquire指定需要同步的个数即可
private static Semaphore semaphore = new Semaphore(0);
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
Runnable r1 = () -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + "over");
semaphore.release();;
};
Runnable r2 = () -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + "over");
semaphore.release();
};
service.submit(r1);
service.submit(r2);
semaphore.acquire(2);
System.out.println("All child thread over");
service.shutdown();
}
Semaphore的流程
Semaphore的原理