Netty NioEventLoop 創建過程源碼分析

前面 ,我們分析了Netty中的Channel組件,本篇我們來介紹一下與Channel關聯的另一個核心的組件 ——

EventLoop

Netty版本:4.1.30

概述

EventLoop定義了Netty的核心抽象,用於處理網絡連接生命週期中所有發生的事件。

我們先來從一個比較高的視角來了解一下Channels、Thread、EventLoops、EventLoopGroups之間的關係。

Netty NioEventLoop 創建過程源碼分析

上圖是表示了擁有4個EventLoop的EventLoopGroup處理IO的流程圖。它們之間的關係如下:

  • 一個 EventLoopGroup包含一個或多個EventLoop
  • 一個 EventLoop在它的生命週期內只和一個Thread綁定
  • 所有由EventLoop處理的I/O事件都將在它專有的Thread上被處理
  • 一個Channel在它的生命週期內只註冊於一個EventLoop
  • 一個EventLoop可能會被分配給一個或多個Channel

EventLoop 原理

下圖是Netty EventLoop相關類的UML圖。從中我們可以看到EventLoop相關的類都是實現了 java.util.concurrent 包中的 ExecutorService 接口。我們可以直接將任務(Runable 或 Callable) 提交給EventLoop去立即執行或定時執行。

Netty NioEventLoop 創建過程源碼分析

例如,使用EventLoop去執行定時任務,樣例代碼:

public static void scheduleViaEventLoop() {
Channel ch = new NioSocketChannel();
ScheduledFuture> future = ch.eventLoop().schedule(
() -> System.out.println("60 seconds later"), 60, TimeUnit.SECONDS);
}

Thread 管理

Netty線程模型的高性能主要取決於當前所執行線程的身份的確定。一個線程提交到EventLoop執行的流程如下:

  • 將Task任務提交給EventLoop執行
  • 在Task傳遞到execute方法之後,檢查當前要執行的Task的線程是否是分配給EventLoop的那個線程
  • 如果是,則該線程會立即執行
  • 如果不是,則將線程放入任務隊列中,等待下一次執行

其中,Netty中的每一個EventLoop都有它自己的任務隊列,並且和其他的EventLoop的任務隊列獨立開來。

Netty NioEventLoop 創建過程源碼分析

現在私信我“資料”即可獲取Java工程化、高性能及分佈式、高性能、高架構、性能調優、Spring、MyBatis、Netty源碼分析等多個知識點高級進階乾貨的直播免費學習權限及相關視頻資料,還有spring和虛擬機等書籍掃描版

Thread 分配

服務於Channel的I/O和事件的EventLoop包含在EventLoopGroup中。根據不同的傳輸實現,EventLoop的創建和分配方式也不同。

NIO傳輸

Netty NioEventLoop 創建過程源碼分析

在NIO傳輸方式中,使用盡可能少的EventLoop就可以服務多個Channel。如圖所示,EventLoopGroup採用順序循環的方式負責為每一個新創建的Channel分配EventLoop,每一個EventLoop會被分配給多個Channels。

一旦一個Channel被分配給了一個EventLoop,則這個Channel的生命週期內,只會綁定這個EventLoop。這就讓我們在ChannelHandler的實現省去了對線程安全和同步問題的擔心。

OIO傳輸

Netty NioEventLoop 創建過程源碼分析

與NIO方式的不同在於,一個EventLoop只會服務於一個Channel。

NioEventLoop & NioEventLoopGroup 創建

初步瞭解了 EventLoop 以及 EventLoopGroup 的工作機制,接下來我們以 NioEventLoopGroup 為例,來深入分析 NioEventLoopGroup 是如何創建的,又是如何啟動的,它的內部執行邏輯又是怎樣的等等問題。

MultithreadEventExecutorGroup 構造器

我們從 NioEventLoopGroup 的構造函數開始分析:

EventLoopGroup acceptorEventLoopGroup = new NioEventLoopGroup(1);

NioEventLoopGroup構造函數會調用到父類 MultithreadEventLoopGroup 的構造函數,默認情況下,EventLoop的數量 = 處理器數量 x 2:

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
private static final int DEFAULT_EVENT_LOOP_THREADS;
// 默認情況下,EventLoop的數量 = 處理器數量 x 2
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
...
}

繼續調用父類,會調用到 MultithreadEventExecutorGroup 的構造器,主要做三件事情:

  • 創建線程任務執行器 ThreadPerTaskExecutor
  • 通過for循環創建數量為 nThreads 個的 EventLoop
  • 創建 EventLoop 選擇器 EventExecutorChooser
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
// 創建任務執行器 ThreadPerTaskExecutor
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 創建 EventExecutor 數組
children = new EventExecutor[nThreads];
// 通過for循環創建數量為 nThreads 個的 EventLoop
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 調用 newChild 接口
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);

}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}

