Flink 1.10 Native Kubernetes 原理與實踐

出處:https://zhoukaibo.com/2020/02/18/Flink-1-10-Native-Kubernetes

創作不易,在滿足創作共用版權協議的基礎上可以轉載,但請以超鏈接形式註明出處。

為了方便閱讀,微信公眾號已按分類排版,後續的文章將在移動端首發,想學習雲原生相關知識,請關注我

千呼萬喚始出來,在 Kubernetes 如火如荼的今天,Flink 社區終於在 1.10 版本提供了對 Kubernetes 的原生支持,也就是 Native Kubernetes Integration[1]。不過還只是 Beta 版本,預計會在 1.11 版本里面提供完整的支持。

我們知道,在 Flink 1.9 以及之前的版本里面,如果要在 Kubernetes 上運行 Flink 任務是需要事先指定好需要的 TaskManager(TM) 的個數以及 CPU 和內存的。這樣的問題是:大多數情況下,你在任務啟動前根本無法精確的預估這個任務需要多少個 TM。如果指定的 TM 多了,會導致資源浪費;如果指定的 TM 個數少了,會導致任務調度不起來。本質原因是在 Kubernetes 上運行的 Flink 任務並沒有直接向 Kubernetes 集群去申請資源。

Flink 在 1.10 版本完成了 Active Kubernetes Integration 的第一階段,支持了 session clusters。後續的第二階段會提供更完整的支持,如支持 per-job 任務提交,以及基於原生 Kubernetes API 的高可用,支持更多的 Kubernetes 參數如 toleration, label 和 node selector 等。Active Kubernetes Integration中的Active意味著 Flink 的 ResourceManager (KubernetesResourceManager) 可以直接和 Kubernetes 通信,按需申請新的 Pod,類似於 Flink 中對 Yarn 和 Mesos 的集成所做的那樣。在多租戶環境中,用戶可以利用 Kubernetes 裡面的 namespace 做資源隔離啟動不同的 Flink 集群。當然,Kubernetes 中的用戶帳號和賦權是需要提前準備好的。

原 理

Flink 1.10 Native Kubernetes 原理與實踐

Flink 1.10 native k8s

工作原理如下(段首的序號對應圖中箭頭所示的數字):

  1. Flink 客戶端首先連接 Kubernetes API Server,提交 Flink 集群的資源描述文件,包括 configmap,job manager service,job manager deployment 和 Owner Reference[2]。
  2. Kubernetes Master 就會根據這些資源描述文件去創建對應的 Kubernetes 實體。以我們最關心的 job manager deployment 為例,Kubernetes 集群中的某個節點收到請求後,Kubelet 進程會從中央倉庫下載 Flink 鏡像,準備和掛載 volume,然後執行啟動命令。在 flink master 的 pod 啟動後,Dispacher 和 KubernetesResourceManager 也都啟動了。前面兩步完成後,整個 Flink session cluster 就啟動好了,可以接受提交任務請求了。
  3. 用戶可以通過 flink 命令行即 flink client 往這個 session cluster 提交任務。此時 job graph 會在 flink client 端生成,然後和用戶 jar 包一起通過 RestClinet 上傳。
  4. 一旦 job 提交成功,JobSubmitHandler 收到請求就會提交 job 給 Dispatcher。接著就會生成一個 job master。
  5. JobMaster 向 KubernetesResourceManager 請求 slots。
  6. KubernetesResourceManager 從 Kubernetes 集群分配 TaskManager。每個 TaskManager 都是具有唯一標識的 Pod。KubernetesResourceManager 會為 TaskManager 生成一份新的配置文件,裡面有 Flink Master 的 service name 作為地址。這樣在 Flink Master failover之後,TaskManager 仍然可以重新連上。
  7. Kubernetes 集群分配一個新的 Pod 後,在上面啟動 TaskManager。
  8. TaskManager 啟動後註冊到 SlotManager。
  9. SlotManager 向 TaskManager 請求 slots。
  10. TaskManager 提供 slots 給 JobMaster。然後任務就會被分配到這個 slots 上運行。

實 踐

Flink 的文檔[3]上對如何使用已經寫的比較詳細了,不過剛開始總會踩到一些坑。如果對 Kubernetes 不熟,可能會花點時間。

(1) 首先得有個 Kubernetes 集群,會有個 ~/.kube/config 文件。嘗試執行 kubectl get nodes 看下集群是否正常。

如果沒有這個 ~/.kube/config 文件,會報錯:

<code>2020-02-17 22:27:17,253 WARN  io.fabric8.kubernetes.client.Config                           - Error reading service account token from: [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.2020-02-17 22:27:17,437 ERROR org.apache.flink.kubernetes.cli.KubernetesSessionCli          - Error while running the Flink session.io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]  for kind: [Service]  with name: [flink-cluster-81832d75-662e-40fd-8564-cd5a902b243c]  in namespace: [default]  failed.  at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)  at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)  at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:231)  at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:164)  at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getService(Fabric8FlinkKubeClient.java:334)  at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getInternalService(Fabric8FlinkKubeClient.java:246)  at org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:104)  at org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:185)  at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)  at org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:185)Caused by: java.net.UnknownHostException: kubernetes.default.svc: nodename nor servname provided, or not known/<code>

