Spark Core — RDD 与执行引擎
什么是 Spark
Apache Spark 是一个统一的大数据分析引擎,基于内存计算,比 MapReduce 快 10-100 倍。它提供了批处理、流处理、机器学习、图计算的统一 API。
核心优势:
- 内存计算:中间结果缓存在内存,避免反复读写磁盘
- 丰富 API:Scala/Java/Python/R 多语言支持
- 统一引擎:一套代码处理批处理、流处理、ML
- 与 Hadoop 生态无缝集成
架构设计
┌─────────────────────────────────────────────────────────────┐
│ Spark 集群架构 │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Driver Program │ │
│ │ SparkContext → DAGScheduler → TaskScheduler │ │
│ └──────────────────────────┬───────────────────────────┘ │
│ │ │
│ Cluster Manager │
│ (YARN / Standalone / K8s) │
│ │ │
│ ┌──────────────────────────▼───────────────────────────┐ │
│ │ Worker 节点集群 │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ │ │
│ │ │ Executor 1 │ │ Executor 2 │ │ │
│ │ │ Task Task Task │ │ Task Task Task │ │ │
│ │ │ [内存缓存] │ │ [内存缓存] │ │ │
│ │ └─────────────────┘ └─────────────────┘ │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘核心组件:
- Driver:运行用户程序的 main 函数,创建 SparkContext,负责任务调度
- Executor:运行在 Worker 节点上的进程,执行 Task,缓存数据
- SparkContext:Spark 应用的入口,连接 Cluster Manager
- DAGScheduler:将 RDD 依赖关系转换为 Stage DAG
- TaskScheduler:将 Stage 中的 Task 分配给 Executor
RDD(弹性分布式数据集)
RDD 是 Spark 的核心抽象,代表一个不可变、可分区、可并行计算的数据集合。
RDD 五大特性
- 分区列表:数据被切分成多个分区,分布在集群各节点
- 计算函数:每个分区的计算逻辑
- 依赖关系:与父 RDD 的依赖(窄依赖/宽依赖)
- 分区器(可选):Key-Value RDD 的分区策略
- 首选位置(可选):数据本地性,优先在数据所在节点计算
创建 RDD
python
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("SparkDemo").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 从集合创建
rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=3)
# 从文件创建
rdd2 = sc.textFile("hdfs://namenode:9000/data/input.txt")
# 从其他 RDD 转换
rdd3 = rdd2.flatMap(lambda line: line.split(" "))Transformation(转换算子)
转换算子是懒执行的,只记录转换逻辑,不立即计算:
python
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# map:一对一转换
rdd.map(lambda x: x * 2) # [2, 4, 6, 8, ...]
# filter:过滤
rdd.filter(lambda x: x % 2 == 0) # [2, 4, 6, 8, 10]
# flatMap:一对多转换
sc.parallelize(["hello world", "hi spark"]) \
.flatMap(lambda line: line.split(" "))
# ["hello", "world", "hi", "spark"]
# reduceByKey:按 key 聚合
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
pairs.reduceByKey(lambda x, y: x + y) # [("a", 4), ("b", 2)]
# groupByKey:按 key 分组(注意:数据量大时性能差,优先用 reduceByKey)
pairs.groupByKey() # [("a", [1,3]), ("b", [2])]
# sortByKey:按 key 排序
pairs.sortByKey(ascending=True)
# join:两个 RDD 按 key 连接
rdd1 = sc.parallelize([("a", 1), ("b", 2)])
rdd2 = sc.parallelize([("a", "x"), ("b", "y")])
rdd1.join(rdd2) # [("a", (1, "x")), ("b", (2, "y"))]
# distinct:去重
rdd.distinct()
# union:合并
rdd1.union(rdd2)
# repartition:重新分区(会触发 Shuffle)
rdd.repartition(10)
# coalesce:减少分区(不触发 Shuffle,只合并)
rdd.coalesce(2)Action(行动算子)
行动算子触发实际计算,返回结果到 Driver:
python
rdd = sc.parallelize([1, 2, 3, 4, 5])
# collect:收集所有数据到 Driver(小数据量才用)
rdd.collect() # [1, 2, 3, 4, 5]
# count:计数
rdd.count() # 5
# first:第一个元素
rdd.first() # 1
# take:取前 N 个
rdd.take(3) # [1, 2, 3]
# reduce:聚合
rdd.reduce(lambda x, y: x + y) # 15
# foreach:遍历(在 Executor 端执行)
rdd.foreach(lambda x: print(x))
# saveAsTextFile:保存到文件
rdd.saveAsTextFile("hdfs://namenode:9000/output/")
# countByKey:统计每个 key 的数量
pairs.countByKey() # {"a": 2, "b": 1}RDD 依赖与 Stage 划分
窄依赖 vs 宽依赖
窄依赖(Narrow Dependency):
父 RDD 的每个分区最多被子 RDD 的一个分区使用
map, filter, flatMap, union
不需要 Shuffle,可以流水线执行
父 RDD: [P1] [P2] [P3]
↓ ↓ ↓
子 RDD: [P1] [P2] [P3]
宽依赖(Wide Dependency / Shuffle Dependency):
父 RDD 的每个分区可能被子 RDD 的多个分区使用
groupByKey, reduceByKey, join, repartition
需要 Shuffle,是 Stage 划分的边界
父 RDD: [P1] [P2] [P3]
↘↗ ↘↗ ↘↗
子 RDD: [P1] [P2] [P3]Stage 划分
DAGScheduler 按宽依赖划分 Stage:
RDD 依赖链:
A → B → C → D → E → F
↑ ↑
宽依赖(Shuffle) 宽依赖(Shuffle)
Stage 划分:
Stage 0: A → B → C(ResultStage 或 ShuffleMapStage)
Stage 1: D → E(ShuffleMapStage)
Stage 2: F(ResultStage)缓存与持久化
python
# cache():默认缓存到内存(MEMORY_ONLY)
rdd.cache()
# persist():指定存储级别
from pyspark import StorageLevel
rdd.persist(StorageLevel.MEMORY_AND_DISK) # 内存不够时溢写磁盘
rdd.persist(StorageLevel.MEMORY_ONLY_SER) # 序列化存储,节省内存
rdd.persist(StorageLevel.DISK_ONLY) # 只存磁盘
# 释放缓存
rdd.unpersist()存储级别选择:
- 数据能放入内存 →
MEMORY_ONLY - 内存不够 →
MEMORY_AND_DISK - 内存紧张 →
MEMORY_ONLY_SER(序列化,节省 2-5x 内存) - 需要容错 → 加
_2后缀(如MEMORY_AND_DISK_2,两个副本)
广播变量与累加器
python
# 广播变量:将大变量广播到每个 Executor,避免重复传输
lookup_table = {"a": 1, "b": 2, "c": 3}
broadcast_table = sc.broadcast(lookup_table)
rdd.map(lambda x: broadcast_table.value.get(x, 0))
# 累加器:在 Executor 端累加,Driver 端读取
counter = sc.accumulator(0)
def count_errors(line):
global counter
if "ERROR" in line:
counter += 1
rdd.foreach(count_errors)
print(f"Error count: {counter.value}")Spark 提交命令
bash
# 提交到 YARN
spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--conf spark.sql.shuffle.partitions=200 \
--conf spark.default.parallelism=200 \
myapp.jar arg1 arg2
# 本地模式(测试)
spark-submit \
--master local[4] \
myapp.py性能调优核心
并行度设置
python
# 全局并行度(影响 RDD 默认分区数)
conf.set("spark.default.parallelism", "200")
# Shuffle 后的分区数
conf.set("spark.sql.shuffle.partitions", "200")
# 建议:分区数 = CPU 核心总数 * 2-3避免数据倾斜
python
# 问题:某个 key 数据量远大于其他 key
# 解决:加随机前缀打散
import random
def add_prefix(pair):
key, value = pair
prefix = random.randint(0, 9)
return (f"{prefix}_{key}", value)
# 第一步:加前缀聚合
result1 = rdd.map(add_prefix).reduceByKey(lambda x, y: x + y)
# 第二步:去前缀再聚合
result2 = result1.map(lambda p: (p[0].split("_")[1], p[1])) \
.reduceByKey(lambda x, y: x + y)减少 Shuffle
python
# 差:groupByKey 会将所有数据 Shuffle 到 Reduce 端
rdd.groupByKey().mapValues(sum)
# 好:reduceByKey 在 Map 端先做本地聚合,减少 Shuffle 数据量
rdd.reduceByKey(lambda x, y: x + y)