Apache Spark 学习笔记

logged in 2023 and written with Claude in 2026.

本文是学习 Apache Spark 过程中的系统性笔记,涵盖 Spark 编程模型(RDD / DataFrame / Dataset)、集群搭建、性能优化、内存管理,以及与 Hadoop / Hive / Impala / Kafka 等生态的集成。


1. 概述

Apache Spark 是一个面向大规模数据处理的统一分析引擎,具备以下特点:

  • 通用性:同时支持 SQL 查询、流式计算、机器学习、图计算等场景
  • 多数据源:可运行在 HDFS、Hive、HBase、Kafka、S3 等绝大多数数据源之上
  • 高性能:基于内存计算,相比 MapReduce 有数量级的性能提升

2. 核心数据抽象:RDD / Dataset / DataFrame

Spark 提供三种核心数据抽象,三者均具备**惰性求值(Lazy Evaluation)**机制——在调用 map 等 Transformation 时不会立即执行,只有遇到 foreach 等 Action 时才真正触发计算。

抽象 特点
RDD 最底层抽象,不支持 SQL;通过粗粒度变换构建数据血缘(Lineage),节点故障时可从血缘重算恢复
Dataset 分布式数据集合,知道数据存在但不知道如何组织;强类型,编译期检查
DataFrame Dataset[Row] 的别名,每行按列名组织,类型固定;支持 SQL 查询,是最常用的抽象

DataFrame 与 Dataset 拥有完全相同的成员函数,区别仅在于每行数据类型:DataFrame 每行类型为 Row,需通过 getAs 方法或模式匹配取出特定字段。

RDD 的设计参考了 DryadLINQ 的 API 风格。

参考:Dataset vs DataFrame 对比(知乎)


3. Spark 编程模型

3.1 入口:SparkSession

SparkSession 是使用 Dataset / DataFrame API 进行 Spark 编程的统一入口,只在 Driver(Master)进程中可见

3.2 Transformation vs Action

