Doris Kafka Load

实时消费 Kafka 某 Topic 数据到 Doris 表 Task 创建方法及案例。


Kafka Topic创建

Kafka Topic : dorisDemo

创建Kafka Topic

1
kafka-topics --create --zookeeper cdh9:2181/kafka --partitions 3 --replication-factor 2 --topic dorisDemo

查看Kafka Topic

1
kafka-topics --zookeeper cdh9:2181/kafka --list

创建Doris导入任务

语法

1
2
3
4
5
6
7
8
9
10
CREATE ROUTINE LOAD [database.][job_name] ON [table_name]
[COLUMN TERMINATED BY "column_separator" ,]
[COLUMN (col1, col2, ...) ,]
[WHERE where_condition ,]
[PARTITION (part1, part2, ...)]
[PROPERTIES ("key" = "value", ...)]
FROM [DATA_SOURCE]
[(data_source_properties1 = 'value1',
data_source_properties2 = 'value2',
...)]

sql案例

1
2
3
4
5
6
7
8
9
10
11
CREATE ROUTINE LOAD routine_load_kafka ON demo_kafka_load_detail
COLUMNS TERMINATED BY ",",
COLUMNS (a, b, c)
PROPERTIES(
"desired_concurrent_number"="1",
"max_error_number"="1000"
)
FROM KAFKA(
"kafka_broker_list"="10.0.15.131:9092",
"kafka_topic"="dorisDemo"
);

查看任务状态

1
SHOW ALL ROUTINE LOAD \G

手动向Kafka生产数据

查询Doris load表数据

查看任务状态

Doris ROUTINE LOAD 操作

暂停导入任务

1
PAUSE ROUTINE LOAD FOR [job_name];

恢复导入任务

使用RESUME语句后,任务会短暂的进入 NEED_SCHEDULE 状态,表示任务正在重新调度,一段时间

后会重新恢复至 RUNING 状态,继续导入数据。

1
RESUME ROUTINE LOAD FOR [job_name];

停止导入任务

使用STOP语句后,此时导入任务进入 STOP 状态,数据停止导入,任务消亡,无法恢复数据导入。

1
STOP ROUTINE ``LOAD` `FOR` `[job_name];

Doris Routine Load Task DEMO 案例

DEMO1:Json字段和Doris Table字段需要顺序一致的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CREATE ROUTINE LOAD example_db.kafka_json_to_doris_2 ON demo_ubt_detail
COLUMNS(xwhen,appid,xwho,xwhat,ds,$country,$province,$city,$browser_version,$os_version,$model,$os,$brand,$device_type,$browser,xwhat_id,isChangeXwho,distinct_id,offset,$web_crawler,$is_first_day,$screen_width,$is_first_time,$platform,$ip,$screen_height,$user_agent,$language,$lib,$debug,$is_login,$is_time_calibrated,$lib_version,platform_extra,$time_zone,$session_id)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "10.0.15.130:9092,10.0.15.131:9092,10.0.15.132:9092",
"kafka_topic" = "ubtDoris1",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.auto.offset.reset" = "earliest"
);

DEMO2:Json字段和Doris Table字段顺序不一致的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CREATE ROUTINE LOAD dwd_homedo_real.dwd_smb_account_info ON dwd_smb_account_info
COLUMNS(user_id, createtime, agentId, login_type, accountName, mobile, login_account, accountId, roleIdAudit, modified_time, roleIds, login_password, accountRealName, disabled, otherPermissions, operatorId, email, fullFileName, memberId)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval"="20",
"max_batch_rows"="300000",
"max_batch_size"="209715200",
"strict_mode"="false",
"format"="json",
"jsonpaths" = "[\"$.user_id\",\"$.createtime\",\"$.agentId\",\"$.login_type\",\"$.accountName\",\"$.mobile\",\"$.login_account\",\"$.accountId\",\"$.roleIdAudit\",\"$.modified_time\",\"$.roleIds\",\"$.login_password\",\"$.accountRealName\",\"$.disabled\",\"$.otherPermissions\",\"$.operatorId\",\"$.email\",\"$.fullFileName\",\"$.memberId\"]",
"strip_outer_array" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "10.0.15.130:9092,10.0.15.131:9092,10.0.15.132:9092",
"kafka_topic" = "ods_homedo_shop_sysuser_account",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.auto.offset.reset" = "earliest"
);

DEMO3:从Json字段取部分字段并做筛选后到Doris

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
-- Json 数据
{
"id": 7247316,
"tagGroupId": 30,
"tagId": 5619893,
"promotionId": 27039,
"insertTime": "2021-05-17 14:35:03.037",
"updateTime": "2021-05-17 14:35:03.037",
"deleteTime": "1900-01-01 00:00:00.0",
"mark": 1,
"version": 0,
"sysModifyDate": 5634101229,
"tagValue": "明星品",
"eventTime": "2021-05-17 14:35:04.603"
}


-- Doris Table DDL
DROP TABLE IF EXISTS dwd_homedo_real.dwd_homedo_promotion_flag;
CREATE TABLE IF NOT EXISTS dwd_homedo_real.dwd_homedo_promotion_flag(
promotionId VARCHAR(1000) NULL,
tagGroupId VARCHAR(1000) NULL,
tagId VARCHAR(1000) NULL,
tagValue VARCHAR(1000) NULL,
mark VARCHAR(1000) NULL
)
DUPLICATE KEY(promotionId)
DISTRIBUTED BY HASH(promotionId) BUCKETS 3;


-- Routine Load SQL
CREATE ROUTINE LOAD dwd_homedo_real.dwd_homedo_promotion_flag ON dwd_homedo_promotion_flag
COLUMNS(promotionId, tagGroupId, tagId, tagValue, mark),
WHERE mark > 0
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval"="20",
"max_batch_rows"="300000",
"max_batch_size"="209715200",
"strict_mode"="false",
"format"="json",
"jsonpaths" = "[\"$.promotionId\",\"$.tagGroupId\",\"$.tagId\",\"$.tagValue\",\"$.mark\"]",
"strip_outer_array" = "false"
)
FROM KAFKA
(
"kafka_broker_list"="10.0.15.130:9092,10.0.15.131:9092,10.0.15.132:9092",
"kafka_topic"="ods_Homedo_t_Tag_Promotion",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.auto.offset.reset" = "earliest"
);

