大数据学习笔记
大数据学习笔记
Hadoop 高频面试题
Hadoop 作为大数据生态系统的核心框架,在面试中经常被涉及。以下是常见的高频面试题分类及解答要点。
Hadoop 基础概念
Hadoop 核心组件有哪些?
Hadoop 主要由 HDFS(分布式文件系统)、YARN(资源管理器)和 MapReduce(计算框架)组成。生态扩展包括 Hive、HBase、Spark 等。
HDFS 的设计原理是什么?
HDFS 采用主从架构,NameNode 管理元数据,DataNode 存储实际数据。支持高容错性,默认块大小为 128MB(Hadoop 2.x/3.x),适合大文件存储。
// HDFS 文件写入示例
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/test/file.txt");
FSDataOutputStream outputStream = fs.create(path);
outputStream.writeBytes("Hello HDFS");
outputStream.close();
MapReduce 工作原理
MapReduce 的执行流程是怎样的?
分为 InputSplit、Map、Shuffle、Reduce 和 Output 阶段。Shuffle 阶段包括 Partition、Sort 和 Combiner(可选)。
如何优化 MapReduce 性能?
- 合理设置 Map 和 Reduce 任务数量。
- 使用 Combiner 减少数据传输。
- 选择合适的数据压缩格式(如 Snappy)。
# 简单 MapReduce 示例(Python 版)
from mrjob.job import MRJobclass WordCount(MRJob):def mapper(self, _, line):for word in line.split():yield word, 1def reducer(self, word, counts):yield word, sum(counts)if __name__ == '__main__':WordCount.run()
YARN 资源管理
YARN 的主要角色有哪些?
- ResourceManager:全局资源调度。
- NodeManager:单节点资源管理。
- ApplicationMaster:应用级任务协调。
YARN 调度器有哪些?
- FIFO Scheduler:先进先出。
- Capacity Scheduler:队列资源隔离。
- Fair Scheduler:动态平衡资源分配。
HDFS 高可用性
如何保证 NameNode 高可用?
通过 HA 架构,使用 Active 和 Standby NameNode,依赖 ZooKeeper 实现故障自动切换。EditLogs 共享存储(如 QJM)。
HDFS 数据一致性如何保障?
客户端写入数据时,需收到所有 DataNode 的 ACK 确认。校验和(Checksum)机制防止数据损坏。
Hadoop 生态工具
Hive 与传统数据库的区别?
Hive 是数据仓库工具,基于 HDFS 存储,适合批处理,SQL 转化为 MapReduce/Tez/Spark 作业。
Spark 为什么比 MapReduce 快?
- 内存计算减少磁盘 I/O。
- 基于 DAG 的执行引擎优化任务调度。
- 支持更丰富的算子(如 RDD 转换操作)。
性能调优与故障排查
如何解决 DataNode 磁盘不均衡?
运行 hdfs diskbalancer
命令,或手动调整数据分布。
常见的 MapReduce 失败原因有哪些?
- 内存不足:调整
mapreduce.map.memory.mb
参数。 - 数据倾斜:优化 Key 分布或使用二次排序。
进阶问题
Hadoop 3.x 有哪些新特性?
- Erasure Coding:节省存储空间。
- YARN Timeline Service v2:改进任务监控。
- 支持 GPU 和 FPGA 等异构计算。
如何实现小文件合并?
使用 Hadoop Archive(HAR)或通过 Spark/Hive 合并为 SequenceFile。
实际场景问题
如何处理海量数据的排序?
- MapReduce 全排序:使用单个 Reduce 或采样分区。
- TeraSort 算法:自定义 Partitioner 实现范围分区。
HDFS 如何应对节点宕机?
通过副本机制(默认 3 副本)自动恢复数据,NameNode 触发副本复制任务。
Hive 基本介绍
Hive 是基于 Hadoop 的数据仓库工具,用于处理结构化数据。它将 SQL 查询转换为 MapReduce、Tez 或 Spark 任务,适合离线批处理场景。核心特点包括:
- 类 SQL 语法:支持 HiveQL,降低学习成本。
- 元数据管理:存储在关系型数据库(如 MySQL)中。
- 数据存储:底层依赖 HDFS,支持文本、ORC、Parquet 等格式。
- 扩展性:支持 UDF(用户自定义函数)和 UDAF(聚合函数)。
Hive 常见面试题
数据模型与存储
Hive 内部表和外部表的区别
- 内部表:数据由 Hive 管理,删除表时数据一并删除。
- 外部表:数据路径由用户指定,删除表仅删除元数据。
-- 创建内部表
CREATE TABLE internal_table (id INT, name STRING);-- 创建外部表
CREATE EXTERNAL TABLE external_table (id INT) LOCATION '/user/data';
分区和分桶的作用
- 分区:按列值(如日期)划分目录,加速查询。
- 分桶:对数据哈希分文件,优化 JOIN 和采样。
-- 分区表示例
CREATE TABLE partitioned_table (id INT) PARTITIONED BY (date STRING);-- 分桶表示例
CREATE TABLE bucketed_table (id INT) CLUSTERED BY (id) INTO 4 BUCKETS;
HiveQL 优化
JOIN 操作优化
- Map Join:小表加载到内存,避免 Reduce 阶段。
- 倾斜优化:使用
skewjoin
或拆分倾斜键。
-- 启用 Map Join
SET hive.auto.convert.join=true;
数据倾斜解决方案
- 增加 Reduce 数量:
set hive.exec.reducers.bytes.per.reducer=256000000
。 - 倾斜键单独处理:先过滤倾斜键再 UNION ALL。
执行流程与架构
Hive 执行流程
- 解析 SQL 生成抽象语法树(AST)。
- 转换为逻辑执行计划。
- 优化逻辑计划(如谓词下推)。
- 生成物理计划(MapReduce/Spark)。
元数据存储位置
默认存储在 Derby,生产环境通常改用 MySQL。
实际场景问题
如何导出 Hive 查询结果到本地?
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/output'
SELECT * FROM table_name;
动态分区配置
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT INTO TABLE partitioned_table PARTITION(dt)
SELECT id, dt FROM source_table;
ORC 和 Parquet 格式对比
- ORC:Hive 原生支持,列存储,适合 Hive 场景。
- Parquet:跨生态(如 Spark),压缩效率高。
性能调优
关键参数配置
-- 并行执行
SET hive.exec.parallel=true;-- 合并小文件
SET hive.merge.mapfiles=true;
Explain 分析查询计划
EXPLAIN EXTENDED SELECT * FROM table WHERE id=100;
Flume 基本介绍
Flume 是 Apache 旗下的分布式、高可靠、高可用的日志收集系统,专为海量日志数据的聚合和传输设计。其核心架构基于Agent(由 Source、Channel、Sink 组成),支持自定义扩展和多种数据源/目的地(如 Kafka、HDFS、本地文件等)。
核心组件
- Source:数据来源(如
netcat
、exec
、spooldir
、Kafka
)。 - Channel:临时存储数据的缓冲(如
memory
、file
、JDBC
)。 - Sink:数据目的地(如
HDFS
、HBase
、Kafka
)。
特点
- 事务机制保证数据可靠性。
- 支持多级 Agent 串联(Flow Pipeline)。
- 插件化架构,易于扩展。
Flume 高频面试题
1. Flume 的可靠性如何保证?
- Channel 持久化:使用
file channel
或JDBC channel
避免内存丢失。 - 事务机制:Source 到 Channel、Channel 到 Sink 均通过事务提交/回滚。
- Sink 重试:失败后自动重试写入。
2. Flume 如何解决数据重复问题?
- 幂等性设计:部分 Sink(如 HDFS)支持覆盖写入。
- 外部去重:结合下游系统(如 HBase 主键)或业务逻辑处理。
3. Memory Channel 和 File Channel 的区别?
- Memory Channel:高性能但易丢失,适合对可靠性要求不高的场景。
- File Channel:基于磁盘存储,可靠性高但性能较低。
4. 如何监控 Flume 运行状态?
- JMX 接口:暴露指标(如 Channel 大小、事件数量)。
- 自定义监控脚本:解析日志或调用 API。
5. Flume 如何与 Kafka 集成?
- Source 端:使用
Kafka Source
消费数据。 - Sink 端:使用
Kafka Sink
推送数据。
# Kafka Source 示例配置
agent.sources.kafkaSrc.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSrc.kafka.bootstrap.servers = localhost:9092
agent.sources.kafkaSrc.kafka.topics = test_topic
6. Flume 的拦截器(Interceptor)作用?
- 数据清洗:过滤、修改或添加头信息。
- 常见类型:
Timestamp
、Host
、Regex
拦截器。
7. 如何优化 Flume 性能?
- 批量提交:调整
batchSize
(如 Sink 的hdfs.batchSize
)。 - 并行度:增加 Sink 组的处理器数量(
sinkgroups
)。 - Channel 选择:高吞吐场景优先使用
memory channel
。
8. Flume 事务的工作流程?
- Put 事务:Source 从外部读取事件,提交到 Channel。
- Take 事务:Sink 从 Channel 提取事件,写入目的地后提交。
任一阶段失败会回滚。
9. 列举 Flume 的常见拓扑结构
- 单 Agent:
Source → Channel → Sink
。 - 多级串联:
Agent1 → Avro Sink → Avro Source → Agent2
。 - 多路复用:通过
Selector
将数据路由到不同 Sink。
10. Flume 与 Logstash 的对比
- Flume:适合 Hadoop 生态,侧重可靠性。
- Logstash:支持更丰富的数据解析(如 Grok),但资源消耗较高。
HBase简介
HBase是一个开源的、分布式的、面向列的NoSQL数据库,基于Google的Bigtable模型设计,运行在Hadoop文件系统(HDFS)之上。它支持海量数据的随机读写,适合实时查询和大规模数据存储场景。
核心特性:
- 高扩展性:可横向扩展至数千节点,处理PB级数据。
- 强一致性:保证同一行的读写操作原子性。
- 自动分片:数据按Region自动分区并负载均衡。
- 稀疏存储:仅存储非空数据,节省空间。
适用场景:
- 实时读写(如日志分析、消息记录)。
- 需要快速随机访问的大数据集。
- 高写入吞吐量的应用。
HBase高频面试题
基础概念
-
HBase与RDBMS的区别
- RDBMS支持复杂查询和事务,HBase适合高吞吐、非结构化数据。
- HBase无固定schema,RDBMS需预定义表结构。
-
Row Key设计原则
- 避免单调递增(防止热点问题),采用散列或复合键。
- 长度不宜过长(影响存储效率)。
架构与原理
-
HBase架构组件
- RegionServer:负责数据读写和Region管理。
- HMaster:协调Region分配、故障恢复。
- ZooKeeper:维护集群状态和元数据。
-
Region分裂机制
- 当Region大小超过阈值(默认10GB),分裂为两个子Region。
- 分裂过程由RegionServer触发,HMaster协调。
数据操作
-
HBase读写流程
- 写流程:Client → ZooKeeper(定位RegionServer)→ MemStore → WAL → HFile。
- 读流程:Client → BlockCache → MemStore → HFile(合并结果)。
-
Compaction的作用
- 合并小文件,减少查询时的I/O开销。
- 分为Minor和Major Compaction,后者清理过期数据。
性能优化
-
热点问题解决方案
- 预分区:提前规划Region分布。
- Salting:在Row Key前添加随机前缀。
-
Bloom Filter的应用
- 快速判断某行数据是否存在,减少无效磁盘读取。
- 在Get操作中显著提升性能。
代码示例
// Java连接HBase示例
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "zk_host");
Connection connection = ConnectionFactory.createConnection(config);
Table table = connection.getTable(TableName.valueOf("test_table"));Put put = new Put(Bytes.toBytes("row1"));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col1"), Bytes.toBytes("value"));
table.put(put);
table.close();
故障处理
-
RegionServer宕机恢复
- HMaster重新分配Region到其他节点。
- WAL日志重放恢复未持久化数据。
-
HBase与Hive集成
- 通过Hive外部表映射HBase数据,支持SQL查询。
- 需配置HBaseStorageHandler。
Scala 基本介绍
Scala(Scalable Language)是一种多范式编程语言,结合了面向对象和函数式编程的特性。运行在 JVM 上,兼容 Java 生态,适合构建高性能、高可扩展性的应用。
核心特性
- 静态类型:通过类型推断简化代码。
- 函数式编程:支持高阶函数、不可变数据和纯函数。
- 面向对象:所有值均为对象,类型和行为通过类和特质(trait)定义。
- 并发模型:基于
Actor
模型的Akka
框架简化并发编程。
Scala 高频面试题
语言特性
1. 解释 val
和 var
的区别
val
声明不可变变量(类似 Java 的final
)。var
声明可变变量。
val x = 10 // 不可重新赋值
var y = 20 // 可修改:y = 30
2. 什么是特质(Trait)?与 Java 接口的区别
- 特质类似接口,但可以包含具体方法实现。
- 支持多重继承(一个类可混入多个特质)。
trait Greeter {def greet(): Unit = println("Hello")
}
class MyGreeter extends Greeter
函数式编程
3. 高阶函数示例
高阶函数以函数作为参数或返回值。
def operate(f: (Int, Int) => Int, a: Int, b: Int): Int = f(a, b)
val sum = operate((x, y) => x + y, 3, 5) // 输出 8
4. 解释 map
、flatMap
和 filter
map
:对集合每个元素应用函数。flatMap
:先映射后扁平化(如处理嵌套列表)。filter
:保留满足条件的元素。
val list = List(1, 2, 3)
list.map(_ * 2) // List(2, 4, 6)
list.flatMap(x => List(x, x*2)) // List(1, 2, 2, 4, 3, 6)
list.filter(_ > 1) // List(2, 3)
并发与模式匹配
5. case class
的作用
- 不可变数据类,自动生成
equals
、hashCode
等方法。 - 常用于模式匹配。
case class Person(name: String, age: Int)
val p = Person("Alice", 30)
p match { case Person(n, a) => println(n) }
6. 解释 Future
和 Promise
Future
表示异步计算的结果。Promise
用于手动完成一个Future
。
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
val future = Future { Thread.sleep(1000); 42 }
future.onComplete {case Success(value) => println(value)case Failure(e) => e.printStackTrace()
}
实际应用
7. 用 Scala 实现单例模式
使用 object
关键字直接定义单例。
object Singleton {def show(): Unit = println("I'm a singleton")
}
Singleton.show()
8. 解释隐式转换(Implicit Conversion)
通过隐式方法或参数扩展功能(需谨慎使用)。
implicit def intToString(x: Int): String = x.toString
val s: String = 42 // 自动调用 intToString
9. 什么是类型擦除?如何解决?
- JVM 在运行时擦除泛型类型信息。
- 可通过
ClassTag
或TypeTag
保留类型信息。
import scala.reflect.ClassTag
def detectType[T: ClassTag](list: List[T]): Unit = {println(implicitly[ClassTag[T]].runtimeClass)
}
detectType(List(1, 2)) // 输出 int
10. 解释 Option
和 Either
Option
:避免null
,使用Some
/None
。Either
:表示可能错误的结果(Left
为错误,Right
为正确值)。
val opt: Option[Int] = Some(5)
opt.getOrElse(0) // 5val either: Either[String, Int] = Right(10)
either match {case Right(v) => println(v)case Left(e) => println(e)
}
Spark 基本介绍
Apache Spark 是一个开源的分布式计算框架,专为大规模数据处理设计。它提供了高效的内存计算能力,支持批处理、流处理、机器学习、图计算等多种计算范式。
核心特点:
- 内存计算:通过 RDD(弹性分布式数据集)减少磁盘 I/O,提升性能。
- 多语言支持:提供 Scala、Java、Python、R 的 API。
- 丰富的生态:集成 Spark SQL(结构化数据处理)、Spark Streaming(流处理)、MLlib(机器学习)、GraphX(图计算)。
- 容错性:通过 RDD 的血缘(Lineage)机制实现数据恢复。
架构组件:
- Driver:协调任务调度,向集群管理器申请资源。
- Executor:在工作节点上执行任务,存储数据。
- Cluster Manager:管理资源(如 YARN、Mesos、Standalone)。
Spark 高频面试题
RDD 相关
1. 什么是 RDD?
RDD(Resilient Distributed Dataset)是 Spark 的核心数据结构,代表一个不可变、可分区的分布式数据集。特性包括:
- 分区:数据分布式存储。
- 血统(Lineage):记录生成 RDD 的转换操作,用于容错。
- 持久化:通过
persist()
或cache()
缓存到内存/磁盘。
2. 宽依赖 vs 窄依赖
- 窄依赖:父 RDD 的每个分区最多被子 RDD 的一个分区依赖(如
map
、filter
)。 - 宽依赖:父 RDD 的分区被子 RDD 的多个分区依赖(如
groupByKey
、reduceByKey
),需要 Shuffle。
示例代码:RDD 创建与转换
from pyspark import SparkContext
sc = SparkContext("local", "RDD Demo")
data = [1, 2, 3, 4]
rdd = sc.parallelize(data) # 创建 RDD
squared_rdd = rdd.map(lambda x: x * x) # 窄依赖操作
grouped_rdd = rdd.groupBy(lambda x: x % 2) # 宽依赖操作
Spark SQL 与 DataFrame
3. DataFrame 与 RDD 的区别
- DataFrame:结构化数据,包含 Schema 信息,支持 SQL 查询,优化器(Catalyst)自动优化执行计划。
- RDD:低层次 API,无 Schema,需手动优化。
示例代码:DataFrame 操作
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataFrame Demo").getOrCreate()
df = spark.read.json("data.json") # 读取 JSON 文件
df.select("name").show() # SQL 风格操作
Spark 运行机制
4. Spark 任务提交流程
- 用户代码提交后,Driver 生成 DAG(有向无环图)。
- DAGScheduler 将 DAG 划分为 Stage(按宽依赖划分)。
- TaskScheduler 将 Task 分发到 Executor 执行。
5. Shuffle 过程
- 发生在宽依赖操作(如
reduceByKey
)。 - 数据按 Key 重新分区,跨节点传输。
- 可能成为性能瓶颈,可通过调整分区数优化。
性能调优
6. 常见调优手段
- 内存管理:调整
spark.executor.memory
和spark.memory.fraction
。 - 并行度:通过
spark.default.parallelism
控制分区数。 - 序列化:使用 Kryo 序列化(
spark.serializer=org.apache.spark.serializer.KryoSerializer
)。
示例代码:设置 Kryo 序列化
conf = SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sc = SparkContext(conf=conf)
Spark Streaming
7. 微批处理(Micro-batching)原理
- 将流数据切分为小批次(如 1 秒),每个批次作为一个 RDD 处理。
- 通过
DStream
(离散流)抽象实现。
示例代码:流处理
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, batchDuration=1) # 1 秒的批次
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.countByValue()
word_counts.pprint()
ssc.start()
总结
Spark 的核心优势在于内存计算和统一的处理框架。掌握 RDD、DataFrame、任务调度及调优是面试的关键点。实际应用中需根据场景选择 API(如流处理用 Structured Streaming 替代 DStream)。