Spark 的所有操作分为两类:

  • Transformation:将一个 RDD/DataFrame 转换为另一个,仅记录 Lineage,不触发计算(如 mapfilterjoin
  • Action:将 RDD/DataFrame 转换为其他数据形式,触发真正的计算(如 foreachcountcollect

每次 Action 调用对应一个 Spark Job

3.3 Job 分解:Application → Job → Stage → Task

Application
  └── Job(每次 Action 调用 = 一个 Job)
        └── Stage(每个 Shuffle 依赖 = 一个 Stage 边界)
              └── Task(每个并行计算单元;同一 Stage 内所有 Task 执行相同代码,处理不同数据分片)

每个 Node 是一个独立的 JVM 进程,内部运行若干 Executor,每个 Executor 将 Stage 内的任务拆分为多个 Task 并行执行。


4. Shuffle 机制与宽窄依赖

4.1 宽依赖 vs 窄依赖

  • 窄依赖:子分区数据只依赖一个父分区,无需跨节点数据移动(如 mapfilter
  • 宽依赖:子分区依赖多个父分区,需要 Shuffle 数据(如 groupByKeyjoin

Shuffle 是划分 DAG Stage 的标识,会产生大量网络 I/O,是影响 Spark 性能的关键因素。

4.2 会触发 Shuffle 的 Transformation

以下操作会引发 Shuffle,使用时需格外注意:

  • repartition
  • joincogroup
  • 所有 *By / *ByKey 系列操作(groupByKeyreduceByKeysortByKey 等)

4.3 减少 Shuffle 的常见优化手段

① 用 reduceByKey 替代 groupByKey

// 低效:全量数据通过网络传输后再聚合
rdd.groupByKey().mapValues(_.sum)

// 高效:先在各 partition 局部累积,Shuffle 后再合并
rdd.reduceByKey(_ + _)

② 避免"新增列后 join 回来"的写法,直接在原数据上生成新列。

③ 小表广播(Broadcast Join)

当 join 的一侧数据集足够小,可以用 broadcast 将其复制到每个 Executor 内存,避免 Shuffle:

val broadcastSmall = spark.sparkContext.broadcast(smallRdd)
largeRdd.map { row =>
  val small = broadcastSmall.value
  // ...
}

参考:Spark 优化(知乎)


5. 性能优化

5.1 cache vs persist

  • cache():将 RDD/DataFrame 缓存到内存,等价于 persist(StorageLevel.MEMORY_ONLY)
  • persist(level):支持指定存储级别(内存、磁盘、序列化等),灵活性更高

参考:cache vs persist 详解

5.2 broadcast 广播变量

broadcast 将变量复制到每个 Executor(而非每个 Task),大幅减少内存占用和网络传输。

  • 适用于 RDD、Dataset、DataFrame,也适用于普通 Scala / Java 对象
  • 限制:广播对象必须是 不可变(immutable)

5.3 foreachPartition

foreachPartition 将数据分散到各 Slave 节点上执行,相比 foreach 减少了 Driver 与 Executor 之间的通信开销,适合写数据库等场景。

5.4 关注 Stage 数量与耗时

性能分析时,需重点关注:

  • 聚合操作产生的 Stage 数量及各 Stage 耗时
  • 尽量减少宽依赖(即减少 Stage 数量)
  • 了解数据特点,明确处理目标

6. 内存管理

Spark Executor 的内存被划分为多个区域(Storage Memory、Execution Memory、User Memory 等),理解内存模型对调优至关重要。

参考:


7. SparkSQL 实战示例

7.1 连接 Hive 并执行查询

val spark = SparkSession.builder()
  .appName("MyApp")
  .enableHiveSupport()
  .getOrCreate()

// 查询任务表
val df = spark.sql("SELECT * FROM demo_match_task WHERE status = 0")
df.show()

参考:Spark SQL Hive Tables 官方文档

7.2 读取 JSON 并注册临时视图

val df = spark.read.json("task.json")
df.show()

val t = df.filter("status == 0")
t.show()

t.createOrReplaceTempView("task")
spark.sql("SELECT * FROM task").show()

7.3 典型 ETL 流程示例

-- 对任务表循环执行匹配逻辑
SELECT *
FROM user_tag t
LEFT JOIN (
  SELECT *
  FROM match_orig_data a
  WHERE a.file_id = ?
  LEFT JOIN user_table b ON a.user_id = b.user_id
) c ON t.user_id = c.user_id;

-- 写入结果后更新任务状态
UPDATE demo_match_task SET status = 1 WHERE task_id = ?;

8. 集群搭建(Hadoop + Hive + Spark)

8.1 Hadoop

集群 Web UI 地址(示例):

服务 地址
Hadoop NameNode 192.168.xx.1:50070
YARN ResourceManager 192.168.xx.1:8088
Spark Master 192.168.xx.1:8080

8.2 Hive(采用 MySQL 管理 Metastore)

参考:Hive + MySQL Metastore 配置

8.3 Spark 集成 Hive

参考:Spark Integration with Hive

8.4 Spark 任务提交

spark-submit \
  --class com.example.MyApp \
  --master spark://192.168.xx.1:7077 \
  --executor-memory 4G \
  --total-executor-cores 8 \
  /path/to/your_app.jar

重要参数说明:

参数 说明
--class 主类全限定名
--master Spark Master 地址
--executor-memory 每个 Executor 分配的内存
--total-executor-cores 总 CPU 核数

参考:Submitting Applications 官方文档

8.5 打包注意事项

  • 使用 maven-assembly-plugin 打包任务 JAR
  • 推荐使用 maven-shade-plugin 打 Uber JAR,支持包重命名,避免类冲突
  • Spark 客户端版本尽量与集群版本保持一致

9. 生态集成

9.1 Hadoop 与 MapReduce

Hadoop 本质是:HDFS 分布式文件系统 + MapReduce 计算引擎 + YARN 资源管理

MapReduce 是一种面向分布式的底层编程模型(Programming Model,指通过类似库调用的方式触发执行),对复杂计算场景开发成本较高。Hive 本质是 SQL → MapReduce → 任务解析 的中间件,降低了使用门槛。

9.2 Kafka 集成

9.3 Impala vs Hive

Impala 是为解决 Hive 交互式查询性能问题而生的方案:复用 Hive 的元数据管理,但跳过 MapReduce,使用自研执行引擎直接读取数据,延迟更低。

使用 JDBC 连接 Hive / Impala 时,若集群开启 Kerberos,需在连接前完成认证。

参考:Impala和Hive的关系 - 山中何事 - 博客园

9.4 Kudu vs HBase

维度 Kudu HBase
存储模型 列式,支持随机读写 列式,优化随机读写
适用场景 同时有顺序和随机读写;数据更新时效性要求高 海量数据随机读写
典型场景 时序数据分析、实时报表、ODS 用户画像、日志存储

参考:Kudu 介绍(知乎) | Kudu vs HBase(腾讯云)


10. 时间戳问题

跨组件时间戳处理是大数据开发中的常见坑点,各层行为不一致。

组件 时间戳行为
MySQL TIMESTAMP 类型与时区相关,存储时转换为 UTC
Hive TIMESTAMP 无时区概念,Parquet 格式会影响其行为
Spark 读取无时区时间戳时按 UTC 处理;写入 Hive 时可能因时区设置附加偏移

解决思路:统一将时间戳转换到 UTC 后再处理,避免跨层时区混乱。

参考:Hive 不同 TIMESTAMP 类型说明


参考文献

以下链接在正文中未直接引用,但均来自原始学习记录,供延伸阅读。

Spark 官方文档

Hadoop 生态

列式存储

Kudu

Kerberos 认证

ETL


最后修改于 2023-01-01