ZooKeeperServer类实现了单机版ZooKeeper服务端功能,其子类实现了更加丰富的分布式集群功能。继承关系如下:
ZooKeeperServer
|-- QuorumZooKeeperServer
|   |-- LeaderZooKeeperServer
|   |-- LearnerZooKeeperServer
|       |-- FollowerZooKeeperServer
|       |-- ObserverZooKeeperServer
|-- ReadOnlyZooKeeperServer
// Default value of the tickTime parameter (ZooKeeper measures ticks in milliseconds — confirm for this version).
public static final int DEFAULT_TICK_TIME = 3000;
protected int tickTime = DEFAULT_TICK_TIME;
// -1 means "not configured"; the effective default is tickTime * 2.
protected int minSessionTimeout = -1;
// -1 means "not configured"; the effective default is tickTime * 20.
protected int maxSessionTimeout = -1;
// Session tracking: creates/removes sessions and binds session owners.
protected SessionTracker sessionTracker;
// Storage components: transaction log/snapshot access and the in-memory database.
private FileTxnSnapLog txnLogFactory = null;
private ZKDatabase zkDb;
// Response caches used for read requests.
private ResponseCache readResponseCache;
private ResponseCache getChildrenResponseCache;
// Server zxid counter; set to the latest lastZxid during startup (see loadData -> setZxid).
private final AtomicLong hzxid = new AtomicLong(0);
// Entry point of the request-processor chain (built by setupRequestProcessors).
protected RequestProcessor firstProcessor;
// Pending (not yet applied) changes. processTxn(Request) removes entries whose zxid is
// <= the zxid of the transaction it has just applied.
final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
// Pending change per path; an entry is only removed while it still maps to the record being pruned.
final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<>();
// Client connection factories: plain and secure (presumably TLS — confirm).
protected ServerCnxnFactory serverCnxnFactory;
protected ServerCnxnFactory secureServerCnxnFactory;
// Parameters used to decide whether a request counts as "large" (max is 100 MiB).
private volatile int largeRequestMaxBytes = 100 * 1024 * 1024;
private volatile int largeRequestThreshold = -1;
// Remove the connection's watchers from the dataTree via zkDb.
void removeCnxn(ServerCnxn cnxn);
// Create zkDb if it is null, then call loadData to load the data.
public void startdata() throws IOException, InterruptedException;
// Load data, kill dead sessions, and take a snapshot.
public void loadData() throws IOException, InterruptedException;
// Save a snapshot of the current zkDb.
public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere,
boolean fastForwardFromEdits) throws IOException;
// Parse data from the given input stream and build a new zkDb and SessionTracker.
public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException;
// Truncate the transaction log back to the given zxid via zkDb.truncateLog(zxid).
public void truncateLog(long zxid) throws IOException;
// Read dataTree.lastProcessedZxid through zkDb.
public long getLastProcessedZxid();
// Close a session by submitting a closeSession request.
private void close(long sessionId);
// Kill a session via zkDb (and drop it from the session tracker, if any).
protected void killSession(long sessionId, long zxid);
// Start the server components and switch to the given state.
private void startupWithServerState(State state);
// Build the RequestProcessor chain that handles requests.
protected void setupRequestProcessors();
// Create the SessionTracker.
protected void createSessionTracker();
// Generate a password for the given session.
byte[] generatePasswd(long id);
// Verify a session password.
protected boolean checkPasswd(long sessionId, byte[] passwd);
// Create a session through sessionTracker, generate its password, and submit a createSession request.
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout);
// Bind an owner to the given session.
public void setOwner(long id, Object owner) throws SessionExpiredException;
// Validate the session, then use finishSessionInit to either keep talking or close the connection.
protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException;
public void finishSessionInit(ServerCnxn cnxn, boolean valid);
// Flow: checkPasswd -> revalidateSession -> finishSessionInit
public void reopenSession(ServerCnxn cnxn, long sessionId,
byte[] passwd, int sessionTimeout) throws IOException;
// Hand the request to requestThrottler, which later calls submitRequestNow for it.
public void enqueueRequest(Request si);
// Pass the request to firstProcessor.
public void submitRequestNow(Request si);
// Handle a connect request; called by the network I/O layer.
public void processConnectRequest(
ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException;
// Handle a regular request packet; called by the network I/O layer.
public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException;
// SASL authentication.
private void processSasl(
RequestRecord request, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException;
// Transaction processing: the (hdr, txn) overload is the Learner entry point,
// the Request overload is the FinalRequestProcessor entry point.
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn);
public ProcessTxnResult processTxn(Request request);
private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn);
private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest);
// Grant or deny authorization to an operation on a node
public void checkACL(ServerCnxn cnxn, List<ACL> acl, int perm, List<Id> ids,
String path, List<ACL> setAcls) throws KeeperException.NoAuthException;
// Check whether a change to the path would exceed its quota
public void checkQuota(String path, byte[] lastData, byte[] data,
int type) throws KeeperException.QuotaExceededException;
private void checkQuota(String lastPrefix, long bytesDiff, long countDiff,
String namespace) throws KeeperException.QuotaExceededException;
// Get the parent path.
private String parentPath(String path) throws KeeperException.BadArgumentsException;
// Derive, from the Request, the path whose ACL must be checked.
private String effectiveACLPath(
Request request) throws KeeperException.BadArgumentsException, KeeperException.InvalidACLException;
// Derive, from the Request, the permission type it needs.
private int effectiveACLPerms(Request request);
// Check write permission.
public boolean authWriteRequest(Request request);
loadData方法:加载数据、清理失效session、生成快照:
/**
 * Loads the database (unless it is already initialized), kills every session that has
 * no timeout entry in zkDb, and writes a fresh snapshot.
 */
public void loadData() throws IOException, InterruptedException {
    // Initialize the zxid: reuse the loaded tree's zxid, or load the database from disk.
    final long startZxid = zkDb.isInitialized()
            ? zkDb.getDataTreeLastProcessedZxid()
            : zkDb.loadDataBase();
    setZxid(startZxid);

    // A session known to zkDb but without a timeout entry is dead: kill it.
    zkDb.getSessions().stream()
            .filter(sid -> zkDb.getSessionWithTimeOuts().get(sid) == null)
            .forEach(sid -> killSession(sid, zkDb.getDataTreeLastProcessedZxid()));

    // Save a snapshot (this ends up in txnLogFactory.save(dataTree, sessionWithTimeOuts, syncSnap)).
    takeSnapshot();
}
/**
 * Kills a session. zkDb cleans up the session's state (including its ephemeral nodes),
 * and the session is also removed from the session tracker when one exists.
 */
protected void killSession(long sessionId, long zxid) {
    // zkDb side: the session's ephemeral nodes must be cleaned up.
    zkDb.killSession(sessionId, zxid);
    if (sessionTracker == null) {
        return;
    }
    // Drop the tracking information for this session.
    sessionTracker.removeSession(sessionId);
}
// Starts the server components (session tracker, request-processor chain, throttler,
// JMX, JVM pause monitor, metrics) and then moves the server into the given state.
// NOTE(review): notifyAll() requires the caller to hold this object's monitor, so
// callers are presumably synchronized on this server — confirm.
private void startupWithServerState(State state) {
// Create the session tracker lazily if none has been set up yet.
if (sessionTracker == null) {
createSessionTracker();
}
startSessionTracker();
// Build the RequestProcessor chain that handles requests.
setupRequestProcessors();
// Rate-limiting component; not analyzed in these notes.
startRequestThrottler();
registerJMX();
startJvmPauseMonitor();
registerMetrics();
setState(state);
requestPathMetricsCollector.start();
localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
// Wake up any threads waiting on this server's monitor (presumably for startup to finish — confirm).
notifyAll();
}
/**
 * Builds the standalone processor chain:
 * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor.
 * Processors are created tail-first so each one can be given its successor.
 */
protected void setupRequestProcessors() {
    final RequestProcessor tail = new FinalRequestProcessor(this);
    final SyncRequestProcessor sync = new SyncRequestProcessor(this, tail);
    sync.start();
    final PrepRequestProcessor head = new PrepRequestProcessor(this, sync);
    firstProcessor = head;
    head.start();
}
RequestProcessor接口:以处理器链的方式处理事务,请求总是按顺序处理。单机模式服务器、follower和leader各自使用不同的处理器链。请求通过processRequest方法传递给下一个RequestProcessor对象,该方法通常只会被单个线程调用。调用shutdown时,RequestProcessor还应关闭与其相连的其他RequestProcessor对象。
FinalRequestProcessor类:处理与请求相关的事务,并提供查询服务,给客户端发响应,位于RequestProcessor链末尾。
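SyncRequestProcessor类:将请求记录到磁盘,并对请求进行批处理,以高效地执行IO操作。在日志同步到磁盘之前,请求不会传递给下一个RequestProcessor对象。SyncRequestProcessor用于以下3种情况:
1. Leader:将请求同步到磁盘后转发给AckRequestProcessor,由它向leader自身发送ack。
2. Follower:将请求同步到磁盘后转发给SendAckRequestProcessor,由它向leader发送ack。
3. Observer:将已提交的请求(以INFORM包形式收到)同步到磁盘,从不向leader发送ack,因此nextProcessor为null。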
PrepRequestProcessor类:通常位于RequestProcessor链开头,为更新请求关联的事务做设置。
/**
 * Creates the standalone session tracker (SessionTrackerImpl), seeded with the session
 * timeouts currently held by zkDb.
 */
protected void createSessionTracker() {
    final SessionTrackerImpl tracker = new SessionTrackerImpl(
            this,
            zkDb.getSessionWithTimeOuts(),
            tickTime,
            createSessionTrackerServerId,
            getZooKeeperServerListener());
    sessionTracker = tracker;
}
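不同的子类使用了不同的SessionTracker实现类:
- ZooKeeperServer(单机模式):SessionTrackerImpl
- LeaderZooKeeperServer:LeaderSessionTracker
- LearnerZooKeeperServer(Follower/Observer):LearnerSessionTracker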
/**
 * Creates a new session for the connection and submits a createSession request for it.
 *
 * @param cnxn    connection that receives the new session id
 * @param passwd  buffer filled in place with the derived password bytes; may be null,
 *                in which case an empty buffer is used (so nothing is filled)
 * @param timeout session timeout to register
 * @return the id of the new session
 */
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
    final byte[] secretBuf = (passwd != null) ? passwd : new byte[0];
    // Allocate the session in the tracker.
    final long sessionId = sessionTracker.createSession(timeout);
    // Derive the password deterministically from the session id and superSecret.
    new Random(sessionId ^ superSecret).nextBytes(secretBuf);
    // The createSession request is handled by the RequestProcessor chain.
    final CreateSessionTxn createTxn = new CreateSessionTxn(timeout);
    cnxn.setSessionId(sessionId);
    submitRequest(new Request(
            cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(createTxn), null));
    return sessionId;
}
/**
 * Hands a request directly to firstProcessor.
 * Unknown request types are answered by an UnimplementedRequestProcessor. Whenever the
 * request is not handed to the chain, requestFinished releases its accounting/throttling slot.
 */
public void submitRequestNow(Request si) {
    try {
        touch(si.cnxn);
        if (!Request.isValid(si.type)) {
            // Update request accounting/throttling limits
            requestFinished(si);
            // NOTE(review): if this call throws RequestProcessorException, the catch below
            // calls requestFinished(si) a second time — confirm that is intended.
            new UnimplementedRequestProcessor().processRequest(si);
            return;
        }
        setLocalSessionFlag(si);
        // Let the head of the processor chain take over.
        firstProcessor.processRequest(si);
        if (si.cnxn != null) {
            incInProcess();
        }
    } catch (MissingSessionException | RequestProcessorException e) {
        // Update request accounting/throttling limits
        requestFinished(si);
    }
}
/**
 * Handles a client's ConnectRequest; called by the network I/O layer.
 *
 * <p>Rejects a client that has seen a newer zxid than this server. Otherwise it clamps the
 * requested session timeout into [getMinSessionTimeout(), getMaxSessionTimeout()]. It then
 * creates a new session when the request carries session id 0, or reopens the requested
 * session after closing any other connection that still holds it.
 *
 * @param cnxn    connection the request arrived on
 * @param request the deserialized connect request
 * @throws CloseRequestException if the client's last seen zxid is ahead of this server
 */
public void processConnectRequest(
        ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException {
    long sessionId = request.getSessionId();
    // ... (omitted)
    long clientLastZxid = request.getLastZxidSeen();
    // Read the server's zxid once so the check and the error message agree.
    long serverLastZxid = zkDb.dataTree.lastProcessedZxid;
    if (clientLastZxid > serverLastZxid) {
        // The client has seen a newer state than this server: it must try another server.
        String msg = "Refusing session request for client as it has seen zxid 0x"
                + Long.toHexString(clientLastZxid)
                + " our last zxid is 0x"
                + Long.toHexString(serverLastZxid)
                + " client must try another server";
        throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
    }
    int requestedTimeout = request.getTimeOut();
    byte[] passwd = request.getPasswd();
    // Clamp: first raise to the minimum, then cap at the maximum (max wins on misconfiguration).
    int minTimeout = getMinSessionTimeout();
    int maxTimeout = getMaxSessionTimeout();
    int sessionTimeout = Math.min(Math.max(requestedTimeout, minTimeout), maxTimeout);
    cnxn.setSessionTimeout(sessionTimeout);
    // We don't want to receive any packets until we are sure that the session is setup
    cnxn.disableRecv();
    if (sessionId == 0) {
        // New session; createSession also assigns the new id to cnxn.
        createSession(cnxn, passwd, sessionTimeout);
    } else {
        // No-op at this level per the original notes; presumably overridden by subclasses — confirm.
        validateSession(cnxn, sessionId);
        // Close any old connection that still holds this session, on either factory.
        if (serverCnxnFactory != null) {
            serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
        }
        if (secureServerCnxnFactory != null) {
            secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
        }
        cnxn.setSessionId(sessionId);
        // Reopen the existing session on this connection.
        reopenSession(cnxn, sessionId, passwd, sessionTimeout);
    }
}
/**
 * Handles one request packet from a client connection; called by the network I/O layer.
 * auth packets are authenticated here, sasl packets go to processSasl, and every other
 * packet, once the connection passes enforceAuthentication, becomes a Request that is
 * submitted to the processor chain.
 */
public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException {
    cnxn.incrOutstandingAndCheckThrottle(h);
    final int opType = h.getType();

    if (opType == OpCode.auth) {
        AuthPacket authPacket = request.readRecord(AuthPacket::new);
        String scheme = authPacket.getScheme();
        ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
        Code authReturn = KeeperException.Code.AUTHFAILED;
        // Authenticate, then either continue talking or close the connection (omitted).
        return;
    }

    if (opType == OpCode.sasl) {
        processSasl(request, cnxn, h);
        return;
    }

    if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
        return;
    }

    final Request si = new Request(
            cnxn, cnxn.getSessionId(), h.getXid(), opType, request, cnxn.getAuthInfo());
    final int payloadLength = request.limit();
    if (isLargeRequest(payloadLength)) {
        // Large request: enforce the size check and remember its size on the request.
        checkRequestSizeWhenMessageReceived(payloadLength);
        si.setLargeRequestSize(payloadLength);
    }
    si.setOwner(ServerCnxn.me);
    // Queue the request; it is processed by firstProcessor later.
    submitRequest(si);
}
/**
 * Applies a transaction that arrives without a Request object
 * (entry point for quorum/Learner.java).
 * Session bookkeeping runs first, then the txn is applied to zkDb without a digest.
 */
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    final Request noRequest = null;
    processTxnForSessionEvents(noRequest, hdr, txn);
    final TxnDigest noDigest = null;
    return processTxnInDB(hdr, txn, noDigest);
}
// entry point for FinalRequestProcessor.java
// Applies a Request's transaction. A request that is neither a write (no txn header) nor a
// quorum request returns an empty result without taking the outstandingChanges lock.
public ProcessTxnResult processTxn(Request request) {
TxnHeader hdr = request.getHdr();
// Session create/close bookkeeping runs before the read fast path below.
processTxnForSessionEvents(request, hdr, request.getTxn());
final boolean writeRequest = (hdr != null);
final boolean quorumRequest = request.isQuorum();
// return fast w/o synchronization when we get a read
if (!writeRequest && !quorumRequest) {
return new ProcessTxnResult();
}
// Apply the txn and prune outstandingChanges while holding the same lock.
synchronized (outstandingChanges) {
ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
// request.hdr is set for write requests, which are the only ones
// that add to outstandingChanges.
if (writeRequest) {
long zxid = hdr.getZxid();
// Drop every pending change (from the head) whose zxid is <= the zxid just applied.
while (!outstandingChanges.isEmpty() && outstandingChanges.peek().zxid <= zxid) {
ChangeRecord cr = outstandingChanges.remove();
ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
// Identity check: only unmap the path if it still points at this exact record
// (presumably a newer pending change for the same path must be kept — confirm).
if (outstandingChangesForPath.get(cr.path) == cr) {
outstandingChangesForPath.remove(cr.path);
}
}
}
// do not add non quorum packets to the queue.
if (quorumRequest) {
getZKDatabase().addCommittedProposal(request);
}
return rc;
}
}
/**
 * Updates session tracking for session-lifecycle transactions.
 * When {@code request} is null (the header-only entry point), the op code and session id are
 * read from {@code hdr}; otherwise they come from the request itself.
 * createSession commits the session with its timeout; closeSession removes it.
 */
private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) {
    final boolean headerOnly = request == null;
    final int opCode = headerOnly ? hdr.getType() : request.type;
    final long sessionId = headerOnly ? hdr.getClientId() : request.sessionId;

    if (opCode == OpCode.createSession) {
        final boolean carriesCreateTxn = hdr != null && txn instanceof CreateSessionTxn;
        if (carriesCreateTxn) {
            final int sessionTimeout = ((CreateSessionTxn) txn).getTimeOut();
            // Add the session to the local session map or global one in zkDB.
            sessionTracker.commitSession(sessionId, sessionTimeout);
        }
        return;
    }
    if (opCode == OpCode.closeSession) {
        sessionTracker.removeSession(sessionId);
    }
}
/**
 * Applies the transaction to zkDb. A null header means there is nothing to apply,
 * so an empty result is returned instead.
 */
private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest) {
    return hdr == null
            ? new ProcessTxnResult()
            : getZKDatabase().processTxn(hdr, txn, digest);
}
QuorumZooKeeperServer:集群模式下的ZooKeeperServer基类:
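(LeaderZooKeeperServer的javadoc)Just like the standard ZooKeeperServer. We just replace the request processors: PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor(注:实际构建的处理器链在PrepRequestProcessor之前还有LeaderRequestProcessor,见下方setupRequestProcessors中的firstProcessor。)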
LeaderZooKeeperServer:集群模式下leader节点使用的ZooKeeperServer实现类:
继承QuorumZooKeeperServer
使用的RequestProcessor与父类不同:
/**
 * Builds the leader's processor chain:
 * LeaderRequestProcessor -> PrepRequestProcessor -> ProposalRequestProcessor
 * -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor.
 * Processors are created tail-first so each one can be given its successor.
 */
protected void setupRequestProcessors() {
    final RequestProcessor tail = new FinalRequestProcessor(this);
    final RequestProcessor toBeApplied = new Leader.ToBeAppliedRequestProcessor(tail, getLeader());
    final String serverIdText = Long.toString(getServerId());
    commitProcessor = new CommitProcessor(toBeApplied, serverIdText, false, getZooKeeperServerListener());
    commitProcessor.start();
    final ProposalRequestProcessor proposal = new ProposalRequestProcessor(this, commitProcessor);
    proposal.initialize();
    prepRequestProcessor = new PrepRequestProcessor(this, proposal);
    prepRequestProcessor.start();
    firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
    // Start the ContainerManager, which deletes TTL nodes and container nodes.
    setupContainerManager();
}
使用LeaderSessionTracker做会话追踪
与learner节点通信
FinalRequestProcessor - 处理与请求相关的事务,并提供查询服务,给客户端发响应,位于RequestProcessor链末尾
ToBeAppliedRequestProcessor - 维护toBeApplied列表
CommitProcessor - 等待commit完成之后调用下游RequestProcessor处理器
ProposalRequestProcessor - 发起proposal并将Request转发给内部的SyncRequestProcessor和AckRequestProcessor
/**
 * Creates the leader's ProposalRequestProcessor.
 * Besides the downstream nextProcessor, it owns an internal SyncRequestProcessor
 * whose successor is an AckRequestProcessor bound to the leader.
 */
public ProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {
    this.zks = zks;
    this.nextProcessor = nextProcessor;
    // Internal pipeline: SyncRequestProcessor -> AckRequestProcessor.
    final AckRequestProcessor ackStage = new AckRequestProcessor(zks.getLeader());
    this.syncProcessor = new SyncRequestProcessor(zks, ackStage);
    // Read from a JVM system property (see Boolean.getBoolean).
    this.forwardLearnerRequestsToCommitProcessorDisabled =
            Boolean.getBoolean(FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED);
}
PrepRequestProcessor - 通常位于RequestProcessor链开头,为更新请求关联的事务做设置
LeaderRequestProcessor - 负责执行本地会话升级,只有直接提交给leader的Request才能通过这个处理器
LearnerZooKeeperServer:Follower和Observer节点的ZooKeeperServer基类。
FollowerZooKeeperServer与ZooKeeperServer类似,只是处理器链不同:
FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor
另外还会使用一个SyncRequestProcessor来记录leader发来的提案。
setupRequestProcessors方法:
/**
 * Builds the follower's chain FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor.
 * It also starts a separate SyncRequestProcessor, whose successor is a SendAckRequestProcessor
 * for this follower, used to log proposals from the leader.
 */
protected void setupRequestProcessors() {
    final RequestProcessor tail = new FinalRequestProcessor(this);
    final String serverIdText = Long.toString(getServerId());
    commitProcessor = new CommitProcessor(tail, serverIdText, true, getZooKeeperServerListener());
    commitProcessor.start();
    final FollowerRequestProcessor head = new FollowerRequestProcessor(this, commitProcessor);
    firstProcessor = head;
    head.start();
    final SendAckRequestProcessor ackSender = new SendAckRequestProcessor(getFollower());
    syncProcessor = new SyncRequestProcessor(this, ackSender);
    syncProcessor.start();
}
ObserverZooKeeperServer:Observer类型节点的ZooKeeperServer实现。
setupRequestProcessors方法:
/**
 * Builds the observer's chain ObserverRequestProcessor -> CommitProcessor -> FinalRequestProcessor.
 * When syncRequestProcessorEnabled is set, it also starts a SyncRequestProcessor with no
 * successor, so nothing is acknowledged back.
 */
protected void setupRequestProcessors() {
    final RequestProcessor tail = new FinalRequestProcessor(this);
    final String serverIdText = Long.toString(getServerId());
    commitProcessor = new CommitProcessor(tail, serverIdText, true, getZooKeeperServerListener());
    commitProcessor.start();
    final ObserverRequestProcessor head = new ObserverRequestProcessor(this, commitProcessor);
    firstProcessor = head;
    head.start();
    // Defaults to false.
    if (!syncRequestProcessorEnabled) {
        return;
    }
    syncProcessor = new SyncRequestProcessor(this, null);
    syncProcessor.start();
}