(2) 提前創建好用戶和賦權(RBAC[4])

<code>kubectl create serviceaccount flinkkubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink/<code>

如果沒有創建用戶,使用默認的用戶去提交,會報錯:

<code>Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.10.0.1/api/v1/namespaces/default/pods?labelSelector=app%3Dkaibo-test%2Ccomponent%3Dtaskmanager%2Ctype%3Dflink-native-kubernetes.Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. pods is forbidden: User "system:serviceaccount:default:default" cannot list resource "pods" in API group "" in the namespace "default"./<code>

(3) 這一步是可選的。默認情況下, JobManager 和 TaskManager 只會將 log 寫到各自 pod 的 /opt/flink/log 。如果想通過 kubectl logs 看到日誌,需要將 log 輸出到控制檯。要做如下修改 FLINK_HOME/conf 目錄下的 log4j.properties 文件。

<code>log4j.rootLogger=INFO, file, console# Log all infos to the consolelog4j.appender.console=org.apache.log4j.ConsoleAppenderlog4j.appender.console.layout=org.apache.log4j.PatternLayoutlog4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n/<code> 

然後啟動 session cluster 的命令行需要帶上參數:

<code>-Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"/<code>

(4) 終於可以開始啟動 session cluster了。如下命令是啟動一個每個 TaskManager 是4G內存,2個CPU,4個slot 的 session cluster。

<code>bin/kubernetes-session.sh -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" -Dkubernetes.cluster-id=kaibo-test -Dtaskmanager.memory.process.size=4096m -Dkubernetes.taskmanager.cpu=2 -Dtaskmanager.numberOfTaskSlots=4/<code>

更多的參數詳見文檔:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#kubernetes

使用 kubectl logs kaibo-test-6f7dffcbcf-c2p7g -f 就能看到日誌了。

如果出現大量的如下這種日誌(目前遇到是雲廠商的LoadBalance liveness探測導致):

<code>2020-02-17 14:58:56,323 WARN  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Unhandled exceptionjava.io.IOException: Connection reset by peer  at sun.nio.ch.FileDispatcherImpl.read0(Native Method)  at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)  at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)  at sun.nio.ch.IOUtil.read(IOUtil.java:192)  at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:377)  at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:247)/<code>

可以暫時在 log4j.properties 裡面配置上:

<code>log4j.logger.org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint=ERROR, file/<code>

這個日誌太多會導致 WebUI 上打開 jobmanger log 是空白,因為文件太大了前端無法顯示。

如果前面第(1)和第(2)步沒有做,會出現各種異常,通過 kubectl logs 就能很方便的看到日誌了。

Session cluster 啟動後可以通過 kubectl get pods,svc 來看是否正常。

通過端口轉發來查看 Web UI:

<code>kubectl port-forward service/kaibo-test 8081/<code>

打開 http://127.0.0.1:8001 就能看到 Flink 的 WebUI 了。

(5) 提交任務

<code>./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id=k test.jar/<code>

我們從 Flink WebUI 頁面上可以看到,剛開始啟動時,UI上顯示 Total/Available Task Slots 為0, Task Managers 也是0。隨著任務的提交,資源會動態增加。任務停止後,資源會釋放掉。

在提交任務後,通過 kubectl get pods 能夠看到 Flink 為 TaskManager 分配了新的 Pod。

Flink 1.10 Native Kubernetes 原理與實踐

pods

(6) 停止 session cluster

<code>echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=kaibo-test -Dexecution.attached=true/<code>

也可以手工刪除資源:

<code>kubectl delete service/<clusterid>/<code>

總 結

可以看到,Flink 1.10 版本對和 Kubernetes 的集成做了很好的嘗試。期待社區後續的 1.11 版本能對 per-job 提供支持,以及和 Kubernetes 的深度集成,例如基於原生 Kubernetes API 的高可用。最新進展請關注 FLINK-14460[5]。

參考鏈接:

[1] https://flink.apache.org/news/2020/02/11/release-1.10.0.html#native-kubernetes-integration-beta

[2] https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/

[3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/native_kubernetes.html

[4] https://kubernetes.io/docs/reference/access-authn-authz/rbac/

[5] https://issues.apache.org/jira/browse/FLINK-14460

周凱波(寶牛),阿里巴巴技術專家,四川大學碩士,2010 年畢業後加入阿里搜索事業部,從事搜索離線平臺的研發工作,參與將搜索後臺數據處理架構從 MapReduce 到 Flink 的重構。目前在阿里計算平臺事業部,專注於基於 Flink 的一站式計算平臺的建設。


分享到:


相關文章: