看我如何把NIO拉下神壇

1. 傳統的阻塞式I/O


看我如何把NIO拉下神壇


阻塞式I/O的阻塞指的是,socket的read函數、write函數是阻塞的。

1.2 阻塞式I/O編程模型

<code>public static void main(String[] args) {

try (ServerSocket serverSocket = new ServerSocket()) {
// 綁定端口
serverSocket.bind(new InetSocketAddress(8081));
while (true) {

// 輪詢established
Socket socket = serverSocket.accept();

new Thread(() -> {
try (BufferedReader buffer = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true)) {
// 讀消息
while (true) {
String body = buffer.readLine();
if (body == null) {
break;
}
log.info("receive body: {}", body);
}

// 寫消息
printWriter.write("server receive message!");

} catch (Exception e) {
log.error(e.getMessage());
}
}).start();
}

} catch (Exception e) {
log.error(e.getMessage());
}
}/<code>

因為socket的accept函數,read函數,write函數是同步阻塞的,所以主線程不斷調用socket的accept函數,輪詢狀態是established的TCP連接。

read函數會從內核緩衝區中讀取已經準備好的數據,複製到用戶進程,如果內核緩衝區中沒有數據,那麼這個線程就的就會被掛起,相應的cpu的使用權被釋放出來。當內核緩衝中準備好數據後,cpu會響應I/O的中斷信號,喚醒被阻塞的線程處理數據。

當一個連接在處理I/O的時候,系統是阻塞的,如果是單線程的話必然就掛死在那裡;但CPU是被釋放出來的,開啟多線程,就可以讓CPU去處理更多的事情。

阻塞式I/O模型


看我如何把NIO拉下神壇


阻塞式I/O的缺點

缺乏擴展性,嚴重依賴線程。Java的線程佔用內存在512K-1M,線程數量過多會導致JVM內存溢出。大量的線程上下文切換嚴重消耗CPU性能。大量的I/O線程被激活會導致系統鋸齒狀負載。

2. NIO編程

同步非阻塞I/O模型


看我如何把NIO拉下神壇


對於NIO來說,如果內核緩衝區中沒有數據就直接返回一個EWOULDBLOCK錯誤,一般來說進程可以輪詢調用read函數,當緩衝區中有數據的時候將數據複製到用戶空間,而不用掛起線程。

所以同步非阻塞中的非阻塞指的是socket的讀寫函數不是阻塞的,但是用戶進程依然需要輪詢讀寫函數,所以是同步的。但是NIO給我們提供了不需要新起線程就可以利用CPU的可能,也就是I/O多路複用技術

2.1 I/O多路複用技術

在linux系統中,可以使用select/poll/epoll使用一個線程監控多個socket,只要有一個socket的讀緩存有數據了,方法就立即返回,然後你就可以去讀這個可讀的socket了,如果所有的socket讀緩存都是空的,則會阻塞,也就是將線程掛起。

一開始用的linux用的是select,但是selct比較慢,最終使用了epoll。

2.1.1 epoll的優點

  1. 支持打開的socket描述符(FD)僅受限於操作系統最大文件句柄數,而select最大支持1024。
  2. selcet每次都會掃描所有的socket,而epoll只掃描活躍的socket。
  3. 使用mmap加速數據在內核空間到用戶空間的拷貝。

2.2 NIO的工作機制

NIO實際上是一個事件驅動的模型,NIO中最重要的就是多路複用器(Selector)。在NIO中它提供了選擇就緒事件的能力,我們只需要把通道(Channel) 註冊到Selector上,Selector就會通過select方法(實際上操作系統是通過epoll)不斷輪詢註冊在其上的Channel,如果某個Channel上發生了讀就緒、寫就緒或者連接到來就會被Selector輪詢出來,然後通過SelectionKey(Channel註冊到Selector上時會返回和其綁定的SelectionKey)可以獲取到已經就緒的Channel集合,否則Selector就會阻塞在select方法上。

Selector調用select方法,並不是一個線程通過for循環去選擇就緒的Channel,而是操作系統通過epoll以事件的方式的通知JVM的線程,哪個通道發生了讀就緒或者寫就緒的事件。所以select方法更像是一個監聽器。

多路複用的核心目的就是使用最少的線程去操作更多的通道,在其內部並不是只有一個線程。創建線程的個數是根據通道的數量來決定的,每註冊1023個通道就創建1個新的線程。

