機器學習:如何在Kafka應用程序中部署一個分析模型進行實時預測

全文共8336字,預計學習時長

24分鐘

機器學習:如何在Kafka應用程序中部署一個分析模型進行實時預測


Apache Kafka和機器學習的關係很微妙。

本文旨在討論建立機器學習框架的一個特定部分:在Kafka應用程序中部署一個分析模型來進行實時預測。


模式訓練和模型部署可以是兩個獨立的過程。但是相同的步驟也可應用於數據集成和數據預處理,因為模型訓練和模型推導需要呈現同樣的數據集成、過濾、充實和聚合。


本文將討論和比較兩種模型部署的不同選擇:有RPC的服務端模型(RPCs)和本地嵌入Kafka客戶端應用的模型。本文的例子特地使用了TensorFlow,但是相關原則對其他機器學習/深度學習框架或者產品同樣適用。這些框架和產品包括H2O.ai,Deeplearning4j,谷歌雲端機器學習引擎和統計分析系統(SAS)。


機器學習:如何在Kafka應用程序中部署一個分析模型進行實時預測

TensorFlow — 機器學習/深度學習的開源軟件庫

Tensorflow是一個為高效計算打造的開源軟件庫.它靈活的架構讓多個平臺(cpu、gpu、TPUs等)間的計算部署變得更加容易,應用範圍從桌面到服務器集群再到移動和邊緣設備。該軟件由谷歌人工智能組織的研發團隊研究員和工程師開發,作為機器學習和深度學習的強力支持,Tensorflow應用於多個領域,是一個完整的生態系統而不是一個孤立的元件。


鑑於本文聚焦於模型服務,主要對保存和加載模型感興趣。保存和加載模型就是存儲訓練模型,並將Tensorflow作為模型服務器。


機器學習:如何在Kafka應用程序中部署一個分析模型進行實時預測

存儲模型本質上是一個二進制文件,使用協議緩衝區(Protobuf)序列化。接著模型在C,Python,Java等軟件中分類數據、加載數據、存儲和處理數據。文件格式是可讀的文本格式(.pbtxt)或壓縮的二進制協議緩衝區(.pb)。圖表對象是在TensorFlow中進行計算的基礎。權重保存在單獨的TensorFlow檢查點文件中。


由於本文關注的是TensorFlow的模型部署,因此如何預訓練模型並不重要。可以利用雲端服務和像雲端機器學習引擎和其谷歌雲端平臺(GCP)的集成管線,或者建立自己的模型訓練途徑。Kafka不但在模型部署方面很重要,在數據集成、數據預處理和數據監控方面也扮演重要角色。


機器學習:如何在Kafka應用程序中部署一個分析模型進行實時預測

使用模型服務器和RPC進行流處理


模型服務器可以自我管理,也可以由分析或者雲端供應商託管。模型服務器並不僅為模型推導部署和存儲模型,而且還提供諸如版本控制或A/B測試之類的附加功能。從應用程序到模型服務器的交流經常通過請求-響應協議(HTTP)或者谷歌RPC(gRPC)等RPC框架來完成。每次項目運行,都會發生這種介於Kafka應用和模型服務器之間的請求-響應式交流,


有很多模型服務器可供選擇。可以從像Seldon Server,PredictionIO,Hydrosphere.io等開源模型服務器中選擇或者從H2O.ai,DataRobot,國際商業機器公司(IBM),統計分析系統(SAS)等分析供應商中利用模型服務器。


本文使用TensorFlow提供的服務,即來自TensorFlow的模型服務器。該模型服務器可以實現自我管理,也可以使用雲端機器學習引擎服務。TensorFlow服務擁有以下特徵。


·包含谷歌RPC(gRPC)和請求-響應協議終端(HTTP)

·呈現模型版本,無需改變客戶端代碼

·將單個模型推導請求分組,以便聯合執行請求

·優化模型推導時間以便最小化延遲

·支持許多可服務項(可服務項是一個模型或者是一個和模型一起提供數據的任務)


o TensorFlow 模型

o 嵌入函數

o 詞彙查找表格

o 特徵轉換

o 不基於TensorFlow的模型


·有能力開展金絲雀發佈及A/B測試


下圖是Kafka應用和模型服務器的交流過程