// 創建選擇器
chooser = chooserFactory.newChooser(children);
final FutureListener<object> terminationListener = new FutureListener<object>() {
@Override
public void operationComplete(Future<object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<eventexecutor> childrenSet = new LinkedHashSet<eventexecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
/<eventexecutor>/<eventexecutor>/<object>/<object>/<object>

創建線程任務執行器 ThreadPerTaskExecutor

if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

線程任務執行器 ThreadPerTaskExecutor 源碼如下,具體的任務都由 ThreadFactory 去執行:

public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}

this.threadFactory = threadFactory;
}

// 使用 threadFactory 執行任務
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}

來看看 newDefaultThreadFactory 方法:

protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}

DefaultThreadFactory

接下來看看 DefaultThreadFactory 這個類,實現了 ThreadFactory 接口,我們可以瞭解到:

  • EventLoopGroup的命名規則
  • 具體的線程為 FastThreadLocalThread
public class DefaultThreadFactory implements ThreadFactory {

// 線程池ID編號自增器
private static final AtomicInteger poolId = new AtomicInteger();
// 線程ID自增器
private final AtomicInteger nextId = new AtomicInteger();
// 線程名稱前綴
private final String prefix;
// 是否為守護進程
private final boolean daemon;
// 線程優先級
private final int priority;
// 線程組

protected final ThreadGroup threadGroup;
public DefaultThreadFactory(Class> poolType) {
this(poolType, false, Thread.NORM_PRIORITY);
}
...
// 獲取線程名,返回結果:nioEventLoopGroup
public static String toPoolName(Class> poolType) {
if (poolType == null) {
throw new NullPointerException("poolType");
}
String poolName = StringUtil.simpleClassName(poolType);
switch (poolName.length()) {
case 0:
return "unknown";
case 1:
return poolName.toLowerCase(Locale.US);
default:
if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) {
return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1);
} else {
return poolName;
}
}
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
if (poolName == null) {
throw new NullPointerException("poolName");
}
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException(
"priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
}

// nioEventLoopGroup-2-
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
this.daemon = daemon;
this.priority = priority;
this.threadGroup = threadGroup;
}

public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
this(poolName, daemon, priority, System.getSecurityManager() == null ?
Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
}

@Override
public Thread newThread(Runnable r) {
// 創建新線程 nioEventLoopGroup-2-1
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());

try {
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}

// 創建新線程 FastThreadLocalThread
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}

}

創建NioEventLoop

繼續從 MultithreadEventExecutorGroup 構造器開始,創建完任務執行器 ThreadPerTaskExecutor 之後,進入for循環,開始創建 NioEventLoop:

for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 創建 nioEventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
}

...

}

NioEventLoopGroup類中的 newChild() 方法:

@Override 

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

NioEventLoop 構造器:

public final class NioEventLoop extends SingleThreadEventLoop{

...

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// 調用父類 SingleThreadEventLoop 構造器
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
// 設置 selectorProvider
provider = selectorProvider;
// 獲取 SelectorTuple 對象,裡面封裝了原生的selector和優化過的selector
final SelectorTuple selectorTuple = openSelector();
// 設置優化過的selector
selector = selectorTuple.selector;
// 設置原生的selector
unwrappedSelector = selectorTuple.unwrappedSelector;
// 設置select策略
selectStrategy = strategy;
}

...

}

接下來我們看看 獲取多路複用選擇器 方法—— openSelector() ,

