Skip to content

Apache Iceberg

什么是 Iceberg

Apache Iceberg 是由 Netflix 开源的开放表格式(Open Table Format),专为大规模分析工作负载设计。它在 Parquet/ORC/Avro 文件之上提供了一个统一的表抽象层,支持 ACID 事务、Schema 演进、分区演进等高级特性。

核心优势

  • 隐藏分区(Hidden Partitioning):查询无需感知分区列
  • 分区演进:可以修改分区策略而不重写数据
  • Schema 演进:安全地添加、删除、重命名列
  • 行级删除:支持高效的 DELETE/UPDATE
  • 多引擎支持:Spark、Flink、Hive、Trino 均可读写

核心架构

Iceberg 表结构
├── metadata/
│   ├── v1.metadata.json          ← 表元数据(Schema、分区规范、快照列表)
│   ├── v2.metadata.json
│   ├── snap-xxx.avro             ← 快照清单列表(Manifest List)
│   └── manifest-xxx.avro         ← 清单文件(Manifest File,记录数据文件)
└── data/
    ├── dt=2024-01-01/
    │   ├── part-00000.parquet
    │   └── part-00001.parquet
    └── dt=2024-01-02/
        └── part-00000.parquet

三层元数据结构

  1. Metadata File:表的当前状态(Schema、分区规范、当前快照)
  2. Manifest List:快照包含的所有 Manifest 文件列表,含统计信息
  3. Manifest File:数据文件列表,含每个文件的列统计信息(min/max/null count)

Spark 集成

python
# 依赖:org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.0

spark = SparkSession.builder \
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "/data/iceberg/warehouse") \
    .getOrCreate()

DDL 操作

sql
-- 创建 Iceberg 表
CREATE TABLE local.db.users (
    id      BIGINT,
    name    STRING,
    age     INT,
    email   STRING
) USING iceberg
PARTITIONED BY (days(created_at))  -- 隐藏分区:按天分区
TBLPROPERTIES (
    'write.format.default' = 'parquet',
    'write.parquet.compression-codec' = 'snappy'
);

-- 分区演进(无需重写数据!)
ALTER TABLE local.db.users
ADD PARTITION FIELD bucket(16, id);  -- 增加 bucket 分区

-- Schema 演进
ALTER TABLE local.db.users ADD COLUMN phone STRING AFTER email;
ALTER TABLE local.db.users RENAME COLUMN email TO email_address;
ALTER TABLE local.db.users DROP COLUMN phone;

DML 操作

sql
-- 插入
INSERT INTO local.db.users VALUES (1, 'Alice', 28, 'alice@example.com');

-- 更新(行级更新)
UPDATE local.db.users SET age = 29 WHERE id = 1;

-- 删除(行级删除)
DELETE FROM local.db.users WHERE id = 1;

-- Merge
MERGE INTO local.db.users AS t
USING updates AS s ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.name = s.name, t.age = s.age
WHEN NOT MATCHED THEN INSERT *;

时间旅行

sql
-- 按快照 ID 查询
SELECT * FROM local.db.users VERSION AS OF 1234567890;

-- 按时间戳查询
SELECT * FROM local.db.users TIMESTAMP AS OF '2024-01-01 00:00:00';

-- 查看快照历史
SELECT * FROM local.db.users.snapshots;

-- 查看数据文件
SELECT * FROM local.db.users.files;

-- 查看清单文件
SELECT * FROM local.db.users.manifests;

java
// Flink SQL 使用 Iceberg
TableEnvironment tEnv = TableEnvironment.create(settings);

// 注册 Iceberg Catalog
tEnv.executeSql(
    "CREATE CATALOG iceberg_catalog WITH (" +
    "  'type'='iceberg'," +
    "  'catalog-type'='hadoop'," +
    "  'warehouse'='hdfs://namenode:9000/iceberg/warehouse'" +
    ")"
);

tEnv.executeSql("USE CATALOG iceberg_catalog");

// 流式写入 Iceberg
tEnv.executeSql(
    "INSERT INTO db.users " +
    "SELECT id, name, age FROM kafka_source"
);

隐藏分区

Iceberg 最强大的特性之一:查询时无需指定分区列。

sql
-- 传统 Hive 分区(需要知道分区列)
SELECT * FROM hive_table WHERE dt = '2024-01-01';  -- 必须指定 dt

-- Iceberg 隐藏分区(自动分区裁剪)
SELECT * FROM iceberg_table WHERE created_at >= '2024-01-01';
-- Iceberg 自动识别 created_at 对应 days(created_at) 分区,只扫描相关分区

支持的分区转换

转换说明示例
identity(col)直接按列值分区PARTITIONED BY (identity(dt))
bucket(N, col)哈希分桶PARTITIONED BY (bucket(16, user_id))
truncate(W, col)截断字符串/数字PARTITIONED BY (truncate(4, user_id))
year(col)按年分区PARTITIONED BY (year(created_at))
month(col)按月分区PARTITIONED BY (month(created_at))
days(col)按天分区PARTITIONED BY (days(created_at))
hours(col)按小时分区PARTITIONED BY (hours(created_at))

表维护

sql
-- 过期快照清理(保留最近 7 天)
CALL local.system.expire_snapshots(
    table => 'db.users',
    older_than => TIMESTAMP '2024-01-01 00:00:00',
    retain_last => 10
);

-- 删除孤立文件(不在任何快照中引用的文件)
CALL local.system.remove_orphan_files(table => 'db.users');

-- 合并小文件
CALL local.system.rewrite_data_files(
    table => 'db.users',
    strategy => 'sort',
    sort_order => 'id ASC NULLS LAST'
);

-- 重写清单文件(优化元数据)
CALL local.system.rewrite_manifests(table => 'db.users');

Iceberg vs Delta Lake 深度对比

维度IcebergDelta Lake
隐藏分区✅ 支持❌ 不支持
分区演进✅ 无需重写数据❌ 需要重写
Schema 演进✅ 更完整✅ 基本支持
多引擎支持✅ 最广泛✅ 主要 Spark
行级删除✅ Copy-on-Write / MOR
并发控制乐观并发乐观并发
元数据扩展性更好(文件级统计)
生态成熟度

选型建议

如果你的数据平台需要支持多个计算引擎(Spark + Flink + Trino),优先选择 Iceberg,它的多引擎兼容性最好。如果主要用 Spark,Delta Lake 的 Spark 集成体验更流畅。

本站内容由 褚成志 整理编写,仅供学习参考