gRPC官方文檔-gRPC 基礎(Java版)

本教程提供了 Java 程序員如何使用 gRPC 的指南。

通過學習教程中例子,你可以學會如何:

  • 在一個 .proto 文件內定義服務。
  • 用 protocol buffer 編譯器生成服務器和客戶端代碼。
  • 使用 gRPC 的 Java API 為你的服務實現一個簡單的客戶端和服務器。

假設你已經閱讀了概覽 並且熟悉protocol buffers。 注意,教程中的例子使用的是 protocol buffers 語言的 proto3 版本,它目前只是 alpha 版:可以在 proto3 語言指南和 protocol buffers 的 Github 倉庫的版本註釋發現更多關於新版本的內容。

這算不上是一個在 Java 中使用 gRPC 的綜合指南:以後會有更多的參考文檔。

為什麼使用 gRPC?

我們的例子是一個簡單的路由映射的應用,它允許客戶端獲取路由特性的信息,生成路由的總結,以及交互路由信息,如服務器和其他客戶端的流量更新。

有了 gRPC, 我們可以一次性的在一個 .proto 文件中定義服務並使用任何支持它的語言去實現客戶端和服務器,反過來,它們可以在各種環境中,從Google的服務器到你自己的平板電腦—— gRPC 幫你解決了不同語言及環境間通信的複雜性。使用 protocol buffers 還能獲得其他好處,包括高效的序列號,簡單的 IDL 以及容易進行接口更新。

例子的代碼和設置

教程的代碼在這裡 grpc/grpc-java/examples/src/main/java/io/grpc/examples。 要下載例子,通過運行下面的命令去克隆grpc-java代碼庫:

$ git clone https://github.com/grpc/grpc-java.git

然後改變當前的目錄到 grpc-java/examples:

$ cd grpc-java/examples

你還需要安裝生成服務器和客戶端的接口代碼相關工具——如果你還沒有安裝的話,請查看下面的設置指南 Java快速開始指南。

定義服務

我們的第一步(可以從概覽中得知)是使用 protocol buffers去定義 gRPC service 和方法 request 以及 response 的類型。你可以在grpc-java/examples/src/main/proto/route_guide.proto看到完整的 .proto 文件。

在生成例子中的 Java 代碼的時候,在 .proto 文件中我們指定了一個 java_package 文件的選項:

option java_package = "io.grpc.examples";

這個指定的包是為我們生成 Java 類使用的。如果在 .proto 文件中沒有顯示的 java_package 參數,那麼就會使用缺省的 proto 包(通過 "package" 關鍵字指定)。但是,因為 proto 包一般不是以域名

翻轉的格式命名,所以它不是好的 Java 包。 如果我們用其它語言通過 .proto 文件生成代碼,java_package 是不起任何作用的。

要定義一個服務,你必須在你的 .proto 文件中指定 service:

service RouteGuide {
...
}

然後在我們的服務中定義 rpc 方法,指定它們的請求的和響應類型。gRPC 允許你定義4種類型的 service 方法,這些都在 RouteGuide 服務中使用:

  • 一個 簡單 RPC , 客戶端使用存根發送請求到服務器並等待響應返回,就像平常的函數調用一樣。
 // Obtains the feature at a given position.
rpc GetFeature(Point) returns (Feature) {}
  • 一個 服務器端流式 RPC , 客戶端發送請求到服務器,拿到一個流去讀取返回的消息序列。 客戶端讀取返回的流,直到裡面沒有任何消息。從例子中可以看出,通過在
    響應 類型前插入 stream 關鍵字,可以指定一個服務器端的流方法。
 // Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
  • 一個 客戶端流式 RPC , 客戶端寫入一個消息序列並將其發送到服務器,同樣也是使用流。一旦 客戶端完成寫入消息,它等待服務器完成讀取返回它的響應。通過在 請求 類型前指定 stream 關鍵字來指定一個客戶端的流方法。
 // Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
  • 一個 雙向流式 RPC 是雙方使用讀寫流去發送一個消息序列。兩個流獨立操作,因此客戶端和服務器 可以以任意喜歡的順序讀寫:比如, 服務器可以在寫入響應前等待接收所有的客戶端消息,或者可以交替 的讀取和寫入消息,或者其他讀寫的組合。 每個流中的消息順序被預留。你可以通過在請求和響應前加 stream 關鍵字去制定方法的類型。
 // Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

我們的 .proto 文件也包含了所有請求的 protocol buffer 消息類型定義以及在服務方法中使用的響應類型——比如,下面的Point消息類型:

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}

生成客戶端和服務器端代碼

