flink sql 实战实例 及延伸问题:聚合/数据倾斜/DAU/Hive流批一体 等

flink sql 实战实例 及延伸问题

  • Flink SQL 计算用户分布
  • Flink SQL 计算 DAU
  • 多topic 数据更新mysql topic接入mysql
    • 引入 upsert-kafka-connector 以1.14.4版本为例
  • 数据倾斜问题:
  • 让你使用用户心跳日志(20s 上报一次)计算同时在线用户、DAU 指标,你怎么设计链路?
  • 多维高阶聚合
  • FlinkSql Upsert 与 Primary Key
  • flinksql Hive 流批一体
    • Streaming Sink
    • Streaming Source
    • Hive Dialect
    • Filesystem Connector

Flink SQL 计算用户分布

⭐ 需求:上游是一个 kafka 数据源,数据内容是用户 QQ 等级变化明细数据(time,uid,level)。需要你求出当前每个等级的用户数。

- 如果需要可以打开 minibatch
select  level, count(1) as uv, max(time) as time
from (select uid, level, time, row_number() over (partition by uid order by time desc) rn from source
) tmp
where rn =1 
group by level

Flink SQL 计算 DAU

⭐ 需求:数据源:用户心跳日志(uid,time,type)。计算分 Android,iOS 的 DAU,最晚一分钟输出一次当日零点累计到当前的结果。

SELECT  window_start, window_end, platform, sum(bucket_dau) as dau
from (SELECTwindow_start, window_end, platform, count(distinct uid) as bucket_dauFROM TABLE(CUMULATE(TABLE user_log,DESCRIPTOR(time),INTERVAL '60' SECOND, INTERVAL '1' DAY))GROUP BY                                  window_start, window_end, platform, MOD(HASH_CODE(user_id), 1024)
) tmp
GROUP by   window_start, window_end, platform优点:如果是曲线图的需求,可以完美回溯曲线图。
缺点:大窗口之间如果有数据乱序,有丢数风险;并且由于是 watermark 推动产出,所以数据产出会有延迟。或或或或或或或或或或或或或或或或或或或或或或或
-- 如果需要可以打开 minibatch
select platform, count(1) as dau, max(time) as time
from (select uid, platform, time, row_number() over (partition by uid, platform, time / 24 / 3600 / 1000 order by time desc) rnfrom source
) tmp
where rn = 1
group byplatform优点:计算快。缺点:任务发生 failover,曲线图不能很好回溯。没法支持 cube 计算。或或或或或或或或或或或或或或或或或或或或或或或
-- 如果需要可以打开 minibatch
SELECT   max(time) as time, platform, sum(bucket_dau) as dau
from (SELECTmax(time) as time, platform, count(distinct uid) as bucket_dauFROM sourceGROUP BYplatform, MOD(HASH_CODE(user_id), 1024)
) t 
GROUP by   platform优点:计算快,支持 cube 计算。
缺点:任务发生 failover,曲线图不能很好回溯。

多topic 数据更新mysql topic接入mysql

