上一篇说了ZK是什么以及能干什么,今儿这篇就来用ZK实现分布式锁,分别用java原生的zookeeper客户端、ZKClient实现。
一、分布式锁
分布式锁的思路是每个客户端都在某个目录下注册一个临时有序节点,每次最小的节点会获取锁,当前节点会去监听上一个较小节点,如果较小节点失效之后,就会去获取锁。
java原生zookeeper客户端
(1)引入jar包
(2)创建ZK客户端连接单例
public class ZookeeperClient {
//zk集群地址
public static final String ZOOKEEPER_CONNECT="192.168.197.100:2181,192.168.197.110:2181,192.168.197.120:2181";
//计数器,用于等待连接成功
public static CountDownLatch countDownLatch = new CountDownLatch(1);
//连接超时时间
public static final int SESSION_TIMEOUT = 5000;
//用volatile修饰单例,防止赋值时发生指令重排
private volatile static ZooKeeper instance;
//用Double check获取单例
public static ZooKeeper getInstance() throws IOException, InterruptedException {
if (instance == null ){
synchronized (ZookeeperClient.class) {
if (instance == null) {
//连接时注册一个监听,监听连接状态变化
instance = new ZooKeeper(ZOOKEEPER_CONNECT, SESSION_TIMEOUT, new Watcher() {
//监听回调方法
@Override
public void process(WatchedEvent watchedEvent) {
//当连接状态变成connected,就说明连接成功
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
}
});
//等待连接成功
countDownLatch.await();
}
}
}
return instance;
}
public static int getSessionTimeout() {
return SESSION_TIMEOUT;
}
}
上述代码中的CountDownLatch是因为连接时会耗时较长,所以需要添加一个计数器进行阻塞,否则会在connecting阶段就被释放了。
(3)创建分布式锁客户端
public class DistibutedLock {
//根目录,客户端都会去此目录下创建临时有序子节点
private final String ROOT_PATH = "/lock";
//客户端
private ZooKeeper zookeeper;
//session超时时间
private int SESSION_TIMEOUT;
//当前客户端创建有序节点的名称
private String lockId;
private CountDownLatch countDownLatch = new CountDownLatch(1);
public DistibutedLock() throws IOException, InterruptedException {
this.zookeeper =ZookeeperClient.getInstance();
this.SESSION_TIMEOUT = ZookeeperClient.getSessionTimeout();
}
public boolean lock(){
try {
//创建临时有序子节点
lockId = zookeeper.create(ROOT_PATH+"/","123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName()+"创建节点"+lockId+",开始竞争锁");
//获取/lock目录下所有子节点
List<String> children = zookeeper.getChildren(ROOT_PATH, true);
//用SortedSet对子节点从小到大进行排序
SortedSet<String> sortedSet = new TreeSet<String>();
for (String child : children) {
sortedSet.add(ROOT_PATH+"/"+child);
}
//获取最小节点名称
String first = sortedSet.first();
//如果当前创建节点就是最小节点,则获取锁
if (first.equals(lockId)) {
System.out.println(Thread.currentThread().getName()+"获取锁"+lockId);
return true;
}
//获取比当前id小的节点集合
SortedSet<String> frontSet = sortedSet.headSet(lockId);
if (!frontSet.isEmpty()) {
//取集合中最后一个元素,也就是临近最小节点
String last = frontSet.last();
System.out.println(lockId+"监听"+last);
//当前节点去监听上一个节点,当上一个节点被删除的时候
//当前节点就可以获取锁
zookeeper.exists(last, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
countDownLatch.countDown();
}
}
});
countDownLatch.await(SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
System.out.println(Thread.currentThread().getName() + "获取锁" + lockId);
}
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
//释放锁
public boolean unLock(){
try {
System.out.println(Thread.currentThread().getName() + "开始删除锁" + lockId);
//删除当前节点
zookeeper.delete(lockId, -1);
return true;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
return false;
}
}
(4)测试代码
//等待器,当所有线程都执行到某个步骤才停止阻塞
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
//模拟十个线程去获取锁
for (int i = 0; i < 10; i++) {
new Thread(()-> {
DistibutedLock lock = null;
try {
lock = new DistibutedLock();
cyclicBarrier.await();
lock.lock();
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(500));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
} finally {
if(lock!=null){
lock.unLock();
}
}
}).start();
}
运行结果:按照创建顺序去获取锁
ZKClient
(1)引入jar包
(2)创建ZK客户端连接单例
public class ZKClientInstance {
public static final String ZOOKEEPER_CONNECT="192.168.197.100:2181,192.168.197.110:2181,192.168.197.120:2181";
private volatile static ZkClient instance;
public static ZkClient getInstance(){
if (instance == null) {
synchronized (ZKClientInstance.class) {
if (instance == null) {
instance = new ZkClient(ZOOKEEPER_CONNECT,5000,
5000,new SerializableSerializer());
}
}
}
return instance;
}
}
(3)创建分布式锁客户端
public class ZKClientDisLock {
private static final String ROOT_PATH = "/lock";
private ZkClient zkClient;
private CountDownLatch countDownLatch = new CountDownLatch(1);
private String lockId;
public ZKClientDisLock(ZkClient zkClient) {
this.zkClient = zkClient;
}
public boolean lock(){
lockId = zkClient.createEphemeralSequential(ROOT_PATH + "/", "123");
List<String> children = zkClient.getChildren(ROOT_PATH);
SortedSet<String> sortedSet = new TreeSet<String>();
for (String child : children) {
sortedSet.add(ROOT_PATH+"/"+child);
}
String first = sortedSet.first();
if (first.equals(lockId)) {
System.out.println(Thread.currentThread().getName() + "获取锁" + lockId);
return true;
}
SortedSet<String> frontSet = sortedSet.headSet(lockId);
if (null != frontSet && frontSet.size() > 0) {
String last = frontSet.last();
IZkDataListener iZkDataListener = null;
try {
System.out.println(lockId + "监听" + last + "节点变化");
iZkDataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
countDownLatch.countDown();
}
};
zkClient.subscribeDataChanges(last, iZkDataListener);
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "获取锁" + lockId);
} catch (Exception e) {
}finally {
zkClient.unsubscribeDataChanges(last,iZkDataListener);
}
return true;
}
return false;
}
public void unLock(){
System.out.println(Thread.currentThread().getName()+ "释放锁"+ lockId + "-----");
zkClient.delete(lockId);
}
}
(3)测试代码
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(()-> {
ZKClientDisLock lock = null;
try {
lock = new ZKClientDisLock(ZKClientInstance.getInstance());
cyclicBarrier.await();
lock.lock();
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
} finally {
if(lock!=null){
lock.unLock();
}
}
}).start();
}
运行结果:
上述就是用java原生api以及ZKClient实现的分布式锁。
还有一种是用apache-curator实现,其可以实现可重入锁、排它锁、读写锁。之后有机会介绍curator的使用方法。