// selectKey 優化選項flag
private static final boolean DISABLE_KEYSET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
private SelectorTuple openSelector() {

// JDK原生的selector
final Selector unwrappedSelector;
try {
// 通過 SelectorProvider 創建獲得selector
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
// 如果不優化,則直接返回
if (DISABLE_KEYSET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
// 通過反射創建 sun.nio.ch.SelectorImpl 對象
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});

// 如果 maybeSelectorImplClass 不是 selector 的一個實現,則直接返回原生的Selector
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
// 確保當前的選擇器實現是我們可以檢測的
!((Class>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
// maybeSelectorImplClass 是selector的實現,則轉化為 selector 實現類
final Class> selectorImplClass = (Class>) maybeSelectorImplClass;
// 創建新的 SelectionKey 集合 SelectedSelectionKeySet,內部採用的是 SelectionKey 數組的形

// 式,而非 set 集合
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<object>() {
@Override
public Object run() {
try {
// 通過反射的方式獲取 sun.nio.ch.SelectorImpl 的成員變量 selectedKeys
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
// 通過反射的方式獲取 sun.nio.ch.SelectorImpl 的成員變量 publicSelectedKeys
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject( unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}
// 設置字段 selectedKeys Accessible 為true
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
// 設置字段 publicSelectedKeys Accessible 為true
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;

logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}

// 設置 SelectedSelectionKeySet
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
// 返回包含了原生selector和優化過的selector的SelectorTuple
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
/<object>/<object>

優化後的 SelectedSelectionKeySet 對象,內部採用 SelectionKey 數組的形式:

final class SelectedSelectionKeySet extends AbstractSet<selectionkey> {
SelectionKey[] keys;
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
// 使用數組,來替代HashSet,可以降低時間複雜度為O(1)
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}
return true;
}
@Override
public boolean remove(Object o) {
return false;
}
@Override
public boolean contains(Object o) {
return false;
}
@Override
public int size() {
return size;
}

@Override
public Iterator<selectionkey> iterator() {
return new Iterator<selectionkey>() {
private int idx;
@Override
public boolean hasNext() {
return idx < size;
}
@Override
public SelectionKey next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return keys[idx++];
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
void reset() {
reset(0);
}
void reset(int start) {
Arrays.fill(keys, start, size, null);
size = 0;
}
// 擴容
private void increaseCapacity() {
SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
System.arraycopy(keys, 0, newKeys, 0, size);
keys = newKeys;
}
}
/<selectionkey>/<selectionkey>/<selectionkey>

SingleThreadEventLoop 構造器

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

...

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
// 調用 SingleThreadEventExecutor 構造器
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
tailTasks = newTaskQueue(maxPendingTasks);

}

...
}

SingleThreadEventExecutor 構造器,主要做兩件事情:

  • 設置線程任務執行器。
  • 設置任務隊列。前面講到EventLoop對於不能立即執行的Task會放入一個隊列中,就是這裡設置的。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

...

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
// 設置線程任務執行器
this.executor = ObjectUtil.checkNotNull(executor, "executor");
// 設置任務隊列
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");

}

...

}

NioEventLoop 中對 newTaskQueue 接口的實現,返回的是 JCTools 工具包 Mpsc 隊列。後面我們寫文章單獨介紹 JCTools 中的相關隊列。

Mpsc:Multi Producer Single Consumer (Lock less, bounded and unbounded)

多個生產者對單個消費者(無鎖、有界和無界都有實現)

public final class NioEventLoop extends SingleThreadEventLoop {
...
@Override
protected Queue<runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<runnable>newMpscQueue()
: PlatformDependent.<runnable>newMpscQueue(maxPendingTasks);
}

...
}
/<runnable>/<runnable>/<runnable>

創建線程執行選擇器chooser

接下來,我們看看 MultithreadEventExecutorGroup 構造器的最後一個部分內容,創建線程執行選擇器chooser,它的主要作用就是 EventLoopGroup 用於從 EventLoop 數組中選擇一個 EventLoop 去執行任務。

// 創建選擇器
chooser = chooserFactory.newChooser(children);

EventLoopGroup 中定義的 next() 接口:

public interface EventLoopGroup extends EventExecutorGroup {

...

// 選擇下一個 EventLoop 用於執行任務
@Override
EventLoop next();

...

}

MultithreadEventExecutorGroup 中對 next() 的實現:

@Override
public EventExecutor next() {
// 調用 DefaultEventExecutorChooserFactory 中的next()
return chooser.next();
}

DefaultEventExecutorChooserFactory 對於如何從數組中選擇任務執行器,也做了巧妙的優化。

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}

// 判斷線程任務執行的個數是否為 2 的冪次方。e.g: 2、4、8、16
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}

// 冪次方選擇器
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;

PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}

@Override
public EventExecutor next() {
// 通過二級制進行 & 運算,效率更高

return executors[idx.getAndIncrement() & executors.length - 1];
}
}
// 普通選擇器
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}

@Override
public EventExecutor next() {
// 按照最普通的取模的方式從index=0開始向後開始選擇
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}

小結

通過本節內容,我們瞭解到了EventLoop與EventLoopGroup的基本原理,EventLoopGroup與EventLoop的創建過程:

  • 創建線程任務執行器 ThreadPerTaskExecutor
  • 創建EventLoop
  • 創建任務選擇器 EventExecutorChooser
  • Java讀源碼之Netty深入剖析
  • 《Netty in action》

最後送福利了,現在私信我“資料”即可獲取Java工程化、高性能及分佈式、高性能、高架構、性能調優、Spring、MyBatis、Netty源碼分析等多個知識點高級進階乾貨的直播免費學習權限及相關視頻資料,還有spring和虛擬機等書籍掃描版


分享到:


相關文章: