ClickHouse Kafka 数据接入 —— 实时数据导入 Task 创建

Kafka 接入 ClickHouse,数据实时进入 ClickHouse 表

这个功能和 Doris 和 Routine Load 功能类似,只是有些操作更复杂一些(Offset 操作)

相比于 Doris Routine Load,ClickHouse 的 Kafka 接入更灵活一些


创建 kafka_source

kafka_source 表直接对接 Kafka Topic,是 Kafka 消息订阅引擎

消费的消息会被自动追踪,因此每个消息在不同的消费组里只会记录一次,这句话非常重要

所以我们不能随便对 kafka_source 表进行查询操作,

因为每条消息在一个消费组内只记录一次,所以查询操作会导致下游数据的丢失

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE kafka_source (
ts UInt64,
level String,
message String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '10.0.15.130:9092,10.0.15.131:9092,10.0.15.132:9092',
kafka_topic_list = 'ck_test',
kafka_group_name = 'test01',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 2;

创建 target 表

target 表为 Kafka -> ClickHouse 的目标表

1
2
3
4
5
6
7
8
9
CREATE TABLE target
(
ts DateTime,
level String,
message String
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(ts)
ORDER BY level;

创建物化视图

source_target_mv 物化视图为 kafka_source 和 target 之间的连接器

控制数据从哪来、到哪去

还可以进行数据落地前的逻辑加工

1
2
3
4
5
6
CREATE MATERIALIZED VIEW source_target_mv TO target AS
SELECT
toDateTime(ts) AS ts,
level,
message
FROM kafka_source;

Tips

1.不要手动对 kafka_source 表进行任何查询操作!!

2.Kafka 数据接入时,会对 Json 中的 Key 字段进行赋值,

因为 Value 为空时,Json 的 Key 会进行缺省,缺省的 Key 对应的字段落表时值为空。