-- 作业开发逻辑-- mysql -h数据库 -ubigdata_rw -pe20ycoy3yp09qij0kj8ngpcgxyywgmc9-- -Dyarn.application.queue=stream_data   -Dyarn.provided.lib.dirs=/streamx/flink/flink-1.12.5/lib/
CREATE TABLE Direction_Wind_create_source (properties ROW< area VARCHAR,comment_count BIGINT,share_count BIGINT,like_count BIGINT,user_id  VARCHAR,app_id BIGINT,feed_type BIGINT ,feed_id VARCHAR,`timestamp` VARCHAR >
,    proctime AS PROCTIME()) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'Direction_Wind_create_feed','connector.properties.bootstrap.servers'='Direction_Wind-hadoop-hdp-kafka-01:9092,Direction_Wind-hadoop-hdp-kafka-02:9092,Direction_Wind-hadoop-hdp-kafka-03:9092','connector.startup-mode' = 'latest-offset','update-mode' = 'append','format.type' = 'json','connector.properties.group.id' = 'dynamicRelease_bigdata_Direction_Wind_sql_test','format.derive-schema' = 'true');CREATE TABLE Direction_Wind_like_source (properties ROW< area VARCHAR,comment_count BIGINT,share_count BIGINT,like_count BIGINT,user_id  VARCHAR,app_id BIGINT,feed_type BIGINT ,feed_id VARCHAR,`timestamp` VARCHAR >
,    proctime AS PROCTIME()) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'Direction_Wind_like_feed','connector.properties.bootstrap.servers'='Direction_Wind-hadoop-hdp-kafka-01:9092,Direction_Wind-hadoop-hdp-kafka-02:9092,Direction_Wind-hadoop-hdp-kafka-03:9092','connector.startup-mode' = 'latest-offset','update-mode' = 'append','format.type' = 'json','connector.properties.group.id' = 'dynamicRelease_bigdata_Direction_Wind_sql_test','format.derive-schema' = 'true');CREATE TABLE Direction_Wind_comment_source (properties ROW< area VARCHAR,comment_count BIGINT,share_count BIGINT,like_count BIGINT,user_id  VARCHAR,app_id BIGINT,feed_type BIGINT ,feed_id VARCHAR,`timestamp` VARCHAR >
,    proctime AS PROCTIME()) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'Direction_Wind_comment_feed','connector.properties.bootstrap.servers'='Direction_Wind-hadoop-hdp-kafka-01:9092,Direction_Wind-hadoop-hdp-kafka-02:9092,Direction_Wind-hadoop-hdp-kafka-03:9092','connector.startup-mode' = 'latest-offset','update-mode' = 'append','format.type' = 'json','connector.properties.group.id' = 'dynamicRelease_bigdata_Direction_Wind_sql_test','format.derive-schema' = 'true');CREATE TABLE Direction_Wind_share_source (properties ROW< area VARCHAR,comment_count BIGINT,share_count BIGINT,like_count BIGINT,user_id  VARCHAR,app_id BIGINT,feed_type BIGINT ,feed_id VARCHAR,`timestamp` VARCHAR >
,    proctime AS PROCTIME()) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'Direction_Wind_share_feed','connector.properties.bootstrap.servers'='Direction_Wind-hadoop-hdp-kafka-01:9092,Direction_Wind-hadoop-hdp-kafka-02:9092,Direction_Wind-hadoop-hdp-kafka-03:9092','connector.startup-mode' = 'latest-offset','update-mode' = 'append','format.type' = 'json','connector.properties.group.id' = 'dynamicRelease_bigdata_Direction_Wind_sql_test','format.derive-schema' = 'true');CREATE TABLE Direction_Wind_create_sink (feed_id VARCHAR,user_id VARCHAR,feed_type VARCHAR,time_stamp timestamp(3),area VARCHAR, PRIMARY KEY (feed_id) NOT ENFORCED) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://数据库:3306/bireport?useUnicode=true&characterEncoding=UTF-8&useSSL=false','connector.用户' = 'bigdata_all','connector.pass@word' = '密码','connector.table' = 'Direction_Wind_dynamic_test','connector.write.flush.max-rows' = '1','connector.write.flush.interval' = '2s'--  'connector' = 'print');INSERT INTO Direction_Wind_create_sink
select * from (
SELECT  properties.feed_id as feed_id,properties.user_id as user_id,cast(properties.feed_type as varchar) as feed_type--  ,TO_DATE(properties.`timestamp`) as time_stamp,TO_TIMESTAMP(FROM_UNIXTIME(cast(properties.`timestamp` as bigint))) as time_stamp,properties.area  as area,ROW_NUMBER() OVER (PARTITION BY properties.feed_id ORDER BY properties.`timestamp` asc) as rowNum
FROM    Direction_Wind_create_source
) where rowNum = 1
;CREATE TABLE Direction_Wind_like_sink (feed_id VARCHAR,like_count BIGINT,feed_type VARCHAR,area VARCHAR, PRIMARY KEY (feed_id) NOT ENFORCED) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://数据库:3306/bireport?useUnicode=true&characterEncoding=UTF-8&useSSL=false','connector.用户' = 'bigdata_all','connector.pass@word' = '密码','connector.table' = 'Direction_Wind_dynamic_test','connector.write.flush.max-rows' = '1','connector.write.flush.interval' = '2s'--  'connector' = 'print');INSERT INTO Direction_Wind_like_sink
select * from (
SELECT  properties.feed_id as feed_id,properties.like_count  as like_count,cast(properties.feed_type as varchar) as feed_type,properties.area  as area,ROW_NUMBER() OVER (PARTITION BY properties.feed_id ORDER BY properties.`timestamp` asc) as rowNum
FROM    Direction_Wind_like_source
) where rowNum = 1
;CREATE TABLE Direction_Wind_comment_sink (feed_id VARCHAR,comment_count BIGINT,feed_type VARCHAR,area VARCHAR, PRIMARY KEY (feed_id) NOT ENFORCED) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://数据库:3306/bireport?useUnicode=true&characterEncoding=UTF-8&useSSL=false','connector.用户' = 'bigdata_all','connector.pass@word' = '密码','connector.table' = 'Direction_Wind_dynamic_test','connector.write.flush.max-rows' = '1','connector.write.flush.interval' = '2s'--  'connector' = 'print');INSERT INTO Direction_Wind_comment_sink
select * from (
SELECT  properties.feed_id as feed_id,properties.comment_count  as comment_count,cast(properties.feed_type as varchar) as feed_type,properties.area  as area,ROW_NUMBER() OVER (PARTITION BY properties.feed_id ORDER BY properties.`timestamp` asc) as rowNum
FROM    Direction_Wind_comment_source
) where rowNum = 1
;CREATE TABLE Direction_Wind_share_sink (feed_id VARCHAR,use_count BIGINT,feed_type VARCHAR,area VARCHAR, PRIMARY KEY (feed_id) NOT ENFORCED) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://数据库:3306/bireport?useUnicode=true&characterEncoding=UTF-8&useSSL=false','connector.用户' = 'bigdata_all','connector.pass@word' = '密码','connector.table' = 'Direction_Wind_dynamic_test','connector.write.flush.max-rows' = '1','connector.write.flush.interval' = '2s'--  'connector' = 'print');INSERT INTO Direction_Wind_share_sink
select * from (
SELECT  properties.feed_id as feed_id,properties.share_count  as   use_count,cast(properties.feed_type as varchar) as feed_type,properties.area  as area,ROW_NUMBER() OVER (PARTITION BY properties.feed_id ORDER BY properties.`timestamp` asc) as rowNum
FROM    Direction_Wind_share_source
) where rowNum = 1
;
这里要注意,如果去重直接用group by的方式,在批处理中还好,流式处理中,这部分数据会存放到内容中,并且越积越大,没有ttl,时间一长就会oom了,
Flink SQL Deduplicate 写法,row_number partition by user_id order by proctime asc,此 SQL 最后生成的算子只会在第一条数据来的时候更新 state,后续访问不会更新 state TTL,因此 state 会在用户设置的 state TTL 时间之后过期。  所以按理说 这种去重方式 不会百分百 有用,只能保持一段时间的 去重,感觉是不对的,正在测试中。

经过测试 在flink 1.12 版本时,flinksql的upsert into 功能 ,也就是 这种写法
在这里插入图片描述
是可以实现 update 功能的,但 必须要 group by 数据才行,并且要求把 把所有select 语句中的字段 都加入到 group by 语句,但这么写,又会导致 state 不停增大,过一段时间就会 OOM

引入 upsert-kafka-connector 以1.14.4版本为例

基本工作机制:

  • source:

在这里插入图片描述

  • sink:
    在这里插入图片描述
tenv.executeSql("CREATE TABLE upsert_kafka ("+ "province STRING, "+ "pv BIGINT, "+ "PRIMARY KEY (province) NOT ENFORCED"+ ") WITH ("+ "'connector' = 'upsert-kafka', "+ "'topic' = 'upsert_kafka2', "+ "'properties.bootstrap.servers' = 'doitedu:9092', "+ "'key.format' = 'csv', "+ "'value.format' = 'csv'"+ ")"
);DataStreamSource<Row> stream = env.fromElements(Row.ofKind(RowKind.INSERT, "sx", 1),Row.ofKind(RowKind.INSERT, "sx", 2),Row.ofKind(RowKind.INSERT, "gx", 1),Row.ofKind(RowKind.INSERT, "sx", 2),Row.ofKind(RowKind.INSERT, "gx", 2)
);tenv.createTemporaryView("s", stream, Schema.newBuilder().column("f0", DataTypes.STRING().notNull()).column("f1", DataTypes.INT()).build());
// 将查询结果(changelog 流),写入 kafka
tenv.executeSql("insert into upsert_kafka select f0, sum(f1) as pv from s group by f0");

写入的数据为
在这里插入图片描述

// 从 kafka 再读出上面的 changelog 结果 tenv.executeSql(" select * from upsert_kafka").print();
tenv.executeSql("select * from upsert_kafka").print();

读出的数据为
在这里插入图片描述

数据倾斜问题:

⭐ 场景:拿计算直播间的同时在线观看用户数来说,大 v 直播间的人数会比小直播间的任务多几个量级,因此如果计算一个直播间的数据需要注意这种业务数据倾斜的特点
⭐ 解决方案:计算这种数据时,我们可以先按照直播间 id 将数据进行打散,如下 SQL 案例所示(DataStream 也是相同的解决方案),内层打散,外层合并:

select id, sum(bucket_uv) as uv
from (select id, count(distinct uid) as bucket_uv from source group byid, mod(uid, 1000) -- 将大 v 分桶打散
)
group by id

⭐ 数据任务处理时参数\代码处理逻辑导致倾斜:
⭐ 场景:比如有时候虽然用户已经按照 key 进行分桶计算,但是【最大并发度】设置为 150,【并发度】设置为 100,会导致 keygroup 在 sub-task 的划分不均匀(其中 50 个 sub-task 的 keygroup 为 2 个,剩下的 50 个 sub-task 为 1 个)导致数据倾斜。
⭐ 解决方案:设置合理的【最大并发度】【并发度】,【最大并发度】最好为【并发度】的倍数关系,比如【最大并发度】1024,【并发度】512
⭐ 我已经设置【数据分桶打散】+【最大并发为并发 n 倍】,为啥还出现数据倾斜?
⭐ 场景:你的【数据分桶】和【最大并发数】之间可能是不均匀的。因为 Flink 会将 keyby 的 key 拿到之后计算 hash 值,然后根据 hash 值去决定发送到那个 sub-task 去计算。这是就有可能出现你的【数据分桶】key 经过 hash 计算完成之后,并不能均匀的发到所有的 keygroup 中。比如【最大并发数】4096,【数据分桶】key 只有 1024 个,那么这些数据必然最多只能到 1024 个 keygroup 中,有可能还少于 1024,从而导致剩下的 3072 个 keygroup 没有任何数据
⭐ 解决方案:其实可以利用【数据分桶】key 和【最大并行度】两个参数,在 keyby 中实现和 Flink key hash 选择 keygroup 的算法一致的算法,在【最大并发数】4096,【数据分桶】为 4096 时,做到分桶值为 1 的数据一定会发送到 keygroup1 中,2 一定会发到 keygroup2 中,从而缓解数据倾斜。

最大并行度的设置:最大并行度可以自己设置,也可以框架默认生成;默认的算法是取当前算子并行度的 1.5 倍和 2 的 7 次方比较,取两者之间的最大值,然后用上面的结果和 2 的 15 次方比较,取其中的最小值为默认的最大并行度,非常不建议自动生成,建议用户自己设置。

让你使用用户心跳日志(20s 上报一次)计算同时在线用户、DAU 指标,你怎么设计链路?

在这里插入图片描述
⭐ 有提到将用户上线标记为 1,下线标记为 0 的,然后将上线下线数据发到消息队列用实时计算引擎统计的

⭐ 有提到将用户心跳日志借助 Session Window Dynamic Gap 计算的

在这里插入图片描述

多维高阶聚合

在这里插入图片描述

FlinkSql Upsert 与 Primary Key

在flink1.11 及以后,flinksql 与blink 做了merge 所以有重大变更
流计算的一个典型场景是把聚合的数据写入到 Upsert Sink 中,比如 JDBC、HBase,当遇到复杂的 SQL 时,时常会出现:
在这里插入图片描述
UpsertStreamTableSink 需要上游的 Query 有完整的 Primary Key 信息,不然就直接抛异常。这个现象涉及到 Flink 的 UpsertStreamTableSink 机制。顾名思义,它是一个更新的 Sink,需要按 Key 来更新,所以必须要有 Key 信息。

如何发现 Primary Key?一个方法是让优化器从 Query 中推断,如下图发现 Primary Key 的例子。

这种情况下在简单 Query 当中很好,也满足语义,也非常自然。但是如果是一个复杂的 Query,比如聚合又 Join 再聚合,那就只有报错了。不能期待优化器有多智能,很多情况它都不能推断出 PK,而且,可能业务的 SQL 本身就不能推断出 PK,所以导致了这样的异常。

在这里插入图片描述
怎么解决问题?Flink 1.11 彻底的抛弃了这个机制,不再从 Query 来推断 PK 了,而是完全依赖 Create table 语法。比如 Create 一个 jdbc_table,需要在定义中显式地写好 Primary Key(后面 NOT ENFORCED 的意思是不强校验,因为 Connector 也许没有具备 PK 的强校验的能力)。当指定了 PK,就相当于就告诉框架这个Jdbc Sink 会按照对应的 Key 来进行更新。如此,就跟 Query 完全没有关系了,这样的设计可以定义得非常清晰,如何更新完全按照设置的定义来。

CREATE TABLE jdbc_table (id BIGINT,...PRIMARY KEY (id) NOT ENFORCED
)

flinksql Hive 流批一体

首先看传统的 Hive 数仓。一个典型的 Hive 数仓如下图所示。一般来说,ETL 使用调度工具来调度作业,比如作业每天调度一次或者每小时调度一次。这里的调度,其实也是一个叠加的延迟。调度产生 Table1,再产生 Table2,再调度产生 Table3,计算延时需要叠加起来
在这里插入图片描述
问题是慢,延迟大,并且 Ad-hoc 分析延迟也比较大,因为前面的数据入库,或者前面的调度的 ETL 会有很大的延迟。Ad-hoc 分析再快返回,看到的也是历史数据。

所以现在流行构建实时数仓,从 Kafka 读计算写入 Kafka,最后再输出到 BI DB,BI DB 提供实时的数据服务,可以实时查询。Kafka 的 ETL 为实时作业,它的延时甚至可能达到毫秒级。实时数仓依赖 Queue,它的所有数据存储都是基于 Queue 或者实时数据库,这样实时性很好,延时低。但是:

第一,基于 Queue,一般来说就是行存加 Queue,存储效率其实不高。
第二,基于预计算,最终会落到 BI DB,已经是聚合好的数据了,没有历史数据。而且 Kafka 存的一般来说都是 15 天以内的数据,没有历史数据,意味着无法进行 Ad-hoc 分析。所有的分析全是预定义好的,必须要起对应的实时作业,且写到 DB 中,这样才可用。对比来说,Hive 数仓的好处在于它可以进行 Ad-hoc 分析,想要什么结果,就可以随时得到什么结果。
在这里插入图片描述
能否结合离线数仓和实时数仓两者的优势,然后构建一个 Lambda 的架构?

核心问题在于成本过高。无论是维护成本、计算成本还是存储成本等都很高。并且两边的数据还要保持一致性,离线数仓写完 Hive 数仓、SQL,然后实时数仓也要写完相应 SQL,将造成大量的重复开发。还可能存在团队上分为离线团队和实时团队,两个团队之间的沟通、迁移、对数据等将带来大量人力成本。如今,实时分析会越来越多,不断的发生迁移,导致重复开发的成本也越来越高。少部分重要的作业尚可接受,如果是大量的作业,维护成本其实是非常大的。

如何既享受 Ad-hoc 的好处,又能实现实时化的优势?一种思路是将 Hive 的离线数仓进行实时化,就算不能毫秒级的实时,准实时也好。所以,Flink 1.11 在 Hive 流批一体上做了一些探索和尝试,如下图所示。它能实时地按 Streaming 的方式来导出数据,写到 BI DB 中,并且这套系统也可以用分析计算框架来进行 Ad-hoc 的分析。这个图当中,最重要的就是 Flink Streaming 的导入。
在这里插入图片描述

Streaming Sink

早期 Flink 版本在 DataStreaming 层,已经有一个强大的 StreamingFileSink 将流数据写到文件系统。它是一个准实时的、Exactly-once 的系统,能实现一条数据不多,一条数据不少的 Sink。
在这里插入图片描述
具体原理是基于两阶段提交:

第一阶段:SnapshotPerTask,关闭需要 Commit 的文件,或者记录正在写的文件的 Offset。
第二阶段:NotifyCheckpointComplete,Rename 需要 Commit 的文件。注意,Rename 是一个原子且幂等的操作,所以只要保证 Rename 的 At-least-once,即可保证数据的 Exactly-once。

这样一个 File system 的 Writer 看似比较完美了。但是在 Hive 数仓中,数据的可见性是依赖 Hive Metastore 的,那在这个流程中,谁来通知 Hive Metastore 呢?
在这里插入图片描述
SQL 层在 StreamingFileSink,扩展了 Partition 的 Committer。

相当于不仅要进行 File 的 Commit,还要进行 Partition 的 Commit。如图所示,FileWriter 对应之前的 StreamingFileSink,它提供的是 Exactly-once 的 FileWriter。而后面再接了一个节点 PartitionCommitter。支持的 Commit Policy 有:

  • 内置支持 Add partition 到 Hive metastore;
  • 支持写 SuccessFile 到文件系统当中;
  • 并且也可以自定义 Committer,比如可以 analysis partition、合并 partition 里面的小文件。

Committer 挂在 Writer 后, 由 Commit Trigger 决定什么时机来 commit :

  • 默认的 commit 时机是,有文件就立即 commit。因为所有 commit 都是可重入的,所以这一点是可允许的。

  • 另外,也支持通过 partition 时间和 Watermark 来共同决定的。比如小时分区,如果现在时间到 11 点,10 点的分区就可以 commit 了。Watermark 保证了作业当前的准确性。

Streaming Source

Hive 数仓中存在大量的 ETL 任务,这些任务往往是通过调度工具来周期性的运行,这样做主要有两个问题:

  • 实时性不强,往往调度最小也是小时级。
  • 流程复杂,组件多,容易出现问题。

针对这些离线的 ETL 作业,Flink 1.11 为此开发了实时化的 Hive 流读,支持:

  • Partition 表,监控 Partition 的生成,增量读取新的 Partition。
  • 非 Partition 表,监控文件夹内新文件的生成,增量读取新的文件。

甚至可以使用 10 分钟级别的分区策略,使用 Flink 的 Hive streaming source 和 Hive streaming sink ,可以大大提高 Hive 数仓的实时性到准实时分钟级,在实时化的同时,也支持针对 Table 全量的 Ad-hoc 查询,提高灵活性。

SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'=’true’, 'streaming-source.consume-start-offset'='2020-05-20') */;/*+ OPTIONS('streaming-source.enable' = 'true','streaming-source.partition.include' = 'latest','streaming-source.partition-order' = 'create-time','streaming-source.monitor-interval' = '1 h') */

另外除了 Scan 的读取方式,Flink 1.11 也支持了 Temporal Join 的方式,也就是以前常说的 Streaming Dim Join。

SELECTo.amout, o.currency, r.rate, o.amount * r.rate
FROMOrders AS oJOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS rON r.currency = o.currency

目前支持的方式是 Cache All,并且是不感知分区的,比较适合小表的情况。

Hive Dialect

Flink SQL 遵循的是 ANSI-SQL 的标准,而 Hive SQL 有它自己的 HQL 语法,它们之间的语法、语义都有些许不同。

如何让 Hive 用户迁移到 Flink 生态中,同时避免用户太大的学习成本?为此, Flink SQL 1.11 提供了 Hive Dialect,可以使得用户在 Flink 生态中使用 HQL 语言来计算。目前只支持 DDL,后续版本会逐步攻坚 Qeuries。

Filesystem Connector

Hive Integration 提供了一个重量级的集成,功能丰富,但是环境比较复杂。如果只是想要一个轻量级的 Filesystem 读写呢?

Flink table 在长久以来只支持一个 CSV 的 Filesystem Table,并且还不支持 Partition,行为上在某些方面也有些不符合大数据计算的直觉。

Flink 1.11 重构了整个 Filesystem connector 的实现:

  • 结合 Partition,现在,Filesystem connector 支持 SQL 中 Partition 的所有语义,支持 Partition 的 DDL,支持 Partition Pruning,支持静态 / 动态 Partition 的插入,支持 Overwrite 的插入。
  • 支持各种 Formats: ■ CSV ■ JSON ■ Aparch AVRO ■ Apache Parquet ■ Apache ORC
  • 支持 Batch 的读写。
  • 支持 Streaming sink,也支持 Partition commit,支持写 Success 文件。

用几句简单的 SQL,不用搭建 Hive 集成环境即可:

  • 启动一个流作业写入 Filesystem 中,然后在 Hive 端即可查询到 Filesystem 上的数据,相比之前 Datastream 的作业,简单 SQL 即可搞定离线数据的入库。
  • 通过 Filesystem Connector 来查询 Hive 数仓中的数据,功能没有 Hive 集成那么全,但是定义简单。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1113354.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

Linux离线安装插件

当公司Linux环境无外网情况下&#xff0c;需要先下载好离线安装包&#xff0c;然后上传到服务器&#xff0c;进行安装。 这里介绍一个下载插件安装包的网站&#xff0c;可以搜索到lrzsz、lsof、telnet、unzip、zip等安装包 搜索到想要的插件安装包后&#xff0c;下载并上传到服…

集合可视化:rainbow box与欧拉图

论文&#xff1a;A new diagram for amino acids: User study comparing rainbow boxes to Venn/Euler diagram 最近偶然看到了这篇论文&#xff0c;觉得很有意思&#xff0c;针对的任务是集合数据的可视化。 我们用示例来说明&#xff0c;比如图二的欧拉图&#xff0c;展示的…

备考2025年AMC8数学竞赛:2000-2024年AMC8真题练一练

我们今天来随机看五道AMC8的真题和解析&#xff0c;对于想了解或者加AMC8美国数学竞赛的孩子来说&#xff0c;吃透AMC8历年真题是备考最科学、最有效的方法之一。 为帮助孩子们更高效地备考&#xff0c;我整理了2000-2004年的全部AMC8真题&#xff0c;并且独家制作了多种在线练…

【蝶变跃升】壹起来|就业辅导系列活动——职业生涯规划和模拟面试

为使困难家庭更深层次了解自己就业现状&#xff0c;明确就业方向&#xff0c;同时提升在面试时的各类技巧。2024年2月17日&#xff0c;由平湖市民政局主办、平湖吾悦广场和上海聘也科技有限公司协办、平湖市壹起来公益发展中心承办的“蝶变跃升”就业辅导系列——职业生涯规划和…

探究网络工具nc(netcat)的使用方法及安装步骤

目录 &#x1f436;1. 什么是nc&#xff08;netcat&#xff09;&#xff1f; &#x1f436;2. nc&#xff08;netcat&#xff09;的基本使用方法 2.1 &#x1f959;使用 nc 进行端口监听 2.2 &#x1f959;使用 nc 进行端口扫描 2.3 &#x1f959;使用 Netcat 进行文件传输…

C#知识点-14(索引器、foreach的循环原理、泛型、委托)

索引器 概念&#xff1a;索引器能够让我们的对象&#xff0c;以索引&#xff08;下标&#xff09;的形式&#xff0c;便捷地访问类中的集合&#xff08;数组、泛型集合、键值对&#xff09; 应用场景&#xff1a; 1、能够便捷地访问类中的集合 2、索引的数据类型、个数、顺序不…

IDEA 2021.3激活

1、打开idea&#xff0c;在设置中查找Settings/Preferences… -> Plugins 内手动添加第三方插件仓库地址&#xff1a;https://plugins.zhile.io搜索&#xff1a;IDE Eval Reset 插件进行安装。应用和使用&#xff0c;如图

harmony 鸿蒙安全和高效的使用N-API开发Native模块

简介 N-API 是 Node.js Addon Programming Interface 的缩写&#xff0c;是 Node.js 提供的一组 C API&#xff0c;封装了V8 引擎的能力&#xff0c;用于编写 Node.js 的 Native 扩展模块。通过 N-API&#xff0c;开发者可以使用 C 编写高性能的 Node.js 模块&#xff0c;同时…

Linux篇:开发工具yum/vim/gcc/g++/Makefile/gdb

一. yum&#xff1a;软件包管理器 什么是软件包&#xff1f; 在Linux 下安装软件 , 一个通常的办法是下载到程序的源代码 , 并进行编译 , 得到可执行程序 . 但是这样太麻烦了, 于是有些人把一些常用的软件提前编译好 , 做成软件包 (可以理解成windows 上的安装程序) 放在…

【Vuforia+Unity】AR05-实物3D模型识别功能实现

