Flink 系列之十二 - Data Stream API的输出算子
之前做过数据平台,对于实时数据采集,使用了Flink。现在想想,在数据开发平台中,Flink的身影几乎无处不在,由于之前是边用边学,总体有点混乱,借此空隙,整理一下Flink的内容,算是一个知识积累,同时也分享给大家。
注意:由于框架不同版本改造会有些使用的不同,因此本次系列中使用基本框架是 Flink-1.19.x,Flink支持多种语言,这里的所有代码都是使用java,JDK版本使用的是19。
代码参考:https://github.com/forever1986/flink-study.git
目录
- 1 输出算子(Sink)
- 1.1 什么是输出算子
- 1.2 Flink中Sink的实现方式
- 2 Sink的演示
- 2.1 File Sink
- 2.2 JDBC Sink
- 2.3 Kafka Sink
- 2.4 Redis Sink
前面几章讲了执行环境、输入算子,中间计算算子,本章开始来讲Data Stream API的输出算子,这样就基本了解了Data Stream API的整个流程。
1 输出算子(Sink)
1.1 什么是输出算子
输出算子:顾名思义就是将数据流输出到外部的存储。跟输入算子一样,输出算子同样包括文件、kafka、数据库等等,包括之前一直在使用的print算子,其实它也是一个输出算子。这一部分Flink也是将其独立到Connector模块提供很好的规范和扩展性,其中Flink内置很多源算子,也支持扩展自定义的源算子。这里通过《Flink官方网站文档》可以实时看到已经支持哪些输出算子。下图为官方截图,到目前为止支持的输出算子,其中有sink字样的就是支持该外部系统输出:
除了上面的Flink本身支持的算子之外,还可以在bahir网站看看其他人参照Flink的Sink的扩展规范实现的外部输出算子Connector:
1.2 Flink中Sink的实现方式
Flink中Sink的实现方法有过几次比较大的改动,且更新得比较快,导致之前使用旧方法实现的Connector还没有更新使用最新版本,但是由于Flink还是兼容上一代的版本,因此在使用不同Connector时,会看到有不同风格的实现方式。目前来说,实现Sink有以下几种方式:
Sink版本 | 描述 |
---|---|
SinkFunction | 最早的版本,现在基本上弃用,大部分的connector都不是使用这种方式,当然有些简单的还是可以使用,比如Redis。这个是一个接口,实现方式也比较简单,在网络上搜索自定义Sink基本上都是基于它来的。通过addSink方法加入到Stream中。 |
SinkV1版本 | 这个版本是在Flink1.12引入的,是最早使用较为完善的写入机制,接口风格也是大变,支持精准一次的两段协议提交,因此对于有事务的外部Sink,基于SinkV1版本实现的基本上能够达到生产级别的要求。通过addSink方法加入到Stream中。 |
SinkV2版本 | 这个版本是在Flink1.12引入的,是为了解决SinkV1版本灵活度不够,无法合并小文件等问题,因此推出了SinkV2版本,改进了这些问题,风格也算是大变。通过sinkTo方法加入到Stream中。 |
SinkV2 plus版本 | 这个版本主要是对SinkV2版本做了一些小的风格上的改动,提高其使用灵活度,因此风格上改动并不大,这是部分类做了修改。通过sinkTo方法加入到Stream中。 |
小技巧:通过通过addSink方法加入到Stream中都是SinkFunction和SinkV1版本;通过sinkTo方法加入到Stream中,都是SinkV2版本或者SinkV2 plus版本。而且你还会发现一个特点,就是Flink内置的Sink并且使用度越高就会越使用最新版本,其次就是使用性次之,再者就是第三方实现的Connector版本会越低。
但是记住,最新版本可以支持的功能更为强大,并且也是为了解决原先的问题,这一块在下一章Sink输出算子底层原理再讲。这里说这个,就是为了让大家知道下面的Connector演示,存在不同编码方式,这就是因为某些Connector还没有改到最新版本,沿用老版本所致。
2 Sink的演示
接下来会讲一些常见的Flink内置的输出算子和外部实现的输出算子,然后再了解底层的原理。
以下代码参考lesson06子模块
1)新建lesson06子模块,其pom引入如下:
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><scope>provided</scope></dependency><!-- 可以启动本地的Web-UI控制台插件--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><scope>provided</scope></dependency>
</dependencies>
2.1 File Sink
File Sink是可以将数据输出到文件中,其jar并未包含在Flink基础jar包中,因此需要引入格外的Connector。
示例说明:使用一个自动生成Source算子生成字符串数据,并写入到某个文件夹中。
FileSink是基于SinkV2 plus版本实现的Sink
1)在lesson06子模块的pom文件中引入flink的flink-connector-files依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId>
</dependency>
2)FileSinkDemo类:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;import java.time.Duration;
import java.time.ZoneId;public class FileSinkDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 需要设置checkpoint,输出的文件才会进入finished,不然只会存在 in-progress 或者 pending 状态,也就是没法变成.log结尾的文件env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);env.setParallelism(2);// 2 读取数据- 来自自定义的GeneratorFunction,我们这里设置每秒生成100个字符串DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>((GeneratorFunction<Long, String>) value -> "value" + value,Long.MAX_VALUE,RateLimiterStrategy.perSecond(100), // 每秒钟生成100条数据Types.STRING);DataStreamSource<String> source = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource");// 3. 输出:FileSink<String> fileSink = FileSink// 按行输出:第一个参数是输出路径,第二个参数是数据序列化,这里由于是字符串,所以使用Flink提供的SimpleStringEncoder.<String>forRowFormat(new Path("E:/logs"), new SimpleStringEncoder<>())// 这是批量输出,这里就不演示
// .forBulkFormat()// 配置文件分目录,如果不设置,则按照每小时分一个目录,可以通过forRowFormat方法进去看看使用的是DateTimeBucketAssigner默认构建方法.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH-mm", ZoneId.systemDefault()))// 配置生成的文件的名称,可以配置前缀和后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("flink-").withPartSuffix(".log").build())// 配置文件滚动策略,这里设置10秒或者超过5KB就更新文件.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofSeconds(10)).withMaxPartSize(new MemorySize(5 * 1024)).build()).build();source.sinkTo(fileSink);// 执行env.execute();}
}
输出:
知识点:本示例需要关注5个点
1)看文件夹中生成的文件,是按照代码中设置的滚动方式,一分钟生成一个
2)看到生成的文件是以flink-开头,.log结尾,符合示例的配置
3)同时也看到文件有2个后缀非.log的,这是表示正在写入,之所以是2个是因为示例设置了并行度是2。
4)如果刷新频繁观察,会发现有时候后缀非.log的文件不止2个,这是因为还没到达checkpoint的时间点,只有checkpoint之后才会该名称,这就是为什么要设置checkpoint的原因(因为不开启checkpoint,文件会处于在 in-progress 或者 pending 状态,也就是没法变成.log结尾的文件)
5)示例中设置的滚动每个文件的策略是每10秒或者5KB滚动一次,两者达成其一即会写入新的文件
2.2 JDBC Sink
Flink常见的Sink还有JDBC。这里演示如何将数据写入到MySQL数据库。
示例说明:从socket中获取数据,并转换为一个Tuple3,在将其插入到MySQL数据库中
JDBCSink保留了2个版本分别是:SinkV1版本和SinkV2版本。下面demo使用SinkV2版本来实现。
前期准备:准备一个flink_test数据库,并创建如下表:
CREATE TABLE flink_test.log_table (server_id varchar(100) NOT NULL,cpu_value DOUBLE NULL,collect_time BIGINT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
1)在flink-study父项目pom文件中的dependencyManagement引入flink的flink-connector-jdbc以及mysql-connector的版本定义:
<!-- 引入flink的jdbc连接器 -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><!-- 版本是3.2.0-1.19 --><version>${flink-connector-jdbc.version}</version>
</dependency>
<!-- 引入mysql的驱动 -->
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><!-- 版本是8.0.30--><version>${mysql-connector-java.version}</version>
</dependency>
2)在lesson06子模块中的pom引入flink的flink-connector-jdbc以及mysql-connector:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId>
</dependency>
3)JDBCSinkDemo 类
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.sink.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class JDBCSinkDemo2 {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据-来自socketDataStreamSource<String> source = env.socketTextStream("localhost", 9999);// 3. 将数据转换为3元组SingleOutputStreamOperator<Tuple3<String, Double, Long>> map = source.map(new Tuple3MapFunction());// 4. 写入到mysql数据库:注意这里JdbcSink.sink()有2个方法,一个是基于SinkV1版本,一个是使用SinkV2版本。这里使用SinkV2版本JdbcSink<Tuple3<String, Double, Long>> jdbcSink = JdbcSink.<Tuple3<String, Double, Long>>builder().withQueryStatement("insert into log_table values(?,?,?)",(JdbcStatementBuilder<Tuple3<String, Double, Long>>) (preparedStatement, value) -> {preparedStatement.setString(1, value.f0);preparedStatement.setDouble(2, value.f1);preparedStatement.setLong(3, value.f2);}).withExecutionOptions(JdbcExecutionOptions.builder().withMaxRetries(3) // 重试次数.withBatchSize(10) // 每次插入的批次数量.withBatchIntervalMs(1000) // 批次插入间隔时间.build()).buildAtLeastOnce(new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()// 数据库连接.withUrl("jdbc:mysql://127.0.0.1:3306/flink_test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true").withUsername("root") // 账号.withPassword("root") // 密码.build());// sinkTo就是SinkV2版本map.sinkTo(jdbcSink);// 执行env.execute();}public static class Tuple3MapFunction implements MapFunction<String, Tuple3<String,Double,Long>> {@Overridepublic Tuple3<String, Double, Long> map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");long value3 = 0;if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}if(values.length >= 3){try {value3 = Long.parseLong(values[2]);}catch (Exception ignored){}}return new Tuple3<>(value1,value2,value3);}}
}
4)输入:
5)查看数据库的记录
知识点:示例结果可以看到数据成功地插入到数据库中。但这里还有很多细节,可以参考《官方文档》
有一个知识点要讲一下,关于精准一次的实现:
1)JdbcSink提供了exactlyOnceSink()方法可以实现精准一次(也就是一条数据精准插入一条数据)
2)使用精准一次,那么需要XADataSource事务,因此需要使用XADataSource
3)配置中还需要设置:withTransactionPerConnection(true)
4)对于的数据库,比如MySQL需要能够支持XA事务(目前大部分关系型数据库都支持)
2.3 Kafka Sink
做大数据的也一样离不开Kafka,在前面的源算子中就演示过从Kafka订阅消费数据,这里是反过来,Flink作为消息发送方,往Kafka发送数据。但是这里要演示的是精准一次,在实际业务中,有时候做数据同步,那么往往都是要求精准一次。
示例说明:从socket中获取数据,然后将数据发送到Kafka中
KafkaSink是基于SinkV2版本实现的Sink
1)在lesson06子模块中pom引入flink-connector-kafka依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId>
</dependency>
2)KafkaSinkDemo 类:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;public class KafkaSinkDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 需要设置checkpoint(精准一次必须设置)env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);env.setParallelism(1);// 2. 读取数据-来自socketDataStreamSource<String> source = env.socketTextStream("localhost", 9999);// 3. 输出:KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// kafka的地址.setBootstrapServers("clickhouse1:9092,clickhouse2:9092,clickhouse3:9092")// 配置序列化类和topic.setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("mytopic").setValueSerializationSchema(new SimpleStringSchema()).build())// 指定事务前缀(精准一次必须设置).setTransactionalIdPrefix("flink-")// 指定使用精准一次(精准一次必须设置).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 指定事务超时时间(精准一次必须设置):该时间必须大于checkpoint时间.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, ((Integer)(3 * 60 * 1000)).toString()).build();source.sinkTo(kafkaSink);// 执行env.execute();}
}
知识点:示例中需要注意Kafka的精准一次需要配置以下内容:
1)开启checkpoint
2)指定事务前缀
3)设置为DeliveryGuarantee.EXACTLY_ONCE
4)设置事务超时时间,必须大于checkpoint时间
2.4 Redis Sink
不论是前面的源算子,还是现在输出算子都是Flink内置的算子,还没有演示过外部第三方实现的Connector,这里以一个Redis为例子,来演示如何往Redis中输入数据。在实际的场景中,比如数据库的数据会缓存放到Redis中,但是由于数据库和redis是两个不同存储,无法使用事务方式同时写入数据,导致有时候Redis并没有写入成功;因此可以采用监听binlog日志然后写入到redis中,这时候Flink就可以实现这种方式。当然这里只是演示如何写入Redis,至于如何读取数据库的binlog,这一块在《系列之六 - Data Stream API的源算子》中讲过MongoDB CDC的方式,同理也可以使用MySQL CDC,感兴趣的朋友可以去看看。
示例说明:假设是一个监听来自服务器cpu的日志,日志格式是“服务器id,cpu,时间”,示例中先使用map算子转换为Tuple2类型,再将其服务器id作为key,cpu作为值,放进Redis的String类型中,每次都是最新的cpu值,这样就可以缓存最新cpu值。
RedisSink是基于SinkFunction版本实现的Sink
1)在flink-study的pom中dependencyManagement引入flink-connector-redis版本管理
<!-- 引入bahir的redis连接器 -->
<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><!-- 版本是1.0--><version>${mysql-connector-redis.version}</version>
</dependency>
2)在lesson06子模块中,引入flink-connector-redis
<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId>
</dependency>
3)RedisSinkDemo类:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;public class RedisSinkDemo {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据-来自socketDataStreamSource<String> source = env.socketTextStream("localhost", 9999);SingleOutputStreamOperator<Tuple2<String, Double>> map = source.map(new Tuple2MapFunction());// 3. 输出到redis中RedisSink<Tuple2<String, Double>> redisSink = new RedisSink<>(// 这是配置Redis的连接信息new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build(),// 这是配置数据格式,我们自己实现一个类RedisExampleMappernew RedisExampleMapper(RedisCommand.SET));// 这里使用addSink,而非sinkTo,是因为目前RedisSink还是返回SinkFunction,没有改造成Sinkmap.addSink(redisSink);// 执行env.execute();}public static class RedisExampleMapper implements RedisMapper<Tuple2<String, Double>> {// 执行Redis的命令,我们从外面传入RedisCommand.SET,意味着我们使用String格式存储private RedisCommand redisCommand;public RedisExampleMapper(RedisCommand redisCommand){this.redisCommand = redisCommand;}// 设置命令,也是定义类型@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(redisCommand);}// 设置key@Overridepublic String getKeyFromData(Tuple2<String, Double> data) {return "flink-"+data.f0;}// 设置value@Overridepublic String getValueFromData(Tuple2<String, Double> data) {return data.f1.toString();}}public static class Tuple2MapFunction implements MapFunction<String, Tuple2<String,Double>> {@Overridepublic Tuple2<String, Double> map(String value) throws Exception {String[] values = value.split(",");String value1 = values[0];double value2 = Double.parseDouble("0");if(values.length >= 2){try {value2 = Double.parseDouble(values[1]);}catch (Exception e){value2 = Double.parseDouble("0");}}return new Tuple2<>(value1,value2);}}
}
4)当在socket中输入2个服务器的信息
可以看到Redis中的数据
5)再输入第三条数据,key是server1的
会发现Redis中覆盖了value值
结语:本章先是讲了关于Flink的Sink演变版本,然后又演示了一些常见的Sink输出算子。但是对于深入了解关于输出算子Sink还不够,下一章将通过了解输出算子的底层原理并自定义自己的输出算子来更深入了解Flink的输出算子Sink。