

网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
需要这份系统化资料的朋友,可以戳这里获取
一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
使用WireShark嗅探GetDataRequest产生的TCP包(十六进制字节数组)
| 十六进制位 | 协议部分 | 数值或字符串 |
|---|---|---|
| 00,00,00,1d | 0-3位:len 整个数据包长度 | 长度29 |
| 00,00,00,01 | 4-7位:xid 客户端请求的发起序号 | 1 |
| 00,00,00,04 | 8-11位:type 客户端请求类型 | 4 OpCode.getData |
| 00,00,00,10 | 12-15位:len 节点路径的长度 | 16 节点路径长度转换成十六进制是16位 |
| 2f,24,37,5f, 32,5f,34,2f, 67,65,74,5f, 64,61,74,61 | 16-31位:path 节点路径 | Hex编码 |
| 01 | 32位:是否注册Watcher | 1-是 |
GetDataResponse响应完整协议定义

响应头 ReplyHeader
public class ReplyHeader implements Record { private int xid; // 请求时传过来的xid会在响应中原样返回 private long zxid; // zxid 代表ZK服务器上当前最新事务ID private int err; // 错误码:Code.OK-0,NONODE-101,NOAUTH-102,定义在KeeperException.Code中 响应体Response
//会话创建 public class ConnectResponse implements Record { private int protocolVersion; private int timeOut; private long sessionId; private byte[] passwd; // 获取节点数据 public class GetDataResponse implements Record { private byte[] data; private org.apache.zookeeper.data.Stat stat; // 更新节点数据 public class SetDataResponse implements Record { private org.apache.zookeeper.data.Stat stat; GetDataResponse 协议定义
| 十六进制位 | 协议解释 | 当前值 |
|---|---|---|
| 00,00,00,63 | 0-3位:len 整个响应的数据包长度 | 99 |
| 00,00,00,05 | 4-7位:xid 客户端请求序号 | 5 本次请求所属会话创建后的第5次请求 |
| 00,00,00,00, 00,00,00,04 | 8-15位: zxid 当前服务器处理过的最大ZXID | 4 |
| 00,00,00,00 | 16-19位:err 错误码 | 0-Codes.OK |
| 00,00,00,0b | 20-23位:len 节点数据内容的长度 | 11 后面11位是数据内容的字节数组 |
| xxx | 24-34位:data 节点数据内容 | Hex编码 |
| 00,00,00,00, 00,00,00,04 | 35-42位:czxid 创建该节点时的ZXID | 4 |
| 00,00,00,00, 00,00,00,04 | 43-50位:mzxid 最后一次访问该数据节点时的ZXID | 4 |
| 00,00,01,43,67,bd,0e,08 | 51-58位:ctime 数据节点的创建时间 | unix_timestamp 1389014879752 |
| 00,00,01,43,67,bd,0e,08 | 59-66位:mtime 数据节点最后一次变更的时间 | |
| 00,00,00,00 | 67-70位:version 数据节点内容的版本号 | 0 |
| 00,00,00,00 | 71-74位:cversion 数据节点的子版本号 | 0 |
| 00,00,00,00 | 75-78位:aversion 数据节点的ACL变更版本号 | 0 |
| 00,00,00,00,00,00,00,00 | 79-86位:ephemeralOwner 如果是临时节点,则记录创建该节点的sessionID,否则置0 | 0 (该节点是永久节点) |
| 00,00,00,0b | 87-90位:dataLength 数据节点的数据内容长度 | 11 |
| 00,00,00,00 | 91-94位:numChildren 数据节点的子节点个数 | 0 |
| 00,00,00,00,00,00,00,04 | 95-102位:pzxid 最后一次对子节点列表变更的ZXID | 4 |
ZK客户端的组成:ZooKeeper实例-客户端入口,HostProvider - 客户端地址列表管理器,ClientCnxn-客户端核心线程,内部包含SendThread和EventThread两个线程。前者是一个IO线程,负责ZooKeeper客户端和服务器端间的网络IO通信,后者是一个事件线程,负责对服务端事件进行处理。
初始化阶段
会话创建阶段
响应处理阶段
connectString 形如 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181,ZK客户端允许将服务器所有地址配置在字符上,ZK客户端在连接服务器的过程中是如何从服务器列表中选择机器的?是顺序?还是随机?
org.apache.zookeeper.client.ConnectStringParser 中的构造方法对connectString进行的处理有:解析chrootPath + 保存服务器地址列表到 ArrayList serverAddresses
chroot 客户端命名空间
ZK3.2.0 之后的版本中添加了该特性,connectString 可 设置为 192.168.0.1:2181,192.168.0.2:2181/apps/domainName,将解析出chroot=/apps/domainName,这样客户端的所有操作都会限制在这个命名空间下
ZooKeeper.java
private static HostProvider createDefaultHostProvider(String connectString) { return new StaticHostProvider(new ConnectStringParser(connectString).getServerAddresses()); } 解析的结果会返回 地址列表管理器 StaticHostProvider 的构造方法中
HostProvider 提供了客户端连接所需的host,每一个实现该接口的类需要确保下述几点:
public interface HostProvider { //当前服务器地址列表的个数,不能返回0 int size(); // 获取下一个将要连接的InetSocketAddress,spinDelay 表示所有地址都尝试过后的等待时间 InetSocketAddress next(long spinDelay); //连接成功后的回调方法 void onConnected(); //更新服务器列表,返回是否需要改变连接用于负载均衡 boolean updateServerList(Collection serverAddresses, InetSocketAddress currentHost); } 解析服务器地址:StaticHostProvider会解析服务器地址放入serverAddress 集合中,同时使用Collections#shuffle方法将服务器地址列表进行随机打散。
获取可用的服务器地址:StaticHostProvider#next() 方法中将随机排序后的服务器地址列表拼成一个环形循环队列,该过程是一次性的。
HostProvider的实现:自动从配置文件中读取服务器地址列表、动态变更的地址列表管理器(定时从配置管理中心上解析ZK服务器地址)、实现服务调用时同机房优先的策略
ClientCnxn维护客户端与服务器之间的网络连接并进行通信
Packet是ClientCnxn的内部类,定义:
static class Packet { RequestHeader requestHeader; ReplyHeader replyHeader; Record request; Record response; ByteBuffer bb; String clientPath; //server视角下的path,chroot不同 String serverPath; boolean finished; AsyncCallback cb; Object ctx; WatchRegistration watchRegistration; public boolean readOnly; WatchDeregistration watchDeregistration; //并不是Packet中的所有字段都进行网络传输,在createBB方法中定义了用于网络传输的ByteBuffer bb字段的生成逻辑 //里面只用到了RequestHeader requestHeader,Record request,boolean readOnly 3个字段 public void createBB() {} } ClientCnxn的两个核心队列(都是Packet队列):
ZK3.4之后ClientCnxnSocket从ClientCnxn中提取了出来,便于对底层Socket进行扩展(如使用Netty实现)
通过系统变量配合ClientCnxnSocket实现类的全类名:-Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNIO
ClientCnxnSocketNIO是ClientCnxnSocket的Java NIO原生实现
【分布式】Zookeeper会话 - leesf - 博客园
会话状态有:CONNECTING CONNECTED RECONNECTING RECONNECTED CLOSE

Session是ZK中的会话实体,代表一个客户端会话,包含以下4个基本属性:
代码位于 SessionTrackerImpl#initializeNextSession //最终返回的sessionID:高8位是传入的id,剩下的56位最后16位被置零了,前面的40位是最高位截掉的timestamp(去掉数字1) public static long initializeNextSessionId(long id) { long nextSid; // nanoTime/10^6 就是 currentTimeMillis 13位long型,long型占空间8B,共64位 //如 1657349408123 对应 44 位的二进制是 00011000000111110001101110010000010101111011 //左移24位后再右移8位后的结果:00000000(-8位)1000000111110001101110010000010101111011(16位-)0000000000000000 //注意这个右移8位是无符号右移,防止unixtimes第5位是1带来的负数问题 nextSid = (System.nanoTime() / 1000000 << 24) >>> 8; //添加机器标识 sid 正好补在前面腾出的8位中 nextSid = nextSid | (id << 56); if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) { ++nextSid; // this is an unlikely edge case, but check it just in case } return nextSid; } 左移24位可以将高位的1去掉(unixTimestamp转二进制的44位数字开头总是0001),防止负数(负数右移8位后最高位的1不变),sid不能明确得出
ZK服务端的会话管理器,负责会话的创建、管理和清理,使用3个数据结构管理Session:
ZK的会话管理主要由SessionTracker负责,其采用了分桶策略:将理论上可以在同一时间点超时的会话放在同一区块中,便于进行会话的隔离处理和同一区块的统一管理。

对于一个会话的超时时间理论上就是客户端设置的超时时间之后,即图中的 ExpirationTime = CurrentTime + sessionTimeout(客户端进行设置),这样到达这个ExpirationTime检查各会话是否真的需要置超时状态
但是ZK服务端检查各区块的会话是否超时是有周期的,如每隔 ExpirationInterval 进行检查,这样实际的 ExpirationTime 是在原数值之后的最近一个周期上进行检查,这样
ExpirationTime_Adjust = ((CurrentTime + sessionTimeout) / ExpirationInterval + 1) * ExpirationInterval (单位均是ms)
如对于当前时间为4,,10 超时,检查周期为3,在15的时候才是第一个可能的超时时间。这样 ExpirationTime_Adjust 总是 ExpirationInterval 的整数倍。这样SessionTracker中的会话超时检查线程就可以在 ExpirationInterval 的整数倍的时间点上对会话进行批量清理(未及时移走的会话都是要被清理掉的,没有客户端触发会话激活)
Leader服务器收到客户端的心跳消息PING后:
触发会话激活的两种场景:
客户端与服务端网络连接断开时,ZK客户端会进行反复的重连
客户端经常看到的两种连接异常是:CONNECTION_LOSS 连接断开,SESSION_EXPIRE 会话过期;服务端可能看到的连接异常是SESSION_MOVED 会话转移
ZK服务端架构

zookeeper学习笔记Sky_的博客-CSDN博客

ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"创建服务器统计器ServerStats,包含下述基本运行时信息:
创建ZK数据管理器FileTxnSnapLog:FileTxnSnapLog是ZK上层服务器和底层数据存储之间的对接层,提供了一些列操作数据文件的接口,包括事务日志文件(TxnLog接口)和快照数据文件(SnapShot接口)。ZK根据zoo.cfg文件中解析出的快照数据目录dataDir和事务日志目录dataLogDir来创建FileTxnSnapLog。
设置服务端 tickTime 和 会话超时时间 限制
创建并初始化 ServerCnxnFactory , 通过属性 zookeeper.serverCnxnFactory 指定zookeeper使用 Java原生NIO还是Netty框架作为ZooKeeper服务端网络连接工厂
启动ServerCnxnFactory主线程(执行主逻辑所在的run方法)此时ZK的NIO服务器已经对外开放了端口,客户端可以访问到2181端口,但此时zk服务器还无法正常处理客户端请求
恢复本地数据:ZK启动时都会从本地快照文件和事务日志文件中进行数据恢复
创建并启动会话管理器SessionTracker,同时会设置 expirationInterval 计算 nextExpirationTime、sessionID ,初始化本地数据结构 sessionsWithTimeout(保存每个会话的超时时间)。之后ZK就会开始会话管理器的会话超时检查
初始化ZK的请求处理链,ZK服务端对于请求的初始方式是典型的责任链模式,单机版服务器的处理链主要包括:PrepRequestProcessor -> SyncRequestProcessor ->FinalRequestProcessor
注册JMX服务:ZK会将服务器运行时的一些状态信息以JMX的方式暴露出来
注册ZK服务器实例:此时ZK服务器初始化完毕,注册到ServerCnxnFactory之后就可以对外提供服务了,至此单机版的ZK服务器启动完毕
zk源码阅读26:集群版服务器启动概述 - 简书

预启动过程与单机版一致

接上面步骤10,启动步骤如下:
至此,集群版的ZK服务器启动完毕
Leader选举是ZooKeeper中最重要的技术之一,也是保证分布式数据一致性的关键
以3台机器组成的集群为例:Server1首先启动,此时无法完成Leader选举
Server2启动后,与Server1进行Leader选举,由于是初始化阶段,都会投票给自己,于是Server1投票内容 (myid, ZXID) 为 (1,0),Server2投票 (2,0),各自将这个投票发送给集群中的其他所有机器
每个服务器接收来自其他各服务器的投票,并判断投票的有效性:检查是否是本轮投票,是否来自LOOKING状态的服务器
收到其他服务器的投票后与自己的投票进行PK,PK规则有:
此时Server1收到Server2的投票(2,0),ZXID相同,但myid较小,会更新自己的投票为 (2,0) 并发出。Server2发现自己的myid较大,无需更新投票信息,只是再次向集群中所有机器发出上一次投票信息
Leader服务器宕机后进入新一轮的Leader选举
ZooKeeper提供了3种Leader选举算法:LeaderElection、UDP版本的FastLeaderElection、TCP版本的FastLeaderElection。
术语解释:
SID - 服务器ID,唯一标识ZooKeeper集群中的机器的数字,与myid一致
ZXID - 事务ID,用于唯一标识一次服务器状态的变更,某一时刻,集群中的每台服务器的ZXID不一定完全一致
Vote - 投票
Quorum - 过半机器数,quorum = n/2 + 1
ZooKeeper集群中服务器出现下述两种情况之一就会进入Leader选举:集群初始化启动阶段;Leader宕机/断网
而一台机器进入Leader选举流程时,当前集群也可能会处于两种状态:
【选举案例】集群有5台机器,SID分别为 1 2 3 4 5,ZXID分别为 9 9 9 8 8,在某一时刻SID为 1 2 的机器宕机退出,集群此时开始进行Leader选举
第一次投票时,由于还无法检测到集群中其他机器的状态信息,每台机器都将投自己,于是SID为 3 4 5的机器分别投票(SID,ZXID) (3,9) (4,8) (5,8)
每台机器发出自己的投票后也会收到来自集群中其他机器的投票,每台机器都会对比收到的投票,决定是否替换。假设机器自己的投票是 (self_sid, self_zxid) 接收到的投票是 (vote_sid, vote_zxid),对比的规则是:
SID为 3 4 5的机器对投票进行对比,会统一更新为投票 (3,9) ,此时quorum = 3 >= (5/2 + 1) 超过半数,选举服务器3作为Leader
ZXID越大的机器,数据也就越新,这样可以保证数据的恢复(更少的数据丢失),所以适合作为Leader服务器
在QuorumPeer.ServerState 类中定义了4种服务器状态
public enum ServerState { LOOKING, // 寻找Leader状态,当前集群中没有Leader,需要进入Leader选举流程 FOLLOWING, // 当前服务器的角色是Follower LEADING, // 当前服务器角色是Leader OBSERVING // 当前服务器角色是 Observer } org.apache.zookeeper.server.quorum.Vote 数据结构的定义
public class Vote { private final int version; private final long id; // 选举的Leader的SID private final long zxid; //逻辑时钟,用于判断多个投票是否在同一轮选举周期中。该值在服务端是一个自增序列,每次进入新一轮投票后,都会对该值+1 private final long electionEpoch;// 被推举的Leader的epoch private final long peerEpoch;//当前服务器的状态 每个服务器启动时会启动一个QuorumCnxManager,负责各服务器的底层Leader选举过程中的网络通信。
QuorumCnxManager内部维护了一系列按SID分组的消息队列:
recvQueue:消息接收队列,存放从其他服务器接收到的消息
queueSendMap:消息发送队列,保存待发送的消息。此Map的key是SID,分别为集群中的每台机器分配了一个单独队列,从而保证各台机器之间的消息发送互不影响
senderWorkerMap:发送器集合,同样按SID分组,每个SenderWorker消息发送器对应一台远程ZooKeeper服务器
lastMessageSent:最近发送过的消息,为每个SID记录最近发送过的消息
选举时集群中的机器是如何建立连接的:
为了能够进行互相投票,ZooKeeper集群中的机器需要两两建立网络连接。
QuorumCnxManager启动时会创建一个ServerSocket监听Leader选举的通信端口(默认3888),接收其他服务器的TCP连接请求并交给receiveConnection函数来处理。为了避免两台机器之间重复创建TCP连接,ZooKeeper设计一种建立TCP连接的规则:只允许SID大的服务器主动和其他服务器建立连接,否则断开连接。如果服务器收到TCP连接请求发现比自己的SID值小,会断开这个连接并主动与发起连接的远程服务器建立连接。
建立连接后就会根据外部服务器的SID创建对应的消息发送器 SendWorker 和 消息接收器RecvWorker 并启动
ZooKeeper对于选票的管理
FastLeaderElection#lookForLeader方法中揭示了选举算法的流程,该方法在服务器状态变成LOOKING时触发
自增选举轮次 logicalclock ++ FastLeaderElection中的 AtomicLong logicalclock 字段标记当前Leader的选举轮次,ZooKeeper在开始新一轮投票时,会首先对logicalclock进行自增操作
初始化选票 初始化选票Vote的属性:将自己推荐为Leader(id=服务器自身SID,zxid=当前服务器最新ZXID,electionEpoch=当前服务器的选举轮次,peerEpoch=被推举的服务器的选举轮次,state=LOOKING)
将初始化好的选票放入sendqueue中,由WorkerSender负责发出
服务器不断从 recvqueue 接收外部投票,如果服务器发现无法获取到任何投票会检查与其他服务器的连接,修复连接后重新发出
处理外部投票,根据选举轮次判断进行不同的处理:
选票PK:收到其他服务器有效的外部投票后,进行选票PK,执行FastLeaderElection.totalOrderPredicate方法,选票PK的目的是确定当前服务器是否需要变更投票,主要从 logicalclock、ZXID、SID三个维度判断,符合下述任意一个条件就进行投票变更:
变更投票:如果需要变更投票就使用外部投票的选票信息覆盖内部投票,变更完成后再将这个变更后的内部投票发出去
选票归档:无论是否进行了投票变更,外部投票都会存入recvset中进行归档,recvset中按照服务器对应的SID来区分{(1,vote1),(2,vote2),…}
统计投票:统计集群中是否已经有过半的机器认可了当前的内部投票,否则返回步骤4
更新服务器状态:如果此时已经确定可以终止投票,就更新服务器状态:根据过半机器认可的投票对应的服务器是否是自己确定是否成为Leader,并将状态切换为LEADING/FOLLOWING/OBSERVING
上述10个步骤就是FastLeaderElection的选举流程,步骤4~9会经过几轮循环,直到Leader选举产生。在步骤9如果已经有过半服务器认可了当前选票,此时ZooKeeper并不会立即进入步骤10,而是等待一段时间(默认200ms)来确定是否有新的更优的投票。
工作内容:事务请求的唯一调度和处理者,保证集群事务处理的顺序性;集群内部各服务器的调度者;
ZooKeeper使用责任链模式来处理客户端请求

PrepRequestProcessor是Leader服务器的请求预处理器,在ZK中,将创建删除节点/更新数据/创建会话等会改变服务器状态的请求称为事务请求,对于事务请求,预处理器会进行一系列预处理,如创建请求事务头、事务体、会话检查、ACL检查和版本检查
ProposalRequestProcessor Leader的事务投票处理器,也是Leader服务器事务处理流程的发起者。
SyncRequestProcessor 事务日志处理器,将事务请求记录到事务日志文件中,触发ZooKeeper进行数据快照
AckRequestProcessor 是Leader特有的处理器,负责在SyncRequestProcessor处理器完成事务日志记录后向Proposal的投票收集器发送ACK反馈,通知投票收集器当前服务器已完成对该Proposal的事务日志记录
CommitProcessor 事务提交处理器
ToBeCommitProcessor 该处理类中有一个toBeApplied队列(ConcurrentLinkedQueue toBeApplied)存储被CommitProcessor处理过的可被提交的Proposal,等待FinalRequestProcessor处理完提交的请求后从队列中移除
FinalRequestProcessor 进行客户端请求返回前的收尾工作:创建客户端请求的响应、将事务应用到内存数据库
LearnerHandler:Leader服务器会与每一个Follower/Observer服务器建立一个TCP长链接,同时为每个Follower/Observer服务器创建LearnerHandler。LearnerHandler是ZK集群中的Learner服务器的管理器,负责Follower/Observer服务器和Leader服务器之间的网络通信:数据同步、请求转发、Proposal提议的投票。
Follower的职责:处理客户端非事务请求,转发事务请求给Leader服务器;参与事务请求Proposal的投票;参与Leader选举投票;
Follower不需要负责事务请求的投票处理(所以不需要ProposalRequestProcessor),所以其请求处理链简单一些

观察ZooKeeper集群的最新状态并将这些状态变更同步过来,Observer服务器在工作原理上与Follower基本一致,对于非事务请求可以进行独立的处理,对于事务请求同样需要转发到Leader服。与Follower的一大区别是:Observer不参与任何形式的投票,包括Leader选举和事务请求Proposal的投票。
ZK集群各服务器间消息类型分为4类:数据同步型、服务器初始化型、请求处理型、会话管理型
Learner与Leader进行数据同步使用的消息,分为4种(消息类型定义在Leader.java中,使用常量数字标记):
整个集群或某些机器初始化时,Leader与Learner之间相互通信所使用的消息类型:
请求处理过程中Leader和Learner之间互相通信所使用的消息:
ZK服务器在进行会话管理过程中,与Learner服务器之间通信所使用的消息:
会话创建请求
ZK服务端对于会话创建的处理,可以分为请求接收、会话创建、预处理、事务处理、事务应用和会话响应。
zookeeper源码分析(3)— 一次会话的创建过程 - 简书— 一次会话的创建过程 - 简书")


ProposalRequestProcessor处理请求:PrepRequestProcessor将请求交给下一级处理器,提案Proposal是ZK中对因事务请求展开的投票流程中的事务操作的包装,该处理器就是处理提案的,处理流程有:
Sync流程:SyncRequestProcessor处理器记录事务日志。完成事务日志记录后,每个Follower都会向Leader发送ACK消息,表明自身完成了事务日志的记录,以便Leader服务器统计每个事务请求的投票情况
Proposal流程:ZK的实现中,每个事务请求都需要集群中过半机器投票认可才能真正应用到ZK的内存数据库中,这个投票与统计的过程就叫 Proposal流程:
Commit流程:



既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!
由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新
需要这份系统化资料的朋友,可以戳这里获取
用于确保事务请求的顺序性,便于CommitProcessor检测当前集群中是否正在进行事务请求的投票
+ 等待Proposal投票:Commit流程处理时,Leader根据当前事务请求生成Proposal广播给所有Follower,此时Commit流程需要等待
+ 投票通过,提议获得过半机器认可,ZK会将请求放入committedRequests队列中,同时唤醒Commit流程
+ 提交请求:将请求放入toProcess队列中,交给FinalRequestProcessor处理
[外链图片转存中…(img-YBQD3BIG-1715341903538)]
[外链图片转存中…(img-p0dH29Hs-1715341903539)]
[外链图片转存中…(img-7Ar2803B-1715341903539)]
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!
由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新
需要这份系统化资料的朋友,可以戳这里获取
上一篇:linux实用技巧:ubuntu18.04安装samba服务器实现局域网文件共享
下一篇:frp内网穿透实现外网可访问的ftp(FileZillaServer)服务器(web服务器,远程桌面连接都可以)