当前位置: 首页 > news >正文

CentOS下的分布式内存计算Spark环境部署

一、Spark 核心架构与应用场景

1.1 分布式计算引擎的核心优势

Spark 是基于内存的分布式计算框架,相比 MapReduce 具有以下核心优势:

  • 内存计算:数据可常驻内存,迭代计算性能提升 10-100 倍(文档段落:3-790)。
  • 多范式支持:同时支持批处理(Spark SQL)、流处理(Spark Streaming)、图计算(GraphX)。
  • 生态统一:无缝集成 Hadoop 生态,支持从 HDFS、HBase 等数据源直接读取数据。
  • 高吞吐量:单机可处理百万级数据记录,集群可扩展至数千节点。

典型应用场景

  • 实时数据分析(如电商用户行为分析)。
  • 大规模机器学习(基于 MLlib 库的模型训练)。
  • 流式数据处理(配合 Kafka 实现实时日志分析)。

二、集群环境规划与前置准备

2.1 集群节点规划(3 节点方案)

节点名称IP 地址角色分配内存配置数据目录
spark-node1192.168.88.130Master、Worker16GB/data/spark/data
spark-node2192.168.88.131Worker16GB/data/spark/data
spark-node3192.168.88.132Worker16GB/data/spark/data

2.2 前置依赖安装(所有节点)

  1. JDK 环境(需 1.8+,文档段落:3-248):

    bash

    yum install -y java-1.8.0-openjdk-devel
    java -version # 验证版本
    
  2. Hadoop 集群(已部署 HDFS 和 YARN,文档段落:3-633):
    确保 HDFS 服务正常,Spark 将使用 HDFS 作为分布式存储。
  3. SSH 免密登录(文档段落:3-523):

    bash

    ssh-keygen -t rsa -b 4096 -N ""
    ssh-copy-id spark-node2 && ssh-copy-id spark-node3
    

三、Spark 单机安装与配置

3.1 下载与解压安装包

bash

# 下载Spark 2.4.5(文档段落:3-796)
wget https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz# 解压到指定目录
tar -zxvf spark-2.4.5-bin-hadoop2.7.tgz -C /export/server/
ln -s /export/server/spark-2.4.5-bin-hadoop2.7 /export/server/spark # 创建软链接

3.2 核心配置文件修改

3.2.1 spark-env.sh(文档段落:3-800)

bash

vim /export/server/spark/conf/spark-env.sh
# 添加以下内容
export JAVA_HOME=/export/server/jdk
export HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
export YARN_CONF_DIR=/export/server/hadoop/etc/hadoop
export SPARK_MASTER_HOST=spark-node1
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8080
export SPARK_WORKER_CORES=4 # 每个Worker使用4核
export SPARK_WORKER_MEMORY=8g # 每个Worker使用8GB内存
3.2.2 slaves(文档段落:3-803)

bash

vim /export/server/spark/conf/slaves
# 添加以下内容(每行一个节点)
spark-node1
spark-node2
spark-node3
3.2.3 spark-defaults.conf(可选优化)

bash

cp /export/server/spark/conf/spark-defaults.conf.template /export/server/spark/conf/spark-defaults.conf
vim /export/server/spark/conf/spark-defaults.conf
# 添加以下优化配置
spark.executor.memory 4g
spark.driver.memory 2g
spark.sql.shuffle.partitions 24
spark.default.parallelism 24

四、集群化部署:节点间配置同步

4.1 分发安装目录到其他节点

bash

# 在spark-node1执行,复制到node2/node3
scp -r /export/server/spark spark-node2:/export/server/
scp -r /export/server/spark spark-node3:/export/server/

4.2 配置文件一致性验证

检查所有节点的spark-env.shslaves文件内容一致,确保:

  • SPARK_MASTER_HOST指向正确的 Master 节点。
  • slaves文件包含所有 Worker 节点主机名。

五、集群启动与状态验证

5.1 启动 Spark 集群

5.1.1 单节点启动(spark-node1 执行)

bash

# 启动Master和Worker
/export/server/spark/sbin/start-all.sh
5.1.2 后台启动(生产环境推荐)

bash

nohup /export/server/spark/sbin/start-all.sh &
tail -f /export/server/spark/logs/spark-root-master-spark-node1.log # 查看启动日志

5.2 验证集群状态

5.2.1 进程检查(所有节点执行)

bash

