
Flink on Native K8S: Installation and Deployment

1. Have a K8S environment available

2. Download and unpack Flink on the K8S client machine

wget  https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
tar -xvf flink-1.18.1-bin-scala_2.12.tgz

3. Create a namespace

kubectl create namespace test-flink-session

4. Create a secret for pulling the Flink image

kubectl create secret docker-registry flink-registry-secret \
  --docker-server=registry-vpc.cn-beijing.aliyuncs.com \
  --docker-username=xxxx \
  --docker-password=****** \
  -n test-flink-session

5. Create a service account and grant it access to K8S resources

# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n test-flink-session
# Bind the built-in "edit" clusterrole to it
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=test-flink-session:flink-service-account
# Verify the permissions
kubectl auth can-i list pods --namespace test-flink-session --as system:serviceaccount:test-flink-session:flink-service-account

6. Start the flink-session in HA mode

Edit the configuration file flink-conf.yaml:

env.java.home: /opt/jdk1.8.0_202
env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${FLINK_LOG_PREFIX}.hprof
jobmanager.memory.process.size: 1536m
jobmanager.memory.jvm-metaspace.size: 420m
taskmanager.memory.process.size: 1536m
execution.checkpointing.interval: 10m
taskmanager.memory.managed.fraction: 0.2
jobstore.expiration-time: 86400
rest.flamegraph.enabled: true
state.savepoints.dir: oss://wh-bigdata/flink/prod-savepoint/

Start the flink-session cluster from the command line:

/opt/deploy/flink-1.18.1/bin/kubernetes-session.sh -Dkubernetes.cluster-id=test-flink-cluster \
-Dkubernetes.service-account=flink-service-account \
-Dkubernetes.namespace=test-flink-session \
-Dkubernetes.jobmanager.cpu.amount=0.05 \
-Dkubernetes.jobmanager.cpu.limit-factor=20 \
-Dkubernetes.taskmanager.cpu.amount=0.05 \
-Dkubernetes.taskmanager.cpu.limit-factor=20 \
-Dkubernetes.container.image.pull-secrets=flink-registry-secret \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dfs.oss.endpoint=oss-cn-beijing.aliyuncs.com \
-Dfs.oss.accessKeyId=****** \
-Dfs.oss.accessKeySecret=****** \
-Dhigh-availability.type=kubernetes \
-Dhigh-availability.storageDir=oss://wh-bigdata/flink/recovery \
-Dkubernetes.jobmanager.replicas=2 \
-Dweb.upload.dir=/opt/flink/artifacts \
-Dkubernetes.flink.log.dir=/opt/flink/log/data-sync-center-source \
-Dstate.backend=hashmap \
-Dstate.checkpoints.dir=oss://weihai-bigdata/flink/checkpoint \
-Dkubernetes.pod-template-file.default=/opt/deploy/flink-1.18.1/flink-pod-template.yaml
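
With kubernetes.jobmanager.replicas=2, the standby JobManager takes over through K8S leader election. A quick sanity check after startup (a sketch; the ConfigMap name follows the <cluster-id>-cluster-config-map pattern that also appears in section 10):

# Both JobManager pods should be Running; only one holds the leader annotation
kubectl get pods -n test-flink-session
kubectl get cm test-flink-cluster-cluster-config-map -n test-flink-session -o yaml | grep -i leader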

These properties can also be placed in flink-conf.yaml. kubernetes.pod-template-file.default points to the template file used when creating the JobManager and TaskManager Pods:

apiVersion: v1
kind: Pod
metadata:
  name: flink-pod-template
  namespace: test-flink-session
spec:
  tolerations:
    - key: "environment"
      operator: "Equal"
      value: "flink"
      effect: "NoSchedule"
  nodeSelector:
    environment: flink
  containers:
    # Do not change the main container name
    - name: flink-main-container
      image: wh-cn-beijing.cr.volces.com/images/base-images:1.18-oss
      imagePullPolicy: Always
      env:
        - name: TZ
          value: Asia/Shanghai
      resources:
        requests:
          cpu: "0.05"
          memory: "1024Mi"
          ephemeral-storage: 1024Mi
        limits:
          cpu: "0.1"
          memory: "2048Mi"
          ephemeral-storage: 2048Mi
      volumeMounts:
        - mountPath: /opt/flink/volumes/hostpath
          name: flink-volume-hostpath
        - mountPath: /opt/flink/artifacts
          name: flink-artifact
        - mountPath: /opt/flink/log
          name: flink-logs
    # Use a sidecar container to push logs to remote storage or do some other debugging things
  volumes:
    - name: flink-volume-hostpath
      hostPath:
        path: /tmp
        type: Directory
    - name: flink-artifact
      persistentVolumeClaim:
        claimName: nas-flink
    - name: flink-logs
      persistentVolumeClaim:
        claimName: nas-flink-log
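
The template assumes two PVCs, nas-flink and nas-flink-log, already exist in the namespace. A minimal sketch for creating them, assuming a NAS-backed storage class (the class name and sizes here are placeholders, not from the original setup):

kubectl apply -n test-flink-session -f - <<'EOF'
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: nas-flink
spec:
  accessModes: ["ReadWriteMany"]
  storageClassName: nas        # hypothetical storage class; substitute your cluster's NAS class
  resources:
    requests:
      storage: 10Gi
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: nas-flink-log
spec:
  accessModes: ["ReadWriteMany"]
  storageClassName: nas        # hypothetical storage class
  resources:
    requests:
      storage: 10Gi
EOF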

7. Inspect the created resources

# List all services
kubectl get svc -n test-flink-session
# List all configmaps
kubectl get cm -n test-flink-session
# Describe the deployments
kubectl describe deploy -n test-flink-session
# Delete a deployment
kubectl delete deploy -n test-flink-session test-flink-cluster

8. Pin the IP and service port

Check the pods and their ports:

kubectl get pod -n test-flink-session -o wide

Every time the deployment is recreated, the rest-svc NodePort changes. Flink's native K8S mode currently offers no configuration option to pin the rest-svc port, so the port has to be fixed by editing the nodePort of the K8S service.

# Force-replace the service to pin the port
kubectl replace --force -f /opt/deploy/flink-1.18.1/svc/test-flink-cluster.yaml
# (Optional) give the jobmanager pod a dedicated label so the selector below can target it:
#pod_name=$(kubectl get pod -n test-flink-session |grep test-flink-cluster | awk -F ' ' '{print $1}')
#kubectl label pod ${pod_name} environment=test-flink-cluster-rest -n test-flink-session
# Contents of test-flink-cluster.yaml:
apiVersion: v1
kind: Service
metadata:
  name: test-flink-cluster-rest
  namespace: test-flink-session
spec:
  type: NodePort
  #selector:
  #  environment: test-flink-cluster-rest
  ports:
    - name: rest
      protocol: TCP
      port: 8081
      targetPort: 8081
      nodePort: 32583  # this pins the rest-svc port

Alternatively, run kubectl edit svc test-flink-cluster-rest -n test-flink-session and change nodePort under ports to 32583.
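
A non-interactive alternative is kubectl patch; a sketch assuming the rest port is the first (index 0) entry under spec.ports:

kubectl patch svc test-flink-cluster-rest -n test-flink-session \
  --type='json' \
  -p='[{"op": "replace", "path": "/spec/ports/0/nodePort", "value": 32583}]'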

9. Submit Flink jobs

flink-session carries a memory-leak risk: after enough job submissions, the JobManager will eventually crash with an OOM. In HA mode a new JobManager is pulled up automatically and recovers the jobs that were running.
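
Since the leak builds up gradually, it helps to watch the JobManager's memory; a minimal sketch, assuming metrics-server is installed in the cluster:

# TaskManager pods are named <cluster-id>-taskmanager-*; the remaining pods are the JobManagers
kubectl top pod -n test-flink-session | grep -v taskmanager
# If usage keeps climbing, restarting the JobManager at a quiet time beats waiting for the OOM;
# with HA enabled the jobs are recovered automatically (pod name below is a placeholder)
kubectl delete pod <jobmanager-pod-name> -n test-flink-session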

  • Submit from the command line: the user's main() runs on the submitting machine; the job is compiled into a StreamGraph and then submitted to the session cluster over the REST API for execution.

    # Submit a job to the flink session cluster (log config: conf/log4j-console.properties)
    ./bin/flink run --target kubernetes-session -Dkubernetes.cluster-id=test-flink-cluster -Dkubernetes.namespace=test-flink-session -d -c com.wh.crm.DemoTask /home/appuser/wh-crm-flink-1.0.1-SNAPSHOT.jar -env test
    # Tail the logs
    kubectl logs -n test-flink-session -f test-flink-cluster-taskmanager-1-4
    # Exec into the Pod
    kubectl exec -n test-flink-session -it test-flink-cluster-taskmanager-1-4 -- bash
    
  • Submit via the web UI: the jar uploaded in the web UI is stored in a temporary directory on the JobManager Pod's local disk. If uploaded jars should survive a session cluster restart, mount that temporary directory on a persistent volume.

    Note that Entry Class is required, and make sure there is no trailing space after the main class name. The other three parameters are optional.

    Parallelism: the global parallelism; it is overridden by any parallelism set in code via setParallelism().

    Program Arguments: custom arguments passed with - or -- on the command line and parsed in Flink code with ParameterTool.fromArgs, e.g. -env test.

    Savepoint Path: the savepoint location from which the job resumes.

    Allow Non Restored State: only relevant once a Savepoint Path is set; usually it should be enabled when restoring from a savepoint. (A CLI equivalent of these two options is sketched after this list.)

  • Application mode:

    The job jar must be baked into the image:

    /home/appuser/flink-1.18.1/bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=binlog-kafka-handle \
    -Dkubernetes.service-account=flink-service-account \
    -Dkubernetes.namespace=test-flink-session \
    -Dkubernetes.jobmanager.cpu.amount=0.05 \
    -Dkubernetes.jobmanager.cpu.limit=0.05 \
    -Dkubernetes.taskmanager.cpu.amount=0.05 \
    -Dkubernetes.taskmanager.cpu.limit=0.05 \
    -Dkubernetes.container.image.pull-secrets=flink-registry-secret \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dtaskmanager.numberOfTaskSlots=1 \
    -Djobmanager.memory.process.size=1536mb \
    -Djobmanager.memory.jvm-metaspace.size=718mb \
    -Dtaskmanager.memory.process.size=1024mb \
    -Dtaskmanager.memory.managed.size=32mb \
    -Dtaskmanager.memory.jvm-metaspace.size=128mb \
    -Dfs.oss.endpoint=oss-cn-beijing-internal.aliyuncs.com \
    -Dfs.oss.accessKeyId=****** \
    -Dfs.oss.accessKeySecret=****** \
    -Dkubernetes.pod-template-file.default=/home/appuser/flink-1.18.1/flink-oss-pod-template.yaml \
    -c com.wh.binlog.CdcMysqlWashingTask \
    local:///opt/flink/artifacts/binlog_kafka_handle-out-1.1-SNAPSHOT.jar \
    -env test
    # local:/// is the jar's path inside the image; "-env test" are the program arguments
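
For reference, the CLI equivalents of the web UI's Savepoint Path and Allow Non Restored State options, plus listing and stopping jobs in the session cluster; the savepoint name and job ID below are placeholders:

# Resume a job from a savepoint; --allowNonRestoredState skips state that no longer maps to an operator
./bin/flink run --target kubernetes-session \
  -Dkubernetes.cluster-id=test-flink-cluster -Dkubernetes.namespace=test-flink-session \
  -s oss://wh-bigdata/flink/prod-savepoint/savepoint-xxxx --allowNonRestoredState \
  -d -c com.wh.crm.DemoTask /home/appuser/wh-crm-flink-1.0.1-SNAPSHOT.jar -env test
# List the jobs running in the session cluster
./bin/flink list --target kubernetes-session \
  -Dkubernetes.cluster-id=test-flink-cluster -Dkubernetes.namespace=test-flink-session
# Stop a job with a final savepoint (cancel would stop it without one)
./bin/flink stop --target kubernetes-session \
  -Dkubernetes.cluster-id=test-flink-cluster -Dkubernetes.namespace=test-flink-session <jobId>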
    

Sometimes a custom Flink image is needed so that the image contains the jars your user code depends on:

# Dockerfile
FROM registry.cn-beijing.aliyuncs.com/yican/flink:1.18
RUN ln -fs /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
ENV FLINK_HOME=/opt/flink
WORKDIR $FLINK_HOME
RUN mkdir ./plugins/oss-fs-hadoop && rm -rf  ./lib/flink-table-planner-loader-1.18.1.jar
RUN cp ./opt/flink-oss-fs-hadoop-1.18.1.jar ./plugins/oss-fs-hadoop/ && cp ./opt/flink-table-planner_2.12-1.18.1.jar ./lib/
## The chown matters: without it the copied jars are owned by root and cannot be loaded in the image
COPY --chown=flink:flink *.jar /opt/flink/lib/
#COPY flink-fs-hadoop-shaded-1.18-SNAPSHOT.jar flink-oss-fs-hadoop-1.18.1.jar flink-hadoop-fs-1.18.1.jar ./plugins/oss-fs-hadoop/
#RUN  sed -i '/^env\.java\.opts\.all:/d' $FLINK_HOME/conf/flink-conf.yaml; \
#     echo 'env.java.opts: "-Dfile.encoding=UTF-8"' | tee -a $FLINK_HOME/conf/flink-conf.yaml > /dev/null
## Update the package index and install the necessary tools
#RUN apt-get update && \
#    apt-get install -y unzip
## Install ossutil
#RUN curl https://gosspublic.alicdn.com/ossutil/install.sh | bash
#COPY ossutilconfig /root
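
Build and push with the usual docker workflow; the repository and tag below are placeholders in the style of the registry used for the pull secret in section 4:

docker build -t registry-vpc.cn-beijing.aliyuncs.com/xxxx/flink:1.18-oss .
docker push registry-vpc.cn-beijing.aliyuncs.com/xxxx/flink:1.18-oss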

10. Shut down the flink-session

echo 'stop' | ./bin/kubernetes-session.sh \
  -Dkubernetes.namespace=test-flink-session \
  -Dkubernetes.cluster-id=test-flink-cluster \
  -Dexecution.attached=true

Or clean up manually:

# Delete the deployment
kubectl delete deployment/test-flink-cluster -n test-flink-session
# Delete the svc
kubectl delete svc test-flink-cluster-rest -n test-flink-session
# Delete the configmaps, which hold the configuration, the Pod template, and HA metadata
kubectl delete cm flink-config-test-flink-cluster pod-template-test-flink-cluster test-flink-cluster-cluster-config-map -n test-flink-session

11. Ship error logs to Kafka

Configure log4j-console.properties:

rootLogger.appenderRef.kafka.ref = KafkaAppender
appender.kafka.type = Kafka
appender.kafka.name = KafkaAppender
appender.kafka.syncSend = false
appender.kafka.topic = ${sys:log-topic:-flink-app-log}
appender.kafka.p.type=Property
appender.kafka.p.name=bootstrap.servers
appender.kafka.p.value=${sys:kafka-broker}
appender.kafka.layout.type = PatternLayout
appender.kafka.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.kafka.filter.threshold.type = ThresholdFilter
appender.kafka.filter.threshold.level = ERROR 
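
Two prerequisites worth noting, sketched below with placeholder values: the Log4j 2 Kafka appender needs the kafka-clients jar on Flink's classpath, and the ${sys:log-topic} / ${sys:kafka-broker} lookups read JVM system properties that you must pass yourself:

# In the Dockerfile: ship the Kafka client library with the image (jar version is a placeholder)
COPY --chown=flink:flink kafka-clients-3.4.0.jar /opt/flink/lib/

# In flink-conf.yaml: append the system properties the lookups read to the existing JVM options
# env.java.opts.all: <existing options> -Dlog-topic=flink-app-log -Dkafka-broker=kafka1:9092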