Springboot 2.0 +protobuf + Netty 實戰(附源碼)

20

大進階架構專題每日送達

Springboot 2.0 +protobuf + Netty 實戰(附源碼)

作者:pjmike_pj

juejin.im/post/5bd584bc518825292865395d

前言

這一篇文章主要介紹如何用Springboot 整合 Netty,這裡也是在網上搜尋了一些Netty例子學習後總結來的,借鑑了他人的寫法和經驗。如有重複部分,還請見諒。

關於SpringBoot 如何整合使用 Netty ,我將分為以下幾步進行分析與討論:

  • 構建Netty 服務端

  • 構建Netty 客戶端

  • 利用protobuf定義消息格式

  • 服務端空閒檢測

  • 客戶端發送心跳包與斷線重連

PS: 我這裡為了簡單起見(主要是懶),將 Netty 服務端與客戶端放在了同一個SpringBoot工程裡,當然也可以將客戶端和服務端分開。

構建 Netty 服務端

Netty 服務端的代碼其實比較簡單,代碼如下:

<code>@Component
@Slf4j
public class NettyServer {
/**
* boss 線程組用於處理連接工作
*/
private EventLoopGroup boss = new NioEventLoopGroup;
/**
* work 線程組用於數據處理
*/
private EventLoopGroup work = new NioEventLoopGroup;
@Value("${netty.port}")
private Integer port;
/**
* 啟動Netty Server
*
* @throws InterruptedException
*/
@PostConstruct
public void start throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap;
bootstrap.group(boss, work)
// 指定Channel
.channel(NioServerSocketChannel.class)
//使用指定的端口設置套接字地址
.localAddress(new InetSocketAddress(port))

//服務端可連接隊列數,對應TCP/IP協議listen函數中backlog參數
.option(ChannelOption.SO_BACKLOG, 1024)

//設置TCP長連接,一般如果兩個小時內沒有數據的通信時,TCP會自動發送一個活動探測數據報文
.childOption(ChannelOption.SO_KEEPALIVE, true)


//將小的數據包包裝成更大的幀進行傳送,提高網絡的負載,即TCP延遲傳輸
.childOption(ChannelOption.TCP_NODELAY, true)

.childHandler(new NettyServerHandlerInitializer);
ChannelFuture future = bootstrap.bind.sync;
if (future.isSuccess) {
log.info("啟動 Netty Server");
}
}

@PreDestroy
public void destory throws InterruptedException {
boss.shutdownGracefully.sync;
work.shutdownGracefully.sync;
log.info("關閉Netty");
}
}
/<code>

因為我們在springboot 項目中使用 Netty ,所以我們將Netty 服務器的啟動封裝在一個 start方法,並使用 @PostConstruct註解,在指定的方法上加上 @PostConstruct註解來表示該方法在 Spring 初始化 NettyServer類後調用。

考慮到使用心跳機制等操作,關於ChannelHandler邏輯處理鏈的部分將在後面進行闡述。

構建 Netty 客戶端

Netty 客戶端代碼與服務端類似,代碼如下:

<code>@Component
@Slf4j
public class NettyClient {
private EventLoopGroup group = new NioEventLoopGroup;
@Value("${netty.port}")
private int port;

@Value("${netty.host}")
private String host;
private SocketChannel socketChannel;

public void sendMsg(MessageBase.Message message) {
socketChannel.writeAndFlush(message);
}

@PostConstruct
public void start {
Bootstrap bootstrap = new Bootstrap;
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(host, port)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ClientHandlerInitilizer);
ChannelFuture future = bootstrap.connect;
//客戶端斷線重連邏輯
future.addListener((ChannelFutureListener) future1 -> {
if (future1.isSuccess) {
log.info("連接Netty服務端成功");
} else {
log.info("連接失敗,進行斷線重連");
future1.channel.eventLoop.schedule( -> start, 20, TimeUnit.SECONDS);
}
});
socketChannel = (SocketChannel) future.channel;
}
}
/<code>

上面還包含了客戶端斷線重連的邏輯,更多細節問題,將在下面進行闡述。

