Skip to content

Spark Core — RDD 与执行引擎

什么是 Spark

Apache Spark 是一个统一的大数据分析引擎,基于内存计算,比 MapReduce 快 10-100 倍。它提供了批处理、流处理、机器学习、图计算的统一 API。

核心优势

  • 内存计算:中间结果缓存在内存,避免反复读写磁盘
  • 丰富 API:Scala/Java/Python/R 多语言支持
  • 统一引擎:一套代码处理批处理、流处理、ML
  • 与 Hadoop 生态无缝集成

架构设计

┌─────────────────────────────────────────────────────────────┐
│                      Spark 集群架构                          │
│                                                             │
│  ┌──────────────────────────────────────────────────────┐  │
│  │                  Driver Program                       │  │
│  │  SparkContext → DAGScheduler → TaskScheduler         │  │
│  └──────────────────────────┬───────────────────────────┘  │
│                             │                               │
│                    Cluster Manager                          │
│                (YARN / Standalone / K8s)                    │
│                             │                               │
│  ┌──────────────────────────▼───────────────────────────┐  │
│  │                  Worker 节点集群                      │  │
│  │  ┌─────────────────┐    ┌─────────────────┐         │  │
│  │  │   Executor 1    │    │   Executor 2    │         │  │
│  │  │  Task Task Task │    │  Task Task Task │         │  │
│  │  │  [内存缓存]      │    │  [内存缓存]      │         │  │
│  │  └─────────────────┘    └─────────────────┘         │  │
│  └──────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘

核心组件

  • Driver:运行用户程序的 main 函数,创建 SparkContext,负责任务调度
  • Executor:运行在 Worker 节点上的进程,执行 Task,缓存数据
  • SparkContext:Spark 应用的入口,连接 Cluster Manager
  • DAGScheduler:将 RDD 依赖关系转换为 Stage DAG
  • TaskScheduler:将 Stage 中的 Task 分配给 Executor

RDD(弹性分布式数据集)

RDD 是 Spark 的核心抽象,代表一个不可变、可分区、可并行计算的数据集合。

RDD 五大特性

  1. 分区列表:数据被切分成多个分区,分布在集群各节点
  2. 计算函数:每个分区的计算逻辑
  3. 依赖关系:与父 RDD 的依赖(窄依赖/宽依赖)
  4. 分区器(可选):Key-Value RDD 的分区策略
  5. 首选位置(可选):数据本地性,优先在数据所在节点计算

创建 RDD

python
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("SparkDemo").setMaster("local[*]")
sc = SparkContext(conf=conf)

# 从集合创建
rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=3)

# 从文件创建
rdd2 = sc.textFile("hdfs://namenode:9000/data/input.txt")

# 从其他 RDD 转换
rdd3 = rdd2.flatMap(lambda line: line.split(" "))

Transformation(转换算子)

转换算子是懒执行的,只记录转换逻辑,不立即计算:

python
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# map:一对一转换
rdd.map(lambda x: x * 2)  # [2, 4, 6, 8, ...]

# filter:过滤
rdd.filter(lambda x: x % 2 == 0)  # [2, 4, 6, 8, 10]

# flatMap:一对多转换
sc.parallelize(["hello world", "hi spark"]) \
  .flatMap(lambda line: line.split(" "))
# ["hello", "world", "hi", "spark"]

# reduceByKey:按 key 聚合
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
pairs.reduceByKey(lambda x, y: x + y)  # [("a", 4), ("b", 2)]

# groupByKey:按 key 分组(注意:数据量大时性能差,优先用 reduceByKey)
pairs.groupByKey()  # [("a", [1,3]), ("b", [2])]

# sortByKey:按 key 排序
pairs.sortByKey(ascending=True)

# join:两个 RDD 按 key 连接
rdd1 = sc.parallelize([("a", 1), ("b", 2)])
rdd2 = sc.parallelize([("a", "x"), ("b", "y")])
rdd1.join(rdd2)  # [("a", (1, "x")), ("b", (2, "y"))]

# distinct:去重
rdd.distinct()

# union:合并
rdd1.union(rdd2)

# repartition:重新分区(会触发 Shuffle)
rdd.repartition(10)

# coalesce:减少分区(不触发 Shuffle,只合并)
rdd.coalesce(2)

Action(行动算子)

行动算子触发实际计算,返回结果到 Driver:

python
rdd = sc.parallelize([1, 2, 3, 4, 5])

# collect:收集所有数据到 Driver(小数据量才用)
rdd.collect()  # [1, 2, 3, 4, 5]

# count:计数
rdd.count()  # 5

# first:第一个元素
rdd.first()  # 1