jps | grep -E "Master|Worker"
# spark-node1应显示Master和Worker进程
# spark-node2/node3应显示Worker进程
5.2.2 网页管理界面
  • Master 状态:访问http://spark-node1:8080,查看集群概述、Worker 列表、应用程序运行情况(文档段落:3-810)。
  • Worker 状态:在管理界面中点击节点名称,查看 CPU、内存、磁盘使用情况。
5.2.3 命令行验证(文档段落:3-812)

bash

# 进入Spark Shell
/export/server/spark/bin/spark-shell --master spark://spark-node1:7077# 执行单词计数示例
val textFile = sc.textFile("hdfs://spark-node1:8020/README.txt")
val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.collect()

六、核心功能测试与性能调优

6.1 数据处理测试

6.1.1 批量数据处理

使用 Spark SQL 处理 HDFS 上的 Parquet 文件:

sql

-- 在Spark SQL中执行
CREATE TABLE users USING parquet OPTIONS (path "hdfs://spark-node1:8020/users.parquet")
SELECT count(*) FROM users WHERE age > 30
6.1.2 流式数据处理

对接 Kafka 进行实时数据处理(需提前配置 Kafka 主题):

java

// 示例代码:Spark Streaming对接Kafka
val kafkaParams = Map[String, Object]("bootstrap.servers" -> "kafka-node1:9092,kafka-node2:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "test-group","auto.offset.reset" -> "latest","enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("test-topic")
val stream = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
stream.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

6.2 性能优化策略

6.2.1 资源分配优化
  • Executor 配置:根据任务类型调整 Executor 数量与内存:

    bash

    # 提交任务时指定资源
    /export/server/spark/bin/spark-submit \
    --master spark://spark-node1:7077 \
    --num-executors 10 \
    --executor-memory 4g \
    --executor-cores 2 \
    --class com.example.MyApp \
    my-app.jar
    
  • 动态资源分配:在spark-defaults.conf中启用:

    properties

    spark.dynamicAllocation.enabled true
    spark.dynamicAllocation.minExecutors 5
    spark.dynamicAllocation.maxExecutors 50
    
6.2.2 内存优化
  • 调整存储与执行内存比例

    properties

    spark.memory.storageFraction 0.5  # 存储占比50%
    spark.memory.fraction 0.8        # 总内存使用比例80%
    
  • 启用堆外内存

    properties

    spark.memory.offHeap.enabled true
    spark.memory.offHeap.size 2g
    
6.2.3 任务调度优化
  • 广播大变量

    java

    val broadcastVar = sc.broadcast(loadLargeData())
    
  • 合并小任务

    properties

    spark.sql.autoBroadcastJoinThreshold 104857600  # 100MB以下自动广播
    spark.default.parallelism 24 * num-executors
    

七、常见故障排查与解决方案

7.1 集群无法启动

可能原因

  • SSH 免密失败:检查节点间 SSH 连接是否正常,~/.ssh/authorized_keys是否包含所有节点公钥(文档段落:3-523)。
  • 端口冲突:确保 Master 端口(7077)、WebUI 端口(8080)未被占用。
  • 配置文件错误:检查spark-env.shHADOOP_CONF_DIR是否指向正确的 Hadoop 配置目录。

解决方法

bash

# 示例:修复SSH免密问题
ssh spark-node2 "echo 'hello' > /tmp/test" # 验证连接
netstat -anp | grep 7077 # 检查端口占用

7.2 任务执行缓慢

可能原因

  • 资源分配不足:Executor 数量或内存设置过低,导致任务排队。
  • 数据倾斜:某个分区数据量过大,形成任务热点。
  • Shuffle 操作频繁:过多的 Shuffle 操作导致磁盘 IO 瓶颈。

解决方法

  • 增加 Executor 数量与内存,调整--num-executors--executor-memory参数。
  • 对倾斜数据进行重分区:

    java

    df.repartition(24).write.parquet("hdfs://path")
    
  • 优化 SQL 查询,减少不必要的 Shuffle:

    sql

    SELECT * FROM table WHERE condition ORDER BY id LIMIT 100
    

7.3 Worker 节点掉线

可能原因

  • 内存溢出:Worker 内存不足导致 JVM 崩溃。
  • 网络分区:节点间网络延迟过高或断开。
  • 磁盘故障:数据目录所在磁盘损坏,导致任务失败。

解决方法

  • 增加spark-worker-memory配置,降低单个任务内存占用。
  • 检查网络连接,确保节点间延迟 < 1ms,带宽≥1Gbps。
  • 更换故障磁盘,重启 Worker 进程:

    bash

    /export/server/spark/sbin/stop-worker.sh spark-node2:8081
    # 修复磁盘后重新启动
    /export/server/spark/sbin/start-worker.sh spark://spark-node1:7077
    

八、生产环境最佳实践

8.1 高可用性配置

8.1.1 多 Master 部署(HA 模式)

通过 Zookeeper 实现 Master 自动故障转移(文档段落:3-793):

bash

# 修改spark-env.sh
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.zk.url=zk-node1:2181,zk-node2:2181,zk-node3:2181 -Dspark.zk.dir=/spark"# 启动多个Master
/export/server/spark/sbin/start-master.sh --host spark-node1
/export/server/spark/sbin/start-master.sh --host spark-node2
8.1.2 数据持久化
  • Checkpoint 机制:对长时间运行的流作业设置 Checkpoint:

    java

    streamingContext.checkpoint("hdfs://spark-node1:8020/checkpoints")
    
  • 作业历史服务器:配置 HistoryServer 记录作业历史:

    bash

    # 修改spark-site.xml
    <property><name>spark.history.fs.logDirectory</name><value>hdfs://spark-node1:8020/spark-logs</value>
    </property>
    /export/server/spark/sbin/start-history-server.sh
    

8.2 安全与资源管理

8.2.1 启用 Kerberos 认证

bash

# 配置spark-env.sh
export SPARK_KERBEROS_PRINCIPAL="spark@EXAMPLE.COM"
export SPARK_KERBEROS_KEYTAB="/etc/kerberos/keytabs/spark.keytab"# 提交任务时指定认证
kinit -kt spark.keytab spark@EXAMPLE.COM
/export/server/spark/bin/spark-submit --master yarn --deploy-mode cluster ...
8.2.2 资源队列管理

通过 YARN 队列管理不同应用资源(需提前配置 YARN 队列):

bash

/export/server/spark/bin/spark-submit \
--master yarn --deploy-mode cluster \
--queue production \
--resources memory=8g,vcores=4 \
--class com.example.MyApp my-app.jar

九、总结:Spark 集群部署核心流程

通过以上步骤,可构建一个高可用、高性能的 Spark 分布式计算集群,支撑大规模数据处理与分析任务。生产环境中需结合业务场景优化资源分配、内存使用及任务调度策略,并利用 Spark 生态工具(如 Spark SQL、MLlib)提升开发效率。参考官方文档(Spark Documentation)可进一步学习流处理、机器学习等高级特性及性能调优技巧。

http://www.xdnf.cn/news/961525.html

相关文章:

  • 什么是MongoDB
  • freeCAD 学习 step1
  • 【FFmpeg学习(2)】视频概念
  • 雨季智慧交通:从车辆盲区到客流统计的算法全覆盖
  • ubuntu + nginx 1.26 + php7.4 + mysql8.0 调优
  • Cypher 查询语言完全指南(2024最新版)—— Neo4j 图数据库实战教程
  • Unity | AmplifyShaderEditor插件基础(第八集:噪声波动shader)
  • QT中实现tcp连接
  • MongoDB数据库应用
  • AO810 AO810V2 - ABB Ability系统800xA硬件
  • 校园导航系统核心技术解析:高精度定位与 AR 实景导航的应用实践
  • 【动作】AVA:时空定位原子视觉动作视频数据集
  • 【Dv3Admin】系统视图角色菜单按钮权限API文件解析
  • Doris “_stream_load“ 方式批量导入数据
  • Remmina远程访问如何开启本地音频?
  • (41)课60--61高级篇: MySQL体系结构(连接层、服务层、引擎层、存储层)。存储引擎是基于表的,可为不同表指定不同的存储引擎;查询表结构语句 show create table 表名
  • #Word“嵌入式”插图显示不全的解决教程
  • 在Word中使用 Microsoft Print to PDF和另存为PDF两种方式生成的 PDF文件
  • ubuntu24安装TensorRT
  • ubuntu24.04安装IDEA2025.1.2搭建java开发环境
  • 数据结构-链表OJ-回文链表,如何将时间复杂度控制为O(N),空间复杂度控制为O(1)?
  • POI设置Excel单元格背景色
  • DataFrame中.iloc 属性
  • HTAP 技术:融合事务与分析的数据处理新范式
  • 【数据篇】持久化核心:整合 JPA/MyBatis 实现优雅的数据库操作
  • pcie问答--0609
  • 激光隐形切割(Stealth Dicing)技术
  • Oracle数据库对IPv6的支持情况
  • 造成服务器重启的原因都有哪些?
  • Lang*生态系统多个专业框架及他们的作用