Flink与Kubernetes集成
引言
在当今大数据与云计算蓬勃发展的时代,容器编排与流处理技术成为企业数据处理架构的关键支柱。Kubernetes作为容器编排系统的行业标准,能够高效自动化地部署、扩展和管理计算机应用程序;Apache Flink则是流处理和批处理领域的佼佼者,以强大的实时处理能力和精准的状态管理著称。当Flink与Kubernetes实现深度集成,二者优势互补,为企业带来了更加灵活、高效、智能的数据处理解决方案。本文将基于详实的操作笔记,深入解析Flink与Kubernetes集成的全流程,以及不同部署模式的技术要点与实践细节。
一、集成环境搭建
1.1 环境要求
Flink与Kubernetes的集成对运行环境有着严格且明确的要求,这些条件是确保集成顺利进行和集群稳定运行的基础:
- Kubernetes版本:Kubernetes集群版本必须在1.9及以上,高版本的Kubernetes不仅提供了更丰富的功能特性,还能更好地与Flink进行兼容性适配,保证Flink作业在集群上的稳定运行和高效调度。
- kubectl配置:
kubecconfig
文件是连接本地客户端与Kubernetes集群的桥梁,通常存储在~/.kube/config
路径下 。通过执行kubectl auth can-i <list|create|edit|delete> pods
命令,可以验证当前用户是否具备对Pods和服务进行列出、创建、编辑、删除等操作的权限。若权限不足,在后续部署Flink集群资源时,将会遇到权限拒绝的错误,导致部署失败。 - Kubernetes DNS:Kubernetes DNS服务的开启至关重要,它承担着集群内服务发现的核心功能。在Flink与Kubernetes集成环境中,各组件之间需要通过服务名称进行通信,Kubernetes DNS能够将服务名称解析为对应的IP地址,确保Flink JobManager、TaskManager等组件之间的网络通信畅通无阻。
- RBAC权限:默认的
default
服务账号需要具备RBAC(基于角色的访问控制)中创建、删除Pods的权限。然而,为了实现更精细化的资源管理和权限隔离,建议专门为Flink创建独立的命名空间和服务账号。这样做不仅可以避免因权限混乱导致的部署失败风险,还能显著降低后期的运维成本,使得Flink集群的管理更加清晰、安全和高效。
1.2 创建专属资源
为了给Flink集群打造一个独立、安全且便于管理的运行环境,需要按照以下步骤创建专属资源:
- 创建命名空间:使用命令
kubectl create ns flink
,即可创建一个名为flink
的命名空间。这个命名空间就像是一个独立的“数据处理小世界”,后续所有与Flink相关的资源,如Pods、Services、ConfigMaps等,都将部署在这个空间内,实现与其他应用资源的逻辑隔离。 - 创建ServiceAccount:执行
kubectl create serviceaccount flink-service-account -n flink
命令,创建flink-service-account
服务账号。该服务账号将作为Flink集群与Kubernetes集群进行交互的“身份凭证”,用于验证操作权限和进行安全通信。 - 用户授权:通过
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
命令,为flink-service-account
服务账号赋予edit
权限。这意味着该账号可以对集群内的资源进行创建、修改和删除等操作,从而满足Flink集群部署和运行过程中对资源管理的需求 。
二、Flink Standalone Kubernetes部署模式
2.1 模式概述
Flink Standalone Kubernetes集成模式支持session
和application
两种部署模式,而per-job
模式目前仅在YARN环境中支持,并且在Flink 1.15版本中已在YARN环境下被弃用(具体可参考FLINK-26000相关内容) 。本次重点研究非HA(高可用)部署模式,虽然在实际生产环境中,HA模式能够提供更高的可靠性和容错能力,但非HA模式与HA模式在核心原理和大部分配置上是相通的,HA模式的详细配置可参考官方文档进行深入学习。session
和application
模式的主要差异体现在JobManager和TaskManager服务的声明方式上,不过两种模式也存在一些通用的集群资源配置。
2.2 通用集群资源配置
在Flink Standalone Kubernetes部署中,以下几种资源是通用的,它们为Flink集群的正常运行提供了基础配置和服务支持:
- flink-configuration-configmap.yaml:该文件主要用于配置Flink的核心参数以及日志相关设置,是Flink集群运行的重要配置文件。示例内容如下:
apiVersion: v1
kind: ConfigMap
metadata:name: flink-configlabels:app: flink
data:flink-conf.yaml: |+jobmanager.rpc.address: flink-jobmanagertaskmanager.numberOfTaskSlots: 2blob.server.port: 6124jobmanager.rpc.port: 6123taskmanager.rpc.port: 6122queryable-state.proxy.ports: 6125jobmanager.memory.process.size: 1600mtaskmanager.memory.process.size: 1728mparallelism.default: 2 log4j-console.properties: |+# 如下配置会同时影响用户代码和 Flink 的日志行为rootLogger.level = INFOrootLogger.appenderRef.console.ref = ConsoleAppenderrootLogger.appenderRef.rolling.ref = RollingFileAppender# 如果你只想改变 Flink 的日志行为则可以取消如下的注释部分#logger.flink.name = org.apache.flink#logger.flink.level = INFO# 下面几行将公共 libraries 或 connectors 的日志级别保持在 INFO 级别。# root logger 的配置不会覆盖此处配置。# 你必须手动修改这里的日志级别。logger.akka.name = akkalogger.akka.level = INFOlogger.kafka.name= org.apache.kafkalogger.kafka.level = INFOlogger.hadoop.name = org.apache.hadooplogger.hadoop.level = INFOlogger.zookeeper.name = org.apache.zookeeperlogger.zookeeper.level = INFO# 将所有 info 级别的日志输出到 consoleappender.console.name = ConsoleAppenderappender.console.type = CONSOLEappender.console.layout.type = PatternLayoutappender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n# 将所有 info 级别的日志输出到指定的 rolling fileappender.rolling.name = RollingFileAppenderappender.rolling.type = RollingFileappender.rolling.append = falseappender.rolling.fileName = ${sys:log.file}appender.rolling.filePattern = ${sys:log.file}.%iappender.rolling.layout.type = PatternLayoutappender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%nappender.rolling.policies.type = Policiesappender.rolling.policies.size.type = SizeBasedTriggeringPolicyappender.rolling.policies.size.size=100MBappender.rolling.strategy.type = DefaultRolloverStrategyappender.rolling.strategy.max = 10# 关闭 Netty channel handler 中不相关的(错误)警告logger.netty.name = org.jboss.netty.channel.DefaultChannelPipelinelogger.netty.level = OFF
- flink-reactive-mode-configuration-configmap.yaml:当需要启用Flink的响应式调度模式时,需要配置此文件。除了包含与
flink-configuration-configmap.yaml
类似的核心参数配置外,还会设置与响应式模式相关的特定参数,如调度模式和检查点间隔等。示例如下:
apiVersion: v1
kind: ConfigMap
metadata:name: flink-configlabels: app: flink
data:flink-conf.yaml: jobmanager.rpc.address: flink-jobmanagertaskmanager.numberOfTaskSlots: 2 blob.server.port: 6124 jobmanager.rpc.port: 6123 taskmanager.rpc.port: 6122 queryable-state.proxy.ports: 6125 jobmanager.memory.process.size: 1600m taskmanager.memory.process.size: 1728m parallelism.default: 2scheduler-mode: reactiveexecution.checkpointing.interval: 10s log4j-console.properties: |+# 如下配置会同时影响用户代码和 Flink 的日志行为rootLogger.level = INFOrootLogger.appenderRef.console.ref = ConsoleAppender rootLogger.appenderRef.rolling.ref = RollingFileAppender# 如果你只想改变 Flink 的日志行为则可以取消如下的注释部分#logger.flink.name = org.apache.flink #logger.flink.level = INFO # 下面几行将公共 libraries 或 connectors 的日志级别保持在 INFO 级别。# root logger 的配置不会覆盖此处配置。# 你必须手动修改这里的日志级别。logger.akka.name = akkalogger.akka.level = INFOlogger.kafka.name= org.apache.kafka logger.kafka.level = INFOlogger.hadoop.name = org.apache.hadooplogger.hadoop.level = INFO logger.zookeeper.name = org.apache.zookeeper logger.zookeeper.level = INFO # 将所有 info 级别的日志输出到 consoleappender.console.name = ConsoleAppender appender.console.type = CONSOLE appender.console.layout.type = PatternLayout appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n# 将所有 info 级别的日志输出到指定的 rolling fileappender.rolling.name = RollingFileAppenderappender.rolling.type = RollingFileappender.rolling.append = false appender.rolling.fileName = ${sys:log.file} appender.rolling.filePattern = ${sys:log.file}.%i appender.rolling.layout.type = PatternLayout appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n appender.rolling.policies.type = Policies appender.rolling.policies.size.type = SizeBasedTriggeringPolicy appender.rolling.policies.size.size=100MB appender.rolling.strategy.type = DefaultRolloverStrategy appender.rolling.strategy.max = 10 # 关闭 Netty channel handler 中不相关的(错误)警告 logger.netty.name = org.jboss.netty.channel.DefaultChannelPipelinelogger.netty.level = OFF
- jobmanager-service.yaml:这是一个可选的Service资源,仅在非HA模式下需要使用,其主要作用是定义JobManager服务,用于集群内部组件之间的通信。通过该Service,JobManager的RPC端口(用于任务调度通信)、Blob Server端口(用于管理二进制大对象)和Web UI端口(用于用户监控和管理界面访问)得以暴露。具体配置如下:
apiVersion: v1
kind: Service
metadata:name: flink-jobmanager
spec:type: ClusterIPports:- name: rpcport: 6123- name: blob-serverport: 6124- name: webuiport: 8081selector:app: flinkcomponent: jobmanager
- jobmanager-rest-service.yaml:同样是可选的Service,该服务的作用是将JobManager的REST端口暴露为Kubernetes节点端口,使得外部客户端可以通过节点的IP地址和指定端口访问Flink的Web UI界面,方便用户对Flink作业进行监控、管理和操作。配置示例如下:
apiVersion: v1
kind: Service
metadata:name: flink-jobmanager-rest
spec:type: NodePortports:- name: restport: 8081targetPort: 8081nodePort: 30081selector:app: flinkcomponent: jobmanager
- taskmanager-query-state-service.yaml:此Service也是可选的,它的功能是将TaskManager的查询状态服务端口暴露为公共Kubernetes node的节点端口,通过该端口可以访问Flink的Queryable State服务,用于查询和管理TaskManager中的状态数据。配置内容如下:
apiVersion: v1
kind: Service
metadata:name: flink-taskmanager-query-state
spec:type: NodePortports:- name: query-stateport: 6125targetPort: 6125nodePort: 30025selector:app: flinkcomponent: taskmanager
当完成上述通用集群资源的配置文件编写后,可以使用以下命令来创建这些服务资源:
# 在执行以下指令时,优先检查是否已经定义了通用集群资源声明
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink# service
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
2.3 Application模式部署
Flink Application集群是一种专门为运行单个Application而设计的专用集群,在这种模式下,部署集群时必须确保对应的Application能够正常运行。因此,在提交Flink作业任务时,首先需要启动Flink Application集群,保证其处于可用状态,然后才能进行作业提交操作。
在Kubernetes上部署一个基本的Flink Application集群时,通常需要包含以下三个核心组件:
- 一个运行JobManager的Application:JobManager在Flink集群中扮演着“大脑”的角色,负责作业的调度、任务分配以及与TaskManager之间的协调通信,确保整个作业的顺利执行。
- 运行若干个TaskManager的Deployment:TaskManager是实际执行任务的工作节点,它们接收来自JobManager的任务指令,进行数据处理和计算工作。通过Deployment来管理TaskManager,可以方便地实现任务节点的扩缩容,以适应不同作业负载的需求。
- 暴露JobManager上REST和UI端口的Service:该Service的作用是将JobManager的REST API端口和Web UI端口暴露出来,方便用户通过浏览器访问Flink的Web界面,对作业进行监控、提交、暂停、取消等操作,同时也便于外部系统通过REST API与Flink集群进行交互。
在进行Application集群资源定义时,需要特别注意以下几点:
- jobmanager-job.yaml:在该文件中,
args
属性必须指定用户作业的主类,这是告诉Flink集群从哪个类开始执行用户提交的作业。此外,还可以参考如何设置JobManager参数的相关文档,了解如何将额外的参数传递给jobmanager-job.yaml
配置中指定的Flink镜像,以满足不同作业的个性化需求。 - job artifacts参数:该参数所对应的作业资源(如Jar包等)必须能够从资源定义示例中的
job-artifacts-volume
处获取。如果是在minikube集群中创建这些组件,那么可以将定义示例中的job-artifacts-volume
挂载为主机的本地目录,方便快捷地提供作业资源。若不使用minikube集群,也可以使用Kubernetes集群中任何其它可用类型的volume,如PersistentVolumeClaim等,来提供作业所需的资源。另外,还可以选择构建一个已经包含job artifacts参数的自定义镜像,将作业资源直接打包在镜像中,简化资源管理流程。
以下是Apllication Mode集群资源定义模板,在实际生产环境中使用时,需要根据具体需求进行相应的调整:
- jobmanager-application-non-ha.yaml:非HA模式下JobManager的配置模板示例:
实在抱歉,给你带来不好的体验!我将分多次输出完整内容,每次保证段落完整连贯。以下从Flink Standalone Kubernetes部署模式的Session模式继续。
2.4 Session模式部署
Flink Session集群以长期运行的Deployment形式存在,可在其上运行多个作业。部署包含运行JobManager的Deployment、运行TaskManager的Deployment和暴露JobManager端口的Service。
- 资源定义模板: jobmanager-session-deployment-non-ha.yaml:非HA模式下JobManager的Deployment配置。
apiVersion: apps/v1
kind: Deployment
metadata:name: flink-jobmanager
spec:replicas: 1selector:matchLabels:app: flinkcomponent: jobmanagertemplate:metadata:labels:app: flinkcomponent: jobmanagerspec:containers:- name: jobmanagerimage: apache/flink:1.14.4-scala_2.11args: ["jobmanager"]ports:- containerPort: 6123name: rpc- containerPort: 6124name: blob-server- containerPort: 8081name: webuilivenessProbe:tcpSocket:port: 6123initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/flink/confsecurityContext:runAsUser: 9999volumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties
jobmanager-session-deployment-ha.yaml:HA模式下JobManager的Deployment配置。
apiVersion: apps/v1
kind: Deployment
metadata:name: flink-jobmanager
spec:replicas: 1 # 通过设置大于 1 的整型值来开启 Standby JobManagerselector:matchLabels:app: flinkcomponent: jobmanagertemplate:metadata:labels:app: flinkcomponent: jobmanagerspec:containers:- name: jobmanagerimage: apache/flink:1.14.4-scala_2.11env:- name: POD_IPvalueFrom:fieldRef:apiVersion: v1fieldPath: status.podIP# 下面的 args 参数会使用 POD_IP 对应的值覆盖 config map 中 jobmanager.rpc.address 的属性值。args: ["jobmanager", "$(POD_IP)"]ports:- containerPort: 6123name: rpc- containerPort: 6124name: blob-server- containerPort: 8081name: webuilivenessProbe:tcpSocket:port: 6123initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/flink/confsecurityContext:runAsUser: 9999 # 参考官方 flink 镜像中的 _flink_ 用户,如有必要可以修改serviceAccountName: flink-service-account # 拥有创建、编辑、删除 ConfigMap 权限的 Service 账号volumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties
taskmanager-session-deployment.yaml:TaskManager的Deployment配置。
apiVersion: apps/v1
kind: Deployment
metadata:name: flink-taskmanager
spec:replicas: 2selector:matchLabels:app: flinkcomponent: taskmanagertemplate:metadata:labels:app: flinkcomponent: taskmanagerspec:containers:- name: taskmanagerimage: apache/flink:1.14.4-scala_2.11args: ["taskmanager"]ports:- containerPort: 6122name: rpc- containerPort: 6125name: query-statelivenessProbe:tcpSocket:port: 6122initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/flink/conf/securityContext:runAsUser: 9999 # 参考官方 flink 镜像中的 _flink_ 用户,如有必要可以修改volumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties
- 集群操作命令
创建Session集群:
kubectl create -f jobmanager-session-deployment.yaml
kubectl create -f taskmanager-session-deployment.yaml
停止Flink session集群:
kubectl delete -f taskmanager-session-deployment.yaml
kubectl delete -f jobmanager-session-deployment.yaml
三、Flink Native Kubernetes部署模式
Flink Native kubernetes模式默认只启动jobmanager,之后根据job任务提交情况,动态的申请、启动taskmanager计算资源,目前该模式支持session、application部署方式,不支持per - job方式。
3.1 Flink Native Kubernetes(Session)
Flink Session Native on kubernetes和Flink 流程大致相同,都需要构建基础dockerfile,首先需要将获取的基础镜像push到本地仓库中,其次才是构建镜像仓库。
- 编写dockerfile
cat >> kubernete-native-session <<EOF
# 将以下内容填充到该docker文件
FROM apache/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-8
EOF
- 构建docker镜像
docker build -t ruijie/bigdata/flink-session:1.14.6-scala_2.12. --no-cache
#推送到私有仓库中
docker push ruijie/bigdata/flink-session:1.14.6-scala_2.12
- 创建flink native session kubernetes集群
./bin/kubernetes-session.sh \-Dkubernetes.cluster-id=flink-native-cluster \-Dkubernetes.container.image=ruijie/bigdata/flink-session:1.14.6-scala_2.12 \-Dkubernetes.namespace=flink \-Dkubernetes.jobmanager.service-account=flink-service-account \-Dkubernetes.rest-service.exposed.type=NodePort
执行完毕之后,可以得到以下结果,我们指定的是nodePort而非clusterId,后续会对这一部分进行详细解释,执行完毕之后我们的kubernetes 的flink native session创建完毕了,可以通过日志打印看出web - ui暴露的地址进行访问。
通过访问控制台打印的日志可以找到web - ui访问地址;或者通过kubectl get pods -n flink
查看,然后通过kubectl logs -f
查看日志。
4. 提交任务
./bin/flink run \--target kubernetes-session \-Dkubernetes.cluster-id=flink-native-session \-Dkubernetes.namespace=flink \-Dkubernetes.jobmanager.service-account=flink-service-account \./examples/streaming/TopSpeedWindowing.jar \./examples/streaming/WordCount.jar \-Dkubernetes.taskmanager.cpu=2000m \-Dexternal-resource.limits.kubernetes.cpu=4000m \-Dexternal-resource.limits.kubernetes.memory=10Gi \-Dexternal-resource.requests.kubernetes.cpu=2000m \-Dexternal-resource.requests.kubernetes.memory=8Gi \-Dkubernetes.taskmanager.cpu=2000m
- 删除集群资源
kubectl delete deployment/flink-native-session
- 取消正在运行的任务
echo'stop' | ./bin/kubernetes-session.sh \-Dkubernetes.cluster-id=my-first-flink-cluster \-Dexecution.attached=true
3.2 Flink Application Native Kubernetes
Flink application on native 和以上的相同都需要经过dockerfile 文件的编写和构建镜像。
- 编写dockerfile文件
cat >> flink-application<<EOF
FROM flink:1.14.6-scala_2.12
ENV FLINK_HOME=/opt/flink
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-8
RUN mkdir -p $FLINK_HOME/usrlib
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/
EOF
- 构建镜像与操作
# 构建镜像
docker build -f flink-application -t docker.ruijie.com/bigdata/application-flink:1.14.6. --no-cache
# 推动到本地仓库中
docker push docker.ruijie.com/bigdata/application-flink:1.14.6
# 删除本地镜像
docker rmi docker.ruijie.com/bigdata/application-flink:1.14.6
# 删除kubernetes镜像
crictl rmi docker.ruijie.com/bigdata/application-flink:1.14.6
同时也需要构建和下发到其他node节点将镜像加载进去,避免因找不到镜像而报错。命名空间和权限也相同,参考以上步骤。
3. 启动任务
./bin/flink run-application \--target kubernetes-application \-Dkubernetes.cluster-id=flink-application-cluster \-Dkubernetes.container.image=docker.ruijie.com/bigdata/application/flink:1.14.6 \-Dkubernetes.jobmanager.replicas=1 \-Dkubernetes.namespace=flink \-Dkubernetes.jobmanager.service-account=flink-service-account \-Dexternal-resource.limits.kubernetes.cpu=2000m \-Dkubernetes.websocket.timeout=60000 \-Dexternal-resource.limits.kubernetes.memory=1Gi \-Dexternal-resource.requests.kubernetes.cpu=1000m \-Dexternal-resource.requests.kubernetes.memory=1Gi \-Dkubernetes.rest-service.exposed.type=NodePort \local:///usrlib/TopSpeedWindowing.jar
- 查看任务运行情况
# 查看pods和svc
kubectl get pods,svc -n flink
# 查看日志
kubectl logs -f rj-flink-cluster-f4d9b796-lqg7q -n flink
- native application集群管理
# 列出集群上正在运行的作业
./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=flink-native-session
# 取消正在运行的作业
./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=flink-native-session <jobId>
以上就是Flink与Kubernetes集成的全流程及各模式详细操作。如果在实际操作中有任何疑问,或想了解某部分的更多细节,欢迎随时告诉我。