Skip to content

Delta Lake 数据湖

什么是 Delta Lake

Delta Lake 是由 Databricks 开源的开放格式存储层,在 Parquet 文件之上增加了事务日志(Transaction Log),为数据湖带来了 ACID 事务、Schema 强制、时间旅行等数据仓库特性。

解决的核心问题

  • 数据湖没有 ACID 事务,并发写入会导致数据损坏
  • 无法高效地更新/删除数据
  • Schema 不一致导致下游读取失败
  • 无法查询历史版本数据

核心特性

ACID 事务

python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# 写入 Delta 表(原子性保证)
df.write.format("delta").mode("overwrite").save("/data/delta/users")

# 并发写入安全(乐观并发控制)
df1.write.format("delta").mode("append").save("/data/delta/users")
df2.write.format("delta").mode("append").save("/data/delta/users")

Upsert(Merge)操作

python
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/data/delta/users")

# 增量数据
updates = spark.createDataFrame([
    (1, "Alice Updated", 29),
    (4, "Dave New", 25),
], ["id", "name", "age"])

# Merge:存在则更新,不存在则插入
deltaTable.alias("target").merge(
    updates.alias("source"),
    "target.id = source.id"
).whenMatchedUpdate(set={
    "name": "source.name",
    "age": "source.age"
}).whenNotMatchedInsert(values={
    "id": "source.id",
    "name": "source.name",
    "age": "source.age"
}).execute()

时间旅行(Time Travel)

python
# 读取历史版本(按版本号)
df_v0 = spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("/data/delta/users")

# 读取历史版本(按时间戳)
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-01") \
    .load("/data/delta/users")

# 查看历史记录
deltaTable = DeltaTable.forPath(spark, "/data/delta/users")
deltaTable.history().show()
+-------+-------------------+------+--------+--------------------+
|version|          timestamp|userId|userName|           operation|
+-------+-------------------+------+--------+--------------------+
|      3|2024-01-03 10:00:00|  null|    null|               MERGE|
|      2|2024-01-02 10:00:00|  null|    null|              DELETE|
|      1|2024-01-01 10:00:00|  null|    null|              UPDATE|
|      0|2024-01-01 09:00:00|  null|    null|               WRITE|
+-------+-------------------+------+--------+--------------------+

Schema 强制与演进

python
# Schema 强制:写入不匹配的数据会报错
df_wrong_schema.write.format("delta") \
    .mode("append") \
    .save("/data/delta/users")
# AnalysisException: A schema mismatch detected when writing to the Delta table

# Schema 演进:允许自动添加新列
df_new_columns.write.format("delta") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .save("/data/delta/users")

事务日志原理

Delta Lake 的核心是 _delta_log 目录下的 JSON 事务日志:

/data/delta/users/
├── _delta_log/
│   ├── 00000000000000000000.json  ← 版本 0:初始写入
│   ├── 00000000000000000001.json  ← 版本 1:UPDATE
│   ├── 00000000000000000002.json  ← 版本 2:DELETE
│   ├── 00000000000000000003.json  ← 版本 3:MERGE
│   └── 00000000000000000010.checkpoint.parquet  ← 每 10 个版本生成 checkpoint
├── part-00000-xxx.parquet
├── part-00001-xxx.parquet
└── ...

每个 JSON 日志文件记录:

  • add:新增的 Parquet 文件
  • remove:删除的 Parquet 文件
  • metaData:Schema 变更
  • commitInfo:操作信息(时间、用户、操作类型)

SQL 操作

sql
-- 创建 Delta 表
CREATE TABLE users (
    id    INT,
    name  STRING,
    age   INT
) USING DELTA
PARTITIONED BY (dt STRING)
LOCATION '/data/delta/users';

-- 插入数据
INSERT INTO users VALUES (1, 'Alice', 28, '2024-01-01');

-- 更新数据
UPDATE users SET age = 29 WHERE id = 1;

-- 删除数据
DELETE FROM users WHERE id = 1;

-- Merge
MERGE INTO users AS target
USING updates AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

-- 时间旅行
SELECT * FROM users VERSION AS OF 0;
SELECT * FROM users TIMESTAMP AS OF '2024-01-01';

-- 查看历史
DESCRIBE HISTORY users;

-- 清理旧版本文件(保留最近 7 天)
VACUUM users RETAIN 168 HOURS;

-- 优化(合并小文件)
OPTIMIZE users;

-- Z-Order 优化(多维聚簇,提升过滤性能)
OPTIMIZE users ZORDER BY (id, dt);

与 Spark 集成

python
# Maven 依赖(pom.xml)
# io.delta:delta-spark_2.12:3.0.0

# 创建 SparkSession
spark = SparkSession.builder \
    .appName("DeltaLakeDemo") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# 流式写入 Delta(Structured Streaming)
query = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "user_events") \
    .load() \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/users") \
    .start("/data/delta/users")

# 流式读取 Delta(Change Data Feed)
spark.readStream \
    .format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .load("/data/delta/users") \
    .writeStream \
    .format("console") \
    .start()

Delta Lake vs Iceberg vs Hudi

特性Delta LakeApache IcebergApache Hudi
开源方DatabricksNetflixUber
ACID 事务
时间旅行
Schema 演进✅(更强)
Upsert✅(更成熟)
Spark 集成最佳
Flink 集成
流式写入✅(专长)
社区活跃度

选型建议

  • Spark 为主的数仓 → Delta Lake
  • 需要强 Schema 演进 → Iceberg
  • 实时 CDC 场景 → Hudi

本站内容由 褚成志 整理编写,仅供学习参考