# take:取前 N 个
rdd.take(3)  # [1, 2, 3]

# reduce:聚合
rdd.reduce(lambda x, y: x + y)  # 15

# foreach:遍历(在 Executor 端执行)
rdd.foreach(lambda x: print(x))

# saveAsTextFile:保存到文件
rdd.saveAsTextFile("hdfs://namenode:9000/output/")

# countByKey:统计每个 key 的数量
pairs.countByKey()  # {"a": 2, "b": 1}

RDD 依赖与 Stage 划分

窄依赖 vs 宽依赖

窄依赖(Narrow Dependency):
  父 RDD 的每个分区最多被子 RDD 的一个分区使用
  map, filter, flatMap, union
  不需要 Shuffle,可以流水线执行

  父 RDD:  [P1] [P2] [P3]
              ↓    ↓    ↓
  子 RDD:  [P1] [P2] [P3]

宽依赖(Wide Dependency / Shuffle Dependency):
  父 RDD 的每个分区可能被子 RDD 的多个分区使用
  groupByKey, reduceByKey, join, repartition
  需要 Shuffle,是 Stage 划分的边界

  父 RDD:  [P1] [P2] [P3]
              ↘↗  ↘↗  ↘↗
  子 RDD:  [P1] [P2] [P3]

Stage 划分

DAGScheduler 按宽依赖划分 Stage:

RDD 依赖链:
A → B → C → D → E → F
        ↑           ↑
    宽依赖(Shuffle)  宽依赖(Shuffle)

Stage 划分:
Stage 0: A → B → C(ResultStage 或 ShuffleMapStage)
Stage 1: D → E(ShuffleMapStage)
Stage 2: F(ResultStage)

缓存与持久化

python
# cache():默认缓存到内存(MEMORY_ONLY)
rdd.cache()

# persist():指定存储级别
from pyspark import StorageLevel

rdd.persist(StorageLevel.MEMORY_AND_DISK)  # 内存不够时溢写磁盘
rdd.persist(StorageLevel.MEMORY_ONLY_SER)  # 序列化存储,节省内存
rdd.persist(StorageLevel.DISK_ONLY)        # 只存磁盘

# 释放缓存
rdd.unpersist()

存储级别选择

  • 数据能放入内存 → MEMORY_ONLY
  • 内存不够 → MEMORY_AND_DISK
  • 内存紧张 → MEMORY_ONLY_SER(序列化,节省 2-5x 内存)
  • 需要容错 → 加 _2 后缀(如 MEMORY_AND_DISK_2,两个副本)

广播变量与累加器

python
# 广播变量:将大变量广播到每个 Executor,避免重复传输
lookup_table = {"a": 1, "b": 2, "c": 3}
broadcast_table = sc.broadcast(lookup_table)

rdd.map(lambda x: broadcast_table.value.get(x, 0))

# 累加器:在 Executor 端累加,Driver 端读取
counter = sc.accumulator(0)

def count_errors(line):
    global counter
    if "ERROR" in line:
        counter += 1

rdd.foreach(count_errors)
print(f"Error count: {counter.value}")

Spark 提交命令

bash
# 提交到 YARN
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 4g \
  --executor-memory 8g \
  --executor-cores 4 \
  --num-executors 10 \
  --conf spark.sql.shuffle.partitions=200 \
  --conf spark.default.parallelism=200 \
  myapp.jar arg1 arg2

# 本地模式(测试)
spark-submit \
  --master local[4] \
  myapp.py

性能调优核心

并行度设置

python
# 全局并行度(影响 RDD 默认分区数)
conf.set("spark.default.parallelism", "200")

# Shuffle 后的分区数
conf.set("spark.sql.shuffle.partitions", "200")

# 建议:分区数 = CPU 核心总数 * 2-3

避免数据倾斜

python
# 问题:某个 key 数据量远大于其他 key
# 解决:加随机前缀打散

import random

def add_prefix(pair):
    key, value = pair
    prefix = random.randint(0, 9)
    return (f"{prefix}_{key}", value)

# 第一步:加前缀聚合
result1 = rdd.map(add_prefix).reduceByKey(lambda x, y: x + y)

# 第二步:去前缀再聚合
result2 = result1.map(lambda p: (p[0].split("_")[1], p[1])) \
                 .reduceByKey(lambda x, y: x + y)

减少 Shuffle

python
# 差:groupByKey 会将所有数据 Shuffle 到 Reduce 端
rdd.groupByKey().mapValues(sum)

# 好:reduceByKey 在 Map 端先做本地聚合,减少 Shuffle 数据量
rdd.reduceByKey(lambda x, y: x + y)

本站内容由 褚成志 整理编写,仅供学习参考