機器學習:如何在Kafka應用程序中部署一個分析模型進行實時預測

執行Kafka應用的過程是直接的。下面是Kafka應用數據流的代碼片段以及TensorFlow服務端的RPC。


1.輸入Kafka以及TensorFlow服務API

  1. import org.apache.kafka.common.serialization.Serdes;
  2. import org.apache.kafka.streams.KafkaStreams;
  3. import org.apache.kafka.streams.StreamsBuilder;
  4. import org.apache.kafka.streams.StreamsConfig;
  5. import org.apache.kafka.streams.kstream.KStream;

  6. import com.github.megachucky.kafka.streams.machinelearning.TensorflowObjectRecogniser;

2.配置Kafka數據流應用


  1. // Configure Kafka Streams Application
  2. finalString bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
  3. final Properties streamsConfiguration = new Properties();
  4. // Give the Streams application a unique name. The name must be unique
  5. // in the Kafka cluster against which the application is run.
  6. streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-tensorflow-serving-gRPC-example");
  7. // Where to find Kafka broker(s).
  8. streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

3.向TensorFlow服務端呈現RPC(如果RPC失敗,則報備異常情況)


  1. KStream<string> transformedMessage = imageInputLines.mapValues(value -> {/<string>


  2. System.out.println("Image path: " + value);
  3. imagePath = value;
  4. TensorflowObjectRecogniser recogniser = new TensorflowObjectRecogniser(server, port);
  5. System.out.println("Image = " + imagePath);
  6. InputStream jpegStream;
  7. try {
  8. jpegStream = new FileInputStream(imagePath);
  9. // Prediction of the TensorFlow Image Recognition model:
  10. List<map.entry>> list = recogniser.recognise(jpegStream);/<map.entry>
  11. String prediction = list.toString();
  12. System.out.println("Prediction: " + prediction);
  13. recogniser.close();
  14. jpegStream.close();
  15. return prediction;
  16. } catch (Exception e) {
  17. e.printStackTrace();

  18. return Collections.emptyList().toString();
  19. }
  20. });

4.啟動Kafka應用

  1. // Start Kafka Streams Application to process new incoming images from the Input Topic
  2. final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
  3. streams.start();
機器學習:如何在Kafka應用程序中部署一個分析模型進行實時預測

嵌入式模型的流處理


可以不用模型服務器和RPC交流,直接將模型嵌入Kafka應用。嵌入模型可以通過Kafka本地處理的數據流應用,以Kafka數據流為槓桿。該模型還可以通過KSQL(一種SQL方言)或者Java、Scala、Python、Go.等Kafka客戶端應用程序接口。


在這種情況下,Kafka應用無法依賴外部模型服務器。該模型在Kafka應用內加載,例如在Kafka數據流應用內使用TensorFlow的JavaAPI。


機器學習:如何在Kafka應用程序中部署一個分析模型進行實時預測


同樣,執行Kafka應用很簡單。這裡是在Kafka數據流應用裡嵌入TensorFlow模型的代碼片段,作為實時預測:


1.輸入Kafka和TensorFlowAPI

  1. import org.apache.kafka.streams.KafkaStreams;
  2. import org.apache.kafka.streams.KeyValue;
  3. import org.apache.kafka.streams.StreamsBuilder;
  4. import org.apache.kafka.streams.StreamsConfig;
  5. import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
  6. import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
  7. import org.apache.kafka.streams.kstream.KStream;
  8. import org.deeplearning4j.nn.modelimport.keras.KerasModelImport;
  9. import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;

2.從數據存儲(例如亞馬遜S3鏈接)或者數據記憶(例如接受一個Kafkatopic級別參數)中加載TensorFlow模型。

  1. // Step 1: Load Keras TensorFlow Model using DeepLearning4J API
  2. String simpleMlp = new ClassPathResource("generatedModels/Keras/simple_mlp.h5").getFile().getPath();
  3. System.out.println(simpleMlp.toString());
  4. MultiLayerNetwork model = KerasModelImport.importKerasSequentialModelAndWeights(simpleMlp);

3.配置Kafka數據流應用

  1. // Configure Kafka Streams Application
  2. Properties streamsConfiguration = new Properties();
  3. streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-tensorflow-keras-integration-test");
  4. streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
  5. // Specify default (de)serializers for record keys and for record values
  6. streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
  7. streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

4.在數據流中應用TensorFlow模型

  1. final KStream<string> inputEvents = builder.stream(inputTopic);/<string>
  2. inputEvents.foreach((key, value) -> {
  3. // Transform input values (list of Strings) to expected DL4J parameters (two Integer values):
  4. String[] valuesAsArray = value.split(",");
  5. INDArray input = Nd4j.create(Integer.parseInt(valuesAsArray[0]), Integer.parseInt(valuesAsArray[1]));
  6. // Model inference in real time:
  7. output = model.output(input);
  8. prediction = output.toString();
  9. });

5.啟動Kafka應用

  1. final KafkaStreams streams = new TestKafkaStreams(builder.build(), streamsConfiguration);
  2. streams.cleanUp();
  3. streams.start();

其他有關TensorFlow,H2O和深度學習4j在Kafka數據流中的應用實例詳見GitHub。


甚至可以通過使用知名的測試庫編寫單元測試框架(unit test),如JUnit和Kafka數據流測試庫使用的單元測試框架。


以下是KSQL自定義函數使用的模型部署示例:


機器學習:如何在Kafka應用程序中部署一個分析模型進行實時預測


需要做的是執行KSQL 自定義函數的Java界面,並且將自定義函數部署到KSQL服務端。過去的博客文章有詳細介紹如何建立自己的KSQL自定義函數。通過這種方法,終端用戶編寫SQL語言查詢用來實時應用分析模型。


機器學習:如何在Kafka應用程序中部署一個分析模型進行實時預測

在應用中直接嵌入何種模型?


並不是所有模型都是嵌入應用的理想模型。考慮模型是否應該嵌入時需要包含以下幾點:


·模型性能:越快越好

·模型二進制格式:最好是Java字節碼

·模型大小:字節少,存儲少為佳

·模型服務器特徵:加載即用vs.自己安裝vs.不必要


寫在Python中的代碼之所以運行緩慢是因為動態語言在運行時需要翻譯很多變量和請求。


H2O Java分類(例如決策樹)可以運行很快。運行時間以微秒計算。

一個只有幾兆字節,內存少的TensorFlow Protobuf協議神經網絡可以加載得很快。


一個龐大的TensorFlow Protobuf協議神經網絡(約100兆字節)需要很大內存,運行相對緩慢。


基於標準的模型(例如 XML/JSON格式數據就是基於預測模型標記語言或者開放神經網絡交換)包括除了模型處理(例如數據預處理)的其他步驟。該模型呈現了使用這些標準的組織挑戰和技術侷限,性能遠次於像TensorFlow的Saved Model模型等本地加載模型


最終,不管模型是否直接嵌入應用,這取決於模型本身、硬件設施和項目要求。


機器學習:如何在Kafka應用程序中部署一個分析模型進行實時預測

在 Kafka應用中重建模型服務器的特徵並不難


在應用中嵌入模型並不意味著能即刻使用模型特徵。用戶必須親自執行模型。先問自己第一個問題:我需要模型服務器的特徵嗎?我需要動態更新模型嗎?模型版本?A/B測試?金絲雀測試?


好消息是執行模型特徵並不困難。這取決於用戶的要求和工具配置,你可以:


·啟動新版本應用(例如Kubernetes pod容器)

·通過Kafka主題發送並使用模型或權重

·利用服務網絡(比如像Envoy,Linkerd, 或者 Istio服務器)


下面來評估權衡各種在Kafka應用裡利用分析模型的方法


機器學習:如何在Kafka應用程序中部署一個分析模型進行實時預測


機器學習:如何在Kafka應用程序中部署一個分析模型進行實時預測

權衡---模型服務器vs嵌入模型


可以在模型服務器部署一個分析模型並且使用RPC進行交流。或者可以直接在應用裡嵌入模型。這裡沒有最佳選項因為這取決於用戶的設施、要求和能力。


為什麼和事件流應用一起使用模型服務器和RPC?


·如果你對事件流一無所知,該模型便於理解

·可以讓之後遷移到實時流變得可能

·將不同模型,版本和A/B測試放到內置模型管理

·內置監控


為什麼在事件流應用中嵌入模型?


·藉助本地推理實現更好的延遲,而無需進行遠程呼叫