使用 protobuf 構建通信協議

在整合使用 Netty 的過程中,我們使用 Google 的protobuf定義消息格式,下面來簡單介紹下 protobuf

protobuf簡介

Google 官方給 protobuf的定義如下:

Protocol Buffers 是一種輕便高效的結構化數據存儲格式,可以用於結構化數據序列化,很適合做數據存儲或 RPC 數據交換格式。它可用於通訊協議、數據存儲等領域的語言無關、平臺無關、可擴展的序列化結構數據格式。

在 Netty 中常用 protobuf 來做序列化方案,當然也可以用 protobuf來構建 客戶端與服務端之間的通信協議

為什麼要用protobuf

我們這裡是用 protobuf 做為我們的序列化手段,那我們為什麼要使用 protobuf,而不使用其他序列化方案呢,比如 jdk 自帶的序列化,Thrift,fastjson等。

首先 jdk 自帶序列化手段有很多缺點,比如:

  • 序列化後的碼流太大

  • 性能太低

  • 無法跨語言

而 Google Protobuf 跨語言,支持C++、java和python。然後利用protobuf 編碼後的消息更小,有利於存儲和傳輸,並且其性能也非常高,相比其他序列化框架,它也是非常有優勢的,具體的關於Java 各種序列化框架比較此處就不多說了。總之,目前Google Protobuf 廣泛的被使用到各種項目,它的諸多優點讓我們選擇使用它。

怎麼使用protobuf

對於 Java 而言,使用 protobuf 主要有以下幾步:

  • 在 .proto 文件中定義消息格式

  • 使用 protobuf 編譯器編譯 .proto文件 成 Java 類

  • 使用 Java 對應的 protobuf API來寫或讀消息

定義 protobuf 協議格式

這裡為我Demo裡的 message.proto文件為例,如下:

<code>//protobuf語法有 proto2和proto3兩種,這裡指定 proto3 

syntax = "proto3";
// 文件選項
option java_package = "com.pjmike.server.protocol.protobuf";
option java_outer_classname = "MessageBase";
// 消息模型定義
message Message {
string requestId = 1;
CommandType cmd = 2;
string content = 3;
enum CommandType {
NORMAL = 0; //常規業務消息
HEARTBEAT_REQUEST = 1; //客戶端心跳消息
HEARTBEAT_RESPONSE = 2; //服務端心跳消息
}
}
/<code>

文件解讀:

  • 文中的第一行指定正在使用 proto3語法,如果沒有指定,編譯器默認使用 proto2的語法。現在新項目中可能一般多用 proto3的語法,proto3比 proto2支持更多的語言但更簡潔。如果首次使用 protobuf,可以選擇使用 proto3

  • 定義 .proto文件時,可以標註一系列的選項,一些選項是文件級別的,比如上面的第二行和第三行,java_package文件選項表明protocol編譯器編譯 .proto文件生成的 Java 類所在的包,java_outer_classname選項表明想要生成的 Java 類的名稱

  • Message中定義了具體的消息格式,我這裡定義了三個字段,每個字段都有唯一的一個數字標識符,這些標識符用來在消息的二進制格式中識別各個字段的

  • Message中還添加了一個枚舉類型,該枚舉中含有類型 CommandType中所有的值,每個枚舉類型必須將其第一個類型映射為 0,該0值為默認值。

消息模型定義

關於消息格式,此處我只是非常非常簡單的定義了幾個字段,requestId代表消息Id,CommandType表示消息的類型,這裡簡單分為心跳消息類型和業務消息類型,然後content就是具體的消息內容。這裡的消息格式定義是十分簡陋,真正的項目實戰中,關於自定義消息格式的要求是非常多的,是比較複雜的。

上面簡單的介紹了 protobuf的一些語法規則,關於 protobuf語法的更多介紹參考官方文檔:

https://developers.google.com/protocol-buffers/docs/proto3

使用 .proto編譯器編譯

第一步已經定義好了 protobuf的消息格式,然後我們用 .proto文件的編譯器將我們定義的 消息格式編譯生成對應的 Java類,以便於我們在項目中使用該消息類。

關於protobuf編譯器的安裝這裡我就不細說,詳情見官方文檔:

https://developers.google.com/protocol-buffers/

安裝好編譯器以後,使用以下命令編譯.proto文件:

<code>protoc -I = ./ --java_out=./ ./Message.proto
/<code>
  • -I 選項用於指定待編譯的 .proto消息定義文件所在的目錄,該選項也可以寫作為 --proto_path

  • --java_out選項表示生成 Java代碼後存放位置,對於不同語言,我們的選項可能不同,比如生成C++代碼為 --cpp_out

  • 在前兩個選項後再加上 待編譯的消息定義文件

使用 Java 對應 的 protobuf API來讀寫消息

前面已經根據 .proto消息定義文件生成的Java類,我們這裡代碼根據 Message.proto生成了MessageBase類,但是要正常的使用生成的 Java 類,我們還需要引入 protobuf-java的依賴:

<code>
com.google.protobuf

protobuf-java
3.5.1

/<code>

使用 protobuf 生成的每一個 Java類中,都會包含兩種內部類:Msg 和 Msg 包含的 Builder(這裡的Msg就是實際消息傳輸類)。具體是.proto中定義的每一個message 都會生成一個 Msg,每一個Msg對應一個 Builder:

  • Buidler提供了構建類,查詢類的API

  • Msg提供了查詢,序列化,反序列化的API

比如我們使用 Builder來構建 Msg,例子如下:

<code>public class MessageBaseTest {
public static void main(String[] args) {
MessageBase.Message message = MessageBase.Message.newBuilder
.setRequestId(UUID.randomUUID.toString)
.setContent("hello world").build;
System.out.println("message: "+message.toString);
}
}
/<code>

這裡就不多介紹protobuf-java API的相關用法了,更多詳情還是參考官方文檔:

https://developers.google.com/protocol-buffers/docs/reference/java/

protobuf的編解碼器

上面說了這麼多,消息傳輸格式已經定義好了,但是在客戶端和服務端傳輸過程中我們還需要對這種 protobuf格式進行編解碼,當然我們可以自定義消息的編解碼,protobuf-java 的API中提供了相關的序列化和反序列化方法。好消息是,Netty 為了支持 protobuf提供了針對 protobuf的編解碼器,如下表所示(摘自《Netty實戰》) :

Springboot 2.0 +protobuf + Netty 實戰(附源碼)

有了這些編解碼器,將其加入客戶端和服務端的 ChannelPipeline中以用於對消息進行編解碼,如下:

<code>public class NettyServerHandlerInitializer extends ChannelInitializer {

@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline
//空閒檢測
.addLast(new ServerIdleStateHandler)
.addLast(new ProtobufVarint32FrameDecoder)
.addLast(new ProtobufDecoder(MessageBase.Message.getDefaultInstance))
.addLast(new ProtobufVarint32LengthFieldPrepender)
.addLast(new ProtobufEncoder)
.addLast(new NettyServerHandler);
}
}
/<code>

客戶端心跳機制

心跳機制簡介

心跳是在TCP長連接中,客戶端與服務端之間定期發送的一種特殊的數據包,通知對方在線以確保TCP連接的有效性。

如何實現心跳機制

有兩種方式實現心跳機制:

  • 使用TCP協議層面的 keepalive 機制

  • 在應用層上自定義的心跳機制

TCP層面的 keepalive 機制我們在之前構建 Netty服務端和客戶端啟動過程中也有定義,我們需要手動開啟,示例如下:

<code>// 設置TCP的長連接,默認的 keepalive的心跳時間是兩個小時
childOption(ChannelOption.SO_KEEPALIVE, true)
/<code>

除了開啟 TCP協議的 keepalive 之外,在我研究了github的一些開源Demo發現,人們往往也會自定義自己的心跳機制,定義心跳數據包。而Netty也提供了 IdleStateHandler 來實現心跳機制。(更多Springboot文章,參考:SpringBoot內容聚合)

Netty 實現心跳機制

下面來看看客戶端如何實現心跳機制:

<code>@Slf4j
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state == IdleState.WRITER_IDLE) {
log.info("已經10s沒有發送消息給服務端");
//向服務端送心跳包
//這裡使用 protobuf定義的消息格式
MessageBase.Message heartbeat = new MessageBase.Message.toBuilder.setCmd(MessageBase.Message.CommandType.HEARTBEAT_REQUEST)

.setRequestId(UUID.randomUUID.toString)
.setContent("heartbeat").build;
//發送心跳消息,並在發送失敗時關閉該連接
ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
/<code>

我們這裡創建了一個ChannelHandler類並重寫了userEventTriggered方法,在該方法裡實現發送心跳數據包的邏輯,同時將 IdleStateEvent類加入邏輯處理鏈上。

實際上是當連接空閒時間太長時,將會觸發一個 IdleStateEvent事件,然後我們調用 userEventTriggered來處理該 IdleStateEvent事件。

當啟動客戶端和服務端之後,控制檯打印心跳消息如下:

<code>2018-10-28 16:30:46.825 INFO 42648 --- [ntLoopGroup-2-1] c.pjmike.server.client.HeartbeatHandler : 已經10s沒有發送消息給服務端
2018-10-28 16:30:47.176 INFO 42648 --- [ntLoopGroup-4-1] c.p.server.server.NettyServerHandler : 收到客戶端發來的心跳消息:requestId: "80723780-2ce0-4b43-ad3a-53060a6e81ab"
cmd: HEARTBEAT_REQUEST
content: "heartbeat"
/<code>

上面我們只討論了客戶端發送心跳消息給服務端,那麼服務端還需要發心跳消息給客戶端嗎?

一般情況是,對於長連接而言,一種方案是兩邊都發送心跳消息,另一種是服務端作為被動接收一方,如果一段時間內服務端沒有收到心跳包那麼就直接斷開連接。

我們這裡採用第二種方案,只需要客戶端發送心跳消息,然後服務端被動接收,然後設置一段時間,在這段時間內如果服務端沒有收到任何消息,那麼就主動斷開連接,這也就是後面要說的 空閒檢測

Netty 客戶端斷線重連

一般有以下兩種情況,Netty 客戶端需要重連服務端:

  • Netty 客戶端啟動時,服務端掛掉,連不上服務端

  • 在程序運行過程中,服務端突然掛掉

第一種情況實現 ChannelFutureListener用來監測連接是否成功,不成功就進行斷連重試機制,代碼如下:

<code>@Component
@Slf4j
public class NettyClient {
private EventLoopGroup group = new NioEventLoopGroup;
@Value("${netty.port}")
private int port;
@Value("${netty.host}")
private String host;
private SocketChannel socketChannel;

public void sendMsg(MessageBase.Message message) {

socketChannel.writeAndFlush(message);
}

@PostConstruct
public void start {
Bootstrap bootstrap = new Bootstrap;
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(host, port)
.handler(new ClientHandlerInitilizer);
ChannelFuture future = bootstrap.connect;
//客戶端斷線重連邏輯
future.addListener((ChannelFutureListener) future1 -> {
if (future1.isSuccess) {
log.info("連接Netty服務端成功");
} else {
log.info("連接失敗,進行斷線重連");
future1.channel.eventLoop.schedule( -> start, 20, TimeUnit.SECONDS);
}
});
socketChannel = (SocketChannel) future.channel;
}
}
/<code>

ChannelFuture添加一個監聽器,如果客戶端連接服務端失敗,調用 channel.eventLoop.schedule方法執行重試邏輯。

第二種情況是運行過程中 服務端突然掛掉了,這種情況我們在處理數據讀寫的Handler中實現,代碼如下:

<code>@Slf4j
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
@Autowired
private NettyClient nettyClient;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state == IdleState.WRITER_IDLE) {
log.info("已經10s沒有發送消息給服務端");

//向服務端送心跳包
MessageBase.Message heartbeat = new MessageBase.Message.toBuilder.setCmd(MessageBase.Message.CommandType.HEARTBEAT_REQUEST)
.setRequestId(UUID.randomUUID.toString)
.setContent("heartbeat").build;
//發送心跳消息,並在發送失敗時關閉該連接
ctx.writeAndFlush(heartbeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx, evt);
}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//如果運行過程中服務端掛了,執行重連機制
EventLoop eventLoop = ctx.channel.eventLoop;
eventLoop.schedule( -> nettyClient.start, 10L, TimeUnit.SECONDS);
super.channelInactive(ctx);
}
}
/<code>

我們這裡直接在實現心跳機制的 Handler中重寫channelInactive方法,然後在該方法中執行重試邏輯,這裡注入了 NettyClient類,目的是方便調用 NettyClient的start方法重新連接服務端

channelInactive方法是指如果當前Channel沒有連接到遠程節點,那麼該方法將會被調用。

服務端空閒檢測

空閒檢測是什麼?實際上空閒檢測是每隔一段時間,檢測這段時間內是否有數據讀寫。比如,服務端檢測一段時間內,是否收到客戶端發送來的數據,如果沒有,就及時釋放資源,關閉連接。

對於空閒檢測,Netty 特地提供了 IdleStateHandler 來實現這個功能。下面的代碼參考自掘金小冊《Netty 入門與實戰:仿寫微信 IM 即時通訊系統》中空閒檢測部分的實現:

<code>@Slf4j
public class ServerIdleStateHandler extends IdleStateHandler {
/**
* 設置空閒檢測時間為 30s
*/
private static final int READER_IDLE_TIME = 30;
public ServerIdleStateHandler {
super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
}

@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
log.info("{} 秒內沒有讀取到數據,關閉連接", READER_IDLE_TIME);
ctx.channel.close;
/<code>

Controller方法測試

因為這是 SpringBoot 整合 Netty 的一個Demo,我們創建一個Controller方法對Netty 服務端與客戶端之間的通信進行測試,controller代碼如下,非常簡單:

<code>@RestController
public class ConsumerController {
@Autowired
private NettyClient nettyClient;

@GetMapping("/send")
public String send {
MessageBase.Message message = new MessageBase.Message
.toBuilder.setCmd(MessageBase.Message.CommandType.NORMAL)
.setContent("hello server")
.setRequestId(UUID.randomUUID.toString).build;
nettyClient.sendMsg(message);

return "send ok";
}
}
/<code>

注入 NettyClient,調用其 sendMsg方法發送消息,結果如下:

<code>c.p.server.server.NettyServerHandler : 收到客戶端的業務消息:requestId: "aba74c28-1b6e-42b3-9f27-889e7044dcbf"
content: "hello server"
/<code>

小結

上面詳細闡述了 如何用 SpringBoot 整合 Netty ,其中借鑑很多前輩大佬的例子與文章,算是初步瞭解瞭如何使用 Netty。上文中如有錯誤之處,歡迎指出。

github地址:

https://github.com/pjmike/springboot-netty

參考

https://juejin.im/book/5b4bc28bf265da0f60130116

https://colobu.com/2015/08/14/netty-tcp-client-with-reconnect-handling/

https://crossoverjie.top/2018/05/24/netty/Netty(1)TCP-Heartbeat/

https://segmentfault.com/a/1190000006931568

https://colobu.com/2017/03/16/Pr

Springboot 2.0 +protobuf + Netty 實戰(附源碼)

之前,給大家發過三份Java面試寶典,這次新增了一份,目前總共是四份面試寶典,相信在跳槽前一個月按照面試寶典準備準備,基本沒大問題。

  • 《java面試寶典5.0》(初中級)

  • 《350道Java面試題:整理自100+公司》(中高級)

  • 《資深java面試寶典-視頻版》(資深)

  • 《Java[BAT]面試必備》(資深)

分別適用於初中級,中高級

資深級工程師的面試複習。

內容包含java基礎、javaweb、mysql性能優化、JVM、鎖、百萬併發、消息隊列,高性能緩存、反射、Spring全家桶原理、微服務、Zookeeper、數據結構、限流熔斷降級等等。

Springboot 2.0 +protobuf + Netty 實戰(附源碼)

看到這裡,證明有所收穫


分享到:


相關文章: