Doris: Partial Column Updates with the AGGREGATE Model

A demo of updating only some columns of a row, using an AGGREGATE model table whose value columns use REPLACE_IF_NOT_NULL.


Create the test database

create database test;

Create the test table

CREATE TABLE test.`REPLACE_TEST` (
a varchar(1000),
b varchar(1000) REPLACE_IF_NOT_NULL,
c varchar(1000) REPLACE_IF_NOT_NULL,
d varchar(1000) REPLACE_IF_NOT_NULL
) ENGINE=OLAP
AGGREGATE KEY(`a`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`a`) BUCKETS 3
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "V2"
);
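
The table uses REPLACE_IF_NOT_NULL instead of plain REPLACE for a reason. With REPLACE, a NULL in a newer row overwrites the stored value. With REPLACE_IF_NOT_NULL, the stored value is kept. The Python sketch below models only that per-column rule for two rows that share the key `a`. It is not Doris code, and the function names are hypothetical.

```python
from typing import Callable, List, Optional

Value = Optional[str]  # None stands in for SQL NULL


def replace(old: Value, new: Value) -> Value:
    """REPLACE: the newer value always wins, even when it is NULL."""
    return new


def replace_if_not_null(old: Value, new: Value) -> Value:
    """REPLACE_IF_NOT_NULL: a NULL in the newer row keeps the stored value."""
    return old if new is None else new


def merge(old_row: List[Value], new_row: List[Value],
          agg: Callable[[Value, Value], Value]) -> List[Value]:
    """Merge two rows sharing key column 0, applying `agg` to each value column."""
    assert old_row[0] == new_row[0], "rows must share the aggregate key"
    return [old_row[0]] + [agg(o, n) for o, n in zip(old_row[1:], new_row[1:])]


if __name__ == "__main__":
    stored = ["A", "1", "2", "3"]
    update = ["A", None, "9", None]  # only column c is supplied
    print(merge(stored, update, replace))              # ['A', None, '9', None]
    print(merge(stored, update, replace_if_not_null))  # ['A', '1', '9', '3']
```

In this model, a partial update against a REPLACE column wipes out the columns that were not supplied. That is why the demo table declares b, c and d as REPLACE_IF_NOT_NULL.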

Insert data into the test table and query it

insert into test.`REPLACE_TEST`
values
("A", "1", "2", "3"),
("B", "4", "5", "6");

select * from test.`REPLACE_TEST`;

Create the Kafka Routine Load job

CREATE ROUTINE LOAD test.REPLACE_TEST ON REPLACE_TEST
COLUMNS(a, b, c, d)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval"="20",
"max_batch_rows"="300000",
"max_batch_size"="209715200",
"strict_mode"="false",
"format"="json",
"jsonpaths" = "[\"$.a\",\"$.b\",\"$.c\",\"$.d\"]",
"strip_outer_array" = "false"
)
FROM KAFKA
(
"kafka_broker_list"="10.10.110.235:9092",
"kafka_topic"="doris_replace_test",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.auto.offset.reset" = "earliest"
);
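
With "format"="json" and the jsonpaths above, each Kafka message becomes one row for COLUMNS(a, b, c, d). The sketch below mimics that mapping for the simple top-level `$.name` paths this job uses. It assumes, consistent with the note at the end of this post, that a path with no match in a message yields NULL (None here). It is a model, not Doris's JSON reader, and the helper `json_to_row` is hypothetical.

```python
import json
from typing import List, Optional

# Mirrors "jsonpaths" = "[\"$.a\",\"$.b\",\"$.c\",\"$.d\"]" from the job above.
JSONPATHS: List[str] = json.loads('["$.a", "$.b", "$.c", "$.d"]')


def json_to_row(message: str, jsonpaths: List[str]) -> List[Optional[str]]:
    """Hypothetical helper: map one JSON message to a row, one column per path.

    Only top-level "$.name" paths are handled; an unmatched path yields None (NULL).
    """
    doc = json.loads(message)
    row: List[Optional[str]] = []
    for path in jsonpaths:
        if not path.startswith("$."):
            raise ValueError(f"unsupported jsonpath: {path}")
        row.append(doc.get(path[2:]))  # "$.c" -> key "c"
    return row


if __name__ == "__main__":
    print(json_to_row('{"a":"C","c":"10","d":"11"}', JSONPATHS))
```

For the second message used later in this demo, the sketch prints `['C', None, '10', '11']`: column b is NULL because the message has no "b" field.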

Check the status of the Kafka Routine Load job

use test;
SHOW ROUTINE LOAD FOR test.REPLACE_TEST;

Produce data to the Kafka topic

kafka-console-producer --topic doris_replace_test --broker-list 10.10.110.235:9092

Then enter the following JSON messages at the producer prompt, one per line:
{"a":"C","b":"7","c":"8","d":"9"}
{"a":"C","c":"10","d":"11"}
{"a":"C","d":"20"}

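The Routine Load job picks up only the fields that are present in each JSON message. A field that is absent is read as NULL. Because b, c and d are REPLACE_IF_NOT_NULL columns, that NULL does not overwrite the stored value, so the missing fields are left unchanged.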
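
The sketch below combines both steps and replays the three messages against the rows inserted earlier. It is a hypothetical Python model, not Doris code. It assumes the messages are merged in the order they were produced. This demo does not establish whether Doris preserves that order for rows with the same key that land in the same load batch. Under that assumption, row C ends with b=7, c=10, d=20, and rows A and B are untouched.

```python
import json
from typing import Dict, List, Optional

Row = List[Optional[str]]  # [a, b, c, d]; None stands in for SQL NULL
FIELDS = ["a", "b", "c", "d"]  # top-level fields named by the job's jsonpaths

# Table contents after the INSERT statement.
INITIAL: Dict[str, Row] = {"A": ["A", "1", "2", "3"], "B": ["B", "4", "5", "6"]}

# Messages typed into kafka-console-producer, in the order produced.
MESSAGES = [
    '{"a":"C","b":"7","c":"8","d":"9"}',
    '{"a":"C","c":"10","d":"11"}',
    '{"a":"C","d":"20"}',
]


def replay(initial: Dict[str, Row], messages: List[str]) -> Dict[str, Row]:
    """Hypothetical model: load each message as a row (missing field -> None),
    then merge it into the table with REPLACE_IF_NOT_NULL semantics."""
    table = {key: list(row) for key, row in initial.items()}
    for msg in messages:
        doc = json.loads(msg)
        row: Row = [doc.get(f) for f in FIELDS]
        old = table.get(row[0])
        if old is None:
            table[row[0]] = row  # first row seen for this key
        else:
            # Keep the stored value wherever the incoming value is NULL.
            table[row[0]] = [row[0]] + [n if n is not None else o
                                        for o, n in zip(old[1:], row[1:])]
    return table


if __name__ == "__main__":
    result = replay(INITIAL, MESSAGES)
    for key in sorted(result):
        print(result[key])
```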