对于3D物体的识别&#xff0c;可以是虚拟的也可以是实物的&#xff0c;但是对于虚拟的三维模型意义不大&#xff0c;我们完全可以把三维模型放在屏幕上截一张图&#xff0c;以图片识别的方式召唤数字内容&#xff0c;不过在虚拟现实中或许有用。 因此本文探讨的技术路线主要是…

网络安全“三保一评”深度解析

“没有网络安全就没有国家安全”。近几年&#xff0c;我国法律法规陆续发布实施&#xff0c;为承载我国国计民生的重要网络信息系统的安全提供了法律保障&#xff0c;正在实施的“3保1评”为我国重要网络信息系统的安全构筑了四道防线。 什么是“3保1评”&#xff1f; 等保、分…

Java并发基础:原子类之AtomicBoolean全面解析

本文概要 AtomicBoolean类优点在于能够确保布尔值在多线程环境下的原子性操作&#xff0c;避免了繁琐的同步措施&#xff0c;它提供了高效的非阻塞算法实现&#xff0c;可以大大提成程序的并发性能&#xff0c;AtomicBoolean的API设计非常简单易用。 AtomicBoolean核心概念 …

STM32 TIM2重映射

STM32定时器 文章目录 STM32定时器[TOC](文章目录) 前言一、问题分析二、代码 前言 最近想弄一个多路输出PWM&#xff0c;但是发现TIM2不能用&#xff0c;根据手册也对它进行重映射了&#xff0c;但是还是不能用&#xff0c;用示波器发现驱动能力比较弱&#xff0c;然后禁用jt…

jvm垃圾收集器-三色标记算法

1.对象已死吗? 在堆里面存放着Java世界中几乎所有的对象实例&#xff0c;垃圾收集器在对堆进行回收前&#xff0c;第一件事情就是要确定这些对象之中哪些还“存活”着&#xff0c;哪些已经“死去”&#xff08;即不可能再被任何途径使用的对象). 引计数法 引用计数算法是一…

打造个性化电子画册,提升品牌魅力

​个性化电子画册可以根据不同的用户群体&#xff0c;提供不同的内容。企业可以根据目标客户的特点&#xff0c;为他们定制不同的内容&#xff0c;如产品介绍、品牌故事、企业文化等。这样不仅可以吸引更多的用户关注&#xff0c;还可以增强用户对品牌的信任度。 但是怎么制作电…

【Linux基础】vim、常用指令、组管理和组权限

Linux基础 1、目录结构2、vi和vim3、常用指令运行级别找回密码帮助指令时间日期指令搜索查找文件目录操作磁盘管理指令压缩和解压缩 4、组管理和组权限用户操作指令权限 1、目录结构 Linux的文件系统是采用级层式的树状目录结构&#xff0c;在此结构中的最上层是根目录“/”&a…

【设计模式】01-装饰器模式Decorator

作用&#xff1a;在不修改对象外观和功能的情况下添加或者删除对象功能&#xff0c;即给一个对象动态附加职能 装饰器模式主要包含以下角色。 抽象构件&#xff08;Component&#xff09;角色&#xff1a;定义一个抽象接口以规范准备接收附加责任的对象。具体构件&#xff08…

qt-OPENGL-星系仿真

qt-OPENGL-星系仿真 一、演示效果二、核心程序三、下载链接 一、演示效果 二、核心程序 #include "model.h"Model::Model(QOpenGLWidget *_glWidget) { glWidget _glWidget;glWidget->makeCurrent();initializeOpenGLFunctions(); }Model::~Model() {destroyV…

hung task, soft lockup, hard lockup, workqueue stall

hung task&#xff0c;soft lockup&#xff0c;hard lockup&#xff0c;workqueue stall 是 linux 内核中的异常检测机制&#xff0c;这 4 个检测均是通过时间维度上的检测来判断异常。 在时间维度上的检测机制&#xff0c;有两个核心的点&#xff1a; &#xff08;1&#xff…

【ubuntu2004安装N卡驱动】

软硬件环境 硬件&#xff1a;联想notebook16&#xff0c;显卡4060laptop 软件&#xff1a; ubuntu20.04 驱动安装成功的版本&#xff1a;NVIDIA-Linux-x86_64-535.146.02.run 使用默认的驱动安装&#xff0c;没用原因如下 让手动安装。 手动安装 环境准备&#xff1a; sudo …