NIO的核心是多路複用器和事件模型,搞清楚了這兩點其實就能搞清楚NIO的基本工作原理。原來在學習NIO的時候感覺很複雜,隨著對TCP理解的深入,發現NIO其實並不難。在使用NIO的時候,最核心的代就是把Channel和要監聽的事件註冊到Selector上。

不同類型通道支持的事件

看我如何把NIO拉下神壇


NIO事件模型示意圖


看我如何把NIO拉下神壇

2.2.1 代碼示例

ServerReactor

<code>@Slf4j
public class ServerReactor implements Runnable {
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;

private volatile boolean stop = false;

public ServerReactor(int port, int backlog) throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(port), backlog);
serverSocket.setReuseAddress(true);
serverSocketChannel.configureBlocking(false);
// 將channel註冊到多路複用器上,並監聽ACCEPT事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}

public void setStop(boolean stop) {
this.stop = stop;
}

@Override
public void run() {
try {
// 無限的接收客戶端連接
while (!stop && !Thread.interrupted()) {
int num = selector.select();
Set<selectionkey> selectionKeys = selector.selectedKeys();
Iterator<selectionkey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
// 移除key,否則會導致事件重複消費
it.remove();
try {
handle(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
if (selector != null) {
try {

selector.close();
} catch (IOException e) {
e.printStackTrace();
}

}
}

private void handle(SelectionKey key) throws Exception {
if (key.isValid()) {
// 如果是ACCEPT事件,代表是一個新的連接請求
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
// 相當於三次握手後,從全連接隊列中獲取可用的連接
// 必須使用accept方法消費ACCEPT事件,否則將導致多路複用器死循環
SocketChannel socketChannel = serverSocketChannel.accept();
// 設置為非阻塞模式,當沒有可用的連接時直接返回null,而不是阻塞。
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}

if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String content = new String(bytes);
System.out.println("recv client content: " + content);
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put(("服務端已收到: " + content).getBytes());
writeBuffer.flip();
socketChannel.write(writeBuffer);

} else if (readBytes < 0) {
key.cancel();
socketChannel.close();
}
}


}
}
}/<selectionkey>/<selectionkey>/<code>

ClientReactor

<code>public class ClientReactor implements Runnable {
final String host;
final int port;
final SocketChannel socketChannel;
final Selector selector;
private volatile boolean stop = false;

public ClientReactor(String host, int port) throws IOException {
this.socketChannel = SocketChannel.open();
this.socketChannel.configureBlocking(false);
Socket socket = this.socketChannel.socket();
socket.setTcpNoDelay(true);
this.selector = Selector.open();
this.host = host;
this.port = port;

}

@Override
public void run() {

try {
// 如果通道呈阻塞模式,則立即發起連接;
// 如果呈非阻塞模式,則不是立即發起連接,而是在隨後的某個時間才發起連接。

// 如果連接是立即建立的,說明通道是阻塞模式,當連接成功時,則此方法返回true,連接失敗出現異常。
// 如果此通道處於阻塞模式,則此方法的調用將會阻塞,直到建立連接或發生I/O錯誤。

// 如果連接不是立即建立的,說明通道是非阻塞模式,則此方法返回false,

// 並且以後必須通過調用finishConnect()方法來驗證連接是否完成
// socketChannel.isConnectionPending()判斷此通道是否正在進行連接
if (socketChannel.connect(new InetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
} else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);

}
while (!stop && !Thread.interrupted()) {
int num = selector.select();
Set<selectionkey> selectionKeys = selector.selectedKeys();
Iterator<selectionkey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
// 移除key,否則會導致事件重複消費
it.remove();
try {
handle(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}

if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}

}


}

private void handle(SelectionKey key) throws IOException {


if (key.isValid()) {

SocketChannel socketChannel = (SocketChannel) key.channel();

if (key.isConnectable()) {
if (socketChannel.finishConnect()) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
}
}

if (key.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
System.out.println("recv server content: " + new String(bytes));
} else if (readBytes < 0) {
key.cancel();
socketChannel.close();
}
}

}
}

private void doWrite(SocketChannel socketChannel) {
Scanner scanner = new Scanner(System.in);
new Thread(() -> {
while (scanner.hasNext()) {
try {

ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put(scanner.nextLine().getBytes());
writeBuffer.flip();
socketChannel.write(writeBuffer);
} catch (Exception e) {

}
}
}).start();
}
}/<selectionkey>/<selectionkey>/<code>

鏈接:https://juejin.im/post/5dfae986518825122671c846


分享到:


相關文章: