This article explains how to deploy a Session-mode Flink cluster on Kubernetes. Since this is a common point of confusion, I have gathered the relevant material and distilled it into a simple, workable procedure. I hope it clears things up; let's get started!
In the distributed computing world, Apache Flink is a fast, reliable, and easy-to-use compute engine. A Flink cluster is a distributed system composed of a Flink JobManager and multiple Flink TaskManagers, and high availability is an important consideration when deploying one.
Flink offers two deployment modes: Standalone and Session. In Standalone mode, the Flink cluster is a fixed group of processes that share the same configuration file and communicate via Akka. In Session mode, the cluster is dynamic and elastic: it can be started or stopped on demand, and the JobManager and TaskManager processes run in containers that Kubernetes manages dynamically.
The advantages of Session mode:

- the Flink cluster can be started or stopped on demand
- TaskManagers can be added or removed dynamically
- the cluster can be resized automatically using Kubernetes scaling
- it integrates with other Kubernetes resources, such as storage volumes and network policies
For these reasons, Session mode is the preferred way to deploy a Flink cluster on Kubernetes.
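For example, once the manifests below are applied, resizing the cluster is an ordinary kubectl operation. A minimal sketch, assuming the `flink-taskmanager` Deployment defined later in this post:

```bash
# Add two TaskManagers to the running session cluster
kubectl -n szyx-flink scale deployment/flink-taskmanager --replicas=4

# Scale back down once the extra capacity is no longer needed
kubectl -n szyx-flink scale deployment/flink-taskmanager --replicas=2
```

New TaskManagers register with the JobManager on their own; their slots show up in the web UI within a few seconds.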
During processing, Flink may read and write data on different file systems: the local file system, HDFS, S3, and so on. To handle them uniformly, Flink introduces the FileSystem abstraction, an interface that provides a single, consistent way of accessing different file systems.
The FileSystem implementation class is selected through Flink's configuration file. Flink supports many file systems, including the local file system, HDFS, S3, and Google Cloud Storage; since MinIO implements the S3 protocol, MinIO can serve as the file system as well.
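Once the S3 plugin is on the classpath and the endpoint is configured (both covered below), any path option in Flink accepts `s3://` URIs. A sketch; the object names here are illustrative only:

```bash
# Read input from and write output to MinIO via the s3 scheme,
# exactly as you would with HDFS or a local path
flink run -d /opt/flink/examples/streaming/WordCount.jar \
  --input s3://szyxflink/input/words.txt \
  --output s3://szyxflink/output/
```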
| Component | Version |
|---|---|
| kubernetes | 1.15.12 |
| flink | 1.15.3 |
Using MinIO as the file system requires adding the S3-related dependency JARs, so we build a custom image.
Dockerfile:
```dockerfile
FROM apache/flink:1.15.3-scala_2.12

# Required JARs
# flink-cdc
ADD lib/flink-sql-connector-mysql-cdc-2.3.0.jar /opt/flink/lib/
# JDBC connector
ADD lib/flink-connector-jdbc-1.15.3.jar /opt/flink/lib/
# MySQL driver
ADD lib/mysql-connector-j-8.0.32.jar /opt/flink/lib/
# Oracle driver
ADD lib/ojdbc8-21.9.0.0.jar /opt/flink/lib/

# File system plugins must live in their own folder under the plugins directory
RUN mkdir /opt/flink/plugins/s3-fs-presto && \
    cp -f /opt/flink/opt/flink-s3-fs-presto-1.15.3.jar /opt/flink/plugins/s3-fs-presto/
```
Build the image:
```bash
docker build -t sivdead/flink:1.15.3_scala_2.12 -f Dockerfile .
```
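Before pushing the image, it is worth confirming that the JARs landed where Flink expects them. A quick sanity check, assuming Docker is available locally:

```bash
# List the connector JARs and the s3 plugin inside the freshly built image
docker run --rm sivdead/flink:1.15.3_scala_2.12 \
  ls /opt/flink/lib /opt/flink/plugins/s3-fs-presto

# Push to the registry so the k8s nodes can pull it
docker push sivdead/flink:1.15.3_scala_2.12
```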
The configuration is split into two parts: `flink-conf.yaml` and `log4j-console.properties`.
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: szyx-flink
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    kubernetes.cluster-id: szyx-flink
    # Namespace the cluster runs in
    kubernetes.namespace: szyx-flink
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 2867m
    parallelism.default: 2
    # File system
    fs.default-scheme: s3
    # MinIO endpoint
    s3.endpoint: https://minio.k8s.io:9000
    # MinIO bucket
    s3.flink.bucket: szyxflink
    s3.access-key: <minio access key>
    s3.secret-key: <minio secret key>
    # State backend
    state.backend: rocksdb
    s3.path.style.access: true
    blob.storage.directory: /opt/flink/tmp/blob
    web.upload.dir: /opt/flink/tmp/upload
    io.tmp.dirs: /opt/flink/tmp
    # State management
    # Checkpoint storage location
    state.checkpoints.dir: s3://szyxflink/state/checkpoint
    # Savepoint storage location
    state.savepoints.dir: s3://szyxflink/state/savepoint
    # Checkpoint interval
    execution.checkpointing.interval: 5000
    execution.checkpointing.mode: EXACTLY_ONCE
    # Number of retained checkpoints
    state.checkpoints.num-retained: 3
    # History server: monitor this directory for completed jobs
    jobmanager.archive.fs.dir: s3://szyxflink/completed-jobs
    # Refresh every 10 seconds
    historyserver.archive.fs.refresh-interval: 10000
    historyserver.archive.fs.dir: s3://szyxflink/completed-jobs
    # High availability
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://szyxflink/ha
    # Trigger a savepoint every 6 hours
    kubernetes.operator.periodic.savepoint.interval: 6h
    kubernetes.operator.savepoint.history.max.age: 24h
    kubernetes.operator.savepoint.history.max.count: 5
    # Restart of unhealthy job deployments
    kubernetes.operator.cluster.health-check.enabled: true
    # Restart failed job deployments
    kubernetes.operator.job.restart.failed: true
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
```
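To load this into the cluster, create the namespace first and then apply the manifest. A sketch assuming the manifest above is saved as `flink-configmap.yaml` (the file name is arbitrary):

```bash
kubectl create namespace szyx-flink
kubectl apply -f flink-configmap.yaml

# Confirm both config files made it into the ConfigMap
kubectl -n szyx-flink describe configmap flink-config
```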
When deploying a Flink cluster on Kubernetes, a ServiceAccount is needed to authorize Flink to act inside the cluster. A ServiceAccount is the Kubernetes resource that grants a Pod access to the Kubernetes API: when the Flink JobManager or TaskManager starts, it uses this account to talk to the API server, acquire cluster resources, and schedule and run jobs.
```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-service-account
  namespace: szyx-flink
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: szyx-flink
  name: flink
rules:
  - apiGroups: [""]
    resources: ["pods", "services", "configmaps"]
    verbs: ["create", "get", "list", "watch", "delete"]
  - apiGroups: [""]
    resources: ["pods/log"]
    verbs: ["get"]
  - apiGroups: ["batch"]
    resources: ["jobs"]
    verbs: ["create", "get", "list", "watch", "delete"]
  - apiGroups: ["extensions"]
    resources: ["ingresses"]
    verbs: ["create", "get", "list", "watch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  namespace: szyx-flink
  name: flink-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink
subjects:
  - kind: ServiceAccount
    name: flink-service-account
    namespace: szyx-flink
```
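After applying the manifest, `kubectl auth can-i` is a convenient way to check that the binding grants what the JobManager needs. Assuming the manifest above is saved as `flink-rbac.yaml`:

```bash
kubectl apply -f flink-rbac.yaml

# Should print "yes" if the Role and RoleBinding are wired up correctly
kubectl -n szyx-flink auth can-i create configmaps \
  --as=system:serviceaccount:szyx-flink:flink-service-account
```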
PVC mounted by the JobManager:
```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-tmp
  namespace: szyx-flink
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 40Gi
```
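Apply it and make sure the claim binds before moving on; a claim stuck in Pending usually means no StorageClass could satisfy the request. Assuming the manifest is saved as `flink-tmp-pvc.yaml`:

```bash
kubectl apply -f flink-tmp-pvc.yaml

# STATUS should read "Bound"
kubectl -n szyx-flink get pvc flink-tmp
```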
JobManager Deployment:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: szyx-flink
spec:
  replicas: 1 # Set the value to greater than 1 to start standby JobManagers
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          imagePullPolicy: Always
          image: sivdead/flink:1.15.3_scala_2.12
          env:
            # Inject the pod IP into the container
            - name: POD_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
            # Time zone
            - name: TZ
              value: Asia/Shanghai
          # The following args overwrite the value of jobmanager.rpc.address
          # configured in the configuration config map to POD_IP.
          args: ["jobmanager", "$(POD_IP)"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          resources:
            requests:
              memory: "8192Mi"
              cpu: "4"
            limits:
              memory: "8192Mi"
              cpu: "4"
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: tmp-dir
              mountPath: /opt/flink/tmp
      securityContext:
        runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
      serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
      # Node selector
      nodeSelector:
        zone: mainland
      # Tolerations
      tolerations:
        - key: zone
          value: mainland
          effect: NoSchedule
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: tmp-dir
          persistentVolumeClaim:
            claimName: flink-tmp
```
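Roll it out and tail the logs; a healthy JobManager logs that its REST endpoint is listening on port 8081. Assuming the manifest is saved as `jobmanager-deployment.yaml`:

```bash
kubectl apply -f jobmanager-deployment.yaml
kubectl -n szyx-flink rollout status deployment/flink-jobmanager

# Watch startup; look for the line announcing the rest endpoint
kubectl -n szyx-flink logs -f deployment/flink-jobmanager
```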
Service:
```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: szyx-flink
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob-server
      port: 6124
    - name: webui
      port: 8081
  selector:
    app: flink
    component: jobmanager
```
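Before the Ingress exists, the web UI is still reachable through a port-forward, which is also a handy way to verify that the Service selector matches the JobManager pod:

```bash
# Forward local port 8081 to the Service, then browse http://localhost:8081
kubectl -n szyx-flink port-forward service/flink-jobmanager 8081:8081
```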
Ingress:
```yaml
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  annotations:
    # JAR uploads can be large, so allow a bigger request body
    nginx.ingress.kubernetes.io/proxy-body-size: 300m
    nginx.ingress.kubernetes.io/rewrite-target: /$1
  name: job-manager
  namespace: szyx-flink
spec:
  rules:
    - host: flink.k8s.io
      http:
        paths:
          - backend:
              serviceName: flink-jobmanager
              servicePort: 8081
            path: /flink/(.*)
```
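If `flink.k8s.io` does not resolve in your environment, point it at the ingress controller manually or test with an explicit Host header. A sketch; the ingress address below is a placeholder:

```bash
# Hit the web UI through the ingress controller's external address
curl -H "Host: flink.k8s.io" http://<ingress-address>/flink/

# Or query the REST API, which returns the Flink version as JSON
curl -H "Host: flink.k8s.io" http://<ingress-address>/flink/config
```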
Open http://flink.k8s.io/flink/. If the Flink web UI loads, the JobManager deployment succeeded.
TaskManager Deployment:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: szyx-flink
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
        - name: taskmanager
          imagePullPolicy: Always
          image: sivdead/flink:1.15.3_scala_2.12
          args: ["taskmanager"]
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
          resources:
            requests:
              memory: "8192Mi"
              cpu: "4"
            limits:
              memory: "8192Mi"
              cpu: "4"
      securityContext:
        runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
      # Node selector
      nodeSelector:
        zone: mainland
      # Tolerations
      tolerations:
        - key: zone
          value: mainland
          effect: NoSchedule
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
```
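Apply the manifest and confirm both TaskManagers register with the JobManager; the REST API is an easy check. Assuming the manifest is saved as `taskmanager-deployment.yaml`:

```bash
kubectl apply -f taskmanager-deployment.yaml
kubectl -n szyx-flink rollout status deployment/flink-taskmanager

# Should list two registered TaskManagers with 2 slots each
curl http://flink.k8s.io/flink/v1/taskmanagers
```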
Once the TaskManagers are deployed, verify the cluster end to end (a scripted version of the same smoke test follows this list):

1. Open the Flink web UI and check the Task Managers page.
2. Submit Flink's bundled example, WordCount.jar, from the web UI.
3. Restart the JobManager and check that the uploaded job JAR is still present (uploads live under `web.upload.dir`, which is backed by the PVC).
4. Run the job.
5. Check the result.
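The same smoke test can be run against the REST API instead of clicking through the UI. A minimal sketch; the local path to WordCount.jar is an assumption:

```bash
# Upload the example JAR (the path on your machine may differ)
curl -X POST -H "Expect:" \
  -F "jarfile=@WordCount.jar" \
  http://flink.k8s.io/flink/v1/jars/upload

# Use the jar id returned above to start the job
curl -X POST http://flink.k8s.io/flink/v1/jars/<jar-id>/run

# List running and finished jobs to check the result
curl http://flink.k8s.io/flink/v1/jobs/overview
```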
That concludes this walkthrough of deploying a Session-mode Flink cluster on k8s. I hope it resolved your doubts; theory sticks best when paired with practice, so go give it a try! For more hands-on articles like this one, keep following the Yisu Cloud (億速云) website.