当前位置: 首页 > backend >正文

Apache Sqoop数据采集问题

Sqoop数据采集格式问题

  • 一、Sqoop工作原理
  • 二、Sqoop命令格式
  • 三、Oracle数据采集格式问题
  • 四、Sqoop增量采集方案

Apache Sqoop是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql…)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。

Sqoop项目开始于2009年,最早是作为Hadoop的一个第三方模块存在,后来为了让使用者能够快速部署,也为了让开发人员能够更快速的迭代开发,Sqoop独立成为一个Apache项目。

一、Sqoop工作原理

  • 数据导入:Sqoop通过MapReduce任务来实现数据的并行导入。首先,它会将关系型数据库中的数据表按照一定的规则进行分区,然后为每个分区启动一个Map任务,同时从数据库中读取相应分区的数据,并将数据写入到HDFS或其他Hadoop存储系统中。这样可以充分利用Hadoop集群的分布式计算能力,提高数据导入的效率。

  • 导出过程:与导入类似,Sqoop也会将数据进行分区处理,然后通过Map任务将Hadoop中的数据读取出来,并按照目标关系型数据库的格式要求,将数据写入到数据库中。

Sqoop通过创建一个数据传输的MR程序,进而实现数据传输。

Sqoop安装:

  1. JAVA环境配置
  2. Hadoop环境配置
  3. 相关数据库驱动包

只要环境满足以上设置,直接解压Sqoop安装包即可安装,修改配置后即可使用。

二、Sqoop命令格式

基础使用语法:

sqoop import | export \
--数据库连接参数
--HDFS或者Hive的连接参数
--配置参数

数据传输常用参数:

选项参数
–connectjdbc:mysql://hostname:3306(数据库连接URL)
–username数据库用户名
–password数据库用户密码
–table指定数据表
–columns指定表列值
–where数据过滤条件
–e/–query自定义SQL语句
–driver指定数据库驱动
–delete-target-dir导入数据时,清空目标目录
–target-dir指定导入数据的目录(通常为HDFS路径)
–export-dir指定导出数据的源目录(通常为HDFS路径)

Sqoop命令的使用方法可以通过sqoop -h命令查看相关使用方法,此处不在赘述了

三、Oracle数据采集格式问题

场景:

  • Step1: 查看业务数据库中 CISS_SERVICE_WORKORDER 表的数据条数。

    select count(1) as cnt from CISS_SERVICE_WORKORDER;  178609
  • Step2: 采集CISS_SERVICE_WORKORDER的数据到HDFS上

    sqoop import \
    --connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \  
    --username ciss \
    --password 123456 \
    --table CISS4.CISS_SERVICE_WORKORDER \
    --delete-target-dir \
    --target-dir /test/full_imp/ciss4.ciss_service_workorder \
    --fields-terminated-by "\001" \   #指定数据分割符
    -m 1  #指定并行度
    
  • Step3: 使用Hive查看导入数据表的行数

    create external table test_text(
    line string # 将导入的数据一行作为表中的一列
    )
    location '/test/full_imp/ciss4.ciss_service_workorder';
    select count(*) from test_text;  195825

问题:
Sqoop采集完数据后,HDFS数据中存储的数据行数跟源数据库的数据量不符合。

原因:

  • sqoop以文本格式导入数据时,默认的换行符是特殊字符。
  • Oracle中的数据列中如果出现了\n、\r、\t等特殊字符,就会被划分为多行

Oracle数据:

idnameage
001zhang\nsan18

Sqoop转换后的数据:

001zhang
san18

Hive表中的数据:

idnameage
001zhang
san18

解决方法:

  • 方案一:
    • 删除或者替换数据中的换行符
    • Sqoop参数 --hive-drop-import-delims 删除换行符
    • Sqoop参数 --hive-delims-replacement char 替换换行符

    不建议使用,破坏原始数据结构,ODS层数据尽量抱持原结构

  • 方案二:
    • 采用特殊的存储格式,AVRO格式

常见的文件格式介绍:

类型介绍
TextFileHive默认的文件格式,最简单的数据格式,便于查看和编辑,耗费存储空间,I/O性能较低
SequenceFile含有键值对的二进制文件,优化磁盘利用率和I/O,并行操作数据,查询效率高,但存储空间消耗最大
AvroFile特殊的二进制文件,设计的主要目标是为了满足schema evolution,Schema和数据保存在一起
OrcFile列式存储,Schema存储在footer中,不支持schema evolution,高度压缩比并包含索引,查询速度非常快
ParquetFile列式存储,与Orc类似,压缩比不如Orc,但是查询性能接近,支持的工具更多,通用性更强

Avro格式特点

  • 优点
    • 二进制数据存储,性能好、效率高
    • 使用JSON描述模式,支持场景更丰富
    • Schema和数据统一存储,消息自描述(将表中的一行数据作为对象存储,并且Schema为元数据)
    • 模式定义允许定义数据的排序
  • 缺点
    • 只支持Avro自己的序列化格式
    • 少量列的读取性能比较差,压缩比较低
  • 场景:基于行的大规模结构化数据写入、列的读取非常多或者Schema变更操作比较频繁的场景

Sqoop使用Avro格式:

  sqoop import \-Dmapreduce.job.user.classpath.first=true \--connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \--username ciss \--password 123456 \--table CISS4.CISS_SERVICE_WORKORDER \--delete-target-dir \--target-dir /test/full_imp/ciss4.ciss_service_workorder \--as-avrodatafile \    # 选择文件存储格式为AVRO--fields-terminated-by "\001" \-m 1

Hive建表指定文件的存储格式:

create external table test_avro(
line string
)
stored as avro
location '/test/full_imp/ciss4.ciss_service_workorder';

AVRO 数据以 二进制序列化 存储,字段通过预定义的 模式(Schema) 解析,而非依赖分隔符,即使字段内容包含逗号、换行符等特殊字符,也不会影响数据结构的正确性。
Schema 定义(JSON 格式),明确描述了字段名称、类型、顺序等信息。

四、Sqoop增量采集方案

Sqoop 支持两种增量模式:

  • append 模式:
    适用于 仅追加数据 的表(如日志表),基于 递增列(如自增主键 id)采集新数据。

  • lastmodified 模式:
    适用于 数据会更新 的表(如用户表),基于 时间戳列(如 last_update_time)采集新增或修改的数据。

append模式要求源数据表具备自增列,如建表时设置的自增id
lastmodified模式要求源数据表具有时间戳字段。

Append模式:

要求:必须有一列自增的值,按照自增的int值进行判断

特点:只能导入增加的数据,无法导入更新的数据

场景:数据只会发生新增,不会发生更新的场景

sqoop import \                                   # 执行数据导入操作--connect jdbc:mysql://node3:3306/sqoopTest \  # 连接MySQL数据库(地址:node3,数据库名:sqoopTest)--username root \                             # 数据库用户名:root--password 123456 \                           # 数据库密码:123456--table tb_tohdfs \                           # 要导入的源表:tb_tohdfs--target-dir /sqoop/import/test02 \           # HDFS目标目录(数据将写入此路径)--fields-terminated-by '\t' \                 # 字段分隔符为制表符(\t)--check-column id \                           # 指定增量检查列:id(通常是自增主键)--incremental append \                        # 增量模式为“append”(仅导入新数据)--last-value 0 \                              # 上次导入的id最大值(初始值为0,首次导入id>0的数据)-m 1                                          # 使用1个Map任务(单线程)

appebd模式使用last-value记录上次导入的数据id最大值,初次导入一般为全量导入,即id>0

此处的last_value需要手动填写,因此可以使用Sqoop的job管理进行自动记录。

sqoop job --create my_job -- import ... --incremental append --check-column id --last-value 0
sqoop job --exec my_job  # 自动更新 last-value

lastmodified模式:
要求:必须包含动态时间变化这一列,按照数据变化的时间进行判断

特点:既导入新增的数据也导入更新的数据

场景:表中的记录会新增或更新,且每次更新都会修改 lastmode 时间戳。一般无法满足要求,所以不用。

