基于Flink的实时开发平台-Dinky
前提介绍
Dinky 是一个开箱即用、易扩展,以 Apache Flink 为基础,连接 OLAP 和数据湖等众多框架的一站式实时计算平台,致力于流批一体和湖仓一体的探索与实践。 致力于简化Flink任务开发,提升Flink任务运维能力,降低Flink入门成本,提供一站式的Flink任务开发、运维、监控、报警、调度、数据管理等功能。
安装部署
本次安装版本为:1.2.3版本。
注意事项:自Dinky1.1.0版本之后,由dinky自主适配Flink版本。
1. 下载安装
tar -zxf dinky-release-${flink.version}-${dinky.version}.tar.gz
mv dinky-release-${flink.version}-${dinky.version} dinky
cd dinky
Dinky默认为H2数据库,方便测试部署使用,但是数据具备临时性,重启后消失。
MySQL数据库:推荐的一种方式,由Dinky在SQL目录下定义。
PostGreSQL数据库:由Dinky在SQL目录中定义(已测证1.2.3版本下,dinky嵌合postgres有问题)vim config/application.yml# 修改 Dinky 所使用的数据库类型为 mysql
spring:application:name: Dinkyprofiles:# The h2 database is used by default. If you need to use other databases, please set the configuration active to: mysql, currently supports [mysql, pgsql, h2]# If you use mysql database, please configure mysql database connection information in application-mysql.yml# If you use pgsql database, please configure pgsql database connection information in application-pgsql.yml# If you use the h2 database, please configure the h2 database connection information in application-h2.yml,# note: the h2 database is only for experience use, and the related data that has been created cannot be migrated, please use it with cautionactive: ${DB_ACTIVE:mysql} #[h2,mysql,pgsql]# 修改mysql配置文件
vim application-mysql.yml
# 修改 Dinky 的 mysql 链接配置
spring:datasource:url: jdbc:mysql://${MYSQL_ADDR:127.0.0.1:3306}/${MYSQL_DATABASE:dinky}?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=trueusername: ${MYSQL_USERNAME:dinky}password: ${MYSQL_PASSWORD:dinky}driver-class-name: com.mysql.cj.jdbc.Driver
可以注意到dinky具备flyway的方式,当第一次部署时,flyway会自动完成,不需要专注为数据库层。
如果想手动创建,可以前往sql目录,自行创建数据库。
2. 配置数据库环境
建议dinky单独一个用户,具备权限管控
#登录mysql
mysql -uroot -p
#创建数据库
mysql>
CREATE DATABASE dinky;
#创建用户并允许远程登录
mysql>
create user 'dinky'@'%' IDENTIFIED WITH mysql_native_password by 'dinky';
#授权
mysql>
grant ALL PRIVILEGES ON dinky.* to 'dinky'@'%';
mysql>
flush privileges;
3. 上传相关依赖
Dinky需要具备内置的Flink环境,该Flink环境的实现需要用户自己在Dinky根目录下extends/flink1.17文件夹上传相关的Flink依赖,如flink-dist,flink-table等依赖。
其中有三个依赖比较特殊,第一个是flink-table-plannner,需要从Flink的opt目录中获取到,不能使用带loader相关依赖;Dinky当前版本下yarn环境下需要依赖于flink-shaded-hadoop的包;第三个,是第二个依赖缺少相关资源导致的,一个外部工具包。
如果Dinky需要连接集群环境,就需要将相关依赖包上传到hdfs中,进行关联使用;
4. 启动服务
# 启动
sh ./bin/auto.sh start 1.17
# 日志刷新会在logs/dinky.log中打印
# 关闭
sh ./bin/auto.sh stop
开发流程
账号准备
账号 | 密码 | 角色 |
---|---|---|
admin | Hnpost@2025 | 超级管理员 |
后期在正式使用过程中,可按需创建角色与用户。
进入页面
工作台:收括了此平台中所有计算任务的概览情况
数据开发:基于不同类型的作业,进行相应的数据开发。
运维中心:包含了所有作业在执行过程监控,包括指标、日志等可以在此页面进行查看。
注册中心:集合了平台任务中所需要使用到的一些集群环境、数据源、全局变量以及一些其他资源的配置。
认证中心:包含了平台中一些系统用户、角色等信息
配置中心:包含了一些全局配置:Dinky配置、flink配置等一些全局配置项。
实践案例
这种开发一个案例,通过从kafka接取数据,并入到postgres数据库中。
注册集群
dinky默认集成了一个本地flink,但是只能用于测试环境的开发,局限性很高,没有flink on Yarn综合性能强,故而我们需要通过dinky连接到搭建的flink on yarn环境。
进入注册中心,选择集群配置。
点击新建
有几项重要配置项:
-
类型:选择Yarn模式
-
集群配置名称:并不限制输入的格式是什么样子,但是推荐按照主业务-环境-集群的格式进行创建。
-
Hadoop配置文件路径:其余方式下需要有hadoop配置文件才能操作hdfs。这个可以按照实际服务器配置文件进行填写,可以参考下面填写完毕的模板。
-
Flink Lib路径:提供Flink的资源包路径,数据开发的一个底层依赖环境,填写hdfs配置路径,之前安装步骤中有涉及这些文件的上传。
-
Flink配置文件路径:Flink所使用的配置文件。这个可以按照实际服务器配置文件进行填写,可以参考下面填写完毕的模板。
-
Jar文件路径:需要配置dinky与flink的关联包。
配置完毕后,点击测试连接,是否可以测试成功,如果测试成功后,点击保存,集群环境已保存完毕。
注册数据源
主要用于管理数据源、包括数据源的创建、编辑、删除、查看元数据、可支持自动创建FlinkDDL等。
支持的数据源类型有:Mysql、Oracle、PostGreSQL、SQLServer、Phoenix、ClickHouse、Doris、StarRocks、Presto、Hive等。
进入配置中心,选择数据源,点击新建
有几个配置项:
-
数据源名称:没有过多限制,按照易读性进行设置即可。
-
数据源类型:按照所需的类型进行选择
-
用户名:数据源涉及的用户名信息
-
密码:数据源涉及的密码信息
-
数据库连接URL:数据源涉及的URL信息
-
Flink连接配置/Flink连接模块:在后期进行flink数据开发会用到,也可以不配置
Flink连接配置
将一些固定参数,主要是数据库连接信息封装到参数中,以便后期可以使用,开启了全局变量,这样可以避免暴漏密码等敏感信息。
Flink连接模块
在Flink作业进行创建时,可以提供一些预设信息,生成flink代码时,无需再次填写。
FlinkSQL开发
首选进入到页面后,创建相对应的业务目录,并创建出作业。
重点配置项
-
作业名称:按照实际作业业务名称进行填写
-
作业类型:选择对应的类型
-
模板:可对应的选择模板,如果上面配置完毕成功则可以,非必要项。
CREATE TABLE source_table (`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',name String,sex String
) WITH ('connector' = 'kafka','topic' = 'my-topic','properties.bootstrap.servers' = '10.177.32.73:9092,10.177.32.74:9092,10.177.32.251:9092','properties.group.id' = 'group_test','scan.startup.mode' = 'earliest-offset','format' = 'json'
);
CREATE TABLE IF NOT EXISTS sink_user (event_time timestamp,`name` STRING,`sex` STRING
) WITH ('connector' = 'jdbc','url' = 'jdbc:postgresql://10.177.32.71:10000/zcptdb?currentSchema=dinky&stringtype=unspecified','table-name' = 'sink_user','username' = 'zcpt_user','password' = 'DvtZc@#2024'
);
insert into sink_user select * from source_table;
右侧配置项:
-
作业的并行度:推荐默认为1,具体的并行度可以在算子层次进行单独设置
-
全局变量:如果需要引用全局变量,则需要开启这个
上面菜单栏为作业的控制栏,上述图中显示为选择作业运行环境,有本地模式/yarn集群模式。
是否开启元数据持久化,默认是禁用,如果需要使用,可以将过程化产生的数据表持久到数据册中,默认这个DefaultCataLog是基于内存的,所有在使用过程中需要特别注意。
-
运行:作业将会被提交运行
-
查询:比较特殊的选项,如果代码中包含查询语句,可以通过这个查询,在下面窗口中直接查看、
-
格式化:按照指定格式进行美化
-
发布:程序将会发布,并且所有开发项将变为不可更改项。
程序运行后,下面会有日志窗口出现。
也可以从执行历史中查看。
FlinkJar开发
可以通过dinky加载flink jar程序,并提交到flink相关的集群,避免了直接服务器启动。
加载资源
如果想使用jar程序,需要dinky平台能够识别jar包的路径,dinky注册中心有资源加载页面。
第一次启动dinky,需要先配置dinky的资源路径与实际地址的绑定关系。
具备三种方式,LOCAL/HDFS/OSS,最主要是LOCAL与HDFS,这里测试使用的是LOCAL的模式。将服务器实际目录路径绑定到这里。
并将要上传的jar文件放到该目录下。
前往注册中心>资源。第一次需要同步目录结构
数据开发
前往数据开发页面,在目录下创建作业,并选择jar的方式。
选择框可以直接进行编写,请注意选中后是一个rs:/前缀的地址,是dinky自定义的一个路径前缀。
这里选择一个Flink自带范例的一个单词统计的程序进行测试
后续就可前往运维中心中进行查看。
需要注意的是:Dinky现不支持SpringBoot项目,会产生版本冲突问题,导致启动不成功,如果想启动项目需要注册项目为maven项目。
全局变量引用
点击进入注册中心-全局变量
点击新建,创建一个变量
重点配置项:
-
变量名称:最好为英文名称
-
变量值:按需填写
运维中心
在上一步把作业已提交完毕并运行后,如果想查看作业运行的详细信息,可以前往运维中心中查看。
点击查看详情
最重要的是查看作业日志,分为三栏:Excepion、JobManager、TaskManager。
除此之外,页面还提供了Flink WebUI的跳转按钮,可以直接进入到页面。
进入之后,首先可以查看概览,