import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
/**
* 文件功能:zookeeper分布式锁
* 故事:5个线程(模拟分布式进程)争夺分布式锁
*/
public class CuratorLocksExample {
private static InterProcessLock distributeLock=null;
private static CountDownLatch count=new CountDownLatch(5);
public static void main(String[] args) {
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.64.128:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
distributeLock=new InterProcessMutex(client,"/mylock");
for(int i=0;i<5;i++){
new Thread(new MyHostThread("线程"+i,client,distributeLock,count)).start();
}
try {
count.await();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("释放客户端,断开zk连接");
client.close();
}
}
}
class MyHostThread implements Runnable{
private String name;
private CuratorFramework client;
private InterProcessLock lock;
private CountDownLatch count;
public MyHostThread(String name,CuratorFramework client,InterProcessLock lock,CountDownLatch count){
this.name=name;
this.client=client;
this.lock=lock;
this.count=count;
}
@Override
public void run() {
System.out.println(this.name + "开始竞争锁");
try {
lock.acquire();//阻塞等待..
System.out.println(this.name + "获取到锁");
Thread.sleep(new Random().nextInt(2000));//模拟业务处理2s内
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
System.out.println(this.name + "业务处理完,释放锁");
lock.release();//释放锁
count.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
预期输出:
线程3开始竞争锁 线程0开始竞争锁 线程4开始竞争锁 线程1开始竞争锁 线程2开始竞争锁 线程0获取到锁 线程0业务处理完,释放锁 线程4获取到锁 线程4业务处理完,释放锁 线程1获取到锁 线程1业务处理完,释放锁 线程3获取到锁 线程3业务处理完,释放锁 线程2获取到锁 线程2业务处理完,释放锁 释放客户端,断开zk连接