客戶端的啟動(dòng)流程
看上面的客戶端啟動(dòng)的腳本圖,可以看到,zookeeper客戶端腳本運(yùn)行的入口ZookeeperMain.java的main()方法, 關(guān)于這個(gè)類可以理解成它是程序啟動(dòng)的輔助類,由它提供開始的位置,進(jìn)而加載出zk client的上下文
創(chuàng)建ZooKeeperMain對(duì)象
// todo zookeeper的入口方法
public static void main(String args[]) throws KeeperException, IOException, InterruptedException {
// todo new ZK客戶端
ZooKeeperMain main = new ZooKeeperMain(args);
// todo run方法的實(shí)現(xiàn)在下面
main.run();
}
跟蹤
ZooKeeperMain main = new ZooKeeperMain(args);能往下追很長(zhǎng)的代碼,提前說(shuō)main.run()的作用,就是對(duì)用戶輸入的命令進(jìn)行下一步處理
如上是入口函數(shù)的位置,跟進(jìn)這兩個(gè)函數(shù),可以找到我們?cè)赾lient端的命令行中可以輸入命令和zookeeper服務(wù)端進(jìn)行通信的原因(開起了新的線程),以及zookeeper的客戶端所依賴的其他類
跟進(jìn)ZooKeeperMain main = new ZooKeeperMain(args);
public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
cl.parseOptions(args);
// todo 連接到客戶端
connectToZK(cl.getOption("server"));
}
我們?cè)诿钚袉?dòng)客戶端時(shí),輸入命令zkCli.sh -server localhost:2181,其中的args數(shù)組, 就是我們?cè)趩?dòng)就是我們輸入的參數(shù),
構(gòu)建zookeeperMain對(duì)象時(shí),上面主要做了兩件事
- 解析args參數(shù)數(shù)組
- 連接客戶端
解析參數(shù)數(shù)組的邏輯就在下面, 很熟悉,就是我們?cè)诿钚袉?dòng)zookeeper時(shí)輸入的命令可選項(xiàng)
public boolean parseOptions(String[] args) {
List<String> argList = Arrays.asList(args);
Iterator<String> it = argList.iterator();
while (it.hasNext()) {
String opt = it.next();
try {
if (opt.equals("-server")) {
options.put("server", it.next());
} else if (opt.equals("-timeout")) {
options.put("timeout", it.next());
} else if (opt.equals("-r")) {
options.put("readonly", "true");
}
} catch (NoSuchElementException e) {
System.err.println("Error: no argument found for option "
+ opt);
return false;
}
if (!opt.startsWith("-")) {
command = opt;
cmdArgs = new ArrayList<String>();
cmdArgs.add(command);
while (it.hasNext()) {
cmdArgs.add(it.next());
}
return true;
}
}
return true;
}
創(chuàng)建ZooKeeper客戶端的對(duì)象
接著看如果連接客戶端, connectToZK(String newHost) 同樣是本類方法,源碼如下:
// todo 來(lái)到這里
protected void connectToZK(String newHost) throws InterruptedException, IOException {
if (zk != null && zk.getState().isAlive()) {
zk.close();
}
//todo 命令行中的server 后面跟著 host主機(jī)地址
host = newHost;
boolean readOnly = cl.getOption("readonly") != null;
// todo 創(chuàng)建zookeeper的實(shí)例
zk = new ZooKeeper(host,
Integer.parseInt(cl.getOption("timeout")),
new MyWatcher(), readOnly);
}
到這里算是個(gè)小高潮吧,畢竟看到了zookeeper client的封裝類ZooKeeper, 這個(gè)類上的注解大概是這么介紹這個(gè)類的
- 它是個(gè)Zookeeper 客戶端的封裝類, 它的第一個(gè)參數(shù)是
host:port,host:port,host:port這種格式的字符串,逗號(hào)左右是不同的服務(wù)端的地址 - 會(huì)異步的創(chuàng)建session,通常這個(gè)session在構(gòu)造函數(shù)執(zhí)行完之間就已經(jīng)創(chuàng)建完成了
- watcher 是監(jiān)聽(tīng)者,它被通知的時(shí)刻不確定,可能是構(gòu)造方法執(zhí)行完成前,也可能在這之后
- 只要沒(méi)有連接成功, zookeeper客戶端,會(huì)一直嘗試從提供的服務(wù)地址串中選擇出一個(gè)嘗試鏈接
跟進(jìn)ZooKeeper的構(gòu)造方法
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly) throws IOException{
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
watchManager.defaultWatcher = watcher;
// todo 包裝服務(wù)端的地址
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
//todo 將服務(wù)端的地址封裝進(jìn) StaticHostProvider -> HostProvider中
HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
// todo 創(chuàng)建客戶端的上下文, 這個(gè)上下文對(duì)象的亮點(diǎn)就是它維護(hù)了一個(gè)客戶端的socket
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
// todo 跟進(jìn)這個(gè)方法,getClientCnxnSocket, 獲取出客戶端上下文中的socket
getClientCnxnSocket(), canBeReadOnly);
// todo 啟動(dòng)客戶端
cnxn.start();
}
主要做了這么幾件事
- 將服務(wù)端的地址解析封裝進(jìn)了
StaticHostProvider類中, 可以把這個(gè)類理解成專門存放服務(wù)端地址的set 集合 - 創(chuàng)建出了客戶端的上下文對(duì)象: ClientCnxn, 當(dāng)然在這之前,入?yún)⑽恢眠€有一個(gè)
getClientCnxnSocket()這個(gè)函數(shù)可以創(chuàng)建出客戶端的NIO Socket 然后調(diào)用
cnxn.start()其實(shí)就是啟動(dòng)了客戶端的另外兩條線程sendThread和eventThread下面會(huì)詳細(xì)說(shuō)創(chuàng)建客戶端的 NioSocket
繼續(xù)跟進(jìn)源碼getClientCnxnSocket()通過(guò)反射,zk客戶端使用的socket對(duì)象是ClientCnxnSocketNIO
//todo 通過(guò)反射創(chuàng)建出客戶端上下文中的 socket , 實(shí)際的ClientCnxnSocketNIO 是 ClientCnxnSocket的子類
// todo ---> zookeeper 封裝的 NIO的邏輯都在 實(shí)際的ClientCnxnSocketNIO
private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
// todo zookeeper.clientCnxnSocket
String clientCnxnSocketName = System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
if (clientCnxnSocketName == null) {
// todo 上面String其實(shí)就是這個(gè)類的name, 根進(jìn)去看一下它的屬性
// todo 這個(gè)類維護(hù)了NioSocket使用到的 selector 選擇器 , 已經(jīng)發(fā)生的感興趣的事件SelectionKey
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
try {
// todo 可以看到客戶端使用的 NioSocket
return (ClientCnxnSocket) Class.forName(clientCnxnSocketName).getDeclaredConstructor()
.newInstance();
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ clientCnxnSocketName);
ioe.initCause(e);
throw ioe;
}
}
創(chuàng)建 ClientCnxn客戶端的上下文
創(chuàng)建上下文,構(gòu)造函數(shù)中的諸多屬性都是在前面讀取配置文件或是新添加進(jìn)來(lái)的,重點(diǎn)是最后兩行,它創(chuàng)建了兩條線程類,和zk客戶端的IO息息相關(guān)
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId; // todo 剛才傳遞過(guò)來(lái)的值為0
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
// todo 添加read的超時(shí)時(shí)間
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
// todo 創(chuàng)建了一個(gè)seadThread 線程
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
}
創(chuàng)建SendThread
sendThred是一個(gè)客戶端的線程類,什么時(shí)候開啟? 其實(shí)就在上面,當(dāng)創(chuàng)建了ClientCnxn后,調(diào)用的cnxn.start()就是在開啟它的run() , 它有什么作用? 它的run()是一個(gè)無(wú)限循環(huán),除非運(yùn)到了close的條件,否則他就會(huì)一直循環(huán)下去, 比如向服務(wù)端發(fā)送心跳,或者向服務(wù)端發(fā)送我們?cè)诳刂婆_(tái)輸入的數(shù)據(jù)以及接受服務(wù)端發(fā)送過(guò)來(lái)的響應(yīng)
這是他的構(gòu)造方法,可以看到它還是一個(gè)守護(hù)線程,并擁有客戶端socket的引用,有了NIO Socket相關(guān)技能
//todo
SendThread(ClientCnxnSocket clientCnxnSocket) {
super(makeThreadName("-SendThread()"));
// todo 設(shè)置狀態(tài) Connecting
state = States.CONNECTING;
// todo 就是在 Zookeeper new ClientCnxn 時(shí), 在倒數(shù)第二個(gè)位置使傳遞進(jìn)去一個(gè)函數(shù)實(shí)際的
this.clientCnxnSocket = clientCnxnSocket;
// todo 設(shè)置成守護(hù)線程
setDaemon(true);
}
它的Run方法, 真的是好長(zhǎng)啊, 比我上面寫的部分內(nèi)容還長(zhǎng)(大概兩百行了), 大概它的流程 ,每次循環(huán):
- 檢查一下客戶端的socket有沒(méi)有和服務(wù)端的socket建立連接
- 沒(méi)有建立連接
- 嘗試選出其他的server地址進(jìn)行連接
- 如果滿足close的條件,直接break 跳出整個(gè)while循環(huán)
- 如果已經(jīng)建立了連接
- 計(jì)算 to = 讀取的超時(shí)時(shí)間 - 服務(wù)端的響應(yīng)時(shí)間
- 未連接的狀態(tài)
- 計(jì)算 to = 連接超時(shí)時(shí)間 - 服務(wù)端的響應(yīng)時(shí)間
- 上面的兩個(gè)to, 如果小于0, 說(shuō)明客戶端和服務(wù)端通信出現(xiàn)了異常, 很可能是server的session time out,于是拋出異常
- 如果連接狀態(tài)是健康的,向服務(wù)端發(fā)送心跳
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);向服務(wù)端發(fā)送數(shù)據(jù)
- 沒(méi)有建立連接
在這個(gè)負(fù)責(zé)和服務(wù)端進(jìn)行IO操作的線程中,只要不是close或其他重大錯(cuò)誤,一般可以預(yù)知的異常都有try起來(lái),然后記錄日志,并沒(méi)有其他操作,循環(huán)還是會(huì)進(jìn)行
// todo introduce 介紹
clientCnxnSocket.introduce(this,sessionId); // todo this,sessionId == 0
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
// todo 這個(gè)while循環(huán)中存在建立連接的過(guò)程, 已經(jīng)連接建立失敗后不斷重試的過(guò)程
//todo state.isAlive() 默認(rèn)是 NOT_CONNECTED
while (state.isAlive()) {
try {
//todo 1111 如果socket還沒(méi)有連接 /////////////////////////////////////////////////////////////////////////////////////////////////////////
//todo 如果socket還沒(méi)有連接
if (!clientCnxnSocket.isConnected()) {
// todo 判斷是不是第一次連接, 如果不是第一次進(jìn)入下面try代碼塊, 隨機(jī)產(chǎn)生一個(gè)小于一秒的時(shí)間
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establish connection if we are closing
// todo 如果是closing 或者 已經(jīng)關(guān)閉了, 直接退出這個(gè)循環(huán)
if (closing || !state.isAlive()) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
// todo 連接失敗時(shí),來(lái)這里重試連接
// todo 從我們傳遞進(jìn)來(lái)的host地址中選擇一個(gè)地址
serverAddress = hostProvider.next(1000);
}
// todo client和server進(jìn)行socket連接
// todo 跟進(jìn)去 ,實(shí)現(xiàn)邏輯在上面
// todo 這個(gè)方法開始建立連接,并將 isFasterConnect改成了 false
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
//todo 2222 如果socket處于連接狀態(tài) /////////////////////////////////////////////////////////////////////////////////////////////////////////
// todo 下面的連接狀態(tài)
if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent == true) {
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
authState,null));
}
}
// todo 連接成功的話執(zhí)行to 為下面值
// todo to = 讀取的超時(shí)時(shí)間 - 上一次的讀取時(shí)間
// todo 如果預(yù)訂的超時(shí)時(shí)間 - 上次讀的時(shí)間 <= 0 說(shuō)明超時(shí)了
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
// todo 如果沒(méi)有連接成功, 就會(huì)來(lái)到這里, 給 to 賦值
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
//todo 3333 異常處理 /////////////////////////////////////////////////////////////////////////////////////////////////////////
// todo 下面拋出來(lái)了異常
if (to <= 0) {
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
+ clientCnxnSocket.getIdleRecv()
+ "ms"
+ " for sessionid 0x"
+ Long.toHexString(sessionId);
LOG.warn(warnInfo);
// todo 這里拋出來(lái)了異常, 下面的try 就會(huì)把它抓住
throw new SessionTimeoutException(warnInfo);
}
//todo 44444 連接成功執(zhí)行的邏輯 /////////////////////////////////////////////////////////////////////////////////////////////////////////
// todo 下面的是連接成功執(zhí)行的邏輯
if (state.isConnected()) {
// todo 為了防止競(jìng)爭(zhēng)狀態(tài)丟失發(fā)送第二個(gè)ping, 同時(shí)也避免出現(xiàn)很多的ping
//1000(1 second) is to prevent(阻止) race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
// todo 客戶端一直在這里循環(huán), 如果連接成功的話, 每次循環(huán)都來(lái)到這個(gè)邏輯這里發(fā)送 ping
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
//todo 55555 /////////////////////////////////////////////////////////////////////////////////////////////////////////
// If we are in read-only mode, seek for read/write server
// todo 只讀狀態(tài) 相關(guān)邏輯
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
//todo 66666 /////////////////////////////////////////////////////////////////////////////////////////////////////////
// todo 消費(fèi)outgoingqueue, 完成向服務(wù)端的發(fā)送發(fā)送
// todo doTransport 是 ClientCnxnSocket 的抽象方法, 實(shí)現(xiàn)類clientCnxnSocketNio
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
} catch (Throwable e) {
// todo 在這個(gè)try中處理里面的拋出來(lái)的異常
if (closing) {
// todo 如果是請(qǐng)求關(guān)閉, 直接退出 break 出while循環(huán)
if (LOG.isDebugEnabled()) {
// closing so this is expected
LOG.debug("An exception was thrown while closing send thread for session 0x"
+ Long.toHexString(getSessionId())
+ " : " + e.getMessage());
}
break;
} else {
// todo 只要不是退出異常, 下面的異常都是僅僅打印了一下出現(xiàn)了什么異常
// this is ugly, you have a better way speak up
if (e instanceof SessionExpiredException) {
LOG.info(e.getMessage() + ", closing socket connection");
} else if (e instanceof SessionTimeoutException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof EndOfStreamException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof RWServerFoundException) {
LOG.info(e.getMessage());
} else if (e instanceof SocketException) {
LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage());
} else {
LOG.warn("Session 0x{} for server {}, unexpected error{}",
Long.toHexString(getSessionId()),
serverAddress,
RETRY_CONN_MSG,
e);
}
// todo 這個(gè)方法中, isFirstConnect = true
cleanup();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
Event.EventType.None,
Event.KeeperState.Disconnected,
null));
}
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
}
}
} // todo while循環(huán)的結(jié)束符號(hào) , 這是個(gè)while循環(huán), 除了上面的close其他異常都會(huì)繼續(xù)循環(huán), 接著上去再看一遍
cleanup();
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x"
+ Long.toHexString(getSessionId()));
}
在上面這個(gè)200行的Run方法中比較值得注意的幾個(gè)方法如下
- 如果做到下次選出一個(gè)非當(dāng)前server的地址
針對(duì)下標(biāo)運(yùn)行,對(duì)數(shù)組的size取模, 再賦值給自己,所以就實(shí)現(xiàn)了從0 - array.size()的循環(huán)
,【巨型】【十萬(wàn)】【更加】【說(shuō)不】,【剔除】【塔狂】【有一】.【毒藥】【劈去】【就完】【橋右】,【點(diǎn)像】【水聲】【險(xiǎn)鯤】黑帽seo研究【十幾】,【狐那】【都掩】【用到】【思想】.【來(lái)短】!【若無(wú)】【是一】【君之】【全部】【升起】【就會(huì)】【姐聽(tīng)】【嗯我】【必然】【身金】【得更】【聲驚】【佛土】【應(yīng)的】【一會(huì)】【響之】【而說(shuō)】【量波】【得泰】【死有】【原了】【口中】【不高】【沒(méi)有】【不是】【如出】【衣袍】【巨大】【那火】【停頓】【雖然】【難度】【通天】【后多】【敏銳】【出現(xiàn)】, public InetSocketAddress next(long spinDelay) {
currentIndex = ++currentIndex % serverAddresses.size();
if (currentIndex == lastIndex && spinDelay > 0) {
try {
Thread.sleep(spinDelay);
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
} else if (lastIndex == -1) {
// We don't want to sleep on the first ever connect attempt.
lastIndex = 0;
}
- 如果檢查到了沒(méi)有連接的話,就是用clientCnxnSocket進(jìn)行連接
這個(gè)函數(shù)中,將標(biāo)記是否是第一次連接的標(biāo)記置為了flase, 并且拿到了sessionid
// todo 保證連接的邏輯
void primeConnection() throws IOException {
LOG.info("Socket connection established to "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", initiating session");
isFirstConnect = false;
//todo 創(chuàng)建了一個(gè)建立連接的request, 并且在下面將它添加進(jìn)來(lái) outgoingqueue
long sessId = (seenRwServerBefore) ? sessionId : 0;
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
sessionTimeout, sessId, sessionPasswd);
synchronized (outgoingQueue) {
... 如watcher 相關(guān)的邏輯
SendThread 和 服務(wù)端的IO溝通
跟進(jìn)上面Run方法的如下方法,doTranprot
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
他是本類的抽象方法,具體的實(shí)現(xiàn)類是clientCnxnSocketNIO
跟進(jìn)這個(gè)方法,其中有一步跟重要doIO(pendingQueue, outgoingQueue, cnxn);
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
// todo 建立連接的邏輯
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
// todo 往服務(wù)端發(fā)送數(shù)據(jù)的邏輯 , 方法在上面的64行
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
- DoIo的源碼如下
它分成了兩大模塊
- 讀就緒, 讀取服務(wù)端發(fā)送過(guò)來(lái)的數(shù)據(jù)
- 寫就緒, 往客戶端發(fā)送用戶在控制臺(tái)輸入的命令
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
// todo 通過(guò)key獲取服務(wù)端的channel
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
// TODO 讀就緒
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {
// todo 返回buffer
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount++;
readLength();
} else if (!initialized) { //todo 連接有沒(méi)有初始化, 來(lái)這之前被改成了 flase ,現(xiàn)在
// todo 讀取服務(wù)端發(fā)給我的連接請(qǐng)求的結(jié)果
readConnectResult(); // primeConnection()
enableRead();
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
//todo 如果已經(jīng)初始化了, 就來(lái)這里讀取響應(yīng), 跟進(jìn)去
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
//todo 寫就緒
if (sockKey.isWritable()) {
synchronized(outgoingQueue) {
// todo 查詢出可發(fā)送的packet
Packet p = findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();
}
// todo 往服務(wù)端發(fā)送數(shù)據(jù) packet.ByteBuf
sock.write(p.bb); // 發(fā)送服務(wù)端
if (!p.bb.hasRemaining()) { //todo !hasRemaining 沒(méi)有剩余的數(shù)據(jù)
sentCount++;
// todo 將發(fā)送過(guò)的packet從outgoingqueue移除
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
// todo 如果剛才的請(qǐng)求頭的類型不是null , 不是ping ,不是權(quán)限驗(yàn)證 就把packet添加到 pendingQueue
/**
* These are the packets that have been sent and are waiting for a response.
* todo 這個(gè)penddingQueue 存放的是已經(jīng)發(fā)送的 和 等待服務(wù)器響應(yīng)的packet
*/
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
e.html
disableWrite();
} else {
// Just in case
enableWrite();
}
}
}
}
思考:
雖然找到了客戶端往服務(wù)端發(fā)送數(shù)據(jù)的代碼, 但是問(wèn)題來(lái)了, 它發(fā)送的什么數(shù)據(jù)啊? 在上面可以看到,它每次發(fā)送的數(shù)據(jù)都被包裝車成了packet類型,并且,繼續(xù)往下跟進(jìn)可以看到這個(gè)packet來(lái)自于一個(gè)叫outgoingqueue的隊(duì)列中
client想往服務(wù)端發(fā)送什么?其實(shí)發(fā)送就是我們手動(dòng)輸入的命令,只不過(guò)他把我們的命令解析出來(lái)并且進(jìn)行了封裝,進(jìn)行了哪些封裝? String-> request -> packet -> socket ,這個(gè)packet就在上面的部分被消費(fèi)
到目前為止,算上一開始的主線程,其實(shí)已經(jīng)有3條線程了, 分別是主線程,SendThread和eventThread
代碼讀到這里,sendThread部分其實(shí)已經(jīng)結(jié)束了,我們直到了它正在消費(fèi)outgoingqueue中的內(nèi)容,接下來(lái)的任務(wù)返回回去,從新回到 ZooKeeperMain中,看一開始主線程時(shí)如何處理用戶在命令行的輸入的
// todo zookeeper的入口方法
public static void main(String args[]) throws KeeperException, IOException, InterruptedException {
// todo new ZK客戶端
ZooKeeperMain main = new ZooKeeperMain(args);
// todo run方法的實(shí)現(xiàn)在下面
main.run();
}
跟進(jìn) main.run(), 主要做了如下幾件事
- 通過(guò)反射創(chuàng)建出可以獲取控制臺(tái)輸入的對(duì)象
jline.ConsoleReader - 通過(guò)反射創(chuàng)建出可以解析鍵盤錄入的對(duì)象
- 開啟while循環(huán),等待用戶的輸入,處理用戶的輸入
executeLine(line);
@SuppressWarnings("unchecked")
void run() throws KeeperException, IOException, InterruptedException {
if (cl.getCommand() == null) {
System.out.println("Welcome to ZooKeeper!");
boolean jlinemissing = false;
// only use jline if it's in the classpath
try {
// todo jline.ConsoleReader是java命令行的實(shí)現(xiàn)類, 獲取可從控制臺(tái)接受輸入的對(duì)象
Class<?> consoleC = Class.forName("jline.ConsoleReader");
Class<?> completorC = Class.forName("org.apache.zookeeper.JLineZNodeCompletor");
System.out.println("JLine support is enabled");
// todo 使用反射獲取實(shí)例
Object console = consoleC.getConstructor().newInstance();
Object completor = completorC.getConstructor(ZooKeeper.class).newInstance(zk);
// todo 通過(guò)反射獲取某指定類的指定方法 Completor
Method addCompletor = consoleC.getMethod("addCompletor", Class.forName("jline.Completor"));
addCompletor.invoke(console, completor);
String line;
Method readLine = consoleC.getMethod("readLine", String.class);
// todo 我們?cè)诿钚兄休斎氲哪切┟钭罱K都會(huì)來(lái)到這里執(zhí)行
// todo getPrompt() 方法 就是在控制臺(tái)上打印出了命令行的前綴--- [zk: " + host + "("+zk.getState()+")" + " " + commandCount + "] "
while ((line = (String) readLine.invoke(console, getPrompt())) != null) {
// todo 執(zhí)行命令行的輸入
executeLine(line);
}
} catch (ClassNotFoundException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
} catch (NoSuchMethodException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
} catch (InvocationTargetException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
} catch (IllegalAccessException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
} catch (InstantiationException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
}
if (jlinemissing) {
System.out.println("JLine support is disabled");
BufferedReader br =
new BufferedReader(new InputStreamReader(System.in));
String line;
while ((line = br.readLine()) != null) {
executeLine(line);
}
}
} else {
// Command line args non-null. Run what was passed.
processCmd(cl);
}
}
繼續(xù)跟進(jìn) executeLine(line);,做了如下幾件事
- 處理用戶輸入
- 將命令添加到歷史命令
- 處理命令
- 命令數(shù)+1
public void executeLine(String line)
throws InterruptedException, IOException, KeeperException {
if (!line.equals("")) {
cl.parseCommand(line);
// todo 添加到歷史命令
addToHistory(commandCount, line);
// todo 具體處理命令
processCmd(cl);
// todo 命令次數(shù)+1
commandCount++;
}
}
處理命令的邏輯如下:
將命令解析出來(lái),通過(guò)if分支語(yǔ)句,判斷用戶輸入的什么命令, 然后再進(jìn)一步處理
// todo zookeeper客戶端, 處理用戶輸入命令的具體邏輯
// todo 用大白話講,下面其實(shí)就是把 從控制臺(tái)獲取的用戶的輸入信息轉(zhuǎn)換成指定的字符, 然后發(fā)送到服務(wù)端
// todo MyCommandOptions 是處理命令行選項(xiàng)和shell腳本的工具類
protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException {
// todo 在這個(gè)方法中可以看到很多的命令行所支持的命令
Stat stat = new Stat();
// todo 獲取命令行輸入中 0 1 2 3 ... 位置的內(nèi)容, 比如 0 位置是命令 1 2 3 位置可能就是不同的參數(shù)
String[] args = co.getArgArray();
String cmd = co.getCommand();
if (args.length < 1) {
usage();
return false;
}
if (!commandMap.containsKey(cmd)) {
usage();
return false;
}
boolean watch = args.length > 2;
String path = null;
List<ACL> acl = Ids.OPEN_ACL_UNSAFE;
LOG.debug("Processing " + cmd);
...
// todo 看看這個(gè)create命令的實(shí)現(xiàn), 如果是-e 就是很 CreateMode= ephemeral sequential 時(shí)序的
if (cmd.equals("create") && args.length >= 3) {
int first = 0;
CreateMode flags = CreateMode.PERSISTENT;
if ((args[1].equals("-e") && args[2].equals("-s"))
|| (args[1]).equals("-s") && (args[2].equals("-e"))) {
first += 2;
flags = CreateMode.EPHEMERAL_SEQUENTIAL;
} else if (args[1].equals("-e")) {
first++;
flags = CreateMode.EPHEMERAL;
} else if (args[1].equals("-s")) {
first++;
flags = CreateMode.PERSISTENT_SEQUENTIAL;
}
...
比如,用戶輸入的是創(chuàng)建新節(jié)點(diǎn)的命令create /path, 就會(huì)有下面的函數(shù)處理
// todo 調(diào)用Zookeeper的 create方法,
String newPath = zk.create(path, args[first + 2].getBytes(), acl, flags);
跟進(jìn)這個(gè)方法 , 主要做了下面幾件事
- 校驗(yàn)合法性
- 封裝進(jìn) request
- 添加acl
- 提交submitRequest(),他是個(gè)重要的阻塞方法,每次執(zhí)行都會(huì)阻塞等待服務(wù)端的響應(yīng)
- 等待響應(yīng)結(jié)果
public String create(final String path, byte data[], List<ACL> acl,
CreateMode createMode)
throws KeeperException, InterruptedException
{
final String clientPath = path;
// todo 驗(yàn)證,path string 的合法性, 根據(jù)去查看
PathUtils.validatePath(clientPath, createMode.isSequential());
final String serverPath = prependChroot(clientPath);
// todo 創(chuàng)建請(qǐng)求頭, 不同的操作有不同的頭標(biāo)記
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.create);
CreateRequest request = new CreateRequest();
CreateResponse response = new CreateResponse();
// todo 將命令行里面的內(nèi)容嵌入到request
request.setData(data);
request.setFlags(createMode.toFlag());
request.setPath(serverPath);
if (acl != null && acl.size() == 0) {
throw new KeeperException.InvalidACLException();
}
// todo 添加權(quán)限
request.setAcl(acl);
// todo 通過(guò)上下文, 將包裝后的用戶的request 提交到socket 傳遞到server , 跟進(jìn)去看看
ReplyHeader r =submitRequest
// todo 判斷是否出錯(cuò)了
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (cnxn.chrootPath == null) {
return response.getPath();
} else {
return response.getPath().substring(cnxn.chrootPath.length());
}
}
客戶端的阻塞式等待 -- 自旋鎖
跟進(jìn)submitRequest()
// todo 這是ClientCnxn的類, 提交請(qǐng)求, 最終將我們的請(qǐng)求傳遞到socket
// todo 返回一個(gè)header, 因?yàn)楦鶕?jù)它判斷是否是否出錯(cuò)了
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
// todo 來(lái)到這個(gè) queuePacket() 方法在下面, 這個(gè)方法就是將 用戶輸入-> string ->>> request ->>> packet 的過(guò)程
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
// todo 使用同步代碼塊,在下面的進(jìn)行 同步阻塞等待, 直到有了Response響應(yīng)才會(huì)跳出這個(gè)循環(huán), 這個(gè)finished狀態(tài)就是在客戶端接受到服務(wù)端的
// todo 的響應(yīng)后, 將服務(wù)端的響應(yīng)解析出來(lái),然后放置到 pendingqueue里時(shí),設(shè)置上去的
synchronized (packet) {
while (!packet.finished) {
// todo 這個(gè)等待是需要喚醒的
packet.wait();
}
}
// todo 直到上面的代碼塊被喚醒,才會(huì)這個(gè)方法才會(huì)返回
return r;
}
在上面的代碼中,可以看到可以他是使用一個(gè)while(!packet,finishes){} 來(lái)阻塞程序的, 剛看看到用戶的命令被封裝進(jìn)了request, 接下來(lái), 在queuePacket(h, r, request, response, null, null, null, null, watchRegistration);中,可以看到他被封裝進(jìn)packet,然后添加到outgoingqueue隊(duì)列中,源碼如下
// todo
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration)
{
Packet packet = null;
// Note that we do not generate the Xid for the packet yet.
// todo 它會(huì)為我們的沒(méi)有 Xid 的packet生成 Xid
// It is generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// todo 她會(huì)在ClientCnxnSocket::doIO()之后生成
// where the packet is actually sent.
// todo packet實(shí)際生成的位置
synchronized (outgoingQueue) {
// todo 將用戶傳遞過(guò)來(lái)的信息包裝成 Packet
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
// todo 如果客戶端正在發(fā)送關(guān)閉session的請(qǐng)求, 就標(biāo)記成 closing = true
if (h.getType() == OpCode.closeSession) {
closing = true;
}
// todo 將packet 添加到隊(duì)列里面
// todo 這個(gè)什么時(shí)候會(huì)被消費(fèi)呢? 是在sendthread的無(wú)限循環(huán)中被消費(fèi)的, 因?yàn)槟鞘堑诙l線程
outgoingQueue.add(packet);
}
}
// todo getClientCnxnSocket() 獲取ClientCnxnSocket對(duì)象
// todo wakeupCnxn() 是 ClientCnxnSocket對(duì)象 中的抽象方法, 實(shí)現(xiàn)類是 ClientCnxnSocket的實(shí)現(xiàn)類ClientCnxnSocketNio
// 喚醒阻塞在selector.select上的線程,讓該線程及時(shí)去處理其他事情,比如這里的讓sendThread 干凈去消費(fèi)packet
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
在這個(gè)方法的最后一行,點(diǎn)睛,selector.wakeup(); 就是通知選擇器,別再阻塞select了,趕緊去做其他工作
因?yàn)檫x擇器在sendThread的doTransport()方法中,有阻塞的操作,我重新把代碼貼出來(lái)如下
服務(wù)端的NIOSocket -> ClientCnxnSocket 都是ClientCnxn上下文的封裝類的,SendThread同樣也是,它可以使用
現(xiàn)在再看,喚醒selector 讓他去做其他事 ,其實(shí)即使doIO(),這個(gè)方法代碼其實(shí)我在上面貼出來(lái)過(guò),就是分成兩大部分,讀就緒與寫就緒
// todo 往服務(wù)端發(fā)送 packet
//todo 下面就是NIO 網(wǎng)絡(luò)編程的邏輯了
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
// todo 選擇器在waitTimeOut時(shí)間內(nèi)阻塞輪詢== 上一次計(jì)算的 to時(shí)間
selector.select(waitTimeOut);
Set<SelectionKey> selected;
// todo 獲取channel注冊(cè)進(jìn)selector時(shí)返回的key
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is non blocking,
// so time is effectively a constant. That is Why we just have to do this once, here
// todo 直到我們重新回到select之前, 下面的全部操作都是非阻塞的
// todo 因此時(shí)間只是一個(gè)常數(shù), 那就是為什么我們?cè)谶@里用下面的函數(shù)
updateNow();
//
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
// todo 建立連接的邏輯
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
// todo 往服務(wù)端發(fā)送數(shù)據(jù)的邏輯 , 方法在上面的64行
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
寫到這里其實(shí)已經(jīng)把整個(gè)過(guò)程順下來(lái)了,下面再重新看看,sendThread是如果消費(fèi)packet并且修改然后得到服務(wù)端的響應(yīng),修改pakcet.finished屬性的, 因?yàn)楝F(xiàn)在主線的submitRequest還在阻塞呢
往服務(wù)端寫
客戶端的socket的實(shí)現(xiàn)類是ClientCnxnSocketNio, 它往服務(wù)端寫的邏輯如下, 不難看出使用的java原生的sock.write(p.bb); // 發(fā)送服務(wù)端 , 亮點(diǎn)是后面的操作pendingQueue.add(p);被寫過(guò)的packet被添加到了pengingqueue中
if (sockKey.isWritable()) {
synchronized(outgoingQueue) {
// todo 查詢出可發(fā)送的packet
Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();
}
// todo 往服務(wù)端發(fā)送數(shù)據(jù) packet.ByteBuf
sock.write(p.bb); // 發(fā)送服務(wù)端
if (!p.bb.hasRemaining()) { //todo !hasRemaining 沒(méi)有剩余的數(shù)據(jù)
sentCount++;
// todo 將發(fā)送過(guò)的packet從outgoingqueue移除
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
// todo 如果剛才的請(qǐng)求頭的類型不是null , 不是ping ,不是權(quán)限驗(yàn)證 就把packet添加到 pendingQueue
/**
* These are the packets that have been sent and are waiting for a response.
* todo 這個(gè)penddingQueue 存放的是已經(jīng)發(fā)送的 和 等待服務(wù)器響應(yīng)的packet
*/
pendingQueue.add(p);
}
}
}
上面說(shuō)了, 為啥被使用過(guò)的pakcet還要保留一份呢? 還是那個(gè)原因,主線程還因?yàn)閜akcet的finish狀態(tài)未被該變而阻塞呢, 那什么時(shí)候改變呢? 答案是受到服務(wù)端的響應(yīng)之后改變,在哪里收到呢? 就是DoIo()的讀就緒模塊,下面附上源碼,它的解析我寫在這段代碼下面
從服務(wù)端讀
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {
// todo 返回buffer
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount++;
readLength();
} else if (!initialized) { //todo 連接有沒(méi)有初始化, 來(lái)這之前被改成了 flase ,現(xiàn)在
// todo 讀取服務(wù)端發(fā)給我的連接請(qǐng)求的結(jié)果
readConnectResult(); // primeConnection()
enableRead();
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
//todo 如果已經(jīng)初始化了, 就來(lái)這里讀取響應(yīng), 跟進(jìn)去
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
如上代碼的最后部分,sendThread.readResponse(incomingBuffer); 下面是它的源碼,它首先是從buffer中讀取出服務(wù)端的發(fā)送的數(shù)據(jù),然后一通解析,封裝進(jìn)pendingqueue的packet中,并且在方法的最后部分終于完成了狀態(tài)的修改
// todo 同樣是 sendThread的方法, 讀取響應(yīng)
// todo 是經(jīng)過(guò)flip 反轉(zhuǎn)后的 可讀的buffer
void readResponse(ByteBuffer incomingBuffer) throws IOException {
// todo --------------------- 從服務(wù)端寫回來(lái)的buffer中解析封裝成 ReplyHeader ----------------------------
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
// todo ---------------------------------------------------------------------
// todo 下面根據(jù) ReplyHeader 的 xid 判斷響應(yīng)的結(jié)果類型
if (replyHdr.getXid() == -2) {
// -2 is the xid for pings
if (LOG.isDebugEnabled()) {
LOG.debug("Got ping response for sessionid: 0x"
+ Long.toHexString(sessionId)
+ " after "
+ ((System.nanoTime() - lastPingSentNs) / 1000000)
+ "ms");
}
return;
}
if (replyHdr.getXid() == -4) {
// -4 is the xid for AuthPacket
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
state = States.AUTH_FAILED;
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );
}
if (LOG.isDebugEnabled()) {
LOG.debug("Got auth sessionid:0x"
+ Long.toHexString(sessionId));
}
return;
}
if (replyHdr.getXid() == -1) {
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
// convert from a server path to a client path
if (chrootPath != null) {
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else {
LOG.warn("Got server path " + event.getPath()
+ " which is too short for chroot path "
+ chrootPath);
}
}
WatchedEvent we = new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
eventThread.queueEvent( we );
return;
}
// If SASL authentication is currently in progress, construct and
// send a response packet immediately, rather than queuing a
// response as with other packets.
if (clientTunneledAuthenticationInProgress()) {
GetSASLRequest request = new GetSASLRequest();
request.deserialize(bbia,"token");
zooKeeperSaslClient.respondToServer(request.getToken(),
ClientCnxn.this);
return;
}
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got "
+ replyHdr.getXid());
}
// todo 從pendingQueue 中取出第一個(gè)packet
packet = pendingQueue.remove();
}
/*
* Since requests are processed in order, we better get a response to the first request!
* // todo 因?yàn)檎?qǐng)求存在隊(duì)列中,是有順序的, 因此我們最好對(duì)第一個(gè)做出相應(yīng)
*/
try {
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(
KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid "
+ replyHdr.getXid() + " with err " +
+ replyHdr.getErr() +
" expected Xid "
+ packet.requestHeader.getXid()
+ " for a packet with details: "
+ packet );
}
// todo 把todo 從服務(wù)端解析出來(lái)的結(jié)果賦值給 pendingQueue 中的packet
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reading reply sessionid:0x"
+ Long.toHexString(sessionId) + ", packet:: " + packet);
}
} finally {
// todo 跟進(jìn)這個(gè)方法
finishPacket(packet);
}
}
解開客戶端的阻塞狀態(tài)
進(jìn)入finishPacket(packet)
// todo ClientCnxn 也就是本類中, 在根據(jù)用戶的輸入向服務(wù)端提交命令后的那個(gè) wait喚醒了, finished=true,使得原來(lái)的while循環(huán)退出了
private void finishPacket(Packet p) {
if (p.watchRegistration != null) {
p.watchRegistration.register(p.replyHeader.getErr());
}
// todo 喚醒 Zookeeper中 submitRequest() 提交請(qǐng)求后的阻塞操作, 現(xiàn)在拿到請(qǐng)求后進(jìn)行喚醒
if (p.cb == null) {
synchronized (p) {
//todo 將這個(gè)finish 改成true, 在
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
eventThread.queuePacket(p);
}
}|轉(zhuǎn)載請(qǐng)注明來(lái)源地址:蜘蛛池出租 http://www.wholesalehouseflipping.com/ 專注于SEO培訓(xùn),快速排名黑帽SEO https://www.heimao.wiki
