logged in 2023 and written with Claude in 2026.
本文是学习 Apache Spark 过程中的系统性笔记,涵盖 Spark 编程模型(RDD / DataFrame / Dataset)、集群搭建、性能优化、内存管理,以及与 Hadoop / Hive / Impala / Kafka 等生态的集成。
1. 概述
Apache Spark 是一个面向大规模数据处理的统一分析引擎,具备以下特点:
- 通用性:同时支持 SQL 查询、流式计算、机器学习、图计算等场景
- 多数据源:可运行在 HDFS、Hive、HBase、Kafka、S3 等绝大多数数据源之上
- 高性能:基于内存计算,相比 MapReduce 有数量级的性能提升
2. 核心数据抽象:RDD / Dataset / DataFrame
Spark 提供三种核心数据抽象,三者均具备**惰性求值(Lazy Evaluation)**机制——在调用 map 等 Transformation 时不会立即执行,只有遇到 foreach 等 Action 时才真正触发计算。
| 抽象 | 特点 |
|---|---|
| RDD | 最底层抽象,不支持 SQL;通过粗粒度变换构建数据血缘(Lineage),节点故障时可从血缘重算恢复 |
| Dataset | 分布式数据集合,知道数据存在但不知道如何组织;强类型,编译期检查 |
| DataFrame | Dataset[Row] 的别名,每行按列名组织,类型固定;支持 SQL 查询,是最常用的抽象 |
DataFrame 与 Dataset 拥有完全相同的成员函数,区别仅在于每行数据类型:DataFrame 每行类型为 Row,需通过 getAs 方法或模式匹配取出特定字段。
RDD 的设计参考了 DryadLINQ 的 API 风格。
参考:Dataset vs DataFrame 对比(知乎)
3. Spark 编程模型
3.1 入口:SparkSession
SparkSession 是使用 Dataset / DataFrame API 进行 Spark 编程的统一入口,只在 Driver(Master)进程中可见。
3.2 Transformation vs Action
Spark 的所有操作分为两类:
- Transformation:将一个 RDD/DataFrame 转换为另一个,仅记录 Lineage,不触发计算(如
map、filter、join) - Action:将 RDD/DataFrame 转换为其他数据形式,触发真正的计算(如
foreach、count、collect)
每次 Action 调用对应一个 Spark Job。
3.3 Job 分解:Application → Job → Stage → Task
Application
└── Job(每次 Action 调用 = 一个 Job)
└── Stage(每个 Shuffle 依赖 = 一个 Stage 边界)
└── Task(每个并行计算单元;同一 Stage 内所有 Task 执行相同代码,处理不同数据分片)
每个 Node 是一个独立的 JVM 进程,内部运行若干 Executor,每个 Executor 将 Stage 内的任务拆分为多个 Task 并行执行。
4. Shuffle 机制与宽窄依赖
4.1 宽依赖 vs 窄依赖
- 窄依赖:子分区数据只依赖一个父分区,无需跨节点数据移动(如
map、filter) - 宽依赖:子分区依赖多个父分区,需要 Shuffle 数据(如
groupByKey、join)
Shuffle 是划分 DAG Stage 的标识,会产生大量网络 I/O,是影响 Spark 性能的关键因素。
4.2 会触发 Shuffle 的 Transformation
以下操作会引发 Shuffle,使用时需格外注意:
repartitionjoin、cogroup- 所有
*By/*ByKey系列操作(groupByKey、reduceByKey、sortByKey等)
4.3 减少 Shuffle 的常见优化手段
① 用 reduceByKey 替代 groupByKey
// 低效:全量数据通过网络传输后再聚合
rdd.groupByKey().mapValues(_.sum)
// 高效:先在各 partition 局部累积,Shuffle 后再合并
rdd.reduceByKey(_ + _)
② 避免"新增列后 join 回来"的写法,直接在原数据上生成新列。
③ 小表广播(Broadcast Join)
当 join 的一侧数据集足够小,可以用 broadcast 将其复制到每个 Executor 内存,避免 Shuffle:
val broadcastSmall = spark.sparkContext.broadcast(smallRdd)
largeRdd.map { row =>
val small = broadcastSmall.value
// ...
}
参考:Spark 优化(知乎)
5. 性能优化
5.1 cache vs persist
cache():将 RDD/DataFrame 缓存到内存,等价于persist(StorageLevel.MEMORY_ONLY)persist(level):支持指定存储级别(内存、磁盘、序列化等),灵活性更高
5.2 broadcast 广播变量
broadcast 将变量复制到每个 Executor(而非每个 Task),大幅减少内存占用和网络传输。
- 适用于 RDD、Dataset、DataFrame,也适用于普通 Scala / Java 对象
- 限制:广播对象必须是 不可变(immutable) 的
5.3 foreachPartition
foreachPartition 将数据分散到各 Slave 节点上执行,相比 foreach 减少了 Driver 与 Executor 之间的通信开销,适合写数据库等场景。
5.4 关注 Stage 数量与耗时
性能分析时,需重点关注:
- 聚合操作产生的 Stage 数量及各 Stage 耗时
- 尽量减少宽依赖(即减少 Stage 数量)
- 了解数据特点,明确处理目标
6. 内存管理
Spark Executor 的内存被划分为多个区域(Storage Memory、Execution Memory、User Memory 等),理解内存模型对调优至关重要。
参考:
7. SparkSQL 实战示例
7.1 连接 Hive 并执行查询
val spark = SparkSession.builder()
.appName("MyApp")
.enableHiveSupport()
.getOrCreate()
// 查询任务表
val df = spark.sql("SELECT * FROM demo_match_task WHERE status = 0")
df.show()
7.2 读取 JSON 并注册临时视图
val df = spark.read.json("task.json")
df.show()
val t = df.filter("status == 0")
t.show()
t.createOrReplaceTempView("task")
spark.sql("SELECT * FROM task").show()
7.3 典型 ETL 流程示例
-- 对任务表循环执行匹配逻辑
SELECT *
FROM user_tag t
LEFT JOIN (
SELECT *
FROM match_orig_data a
WHERE a.file_id = ?
LEFT JOIN user_table b ON a.user_id = b.user_id
) c ON t.user_id = c.user_id;
-- 写入结果后更新任务状态
UPDATE demo_match_task SET status = 1 WHERE task_id = ?;
8. 集群搭建(Hadoop + Hive + Spark)
8.1 Hadoop
- 注意将域名暴露为自己的 IP 或
0.0.0.0 - 参考:Hadoop 搭建(知乎) | Hadoop 搭建(SegmentFault)
集群 Web UI 地址(示例):
| 服务 | 地址 |
|---|---|
| Hadoop NameNode | 192.168.xx.1:50070 |
| YARN ResourceManager | 192.168.xx.1:8088 |
| Spark Master | 192.168.xx.1:8080 |
8.2 Hive(采用 MySQL 管理 Metastore)
8.3 Spark 集成 Hive
参考:Spark Integration with Hive
8.4 Spark 任务提交
spark-submit \
--class com.example.MyApp \
--master spark://192.168.xx.1:7077 \
--executor-memory 4G \
--total-executor-cores 8 \
/path/to/your_app.jar
重要参数说明:
| 参数 | 说明 |
|---|---|
--class |
主类全限定名 |
--master |
Spark Master 地址 |
--executor-memory |
每个 Executor 分配的内存 |
--total-executor-cores |
总 CPU 核数 |
参考:Submitting Applications 官方文档
8.5 打包注意事项
- 使用
maven-assembly-plugin打包任务 JAR - 推荐使用
maven-shade-plugin打 Uber JAR,支持包重命名,避免类冲突 - Spark 客户端版本尽量与集群版本保持一致
9. 生态集成
9.1 Hadoop 与 MapReduce
Hadoop 本质是:HDFS 分布式文件系统 + MapReduce 计算引擎 + YARN 资源管理。
MapReduce 是一种面向分布式的底层编程模型(Programming Model,指通过类似库调用的方式触发执行),对复杂计算场景开发成本较高。Hive 本质是 SQL → MapReduce → 任务解析 的中间件,降低了使用门槛。
9.2 Kafka 集成
9.3 Impala vs Hive
Impala 是为解决 Hive 交互式查询性能问题而生的方案:复用 Hive 的元数据管理,但跳过 MapReduce,使用自研执行引擎直接读取数据,延迟更低。
使用 JDBC 连接 Hive / Impala 时,若集群开启 Kerberos,需在连接前完成认证。
参考:Impala和Hive的关系 - 山中何事 - 博客园
9.4 Kudu vs HBase
| 维度 | Kudu | HBase |
|---|---|---|
| 存储模型 | 列式,支持随机读写 | 列式,优化随机读写 |
| 适用场景 | 同时有顺序和随机读写;数据更新时效性要求高 | 海量数据随机读写 |
| 典型场景 | 时序数据分析、实时报表、ODS | 用户画像、日志存储 |
参考:Kudu 介绍(知乎) | Kudu vs HBase(腾讯云)
10. 时间戳问题
跨组件时间戳处理是大数据开发中的常见坑点,各层行为不一致。
| 组件 | 时间戳行为 |
|---|---|
| MySQL | TIMESTAMP 类型与时区相关,存储时转换为 UTC |
| Hive | TIMESTAMP 无时区概念,Parquet 格式会影响其行为 |
| Spark | 读取无时区时间戳时按 UTC 处理;写入 Hive 时可能因时区设置附加偏移 |
解决思路:统一将时间戳转换到 UTC 后再处理,避免跨层时区混乱。
参考文献
以下链接在正文中未直接引用,但均来自原始学习记录,供延伸阅读。
Spark 官方文档
Hadoop 生态
列式存储
Kudu
Kerberos 认证
ETL
最后修改于 2023-01-01