概述
Netty是由JBOSS提供的一個java開源框架,現為 Github上的獨立項目。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。 它是一個NIO客戶端服務器框架,可以快速輕鬆地開發網絡應用程序,極大地簡化了網絡編程,例如TCP和UDP套接字服務器開發。
整合
在pom.xml中添加依賴:
<code> <dependency> <groupid>io.netty/<groupid> <artifactid>netty-all/<artifactid> <version>4.1.42.Final/<version> /<dependency>/<code>
️以下使用時間服務器作為例子。
服務端實現
創建NettyServer類添加@Component註解交由容器管理
<code>@Componentpublic class NettyServer { private static final Logger logger = LoggerFactory.getLogger(NettyServer.class); // 服務端NIO線程組 private final EventLoopGroup bossGroup = new NioEventLoopGroup(); private final EventLoopGroup workGroup = new NioEventLoopGroup(); public ChannelFuture start(String host, int port) { ChannelFuture channelFuture = null; try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<socketchannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 自定義服務處理 socketChannel.pipeline().addLast(new ServerHandler()); } }); // 綁定端口並同步等待 channelFuture = bootstrap.bind(host, port).sync(); logger.info("======Start Up Success!========="); } catch (Exception e) { e.printStackTrace(); } return channelFuture; } public void close() { workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); logger.info("======Shutdown Netty Server Success!========="); }}/<socketchannel>/<code>
創建服務處理類ServerHandler繼承ChannelInboundHandlerAdapter類並重寫channelRead、channelReadComplete、exceptionCaught方法。
<code>public class ServerHandler extends ChannelInboundHandlerAdapter { /** * 客戶端數據到來時觸發 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("client request: " + buf.toString(CharsetUtil.UTF_8)); SimpleDateFormat sf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); String callback = sf.format(new Date()); ctx.write(Unpooled.copiedBuffer(callback.getBytes())); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // 將發送緩衝區的消息全部寫到SocketChannel中 ctx.flush(); } /** * 發生異常時觸發 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); // 釋放與ChannelHandlerContext相關聯的資源 ctx.close(); }}/<code>
在SpringBoot啟動類上實現CommandLineRunner接口。 注入容器中的NettyServer對象,在run方法中添加需要監聽的地址以及端口開啟服務。 獲取當前線程鉤子,使jvm關閉前服務同時關閉釋放相關資源。
<code>@SpringBootApplicationpublic class NioApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(NioApplication.class, args); } @Resource private NettyServer nettyServer; @Override public void run(String... args) throws Exception { // 開啟服務 ChannelFuture future = nettyServer.start("localhost", 7070); // 在JVM銷燬前關閉服務 Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { nettyServer.close(); } }); future.channel().closeFuture().sync(); }}/<code>
客戶端實現
新建一個普通的項目並添加依賴netty-all-4.1.43.Final.jar(下載並解壓後只需添加all-in-one文件夾內的netty-all-4.1.43.Final.jar)
創建NettyClient類作為客戶端
<code>public class NettyClient { private static final EventLoopGroup group = new NioEventLoopGroup(); public static void main(String[] args) throws Exception { try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<socketchannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 自定義處理程序 socketChannel.pipeline().addLast(new ClientHandler()); } }); // 綁定端口並同步等待 ChannelFuture channelFuture = bootstrap.connect("localhost", 7070).sync(); channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }/<socketchannel>/<code>
創建處理程序類ClientHandler繼承ChannelInboundHandlerAdapter類重寫channelActive、channelRead、exceptionCaught方法。
<code>public class ClientHandler extends ChannelInboundHandlerAdapter { /** * 連接到服務器時觸發 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("current time", CharsetUtil.UTF_8)); } /** * 消息到來時觸發 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("Current Time: " + buf.toString(CharsetUtil.UTF_8)); } /** * 發生異常時觸發 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}/<code>
測試
以上一個簡單的服務端以及客戶端就配置完成了,分別運行服務端程序以及客戶端程序進行測試。
![Spring Boot整合Netty](http://p2.ttnews.xyz/loading.gif)
Server-1
![Spring Boot整合Netty](http://p2.ttnews.xyz/loading.gif)
Client-1
粘包處理
經過測試一個簡單的時間服務器配置成功,現在修改客戶端,使客戶端能夠同時發送大量數據,並進行測試。
修改客戶端處理程序ClientHandler類中的channelActive方法,連續發送100條信息。
<code> @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < 100; i++) { ctx.write(Unpooled.copiedBuffer("current time", CharsetUtil.UTF_8)); } ctx.flush(); }/<code>
服務端接收如下所示:
Server-2
客戶端接收如下所示:
Client-2
由圖可知服務端只接收了2次,並且數據被連接在了一起,可知發生了粘包的情況。 下面我們使用netty提供的解碼器來解決粘包問題。
修改發送的數據格式,在數據之間添加換行符 \\n 進行間隔:
<code> //--------ClientHandler--------- ctx.write(Unpooled.copiedBuffer("current time\\n", CharsetUtil.UTF_8)); //--------ServerHandler--------- String callback = sf.format(new Date()) + "\\n";/<code>
在之前配置的NettyServer類的服務處理程序之前添加解碼器LineBasedFrameDecoder:
<code> bootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<socketchannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // LineBasedFrameDecoder能夠將接收到的數據在行尾進行拆分。 // 設置解碼幀的最大長度,如果幀的長度超過此值拋出異常 socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); // 服務處理 socketChannel.pipeline().addLast(new ServerHandler()); } });/<socketchannel>/<code>
同理客戶端類NettyClient也同時添加解碼器LineBasedFrameDecoder:
<code> bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<socketchannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024)); socketChannel.pipeline().addLast(new ClientHandler()); } });/<socketchannel>/<code>
再次進行測試
Server-3
閱讀更多 java互聯網高級架構 的文章