Spring Boot整合Netty

概述

  Netty是由JBOSS提供的一個java開源框架,現為 Github上的獨立項目。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。   它是一個NIO客戶端服務器框架,可以快速輕鬆地開發網絡應用程序,極大地簡化了網絡編程,例如TCP和UDP套接字服務器開發。

整合

pom.xml中添加依賴:

<code>        <dependency>            <groupid>io.netty/<groupid>            <artifactid>netty-all/<artifactid>            <version>4.1.42.Final/<version>        /<dependency>/<code>

️以下使用時間服務器作為例子。

服務端實現

創建NettyServer類添加@Component註解交由容器管理

<code>@Componentpublic class NettyServer {    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);    // 服務端NIO線程組    private final EventLoopGroup bossGroup = new NioEventLoopGroup();    private final EventLoopGroup workGroup = new NioEventLoopGroup();    public ChannelFuture start(String host, int port) {        ChannelFuture channelFuture = null;        try {            ServerBootstrap bootstrap = new ServerBootstrap();            bootstrap.group(bossGroup, workGroup)                    .channel(NioServerSocketChannel.class)                    .childHandler(new ChannelInitializer<socketchannel>() {                        @Override                        protected void initChannel(SocketChannel socketChannel) throws Exception {                            // 自定義服務處理                            socketChannel.pipeline().addLast(new ServerHandler());                        }                    });            // 綁定端口並同步等待            channelFuture = bootstrap.bind(host, port).sync();            logger.info("======Start Up Success!=========");        } catch (Exception e) {            e.printStackTrace();        }        return channelFuture;    }    public void close() {        workGroup.shutdownGracefully();        bossGroup.shutdownGracefully();        logger.info("======Shutdown Netty Server Success!=========");    }}/<socketchannel>/<code>

創建服務處理類ServerHandler繼承ChannelInboundHandlerAdapter類並重寫channelRead、channelReadComplete、exceptionCaught方法。

<code>public class ServerHandler extends ChannelInboundHandlerAdapter {    /**     * 客戶端數據到來時觸發     */    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        ByteBuf buf = (ByteBuf) msg;        System.out.println("client request: " + buf.toString(CharsetUtil.UTF_8));        SimpleDateFormat sf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");        String callback = sf.format(new Date());        ctx.write(Unpooled.copiedBuffer(callback.getBytes()));    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        // 將發送緩衝區的消息全部寫到SocketChannel中        ctx.flush();    }    /**     * 發生異常時觸發     */    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        cause.printStackTrace();        // 釋放與ChannelHandlerContext相關聯的資源        ctx.close();    }}/<code>

在SpringBoot啟動類上實現CommandLineRunner接口。 注入容器中的NettyServer對象,在run方法中添加需要監聽的地址以及端口開啟服務。 獲取當前線程鉤子,使jvm關閉前服務同時關閉釋放相關資源。

<code>@SpringBootApplicationpublic class NioApplication implements CommandLineRunner {    public static void main(String[] args) {        SpringApplication.run(NioApplication.class, args);    }    @Resource    private NettyServer nettyServer;    @Override    public void run(String... args) throws Exception {        // 開啟服務        ChannelFuture future = nettyServer.start("localhost", 7070);        // 在JVM銷燬前關閉服務        Runtime.getRuntime().addShutdownHook(new Thread() {            @Override            public void run() {                nettyServer.close();            }        });        future.channel().closeFuture().sync();    }}/<code>

客戶端實現

新建一個普通的項目並添加依賴netty-all-4.1.43.Final.jar(下載並解壓後只需添加all-in-one文件夾內的netty-all-4.1.43.Final.jar)

創建NettyClient類作為客戶端

<code>public class NettyClient {    private static final EventLoopGroup group = new NioEventLoopGroup();    public static void main(String[] args) throws Exception {        try {            Bootstrap bootstrap = new Bootstrap();            bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<socketchannel>() {                @Override                protected void initChannel(SocketChannel socketChannel) throws Exception {                    // 自定義處理程序                    socketChannel.pipeline().addLast(new ClientHandler());                }            });            // 綁定端口並同步等待            ChannelFuture channelFuture = bootstrap.connect("localhost", 7070).sync();            channelFuture.channel().closeFuture().sync();        } finally {            group.shutdownGracefully();        }    }    }/<socketchannel>/<code>

創建處理程序類ClientHandler繼承ChannelInboundHandlerAdapter類重寫channelActive、channelRead、exceptionCaught方法。

<code>public class ClientHandler extends ChannelInboundHandlerAdapter {    /**     * 連接到服務器時觸發     */    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        ctx.writeAndFlush(Unpooled.copiedBuffer("current time", CharsetUtil.UTF_8));    }    /**     * 消息到來時觸發     */    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        ByteBuf buf = (ByteBuf) msg;        System.out.println("Current Time: " + buf.toString(CharsetUtil.UTF_8));    }    /**     * 發生異常時觸發     */    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        cause.printStackTrace();        ctx.close();    }}/<code>

測試

以上一個簡單的服務端以及客戶端就配置完成了,分別運行服務端程序以及客戶端程序進行測試。

Spring Boot整合Netty

Server-1

Spring Boot整合Netty

Client-1

粘包處理

經過測試一個簡單的時間服務器配置成功,現在修改客戶端,使客戶端能夠同時發送大量數據,並進行測試。

修改客戶端處理程序ClientHandler類中的channelActive方法,連續發送100條信息。

<code>    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        for (int i = 0; i < 100; i++) {            ctx.write(Unpooled.copiedBuffer("current time", CharsetUtil.UTF_8));        }        ctx.flush();    }/<code>

服務端接收如下所示:

Spring Boot整合Netty

Server-2

客戶端接收如下所示:

Spring Boot整合Netty

Client-2

由圖可知服務端只接收了2次,並且數據被連接在了一起,可知發生了粘包的情況。 下面我們使用netty提供的解碼器來解決粘包問題。

修改發送的數據格式,在數據之間添加換行符 \\n 進行間隔:

<code>        //--------ClientHandler---------        ctx.write(Unpooled.copiedBuffer("current time\\n", CharsetUtil.UTF_8));        //--------ServerHandler---------        String callback = sf.format(new Date()) + "\\n";/<code>

在之前配置的NettyServer類的服務處理程序之前添加解碼器LineBasedFrameDecoder

<code> bootstrap.group(bossGroup, workGroup)                    .channel(NioServerSocketChannel.class)                    .childHandler(new ChannelInitializer<socketchannel>() {                        @Override                        protected void initChannel(SocketChannel socketChannel) throws Exception {                            // LineBasedFrameDecoder能夠將接收到的數據在行尾進行拆分。                            // 設置解碼幀的最大長度,如果幀的長度超過此值拋出異常                            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));                            // 服務處理                            socketChannel.pipeline().addLast(new ServerHandler());                        }                    });/<socketchannel>/<code>

同理客戶端類NettyClient也同時添加解碼器LineBasedFrameDecoder

<code>        bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<socketchannel>() {                @Override                protected void initChannel(SocketChannel socketChannel) throws Exception {                    socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));                    socketChannel.pipeline().addLast(new ClientHandler());                }            });/<socketchannel>/<code>

再次進行測試

Spring Boot整合Netty

Server-3

Spring Boot整合Netty



分享到:


相關文章: