当前位置: 首页 > news > 正文

Flink读取Kafka写入Paimon

Flink SQL

-- 1) Register the Paimon catalog (backed by a Hive metastore).
CREATE CATALOG paimon_hive WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs://xxxxx/paimon',
    'metastore' = 'hive',
    'hive-conf-dir' = '/xxxxx/conf',
    'uri' = 'thrift://域名1:9083,thrift://域名2:9083'
);

-- 2) Declare the Kafka source table (JSON payload, all fields read as strings).
CREATE TABLE kafkaSource (
    `_timestamp` STRING,
    `name` STRING,
    `age` STRING,
    `id` STRING
) WITH (
    'connector' = 'kafka',
    'format' = 'json',
    'topic' = 'topic1234',
    'properties.bootstrap.servers' = '你的Kafka Brokers',
    'properties.group.id' = 'kafka-to-paimon',
    'scan.startup.mode' = 'latest-offset'
);

-- 3) Read from Kafka and write into the Paimon table.
INSERT INTO paimon_hive.paimon.odm_kafka_log
SELECT
    name AS `name`,
    age AS `age`,
    id AS `id`,
    -- `_timestamp` is epoch milliseconds stored as a string; convert to a
    -- yyyyMMdd day value (assumed to be the table's partition column).
    FROM_UNIXTIME(CAST(CAST(`_timestamp` AS BIGINT) / 1000 AS BIGINT), 'yyyyMMdd') AS `day`
FROM kafkaSource;

Flink Table (Java)

Maven依赖

<!-- Flink core dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.15.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.15.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.15.0</version>
</dependency>
<!-- Flink Table API dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>1.15.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.15.0</version>
</dependency>
<!-- Paimon dependency (Flink 1.15 connector) -->
<dependency>
    <groupId>org.apache.paimon</groupId>
    <artifactId>paimon-flink-1.15</artifactId>
    <version>0.5.0-incubating</version>
</dependency>

Job类

package job;

import com.google.protobuf.ByteString;
import function.GalaxyToPaimonFlatMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
 * Reads Galaxy protobuf-encoded logs from Kafka and writes them into a Paimon table.
 *
 * @Author zhangjinke
 * @Create 2023/12/25 17:02
 * @Description 将银河PB格式日志写入到Paimon
 * @Wiki -
 * @Modifier -
 * @ModificationTime -
 * @Node -
 */
public class GalaxyToPaimonJob {

    private static final Logger LOG = LoggerFactory.getLogger(GalaxyToPaimonJob.class);
    private static final String GROUP_ID = "job.GalaxyToPaimonJob";

    public static void main(String[] args) {
        try {
            // Operator parallelisms are supplied on the command line:
            //   --source <n> --flatmap <n> [--bootstrap.servers <brokers>]
            ParameterTool tool = ParameterTool.fromArgs(args);
            int source = tool.getInt("source");
            int flatmap = tool.getInt("flatmap");
            // NOTE(review): the original referenced an undefined `bootstrap_servers`
            // variable; read it from job arguments instead — confirm against deployment.
            String bootstrapServers = tool.get("bootstrap.servers");

            // Kafka consumer properties.
            // Fix: the original declared `galaxyPro` but mutated an undefined
            // `properties` variable; all settings now go through `galaxyPro`.
            Properties galaxyPro = new Properties();
            galaxyPro.setProperty("bootstrap.servers", bootstrapServers);
            galaxyPro.setProperty("group.id", GROUP_ID);
            // Interval (ms) for automatic topic-partition discovery.
            galaxyPro.setProperty("flink.partition-discovery.interval-millis", "60000");
            // Fix: Properties must hold String values; the original put an int 6000,
            // which getProperty() would not return.
            galaxyPro.setProperty("refresh.leader.backoff.ms", "6000");

            KafkaSource<ByteString> galaxyKafkaSource = KafkaSource.<ByteString>builder()
                    .setTopics(PropertyUtil.get("user_event_etl_topic"))
                    .setValueOnlyDeserializer(new ByteStringSchema())
                    .setProperties(galaxyPro)
                    .setStartingOffsets(OffsetsInitializer.latest())
                    .build();

            /* 1. Create the streaming execution environment. */
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(120000L, CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(180000L);
            env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            // No event-time watermarks are used by this pipeline.
            env.getConfig().setAutoWatermarkInterval(0);
            env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(200, 60000 * 2L));
            env.setParallelism(32);

            /* 2. Attach the user+event Kafka source. */
            SingleOutputStreamOperator<Row> rsoso = env
                    .fromSource(galaxyKafkaSource, WatermarkStrategy.noWatermarks(), "GalaxyToPaimonSource")
                    .uid("GalaxyToPaimonSource_Uid")
                    .name("GalaxyToPaimonSource_Name")
                    .setParallelism(source)
                    /* 3. Extract the needed fields and emit one Row per record. */
                    .flatMap(new GalaxyToPaimonFlatMap())
                    .uid("GalaxyToPaimonFlatMapFunction_Uid")
                    .name("GalaxyToPaimonFlatMapFunction_Name")
                    .setParallelism(flatmap)
                    // Fix: the original name array was missing a comma between
                    // "device_uuid" and "day" (6 names for 7 types) — a compile error.
                    .returns(Types.ROW_NAMED(
                            new String[]{"realtime", "ip", "session_id", "app_id", "device_uuid", "day", "hour"},
                            Types.STRING, Types.STRING, Types.STRING, Types.STRING,
                            Types.STRING, Types.STRING, Types.STRING));

            /* 4. Create the table environment on top of the stream environment. */
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            Schema schema = Schema.newBuilder()
                    .column("realtime", DataTypes.STRING())
                    .column("ip", DataTypes.STRING())
                    .column("session_id", DataTypes.STRING())
                    .column("app_id", DataTypes.STRING())
                    .column("device_uuid", DataTypes.STRING())
                    .column("day", DataTypes.STRING())
                    .column("hour", DataTypes.STRING())
                    .build();

            /* 5. Create the Paimon catalog. */
            tableEnv.executeSql(
                    "CREATE CATALOG paimon_hive WITH ('type' = 'paimon', 'warehouse'='hdfs://xxxxx/paimon')");
            tableEnv.executeSql("USE CATALOG paimon_hive");

            /* 6. Register the stream as a temporary view. */
            tableEnv.createTemporaryView("odm_event_realtime_view", rsoso, schema);

            /* 7. Insert the view into the Paimon table. */
            tableEnv.executeSql("INSERT INTO paimon.odm_event_realtime SELECT * FROM odm_event_realtime_view");

            env.execute("job.GalaxyToPaimonJob");
        } catch (Exception e) {
            LOG.error("GalaxyToPaimonJob启动失败!", e);
        }
    }
}

Function类

package function;

import com.google.protobuf.ByteString;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * Parses a Galaxy protobuf payload and emits a 7-field Row:
 * (realtime, ip, session_id, app_id, device_uuid, day, hour).
 * Records that fail to parse are logged and dropped.
 */
public class GalaxyToPaimonFlatMap extends RichFlatMapFunction<ByteString, Row> {

    private static final Logger log = LoggerFactory.getLogger(GalaxyToPaimonFlatMap.class);

    // Incoming `realtime` is formatted like "2023/12/25 17:02:00".
    private static final DateTimeFormatter inputDateFormat = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
    private static final DateTimeFormatter outputDateFormat = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final DateTimeFormatter outputHourFormat = DateTimeFormatter.ofPattern("yyyyMMddHH");

    @Override
    public void flatMap(ByteString bytes, Collector<Row> out) {
        try {
            // Fix: the downstream view schema declares exactly 7 columns and only
            // fields 0-6 are ever set, so the Row arity must be 7 (was 86, which
            // would not match the registered schema).
            Row row = new Row(7);

            // Decode the protobuf payload and copy the needed fields.
            myProtoBufObjDataToProtoBuf.myProtoBufObj myProtoBufObj =
                    myProtoBufObjDataToProtoBuf.myProtoBufObj.parseFrom(bytes);
            String realtime = myProtoBufObj.getRealtime();

            // Parse the event time once and derive both day and hour from it
            // (the original parsed the same string twice).
            LocalDateTime eventTime = LocalDateTime.parse(realtime, inputDateFormat);

            row.setField(0, realtime);
            row.setField(1, myProtoBufObj.getIp());
            row.setField(2, myProtoBufObj.getSessionId());
            row.setField(3, myProtoBufObj.getAppId());
            row.setField(4, myProtoBufObj.getDeviceUuid());
            row.setField(5, eventTime.format(outputDateFormat));
            row.setField(6, eventTime.format(outputHourFormat));

            out.collect(row);
        } catch (Exception e) {
            // Best-effort: a bad record must not kill the job; log and drop it.
            log.error("function.GalaxyToPaimonFlatMap error is:  ", e);
        }
    }
}
http://www.xdnf.cn/news/995941.html

相关文章:

  • C++11中char16_t和char32_t的入门到精通
  • 黑马点评面试话术
  • uniapp 时钟
  • 电动汽车驱动模式扭矩控制设计方法
  • 三、DevEco Studio安装和HelloWorld应用
  • Kubernetes 集群安全(身份认证机制、SecurityContext、Network Policy网络策略、预防配置泄露、全面加固集群安全)
  • Springboot仿抖音app开发之消息业务模块后端复盘及相关业务知识总结
  • C++核心编程(动态类型转换,STL,Lanmda)
  • 【EdgeAI实战】(3)边缘AI开发套件 STM32N6570X0 用户手册
  • 【递归、搜索与回溯算法】概括
  • Vue + Vite 项目部署 Docker 全攻略:原理、路由机制、问题排查与开发代理解析
  • 使用 PyTorch 和 SwanLab 实时可视化模型训练
  • Python使用总结之Linux部署python3环境
  • 【测试开发】数据类型篇-列表推导式和字典推导式
  • Vue3+TypeScript实现责任链模式
  • XML 注入与修复
  • 接口测试不再难:智能体自动生成 Postman 集合
  • Apache 反向代理Unity服务器
  • Golang启用.exe文件无法正常运行
  • NGINX 四层 SSL/TLS 支持ngx_stream_ssl_module
  • vue3集成高德地图绘制轨迹地图
  • 鸿蒙 UI 开发基础语法与组件复用全解析:从装饰器到工程化实践指南
  • uni-app 小程序 Cannot read property ‘addEventListener‘ of undefined, mounted hook
  • 一.干货干货!!!SpringAI入门到实战-小试牛刀
  • 山东大学《Web数据管理》期末复习宝典【万字解析!】
  • Mac 系统 Node.js 安装与版本管理指南
  • 使用Gitlab CI/CD结合docker容器实现自动化部署
  • React 集中状态管理方案
  • CentOS变Ubuntu后后端程序SO库报错,解决方案+原理分析!
  • .NET 中的异步编程模型