接下來我們需要從 .proto 的服務定義中生成 gRPC 客戶端和服務器端的接口。我們通過 protocol buffer 的編譯器 protoc 以及一個特殊的 gRPC Java 插件來完成。為了生成 gRPC 服務,你必須使用proto3編譯器(同時支持 proto2 和 proto3 語法)。

這個例子使用的構建系統也是 Java gRPC 本身構建的一部分——為了簡單起見,我們推薦使用為這個例子提前生成的代碼。你可以參考README學習如何從你的 .proto 文件中生成代碼。

從這裡src/generated/main可以看到為了例子預生成的代碼。

下面的類都是從我們的服務定義中生成:

  • 包含了所有填充,序列化以及獲取請求和應答的消息類型的Feature.java,Point.java, Rectangle.java以及其它類文件。
  • RouteGuideGrpc.java 文件包含(以及其它一些有用的代碼):
  • RouteGuide 服務器要實現的一個接口 RouteGuideGrpc.RouteGuide,其中所有的方法都定 義在RouteGuide服務中。
  • 客戶端可以用來和RouteGuide服務器交互的 存根 類。 異步的存根也實現了 RouteGuide 接口。

創建服務器

首先來看看我們如何創建一個 RouteGuide 服務器。如果你只對創建 gRPC 客戶端感興趣,你可以跳

過這個部分,直接到創建客戶端 (當然你也可能發現它也很有意思)。

讓 RouteGuide 服務工作有兩個部分:

  • 實現我們服務定義的生成的服務接口:做我們的服務的實際的“工作”。
  • 運行一個 gRPC 服務器,監聽來自客戶端的請求並返回服務的響應。

你可以從[grpc-java/examples/src/main/java/io/grpc/examples/RouteGuideServer.java]

(https://github.com/grpc/grpc-java/blob/master/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideServer.java)看到我們的 RouteGuide 服務器的實現代碼。現在讓我們近距離研究它是如何工作的。

實現RouteGuide

如你所見,我們的服務器有一個實現了生成的 RouteGuideGrpc.Service 接口的

RouteGuideService類:

private static class RouteGuideService implements RouteGuideGrpc.RouteGuide {
...
}

簡單 RPC

routeGuideServer 實現了我們所有的服務方法。首先讓我們看看最簡單的類型 GetFeature,它從客戶端拿到一個 Point 對象,然後從返回包含從數據庫拿到的feature信息的 Feature。

 @Override
public void getFeature(Point request, StreamObserver<feature> responseObserver) {
responseObserver.onNext(checkFeature(request));
responseObserver.onCompleted();
}
...
private Feature checkFeature(Point location) {
for (Feature feature : features) {
if (feature.getLocation().getLatitude() == location.getLatitude()

&& feature.getLocation().getLongitude() == location.getLongitude()) {
return feature;
}
}
// No feature was found, return an unnamed feature.
return Feature.newBuilder().setName("").setLocation(location).build();
}
/<feature>

getFeature() 接收兩個參數:

  • Point: 請求
  • StreamObserver<feature>: 一個應答的觀察者,實際上是服務器調用它應答的一個特殊接口。/<feature>

要將應答返回給客戶端,並完成調用:

  1. 如在我們的服務定義中指定的那樣,我們組織並填充一個 Feature 應答對象返回給客戶端。在這個 例子中,我們通過一個單獨的私有方法checkFeature()來實現。
  2. 我們使用應答觀察者的 onNext() 方法返回 Feature。
  3. 我們使用應答觀察者的 onCompleted() 方法來指出我們已經完成了和 RPC的交互。

服務器端流式 RPC

現在讓我們來看看我們的一種流式 RPC。 ListFeatures 是一個服務器端的流式 RPC,所以我們需要將多個 Feature 發回給客戶端。

private final Collection<feature> features;
...
@Override
public void listFeatures(Rectangle request, StreamObserver<feature> responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}
/<feature>/<feature>

和簡單 RPC 類似,這個方法拿到了一個請求對象(客戶端期望從 Rectangle 找到 Feature)和一個應答觀察者 StreamObserver。

這次我們得到了需要返回給客戶端的足夠多的 Feature 對象(在這個場景下,我們根據他們是否在我們的 Rectangle 請求中,從服務的特性集合中選擇他們),並且使用 onNext() 方法輪流往響應觀察者寫入。最後,和簡單 RPC 的例子一樣,我們使用響應觀察者的 onCompleted() 方法去告訴 gRPC 寫入應答已完成。

客戶端流式 RPC

現在讓我們看看稍微複雜點的東西:客戶端流方法 RecordRoute,我們通過它可以從客戶端拿到一個 Point 的流,並且返回一個包括它們路徑的信息 RouteSummary。

 @Override
public StreamObserver<point> recordRoute(final StreamObserver<routesummary> responseObserver) {
return new StreamObserver<point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
long startTime = System.nanoTime();
@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point
// to the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in recordRoute", t);
}
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}
/<point>/<routesummary>/<point>

如你所見,這次這個方法沒有請求參數。相反的,它拿到了一個 RouteGuide_RecordRouteServer 流,服務器可以用它來同時讀 寫消息——它可以用自己的 Recv() 方法接收客戶端消息並且用 SendAndClose() 方法返回它的單個響應。

如你所見,我們的方法和前面的方法類型相似,拿到一個 StreamObserver 應答觀察者參數,但是這次它返回一個 StreamObserver 以便客戶端寫入它的 Point。

在這個方法體中,我們返回了一個匿名 StreamObserver 實例,其中我們:

  • 覆寫了 onNext() 方法,每次客戶端寫入一個 Point 到消息流時,拿到特性和其它信息。
  • 覆寫了 onCompleted() 方法(在 客戶端 結束寫入消息時調用),用來填充和構建我們的 RouteSummary。然後我們用 RouteSummary 調用方法自己的的響應觀察者的 onNext(),之後調用它的 onCompleted() 方法,結束服務器端的調用。

雙向流式 RPC

最後,讓我們看看雙向流式 RPC RouteChat()。

 @Override
public StreamObserver<routenote> routeChat(final StreamObserver<routenote> responseObserver) {
return new StreamObserver<routenote>() {
@Override
public void onNext(RouteNote note) {
List<routenote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}
// Now add the new note to the list
notes.add(note);
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in routeChat", t);
}
@Override
public void onCompleted() {

responseObserver.onCompleted();
}
};
}
/<routenote>/<routenote>/<routenote>/<routenote>

和我們的客戶端流的例子一樣,我們拿到和返回一個 StreamObserver 應答觀察者,除了這次我們在客戶端仍然寫入消息到 它們的 消息流時通過我們方法的應答觀察者返回值。這裡讀寫的語法和客戶端流以及服務器流方法一樣。雖然每一端都會按照它們寫入的順序拿到另一端的消息,客戶端和服務器都可以任意順序讀寫——流的操作是互不依賴的。

啟動服務器

一旦我們實現了所有的方法,我們還需要啟動一個gRPC服務器,這樣客戶端才可以使用服務。下面這段代碼展示了在我們RouteGuide服務中實現的過程:

 public void start() {
gRpcServer = NettyServerBuilder.forPort(port)
.addService(RouteGuideGrpc.bindService(new RouteGuideService(features)))
.build().start();
logger.info("Server started, listening on " + port);
...
}

如你所見,我們用一個 NettyServerBuilder 構建和啟動服務器。這個服務器的生成器基於 Netty 傳輸框架。

為了做到這個,我們需要:

  1. 創建我們服務實現類 RouteGuideService 的一個實例並且將其傳給生成的 RouteGuideGrpc 類的靜態方法 bindService() 去獲得服務定義。
  2. 使用生成器的 forPort() 方法指定地址以及期望客戶端請求監聽的端口。
  3. 通過傳入將 bindService() 返回的服務定義,用生成器註冊我們的服務實現到生成器的 addService() 方法。
  4. 調用生成器上的 build() 和 start() 方法為我們的服務創建和啟動一個 RPC 服務器。

創建客戶端

在這部分,我們將嘗試為 RouteGuide 服務創建一個 Java 的客戶端。你可以從grpc-java/examples/src/main/java/io/grpc/examples/RouteGuideClient.java看到我們完整的客戶端例子代碼。

創建存根

為了調用服務方法,我們需要首先創建一個 存根,或者兩個存根:

  • 一個
    阻塞/同步 存根:這意味著 RPC 調用等待服務器響應,並且要麼返回應答,要麼造成異常。
  • 一個 非阻塞/異步 存根可以向服務器發起非阻塞調用,應答會異步返回。你可以使用異步存根去發起特定類型的流式調用。

我們首先為存根創建一個 gRPC channel,指明服務器地址和我們想連接的端口號:

 channel = NettyChannelBuilder.forAddress(host, port)
.negotiationType(NegotiationType.PLAINTEXT)
.build();

如你所見,我們用一個 NettyServerBuilder 構建和啟動服務器。這個服務器的生成器基於 Netty 傳輸框架。

我們使用 Netty 傳輸框架,所以我們用一個 NettyServerBuilder 啟動服務器。

現在我們可以通過從 .proto 中生成的 RouteGuideGrpc 類的 newStub 和 newBlockingStub 方法,使用頻道去創建我們的存根。

 blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);

