在写并发代码来提升性能时,会遵循某些最佳写法,而不是只用基础的wait和notify来控制复杂的流程。Java.util.concurrent 包是专为 Java并发编程而设计的包,它下有很多编写好的工具,使用这些更高等的同步工具来编写代码,让我们的程序可以不费力气就得到优化。这些工具还在由一些优秀的工程师不断优化和完善,我们不必重复造轮子:
脑图地址,感谢深入浅出 Java Concurrency ,此脑图在这篇基础上修改而来。其中我们先看工具类部分:
1.ReentrantLock
2.Condition
3.Semaphore
4.ReentrantReadWriteLock
5.CountDownLatch
6.CyclicBarrrer
1.1可重入
单线程可以重复进入,但必须重复退出。
public class ReentrantLock1 implements Runnable{
private static int a = 0;
ReentrantLock lock = new ReentrantLock();
public void run() {
for (int i = 0; i < 10000000; i++) {
//一个线程拿到几个许可 就得释放几次,不然就造成等待 可在命令行查看jps -->jstack 9016
lock.lock(); //获取锁
lock.lock(); //获取锁
try {
a++;
} finally {
lock.unlock(); //释放锁
//lock.unlock(); //释放锁
}
}
}
public static void main(String[] args) throws InterruptedException {
ReentrantLock1 rt = new ReentrantLock1();
Thread t1 = new Thread(rt);
Thread t2 = new Thread(rt);
t1.start();t2.start();
t1.join();t2.join();
System.out.println(a);
}
}
一个线程拿到几个许可,就得释放几次,不然就造成线程等待,可在命令行查看等待的线程:jps -->jstack [ option ] pid
1.2可中断
长期线程在锁上等待问题,可以通过中断来解决:
public class ReentrantLock2 implements Runnable{
int lock ;
private static ReentrantLock lock2 = new ReentrantLock();
private static ReentrantLock lock1 = new ReentrantLock();
public ReentrantLock2(int lock){
this.lock=lock;
}
//两个线程控制加锁顺序,构造死锁现象.
//lock1去申请lock2,lock2会申请lock1,如果使用lock方法,不太有办法把它解开.
//使用lockInterruptibly加锁,可以使用中断导致线程正常结束
public void run() {
try {
if(lock==1){
//可中断的加锁.如果不加这个,只是简单的lock,是不会响应中断的
lock1.lockInterruptibly();
Thread.sleep(500);
lock2.lockInterruptibly();
}else{
lock2.lockInterruptibly();
Thread.sleep(500);
lock1.lockInterruptibly();
}
} catch (InterruptedException e) {
e.printStackTrace();
//实际业务中可以在中断后做一些补救措施....
}finally{
if(lock1.isHeldByCurrentThread()){//如果拿了这把锁
lock1.unlock();//解锁
}
if(lock2.isHeldByCurrentThread()){
lock2.unlock();
}
System.out.println(Thread.currentThread().getId()+":线程退出");
}
}
public static void main(String[] args) throws InterruptedException {
ReentrantLock2 rt1 = new ReentrantLock2(1);
ReentrantLock2 rt2 = new ReentrantLock2(2);
Thread t1 = new Thread(rt1);
Thread t2 = new Thread(rt2);
t1.start();t2.start();
Thread.sleep(1000);
//中断其中一个线程
//DeadLockCheckChecker.check();放开它就可以中断线程
}
}
class DeadLockCheckChecker {
private final static ThreadMXBean mbean=ManagementFactory.getThreadMXBean();
final static Runnable deadLockCheck=new Runnable() {
@Override
public void run() {
while (true){
long[] deadLockedThreadIds=mbean.findDeadlockedThreads();
if(deadLockedThreadIds!=null){
ThreadInfo[] threadInfos=mbean.getThreadInfo(deadLockedThreadIds);
for(Thread t: Thread.getAllStackTraces().keySet()){
for(int i=0;i<threadInfos.length;i++){
if(t.getId()==threadInfos[i].getThreadId()){//如果检查到了死锁
t.interrupt();//中断当前线程
}
}
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
};
public static void check(){
Thread thread=new Thread(deadLockCheck);
thread.setDaemon(true);//如果说整个程序都退出了,我就没有必要做死锁检查,所以要设置为守护线程
thread.start();
}
}
可在命令行查看死锁:jps -->jstack [ option ] pid
1.3可限时
可限时也是一个避免永久等待构成死锁的解决方法:
public class ReentrantLock3 implements Runnable{
ReentrantLock lock = new ReentrantLock();
public void run() {
try {
if(lock.tryLock(2,TimeUnit.SECONDS)){
Thread.sleep(3000);
}else{
System.out.println("申请锁失败");
}
}catch(InterruptedException e){
e.printStackTrace();
}finally {
if(lock.isHeldByCurrentThread()){//如果线程持有这把锁
lock.unlock(); //释放锁
}
}
}
public static void main(String[] args) throws InterruptedException {
ReentrantLock3 rt = new ReentrantLock3();
Thread t1 = new Thread(rt);
Thread t2 = new Thread(rt);
t1.start();t2.start();
}
}
1.4公平锁
先来的线程先得到锁。如果先来的线程一直拿不到锁,则会产生饥饿现象,公平锁虽然不会产生饥饿现象,因为产生排队问题,会导致程序效率差。通过阅读ReentrantLock的源码发现:默认是非公平的,如果传入true,则是公平锁:
Condition与ReentrantLock的关系就类似于synchronized与Object.wait()/notify()。但是它与ReentrantLock结合使用,有await和signal与之对应.
利用Condition实现顺序执行:
public class ReentrantLockOfCondition {
volatile private static int nextPrintWho = 1;
private static ReentrantLock lock = new ReentrantLock();
final private static Condition conditionA = lock.newCondition();
final private static Condition conditionB = lock.newCondition();
final private static Condition conditionC = lock.newCondition();
public static void main(String[] args) {
Thread threadA = new Thread() {
public void run() {
try {
lock.lock();
while (nextPrintWho != 1) {
conditionA.await();
}
for (int i = 0; i < 3; i++) {
System.out.println("ThreadA " + (i + 1));
}
nextPrintWho = 2;
conditionB.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
};
Thread threadB = new Thread() {
public void run() {
try {
lock.lock();
while (nextPrintWho != 2) {
conditionB.await();
}
for (int i = 0; i < 3; i++) {
System.out.println("ThreadB " + (i + 1));
}
nextPrintWho = 3;
conditionC.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
};
Thread threadC = new Thread() {
public void run() {
try {
lock.lock();
while (nextPrintWho != 3) {
conditionC.await();
}
for (int i = 0; i < 3; i++) {
System.out.println("ThreadC " + (i + 1));
}
nextPrintWho = 1;
conditionA.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
};
Thread[] aArray = new Thread[5];
Thread[] bArray = new Thread[5];
Thread[] cArray = new Thread[5];
for (int i = 0; i < 5; i++) {
aArray[i] = new Thread(threadA);
bArray[i] = new Thread(threadB);
cArray[i] = new Thread(threadC);
aArray[i].start();
bArray[i].start();
cArray[i].start();
}
}
}
翻译为信号量,允许多个线程进入临界区。说白了就是一个广义上的锁,相当于共享锁。比如信号量中我可以给它指定10个许可,每一个许可可以分配给若干个线程(当然一个线程也可以拿多个许可),拿到许可线程可以执行,如果许可分发完了,后面的线程就和锁一样去做等待。换句话说,当信号量等于1的时候,就相当于一把锁。
比如早期做限流时,我们系统是8核cpu,设置同时请求任务为8个,超过8个,可以用信号量让线程等待来加以控制:
public class UseSemaphore {
public static void main(String[] args) {
// 线程池
ExecutorService exec = Executors.newCachedThreadPool();
// 只能8个线程同时访问
final Semaphore semp = new Semaphore(8);
// 模拟20个客户端访问
for (int index = 0; index < 20; index++) {
final int NO = index;
Runnable run = new Runnable() {
public void run() {
try {
// 获取许可
semp.acquire();
System.out.println("Accessing: " + NO);
//模拟实际业务逻辑
Thread.sleep((long) (Math.random() * 10000));
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
// 访问完后,释放
semp.release();
}
}
};
exec.execute(run);
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
//System.out.println(semp.getQueueLength());
// 退出线程池
exec.shutdown();
}
}
一般限流java用的不多,实际中一种思路是用redis,存用户的session到redis中,统计一分钟之内访问url的次数count,如果count大于额定次数,则是恶意攻击。redis里有个expried,每过60秒,把index清零,然后重新统计。现在互联网公司成熟的限流方案是使用:nginx + lua + redis 防刷和限流。
ReentrantReadWriteLock,首先要做的是与ReentrantLock划清界限, 它和后者都是单独的实现,彼此之间没有继承或实现的关系。读写锁可以很好的提高程序效率,如果读也加锁的话,每次只有一个线程能访问,不符合高并发程序设计。ReentrantLock和synchronized都属于阻塞的并行,会把线程挂起,而ReadWriteLock属于无等待的并发。
访问情况:读读共享,读写互斥,写写互斥
public class ReadWriteLockTest {
public static void main(String[] args) throws InterruptedException {
final Queue3 q3 = new Queue3();
Thread t1 = new Thread("t1"){
public void run() {
q3.get();
}
};
Thread t2 = new Thread("t2"){
public void run() {
q3.get();
}
};
Thread t3 = new Thread("t3"){
public void run() {
q3.get();
}
};
Thread t4 = new Thread("t4"){
public void run() {
q3.put(new Random().nextInt(10000));
}
};
Thread t5 = new Thread("t5"){
public void run() {
q3.put(new Random().nextInt(10000));
}
};
Thread t6 = new Thread("t6"){
public void run() {
q3.put(new Random().nextInt(10000));
}
};
t4.start();t5.start();t6.start();
t4.join();t5.join();t6.join();
t1.start();t2.start();t3.start();
}
}
class Queue3 {
private Object data = null;// 共享数据,只能有一个线程能写该数据,但可以有多个线程同时读该数据。
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
public void get() {
rwl.readLock().lock();// 上读锁,其他线程只能读不能写
System.out.println(Thread.currentThread().getName()
+ " be ready to read data!");
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()
+ " have read data :" + data);
rwl.readLock().unlock(); // 释放读锁,最好放在finnaly里面
}
public void put(Object data) {
rwl.writeLock().lock();// 上写锁,不允许其他线程读也不允许写
System.out.println(Thread.currentThread().getName()
+ " be ready to write data!");
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
this.data = data;
System.out.println(Thread.currentThread().getName()
+ " have write data: " + data);
rwl.writeLock().unlock();// 释放写锁
}
}
实际开发中经常用于监听某些初始化操作,等待初始化完成后,通知主线程继续工作,它相当于一个栅栏
public class UseCountDownLatch {
public static void main(String[] args) {
//new CountDownLatch(2)表示await过后,得有两个countDown才能唤醒
final CountDownLatch countDown = new CountDownLatch(2);
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("进入线程t1" + "等待其他线程处理完成...");
countDown.await();
System.out.println("t1线程继续执行...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("t2线程进行初始化操作...");
Thread.sleep(3000);
System.out.println("t2线程初始化完毕,通知t1线程继续...");
countDown.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("t3线程进行初始化操作...");
Thread.sleep(4000);
System.out.println("t3线程初始化完毕,通知t1线程继续...");
countDown.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
t2.start();
t3.start();
}
}
倒数计时器。假设有一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才能一起出发,只要有一个人没有准备好,大家都等待:
public class UseCyclicBarrier {
static class Runner implements Runnable {
private CyclicBarrier barrier;
private String name;
public Runner(CyclicBarrier barrier, String name) {
this.barrier = barrier;
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(1000 * (new Random()).nextInt(5));
System.out.println(name + " 准备OK.");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + " Go!!");
}
}
public static void main(String[] args) throws IOException, InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(3); // 3
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(new Thread(new Runner(barrier, "张三")));
executor.submit(new Thread(new Runner(barrier, "李四")));
executor.submit(new Thread(new Runner(barrier, "王五")));
executor.shutdown();
}
}
最后,我们解决高并发可以从以下几个方面入手:1.Nginx网络带宽硬件方面等。2.业务细粒度化。3.限流。4.集群。5.异步。6.缓存。其中最主要的还是业务细粒度化。
系列:
【JDK并发包基础】线程池详解
【JDK并发包基础】并发容器详解
【JDK并发包基础】工具类详解
【JDK并发基础】Java内存模型详解