·脫機推斷(設備,邊緣處理等)

·Kafka Streams應用程序的可用性,可伸縮性和延遲/吞吐量與RPC接口的SLA之間沒有耦合

·無副作用(比如失敗的風險)- Kafka應用處理包含了各種因素(例如具體一次)


兩個選項各有利弊,根據不同場合推薦使用。


機器學習:如何在Kafka應用程序中部署一個分析模型進行實時預測

Kubernetes(K8s)雲原生模型部署


在雲原生框架中,兩種方法都可以獲得好處。即使其他的雲原生技術有相似的特徵,下面仍用Kubernetes作為雲原生環境。


將模型嵌入到Kafka應用中,可以獲得獨立pod數據結構的所有優勢。獨立的pod數據結構是流式數據處理和模型推導的容器,不依賴外部的模型服務器。


在以下示例中,可以獨立測量嵌入模型的Kafka數據流應用,啟動新版本,加入A/B測試或者其他路徑,用像Envoy或者Linkerd的雲原生代理服務器處理異常。


機器學習:如何在Kafka應用程序中部署一個分析模型進行實時預測


如果仍然想獲取模型服務器的優點及特性,可以使用邊車設計模式。Kubernetes支持將具有特定任務的其他容器添加到Pod中。在以下示例中,將Kafka Streams應用程序部署在一個容器中,而模型服務器作為邊車部署在同一pod內的另一個容器中。


機器學習:如何在Kafka應用程序中部署一個分析模型進行實時預測


這樣就可以利用模型服務器的功能以及單個容器的堅固性和可擴展性。它仍然具有在每個容器之間使用RPC的缺點。通過將兩個容器部署在同一容器中,可以最大程度地減少等待時間和潛在錯誤。


機器學習:如何在Kafka應用程序中部署一個分析模型進行實時預測

邊緣模型部署


模型也不是經常部署在雲端或者是數據中心。在某些情況下,模型可以部署在邊緣,邊緣部署意味著:


·邊緣數據中心或者邊緣設備/機器

·邊緣有一個Kafka應用集群,一箇中介和一個Kafka應用客戶端。

·一個強大的客戶端(比如KSQL或者Java)或者一個輕量級的客戶端(比如C或者JavaScript)

·一個嵌入模型或者RPC模型推導

·本地或者遠程訓練

·對法律和法規的影響


對於某些電信提供商來說,邊緣計算的定義是超低延遲,端與端之間的通信時間不到100毫秒。這是通過諸如開源雲基礎架構軟件堆棧StarlingX之類的框架實現的,該框架需要完整的OpenStack和Kubernetes集群以及對象存儲。對於其他對象來說,“邊緣”意味著移動設備、輕量級板或傳感器,可以在其中部署非常小的輕量級C應用程序和模型的移動設備。


從Kafka的角度來看,有很多選擇。可以使用librdkafka(本機Kafka C / C ++客戶端庫)完全構建輕量級的邊緣應用程序,該庫由Confluent完全支持。還可以使用JavaScript並利用REST代理或WebSocket集成進行 Kafka通信,將模型嵌入移動應用程序中。


機器學習:如何在Kafka應用程序中部署一個分析模型進行實時預測

Kafka的獨立技術模型部署


模型部署在過程和技術上可以與模型訓練完全分開。部署基礎架構可以處理不同的模型,甚至可以使用不同的機器學習框架訓練模型。Kafka還為構建機器學習監控提供了良好的基礎,包括基礎設施的技術監控和特定於模型的監控,例如性能或模型準確性。


無論是否要使用Kafka實施所有功能,包括數據集成,預處理,模型部署和監視,或者是否僅使用Kafka客戶將模型嵌入到實時的Kafka客戶端,Kafka都是適用於機器學習基礎設施的補充工具(與數據預處理和模型訓練完全分開)Kafka。


對於模型部署,有兩種選擇:模型服務器(RPC)和嵌入式模型。瞭解每種方法的利弊將有助於為項目做出正確決定。實際上,將分析模型嵌入到Kafka應用程序中很簡單,而且非常實用。

機器學習:如何在Kafka應用程序中部署一個分析模型進行實時預測


機器學習:如何在Kafka應用程序中部署一個分析模型進行實時預測

我們一起分享AI學習與發展的乾貨


分享到:


相關文章: