文件系统+通知机制
将conf/目录下的zoo_sample.cfg修改为zoo.cfg,修改dataDir路径为zkData。在zkData目录下创建myid文件,输入唯一的int型集群编号。
bin/zkServer.sh start
bin/zkServer.sh status # 集群模式必须启动半数以上才能看到状态
bin/zkServer.sh stop
bin/zkCli.sh
quit
jps
tickTime:心跳(ms)
initLimit:Leader和Follower在集群启动时之间通信时间(initLimit * tickTime)
syncLimit:Leader和Follower在集群启动后的通信(syncLimit * tickTime)
dataDir:数据存放目录
clientPort
# 集群模式在末尾添加如下。
# A表示第几号服务器;B表示该服务器的ip地址;C表示该服务器与集群中的Leader服务器交换信息的端口;D表示用来重新选举的端口。
server.${A}=${B}:${C}:${D}
(2)选举机制 服务器都只投给自己,服务器编号大的优胜(权重大),已有Leader之后的大编号机器不能占用Leader位置。
Serverid:服务器ID
Zxid:数据ID,服务器中存放的最大数据ID
Epoch:逻辑时钟,或投票的次数,每次投完增加。
Server选举状态:
LOOKING,竞选;
FOLLOWING,随从状态,同步Leader,参与投票;
OBSERVING,观察状态,同步Leader,不参与投票;
LEADING,领导者状态。
短暂(Ephemeral):CS断连后,创建的节点自行删除;节点下线。 Ephemeral_sequential。
(2)ctime znode被创建的毫秒数,从1970年开始。
(3)mzxid znode最后更新的事务zxid。
(4)mtime znode最后修改的毫秒数,从1970年开始。
(5)pZxid znode最后更新的子节点zxid。
(6)cversion znode子节点变化号,znode子节点修改的次数。
(7)dataversion znode数据变化号。
(8)acIVersion 访问控制列表的变化号。
(9)ephemeralOwner 如果是临时节点,该znode拥有着的session id;非临时节点,值为0。
(10)dataLength znode的数据长度。
(11)numChildren znode子节点数量。
# 启动客户端后
bin/zkCli.sh
help
ls / # 查看当前结点所包含的内容
ls2 / # 查看当前结点详细数据
# 创建节点,但必须要写入数据才会注册
create /node01 "message..."
create /node01/cnode01 "node01 cnode01 message..."
# 查看相应目录下的内容
get /node01
# 创建短暂节点,CS断连即删除
create -e /node01/cnode02 "node01 cnode02 message..."
# 创建带有序号的节点,显示创建/node01/cnode03 node03 cnode03 message...0000000001
create -s /node01/cnode03 "node01 cnode03 message..."
# 修改节点数据值
set /node01/cnode01 "node01 cnode01 new message..."
# 节点的值变化的监听
get /node01/cnode01 watch
# 节点的子节点变化的监听
ls /node01 watch
# 删除和递归删除
delete /node01/cnode01
rmr /node01
# 查看节点状态
stat /node01
(1)常用API
<!--zookeeper-->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>
public class TestZookeeper {
private String connectString = "192.168.73.128:2181";
private int sessionTimeout = 20000;
private ZooKeeper zkClient;
@Before
public void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// 持续监听动作
WatchNode();
}
});
}
/**
* 1.创建节点
* 查看防火墙状态:systemctl status firewalld.service
* 临时关闭防火墙:systemctl stop firewalld.service
* 彻底关闭防火墙:systemctl disable firewalld.service
*/
@Test
public void createNode() throws KeeperException, InterruptedException {
String path = zkClient.create("/node01", "12".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(path);
}
/**
* 2.获取子节点,并监控节点的变化
*/
@Test
public void getDataAndWatch() throws InterruptedException {
System.out.println("start watch Node change...");
// 配合init()中的process函数,监听1min
Thread.sleep(1000*60);
System.out.println("stop watch Node change.");
}
private void WatchNode(){
List<String> children;
try {
children = zkClient.getChildren("/", true);
System.out.println("--------start--------");
for (String child : children) {
System.out.println(child);
}
System.out.println("---------end---------");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 3.判断节点是否存在
*/
@Test
public void exist() throws KeeperException, InterruptedException {
Stat exists = zkClient.exists("/node01", false);
System.out.println(exists == null ? "not exist!" : "exist.");
}
/**
* 4.客户端能实时洞察到服务器上下线的变化
*/
}
(2)服务器动态上下线案例 服务器和客户端对ZK集群来说均是客户端。 理解:创建节点是暂时的带序号的类型(ephemeral_sequence),因为下线就要删除注册信息(主机名称,通过参数传递),序号要递增。
public class DistributeClient {
private ZooKeeper zkClient;
private String connectString = "192.168.73.128:2181";
private int sessionTimeout = 2000;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeClient client = new DistributeClient();
// 1.连接zk集群
client.getConnect();
// 2.注册节点
server.register(args[0]);
// 2*.注册监听
client.getChildren();
// 3.业务逻辑处理
client.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void getChildren() throws KeeperException, InterruptedException {
List<String> children = zkClient.getChildren("/servers", true);
ArrayList<String> host = new ArrayList<>();
for (String child : children) {
byte[] data = zkClient.getData("/servers" + child, false, null);
host.add(new String(data));
}
System.out.println(host);
}
private void register(String homename) throws KeeperException, InterruptedException {
String path = zkClient.create("/servers/server", homename.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(homename + "is online.");
}
private void getConnect() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
getChildren();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}