Flink 1.10 Native Kubernetes 原理與實踐

出處:https://zhoukaibo.com/2020/02/18/Flink-1-10-Native-Kubernetes

創作不易,在滿足創作共用版權協議的基礎上可以轉載,但請以超鏈接形式註明出處。

為了方便閱讀,微信公眾號已按分類排版,後續的文章將在移動端首發,想學習雲原生相關知識,請關注我

千呼萬喚始出來,在 Kubernetes 如火如荼的今天,Flink 社區終於在 1.10 版本提供了對 Kubernetes 的原生支持,也就是 Native Kubernetes Integration[1]。不過還只是 Beta 版本,預計會在 1.11 版本里面提供完整的支持。

我們知道,在 Flink 1.9 以及之前的版本里面,如果要在 Kubernetes 上運行 Flink 任務是需要事先指定好需要的 TaskManager(TM) 的個數以及 CPU 和內存的。這樣的問題是:大多數情況下,你在任務啟動前根本無法精確的預估這個任務需要多少個 TM。如果指定的 TM 多了,會導致資源浪費;如果指定的 TM 個數少了,會導致任務調度不起來。本質原因是在 Kubernetes 上運行的 Flink 任務並沒有直接向 Kubernetes 集群去申請資源。

Flink 在 1.10 版本完成了 Active Kubernetes Integration 的第一階段,支持了 session clusters。後續的第二階段會提供更完整的支持,如支持 per-job 任務提交,以及基於原生 Kubernetes API 的高可用,支持更多的 Kubernetes 參數如 toleration, label 和 node selector 等。Active Kubernetes Integration中的Active意味著 Flink 的 ResourceManager (KubernetesResourceManager) 可以直接和 Kubernetes 通信,按需申請新的 Pod,類似於 Flink 中對 Yarn 和 Mesos 的集成所做的那樣。在多租戶環境中,用戶可以利用 Kubernetes 裡面的 namespace 做資源隔離啟動不同的 Flink 集群。當然,Kubernetes 中的用戶帳號和賦權是需要提前準備好的。

原 理

Flink 1.10 native k8s

工作原理如下(段首的序號對應圖中箭頭所示的數字):

Flink 客戶端首先連接 Kubernetes API Server,提交 Flink 集群的資源描述文件,包括 configmap,job manager service,job manager deployment 和 Owner Reference[2]。Kubernetes Master 就會根據這些資源描述文件去創建對應的 Kubernetes 實體。以我們最關心的 job manager deployment 為例,Kubernetes 集群中的某個節點收到請求後,Kubelet 進程會從中央倉庫下載 Flink 鏡像,準備和掛載 volume,然後執行啟動命令。在 flink master 的 pod 啟動後,Dispacher 和 KubernetesResourceManager 也都啟動了。前面兩步完成後,整個 Flink session cluster 就啟動好了,可以接受提交任務請求了。用戶可以通過 flink 命令行即 flink client 往這個 session cluster 提交任務。此時 job graph 會在 flink client 端生成,然後和用戶 jar 包一起通過 RestClinet 上傳。一旦 job 提交成功,JobSubmitHandler 收到請求就會提交 job 給 Dispatcher。接著就會生成一個 job master。JobMaster 向 KubernetesResourceManager 請求 slots。KubernetesResourceManager 從 Kubernetes 集群分配 TaskManager。每個 TaskManager 都是具有唯一標識的 Pod。KubernetesResourceManager 會為 TaskManager 生成一份新的配置文件,裡面有 Flink Master 的 service name 作為地址。這樣在 Flink Master failover之後,TaskManager 仍然可以重新連上。Kubernetes 集群分配一個新的 Pod 後,在上面啟動 TaskManager。TaskManager 啟動後註冊到 SlotManager。SlotManager 向 TaskManager 請求 slots。TaskManager 提供 slots 給 JobMaster。然後任務就會被分配到這個 slots 上運行。

實 踐

Flink 的文檔[3]上對如何使用已經寫的比較詳細了,不過剛開始總會踩到一些坑。如果對 Kubernetes 不熟,可能會花點時間。

(1) 首先得有個 Kubernetes 集群,會有個 ~/.kube/config 文件。嘗試執行 kubectl get nodes 看下集群是否正常。

如果沒有這個 ~/.kube/config 文件,會報錯:

<code>2020-02-17 22:27:17,253 WARN io.fabric8.kubernetes.client.Config - Error reading service account token from: [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.2020-02-17 22:27:17,437 ERROR org.apache.flink.kubernetes.cli.KubernetesSessionCli - Error while running the Flink session.io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get] for kind: [Service] with name: [flink-cluster-81832d75-662e-40fd-8564-cd5a902b243c] in namespace: [default] failed. at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64) at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72) at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:231) at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:164) at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getService(Fabric8FlinkKubeClient.java:334) at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getInternalService(Fabric8FlinkKubeClient.java:246) at org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:104) at org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:185) at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) at org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:185)Caused by: java.net.UnknownHostException: kubernetes.default.svc: nodename nor servname provided, or not known/<code>

(2) 提前創建好用戶和賦權(RBAC[4])

<code>kubectl create serviceaccount flinkkubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink/<code>

如果沒有創建用戶,使用默認的用戶去提交,會報錯:

<code>Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.10.0.1/api/v1/namespaces/default/pods?labelSelector=app%3Dkaibo-test%2Ccomponent%3Dtaskmanager%2Ctype%3Dflink-native-kubernetes.Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. pods is forbidden: User "system:serviceaccount:default:default" cannot list resource "pods" in API group "" in the namespace "default"./<code>

(3) 這一步是可選的。默認情況下, JobManager 和 TaskManager 只會將 log 寫到各自 pod 的 /opt/flink/log 。如果想通過 kubectl logs 看到日誌,需要將 log 輸出到控制檯。要做如下修改 FLINK_HOME/conf 目錄下的 log4j.properties 文件。

<code>log4j.rootLogger=INFO, file, console# Log all infos to the consolelog4j.appender.console=org.apache.log4j.ConsoleAppenderlog4j.appender.console.layout=org.apache.log4j.PatternLayoutlog4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n/<code>

然後啟動 session cluster 的命令行需要帶上參數:

<code>-Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"/<code>

(4) 終於可以開始啟動 session cluster了。如下命令是啟動一個每個 TaskManager 是4G內存,2個CPU,4個slot 的 session cluster。

<code>bin/kubernetes-session.sh -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" -Dkubernetes.cluster-id=kaibo-test -Dtaskmanager.memory.process.size=4096m -Dkubernetes.taskmanager.cpu=2 -Dtaskmanager.numberOfTaskSlots=4/<code>

更多的參數詳見文檔:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#kubernetes

使用 kubectl logs kaibo-test-6f7dffcbcf-c2p7g -f 就能看到日誌了。

如果出現大量的如下這種日誌(目前遇到是雲廠商的LoadBalance liveness探測導致):

<code>2020-02-17 14:58:56,323 WARN org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Unhandled exceptionjava.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:192) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:377) at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:247)/<code>

可以暫時在 log4j.properties 裡面配置上:

<code>log4j.logger.org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint=ERROR, file/<code>

這個日誌太多會導致 WebUI 上打開 jobmanger log 是空白,因為文件太大了前端無法顯示。

如果前面第(1)和第(2)步沒有做,會出現各種異常,通過 kubectl logs 就能很方便的看到日誌了。

Session cluster 啟動後可以通過 kubectl get pods,svc 來看是否正常。

通過端口轉發來查看 Web UI:

<code>kubectl port-forward service/kaibo-test 8081/<code>

打開 http://127.0.0.1:8001 就能看到 Flink 的 WebUI 了。

(5) 提交任務

<code>./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id=k test.jar/<code>

我們從 Flink WebUI 頁面上可以看到,剛開始啟動時,UI上顯示 Total/Available Task Slots 為0, Task Managers 也是0。隨著任務的提交,資源會動態增加。任務停止後,資源會釋放掉。

在提交任務後,通過 kubectl get pods 能夠看到 Flink 為 TaskManager 分配了新的 Pod。

pods

(6) 停止 session cluster

<code>echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=kaibo-test -Dexecution.attached=true/<code>

也可以手工刪除資源:

<code>kubectl delete service/<clusterid>/<code>

總 結

可以看到,Flink 1.10 版本對和 Kubernetes 的集成做了很好的嘗試。期待社區後續的 1.11 版本能對 per-job 提供支持,以及和 Kubernetes 的深度集成,例如基於原生 Kubernetes API 的高可用。最新進展請關注 FLINK-14460[5]。

參考鏈接:

[1] https://flink.apache.org/news/2020/02/11/release-1.10.0.html#native-kubernetes-integration-beta

[2] https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/

[3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/native_kubernetes.html

[4] https://kubernetes.io/docs/reference/access-authn-authz/rbac/

[5] https://issues.apache.org/jira/browse/FLINK-14460

周凱波(寶牛),阿里巴巴技術專家,四川大學碩士,2010 年畢業後加入阿里搜索事業部,從事搜索離線平臺的研發工作,參與將搜索後臺數據處理架構從 MapReduce 到 Flink 的重構。目前在阿里計算平臺事業部,專注於基於 Flink 的一站式計算平臺的建設。