theme: awesome-green
版本 | 日期 | 备注 |
---|---|---|
1.0 | 2020.3.12 | 文章首发 |
1.0.1 | 2020.3.16 | 改进部分大小写问题及形容方式 |
1.0.2 | 2020.3.21 | 改进可能会引起错误理解的部分 |
1.0.3 | 2020.3.29 | 修改标题 |
1.0.4 | 2020.4.18 | 改进小结部分 |
1.0.5 | 2020.6.26 | 更新部分部分解释,改进注释风格 |
1.0.6 | 2020.7.6 | 增加部分详细解释 |
在上篇文章中,我们简单提到了Zookeeper的几个核心点。在这篇文章中,我们就来探索其存储技术。在开始前,读者可以考虑思考下列问题:
抱着问题,我们进入下面的内容。
众所周知,Zookeeper不擅长大量数据的读写,因为:
seek
。而内存中的数据,也被称为ZkDatabase(Zk的内存数据库),由它来负责管理Zk的会话DataTree存储和事务日志,它也会定时向磁盘dump快照数据,在Zk启动时,也会通过事务日志和快照数据来恢复内存中的数据。
既然Zk的数据是在内存里的,那么它是如何解决数据持久化问题的呢?上一段我们已经提到了:即通过事务日志——WAL,在每次写请求前,都会根据目前的zxid来写log,将请求先记录到日志中。
接下来,我们来谈谈WAL的优化措施。
一般的WAL中每次写完END都要调用一次耗时的sync API,这其实是会影响到系统的性能。为了解决这个问题,我们可以一次提交多个数据写入——只在最后一个数据写入的END日志之后,才调用sync API。like this:
BEGIN Data1 END Sync
BEGIN Data2 END Sync
BEGIN Data3 END Sync
BEGIN Data1 END BEGIN Data2 END BEGIN Data3 END Sync
凡事都有代价,这可能会引起数据一致性相关的问题。
在往 WAL 里面追加日志的时候,如果当前的文件 block 不能保存新添加的日志,就要为文件分配新的 block,这要更新文件 inode 里面的信息(例如 size)。如果我们使用的是 HHD 的话,就要先 seek 到 inode 所在的位置,然后回到新添加 block 的位置进行日志追加,这些都是发生在写事务日志时,这会明显拖慢系统的性能。
为了减少这些 seek,我们可以预先为 WAL 分配 block。例如 ZooKeeper 当检测到当前事务日志文件不足4KB时,就会填充0使该文件到64MB(这里0仅仅作为填充位)。并新建一个64MB的文件。
所以这也是Zookeeper不擅长读写大数据的原因之一,这会引起大量的block分配。
如果我们使用一个内存数据结构加 WAL 的存储方案,WAL 就会一直增长。这样在存储系统启动的时候,就要读取大量的 WAL 日志数据来重建内存数据。快照可以解决这个问题。
除了解决启动时间过长的问题之外,快照还可以减少存储空间的使用。WAL 的多个日志条目有可能是对同一个数据的改动,通过快照,就可以只保留最新的数据改动(Merge)。
Zk的确采用了这个方案来做优化。还带来的一个好处是:在一个节点加入时,可以用最新的Snapshot传过去便于同步数据。
本节内容都以3.5.7版本为例
TxnLog是我们前面提到的事务日志。那么接下来我们就来看它的相关源码。
先看注释:
package org.apache.zookeeper.server.persistence;
import ...
/**
* This class implements the TxnLog interface. It provides api's
* to access the txnlogs and add entries to it.
* <p>
* The format of a Transactional log is as follows:
* <blockquote><pre>
* LogFile:
* FileHeader TxnList ZeroPad
*
* FileHeader: {
* magic 4bytes (ZKLG)
* version 4bytes
* dbid 8bytes
* }
*
* TxnList:
* Txn || Txn TxnList
*
* Txn:
* checksum Txnlen TxnHeader Record 0x42
*
* checksum: 8bytes Adler32 is currently used
* calculated across payload -- Txnlen, TxnHeader, Record and 0x42
*
* Txnlen:
* len 4bytes
*
* TxnHeader: {
* sessionid 8bytes
* cxid 4bytes
* zxid 8bytes
* time 8bytes
* type 4bytes
* }
*
* Record:
* See Jute definition file for details on the various record types
*
* ZeroPad:
* 0 padded to EOF (filled during preallocation stage)
* </pre></blockquote>
*/
public class FileTxnLog implements TxnLog, Closeable {
在注释中,我们可以看到一个FileLog由三部分组成:
关于FileHeader,可以理解其为一个标示符。TxnList则为主要内容。ZeroPad是一个终结符。
我们来看看最典型的append方法,可以将其理解WAL过程中的核心方法:
/**
* append an entry to the transaction log
* @param hdr the header of the transaction
* @param txn the transaction part of the entry
* returns true iff something appended, otw false
*/
public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException
{
if (hdr == null) { //为null意味着这是一个读请求,直接返回
return false;
}
if (hdr.getZxid() <= lastZxidSeen) {
LOG.warn("Current zxid " + hdr.getZxid()
+ " is <= " + lastZxidSeen + " for "
+ hdr.getType());
} else {
lastZxidSeen = hdr.getZxid();
}
if (logStream==null) { //为空的话则new一个Stream
if(LOG.isInfoEnabled()){
LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
}
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
fhdr.serialize(oa, "fileheader"); //写file header
// Make sure that the magic number is written before padding.
logStream.flush(); // zxid必须比日志先落盘
filePadding.setCurrentSize(fos.getChannel().position());
streamsToFlush.add(fos); //加入需要Flush的队列
}
filePadding.padFile(fos.getChannel()); //确定是否要扩容。每次64m扩容
byte[] buf = Util.marshallTxnEntry(hdr, txn); //序列化写入
if (buf == null || buf.length == 0) {
throw new IOException("Faulty serialization for header " +
"and txn");
}
Checksum crc = makeChecksumAlgorithm(); //生成butyArray的checkSum
crc.update(buf, 0, buf.length);
oa.writeLong(crc.getValue(), "txnEntryCRC");//写入日志里
Util.writeTxnBytes(oa, buf);
return true;
}
这里有个zxid(ZooKeeper Transaction Id),有点像MySQL的GTID。每次对Zookeeper的状态的改变都会产生一个zxid,zxid是全局有序的,如果zxid1小于zxid2,则zxid1在zxid2之前发生。
简单分析一下写入过程:
这个方法被调用的时机大致有:
/**
* commit the logs. make sure that everything hits the
* disk
*/
public synchronized void commit() throws IOException {
if (logStream != null) {
logStream.flush();
}
for (FileOutputStream log : streamsToFlush) {
log.flush();
if (forceSync) {
long startSyncNS = System.nanoTime();
FileChannel channel = log.getChannel();
channel.force(false);//对应fdataSync
syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
if (syncElapsedMS > fsyncWarningThresholdMS) {
if(serverStats != null) {
serverStats.incrementFsyncThresholdExceedCount();
}
LOG.warn("fsync-ing the write ahead log in "
+ Thread.currentThread().getName()
+ " took " + syncElapsedMS
+ "ms which will adversely effect operation latency. "
+ "File size is " + channel.size() + " bytes. "
+ "See the ZooKeeper troubleshooting guide");
}
}
}
while (streamsToFlush.size() > 1) {
streamsToFlush.removeFirst().close();
}
}
代码非常的简单。如果logStream还有,那就先刷下去。然后遍历待flush的队列(是个链表,用来保持操作顺序),同时还会关注写入的时间,如果过长,则会打一个Warn的日志。
DataTree是Zk的内存数据结构——就是我们之前说到的MTable。它以树状结构来组织DataNode。
这么听起来可能有点云里雾里,不妨直接看一下DataNode的相关代码。
public class DataNode implements Record {
/** the data for this datanode */
byte data[];
/**
* the acl map long for this datanode. the datatree has the map
*/
Long acl;
/**
* the stat for this node that is persisted to disk.
*/
public StatPersisted stat;
/**
* the list of children for this node. note that the list of children string
* does not contain the parent path -- just the last part of the path. This
* should be synchronized on except deserializing (for speed up issues).
*/
private Set<String> children = null;
.....
}
如果用过ZkClient的小伙伴,可能非常熟悉。这就是我们根据一个path获取数据时返回的相关属性——这就是用来描述存储数据的一个类。注意,DataNode还会维护它的Children。
简单了解DataNode后,我们来看一下DataTree。为了避免干扰,我们选出最关键的成员变量:
public class DataTree {
private static final Logger LOG = LoggerFactory.getLogger(DataTree.class);
/**
* This hashtable provides a fast lookup to the datanodes. The tree is the
* source of truth and is where all the locking occurs
*/
private final ConcurrentHashMap<String, DataNode> nodes =
new ConcurrentHashMap<String, DataNode>();
private final WatchManager dataWatches = new WatchManager();
private final WatchManager childWatches = new WatchManager();
/**
* This hashtable lists the paths of the ephemeral nodes of a session.
*/
private final Map<Long, HashSet<String>> ephemerals =
new ConcurrentHashMap<Long, HashSet<String>>();
.......
}
我们可以看到,DataTree本质上是通过一个ConcurrentHashMap来存储DataNode的(临时节点也是)。保存的是 DataNode 的 path 到 DataNode 的映射。
那为什么要保存两个状态呢?这得看调用它们被调用的场景:
如果需要获取所有节点的信息,显然遍历树会比一个个从ConcurrentHashMap 拿快。
接下来看一下序列化的相关代码:
/**
* this method uses a stringbuilder to create a new path for children. This
* is faster than string appends ( str1 + str2).
*
* @param oa
* OutputArchive to write to.
* @param path
* a string builder.
* @throws IOException
* @throws InterruptedException
*/
void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
String pathString = path.toString();
DataNode node = getNode(pathString);
if (node == null) {
return;
}
String children[] = null;
DataNode nodeCopy;
synchronized (node) {
StatPersisted statCopy = new StatPersisted();
copyStatPersisted(node.stat, statCopy);
//we do not need to make a copy of node.data because the contents
//are never changed
nodeCopy = new DataNode(node.data, node.acl, statCopy);
Set<String> childs = node.getChildren();
children = childs.toArray(new String[childs.size()]);
}
serializeNodeData(oa, pathString, nodeCopy);
path.append('/');
int off = path.length();
for (String child : children) {
// since this is single buffer being resused
// we need
// to truncate the previous bytes of string.
path.delete(off, Integer.MAX_VALUE);
path.append(child);
serializeNode(oa, path);
}
}
可以看到,的确是通过DataNode的Children来遍历所有节点。
接下来看一下反序列化的代码:
public void deserialize(InputArchive ia, String tag) throws IOException {
aclCache.deserialize(ia);
nodes.clear();
pTrie.clear();
String path = ia.readString("path");
while (!"/".equals(path)) {
DataNode node = new DataNode();
ia.readRecord(node, "node");
nodes.put(path, node);
synchronized (node) {
aclCache.addUsage(node.acl);
}
int lastSlash = path.lastIndexOf('/');
if (lastSlash == -1) {
root = node;
} else {
String parentPath = path.substring(0, lastSlash);
DataNode parent = nodes.get(parentPath);
if (parent == null) {
throw new IOException("Invalid Datatree, unable to find " +
"parent " + parentPath + " of path " + path);
}
parent.addChild(path.substring(lastSlash + 1));
long eowner = node.stat.getEphemeralOwner();
EphemeralType ephemeralType = EphemeralType.get(eowner);
if (ephemeralType == EphemeralType.CONTAINER) {
containers.add(path);
} else if (ephemeralType == EphemeralType.TTL) {
ttls.add(path);
} else if (eowner != 0) {
HashSet<String> list = ephemerals.get(eowner);
if (list == null) {
list = new HashSet<String>();
ephemerals.put(eowner, list);
}
list.add(path);
}
}
path = ia.readString("path");
}
nodes.put("/", root);
// we are done with deserializing the
// the datatree
// update the quotas - create path trie
// and also update the stat nodes
setupQuota();
aclCache.purgeUnused();
}
因为序列化的时候是前序遍历。所以反序列化时是先反序列化父亲节点,再反序列化孩子节点。
那么DataTree在什么情况下会序列化呢?在这里就要提到快照了。
前面提到过:如果我们使用一个内存数据结构加 WAL 的存储方案,WAL 就会一直增长。这样在存储系统启动的时候,就要读取大量的 WAL 日志数据来重建内存数据。快照可以解决这个问题。
除了减少WAL日志,Snapshot还会在Zk全量同步时被用到——当一个全新的ZkServer(这个一般叫Learner)被加入集群时,Leader服务器会将本机上的数据全量同步给新来的ZkServer。
接下来看一下代码入口:
/**
* serialize the datatree and session into the file snapshot
* @param dt the datatree to be serialized
* @param sessions the sessions to be serialized
* @param snapShot the file to store snapshot into
*/
public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
throws IOException {
if (!close) {
try (OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32())) {
//CheckedOutputStream cout = new CheckedOutputStream()
OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
serialize(dt, sessions, oa, header);
long val = crcOut.getChecksum().getValue();
oa.writeLong(val, "val");
oa.writeString("/", "path");
sessOS.flush();
}
} else {
throw new IOException("FileSnap has already been closed");
}
}
JavaIO的基础知识在这不再介绍,有兴趣的人可以自行查阅资料或看 从一段代码谈起——浅谈JavaIO接口。
本质就是创建文件,并调用DataTree的序列化方法,DataTree的序列化其实就是遍历DataNode去序列化,最后将这些序列化的内容写入文件。
/**
* deserialize a data tree from the most recent snapshot
* @return the zxid of the snapshot
*/
public long deserialize(DataTree dt, Map<Long, Integer> sessions)
throws IOException {
// we run through 100 snapshots (not all of them)
// if we cannot get it running within 100 snapshots
// we should give up
List<File> snapList = findNValidSnapshots(100);
if (snapList.size() == 0) {
return -1L;
}
File snap = null;
boolean foundValid = false;
for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
snap = snapList.get(i);
LOG.info("Reading snapshot " + snap);
try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
InputArchive ia = BinaryInputArchive.getArchive(crcIn);
deserialize(dt, sessions, ia);
long checkSum = crcIn.getChecksum().getValue();
long val = ia.readLong("val");
if (val != checkSum) {
throw new IOException("CRC corruption in snapshot : " + snap);
}
foundValid = true;
break;
} catch (IOException e) {
LOG.warn("problem reading snap file " + snap, e);
}
}
if (!foundValid) {
throw new IOException("Not able to find valid snapshots in " + snapDir);
}
dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
return dt.lastProcessedZxid;
}
简单来说,先读取Snapshot文件们。并反序列化它们,组成DataTree。
在本文中,笔者和大家一起学习了Zk的底层存储技术。在此处,我们做个简单的回顾:
另外,Zk中序列化技术用的是Apache Jute——本质上调用了JavaDataOutput和Input,较为简单。故没在本文中展开。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。