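Version | Date | Notes |
---|---|---|
1.0 | 2020.3.29 | First published |
1.1 | 2020.4.18 | Improved the summary section |
1.2 | 2020.5.4 | Fixed typos |
1.4 | 2020.7.21 | Re-laid out paragraphs and sharpened the wording |
1.5 | 2020.8.6 | Added a title image |
1.6 | 2021.6.22 | Title changed from |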
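We know that ZooKeeper is a distributed coordination system. In a large distributed system, a great many clients will inevitably connect to ZooKeeper. So how does ZooKeeper manage the lifecycle of all these sessions? With this question in mind, let's get into today's topic.
Let's first look at the core session-related type on the server side, the `SessionTracker` interface: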
```java
/**
 * This is the basic interface that ZooKeeperServer uses to track sessions. The
 * standalone and leader ZooKeeperServer use the same SessionTracker. The
 * FollowerZooKeeperServer uses a SessionTracker which is basically a simple
 * shell to track information to be forwarded to the leader.
 */
public interface SessionTracker {
    public static interface Session {
        long getSessionId();
        int getTimeout();
        boolean isClosing();
    }

    public static interface SessionExpirer {
        void expire(Session session);
        long getServerId();
    }

    long createSession(int sessionTimeout);

    /**
     * Add a global session to those being tracked.
     * @param id sessionId
     * @param to sessionTimeout
     * @return whether the session was newly added (if false, already existed)
     */
    boolean addGlobalSession(long id, int to);

    /**
     * Add a session to those being tracked. The session is added as a local
     * session if they are enabled, otherwise as global.
     * @param id sessionId
     * @param to sessionTimeout
     * @return whether the session was newly added (if false, already existed)
     */
    boolean addSession(long id, int to);

    /**
     * @param sessionId
     * @param sessionTimeout
     * @return false if session is no longer active
     */
    boolean touchSession(long sessionId, int sessionTimeout);

    /**
     * Mark that the session is in the process of closing.
     * @param sessionId
     */
    void setSessionClosing(long sessionId);

    /**
     *
     */
    void shutdown();

    /**
     * @param sessionId
     */
    void removeSession(long sessionId);

    /**
     * @param sessionId
     * @return whether or not the SessionTracker is aware of this session
     */
    boolean isTrackingSession(long sessionId);

    /**
     * Checks whether the SessionTracker is aware of this session, the session
     * is still active, and the owner matches. If the owner wasn't previously
     * set, this sets the owner of the session.
     *
     * UnknownSessionException should never been thrown to the client. It is
     * only used internally to deal with possible local session from other
     * machine
     *
     * @param sessionId
     * @param owner
     */
    public void checkSession(long sessionId, Object owner)
            throws KeeperException.SessionExpiredException,
            KeeperException.SessionMovedException,
            KeeperException.UnknownSessionException;

    /**
     * Strictly check that a given session is a global session or not
     * @param sessionId
     * @param owner
     * @throws KeeperException.SessionExpiredException
     * @throws KeeperException.SessionMovedException
     */
    public void checkGlobalSession(long sessionId, Object owner)
            throws KeeperException.SessionExpiredException,
            KeeperException.SessionMovedException;

    void setOwner(long id, Object owner) throws SessionExpiredException;

    /**
     * Text dump of session information, suitable for debugging.
     * @param pwriter the output writer
     */
    void dumpSessions(PrintWriter pwriter);

    /**
     * Returns a mapping of time to session IDs that expire at that time.
     */
    Map<Long, Set<Long>> getSessionExpiryMap();
}
```
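Roughly speaking, this interface defines a series of operations for controlling sessions, such as creating, touching (activating), and removing them.
Next, let's look at the more important interfaces, member variables, and methods in its implementation, `SessionTrackerImpl`.
First, which attributes does a session instance carry? Without further ado, here is the interface definition: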
```java
public static interface Session {
    long getSessionId();
    int getTimeout();
    boolean isClosing();
}
```
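As we can see, the server records only three attributes of a client session: sessionId, timeout, and isClosing.
The client side is a bit more complex. For example, a session can be in quite a few states: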
```java
@InterfaceAudience.Public
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     * */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
```
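Typically, the client can get disconnected from the server because of a network blip or some other cause. Fortunately, the ZooKeeper client reconnects automatically: its state changes to CONNECTING until it reaches a server again, and then to CONNECTED. If the session expires, authentication fails, or the client program closes the session, the client ends up in a terminal state: AUTH_FAILED for an authentication failure, CLOSED otherwise.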
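The reconnect behavior described above can be pictured as a small state machine. The sketch below is a simplified, hypothetical model: the `ClientState` enum mirrors the constants of `States`, but the `ConnEvent` type and the `next` transition function are invented for illustration and are not part of the ZooKeeper client API.

```java
// Hypothetical model of how a ZooKeeper client's state reacts to connection
// events. ClientState mirrors ZooKeeper.States; ConnEvent and next() are
// illustrative only, not ZooKeeper APIs.
enum ClientState {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }
}

enum ConnEvent { CONNECTION_LOST, CONNECTED_TO_SERVER, SESSION_EXPIRED, AUTH_FAILURE, CLIENT_CLOSE }

class ClientStateMachine {
    static ClientState next(ClientState current, ConnEvent event) {
        if (!current.isAlive()) {
            return current; // CLOSED and AUTH_FAILED are terminal in this model
        }
        switch (event) {
            case CONNECTION_LOST:     return ClientState.CONNECTING; // automatic reconnect
            case CONNECTED_TO_SERVER: return ClientState.CONNECTED;
            case AUTH_FAILURE:        return ClientState.AUTH_FAILED;
            case SESSION_EXPIRED:
            case CLIENT_CLOSE:        return ClientState.CLOSED;
            default: throw new IllegalArgumentException("unknown event: " + event);
        }
    }

    public static void main(String[] args) {
        ClientState s = ClientState.CONNECTING;
        s = next(s, ConnEvent.CONNECTED_TO_SERVER); // CONNECTED
        s = next(s, ConnEvent.CONNECTION_LOST);     // CONNECTING (network blip)
        s = next(s, ConnEvent.CONNECTED_TO_SERVER); // CONNECTED again
        s = next(s, ConnEvent.SESSION_EXPIRED);     // CLOSED
        System.out.println(s + " alive=" + s.isAlive()); // prints "CLOSED alive=false"
    }
}
```

Treating CLOSED and AUTH_FAILED as terminal follows directly from `isAlive()` above: once the model reaches either state, no event moves it back.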
Back on the server side, these are the key member variables of `SessionTrackerImpl`:

```java
protected final ConcurrentHashMap<Long, SessionImpl> sessionsById =
        new ConcurrentHashMap<Long, SessionImpl>();

private final ExpiryQueue<SessionImpl> sessionExpiryQueue;

private final ConcurrentMap<Long, Integer> sessionsWithTimeout;
```
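`sessionsById` is, plainly, a map from a session's ID to the session object itself. `sessionExpiryQueue`, as its name suggests, is an expiry queue; internally it uses a bucketing strategy, which we analyze shortly. `sessionsWithTimeout` is self-explanatory: it records each session's timeout, with the sessionId as key and the timeout as value. This structure is shared with ZooKeeper's in-memory database and is periodically persisted into snapshots.

Any discussion of session management has to cover how sessions are created; otherwise it would feel hollow. We won't go over the client's initialization process in detail here. Either way, we need a connection, since a session cannot be established on thin air: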
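`ClientCnxnSocket` … to create the TCP connection to ZooKeeper. … the `readConnectResult` method to process the request. That is roughly how a session is created; we have also left out the SyncConnected-None event notification logic, since it is not important for this article.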
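To tie the member variables to session creation, here is a minimal, hypothetical tracker. `MiniSessionTracker` is not ZooKeeper's `SessionTrackerImpl`: it omits `sessionExpiryQueue` and all locking subtleties, and simply allocates IDs from an `AtomicLong` counter. We assume, as in recent ZooKeeper versions, that the real counter is seeded from `initializeNextSession`; here the seed is just a constructor argument.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified tracker showing the two maps described above:
// sessionsById (id -> session) and sessionsWithTimeout (id -> timeout in ms).
class MiniSessionTracker {
    static final class Session {
        final long sessionId;
        final int timeout;
        volatile boolean isClosing;

        Session(long sessionId, int timeout) {
            this.sessionId = sessionId;
            this.timeout = timeout;
        }
    }

    final ConcurrentHashMap<Long, Session> sessionsById = new ConcurrentHashMap<>();
    final ConcurrentMap<Long, Integer> sessionsWithTimeout = new ConcurrentHashMap<>();
    private final AtomicLong nextSessionId;

    MiniSessionTracker(long initialSessionId) {
        this.nextSessionId = new AtomicLong(initialSessionId);
    }

    // Allocates the next id and registers the session in both maps.
    long createSession(int timeout) {
        long id = nextSessionId.getAndIncrement();
        sessionsById.put(id, new Session(id, timeout));
        sessionsWithTimeout.put(id, timeout);
        return id;
    }

    // Removes the session from both maps; returns false if it was unknown.
    boolean removeSession(long id) {
        Session s = sessionsById.remove(id);
        sessionsWithTimeout.remove(id);
        return s != null;
    }

    public static void main(String[] args) {
        MiniSessionTracker t = new MiniSessionTracker(1000L);
        long a = t.createSession(30000);
        long b = t.createSession(10000);
        System.out.println(a + " " + b + " " + t.sessionsWithTimeout.get(b)); // 1000 1001 10000
        t.removeSession(a);
        System.out.println(t.sessionsById.containsKey(a)); // false
    }
}
```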
The server generates the initial sessionId in `SessionTrackerImpl.initializeNextSession`:

```java
/**
 * Generates an initial sessionId. High order byte is serverId, next 5
 * 5 bytes are from timestamp, and low order 2 bytes are 0s.
 */
public static long initializeNextSession(long id) {
    long nextSid;
    nextSid = (Time.currentElapsedTime() << 24) >>> 8;
    nextSid = nextSid | (id << 56);
    if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
        ++nextSid; // this is an unlikely edge case, but check it just in case
    }
    return nextSid;
}
```
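In short, the highest 8 bits (one byte) hold the server's ID, the next 40 bits are the low 40 bits of the current time in milliseconds (`Time.currentElapsedTime()`), and the lowest 16 bits are 0. The value is therefore derived from the clock rather than random; later sessions on the same server take their IDs by incrementing it.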
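The bit layout can be checked by running the same arithmetic on a fixed time value. In the sketch below, `initialSessionId` copies the shifts from `initializeNextSession` but takes the time as a parameter (an assumption made so the result is deterministic), and the `serverIdOf`, `timestampBitsOf` and `counterBitsOf` helpers are hypothetical decoders, not ZooKeeper methods.

```java
// Same shifts as initializeNextSession, with the time passed in explicitly.
// The decode helpers are hypothetical and only illustrate the layout:
// [ 8 bits serverId | 40 bits time | 16 bits zero ]
class SessionIdLayout {
    static long initialSessionId(long serverId, long nowMillis) {
        long nextSid = (nowMillis << 24) >>> 8; // keep low 40 time bits at positions 16..55
        return nextSid | (serverId << 56);      // serverId in the top byte
    }

    static long serverIdOf(long sessionId) {
        return sessionId >>> 56;
    }

    static long timestampBitsOf(long sessionId) {
        return (sessionId >>> 16) & ((1L << 40) - 1);
    }

    static long counterBitsOf(long sessionId) {
        return sessionId & 0xFFFFL;
    }

    public static void main(String[] args) {
        long now = 1_600_000_000_000L; // an arbitrary fixed millisecond value for the demo
        long sid = initialSessionId(3, now);
        System.out.println("0x" + Long.toHexString(sid));
        System.out.println(serverIdOf(sid) + " " + counterBitsOf(sid)); // 3 0
        System.out.println(timestampBitsOf(sid) == (now & ((1L << 40) - 1))); // true
    }
}
```

The `<< 24` followed by `>>> 8` is what discards the top 24 bits of the time value and leaves the lowest 16 bits free for the per-server increment.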
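Session expiry checking is done in `SessionTrackerImpl.run`, the core method of a thread; in other words, ZooKeeper checks for expired sessions in a dedicated background thread.
Simply put, `ExpiryQueue` groups sessions by the time at which they will expire. For example, if session1, session2 and session3 will expire at 12:12:54, and session4, session5 and session6 at 12:12:55, each time becomes a key, the sessions expiring at that time are collected as its value, and a map links the two: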
key | value |
---|---|
12:12:54 | session1,session2,session3 |
12:12:55 | session4,session5,session6 |
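Of course, in practice the interval is not 1 s; it is written that way here only for ease of explanation. In reality, ZooKeeper computes each session's expiry time and files the session into the corresponding bucket. The expiry time is `CurrentTime + SessionTimeout` rounded up to the next multiple of the bucket interval (see `ExpiryQueue.update`):

$$
ExpirationTime = \left( \left\lfloor \frac{CurrentTime + SessionTimeout}{ExpirationInterval} \right\rfloor + 1 \right) \times ExpirationInterval
$$

To make this easier to understand, here are a few examples, with the interval (the server's tickTime) set to 2000 ms: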
0 | 2000ms | 4000ms | 6000ms | 8000ms |
---|---|---|---|---|
sessionB | sessionA |
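Here is the rounding step on its own, with the 2000 ms interval used in the examples. The method body follows `ExpiryQueue.roundToNextInterval` as we understand it (integer-divide, add one, multiply back); `BucketRounding` itself is a hypothetical wrapper.

```java
// Rounds a raw expiry time (now + timeout) up to the next bucket boundary.
class BucketRounding {
    static long roundToNextInterval(long time, long expirationInterval) {
        return (time / expirationInterval + 1) * expirationInterval;
    }

    public static void main(String[] args) {
        long interval = 2000;
        // now + timeout for three hypothetical sessions
        long[] rawExpiry = {3000, 3999, 4000};
        for (long t : rawExpiry) {
            System.out.println(t + " -> " + roundToNextInterval(t, interval));
        }
        // prints: 3000 -> 4000, 3999 -> 4000, 4000 -> 6000 (one per line)
    }
}
```

Because the result is always strictly greater than `CurrentTime + SessionTimeout`, a session is never expired early; at worst it lives up to one interval longer than its timeout.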
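This way, the thread does not have to iterate over every session and check each one's expiry time, which is rather neat. While we are here, let's briefly go over the steps of session cleanup: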
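1. Mark the session as closing (`isClosing`). This way, new requests from the client that arrive while the session is being cleaned up can no longer be processed.
2. … `PrepRequestProcessor`, so that it takes effect across the whole ZooKeeper cluster.
3. … `sessionsWithTimeout` is shared with the in-memory database.
4. … (`FinalRequestProcessor.processRequest`).
5. Remove the session from `sessionsById`, `sessionExpiryQueue`, and `sessionsWithTimeout`.
6. … (`FinalRequestProcessor.closeSession`).

From this we can see that ZooKeeper's automatic cleanup of ephemeral nodes is built on its session management mechanism.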
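To make the ordering of these steps concrete, here is a deliberately tiny, single-threaded model. Everything in it (`SessionCleanupSketch`, its maps, and its method names) is hypothetical; it skips the request processors, transactions, and cluster replication entirely and only shows how marking the session as closing, deleting the ephemeral nodes it owns, and then forgetting the session makes those nodes disappear while leaving other sessions' nodes alone.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of session cleanup; not ZooKeeper code.
class SessionCleanupSketch {
    // sessionId -> isClosing flag (stands in for sessionsById)
    final Map<Long, Boolean> sessions = new HashMap<>();
    // sessionId -> paths of the ephemeral nodes it owns
    final Map<Long, Set<String>> ephemerals = new HashMap<>();
    // every znode path currently in the toy tree
    final Set<String> nodes = new HashSet<>();

    void createSession(long sessionId) {
        sessions.put(sessionId, false);
    }

    void createEphemeral(long sessionId, String path) {
        Boolean closing = sessions.get(sessionId);
        if (closing == null || closing) {
            throw new IllegalStateException("session 0x" + Long.toHexString(sessionId) + " is not usable");
        }
        nodes.add(path);
        ephemerals.computeIfAbsent(sessionId, k -> new HashSet<>()).add(path);
    }

    void expire(long sessionId) {
        sessions.put(sessionId, true);                    // mark closing: reject new requests
        Set<String> owned = ephemerals.remove(sessionId); // collect the session's ephemeral nodes
        if (owned != null) {
            nodes.removeAll(owned);                       // delete them
        }
        sessions.remove(sessionId);                       // finally forget the session
    }

    public static void main(String[] args) {
        SessionCleanupSketch zk = new SessionCleanupSketch();
        zk.nodes.add("/app"); // a persistent node, owned by no session
        zk.createSession(1L);
        zk.createSession(2L);
        zk.createEphemeral(1L, "/app/lock-1");
        zk.createEphemeral(2L, "/app/lock-2");
        zk.expire(1L);
        System.out.println(zk.nodes.contains("/app/lock-1")); // false
        System.out.println(zk.nodes.contains("/app/lock-2")); // true
    }
}
```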
```java
@Override
public void run() {
    try {
        while (running) {
            long waitTime = sessionExpiryQueue.getWaitTime();
            if (waitTime > 0) {
                Thread.sleep(waitTime);
                continue;
            }
            for (SessionImpl s : sessionExpiryQueue.poll()) {
                setSessionClosing(s.sessionId);
                expirer.expire(s);
            }
        }
    } catch (InterruptedException e) {
        handleException(this.getName(), e);
    }
    LOG.info("SessionTrackerImpl exited loop!");
}
```
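The logic is simple. The thread asks `sessionExpiryQueue` how long remains until the nearest expiry time and, if that time is still in the future, sleeps until then. Otherwise it takes the sessions in the due bucket, marks each one as closing, and starts expiring it.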
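The loop above can be simulated deterministically by replacing `Thread.sleep` with a fake clock. The following is a hypothetical sketch, not ZooKeeper code: `ExpiryLoopSketch` keeps buckets keyed by expiry time and a `nextExpirationTime` cursor that advances one interval per `poll()`, which is our reading of how `ExpiryQueue.getWaitTime` and `ExpiryQueue.poll` behave. The real classes are thread-safe; this sketch is not.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical single-threaded simulation of SessionTrackerImpl.run().
class ExpiryLoopSketch {
    final long interval;                                    // bucket width (tickTime)
    long now;                                               // simulated clock, in ms
    long nextExpirationTime;                                // next bucket to look at
    final Map<Long, Set<Long>> expiryMap = new HashMap<>(); // bucket time -> session ids
    final List<Long> expired = new ArrayList<>();           // expired sessions, in order

    ExpiryLoopSketch(long interval, long startTime) {
        this.interval = interval;
        this.now = startTime;
        this.nextExpirationTime = roundToNextInterval(startTime);
    }

    long roundToNextInterval(long time) {
        return (time / interval + 1) * interval;
    }

    void addSession(long sessionId, int timeout) {
        expiryMap.computeIfAbsent(roundToNextInterval(now + timeout), k -> new LinkedHashSet<>())
                 .add(sessionId);
    }

    long getWaitTime() {
        return now < nextExpirationTime ? nextExpirationTime - now : 0L;
    }

    // Returns the due bucket (possibly empty) and advances the cursor by one interval.
    Set<Long> poll() {
        if (now < nextExpirationTime) {
            return Collections.emptySet();
        }
        Set<Long> due = expiryMap.remove(nextExpirationTime);
        nextExpirationTime += interval;
        return due == null ? Collections.<Long>emptySet() : due;
    }

    // Same shape as run(): sleep until the next bucket, then expire its sessions.
    void runUntil(long endTime) {
        while (now <= endTime) {
            long waitTime = getWaitTime();
            if (waitTime > 0) {
                now += waitTime; // stands in for Thread.sleep(waitTime)
                continue;
            }
            for (long id : poll()) {
                expired.add(id); // stands in for setSessionClosing + expirer.expire
            }
        }
    }

    public static void main(String[] args) {
        ExpiryLoopSketch loop = new ExpiryLoopSketch(2000, 0);
        loop.addSession(1L, 3000); // 0 + 3000 -> bucket 4000
        loop.addSession(2L, 5000); // 0 + 5000 -> bucket 6000
        loop.addSession(3L, 3500); // 0 + 3500 -> bucket 4000
        loop.runUntil(6000);
        System.out.println(loop.expired); // [1, 3, 2]
    }
}
```

Note that the thread wakes only at bucket boundaries, so the number of wake-ups depends on elapsed time, not on how many sessions exist.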
Next, let's look at `expirer.expire`:
```java
public void expire(Session session) {
    long sessionId = session.getSessionId();
    LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
            + ", timeout of " + session.getTimeout() + "ms exceeded");
    close(sessionId);
}
```
It then calls `close`:
```java
private void close(long sessionId) {
    Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
    setLocalSessionFlag(si);
    submitRequest(si);
}
```
It simply builds a new `closeSession` request and sets its local-session flag. The key method is `submitRequest`:
```java
public void submitRequest(Request si) {
    if (firstProcessor == null) {
        synchronized (this) {
            try {
                // Since all requests are passed to the request
                // processor it should wait for setting up the request
                // processor chain. The state will be updated to RUNNING
                // after the setup.
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (firstProcessor == null || state != State.RUNNING) {
                throw new RuntimeException("Not started");
            }
        }
    }
    try {
        touch(si.cnxn);
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type " + si.type);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dropping request: " + e.getMessage());
        }
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request:" + e.getMessage(), e);
    }
}
```
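The first part waits until the processor chain is ready. Next, the session is touched; if the session has already been removed or has timed out, a `MissingSessionException` is thrown and the request is dropped. This is perfectly normal, because the client's session activity and this removal request do not happen at the same moment. Finally, the request to remove the session is handed to the first processor.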
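The guard at the start of `submitRequest` is a classic guarded wait. The sketch below isolates that pattern in a hypothetical `StartupGate` class (not a ZooKeeper type): callers block while the state is INITIAL, re-checking at least once a second via `wait(1000)` as the original does, and a `notifyAll()` in the hypothetical `markRunning()` wakes them as soon as setup finishes.

```java
// Hypothetical illustration of the "wait until RUNNING" guard; not ZooKeeper code.
class StartupGate {
    enum State { INITIAL, RUNNING, SHUTDOWN }

    private State state = State.INITIAL;

    synchronized void markRunning() {
        state = State.RUNNING;
        notifyAll(); // wake threads blocked in awaitRunning()
    }

    // Blocks while still INITIAL; the timed wait re-checks the state every second.
    synchronized void awaitRunning() throws InterruptedException {
        while (state == State.INITIAL) {
            wait(1000);
        }
        if (state != State.RUNNING) {
            throw new IllegalStateException("Not started");
        }
    }

    public static void main(String[] args) throws Exception {
        StartupGate gate = new StartupGate();
        Thread submitter = new Thread(() -> {
            try {
                gate.awaitRunning();
                System.out.println("request submitted");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        submitter.start();
        Thread.sleep(100);  // a request arrives before setup has finished
        gate.markRunning(); // the processor chain is now ready
        submitter.join();
    }
}
```

The `while` loop (rather than a single `if`) matters: it tolerates spurious wake-ups and timed-out waits, re-checking the state each time before proceeding.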
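From the above, it might seem that a session simply expires once its precomputed time arrives. That is not the case: the client keeps its session alive, that is, keeps pushing its expiry time back, by sending ordinary requests or heartbeats. This process is generally called touchSession (yes, that is what the code calls it too). Let's briefly walk through the flow: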
```java
synchronized public boolean touchSession(long sessionId, int timeout) {
    SessionImpl s = sessionsById.get(sessionId);
    if (s == null) {
        logTraceTouchInvalidSession(sessionId, timeout);
        return false;
    }
    if (s.isClosing()) {
        logTraceTouchClosingSession(sessionId, timeout);
        return false;
    }
    updateSessionExpiry(s, timeout);
    return true;
}
```
The lookup and validation logic needs no further explanation. Let's jump straight to the core method, `ExpiryQueue.update`:
```java
/**
 * Adds or updates expiration time for element in queue, rounding the
 * timeout to the expiry interval bucketed used by this queue.
 * @param elem element to add/update
 * @param timeout timout in milliseconds
 * @return time at which the element is now set to expire if
 *                 changed, or null if unchanged
 */
public Long update(E elem, int timeout) {
    Long prevExpiryTime = elemMap.get(elem);
    long now = Time.currentElapsedTime();
    Long newExpiryTime = roundToNextInterval(now + timeout);

    if (newExpiryTime.equals(prevExpiryTime)) {
        // No change, so nothing to update
        return null;
    }

    // First add the elem to the new expiry time bucket in expiryMap.
    Set<E> set = expiryMap.get(newExpiryTime);
    if (set == null) {
        // Construct a ConcurrentHashSet using a ConcurrentHashMap
        set = Collections.newSetFromMap(
                new ConcurrentHashMap<E, Boolean>());
        // Put the new set in the map, but only if another thread
        // hasn't beaten us to it
        Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
        if (existingSet != null) {
            set = existingSet;
        }
    }
    set.add(elem);

    // Map the elem to the new expiry time. If a different previous
    // mapping was present, clean up the previous expiry bucket.
    prevExpiryTime = elemMap.put(elem, newExpiryTime);
    if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
        Set<E> prevSet = expiryMap.get(prevExpiryTime);
        if (prevSet != null) {
            prevSet.remove(elem);
        }
    }
    return newExpiryTime;
}
```
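The logic is very simple: compute the new expiry time, put the session into the corresponding new bucket, and then remove it from the old bucket.
In this article, we have dissected ZooKeeper's session management mechanism together. Some of its ideas are worth borrowing when we design systems in the future: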
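For experimentation, the same update logic can be lifted into a small standalone class. In this sketch, `MiniExpiryQueue` is hypothetical, `now` is passed in explicitly instead of reading `Time.currentElapsedTime()` so results are reproducible, and `ConcurrentHashMap.computeIfAbsent` replaces the hand-written `putIfAbsent` sequence for creating a bucket; both give an atomic get-or-create on a `ConcurrentHashMap`.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified version of ExpiryQueue.update with an explicit clock.
class MiniExpiryQueue<E> {
    private final ConcurrentHashMap<E, Long> elemMap = new ConcurrentHashMap<>();      // elem -> bucket
    private final ConcurrentHashMap<Long, Set<E>> expiryMap = new ConcurrentHashMap<>(); // bucket -> elems
    private final long expirationInterval;

    MiniExpiryQueue(long expirationInterval) {
        this.expirationInterval = expirationInterval;
    }

    private long roundToNextInterval(long time) {
        return (time / expirationInterval + 1) * expirationInterval;
    }

    // Returns the new bucket time, or null if the element stays in its bucket.
    Long update(E elem, int timeout, long now) {
        Long prevExpiryTime = elemMap.get(elem);
        Long newExpiryTime = roundToNextInterval(now + timeout);
        if (newExpiryTime.equals(prevExpiryTime)) {
            return null; // same bucket: nothing to do
        }
        // Add to the new bucket first, then drop the old membership.
        expiryMap.computeIfAbsent(newExpiryTime, k -> ConcurrentHashMap.<E>newKeySet()).add(elem);
        prevExpiryTime = elemMap.put(elem, newExpiryTime);
        if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
            Set<E> prevSet = expiryMap.get(prevExpiryTime);
            if (prevSet != null) {
                prevSet.remove(elem);
            }
        }
        return newExpiryTime;
    }

    Set<E> bucket(long expiryTime) {
        return expiryMap.getOrDefault(expiryTime, Collections.emptySet());
    }

    public static void main(String[] args) {
        MiniExpiryQueue<String> q = new MiniExpiryQueue<>(2000);
        System.out.println(q.update("s1", 3000, 0));    // 4000
        System.out.println(q.update("s1", 3000, 500));  // null (3500 still rounds to 4000)
        System.out.println(q.update("s1", 3000, 1500)); // 6000 (4500 rounds to 6000)
        System.out.println(q.bucket(4000).contains("s1") + " " + q.bucket(6000).contains("s1")); // false true
    }
}
```

The second call shows why bucketing is cheap: a touch that lands in the same bucket changes nothing at all; only a touch that crosses a bucket boundary moves the session.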
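Original content statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
In case of infringement, please contact cloudcommunity@tencent.com for removal.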