Zookeeper临时节点删除时机解析
创始人
2024-03-14 23:16:04
0

前言:

Zookeeper中的节点主要分为临时节点和持久节点。

持久节点在创建之后,除非主动发起删除,否则节点会一直存在;

而临时节点则不同,创建该节点的Session过期后,则该Session创建的所有临时节点都会被删除。

本文主要来从源码的角度来分析下临时节点删除的全过程。

1.SessionTrackImpl的心跳检测

既然当Session过期后,Zookeeper会删除该Session创建的所有临时节点,那么我们就可以从Session的管理器SessionTrackImpl入手。

1.1 SessionTrackImpl.run()

public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {synchronized public void run() {try {while (running) {currentTime = Time.currentElapsedTime();if (nextExpirationTime > currentTime) {this.wait(nextExpirationTime - currentTime);continue;}SessionSet set;// 直接删除到期时间的所有Sessionset = sessionSets.remove(nextExpirationTime);if (set != null) {for (SessionImpl s : set.sessions) {// 设置Session isClosing=truesetSessionClosing(s.sessionId);// 设置session过期处理,重点在这里,具体见1.2expirer.expire(s);}}nextExpirationTime += expirationInterval;}} catch (InterruptedException e) {handleException(this.getName(), e);}LOG.info("SessionTrackerImpl exited loop!");}
}

SessionTrackImpl本质上还是session桶管理的模式,所以针对到期的session桶,则清理桶中的全部session。

1.2 ZooKeeperServer.expire() 处理session过期信息

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {public void expire(Session session) {long sessionId = session.getSessionId();LOG.info("Expiring session 0x" + Long.toHexString(sessionId)+ ", timeout of " + session.getTimeout() + "ms exceeded");// 关闭sessionclose(sessionId);}private void close(long sessionId) {// 提交一个关闭请求submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);}private void submitRequest(ServerCnxn cnxn, long sessionId, int type,int xid, ByteBuffer bb, List authInfo) {// 请求主要就是sessionId和操作类型 closeSessionRequest si = new Request(cnxn, sessionId, xid, type, bb, authInfo);submitRequest(si);}public void submitRequest(Request si) {...try {touch(si.cnxn);boolean validpacket = Request.isValid(si.type);if (validpacket) {// 直接交由firstProcessor处理firstProcessor.processRequest(si);if (si.cnxn != null) {incInProcess();}} else {LOG.warn("Received packet at server of unknown type " + si.type);new UnimplementedRequestProcessor().processRequest(si);}} ...}
}

从代码分析中可以看出,closeSession也被当做一个事务请求,请求体主要包含sessionId和操作类型。

然后交由firstProcessor来处理。

2.Processor处理closeSession请求

2.1 PrepRequestProcessor.pRequest2Txn() 处理事务请求

public class PrepRequestProcessor extends ZooKeeperCriticalThread implementsRequestProcessor {final List outstandingChanges = new ArrayList();final HashMap outstandingChangesForPath = new HashMap();// 处理请求protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)throws KeeperException, IOException, RequestProcessorException {request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,Time.currentWallTime(), type);...switch (type) {case OpCode.create:...case OpCode.closeSession:// 获取当前session创建的所有临时节点HashSet es = zks.getZKDatabase().getEphemerals(request.sessionId);synchronized (zks.outstandingChanges) {for (ChangeRecord c : zks.outstandingChanges) {if (c.stat == null) {// Doing a deletees.remove(c.path);} else if (c.stat.getEphemeralOwner() == request.sessionId) {es.add(c.path);}}// 将临时节点删除事件包装成ChangeRecord对象放入outstandingChangesfor (String path2Delete : es) {addChangeRecord(new ChangeRecord(request.hdr.getZxid(),path2Delete, null, 0, null));}zks.sessionTracker.setSessionClosing(request.sessionId);}LOG.info("Processed session termination for sessionid: 0x"+ Long.toHexString(request.sessionId));break;}     }void addChangeRecord(ChangeRecord c) {synchronized (zks.outstandingChanges) {zks.outstandingChanges.add(c);zks.outstandingChangesForPath.put(c.path, c);}
}

PrepRequestProcessor只是对当前session创建的临时节点进行预处理,将这些临时节点的包装成ChangeRecord对象,并添加到zks.outstandingChanges、zks.outstandingChangesForPath两个集合中,用于后续processor处理

2.2 FinalRequestProcessor.processRequest() 最终处理请求

public class FinalRequestProcessor implements RequestProcessor {public void processRequest(Request request) {...if (request.hdr != null) {TxnHeader hdr = request.hdr;Record txn = request.txn;// 重要处理在这里// 交由ZookeeperServer处理rc = zks.processTxn(hdr, txn);}}
}

2.2.1 ZooKeeperServer.processTxn() 处理事务请求

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {ProcessTxnResult rc;int opCode = hdr.getType();long sessionId = hdr.getClientId();// 这里交由ZKDatabase处理,具体见2.2.2rc = getZKDatabase().processTxn(hdr, txn);if (opCode == OpCode.createSession) {if (txn instanceof CreateSessionTxn) {CreateSessionTxn cst = (CreateSessionTxn) txn;sessionTracker.addSession(sessionId, cst.getTimeOut());} else {LOG.warn("*****>>>>> Got "+ txn.getClass() + " "+ txn.toString());}} else if (opCode == OpCode.closeSession) {sessionTracker.removeSession(sessionId);}return rc;}
}

2.2.2 ZKDatabase.processTxn() 

public class ZKDatabase {public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {// 交由DataTree处理return dataTree.processTxn(hdr, txn);}
}

2.2.3 DataTree.processTxn() 处理事务请求

public class DataTree {public ProcessTxnResult processTxn(TxnHeader header, Record txn) {ProcessTxnResult rc = new ProcessTxnResult();try {rc.clientId = header.getClientId();rc.cxid = header.getCxid();rc.zxid = header.getZxid();rc.type = header.getType();rc.err = 0;rc.multiResult = null;switch (header.getType()) {case OpCode.create:...case OpCode.closeSession:killSession(header.getClientId(), header.getZxid());break;}}}void killSession(long session, long zxid) {// 获取当前session所创建的临时节点HashSet list = ephemerals.remove(session);if (list != null) {for (String path : list) {try {// 具体处理deleteNode(path, zxid);if (LOG.isDebugEnabled()) {...}} catch (NoNodeException e) {LOG.warn("Ignoring NoNodeException for path " + path+ " while removing ephemeral for dead session 0x"+ Long.toHexString(session));}}}}public void deleteNode(String path, long zxid)throws KeeperException.NoNodeException {int lastSlash = path.lastIndexOf('/');String parentName = path.substring(0, lastSlash);String childName = path.substring(lastSlash + 1);DataNode node = nodes.get(path);if (node == null) {throw new KeeperException.NoNodeException();}nodes.remove(path);synchronized (node) {aclCache.removeUsage(node.acl);}DataNode parent = nodes.get(parentName);if (parent == null) {throw new KeeperException.NoNodeException();}synchronized (parent) {// 删除父节点下该子节点信息parent.removeChild(childName);parent.stat.setPzxid(zxid);long eowner = node.stat.getEphemeralOwner();if (eowner != 0) {HashSet nodes = ephemerals.get(eowner);if (nodes != null) {// 删除该临时节点synchronized (nodes) {nodes.remove(path);}}}node.parent = null;}...// 触发该临时节点的watch监听    Set processed = dataWatches.triggerWatch(path,EventType.NodeDeleted);childWatches.triggerWatch(path, EventType.NodeDeleted, processed);childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,EventType.NodeChildrenChanged);}
}

总结:

最终在FinalRequestProcessor中删除该session创建所有的临时节点。

删除临时节点包含三个步骤:

1.清理其父节点下当前节点信息

2.删除当前临时节点信息

3.触发当前节点的所有监听

相关内容

热门资讯

深度关注丨促进"四项... 安徽省亳州市健全“纪巡”联动机制,该市市委巡察机构将发现的生态环境保护方面问题线索及时移交市纪委监委...
上海出台23条政策措施 支持长... 近日,上海市科委会同松江区研究制订了《关于支持长三角G60科创走廊策源地建设的若干措施》。《若干措施...
藏格矿业:发布对外投资管理制度 藏格矿业公告称,公司制定对外投资管理制度,规范对外投资行为,明确对外投资需遵循合法合规、符合发展战略...
一审败诉!海峡创新因担保卷入房... 12月25日晚间,海峡创新(300300)发布公告,公司收到浙江省杭州市拱墅区人民法院送达的民事判决...
形势政策系列报告会第三场报告会... 新华社北京12月25日电 由中央宣传部、中央和国家机关工委、教育部、中央军委政治工作部、北京市委联合...
以考提质 以答践责——长春市宽... 12月23日,长春市宽城区举行2025年度领导干部法律知识考试,28名新提拔处级领导干部和31名新提...
获赔200万!比亚迪起诉多个账... 12月25日,比亚迪法务部发文称,近期,就比亚迪起诉“龙哥讲电车”、“满格电新能源”等账号一案,公司...
全国人大常委会法工委:推动物业... 在民生领域,物业管理条例的修订工作备受关注。 12月24日,全国人大常委会法工委法规备案审查室主任严...
韩国涉毒“财阀千金”黄荷娜被捕... 封面新闻记者 雷蕴含 近日,韩国涉毒前科人员黄荷娜因潜逃海外、涉嫌关联电诈、洗钱等多重违法行为被押解...