Zookeeper源码分析ZooKeeperServer

发布时间:2024年01月22日

ZooKeeperServer

实现了单机版zookeeper服务端功能,子类实现了更加丰富的分布式集群功能:

ZooKeeperServer
  |-- QuorumZooKeeperServer
    |-- LeaderZooKeeperServer
    |-- LearnerZooKeeperServer
      |-- FollowerZooKeeperServer
      |-- ObserverZooKeeperServer
  |-- ReadOnlyZooKeeperServer

主要字段

// tickTime参数默认值
public static final int DEFAULT_TICK_TIME = 3000;
protected int tickTime = DEFAULT_TICK_TIME;
// 默认tickTime * 2
protected int minSessionTimeout = -1;
// 默认tickTime * 20
protected int maxSessionTimeout = -1;

// 会话跟踪
protected SessionTracker sessionTracker;

// 存储组件
private FileTxnSnapLog txnLogFactory = null;
private ZKDatabase zkDb;

// 缓存数据
private ResponseCache readResponseCache;
private ResponseCache getChildrenResponseCache;

// zxid会在启动阶段设置为最新lastZxid
private final AtomicLong hzxid = new AtomicLong(0);
// 请求处理器链入口
protected RequestProcessor firstProcessor;
// 缓存变化的数据
final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<>();

protected ServerCnxnFactory serverCnxnFactory;
protected ServerCnxnFactory secureServerCnxnFactory;

// 大请求判断使用的参数
private volatile int largeRequestMaxBytes = 100 * 1024 * 1024;
private volatile int largeRequestThreshold = -1;

主要方法

方法定义

// 通过zkDb从dataTree中删除Watcher监听器
void removeCnxn(ServerCnxn cnxn);

// 创建zkDb(为null时)并loadData加载数据
public void startdata() throws IOException, InterruptedException;
// 加载数据、清理session、生成快照
public void loadData() throws IOException, InterruptedException;
// 保存zkDb当前快照
public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere,
                                      boolean fastForwardFromEdits) throws IOException;

// 从指定的输入流解析数据,生成新的zkDb和SessionTrack
public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException;

// 使用zkDb.truncateLog(zxid)删除快照数据
public void truncateLog(long zxid) throws IOException;
// 通过zkDb获取dataTree.lastProcessedZxid的值
public long getLastProcessedZxid();

// 提交closeSession类型的Request来关闭会话
private void close(long sessionId);
// 使用zkDb杀掉会话
protected void killSession(long sessionId, long zxid);
// 启动组件
private void startupWithServerState(State state);
// 创建RequestProcessor用来处理请求
protected void setupRequestProcessors();
// 创建SessionTracker
protected void createSessionTracker();

// 为指定的session生成一个密码
byte[] generatePasswd(long id);
// 验证session密码
protected boolean checkPasswd(long sessionId, byte[] passwd);
// 使用sessionTracker创建session、生成密码、提交一个createSession请求
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout);
// 为指定的session绑定owner
public void setOwner(long id, Object owner) throws SessionExpiredException;
// 验证session之后使用finishSessionInit方法确定继续通信或者断开连接
protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException;
public void finishSessionInit(ServerCnxn cnxn, boolean valid);
// checkPasswd->revalidateSession->finishSessionInit
public void reopenSession(ServerCnxn cnxn, long sessionId,
                          byte[] passwd, int sessionTimeout) throws IOException;

// 把请求提交给requestThrottler之后再陆续调用submitRequestNow处理
public void enqueueRequest(Request si);
// 使用firstProcessor处理请求
public void submitRequestNow(Request si);

// 处理连接请求,网络IO层调用
public void processConnectRequest(
    ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException;
// 处理业务请求,网络IO层调用
public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException;
// sasl认证
private void processSasl(
    RequestRecord request, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException;

// 处理transaction
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn);
public ProcessTxnResult processTxn(Request request);
private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn);
priv
文章来源:https://blog.csdn.net/softshow1026/article/details/135746846
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。