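ZooKeeper nodes fall into two main types: ephemeral nodes and persistent nodes.
A persistent node, once created, stays in place until it is explicitly deleted.
An ephemeral node is different: once the session that created it expires, every ephemeral node created by that session is deleted.
This article walks through the source code to trace the full process of deleting ephemeral nodes.
Since ZooKeeper deletes all ephemeral nodes created by a session once that session expires, the session manager, SessionTrackerImpl, is a natural place to start.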
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
    synchronized public void run() {
        try {
            while (running) {
                currentTime = Time.currentElapsedTime();
                if (nextExpirationTime > currentTime) {
                    this.wait(nextExpirationTime - currentTime);
                    continue;
                }
                SessionSet set;
                // Remove the whole bucket of sessions that expire at this time
                set = sessionSets.remove(nextExpirationTime);
                if (set != null) {
                    for (SessionImpl s : set.sessions) {
                        // Mark the session as closing (isClosing = true)
                        setSessionClosing(s.sessionId);
                        // Expire the session; this is the key step, see ZooKeeperServer.expire()
                        expirer.expire(s);
                    }
                }
                nextExpirationTime += expirationInterval;
            }
        } catch (InterruptedException e) {
            handleException(this.getName(), e);
        }
        LOG.info("SessionTrackerImpl exited loop!");
    }
}
SessionTrackerImpl essentially manages sessions in buckets keyed by expiration time, so when a bucket comes due, every session in that bucket is cleaned up.
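The bucket idea can be sketched in a few lines. This is a simplified model, not the ZooKeeper class: SessionBucketSketch, touch and expireBucket are hypothetical names, and rounding a deadline up to the next multiple of expirationInterval is an assumption made for the illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of expiration-time buckets (not ZooKeeper code).
class SessionBucketSketch {
    private final long expirationInterval;
    // bucket time -> ids of the sessions that expire in that bucket
    private final Map<Long, Set<Long>> buckets = new HashMap<>();
    // session id -> bucket time the session currently lives in
    private final Map<Long, Long> bucketOfSession = new HashMap<>();

    SessionBucketSketch(long expirationInterval) {
        this.expirationInterval = expirationInterval;
    }

    // Assumption: a deadline is rounded up to the next bucket boundary.
    long roundToInterval(long time) {
        return (time / expirationInterval + 1) * expirationInterval;
    }

    // Register or refresh a session: move it to the bucket of its new deadline.
    void touch(long sessionId, long now, long timeout) {
        long bucket = roundToInterval(now + timeout);
        Long old = bucketOfSession.put(sessionId, bucket);
        if (old != null && old != bucket) {
            Set<Long> previous = buckets.get(old);
            if (previous != null) {
                previous.remove(sessionId);
            }
        }
        buckets.computeIfAbsent(bucket, k -> new HashSet<>()).add(sessionId);
    }

    // Remove the bucket that is due and return every session that was in it.
    Set<Long> expireBucket(long bucketTime) {
        Set<Long> expired = buckets.remove(bucketTime);
        if (expired == null) {
            return new HashSet<>();
        }
        for (Long id : expired) {
            bucketOfSession.remove(id);
        }
        return expired;
    }
}
```

With this layout the expiry loop never scans individual sessions: it removes one bucket per tick, and a session that keeps sending heartbeats simply migrates to a later bucket.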
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
    public void expire(Session session) {
        long sessionId = session.getSessionId();
        LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
                + ", timeout of " + session.getTimeout() + "ms exceeded");
        // Close the session
        close(sessionId);
    }

    private void close(long sessionId) {
        // Submit a closeSession request
        submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
    }

    private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
            int xid, ByteBuffer bb, List<Id> authInfo) {
        // The request carries essentially just the sessionId and the operation type (closeSession)
        Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
        submitRequest(si);
    }

    public void submitRequest(Request si) {
        ...
        try {
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                // Hand the request straight to firstProcessor
                firstProcessor.processRequest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                LOG.warn("Received packet at server of unknown type " + si.type);
                new UnimplementedRequestProcessor().processRequest(si);
            }
        } ...
    }
}
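As the code shows, closeSession is treated as a transactional request like any other; the request body is essentially the sessionId and the operation type.
The request is then handed to firstProcessor for processing.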
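To make the hand-off concrete, here is a minimal sketch of a processor chain. All names in it (ProcessorChainSketch, Req, PrepStage, FinalStage, expire) are hypothetical, and only the two stages discussed in this article are modeled; the real server chain may contain more stages.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical two-stage chain; illustrates the pattern only, not ZooKeeper's classes.
class ProcessorChainSketch {
    enum Op { CREATE, CLOSE_SESSION }

    // A request is little more than a session id plus an operation type.
    static final class Req {
        final long sessionId;
        final Op op;
        final List<String> trace = new ArrayList<>();

        Req(long sessionId, Op op) {
            this.sessionId = sessionId;
            this.op = op;
        }
    }

    interface Processor {
        void process(Req r);
    }

    // First stage: prepares the request, then forwards it down the chain.
    static final class PrepStage implements Processor {
        private final Processor next;

        PrepStage(Processor next) {
            this.next = next;
        }

        public void process(Req r) {
            r.trace.add("prep:" + r.op);
            next.process(r);
        }
    }

    // Last stage: this is where the change would actually be applied.
    static final class FinalStage implements Processor {
        public void process(Req r) {
            r.trace.add("final:" + r.op);
        }
    }

    // Session expiry is submitted like any other request, with no client connection.
    static Req expire(long sessionId, Processor first) {
        Req r = new Req(sessionId, Op.CLOSE_SESSION);
        first.process(r);
        return r;
    }
}
```

The point of the design is that expiry needs no special code path: it reuses the same pipeline, and therefore the same ordering guarantees, as client-issued writes.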
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
    // Pending changes that have not been applied yet (referenced below through zks)
    final List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
    final HashMap<String, ChangeRecord> outstandingChangesForPath = new HashMap<String, ChangeRecord>();

    // Process the request
    protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
            throws KeeperException, IOException, RequestProcessorException {
        request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
                Time.currentWallTime(), type);
        ...
        switch (type) {
        case OpCode.create:
            ...
        case OpCode.closeSession:
            // Fetch all ephemeral nodes created by this session
            HashSet<String> es = zks.getZKDatabase().getEphemerals(request.sessionId);
            synchronized (zks.outstandingChanges) {
                for (ChangeRecord c : zks.outstandingChanges) {
                    if (c.stat == null) {
                        // Doing a delete
                        es.remove(c.path);
                    } else if (c.stat.getEphemeralOwner() == request.sessionId) {
                        es.add(c.path);
                    }
                }
                // Wrap each ephemeral-node deletion in a ChangeRecord and add it to outstandingChanges
                for (String path2Delete : es) {
                    addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
                            path2Delete, null, 0, null));
                }
                zks.sessionTracker.setSessionClosing(request.sessionId);
            }
            LOG.info("Processed session termination for sessionid: 0x"
                    + Long.toHexString(request.sessionId));
            break;
        }
    }

    void addChangeRecord(ChangeRecord c) {
        synchronized (zks.outstandingChanges) {
            zks.outstandingChanges.add(c);
            zks.outstandingChangesForPath.put(c.path, c);
        }
    }
}
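PrepRequestProcessor only pre-processes the ephemeral nodes created by the session: it wraps the deletion of each node in a ChangeRecord and adds the records to the two collections zks.outstandingChanges and zks.outstandingChangesForPath, for later processors to handle.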
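The reconciliation in the closeSession branch, where the committed ephemeral set is adjusted by changes that are still pending, can be isolated as a small function. The sketch below is an illustration under assumptions: PendingChangeSketch, Change and pathsToDelete are hypothetical names, and a null ephemeralOwner stands in for the stat == null case (a pending delete) in the original code.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the closeSession pre-processing step (not ZooKeeper code).
class PendingChangeSketch {
    static final class Change {
        final String path;
        // null models "stat == null", i.e. a pending delete of this path
        final Long ephemeralOwner;

        Change(String path, Long ephemeralOwner) {
            this.path = path;
            this.ephemeralOwner = ephemeralOwner;
        }
    }

    // Start from the committed ephemerals of the session, then replay pending changes in order.
    static Set<String> pathsToDelete(Set<String> committed, List<Change> pending, long sessionId) {
        Set<String> es = new HashSet<>(committed);
        for (Change c : pending) {
            if (c.ephemeralOwner == null) {
                // Already scheduled for deletion, so it must not be deleted twice
                es.remove(c.path);
            } else if (c.ephemeralOwner == sessionId) {
                // Created by this session but not applied to the tree yet
                es.add(c.path);
            }
        }
        return es;
    }
}
```

For example, with committed paths /a and /b, a pending delete of /a and a pending create of /c owned by the same session, the function returns /b and /c. Without this step a node created just before expiry would survive its session.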
public class FinalRequestProcessor implements RequestProcessor {
    public void processRequest(Request request) {
        ...
        if (request.hdr != null) {
            TxnHeader hdr = request.hdr;
            Record txn = request.txn;
            // The important part: hand the transaction to ZooKeeperServer
            rc = zks.processTxn(hdr, txn);
        }
    }
}
2.2.1 ZooKeeperServer.processTxn(): processing the transaction request
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
        ProcessTxnResult rc;
        int opCode = hdr.getType();
        long sessionId = hdr.getClientId();
        // Delegate to ZKDatabase, see 2.2.2
        rc = getZKDatabase().processTxn(hdr, txn);
        if (opCode == OpCode.createSession) {
            if (txn instanceof CreateSessionTxn) {
                CreateSessionTxn cst = (CreateSessionTxn) txn;
                sessionTracker.addSession(sessionId, cst.getTimeOut());
            } else {
                LOG.warn("*****>>>>> Got "
                        + txn.getClass() + " "
                        + txn.toString());
            }
        } else if (opCode == OpCode.closeSession) {
            sessionTracker.removeSession(sessionId);
        }
        return rc;
    }
}
2.2.2 ZKDatabase.processTxn()
public class ZKDatabase {
    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
        // Delegate to DataTree
        return dataTree.processTxn(hdr, txn);
    }
}
2.2.3 DataTree.processTxn(): processing the transaction request
public class DataTree {
    public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
        ProcessTxnResult rc = new ProcessTxnResult();
        try {
            rc.clientId = header.getClientId();
            rc.cxid = header.getCxid();
            rc.zxid = header.getZxid();
            rc.type = header.getType();
            rc.err = 0;
            rc.multiResult = null;
            switch (header.getType()) {
            case OpCode.create:
                ...
            case OpCode.closeSession:
                killSession(header.getClientId(), header.getZxid());
                break;
            }
        } ...
    }

    void killSession(long session, long zxid) {
        // Take the set of ephemeral nodes created by this session
        HashSet<String> list = ephemerals.remove(session);
        if (list != null) {
            for (String path : list) {
                try {
                    // Delete the node itself
                    deleteNode(path, zxid);
                    if (LOG.isDebugEnabled()) {
                        ...
                    }
                } catch (NoNodeException e) {
                    LOG.warn("Ignoring NoNodeException for path " + path
                            + " while removing ephemeral for dead session 0x"
                            + Long.toHexString(session));
                }
            }
        }
    }

    public void deleteNode(String path, long zxid)
            throws KeeperException.NoNodeException {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        DataNode node = nodes.get(path);
        if (node == null) {
            throw new KeeperException.NoNodeException();
        }
        nodes.remove(path);
        synchronized (node) {
            aclCache.removeUsage(node.acl);
        }
        DataNode parent = nodes.get(parentName);
        if (parent == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (parent) {
            // Remove this child from the parent's child list
            parent.removeChild(childName);
            parent.stat.setPzxid(zxid);
            long eowner = node.stat.getEphemeralOwner();
            if (eowner != 0) {
                HashSet<String> nodes = ephemerals.get(eowner);
                if (nodes != null) {
                    // Remove the path from the owning session's ephemeral set
                    synchronized (nodes) {
                        nodes.remove(path);
                    }
                }
            }
            node.parent = null;
        }
        ...
        // Trigger the watches registered on this node
        Set<Watcher> processed = dataWatches.triggerWatch(path,
                EventType.NodeDeleted);
        childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
                EventType.NodeChildrenChanged);
    }
}
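In the end, it is in FinalRequestProcessor that all ephemeral nodes created by the session are deleted.
Deleting an ephemeral node involves three steps:
1. Remove the node's entry from its parent node.
2. Delete the ephemeral node's own data.
3. Trigger all watches registered on the node.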
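The three steps can be replayed on a toy in-memory tree. The sketch below is a deliberately reduced model, not DataTree itself: EphemeralTreeSketch and its methods are hypothetical names, watches are replaced by a plain list of event strings, and ACLs, stats and zxids are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of session cleanup (not ZooKeeper's DataTree).
class EphemeralTreeSketch {
    // path -> names of its children; a path exists if and only if it has an entry here
    private final Map<String, Set<String>> children = new HashMap<>();
    // session id -> paths of the ephemeral nodes that session created
    private final Map<Long, Set<String>> ephemerals = new HashMap<>();
    // Stand-in for watch notifications
    final List<String> events = new ArrayList<>();

    EphemeralTreeSketch() {
        children.put("/", new HashSet<>());
    }

    private static String parentOf(String path) {
        int lastSlash = path.lastIndexOf('/');
        return lastSlash == 0 ? "/" : path.substring(0, lastSlash);
    }

    private static String childName(String path) {
        return path.substring(path.lastIndexOf('/') + 1);
    }

    // ephemeralOwner == 0 means a persistent node, mirroring the check in deleteNode().
    void create(String path, long ephemeralOwner) {
        Set<String> siblings = children.get(parentOf(path));
        if (siblings == null) {
            throw new IllegalArgumentException("no parent for " + path);
        }
        siblings.add(childName(path));
        children.put(path, new HashSet<>());
        if (ephemeralOwner != 0) {
            ephemerals.computeIfAbsent(ephemeralOwner, k -> new HashSet<>()).add(path);
        }
    }

    void killSession(long session) {
        Set<String> paths = ephemerals.remove(session);
        if (paths == null) {
            return;
        }
        for (String path : paths) {
            deleteNode(path);
        }
    }

    private void deleteNode(String path) {
        String parent = parentOf(path);
        children.get(parent).remove(childName(path));     // step 1: detach from the parent
        children.remove(path);                            // step 2: drop the node itself
        events.add("NodeDeleted " + path);                // step 3: notify watchers
        events.add("NodeChildrenChanged " + parent);
    }

    boolean exists(String path) {
        return children.containsKey(path);
    }

    Set<String> childrenOf(String path) {
        return new HashSet<>(children.get(path));
    }
}
```

Creating a persistent /app with ephemeral children owned by sessions 42 and 43, then calling killSession(42), removes only the nodes owned by session 42 and leaves /app and the other session's node in place.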