sqoop import \                                   # 执行数据导入操作--connect jdbc:mysql://node3:3306/sqoopTest \  # 连接MySQL数据库(地址:node3,数据库名:sqoopTest)--username root \                             # 数据库用户名:root--password 123456 \                           # 数据库密码:123456--table tb_lastmode \                         # 要导入的源表:tb_lastmode--target-dir /sqoop/import/test03 \           # HDFS目标目录(数据将写入此路径)--fields-terminated-by '\t' \                 # 字段分隔符为制表符(\t)--incremental lastmodified \                  # 增量模式为“lastmodified”(采集新增或修改的数据)--check-column lastmode \                     # 指定时间戳列:lastmode(记录数据的更新时间)--last-value '2021-06-06 16:09:32' \          # 上次导入的最大时间值(导入此时间之后的新增/修改数据)-m 1                                          # 使用1个Map任务(单线程)

lastmodified模式使用时间戳记载数据的更新线。

若同一条记录被多次更新,且 lastmode 时间超过 --last-value,Sqoop 会多次导入该记录。

解决方案:添加 --merge-key <主键列> 参数,合并新旧数据(基于主键去重):

 --merge-key id  # 假设 id 是主键列

自定义模式:
要求:每次运行的输出目录不能相同

特点:自己实现增量的数据过滤,可以实现新增和更新数据的采集

场景:一般用于自定义增量采集每天的分区数据到Hive

sqoop  import \
--connect jdbc:mysql://node3:3306/db_order \
--username root \
--password-file file:///export/data/sqoop.passwd \
--query "select * from tb_order where substring(create_time,1,10) = '2021-09-14' or substring(update_time,1,10) = '2021-09-14' and \$CONDITIONS " \
--delete-target-dir \
--target-dir /nginx/logs/tb_order/daystr=2021-09-14 \
--fields-terminated-by '\t' \
-m 1

自定义模式可以根据设置的sql进行数据导入,因此是最常用的场景。

http://www.xdnf.cn/news/2444.html

相关文章:

  • 极客时光:第二部分——用QLoRA、RunPod和Cursor以超低成本微调DeepSeek-7B打造你的聊天机器人
  • WHAT - 《成为技术领导者》思考题(第二章)
  • 加速用户体验:Amazon CloudFront 实践与优化技巧
  • PDFMathTranslate:让数学公式在PDF翻译中不再痛苦
  • 【Android】dialogX对话框框架
  • 【C++ 类和数据抽象】消息处理示例(2)
  • 《代码整洁之道》第9章 单元测试 - 笔记
  • es数据导出
  • Vue中Axios实战指南:高效网络请求的艺术
  • Excel如何安装使用EPM插件并且汉化?
  • uniapp+vue3表格样式
  • Golang | Builder模式
  • 大模型——Suna集成浏览器操作与数据分析的智能代理
  • Transformer数学推导——Q25 分析视觉-语言模型中区域注意力(Region Attention)的边界框投影公式
  • Ubuntu 22.04.4操作系统初始化详细配置
  • WPF使用SQLite与JSON文本文件结合存储体侧平衡数据的设计与实现
  • 【设计模式】享元模式
  • .aar中申请权限时使用了android:maxSdkVersion导致主App的权限组找不到对应的权限
  • 【机器学习-线性回归-4】线性回归中的最优解:从数学原理到实践应用
  • ESP32开发入门(四):ESP32-s3多串口开发实践
  • 深度整合Perforce P4+Jira+Confluence:游戏开发团队协作工具链搭建指南
  • 力扣热题——统计完全子数组的数目
  • 【MQ篇】RabbitMQ之死信交换机!
  • Node.js CSRF 保护指南:示例及启用方法
  • react slot传递
  • Python 操作 Excel 插入图表:解锁数据可视化的高效密码
  • 使用vue2 开发一个纯静态的校园二手交易平台-前端项目练习
  • WEBSTORM前端 —— 第2章:CSS —— 第3节:背景属性与显示模式
  • SpringMVC 通过ajax 前后端数据交互
  • 空间矩阵的思考