Delta Lake 数据湖
什么是 Delta Lake
Delta Lake 是由 Databricks 开源的开放格式存储层,在 Parquet 文件之上增加了事务日志(Transaction Log),为数据湖带来了 ACID 事务、Schema 强制、时间旅行等数据仓库特性。
解决的核心问题:
- 数据湖没有 ACID 事务,并发写入会导致数据损坏
- 无法高效地更新/删除数据
- Schema 不一致导致下游读取失败
- 无法查询历史版本数据
核心特性
ACID 事务
python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# 写入 Delta 表(原子性保证)
df.write.format("delta").mode("overwrite").save("/data/delta/users")
# 并发写入安全(乐观并发控制)
df1.write.format("delta").mode("append").save("/data/delta/users")
df2.write.format("delta").mode("append").save("/data/delta/users")Upsert(Merge)操作
python
from delta.tables import DeltaTable
deltaTable = DeltaTable.forPath(spark, "/data/delta/users")
# 增量数据
updates = spark.createDataFrame([
(1, "Alice Updated", 29),
(4, "Dave New", 25),
], ["id", "name", "age"])
# Merge:存在则更新,不存在则插入
deltaTable.alias("target").merge(
updates.alias("source"),
"target.id = source.id"
).whenMatchedUpdate(set={
"name": "source.name",
"age": "source.age"
}).whenNotMatchedInsert(values={
"id": "source.id",
"name": "source.name",
"age": "source.age"
}).execute()时间旅行(Time Travel)
python
# 读取历史版本(按版本号)
df_v0 = spark.read.format("delta") \
.option("versionAsOf", 0) \
.load("/data/delta/users")
# 读取历史版本(按时间戳)
df_yesterday = spark.read.format("delta") \
.option("timestampAsOf", "2024-01-01") \
.load("/data/delta/users")
# 查看历史记录
deltaTable = DeltaTable.forPath(spark, "/data/delta/users")
deltaTable.history().show()+-------+-------------------+------+--------+--------------------+
|version| timestamp|userId|userName| operation|
+-------+-------------------+------+--------+--------------------+
| 3|2024-01-03 10:00:00| null| null| MERGE|
| 2|2024-01-02 10:00:00| null| null| DELETE|
| 1|2024-01-01 10:00:00| null| null| UPDATE|
| 0|2024-01-01 09:00:00| null| null| WRITE|
+-------+-------------------+------+--------+--------------------+Schema 强制与演进
python
# Schema 强制:写入不匹配的数据会报错
df_wrong_schema.write.format("delta") \
.mode("append") \
.save("/data/delta/users")
# AnalysisException: A schema mismatch detected when writing to the Delta table
# Schema 演进:允许自动添加新列
df_new_columns.write.format("delta") \
.option("mergeSchema", "true") \
.mode("append") \
.save("/data/delta/users")事务日志原理
Delta Lake 的核心是 _delta_log 目录下的 JSON 事务日志:
/data/delta/users/
├── _delta_log/
│ ├── 00000000000000000000.json ← 版本 0:初始写入
│ ├── 00000000000000000001.json ← 版本 1:UPDATE
│ ├── 00000000000000000002.json ← 版本 2:DELETE
│ ├── 00000000000000000003.json ← 版本 3:MERGE
│ └── 00000000000000000010.checkpoint.parquet ← 每 10 个版本生成 checkpoint
├── part-00000-xxx.parquet
├── part-00001-xxx.parquet
└── ...每个 JSON 日志文件记录:
add:新增的 Parquet 文件remove:删除的 Parquet 文件metaData:Schema 变更commitInfo:操作信息(时间、用户、操作类型)
SQL 操作
sql
-- 创建 Delta 表
CREATE TABLE users (
id INT,
name STRING,
age INT
) USING DELTA
PARTITIONED BY (dt STRING)
LOCATION '/data/delta/users';
-- 插入数据
INSERT INTO users VALUES (1, 'Alice', 28, '2024-01-01');
-- 更新数据
UPDATE users SET age = 29 WHERE id = 1;
-- 删除数据
DELETE FROM users WHERE id = 1;
-- Merge
MERGE INTO users AS target
USING updates AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- 时间旅行
SELECT * FROM users VERSION AS OF 0;
SELECT * FROM users TIMESTAMP AS OF '2024-01-01';
-- 查看历史
DESCRIBE HISTORY users;
-- 清理旧版本文件(保留最近 7 天)
VACUUM users RETAIN 168 HOURS;
-- 优化(合并小文件)
OPTIMIZE users;
-- Z-Order 优化(多维聚簇,提升过滤性能)
OPTIMIZE users ZORDER BY (id, dt);与 Spark 集成
python
# Maven 依赖(pom.xml)
# io.delta:delta-spark_2.12:3.0.0
# 创建 SparkSession
spark = SparkSession.builder \
.appName("DeltaLakeDemo") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# 流式写入 Delta(Structured Streaming)
query = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "user_events") \
.load() \
.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/users") \
.start("/data/delta/users")
# 流式读取 Delta(Change Data Feed)
spark.readStream \
.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.load("/data/delta/users") \
.writeStream \
.format("console") \
.start()Delta Lake vs Iceberg vs Hudi
| 特性 | Delta Lake | Apache Iceberg | Apache Hudi |
|---|---|---|---|
| 开源方 | Databricks | Netflix | Uber |
| ACID 事务 | ✅ | ✅ | ✅ |
| 时间旅行 | ✅ | ✅ | ✅ |
| Schema 演进 | ✅ | ✅(更强) | ✅ |
| Upsert | ✅ | ✅ | ✅(更成熟) |
| Spark 集成 | 最佳 | 好 | 好 |
| Flink 集成 | 好 | 好 | 好 |
| 流式写入 | ✅ | ✅ | ✅(专长) |
| 社区活跃度 | 高 | 高 | 高 |
选型建议:
- Spark 为主的数仓 → Delta Lake
- 需要强 Schema 演进 → Iceberg
- 实时 CDC 场景 → Hudi