流程圖
先看一下客戶端源碼的流程圖
下面根據源碼講解,大家整合源碼和流程圖一起看最好,本篇內容比較多建議收藏起來看。
入口類
從ZkCli.sh腳本中可以看到zk源碼客戶端入口類為ZooKeeperMain
找到入口類ZooKeeperMain 中的main方法
<code>public static void main(String args[]) throws KeeperException, IOException, InterruptedException { // 初始化 ZooKeeperMain main = new ZooKeeperMain(args); // 讀命令執行命令,也就是我們的創建節點更新節點等等命令的讀取執行 main.run(); }/<code>
跟蹤 ZooKeeperMain main = new ZooKeeperMain(args);
<code>public ZooKeeperMain(String args[]) throws IOException, InterruptedException { //向private Map<string> options = new HashMap<string>(); 設置參數 cl.parseOptions(args); System.out.println("Connecting to " + cl.getOption("server")); // 連接zk connectToZK(cl.getOption("server")); //zk = new ZooKeeper(cl.getOption("server"),// Integer.parseInt(cl.getOption("timeout")), new MyWatcher()); }/<string>/<string>/<code>
<code> protected void connectToZK(String newHost) throws InterruptedException, IOException { // zk已連接,先close if (zk != null && zk.getState().isAlive()) { zk.close(); } host = newHost; boolean readOnly = cl.getOption("readonly") != null; //new 出 ZooKeeper zk = new ZooKeeper(host, Integer.parseInt(cl.getOption("timeout")), new MyWatcher(), readOnly); }/<code>
<code>public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException { LOG.info("Initiating client connection, connectString=" + connectString + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher); watchManager.defaultWatcher = watcher; ConnectStringParser connectStringParser = new ConnectStringParser( connectString); HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses()); // 創建客戶端上下文 cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); // 開啟上下文設置的線程 /**public void start() { ** sendThread.start(); ** eventThread.start(); ** } **/ cnxn.start(); }/<code>
<code> SendThread(ClientCnxnSocket clientCnxnSocket) { super(makeThreadName("-SendThread()")); // 連接狀態 CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,CLOSED, AUTH_FAILED, NOT_CONNECTED state = States.CONNECTING; this.clientCnxnSocket = clientCnxnSocket; setDaemon(true);// 守護線程 }/<code>
<code> EventThread() { super(makeThreadName("-EventThread")); setDaemon(true); // 守護線程 }/<code>
跟蹤main.run();
<code> void run() throws KeeperException, IOException, InterruptedException { if (cl.getCommand() == null) { System.out.println("Welcome to ZooKeeper!"); boolean jlinemissing = false; // only use jline if it's in the classpath try { Class> consoleC = Class.forName("jline.ConsoleReader"); Class> completorC = Class.forName("org.apache.zookeeper.JLineZNodeCompletor"); System.out.println("JLine support is enabled"); Object console = consoleC.getConstructor().newInstance(); Object completor = completorC.getConstructor(ZooKeeper.class).newInstance(zk); Method addCompletor = consoleC.getMethod("addCompletor", Class.forName("jline.Completor")); addCompletor.invoke(console, completor); String line; Method readLine = consoleC.getMethod("readLine", String.class); while ((line = (String)readLine.invoke(console, getPrompt())) != null) { // 執行 executeLine(line); } } catch (ClassNotFoundException e) { LOG.debug("Unable to start jline", e); jlinemissing = true; } catch (NoSuchMethodException e) { LOG.debug("Unable to start jline", e); jlinemissing = true; } catch (InvocationTargetException e) { LOG.debug("Unable to start jline", e); jlinemissing = true; } catch (IllegalAccessException e) { LOG.debug("Unable to start jline", e); jlinemissing = true; } catch (InstantiationException e) { LOG.debug("Unable to start jline", e); jlinemissing = true; } if (jlinemissing) { System.out.println("JLine support is disabled"); BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); String line; while ((line = br.readLine()) != null) { executeLine(line); } } } else { // Command line args non-null. Run what was passed. processCmd(cl); } }/<code>
<code> public void executeLine(String line) throws InterruptedException, IOException, KeeperException { if (!line.equals("")) { cl.parseCommand(line); // 添加歷史命令 addToHistory(commandCount,line); // 執行命令核心方法 processCmd(cl); // 命令計數 commandCount++; } }/<code>
<code> protected boolean processCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException { try { // 執行zk命令 return processZKCmd(co); } catch (IllegalArgumentException e) { System.err.println("Command failed: " + e); } catch (KeeperException.NoNodeException e) { System.err.println("Node does not exist: " + e.getPath()); } catch (KeeperException.NoChildrenForEphemeralsException e) { System.err.println("Ephemerals cannot have children: " + e.getPath()); } catch (KeeperException.NodeExistsException e) { System.err.println("Node already exists: " + e.getPath()); } catch (KeeperException.NotEmptyException e) { System.err.println("Node not empty: " + e.getPath()); } catch (KeeperException.NotReadOnlyException e) { System.err.println("Not a read-only call: " + e.getPath()); }catch (KeeperException.InvalidACLException e) { System.err.println("Acl is not valid : "+e.getPath()); }catch (KeeperException.NoAuthException e) { System.err.println("Authentication is not valid : "+e.getPath()); }catch (KeeperException.BadArgumentsException e) { System.err.println("Arguments are not valid : "+e.getPath()); }catch (KeeperException.BadVersionException e) { System.err.println("version No is not valid : "+e.getPath()); } return false; }/<code>
<code>/** 根據字符串判斷屬於什麼命令然後發送請求,最終會調用到zookeeper類中的對應方法 **/protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException { Stat stat = new Stat(); String[] args = co.getArgArray(); String cmd = co.getCommand(); if (args.length < 1) { usage(); return false; } if (!commandMap.containsKey(cmd)) { usage(); return false; } boolean watch = args.length > 2; String path = null; Listacl = Ids.OPEN_ACL_UNSAFE; LOG.debug("Processing " + cmd); if (cmd.equals("quit")) { System.out.println("Quitting..."); zk.close(); System.exit(0); } else if (cmd.equals("redo") && args.length >= 2) { Integer i = Integer.decode(args[1]); if (commandCount <= i || i < 0){ // don't allow redoing this redo System.out.println("Command index out of range"); return false; } cl.parseCommand(history.get(i)); if (cl.getCommand().equals( "redo" )){ System.out.println("No redoing redos"); return false; } history.put(commandCount, history.get(i)); processCmd( cl); } else if (cmd.equals("history")) { for (int i=commandCount - 10;i<=commandCount;++i) { if (i < 0) continue; System.out.println(i + " - " + history.get(i)); } } else if (cmd.equals("printwatches")) { if (args.length == 1) { System.out.println("printwatches is " + (printWatches ? "on" : "off")); } else { printWatches = args[1].equals("on"); } } else if (cmd.equals("connect")) { if (args.length >=2) { connectToZK(args[1]); } else { connectToZK(host); } } // Below commands all need a live connection if (zk == null || !zk.getState().isAlive()) { System.out.println("Not connected"); return false; } if (cmd.equals("create") && args.length >= 3) { int first = 0; CreateMode flags = CreateMode.PERSISTENT; if ((args[1].equals("-e") && args[2].equals("-s")) || (args[1]).equals("-s") && (args[2].equals("-e"))) { first+=2; flags = CreateMode.EPHEMERAL_SEQUENTIAL; } else if (args[1].equals("-e")) { first++; flags = CreateMode.EPHEMERAL; } else if (args[1].equals("-s")) { first++; flags = CreateMode.PERSISTENT_SEQUENTIAL; } if (args.length == first + 4) { acl = parseACLs(args[first+3]); } path = args[first + 1]; String newPath = zk.create(path, args[first+2].getBytes(), acl, flags); System.err.println("Created " + newPath); } else if (cmd.equals("delete") && args.length >= 2) { path = args[1]; zk.delete(path, watch ? Integer.parseInt(args[2]) : -1); } else if (cmd.equals("rmr") && args.length >= 2) { path = args[1]; ZKUtil.deleteRecursive(zk, path); } else if (cmd.equals("set") && args.length >= 3) { path = args[1]; stat = zk.setData(path, args[2].getBytes(), args.length > 3 ? Integer.parseInt(args[3]) : -1); printStat(stat); } else if (cmd.equals("aget") && args.length >= 2) { path = args[1]; zk.getData(path, watch, dataCallback, path); } else if (cmd.equals("get") && args.length >= 2) { path = args[1]; byte data[] = zk.getData(path, watch, stat); data = (data == null)? "null".getBytes() : data; System.out.println(new String(data)); printStat(stat); } else if (cmd.equals("ls") && args.length >= 2) { path = args[1]; List<string> children = zk.getChildren(path, watch); System.out.println(children); } else if (cmd.equals("ls2") && args.length >= 2) { path = args[1]; List<string> children = zk.getChildren(path, watch, stat); System.out.println(children); printStat(stat); } else if (cmd.equals("getAcl") && args.length >= 2) { path = args[1]; acl = zk.getACL(path, stat); for (ACL a : acl) { System.out.println(a.getId() + ": " + getPermString(a.getPerms())); } } else if (cmd.equals("setAcl") && args.length >= 3) { path = args[1]; stat = zk.setACL(path, parseACLs(args[2]), args.length > 4 ? Integer.parseInt(args[3]) : -1); printStat(stat); } else if (cmd.equals("stat") && args.length >= 2) { path = args[1]; stat = zk.exists(path, watch); if (stat == null) { throw new KeeperException.NoNodeException(path); } printStat(stat); } else if (cmd.equals("listquota") && args.length >= 2) { path = args[1]; String absolutePath = Quotas.quotaZookeeper + path + "/" + Quotas.limitNode; byte[] data = null; try { System.err.println("absolute path is " + absolutePath); data = zk.getData(absolutePath, false, stat); StatsTrack st = new StatsTrack(new String(data)); System.out.println("Output quota for " + path + " " + st.toString()); data = zk.getData(Quotas.quotaZookeeper + path + "/" + Quotas.statNode, false, stat); System.out.println("Output stat for " + path + " " + new StatsTrack(new String(data)).toString()); } catch(KeeperException.NoNodeException ne) { System.err.println("quota for " + path + " does not exist."); } } else if (cmd.equals("setquota") && args.length >= 4) { String option = args[1]; String val = args[2]; path = args[3]; System.err.println("Comment: the parts are " + "option " + option + " val " + val + " path " + path); if ("-b".equals(option)) { // we are setting the bytes quota createQuota(zk, path, Long.parseLong(val), -1); } else if ("-n".equals(option)) { // we are setting the num quota createQuota(zk, path, -1L, Integer.parseInt(val)); } else { usage(); } } else if (cmd.equals("delquota") && args.length >= 2) { //if neither option -n or -b is specified, we delete // the quota node for thsi node. if (args.length == 3) { //this time we have an option String option = args[1]; path = args[2]; if ("-b".equals(option)) { delQuota(zk, path, true, false); } else if ("-n".equals(option)) { delQuota(zk, path, false, true); } } else if (args.length == 2) { path = args[1]; // we dont have an option specified. // just delete whole quota node delQuota(zk, path, true, true); } else if (cmd.equals("help")) { usage(); } } else if (cmd.equals("close")) { zk.close(); } else if (cmd.equals("sync") && args.length >= 2) { path = args[1]; zk.sync(path, new AsyncCallback.VoidCallback() { public void processResult(int rc, String path, Object ctx) { System.out.println("Sync returned " + rc); } }, null ); } else if (cmd.equals("addauth") && args.length >=2 ) { byte[] b = null; if (args.length >= 3) b = args[2].getBytes(); zk.addAuthInfo(args[1], b); } else if (!commandMap.containsKey(cmd)) { usage(); } return watch; }/<string>/<string> /<code>
Zookeeper類中的方法
接下來看一個Zookeeper類中的create 方法
<code>public String create(final String path, byte data[], Listacl, CreateMode createMode) throws KeeperException, InterruptedException { final String clientPath = path; PathUtils.validatePath(clientPath, createMode.isSequential()); final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.create); // 請求參數 CreateRequest request = new CreateRequest(); CreateResponse response = new CreateResponse(); request.setData(data); request.setFlags(createMode.toFlag()); request.setPath(serverPath); if (acl != null && acl.size() == 0) { throw new KeeperException.InvalidACLException(); } // 設置acl權限 request.setAcl(acl); // 提交請求 ReplyHeader r = cnxn.submitRequest(h, request, response, null); if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (cnxn.chrootPath == null) { return response.getPath(); } else { return response.getPath().substring(cnxn.chrootPath.length()); } } /<code>
<code> public ReplyHeader submitRequest(RequestHeader h, Record request,Record response, WatchRegistration watchRegistration) throws InterruptedException { ReplyHeader r = new ReplyHeader(); // 封裝成packet,任務放到outgoingQueue隊列裡面 Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration); // 同步等待結果 synchronized (packet) { while (!packet.finished) { packet.wait(); } } return r; }/<code>
之前提到上下文會創建sendThread與evenThread兩個線程,接下來再來看一下這兩個線程都做了什麼,既然是線程那麼就需要看run方法。
sendThread線程run方法:
<code> @Override public void run() { clientCnxnSocket.introduce(this,sessionId); clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); int to; long lastPingRwServer = Time.currentElapsedTime(); final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds InetSocketAddress serverAddress = null; // 客戶端alive時 while (state.isAlive()) { try { // 未連接 if (!clientCnxnSocket.isConnected()) { if(!isFirstConnect){ try { Thread.sleep(r.nextInt(1000)); } catch (InterruptedException e) { LOG.warn("Unexpected exception", e); } } // don't re-establish connection if we are closing if (closing || !state.isAlive()) { break; } if (rwServerAddress != null) { serverAddress = rwServerAddress; rwServerAddress = null; } else { serverAddress = hostProvider.next(1000); } // 開始創建連接 startConnect(serverAddress); clientCnxnSocket.updateLastSendAndHeard(); }// 已連接 if (state.isConnected()) { // determine whether we need to send an AuthFailed event. if (zooKeeperSaslClient != null) { boolean sendAuthEvent = false; if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) { try { zooKeeperSaslClient.initialize(ClientCnxn.this); } catch (SaslException e) { LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e); state = States.AUTH_FAILED; sendAuthEvent = true; } } KeeperState authState = zooKeeperSaslClient.getKeeperState(); if (authState != null) { if (authState == KeeperState.AuthFailed) { // An authentication error occurred during authentication with the Zookeeper Server. state = States.AUTH_FAILED; sendAuthEvent = true; } else { if (authState == KeeperState.SaslAuthenticated) { sendAuthEvent = true; } } } if (sendAuthEvent == true) { eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, authState,null)); } } to = readTimeout - clientCnxnSocket.getIdleRecv(); } else { to = connectTimeout - clientCnxnSocket.getIdleRecv(); } if (to <= 0) { String warnInfo; warnInfo = "Client session timed out, have not heard from server in " + clientCnxnSocket.getIdleRecv() + "ms" + " for sessionid 0x" + Long.toHexString(sessionId); LOG.warn(warnInfo); throw new SessionTimeoutException(warnInfo); } if (state.isConnected()) { //1000(1 second) is to prevent race condition missing to send the second ping //also make sure not to send too many pings when readTimeout is small int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0); //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) { sendPing(); clientCnxnSocket.updateLastSend(); } else { if (timeToNextPing < to) { to = timeToNextPing; } } } // If we are in read-only mode, seek for read/write server if (state == States.CONNECTEDREADONLY) { long now = Time.currentElapsedTime(); int idlePingRwServer = (int) (now - lastPingRwServer); if (idlePingRwServer >= pingRwTimeout) { lastPingRwServer = now; idlePingRwServer = 0; pingRwTimeout = Math.min(2*pingRwTimeout, maxPingRwTimeout); pingRwServer(); } to = Math.min(to, pingRwTimeout - idlePingRwServer); }// 傳輸方法 pendingQueue 等待結果的隊列 outgoingQueue 需要發送的隊列 clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this); } catch (Throwable e) { if (closing) { if (LOG.isDebugEnabled()) { // closing so this is expected LOG.debug("An exception was thrown while closing send thread for session 0x" + Long.toHexString(getSessionId()) + " : " + e.getMessage()); } break; } else { // this is ugly, you have a better way speak up if (e instanceof SessionExpiredException) { LOG.info(e.getMessage() + ", closing socket connection"); } else if (e instanceof SessionTimeoutException) { LOG.info(e.getMessage() + RETRY_CONN_MSG); } else if (e instanceof EndOfStreamException) { LOG.info(e.getMessage() + RETRY_CONN_MSG); } else if (e instanceof RWServerFoundException) { LOG.info(e.getMessage()); } else if (e instanceof SocketException) { LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage()); } else { LOG.warn("Session 0x{} for server {}, unexpected error{}", Long.toHexString(getSessionId()), serverAddress, RETRY_CONN_MSG, e); } cleanup(); if (state.isAlive()) { eventThread.queueEvent(new WatchedEvent( Event.EventType.None, Event.KeeperState.Disconnected, null)); } clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); } } } cleanup(); clientCnxnSocket.close(); if (state.isAlive()) { eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null)); } ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(), "SendThread exited loop for session: 0x" + Long.toHexString(getSessionId())); }/<code>
開始連接startConnect()方法
<code>private void startConnect(InetSocketAddress addr) throws IOException { // initializing it for new connection saslLoginFailed = false; // 設置state為CONNECTING state = States.CONNECTING; setName(getName().replaceAll("\\\\(.*\\\\)", "(" + addr.getHostName() + ":" + addr.getPort() + ")")); if (ZooKeeperSaslClient.isEnabled()) { try { String principalUserName = System.getProperty( ZK_SASL_CLIENT_USERNAME, "zookeeper"); zooKeeperSaslClient = new ZooKeeperSaslClient( principalUserName+"/"+addr.getHostName()); } catch (LoginException e) { // An authentication error occurred when the SASL client tried to initialize: // for Kerberos this means that the client failed to authenticate with the KDC. // This is different from an authentication error that occurs during communication // with the Zookeeper server, which is handled below. LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without " + "SASL authentication, if Zookeeper server allows it."); eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null)); saslLoginFailed = true; } } logStartConnect(addr);// 連接 clientCnxnSocket.connect(addr); }/<code>
<code> @Override void connect(InetSocketAddress addr) throws IOException { SocketChannel sock = createSock(); try { // 註冊連接 registerAndConnect(sock, addr); } catch (IOException e) { LOG.error("Unable to open socket to " + addr); sock.close(); throw e; } initialized = false; /* * Reset incomingBuffer */ lenBuffer.clear(); incomingBuffer = lenBuffer; }/<code>
<code>void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException { sockKey = sock.register(selector, SelectionKey.OP_CONNECT); // 創建socket連接 boolean immediateConnect = sock.connect(addr); if (immediateConnect) { sendThread.primeConnection(); } }/<code>
<code>// 次方法最終也會封裝成package放到outgoingQueue隊列中void primeConnection() throws IOException { LOG.info("Socket connection established to " + clientCnxnSocket.getRemoteSocketAddress() + ", initiating session"); isFirstConnect = false; long sessId = (seenRwServerBefore) ? sessionId : 0; ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd); synchronized (outgoingQueue) { // We add backwards since we are pushing into the front // Only send if there's a pending watch // TODO: here we have the only remaining use of zooKeeper in // this class. It's to be eliminated! if (!disableAutoWatchReset) { List<string> dataWatches = zooKeeper.getDataWatches(); List<string> existWatches = zooKeeper.getExistWatches(); List<string> childWatches = zooKeeper.getChildWatches(); if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()) { Iterator<string> dataWatchesIter = prependChroot(dataWatches).iterator(); Iterator<string> existWatchesIter = prependChroot(existWatches).iterator(); Iterator<string> childWatchesIter = prependChroot(childWatches).iterator(); long setWatchesLastZxid = lastZxid; while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()) { List<string> dataWatchesBatch = new ArrayList<string>(); List<string> existWatchesBatch = new ArrayList<string>(); List<string> childWatchesBatch = new ArrayList<string>(); int batchLength = 0; // Note, we may exceed our max length by a bit when we add the last // watch in the batch. This isn't ideal, but it makes the code simpler. while (batchLength < SET_WATCHES_MAX_LENGTH) { final String watch; if (dataWatchesIter.hasNext()) { watch = dataWatchesIter.next(); dataWatchesBatch.add(watch); } else if (existWatchesIter.hasNext()) { watch = existWatchesIter.next(); existWatchesBatch.add(watch); } else if (childWatchesIter.hasNext()) { watch = childWatchesIter.next(); childWatchesBatch.add(watch); } else { break; } batchLength += watch.length(); } SetWatches sw = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.setWatches); h.setXid(-8); // 封裝為package Packet packet = new Packet(h, new ReplyHeader(), sw, null, null); // 添加到隊列中 outgoingQueue.addFirst(packet); } } } for (AuthData id : authInfo) { outgoingQueue.addFirst(new Packet(new RequestHeader(-4, OpCode.auth), null, new AuthPacket(0, id.scheme, id.data), null, null)); } outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly)); } clientCnxnSocket.enableReadWriteOnly(); if (LOG.isDebugEnabled()) { LOG.debug("Session establishment request sent on " + clientCnxnSocket.getRemoteSocketAddress()); } }/<string>/<string>/<string>/<string>/<string>/<string>/<string>/<string>/<string>/<string>/<string>/<string>/<code>
返回到run方法查看調用的doTransport傳輸方法
<code> @Override void doTransport(int waitTimeOut, List<packet> pendingQueue, LinkedList<packet> outgoingQueue, ClientCnxn cnxn) throws IOException, InterruptedException { selector.select(waitTimeOut); Set<selectionkey> selected; synchronized (this) { selected = selector.selectedKeys(); } // Everything below and until we get back to the select is // non blocking, so time is effectively a constant. That is // Why we just have to do this once, here updateNow(); for (SelectionKey k : selected) { SocketChannel sc = ((SocketChannel) k.channel()); if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { if (sc.finishConnect()) { updateLastSendAndHeard(); sendThread.primeConnection(); } } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { doIO(pendingQueue, outgoingQueue, cnxn); } } if (sendThread.getZkState().isConnected()) { synchronized(outgoingQueue) { if (findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) { enableWrite(); } } } selected.clear(); }/<selectionkey>/<packet>/<packet>/<code>
<code>void doIO(List<packet> pendingQueue, LinkedList<packet> outgoingQueue, ClientCnxn cnxn) throws InterruptedException, IOException { SocketChannel sock = (SocketChannel) sockKey.channel(); if (sock == null) { throw new IOException("Socket is null!"); } // 處理讀請求 if (sockKey.isReadable()) { int rc = sock.read(incomingBuffer); if (rc < 0) { throw new EndOfStreamException( "Unable to read additional data from server sessionid 0x" + Long.toHexString(sessionId) + ", likely server has closed socket"); } if (!incomingBuffer.hasRemaining()) { incomingBuffer.flip(); if (incomingBuffer == lenBuffer) { recvCount++; readLength(); } else if (!initialized) { readConnectResult(); enableRead(); if (findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) { // Since SASL authentication has completed (if client is configured to do so), // outgoing packets waiting in the outgoingQueue can now be sent. enableWrite(); } lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); initialized = true; } else { sendThread.readResponse(incomingBuffer); lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); } } } // 處理寫請求 if (sockKey.isWritable()) { synchronized(outgoingQueue) { Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress()); if (p != null) { updateLastSend(); // If we already started writing p, p.bb will already exist if (p.bb == null) { if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping) && (p.requestHeader.getType() != OpCode.auth)) { p.requestHeader.setXid(cnxn.getXid()); } p.createBB(); } sock.write(p.bb); if (!p.bb.hasRemaining()) { sentCount++; outgoingQueue.removeFirstOccurrence(p); if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping && p.requestHeader.getType() != OpCode.auth) { synchronized (pendingQueue) { pendingQueue.add(p); } } } } if (outgoingQueue.isEmpty()) { // No more packets to send: turn off write interest flag. // Will be turned on later by a later call to enableWrite(), // from within ZooKeeperSaslClient (if client is configured // to attempt SASL authentication), or in either doIO() or // in doTransport() if not. disableWrite(); } else if (!initialized && p != null && !p.bb.hasRemaining()) { // On initial connection, write the complete connect request // packet, but then disable further writes until after // receiving a successful connection response. If the // session is expired, then the server sends the expiration // response and immediately closes its end of the socket. If // the client is simultaneously writing on its end, then the // TCP stack may choose to abort with RST, in which case the // client would never receive the session expired event. See // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html disableWrite(); } else { // Just in case enableWrite(); } } } }/<packet>/<packet>/<code>
<code> private void finishPacket(Packet p) { if (p.watchRegistration != null) { p.watchRegistration.register(p.replyHeader.getErr()); }// 根據是否有回調方法,判斷是同步請求還是異步請求 if (p.cb == null) { synchronized (p) { p.finished = true; p.notifyAll(); } } else { p.finished = true; // 異步請求放到waitingEvents 隊列中,現在就要用到eventThread eventThread.queuePacket(p); } }/<code>
下面跟蹤一下eventThread線程,當然也是先看run方法:
<code>@Override public void run() { try { isRunning = true; while (true) { // take 方法取數據 Object event = waitingEvents.take(); if (event == eventOfDeath) { wasKilled = true; } else { // event處理 processEvent(event); } if (wasKilled) synchronized (waitingEvents) { if (waitingEvents.isEmpty()) { isRunning = false; break; } } } } catch (InterruptedException e) { LOG.error("Event thread exiting due to interruption", e); } LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId())); }/<code>
<code>private void processEvent(Object event) { try { if (event instanceof WatcherSetEventPair) { // each watcher will process the event WatcherSetEventPair pair = (WatcherSetEventPair) event; for (Watcher watcher : pair.watchers) { try { watcher.process(pair.event); } catch (Throwable t) { LOG.error("Error while calling watcher ", t); } } } else { Packet p = (Packet) event; int rc = 0; String clientPath = p.clientPath; if (p.replyHeader.getErr() != 0) { rc = p.replyHeader.getErr(); } if (p.cb == null) { LOG.warn("Somehow a null cb got to EventThread!"); } else if (p.response instanceof ExistsResponse || p.response instanceof SetDataResponse || p.response instanceof SetACLResponse) { StatCallback cb = (StatCallback) p.cb; if (rc == 0) { if (p.response instanceof ExistsResponse) { cb.processResult(rc, clientPath, p.ctx, ((ExistsResponse) p.response) .getStat()); } else if (p.response instanceof SetDataResponse) { cb.processResult(rc, clientPath, p.ctx, ((SetDataResponse) p.response) .getStat()); } else if (p.response instanceof SetACLResponse) { cb.processResult(rc, clientPath, p.ctx, ((SetACLResponse) p.response) .getStat()); } } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.response instanceof GetDataResponse) { DataCallback cb = (DataCallback) p.cb; GetDataResponse rsp = (GetDataResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getData(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof GetACLResponse) { ACLCallback cb = (ACLCallback) p.cb; GetACLResponse rsp = (GetACLResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getAcl(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof GetChildrenResponse) { ChildrenCallback cb = (ChildrenCallback) p.cb; GetChildrenResponse rsp = (GetChildrenResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getChildren()); } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.response instanceof GetChildren2Response) { Children2Callback cb = (Children2Callback) p.cb; GetChildren2Response rsp = (GetChildren2Response) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getChildren(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof CreateResponse) { StringCallback cb = (StringCallback) p.cb; CreateResponse rsp = (CreateResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, (chrootPath == null ? rsp.getPath() : rsp.getPath() .substring(chrootPath.length()))); } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.response instanceof MultiResponse) { MultiCallback cb = (MultiCallback) p.cb; MultiResponse rsp = (MultiResponse) p.response; if (rc == 0) { List<opresult> results = rsp.getResultList(); int newRc = rc; for (OpResult result : results) { if (result instanceof ErrorResult && KeeperException.Code.OK.intValue() != (newRc = ((ErrorResult) result).getErr())) { break; } } cb.processResult(newRc, clientPath, p.ctx, results); } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.cb instanceof VoidCallback) { VoidCallback cb = (VoidCallback) p.cb; cb.processResult(rc, clientPath, p.ctx); } } } catch (Throwable t) { LOG.error("Caught unexpected throwable", t); } } }/<opresult>/<code>
整片內容比較多建議收藏起來看,源碼方面的知識也比較枯燥一些。
閱讀更多 JAVA破局之路 的文章