Skip to content

ClickHouse — 列式 OLAP

什么是 ClickHouse

ClickHouse 是由 Yandex 开源的列式数据库管理系统,专为 OLAP(联机分析处理)场景设计。它以极致的查询性能著称,单表查询速度可达每秒数十亿行。

核心优势

  • 列式存储:只读取查询涉及的列,I/O 极少
  • 向量化执行:SIMD 指令批量处理数据
  • 数据压缩:列式存储压缩率高(通常 5-10x)
  • 分布式查询:支持分片和副本
  • 实时写入:支持高频批量写入

适用场景:日志分析、用户行为分析、监控指标、广告统计


核心架构

┌─────────────────────────────────────────────────────────────┐
│                    ClickHouse 集群                           │
│                                                             │
│  ┌──────────────────────────────────────────────────────┐  │
│  │                  分布式表(Distributed)               │  │
│  │  查询路由到各分片,结果合并返回                        │  │
│  └──────────────────────────────────────────────────────┘  │
│                                                             │
│  Shard 1                    Shard 2                        │
│  ┌──────────────────┐       ┌──────────────────┐          │
│  │  Replica 1       │       │  Replica 1       │          │
│  │  MergeTree 表    │       │  MergeTree 表    │          │
│  └──────────────────┘       └──────────────────┘          │
│  ┌──────────────────┐       ┌──────────────────┐          │
│  │  Replica 2       │       │  Replica 2       │          │
│  │  MergeTree 表    │       │  MergeTree 表    │          │
│  └──────────────────┘       └──────────────────┘          │
└─────────────────────────────────────────────────────────────┘

MergeTree 表引擎

MergeTree 是 ClickHouse 最重要的表引擎,是其他引擎的基础。

基本 MergeTree

sql
-- Raw event table built on the base MergeTree engine.
CREATE TABLE user_events
(
    user_id    UInt64,
    -- LowCardinality: optimisation for strings with few distinct values
    event_type LowCardinality(String),
    page_url   String,
    amount     Float64,
    event_time DateTime,
    dt         Date
)
ENGINE = MergeTree()
-- one partition per calendar month
PARTITION BY toYYYYMM(dt)
-- sorting key; it affects query performance
ORDER BY (user_id, event_time)
-- sparse primary index; may differ from ORDER BY (here it is its leading column)
PRIMARY KEY (user_id)
-- index granularity: one index mark every 8192 rows
SETTINGS index_granularity = 8192;

ReplacingMergeTree(去重)

sql
-- Rows sharing the same ORDER BY key are deduplicated asynchronously by
-- background merges; the row with the latest version is kept.
CREATE TABLE user_profiles
(
    user_id    UInt64,
    name       String,
    age        UInt8,
    updated_at DateTime
)
-- updated_at is the version column: the newest updated_at wins
ENGINE = ReplacingMergeTree(updated_at)
ORDER BY user_id;

-- Force deduplication at query time, because background merges may not have
-- completed yet. Columns are listed explicitly instead of SELECT * so the
-- result shape does not change silently when the table schema changes.
SELECT
    user_id,
    name,
    age,
    updated_at
FROM user_profiles FINAL
WHERE user_id = 1;

SummingMergeTree(预聚合)

sql
-- Rows sharing the same ORDER BY key are summed automatically during
-- background merges.
CREATE TABLE daily_stats
(
    dt      Date,
    user_id UInt64,
    pv      UInt64,
    uv      UInt64,
    revenue Float64
)
-- the tuple lists the columns that are summed on merge
ENGINE = SummingMergeTree((pv, uv, revenue))
ORDER BY (dt, user_id);

AggregatingMergeTree(聚合状态)

sql
-- Stores intermediate aggregate states so results can be aggregated incrementally.
CREATE TABLE user_daily_agg (
    dt          Date,
    user_id     UInt64,
    -- countState() takes no argument, so its state type is AggregateFunction(count);
    -- the column type must match what the INSERT below produces.
    pv          AggregateFunction(count),
    avg_amount  AggregateFunction(avg, Float64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (dt, user_id);

-- Write: produce aggregate states with the -State combinator.
-- The target columns are named explicitly so the mapping does not depend on
-- column order in the table definition.
INSERT INTO user_daily_agg (dt, user_id, pv, avg_amount)
SELECT
    dt,
    user_id,
    countState(),
    avgState(amount)
FROM user_events
GROUP BY dt, user_id;

-- Read: finalise the stored states with the -Merge combinator.
SELECT
    dt,
    user_id,
    countMerge(pv),
    avgMerge(avg_amount)
FROM user_daily_agg
GROUP BY dt, user_id;

CollapsingMergeTree(折叠去重)

sql
-- The sign column marks a row as an insert (+1) or a delete (-1);
-- matching +1 / -1 rows cancel each other out when parts are merged.
CREATE TABLE orders (
    order_id    UInt64,
    user_id     UInt64,
    amount      Float64,
    sign        Int8  -- +1 = insert, -1 = delete
)
ENGINE = CollapsingMergeTree(sign)
ORDER BY order_id;

-- Insert. Column lists are explicit so the statements keep working if the
-- table's column order changes.
INSERT INTO orders (order_id, user_id, amount, sign) VALUES (1, 100, 99.9, 1);

-- Update = cancel the old row, then insert the new one.
-- The cancel row repeats the old row's values with sign = -1.
INSERT INTO orders (order_id, user_id, amount, sign) VALUES (1, 100, 99.9, -1);  -- cancel old value
INSERT INTO orders (order_id, user_id, amount, sign) VALUES (1, 100, 199.9, 1);  -- insert new value

常用 SQL

sql
-- Query optimisation: filter on the partition column and the primary key.
SELECT
    count(),
    sum(amount)
FROM user_events
WHERE user_id = 12345               -- primary-key filter
  AND dt >= toDate('2024-01-01')    -- partition pruning (half-open month range)
  AND dt <  toDate('2024-02-01');

-- Approximate aggregates (very fast), with the exact distinct count for comparison.
SELECT
    uniq(user_id) AS uv,              -- approximate distinct count (adaptive sampling; the HyperLogLog-based variants are uniqCombined / uniqHLL12)
    uniqExact(user_id) AS exact_uv,   -- exact distinct count (slower)
    quantile(0.5)(amount) AS median,  -- approximate median
    topK(10)(page_url) AS top_pages   -- top-K most frequent values
FROM user_events
WHERE dt = today();

-- Time-series analysis: today's event count and revenue, bucketed by hour.
SELECT
    toStartOfHour(event_time) AS hour,
    count()                   AS cnt,
    sum(amount)               AS revenue
FROM user_events
WHERE dt = today()
GROUP BY hour
ORDER BY hour ASC;

-- Funnel analysis: per-step event counts for today and step-to-step ratios.
-- The ratios reuse the count aliases defined earlier in the same SELECT.
SELECT
    countIf(event_type = 'view')     AS view_cnt,
    countIf(event_type = 'click')    AS click_cnt,
    countIf(event_type = 'purchase') AS purchase_cnt,
    click_cnt / view_cnt             AS click_rate,
    purchase_cnt / click_cnt         AS purchase_rate
FROM user_events
WHERE dt = today();

-- Next-day retention using retention().
-- retention(cond1, cond2, ...) is evaluated over each GROUP BY group and
-- returns an Array(UInt8): r[1] = 1 when cond1 was met by some row of the
-- group, r[2] = 1 when both cond1 and cond2 were met. To measure retention the
-- data must therefore be grouped per user, with one condition per day.
SELECT
    sum(r[1]) AS day0_users,            -- users who logged in yesterday
    sum(r[2]) AS day1_retained_users,   -- of those, users who logged in again today
    if(day0_users = 0, 0, day1_retained_users / day0_users) AS day1_retention_rate
FROM
(
    SELECT
        user_id,
        retention(
            dt = yesterday(),   -- cohort day
            dt = today()        -- next day
        ) AS r
    FROM user_events
    WHERE event_type = 'login'
      AND dt IN (yesterday(), today())  -- also prunes partitions
    GROUP BY user_id
);

-- Window functions: running total of amount per user (by event time) and a
-- per-user rank of events by amount, largest first.
SELECT
    user_id,
    event_time,
    amount,
    sum(amount)  OVER running_by_time AS cum_amount,
    row_number() OVER ranked_by_amount AS rn
FROM user_events
WINDOW
    running_by_time AS (
        PARTITION BY user_id
        ORDER BY event_time
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ),
    ranked_by_amount AS (
        PARTITION BY user_id
        ORDER BY amount DESC
    );

数据写入

sql
-- Batch insert (recommended: at least 1000 rows per INSERT).
-- The column list is explicit so the statement does not depend on the
-- table's column order.
INSERT INTO user_events (user_id, event_type, page_url, amount, event_time, dt) VALUES
    (1, 'click', '/home', 0, now(), today()),
    (2, 'purchase', '/product/123', 99.9, now(), today());

-- Insert from a query result.
-- The target columns are named explicitly; the SELECT list follows the same
-- order, with dt derived from event_time.
INSERT INTO user_events (user_id, event_type, page_url, amount, event_time, dt)
SELECT
    user_id,
    event_type,
    page_url,
    amount,
    event_time,
    toDate(event_time)
FROM staging_table;

-- 从文件导入
clickhouse-client --query "INSERT INTO user_events FORMAT CSV" < data.csv

-- Read from Kafka through the Kafka table engine.
CREATE TABLE kafka_source
(
    user_id    UInt64,
    event_type String,
    amount     Float64,
    event_time DateTime
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list  = 'user_events',
         kafka_group_name  = 'clickhouse-consumer',
         kafka_format      = 'JSONEachRow';

-- Materialized view: continuously copies rows consumed from Kafka into the
-- MergeTree table user_events.
-- Columns are listed explicitly, and dt is derived from event_time: the Kafka
-- source has no dt column, so with SELECT * the partition column would not be
-- supplied and would fall back to the Date default (1970-01-01).
-- NOTE(review): page_url is not present in kafka_source either and is left to
-- the target column's default; add it to the Kafka table if the topic carries it.
CREATE MATERIALIZED VIEW kafka_mv TO user_events AS
SELECT
    user_id,
    event_type,
    amount,
    event_time,
    toDate(event_time) AS dt
FROM kafka_source;

物化视图

sql
-- Materialized view that pre-aggregates user_events in real time:
-- one row per (hour, event_type) with the event count and total amount.
CREATE MATERIALIZED VIEW hourly_stats
ENGINE = SummingMergeTree()
ORDER BY (hour, event_type)
AS
SELECT
    toStartOfHour(event_time) AS hour,
    event_type,
    count()                   AS cnt,
    sum(amount)               AS total_amount
FROM user_events
GROUP BY
    hour,
    event_type;

-- Query the materialized view (very fast): totals for the last 24 hours.
-- The values are summed again at query time and grouped by the view's key.
SELECT
    hour,
    event_type,
    sum(cnt),
    sum(total_amount)
FROM hourly_stats
WHERE hour >= now() - INTERVAL 24 HOUR
GROUP BY
    hour,
    event_type
ORDER BY hour;

性能调优

sql
-- Show the execution plan of a query.
EXPLAIN
SELECT count()
FROM user_events
WHERE dt = today();

-- Inspect the query log: the ten slowest finished queries.
SELECT
    query,
    query_duration_ms,
    read_rows,
    read_bytes
FROM system.query_log
WHERE type = 'QueryFinish'
ORDER BY query_duration_ms DESC
LIMIT 10;

-- Table sizes: bytes, row count and number of active parts per table in the
-- default database, largest first.
SELECT
    table,
    formatReadableSize(sum(bytes)) AS size,
    sum(rows)                      AS rows,
    count()                        AS parts
FROM system.parts
WHERE database = 'default'
  AND active
GROUP BY table
ORDER BY sum(bytes) DESC;

-- Force a merge to reduce the number of parts.
-- NOTE(review): presumably expensive on large tables; confirm before running
-- it on a production system.
OPTIMIZE TABLE user_events FINAL;
xml
<!-- users.xml (or a file under users.d/): query-level performance settings.
     These are user/profile settings, so they belong inside a settings profile
     rather than at the top level of config.xml.
     NOTE(review): older releases use <yandex> as the root element instead of
     <clickhouse>; match the root element of the existing files. -->
<clickhouse>
    <profiles>
        <default>
            <max_memory_usage>10000000000</max_memory_usage>  <!-- 10 GB -->
            <max_threads>16</max_threads>
            <max_block_size>65536</max_block_size>
            <max_insert_block_size>1048576</max_insert_block_size>
        </default>
    </profiles>
</clickhouse>

本站内容由 褚成志 整理编写,仅供学习参考