NioEventLoopGroup 源碼分析

NioEventLoopGroup 源碼分析

1. 在閱讀源碼時做了一定的註釋,並且做了一些測試分析源碼內的執行流程,由於博客篇幅有限。為了方便 IDE 查看、跟蹤、調試 代碼,所以在 github上提供 netty 的源碼、詳細的註釋及測試用例。歡迎大家 star、fork !

2. 由於個人水平有限,對源碼的分析理解可能存在偏差或不透徹的地方還請大家在評論區指出,謝謝!

從今天開始,就準備進軍 ne tty 了,主要的想法是看看 netty4 中一些比較重要的實現,也就是能經常出現在我們面前的東西。主要是: 線程池、通道、管道、編解碼器、以及常用的工具類。

然後現在看源碼應該不會像之前的 jdk 那麼細緻了,主要是看了一個類以後就發現 netty 對代碼封裝太強了,基本一個功能可能封裝了七八個類去實現,很多的抽象類但是這些抽象類中的功能還非常的多。所以說主要看這個流程,以及裡面寫的比較好的代碼或者比較新的思想會仔細的去看看。具體的子字段,每個方法不可能做到那麼細緻。

好,正式開始 netty 源碼征戰 !

1. 基本思路

這裡首先講一下結論,也就是先說我看這個類的源碼整理出來的思路,主要就是因為這些類太雜,一個功能在好幾個類中才完全實現。

我們在 new 一個 worker/boss 線程的時候一般是採用的直接使用的無參的構造方法,但是無參的構造方法他創建的線程池的大小是我們 CPU 核心的 2 倍。緊接著就需要 new 這麼多個線程放到線程池裡面,這裡的線程池採用的數據結構是一個數組存放的,每一個線程需要設置一個任務隊列,顯然任務隊列使用的是一個阻塞隊列,這裡實際採用的是

LinkedBlockQueue ,然後回想一下在 jdk 中的線程池是不是還有一個比較重要的參數就是線程工廠,對的!這裡也有這個東西,他是需要我們手動傳入的,但是如果不傳則會使用一個默認的線程工廠,裡面有一個 newThread 方法,這個方法實現基本和 jdk 中的實現一模一樣,就是創建一個級別為 5 的非 Daemon 線程。對這就是我們在創建一個線程池時候完成的全部工作!

好現在來具體說一下,我們每次創建的是 NioEventLoopGroup 但是他又繼承了 n 個類才實現了線程池,也就是線程池的祖先是 ScheduledExecutorService是 jdk 中的線程池的一個接口,其中裡面最重要的數據結構就是一個 children 數組,用來裝線程的。

然後具體的線程他也是進行了封裝的,也就是我們常看到的 NioEventLoop 。這個類裡面有兩個比較重要的結構:taskQueue 和 thread 。很明顯這個非常類似 jdk 中的線程池。

2. NioEventLoopGroup 線程池分析

首先要創建線程池,傳入的線程數為 0,他是一直在調用 this()

最後追溯到 super(nThreads,threadFactory,selectorProvider) 也就是使用了 MultithreadEventLoopGroup 的構造方法,在這一步確定了當傳入的線程數為 0 時應該設置的線程數為 CPU 核心的兩倍。然後再次上調,調用了 MultithreadEventExecutorGroup 的構造方法,在這裡才是真正的開始了線程池的初始化。

首先設置了線程池工廠,然後初始化 chooser ,接著創建 n 個線程放到 children 數組中,最後設置線程中斷的監聽事件。

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
/** * 這個方法流程: * 1、設置了默認的線程工廠 * 2、初始化 chooser * 3、創建nTreads個NioEventLoop對象保存在children數組中 * 4、添加中斷的監聽事件 * @param nThreads * @param threadFactory * @param args */ protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } // 默認使用線程工廠是 DefaultThreadFactory if (threadFactory == null) { threadFactory = newDefaultThreadFactory(); } children = new SingleThreadEventExecutor[nThreads]; // 二的平方的實現是看 n&-n==n //根據線程個數是否為2的冪次方,採用不同策略初始化chooser if (isPowerOfTwo(children.length)) { chooser = new PowerOfTwoEventExecutorChooser(); } else { chooser = new GenericEventExecutorChooser(); } //產生nTreads個NioEventLoop對象保存在children數組中 for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(threadFactory, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { // 沒成功,把已有的線程優雅關閉 if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } // 沒有完全關閉的線程讓它一直等待 for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { Thread.currentThread().interrupt(); break; } } } } } // 對每一個 children 添加中斷線程時候的監聽事件,就是將 terminatedChildren 自增 // 判斷是否到達線程總數,是則更新 terminationFuture final FutureListener terminationListener = new FutureListener() { @Override public void operationComplete(Future 
future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } }

其中有一個 if 分支用來初始化 chooser ,這個 chooser 就是用來選擇使用哪個線程來執行哪些操作的。這裡用到了判斷一個數是否為 2 的次冪的一個方法 isPowerOfTwo() 實現比較有意思,貼出來。

123
private static boolean isPowerOfTwo(int val) { return (val & -val) == val;}

接下來目光要轉向 newChild(threadFactory, args) ,因為在這個類裡面這個方法是抽象的,在 NioEventLoopGroup 得到了實現。其實看到了也非常的簡單粗暴,直接 new 了一個 NioEventLoop ,接下來就應該分析這個線程的包裝類了。

123456
@Overrideprotected EventExecutor newChild( ThreadFactory threadFactory, Object... args) throws Exception { // 這裡才是重點 也就是真正的線程 被放在自己的 children 數組中 return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);}

3. NioEventLoop 線程分析

上面已經看到了,newChild 方法就是 new 了一個 NioEventLoop 。所以有必要好好看看這個線程包裝類。

這個類的構造方法是調用了父類 SingleThreadEventLoop 的構造,接著繼續上調 SingleThreadEventExecutor 構造,在這個類中才真正的實現了線程的構造。裡面就做了兩件事 :

  1. new 了一個新的線程,新的線程還分配了一個任務,任務的內容就是調用本類中的一個 run 方法,在 NioEventLoop 中實現。

  2. 設置任務隊列為 LinkedBlockQueue

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
/** * 構造方法主要完成了: * 1、new 一個新的線程執行一個 run 方法 * 2、用 LinkedBlockQueue 初始化 taskQueue * @param parent * @param threadFactory * @param addTaskWakesUp */ protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.parent = parent; this.addTaskWakesUp = addTaskWakesUp; // new 了一個新的線程 thread = threadFactory.newThread(new Runnable() { @Override public void run() { boolean success = false; updateLastExecutionTime(); try { // 調用一個 run 方法 SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { // 讓線程關閉 for (;;) { int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this); if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { break; } } // Check if confirmShutdown() was called at the end of the loop. if (success && gracefulShutdownStartTime == 0) { logger.error( "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " + "before run() implementation terminates."); } try { // Run all remaining tasks and shutdown hooks. for (;;) { if (confirmShutdown()) { break; } } } finally { try { cleanup(); } finally { STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.release(); if (!taskQueue.isEmpty()) { logger.warn("An event executor terminated with non-empty task queue (" + taskQueue.size() + ')'); } terminationFuture.setSuccess(null); } } } } }); // 使用 LinkedBlockQueue 初始化 taskQueue taskQueue = newTaskQueue(); }

然後看一下他要執行的 run 方法在 NioEventLoop 中得到了實現。

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
/** *'wakenUp.compareAndSet(false, true)' 一般都會在 select.wakeUp() 之前執行 * 因為這樣可以減少 select.wakeUp() 調用的次數,select.wakeUp() 調用是一個代價 * 很高的操作 * 注意:如果說我們過早的把 wakenUp 設置為 true,可能導致線程的競爭問題,過早設置的情形如下: 1) Selector is waken up between 'wakenUp.set(false)' and 'selector.select(...)'. (BAD) 2) Selector is waken up between 'selector.select(...)' and 'if (wakenUp.get()) { ... }'. (OK) 在第一種情況中 wakenUp 被設置為 true 則 select 會立刻被喚醒直到 wakenUp 再次被設置為 false 但是wakenUp.compareAndSet(false, true)會失敗,並且導致所有希望喚醒他的線程都會失敗導致 select 進行不必要的休眠 為了解決這個問題我們是在 wakenUp 為 true 的時候再次對 select 進行喚醒。 */ @Override protected void run() { for (;;) { // 獲取之前的線程狀態,並讓 select 阻塞 boolean oldWakenUp = wakenUp.getAndSet(false); try { // 有任務在線程創建之後直接開始 select if (hasTasks()) { selectNow(); //直接調用了 select 的 selectNow 然後再次喚醒同下面的代碼 // 沒有任務 } else { // 自旋進行等待可進行 select 操作 select(oldWakenUp); // 再次喚醒,解決併發問題 if (wakenUp.get()) { selector.wakeup(); } } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; // 都是處理 selected 的通道的數據,並執行所有的任務,只是在 runAllTasks 傳的參數不同 if (ioRatio == 100) { processSelectedKeys(); runAllTasks(); } else { final long ioStartTime = System.nanoTime(); processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { logger.warn("Unexpected exception in the selector loop.", t); // Prevent possible consecutive immediate failures that lead to // excessive CPU consumption. try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore. } } } } 

緊接著就是分析這個 run 方法,也就是線程在被創建之後進行的一系列操作。裡面主要做了三件事:

  1. 進行 select

  2. 處理 selectedKeys

  3. 喚醒隊列中所有的任務

上面的操作都是在一個循環裡面一直執行的,所以說 NioEventLoop 這個線程的作用就只有一個那就是:進行任務處理。在這個線程被 new 出來時我們就給他分配了線程的任務就是永不停歇的進行上面的操作。

上面的過程說的是有線程安全問題,也就是如果我們過早的把 wakenUp 設置為 true,我們的 select 就會甦醒過來,而其他的線程不清楚這種狀態想要設置為 wakenUp 的時候都會失敗,導致 select 休眠。主要感覺有點是因為這個東西不是線程間可見的,要是採用 volatile 可能就會解決這個問題,但是 wakenUp 是 final 的不能使用 volatile 關鍵字修飾。所以作者採用的解決方案就是再次手動喚醒,防止由於其他線程併發設置 wakenUp 的值導致的不必要的休眠。

然後要說一下 select 方法,這個方法的調用主要因為在隊列中沒有任務,所以就暫時不用 select ,這個方法裡面做的就是自旋的去 select ,沒有任務就 等待一段時間再去 select。

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
/** * 這個方法主要乾的事情: * 1、如果不需要等待就直接 select * 2、需要等待則等一個超時時間再去 select * 這個過程是不停進行的也就是死循環直達有任務可進行 select 時 select 完畢退出循環 * @param oldWakenUp * @throws IOException */ private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { // 不用等待進行一次 select 操作 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } // 等一個超時再去選擇 int selectedKeys = selector.select(timeoutMillis); selectCnt ++; if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { // - Selected something, // - waken up by user, or // - the task queue has a pending task. // - a scheduled task is ready for processing break; } if (Thread.interrupted()) { // Thread was interrupted so reset selected keys and break so we not run into a busy loop. // As this is most likely a bug in the handler of the user or it's client library we will // also log it. // // See https://github.com/netty/netty/issues/2426 if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1; break; } long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // timeoutMillis elapsed without anything selected. selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. logger.warn( "Selector.select() returned prematurely {} times in a row; rebuilding selector.", selectCnt); rebuildSelector(); selector = this.selector; // Select again to populate selectedKeys. selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1); } } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e); } // Harmless exception - log anyway } }

接著就是 processSelectedKeys();runAllTasks(); 這兩個方法,前一個方法不說就是和我們寫 Nio 的時候的步驟差不多,遍歷 selectedKeys 處理,然後 runAllTasks() 執行所有的任務的 run 方法。

12345678910111213141516171819202122 
protected boolean runAllTasks() { fetchFromDelayedQueue(); Runnable task = pollTask(); if (task == null) { return false; } // 這個循環就是用來循環任務隊列中的所有任務 for (;;) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception.", t); } task = pollTask(); // 循環條件 if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); return true; } } }

4. 總結

好了其實到這裡線程池其實分析的已經差不多了,對於很多的細節問題並沒有仔細的去看,單絲我們清楚流程以及裡面的結構基本就差不多了。

NioEventLoopGroup 中包裝了 NioEventLoop 線程任務。具體包裝在了 children 數組中,然後使用 newThread 工廠創建線程,接著給線程分配任務,任務就是進行 select 操作。


分享到:


相關文章: