Apache Iceberg
什么是 Iceberg
Apache Iceberg 是由 Netflix 开源的开放表格式(Open Table Format),专为大规模分析工作负载设计。它在 Parquet/ORC/Avro 文件之上提供了一个统一的表抽象层,支持 ACID 事务、Schema 演进、分区演进等高级特性。
核心优势:
- 隐藏分区(Hidden Partitioning):查询无需感知分区列
- 分区演进:可以修改分区策略而不重写数据
- Schema 演进:安全地添加、删除、重命名列
- 行级删除:支持高效的 DELETE/UPDATE
- 多引擎支持:Spark、Flink、Hive、Trino 均可读写
核心架构
Iceberg 表结构
├── metadata/
│ ├── v1.metadata.json ← 表元数据(Schema、分区规范、快照列表)
│ ├── v2.metadata.json
│ ├── snap-xxx.avro ← 快照清单列表(Manifest List)
│ └── manifest-xxx.avro ← 清单文件(Manifest File,记录数据文件)
└── data/
├── dt=2024-01-01/
│ ├── part-00000.parquet
│ └── part-00001.parquet
└── dt=2024-01-02/
└── part-00000.parquet三层元数据结构:
- Metadata File:表的当前状态(Schema、分区规范、当前快照)
- Manifest List:快照包含的所有 Manifest 文件列表,含统计信息
- Manifest File:数据文件列表,含每个文件的列统计信息(min/max/null count)
Spark 集成
python
# 依赖:org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.0
spark = SparkSession.builder \
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.iceberg.spark.SparkSessionCatalog") \
.config("spark.sql.catalog.spark_catalog.type", "hive") \
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.local.type", "hadoop") \
.config("spark.sql.catalog.local.warehouse", "/data/iceberg/warehouse") \
.getOrCreate()DDL 操作
sql
-- 创建 Iceberg 表
CREATE TABLE local.db.users (
id BIGINT,
name STRING,
age INT,
email STRING
) USING iceberg
PARTITIONED BY (days(created_at)) -- 隐藏分区:按天分区
TBLPROPERTIES (
'write.format.default' = 'parquet',
'write.parquet.compression-codec' = 'snappy'
);
-- 分区演进(无需重写数据!)
ALTER TABLE local.db.users
ADD PARTITION FIELD bucket(16, id); -- 增加 bucket 分区
-- Schema 演进
ALTER TABLE local.db.users ADD COLUMN phone STRING AFTER email;
ALTER TABLE local.db.users RENAME COLUMN email TO email_address;
ALTER TABLE local.db.users DROP COLUMN phone;DML 操作
sql
-- 插入
INSERT INTO local.db.users VALUES (1, 'Alice', 28, 'alice@example.com');
-- 更新(行级更新)
UPDATE local.db.users SET age = 29 WHERE id = 1;
-- 删除(行级删除)
DELETE FROM local.db.users WHERE id = 1;
-- Merge
MERGE INTO local.db.users AS t
USING updates AS s ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.name = s.name, t.age = s.age
WHEN NOT MATCHED THEN INSERT *;时间旅行
sql
-- 按快照 ID 查询
SELECT * FROM local.db.users VERSION AS OF 1234567890;
-- 按时间戳查询
SELECT * FROM local.db.users TIMESTAMP AS OF '2024-01-01 00:00:00';
-- 查看快照历史
SELECT * FROM local.db.users.snapshots;
-- 查看数据文件
SELECT * FROM local.db.users.files;
-- 查看清单文件
SELECT * FROM local.db.users.manifests;Flink 集成
java
// Flink SQL 使用 Iceberg
TableEnvironment tEnv = TableEnvironment.create(settings);
// 注册 Iceberg Catalog
tEnv.executeSql(
"CREATE CATALOG iceberg_catalog WITH (" +
" 'type'='iceberg'," +
" 'catalog-type'='hadoop'," +
" 'warehouse'='hdfs://namenode:9000/iceberg/warehouse'" +
")"
);
tEnv.executeSql("USE CATALOG iceberg_catalog");
// 流式写入 Iceberg
tEnv.executeSql(
"INSERT INTO db.users " +
"SELECT id, name, age FROM kafka_source"
);隐藏分区
Iceberg 最强大的特性之一:查询时无需指定分区列。
sql
-- 传统 Hive 分区(需要知道分区列)
SELECT * FROM hive_table WHERE dt = '2024-01-01'; -- 必须指定 dt
-- Iceberg 隐藏分区(自动分区裁剪)
SELECT * FROM iceberg_table WHERE created_at >= '2024-01-01';
-- Iceberg 自动识别 created_at 对应 days(created_at) 分区,只扫描相关分区支持的分区转换:
| 转换 | 说明 | 示例 |
|---|---|---|
identity(col) | 直接按列值分区 | PARTITIONED BY (identity(dt)) |
bucket(N, col) | 哈希分桶 | PARTITIONED BY (bucket(16, user_id)) |
truncate(W, col) | 截断字符串/数字 | PARTITIONED BY (truncate(4, user_id)) |
year(col) | 按年分区 | PARTITIONED BY (year(created_at)) |
month(col) | 按月分区 | PARTITIONED BY (month(created_at)) |
days(col) | 按天分区 | PARTITIONED BY (days(created_at)) |
hours(col) | 按小时分区 | PARTITIONED BY (hours(created_at)) |
表维护
sql
-- 过期快照清理(保留最近 7 天)
CALL local.system.expire_snapshots(
table => 'db.users',
older_than => TIMESTAMP '2024-01-01 00:00:00',
retain_last => 10
);
-- 删除孤立文件(不在任何快照中引用的文件)
CALL local.system.remove_orphan_files(table => 'db.users');
-- 合并小文件
CALL local.system.rewrite_data_files(
table => 'db.users',
strategy => 'sort',
sort_order => 'id ASC NULLS LAST'
);
-- 重写清单文件(优化元数据)
CALL local.system.rewrite_manifests(table => 'db.users');Iceberg vs Delta Lake 深度对比
| 维度 | Iceberg | Delta Lake |
|---|---|---|
| 隐藏分区 | ✅ 支持 | ❌ 不支持 |
| 分区演进 | ✅ 无需重写数据 | ❌ 需要重写 |
| Schema 演进 | ✅ 更完整 | ✅ 基本支持 |
| 多引擎支持 | ✅ 最广泛 | ✅ 主要 Spark |
| 行级删除 | ✅ Copy-on-Write / MOR | ✅ |
| 并发控制 | 乐观并发 | 乐观并发 |
| 元数据扩展性 | 更好(文件级统计) | 好 |
| 生态成熟度 | 高 | 高 |
选型建议
如果你的数据平台需要支持多个计算引擎(Spark + Flink + Trino),优先选择 Iceberg,它的多引擎兼容性最好。如果主要用 Spark,Delta Lake 的 Spark 集成体验更流畅。