Doris Routine Load 官方帮助文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
Name: 'ROUTINE LOAD'
Description:
例行导入(Routine Load)功能,支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 Doris 中。
目前仅支持通过无认证或者 SSL 认证方式,从 Kakfa 导入文本格式(CSV)的数据。
语法:
CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]
1. [db.]job_name
导入作业的名称,在同一个 database 内,相同名称只能有一个 job 在运行。
2. tbl_name
指定需要导入的表的名称。
3. load_properties
用于描述导入数据。语法:
[column_separator],
[columns_mapping],
[where_predicates],
[partitions]
1. column_separator:
指定列分隔符,如:
COLUMNS TERMINATED BY ","
默认为:\t
2. columns_mapping:
指定源数据中列的映射关系,以及定义衍生列的生成方式。
1. 映射列:
按顺序指定,源数据中各个列,对应目的表中的哪些列。对于希望跳过的列,可以指定一个不存在的列名。
假设目的表有三列 k1, k2, v1。源数据有4列,其中第1、2、4列分别对应 k2, k1, v1。则书写如下:
COLUMNS (k2, k1, xxx, v1)
其中 xxx 为不存在的一列,用于跳过源数据中的第三列。
2. 衍生列:
以 col_name = expr 的形式表示的列,我们称为衍生列。即支持通过 expr 计算得出目的表中对应列的值。
衍生列通常排列在映射列之后,虽然这不是强制的规定,但是 Doris 总是先解析映射列,再解析衍生列。
接上一个示例,假设目的表还有第4列 v2,v2 由 k1 和 k2 的和产生。则可以书写如下:
COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);
3. where_predicates
用于指定过滤条件,以过滤掉不需要的列。过滤列可以是映射列或衍生列。
例如我们只希望导入 k1 大于 100 并且 k2 等于 1000 的列,则书写如下:
WHERE k1 > 100 and k2 = 1000
4. partitions
指定导入目的表的哪些 partition 中。如果不指定,则会自动导入到对应的 partition 中。
示例:
PARTITION(p1, p2, p3)
4. job_properties
用于指定例行导入作业的通用参数。
语法:
PROPERTIES (
"key1" = "val1",
"key2" = "val2"
)
目前我们支持以下参数:
1. desired_concurrent_number
期望的并发度。一个例行导入作业会被分成多个子任务执行。这个参数指定一个作业最多有多少任务可以同时执行。必须大于0。默认为3。
这个并发度并不是实际的并发度,实际的并发度,会通过集群的节点数、负载情况,以及数据源的情况综合考虑。
例:
"desired_concurrent_number" = "3"
2. max_batch_interval/max_batch_rows/max_batch_size
这三个参数分别表示:
1)每个子任务最大执行时间,单位是秒。范围为 5 到 60。默认为10。
2)每个子任务最多读取的行数。必须大于等于200000。默认是200000。
3)每个子任务最多读取的字节数。单位是字节,范围是 100MB 到 1GB。默认是 100MB。
这三个参数,用于控制一个子任务的执行时间和处理量。当任意一个达到阈值,则任务结束。
例:
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
3. max_error_number
采样窗口内,允许的最大错误行数。必须大于等于0。默认是 0,即不允许有错误行。
采样窗口为 max_batch_rows * 10。即如果在采样窗口内,错误行数大于 max_error_number,则会导致例行作业被暂停,需要人工介入检查数据质量问题。
被 where 条件过滤掉的行不算错误行。
4. strict_mode
是否开启严格模式,默认为关闭。如果开启后,非空原始数据的列类型变换如果结果为 NULL,则会被过滤。指定方式为 "strict_mode" = "true"
5. timezone
指定导入作业所使用的时区。默认为使用 Session 的 timezone 参数。该参数会影响所有导入涉及的和时区有关的函数结果。
6. format
指定导入数据格式,默认是csv,支持json格式。
7. jsonpaths
jsonpaths: 导入json方式分为:简单模式和匹配模式。如果设置了jsonpath则为匹配模式导入,否则为简单模式导入,具体可参考示例。
8. strip_outer_array
布尔类型,为true表示json数据以数组对象开始且将数组对象中进行展平,默认值是false。
5. data_source
数据源的类型。当前支持:
KAFKA
6. data_source_properties
指定数据源相关的信息。
语法:
(
"key1" = "val1",
"key2" = "val2"
)
1. KAFKA 数据源
1. kafka_broker_list
Kafka 的 broker 连接信息。格式为 ip:host。多个broker之间以逗号分隔。
示例:
"kafka_broker_list" = "broker1:9092,broker2:9092"
2. kafka_topic
指定要订阅的 Kafka 的 topic。
示例:
"kafka_topic" = "my_topic"
3. kafka_partitions/kafka_offsets
指定需要订阅的 kafka partition,以及对应的每个 partition 的起始 offset。
offset 可以指定从大于等于 0 的具体 offset,或者:
1) OFFSET_BEGINNING: 从有数据的位置开始订阅。
2) OFFSET_END: 从末尾开始订阅。
如果没有指定,则默认从 OFFSET_END 开始订阅 topic 下的所有 partition。
示例:
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
4. property
指定自定义kafka参数。
功能等同于kafka shell中 "--property" 参数。
当参数的 value 为一个文件时,需要在 value 前加上关键词:"FILE:"。
关于如何创建文件,请参阅 "HELP CREATE FILE;"
更多支持的自定义参数,请参阅 librdkafka 的官方 CONFIGURATION 文档中,client 端的配置项。
示例:
"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca.pem"
1.使用 SSL 连接 Kafka 时,需要指定以下参数:
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca.pem",
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg"
其中:
"property.security.protocol" 和 "property.ssl.ca.location" 为必须,用于指明连接方式为 SSL,以及 CA 证书的位置。
如果 Kafka server 端开启了 client 认证,则还需设置:
"property.ssl.certificate.location"
"property.ssl.key.location"
"property.ssl.key.password"
分别用于指定 client 的 public key,private key 以及 private key 的密码。

2.指定kafka partition的默认起始offset
如果没有指定kafka_partitions/kafka_offsets,默认消费所有分区,此时可以指定kafka_default_offsets指定起始 offset。默认为 OFFSET_END,即从末尾开始订阅。
值为
1) OFFSET_BEGINNING: 从有数据的位置开始订阅。
2) OFFSET_END: 从末尾开始订阅。
示例:
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
7. 导入数据格式样例
整型类(TINYINT/SMALLINT/INT/BIGINT/LARGEINT):1, 1000, 1234
浮点类(FLOAT/DOUBLE/DECIMAL):1.1, 0.23, .356
日期类(DATE/DATETIME):2017-10-03, 2017-06-13 12:34:03。
字符串类(CHAR/VARCHAR)(无引号):I am a student, a
NULL值:\N
Examples:
1. 为 example_db 的 example_tbl 创建一个名为 test1 的 Kafka 例行导入任务。指定列分隔符和 group.id 和 client.id,并且自动默认消费所有分区,且从有数据的位置(OFFSET_BEGINNING)开始订阅
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS TERMINATED BY ",",
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.group.id" = "xxx",
"property.client.id" = "xxx",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
2. 为 example_db 的 example_tbl 创建一个名为 test1 的 Kafka 例行导入任务。导入任务为严格模式。
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
WHERE k1 > 100 and k2 like "%doris%"
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
);
3. 通过 SSL 认证方式,从 Kafka 集群导入数据。同时设置 client.id 参数。导入任务为非严格模式,时区为 Africa/Abidjan
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
WHERE k1 > 100 and k2 like "%doris%"
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"timezone" = "Africa/Abidjan"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca.pem",
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg",
"property.client.id" = "my_client_id"
);
4. 简单模式导入json
CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
COLUMNS(category,price,author)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "0,0,0"
);
支持两种json数据格式:
1){"category":"a9jadhx","author":"test","price":895}
2)[
{"category":"a9jadhx","author":"test","price":895},
{"category":"axdfa1","author":"EvelynWaugh","price":1299}
]
5. 精准导入json数据格式
CREATE TABLE `example_tbl` (
`category` varchar(24) NULL COMMENT "",
`author` varchar(24) NULL COMMENT "",
`timestamp` bigint(20) NULL COMMENT "",
`dt` int(11) NULL COMMENT "",
`price` double REPLACE
) ENGINE=OLAP
AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
COMMENT "OLAP"
PARTITION BY RANGE(`dt`)
(PARTITION p0 VALUES [("-2147483648"), ("20200509")),
PARTITION p20200509 VALUES [("20200509"), ("20200510")),
PARTITION p20200510 VALUES [("20200510"), ("20200511")),
PARTITION p20200511 VALUES [("20200511"), ("20200512")))
DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4
PROPERTIES (
"storage_type" = "COLUMN",
"replication_num" = "1"
);
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
"strip_outer_array" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "0,0,0"
);
json数据格式:
[
{"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},
{"category":"22","author":"2avc","price":895,"timestamp":1589191487},
{"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}
]
说明:
1)如果json数据是以数组开始,并且数组中每个对象是一条记录,则需要将strip_outer_array设置成true,表示展平数组。
2)如果json数据是以数组开始,并且数组中每个对象是一条记录,在设置jsonpath时,我们的ROOT节点实际上是数组中对象。