当前位置: 首页 > web >正文

flinksql实践(从kafka读数据)

本案例是基于flinksql实现的,将逐步实现从kafka读写数据,聚合查询,关联维表(外部系统)等。

环境准备

        首先确保电脑已经安装好zookeeper、kafka、flink。本文flink使用单机模式,zookeeper和kafka也使用单机配置。(环境配置部分可以跳过)

        首先启动zookeeper

# 启动zookeeper
bin/zkServer.sh start# 关闭zookeeper
bin/zkServer.sh stop

        然后启动kafka

# 启动kafka
bin/kafka-server-start.sh -daemon config/server.properties
# 关闭kafka 
bin/kafka-server-stop.sh -daemon config/server.properties

        启动flink集群,并打开sql客户端

# 启动flink集群
bin/start-cluster.sh
# 启动sql客户端
bin/sql-client.sh embedded -s yarn-session 
# 关闭flink集群
bin/stop-cluster.sh

 案例1:从kafka读取csv格式数据

数据准备

csv格式数据是以','进行分隔。本案例设计一个学生信息表,包含学生id,学生姓名,学生年级字段。

# 示范数据 
1,"zhangsan",2021
2,"lisi",2021
3,"wangwu",2024
4,"Bob",2021
5,"Lily",2022

kafka 主题相关命令

# 遍历已有topic
bin/kafka-topics.sh --bootstrap-server 192.168.137.201:9092 --list
# 生产者
bin/kafka-console-producer.sh --bootstrap-server node1:9092 --topic test
# 消费者
bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic test --from-beginning

创建kafka表

create table kafkatable (`stu_id` bigint,`stu_name` string,`grade` bigint
) with ('connector' = 'kafka','topic' = 'test','properties.bootstrap.servers' = 'node1:9092','properties.group.id' = 'testgroup','scan.startup.mode' = 'latest-offset','format' = 'csv'
)	

本案例读test分区的最新数据,如需修改,参考以下链接:

Kafka | Apache Flink

注意:kafka-flink连接器需要手动上传到flink安装目录下的lib目录下,官网有提供对应jar包。

案例实现

-- 实时查看kafkatable数据
select * from kafkatable;-- 查看不同年级学生数量
select grade,count(1) from kafkatable group by grade;

案例2:从kafka读取json格式数据

数据准备

{"stu_id": 1, "stu_name": "zhangsan", "grade": 2021}
{"stu_id": 2, "stu_name": "lisi", "grade": 2021}
{"stu_id": 3, "stu_name": "wangwu", "grade": 2024}
{"stu_id": 4, "stu_name": "Bob", "grade": 2021}
{"stu_id": 5, "stu_name": "Lily", "grade": 2022}

创建kafka表

create table student_info (`stu_id` bigint,`stu_name` string,`grade` bigint
) with ('connector' = 'kafka','topic' = 'test','properties.bootstrap.servers' = 'node1:9092','properties.group.id' = 'testgroup','scan.startup.mode' = 'latest-offset','format' = 'json'
);	

案例实现

-- 实时遍历
select * from student_info;

http://www.xdnf.cn/news/5888.html

相关文章:

  • 在Linux系统中开放指定端口访问(允许远程访问数据库)
  • 电脑关机再开机会换IP吗?深入解析分配机制
  • PHP-FPM 调优配置建议
  • linux入门学习(介绍、常用命令、vim、shell)
  • .Net HttpClient 处理错误与异常
  • 机器学习 --- 数据集
  • 【Java】网络编程(Socket)
  • set(CMAKE_C_FLAGS “${CMAKE_C_FLAGS} -ansi -pedantic -Wall“)
  • JVM——方法内联之去虚化
  • 【go】binary包,大小端理解,read,write使用,自实现TCP封包拆包案例
  • Go构建高并发权重抽奖系统:从设计到优化全流程指南
  • Python 基础语法与数据类型(八) - 函数参数:位置参数、关键字参数、默认参数、可变参数 (*args, **kwargs)
  • 【PyTorch】深度学习实践——第二章:线性模型
  • 【数据结构】——栈和队列OJ
  • python酒店健身俱乐部管理系统
  • iPaaS 集成平台如何解决供应链响应速度问题?
  • Spring AI 开发本地deepseek对话快速上手笔记
  • 07_Java中的锁
  • 系统平衡与企业挑战
  • Tomcat与纯 Java Socket 实现远程通信的区别
  • 中国人工智能智能体研究报告
  • Linux的文件查找与压缩
  • 关于cleanRL Q-learning
  • Java集合框架详解与使用场景示例
  • MySQL 5.7在CentOS 7.9系统下的安装(下)——给MySQL设置密码
  • Android NDK 高版本交叉编译:为何无需配置 FLAGS 和 INCLUDES
  • org.slf4j.MDC介绍-笔记
  • 集成DHTMLX 预订排期调度组件实践指南:如何实现后端数据格式转换
  • web 自动化之 yaml 数据/日志/截图
  • Boundary Attention Constrained Zero-Shot Layout-To-Image Generation