Skip to content

Apache Doris — 实时 OLAP

什么是 Apache Doris

Apache Doris(原 Palo,百度开源)是一个现代化的 MPP 分析型数据库,支持实时数据导入和亚秒级查询。它高度兼容 MySQL 协议,使用标准 SQL,是国内最流行的实时 OLAP 引擎之一。

核心优势

  • 实时导入:秒级数据可见
  • MySQL 协议:直接用 MySQL 客户端连接
  • 高并发查询:支持数千 QPS
  • 向量化执行引擎:极致查询性能
  • 流批一体:同时支持实时和批量导入

架构设计

┌─────────────────────────────────────────────────────────────┐
│                      Doris 集群                              │
│                                                             │
│  ┌──────────────────────────────────────────────────────┐  │
│  │                  Frontend(FE)                       │  │
│  │  - SQL 解析和查询规划                                 │  │
│  │  - 元数据管理(基于 BDB JE)                          │  │
│  │  - 查询协调                                          │  │
│  │  - Leader FE + Follower FE(高可用)                  │  │
│  └──────────────────────────┬───────────────────────────┘  │
│                             │                               │
│  ┌──────────────────────────▼───────────────────────────┐  │
│  │                  Backend(BE)                        │  │
│  │  - 数据存储(列式,Segment 格式)                     │  │
│  │  - 查询执行(向量化)                                 │  │
│  │  - 数据导入                                          │  │
│  │  BE1  BE2  BE3  BE4  BE5                             │  │
│  └──────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘

数据模型

Doris 有三种数据模型:

Duplicate Key(明细模型)

sql
-- Duplicate Key model: every loaded row is kept as-is (no deduplication),
-- which suits logs and event streams.
CREATE TABLE user_events
(
    event_time DATETIME,
    user_id    BIGINT,
    event_type VARCHAR(50),
    page_url   VARCHAR(200),
    amount     DECIMAL(10, 2)
)
-- Sort key only; it does not enforce uniqueness.
DUPLICATE KEY (event_time, user_id)
DISTRIBUTED BY HASH (user_id) BUCKETS 32
PROPERTIES
(
    "replication_num" = "3",
    "storage_format" = "V2"
);

Aggregate Key(聚合模型)

sql
-- Aggregate Key model: rows that share the same key are merged automatically
-- according to each value column's aggregation type; suited to pre-aggregated reports.
CREATE TABLE daily_stats
(
    dt         DATE,
    user_id    BIGINT,
    pv         BIGINT         SUM,      -- summed
    uv         BIGINT         REPLACE,  -- replaced (keeps the latest)
    max_amount DECIMAL(10, 2) MAX,      -- maximum
    min_amount DECIMAL(10, 2) MIN       -- minimum
)
AGGREGATE KEY (dt, user_id)
DISTRIBUTED BY HASH (user_id) BUCKETS 32;

Unique Key(主键模型)

sql
-- Unique Key (primary key) model: for rows with the same key only the latest
-- version is kept; supports UPDATE/DELETE.
CREATE TABLE user_profiles
(
    user_id    BIGINT,
    name       VARCHAR(100),
    age        INT,
    city       VARCHAR(50),
    updated_at DATETIME
)
UNIQUE KEY (user_id)
DISTRIBUTED BY HASH (user_id) BUCKETS 32
PROPERTIES
(
    -- Merge-on-write: versions are merged at load time, which makes queries faster.
    "enable_unique_key_merge_on_write" = "true"
);

数据导入

Stream Load(HTTP 实时导入)

bash
# Stream Load from a local CSV file.
# The FE answers a Stream Load request with a redirect to a BE node, so
# --location-trusted is required for curl to follow it and resend the
# credentials; "Expect:100-continue" lets the server reject the request
# before the file body is uploaded.
curl --location-trusted -u root:password \
  -H "Expect:100-continue" \
  -H "label:load_20240101" \
  -H "column_separator:," \
  -H "columns:user_id,name,age,city" \
  -T /data/users.csv \
  http://fe_host:8030/api/mydb/user_profiles/_stream_load

# Stream Load from a JSON file whose top level is an array of objects
# (strip_outer_array unwraps that array into individual rows).
curl --location-trusted -u root:password \
  -H "Expect:100-continue" \
  -H "label:load_json_20240101" \
  -H "format:json" \
  -H "strip_outer_array:true" \
  -T /data/users.json \
  http://fe_host:8030/api/mydb/user_profiles/_stream_load

Broker Load(HDFS/S3 批量导入)

sql
-- Broker Load: batch-import every CSV file matching the HDFS path into
-- user_profiles through the broker named "hdfs_broker".
LOAD LABEL mydb.load_20240101
(
    DATA INFILE ("hdfs://namenode:9000/data/users/*.csv")
    INTO TABLE user_profiles
    COLUMNS TERMINATED BY ","
    (user_id, name, age, city)
)
WITH BROKER "hdfs_broker"
(
    "username" = "hadoop",
    "password" = ""
)
PROPERTIES
(
    "timeout" = "3600",
    -- Tolerate up to 10% of rows being filtered out as bad data.
    "max_filter_ratio" = "0.1"
);

-- Check the status of the load job by its label.
SHOW LOAD WHERE LABEL = "load_20240101";

Routine Load(Kafka 实时消费)

sql
-- Routine Load: continuously consume the Kafka topic "user_events" into the
-- user_events table.
-- The load properties that follow ON <table> (column separator, column
-- mapping, ...) form a comma-separated list, hence the comma after the
-- separator clause.
CREATE ROUTINE LOAD mydb.kafka_load ON user_events
COLUMNS TERMINATED BY ",",
COLUMNS(user_id, event_type, amount, event_time)
PROPERTIES (
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_error_number" = "1000"
)
FROM KAFKA (
    "kafka_broker_list" = "kafka:9092",
    "kafka_topic" = "user_events",
    "property.group.id" = "doris-routine-load",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

-- Check the Routine Load job status.
SHOW ROUTINE LOAD FOR mydb.kafka_load;

-- Pause / resume the job.
PAUSE ROUTINE LOAD FOR mydb.kafka_load;
RESUME ROUTINE LOAD FOR mydb.kafka_load;

Insert Into(SQL 导入)

sql
-- Load from another table.
-- The target column list is spelled out so the statement keeps working (or
-- fails loudly) if user_profiles gains or reorders columns.
INSERT INTO user_profiles (user_id, name, age, city, updated_at)
SELECT
    user_id,
    name,
    age,
    city,
    now() AS updated_at
FROM staging_table
WHERE age > 0;

-- Load from an external table (Hive).
-- NOTE(review): the column names below assume dim_user mirrors user_profiles;
-- confirm against the Hive schema.
INSERT INTO user_profiles (user_id, name, age, city, updated_at)
SELECT
    user_id,
    name,
    age,
    city,
    updated_at
FROM hive_catalog.dw_db.dim_user;

查询优化

分区与分桶

sql
-- Range partitioning (by date).
CREATE TABLE user_events (
    event_time  DATETIME,
    user_id     BIGINT,
    amount      DECIMAL(10, 2)
)
DUPLICATE KEY(event_time, user_id)
PARTITION BY RANGE(event_time) (
    PARTITION p202401 VALUES LESS THAN ("2024-02-01"),
    PARTITION p202402 VALUES LESS THAN ("2024-03-01"),
    PARTITION p202403 VALUES LESS THAN ("2024-04-01")
)
DISTRIBUTED BY HASH(user_id) BUCKETS 32;

-- Dynamic partitioning (partitions are created and dropped automatically).
-- The CREATE TABLE above has already been terminated, so the properties are
-- applied with ALTER TABLE ... SET; they could equally be supplied in a
-- PROPERTIES clause of the CREATE TABLE statement itself.
-- NOTE(review): time_unit is DAY while the partitions above are monthly;
-- confirm the automatically created ranges do not overlap existing ones.
ALTER TABLE user_events SET (
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-30",   -- keep the most recent 30 days
    "dynamic_partition.end" = "3",       -- create partitions 3 days ahead
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.buckets" = "32"
);

物化视图

sql
-- Create a synchronous materialized view on user_events.
-- Doris's date_trunc takes the datetime expression first and the unit second.
CREATE MATERIALIZED VIEW hourly_stats AS
SELECT
    date_trunc(event_time, 'hour') AS hour,
    event_type,
    count(*) AS cnt,
    sum(amount) AS total
FROM user_events
GROUP BY date_trunc(event_time, 'hour'), event_type;

-- Query the base table: a synchronous materialized view is not referenced by
-- name in FROM; the optimizer transparently rewrites matching queries to use
-- it. Use EXPLAIN to confirm that hourly_stats was selected.
SELECT
    event_type,
    count(*) AS cnt
FROM user_events
WHERE date_trunc(event_time, 'hour') >= '2024-01-01 00:00:00'
GROUP BY event_type;

Colocate Join(本地 Join)

sql
-- Tables that share the same bucketing strategy can be joined without a shuffle.
-- Both tables below hash on user_id into 32 buckets and belong to the same
-- Colocate Group ("user_group").
CREATE TABLE orders
(
    order_id BIGINT,
    user_id  BIGINT,
    amount   DECIMAL(10, 2)
)
DISTRIBUTED BY HASH (user_id) BUCKETS 32
PROPERTIES
(
    "colocate_with" = "user_group"
);

CREATE TABLE user_profiles
(
    user_id BIGINT,
    name    VARCHAR(100)
)
DISTRIBUTED BY HASH (user_id) BUCKETS 32
PROPERTIES
(
    "colocate_with" = "user_group"
);

-- This join needs no shuffle (local join).
SELECT
    o.order_id,
    u.name,
    o.amount
FROM orders AS o
INNER JOIN user_profiles AS u
    ON o.user_id = u.user_id;

外部数据源

sql
-- Hive catalog: query Hive data in place through the Hive Metastore.
CREATE CATALOG hive_catalog
PROPERTIES
(
    'type' = 'hms',
    'hive.metastore.uris' = 'thrift://hive-metastore:9083'
);

-- Query a Hive table through the catalog.
SELECT *
FROM hive_catalog.dw_db.dwd_user_log
WHERE dt = '2024-01-01';

-- Iceberg catalog backed by the same Hive Metastore.
CREATE CATALOG iceberg_catalog
PROPERTIES
(
    'type' = 'iceberg',
    'iceberg.catalog.type' = 'hms',
    'hive.metastore.uris' = 'thrift://hive-metastore:9083'
);

-- MySQL catalog accessed over JDBC (a catalog, not a single external table).
CREATE CATALOG mysql_catalog
PROPERTIES
(
    'type' = 'jdbc',
    'user' = 'root',
    'password' = 'password',
    'jdbc_url' = 'jdbc:mysql://mysql:3306/mydb',
    'driver_url' = 'mysql-connector-java-8.0.25.jar',
    'driver_class' = 'com.mysql.cj.jdbc.Driver'
);

常用运维命令

sql
-- Cluster status: list Frontend and Backend nodes.
SHOW FRONTENDS;
SHOW BACKENDS;

-- Tablet status of a table (the statement is SHOW TABLETS FROM <table>).
SHOW TABLETS FROM user_events;

-- Most recent load jobs.
SHOW LOAD ORDER BY CreateTime DESC LIMIT 10;

-- Query profile: enable profiling, run a query, then list the collected profiles.
SET enable_profile = true;
SELECT count(*) FROM user_events;
SHOW QUERY PROFILE "/";

-- Rebalance table data.
ADMIN REBALANCE DISK;

-- Table sizes in MB for database mydb, largest first.
SELECT
    table_name,
    ROUND(SUM(data_length) / 1024 / 1024, 2) AS data_mb
FROM information_schema.tables
WHERE table_schema = 'mydb'
GROUP BY table_name
ORDER BY data_mb DESC;

本站内容由 褚成志 整理编写,仅供学习参考