Netty深入淺出系列:Netty 簡介

前言

Netty是一個高性能、異步事件驅動的NIO框架,提供了對TCP、UDP和文件傳輸的支持,作為一個異步NIO框架,Netty的所有IO操作都是異步非阻塞的,通過Future-Listener機制,用戶可以方便的主動獲取或者通過通知機制獲得IO操作結果。

作為當前最流行的NIO框架,Netty在互聯網領域、大數據分佈式計算領域、遊戲行業、通信行業等獲得了廣泛的應用,一些業界著名的開源組件也基於Netty構建,比如RPC框架、zookeeper等。

那麼,Netty性能為啥這麼高?主要是因為其內部Reactor模型的實現。

Reactor模型

Netty中的Reactor模型主要由多路複用器(Acceptor)、事件分發器(Dispatcher)、事件處理器(Handler)組成,可以分為三種。

1、單線程模型:所有I/O操作都由一個線程完成,即多路複用、事件分發和處理都是在一個Reactor線程上完成的。

Netty深入淺出系列:Netty 簡介

對於一些小容量應用場景,可以使用單線程模型。但是對於高負載、大併發的應用卻不合適,主要原因如下:

  • 一個線程同時處理成百上千的鏈路,性能上無法支撐,即便CPU負荷達到100%,也無法滿足海量消息的編碼、解碼、讀取和發送;
  • 當負載過重後,處理速度將變慢,這會導致大量客戶端連接超時,超時之後往往會進行重發,最終會導致大量消息積壓和處理超時,成為系統的性能瓶頸;
  • 一旦單線程意外跑飛,或者進入死循環,會導致整個系統通信模塊不可用,不能接收和處理外部消息,造成節點故障,可靠性不高。

2、多線程模型:為了解決單線程模型存在的一些問題,演化而來的Reactor線程模型。

Netty深入淺出系列:Netty 簡介

多線程模型的特點:

  • 有專門一個Acceptor線程用於監聽服務端,接收客戶端的TCP連接請求;
  • 網絡IO的讀寫操作由一個NIO線程池負責,線程池可以採用標準的JDK線程池實現,包含一個任務隊列和N個可用的線程,由這些NIO線程負責消息的讀取、解碼、編碼和發送;
  • 一個NIO線程可以同時處理多條鏈路,但是一個鏈路只能對應一個NIO線程,防止發生併發操作問題。

在絕大多數場景下,Reactor多線程模型都可以滿足性能需求;但是,在極特殊應用場景中,一個NIO線程負責監聽和處理所有的客戶端連接可能會存在性能問題。例如百萬客戶端併發連接,或者服務端需要對客戶端的握手消息進行安全認證,認證本身非常損耗性能。在這類場景下,單獨一個Acceptor線程可能會存在性能不足問題,為了解決性能問題,產生了第三種Reactor線程模型-主從Reactor多線程模型。

3、主從多線程模型:採用多個reactor,每個reactor都在自己單獨的線程裡執行。如果是多核,則可以同時響應多個客戶端的請求,一旦鏈路建立成功就將鏈路註冊到負責I/O讀寫的SubReactor線程池上。

Netty深入淺出系列:Netty 簡介

事實上,Netty的線程模型並非固定不變,在啟動輔助類中創建不同的EventLoopGroup實例並通過適當的參數配置,就可以支持上述三種Reactor線程模型。正是因為Netty對Reactor線程模型的支持提供了靈活的定製能力,所以可以滿足不同業務場景的性能需求。

示例代碼

以下是server和client的示例代碼,其中使用的是 Netty 4.x,先看看如何實現,後續會針對各個模塊進行深入分析。

server 代碼實現

public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void run() throws Exception {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<socketchannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
//new LoggingHandler(LogLevel.INFO),
new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(port).sync(); // (5)
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new EchoServer(port).run();
}
}
/<socketchannel>

EchoServerHandler 實現

public class EchoServerHandler extends ChannelInboundHandlerAdapter { 

private static final Logger logger = Logger.getLogger(
EchoServerHandler.class.getName());

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
ctx.close();
}
}

1、NioEventLoopGroup 是用來處理I/O操作的線程池,Netty對 EventLoopGroup 接口針對不同的傳輸協議提供了不同的實現。在本例子中,需要實例化兩個NioEventLoopGroup,通常第一個稱為“boss”,用來accept客戶端連接,另一個稱為“worker”,處理客戶端數據的讀寫操作。

2、ServerBootstrap 是啟動服務的輔助類,有關socket的參數可以通過ServerBootstrap進行設置。

3、這裡指定NioServerSocketChannel類初始化channel用來接受客戶端請求。

4、通常會為新SocketChannel通過添加一些handler,來設置ChannelPipeline。ChannelInitializer 是一個特殊的handler,其中initChannel方法可以為SocketChannel 的pipeline添加指定handler。

5、通過綁定端口8080,就可以對外提供服務了。

client 代碼實現

public class EchoClient { 

private final String host;
private final int port;
private final int firstMessageSize;

public EchoClient(String host, int port, int firstMessageSize) {
this.host = host;
this.port = port;
this.firstMessageSize = firstMessageSize;
}

public void run() throws Exception {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<socketchannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
//new LoggingHandler(LogLevel.INFO),
new EchoClientHandler(firstMessageSize));
}
});

// Start the client.
ChannelFuture f = b.connect(host, port).sync();

// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
final String host = args[0];
final int port = Integer.parseInt(args[1]);
final int firstMessageSize;
if (args.length == 3) {
firstMessageSize = Integer.parseInt(args[2]);
} else {

firstMessageSize = 256;
}

new EchoClient(host, port, firstMessageSize).run();
}
}
/<socketchannel>

EchoClientHandler 實現

public class EchoClientHandler extends ChannelInboundHandlerAdapter { 

private static final Logger logger = Logger.getLogger(
EchoClientHandler.class.getName());

private final ByteBuf firstMessage;

/**
* Creates a client-side handler.
*/
public EchoClientHandler(int firstMessageSize) {
if (firstMessageSize <= 0) {
throw new IllegalArgumentException("firstMessageSize: " + firstMessageSize);
}
firstMessage = Unpooled.buffer(firstMessageSize);
for (int i = 0; i < firstMessage.capacity(); i ++) {
firstMessage.writeByte((byte) i);
}
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(firstMessage);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
ctx.close();

}
}


分享到:


相關文章: