跳转至

第四章:表引擎

MergeTree 系列

MergeTree

-- Create a MergeTree table: one partition per calendar month of event_time,
-- rows sorted by (event_time, event_id) inside each part.
CREATE TABLE events (
    event_id UInt32,
    event_type String,
    event_time DateTime,
    user_id UInt32,
    properties Map(String, String)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, event_id)
SETTINGS index_granularity = 8192;  -- rows per index mark

-- Insert data. The explicit column list keeps the statement valid
-- if columns are later added to or reordered in the table.
INSERT INTO events (event_id, event_type, event_time, user_id, properties) VALUES
    (1, 'click', '2024-01-01 12:00:00', 100, {'page': 'home'}),
    (2, 'purchase', '2024-01-01 13:00:00', 100, {'amount': '100'});

-- Features
-- 1. Partitioning (PARTITION BY)
-- 2. Primary-key index (defaults to the ORDER BY key)
-- 3. Replication, but only through the Replicated* variants (e.g. ReplicatedMergeTree);
--    a plain MergeTree table such as this one is not replicated
-- 4. TTL

ReplacingMergeTree

-- Automatic deduplication: rows sharing the sorting key (user_id) are
-- collapsed, with `version` passed to the engine as the version column.
CREATE TABLE users (
    user_id    UInt32,
    name       String,
    updated_at DateTime,
    version    UInt32
)
ENGINE = ReplacingMergeTree(version)
ORDER BY user_id;

-- Insert two versions of the same user (they will be deduplicated)
INSERT INTO users (user_id, name, updated_at, version)
VALUES (1, 'Alice', '2024-01-01 12:00:00', 1);
INSERT INTO users (user_id, name, updated_at, version)
VALUES (1, 'Alice Updated', '2024-01-02 12:00:00', 2);

-- Reads need FINAL to see the deduplicated rows
SELECT
    user_id,
    name,
    updated_at,
    version
FROM users FINAL;

SummingMergeTree

-- Automatic summation: when parts are merged, rows with the same sorting key
-- (date, metric_name) are replaced by one row whose `value` is their sum.
CREATE TABLE metrics (
    date Date,
    metric_name String,
    value Int64
) ENGINE = SummingMergeTree(value)
ORDER BY (date, metric_name);

-- Insert data. Each INSERT writes its own part; the rows are summed only
-- once those parts are merged.
INSERT INTO metrics (date, metric_name, value) VALUES ('2024-01-01', 'page_views', 100);
INSERT INTO metrics (date, metric_name, value) VALUES ('2024-01-01', 'page_views', 50);

-- Merges run in the background at an unspecified time, so a plain SELECT may
-- still return both rows. Always aggregate explicitly when reading.
SELECT
    date,
    metric_name,
    sum(value) AS total_value
FROM metrics
GROUP BY date, metric_name;
-- Result: ('2024-01-01', 'page_views', 150)

AggregatingMergeTree

-- Stores intermediate aggregate-function states instead of final values.
CREATE TABLE aggregates (
    date Date,
    key String,
    value AggregateFunction(uniq, UInt32)
) ENGINE = AggregatingMergeTree()
ORDER BY (date, key);

-- Insert aggregate states.
-- `events` has no `date` or `key` columns, so both are derived here:
-- the day of event_time becomes `date` and event_type becomes `key`.
-- user_id is UInt32, matching AggregateFunction(uniq, UInt32).
INSERT INTO aggregates (date, key, value)
SELECT
    toDate(event_time) AS date,
    event_type AS key,
    uniqState(user_id)
FROM events
GROUP BY date, key;

-- Read the final result: uniqMerge combines the stored states per group.
SELECT
    date,
    key,
    uniqMerge(value) AS unique_users
FROM aggregates
GROUP BY date, key;

CollapsingMergeTree

-- Collapsing merge: `sign` is the column handed to the engine.
CREATE TABLE events_collapsing (
    event_id   UInt32,
    event_type String,
    event_time DateTime,
    sign       Int8  -- 1: add, -1: delete
)
ENGINE = CollapsingMergeTree(sign)
ORDER BY event_id;

-- Insert a row, then a matching row with sign = -1
INSERT INTO events_collapsing (event_id, event_type, event_time, sign)
VALUES (1, 'click', '2024-01-01 12:00:00', 1);
INSERT INTO events_collapsing (event_id, event_type, event_time, sign)
VALUES (1, 'click', '2024-01-01 12:00:00', -1);  -- delete

Log 系列

TinyLog

-- Simple log table
CREATE TABLE tiny_logs (
    id      UInt32,
    message String
)
ENGINE = TinyLog;

-- Characteristics: fast writes, no concurrent reads

StripeLog

-- Stripe log table
CREATE TABLE stripe_logs (
    id      UInt32,
    message String
)
ENGINE = StripeLog;

-- Characteristic: supports concurrent reads

Log

-- Log table
CREATE TABLE logs (
    id      UInt32,
    message String
)
ENGINE = Log;

集成引擎

Kafka

-- Kafka engine table: reads topic `events` from the broker at localhost:9092
-- as consumer group `clickhouse`, parsing messages as JSONEachRow.
CREATE TABLE kafka_source (
    event_id   UInt32,
    event_type String,
    event_time DateTime
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'localhost:9092',
    kafka_topic_list  = 'events',
    kafka_group_name  = 'clickhouse',
    kafka_format      = 'JSONEachRow';

-- Target table that stores the consumed rows
CREATE TABLE events_from_kafka (
    event_id   UInt32,
    event_type String,
    event_time DateTime
)
ENGINE = MergeTree()
ORDER BY event_id;

-- Materialized view that consumes kafka_source and writes into events_from_kafka
CREATE MATERIALIZED VIEW kafka_consumer TO events_from_kafka AS
SELECT
    event_id,
    event_type,
    event_time
FROM kafka_source;

MySQL

-- MySQL engine table
CREATE TABLE mysql_table (
    id   UInt32,
    name String
)
ENGINE = MySQL(
    'localhost:3306',  -- host:port
    'db',              -- remote database
    'table',           -- remote table
    'user',            -- MySQL user
    'password'         -- MySQL password
);

PostgreSQL

-- PostgreSQL engine table
CREATE TABLE postgres_table (
    id   UInt32,
    name String
)
ENGINE = PostgreSQL(
    'localhost:5432',  -- host:port
    'db',              -- remote database
    'table',           -- remote table
    'user',            -- PostgreSQL user
    'password'         -- PostgreSQL password
);

HDFS

-- HDFS engine table
CREATE TABLE hdfs_table (
    id   UInt32,
    name String
)
ENGINE = HDFS(
    'hdfs://localhost:9000/data/file.csv',  -- file URI
    'CSV'                                   -- file format
);

S3

-- S3 engine table
CREATE TABLE s3_table (
    id   UInt32,
    name String
)
ENGINE = S3(
    'https://bucket.s3.amazonaws.com/data/file.csv',  -- object URL
    'access_key',                                     -- AWS access key id
    'secret_key',                                     -- AWS secret access key
    'CSV'                                             -- file format
);

特殊引擎

Distributed

-- Distributed table over `default.events_local` on cluster `cluster`;
-- rand() is the sharding key.
-- NOTE(review): events_local is not created anywhere in this chapter - confirm it exists on the shards.
CREATE TABLE events_distributed ON CLUSTER cluster (
    event_id   UInt32,
    event_type String,
    event_time DateTime
)
ENGINE = Distributed(cluster, default, events_local, rand());

-- Query the distributed table
SELECT
    event_id,
    event_type,
    event_time
FROM events_distributed;

Dictionary

-- Dictionary loaded from the MySQL table db.users.
-- HASHED is a simple-key layout whose key type is UInt64, so the key is
-- declared as UInt64 explicitly instead of relying on implicit conversion.
CREATE DICTIONARY user_dict (
    user_id UInt64,
    name String,
    email String
) PRIMARY KEY user_id
SOURCE(MYSQL(
    host 'localhost'
    port 3306
    user 'root'
    password 'password'
    db 'db'
    table 'users'
))
LAYOUT(HASHED())
LIFETIME(MIN 300 MAX 600);  -- refresh interval bounds, in seconds

-- Look up an attribute; the key argument matches the dictionary key type (UInt64)
SELECT dictGet('user_dict', 'name', toUInt64(1));

View

-- View: number of events per user over the last day
CREATE VIEW active_users AS
SELECT
    user_id,
    count() AS event_count
FROM events
WHERE event_time > now() - INTERVAL 1 DAY
GROUP BY user_id;

Materialized View

-- Materialized view: per-day, per-event-type counts of rows inserted into `events`.
-- The sorting key must contain every grouping column. With ORDER BY date alone,
-- SummingMergeTree would merge rows of different event types for the same day
-- into a single row and corrupt the per-type counts.
CREATE MATERIALIZED VIEW daily_stats
ENGINE = SummingMergeTree()
ORDER BY (date, event_type)
AS SELECT
    toDate(event_time) AS date,
    event_type,
    COUNT(*) AS count
FROM events
GROUP BY date, event_type;

小结

表引擎要点:

  • MergeTree 系列:MergeTree、ReplacingMergeTree、SummingMergeTree、AggregatingMergeTree、CollapsingMergeTree
  • Log 系列:TinyLog、StripeLog、Log
  • 集成引擎:Kafka、MySQL、PostgreSQL、HDFS、S3
  • 特殊引擎:Distributed、Dictionary、View、Materialized View

下一章我们将学习 SQL 查询。