調用服務方法

現在讓我們看看如何調用服務方法。

簡單 RPC

在阻塞存根上調用簡單 RPC GetFeature 幾乎是和調用一個本地方法一樣直觀。

 Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
Feature feature = blockingStub.getFeature(request);

我們創建和填充了一個請求 protocol buffer 對象(在這個場景下是 Point),在我們的阻塞存根上將其傳給 getFeature() 方法,拿回一個 Feature。

服務器端流式 RPC

接下來,讓我們看一個對於 ListFeatures 的服務器端流式調用,這個調用會返回一個地理性的 Feature 流:

 Rectangle request =
Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<feature> features = blockingStub.listFeatures(request);
/<feature>

如你所見,這和我們剛看過的簡單 RPC 很相似,除了方法返回客戶端用來讀取所有返回的 Feature 的 一個 Iterator,而不是單個的 Feature。

客戶端流式 RPC

現在看看稍微複雜點的東西:我們在客戶端流方法 RecordRoute 中發送了一個 Point 流給服務器並且拿到一個 RouteSummary。為了這個方法,我們需要使用異步存根。如果你已經閱讀了

創建服務器,一些部分看起來很相近——異步流式 RPC 是在兩端通過相似的方式實現的。

 public void recordRoute(List<feature> features, int numPoints) throws Exception {
info("*** RecordRoute");
final SettableFuture<void> finishFuture = SettableFuture.create();
StreamObserver<routesummary> responseObserver = new StreamObserver<routesummary>() {
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}
@Override
public void onError(Throwable t) {
finishFuture.setException(t);
}
@Override
public void onCompleted() {
finishFuture.set(null);
}
};
StreamObserver<point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
StringBuilder numMsg = new StringBuilder();
Random rand = new Random();
for (int i = 0; i < numPoints; ++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishFuture.isDone()) {

break;
}
}
info(numMsg.toString());
requestObserver.onCompleted();
finishFuture.get();
info("Finished RecordRoute");
} catch (Exception e) {
requestObserver.onError(e);
logger.log(Level.WARNING, "RecordRoute Failed", e);
throw e;
}
}
/<point>/<routesummary>/<routesummary>/<void>/<feature>

如你所見,為了調用這個方法我們需要創建一個 StreamObserver,它為了服務器用它的 RouteSummary 應答實現了一個特殊的接口。在 StreamObserver 中,我們:

  • 覆寫了 onNext() 方法,在服務器把 RouteSummary 寫入到消息流時,打印出返回的信息。
  • 覆寫了 onCompleted() 方法(在 服務器 完成自己的調用時調用)去設置 SettableFuture,這樣我們可以檢查服務器是不是完成寫入。

之後,我們將 StreamObserver 傳給異步存根的 recordRoute() 方法,拿到我們自己的 StreamObserver 請求觀察者將 Point 發給服務器。一旦完成點的寫入,我們使用請求觀察者的 onCompleted() 方法告訴 gRPC 我們已經完成了客戶端的寫入。一旦完成,我們就檢查 SettableFuture 驗證服務器是否已經完成寫入。

雙向流式 RPC

最後,讓我們看看雙向流式 RPC RouteChat()。

 public void routeChat() throws Exception {
info("*** RoutChat");
final SettableFuture<void> finishFuture = SettableFuture.create();
StreamObserver<routenote> requestObserver =
asyncStub.routeChat(new StreamObserver<routenote>() {
@Override
public void onNext(RouteNote note) {
info("Got message "{0}" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}
@Override
public void onError(Throwable t) {
finishFuture.setException(t);
}
@Override
public void onCompleted() {
finishFuture.set(null);
}
});
try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 1),
newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};
for (RouteNote request : requests) {
info("Sending message "{0}" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
requestObserver.onCompleted();
finishFuture.get();
info("Finished RouteChat");
} catch (Exception t) {
requestObserver.onError(t);
logger.log(Level.WARNING, "RouteChat Failed", t);
throw t;
}
}
/<routenote>/<routenote>/<void>

和我們的客戶端流的例子一樣,我們拿到和返回一個 StreamObserver 應答觀察者,除了這次我們在客戶端仍然寫入消息到 它們的

消息流時通過我們方法的應答觀察者返回值。這裡讀寫的語法和客戶端流以及服務器流方法一樣。雖然每一端都會按照它們寫入的順序拿到另一端的消息,客戶端和服務器都可以任意順序讀寫——流的操作是互不依賴的。

來試試吧!

根據example目錄下的README的指導去構建和運行客戶端及服務器。


分享到:


相關文章: