Using the Sequence Feature of the Doris Unique Model to Fix Out-of-Order Data Updates

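The Unique model deduplicates rows by key and keeps the latest value for each key. A Routine Load task, however, imports data in batches: it flushes once it has accumulated a certain amount of data or a time interval has elapsed, and it does not guarantee the order of rows within a batch. As a result, when a single batch contains several rows with the same key, an older row can overwrite a newer one.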


The following shows how to solve this with the sequence feature:

Create the test table

-- dwd_homedo_real.dwd_agent_agent_test1 definition

CREATE TABLE dwd_homedo_real.`dwd_agent_agent_test1` (
`id` bigint(20) NULL COMMENT "",
`accountid` bigint(20) NULL COMMENT "",
`typeid` bigint(20) NULL COMMENT "",
`name` varchar(500) NULL COMMENT "",
`legalperson` varchar(500) NULL COMMENT "",
`telephone` varchar(500) NULL COMMENT "",
`registeraddress` varchar(500) NULL COMMENT "",
`address` varchar(500) NULL COMMENT "",
`postal` varchar(500) NULL COMMENT "",
`buslicensefilename` varchar(500) NULL COMMENT "",
`taxregcertificatefilename` varchar(500) NULL COMMENT "",
`orgcodefilename` varchar(500) NULL COMMENT "",
`taxnumber` varchar(500) NULL COMMENT "",
`bankname` varchar(500) NULL COMMENT "",
`remark` varchar(500) NULL COMMENT "",
`bidcount` bigint(20) NULL COMMENT "",
`packagecount` decimal(18, 2) NULL COMMENT "",
`serviceid` bigint(20) NULL COMMENT "",
`statusflag` bigint(20) NULL COMMENT "",
`submitid` bigint(20) NULL COMMENT "",
`auditid` bigint(20) NULL COMMENT "",
`audittime` varchar(500) NULL COMMENT "",
`inserttime` varchar(500) NULL COMMENT "",
`updatetime` varchar(500) NULL COMMENT "",
`deletetime` varchar(500) NULL COMMENT "",
`mark` bigint(20) NULL COMMENT "",
`version` bigint(20) NULL COMMENT "",
`tag` bigint(20) NULL COMMENT "",
`firstsuccessaudittime` varchar(500) NULL COMMENT "",
`isthreeinone` bigint(20) NULL COMMENT "",
`bankaccountnumber` varchar(500) NULL COMMENT "",
`financepayamount` decimal(18, 2) NULL COMMENT "",
`audittype` bigint(20) NULL COMMENT "",
`systemremark` varchar(500) NULL COMMENT "",
`sysmodifydate` bigint(20) NULL COMMENT "",
`provinceid` bigint(20) NULL COMMENT "",
`cityid` bigint(20) NULL COMMENT "",
`areaid` bigint(20) NULL COMMENT "",
`platform` varchar(500) NULL COMMENT "",
`managementprovinceid` bigint(20) NULL COMMENT "",
`managementcityid` bigint(20) NULL COMMENT "",
`managementareaid` bigint(20) NULL COMMENT "",
`managementaddress` varchar(500) NULL COMMENT "",
`eventtime` datetime NULL COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "V2",
"function_column.sequence_type" = 'Datetime'
);

Create the Routine Load task

CREATE ROUTINE LOAD dwd_homedo_real.dwd_agent_agent_test1 ON dwd_agent_agent_test1
COLUMNS(
id,
accountid,
typeid,
name,
legalperson,
telephone,
registeraddress,
address,
postal,
buslicensefilename,
taxregcertificatefilename,
orgcodefilename,
taxnumber,
bankname,
remark,
bidcount,
packagecount,
serviceid,
statusflag,
submitid,
auditid,
audittime,
inserttime,
updatetime,
deletetime,
mark,
version,
tag,
firstsuccessaudittime,
isthreeinone,
bankaccountnumber,
financepayamount,
audittype,
systemremark,
sysmodifydate,
provinceid,
cityid,
areaid,
platform,
managementprovinceid,
managementcityid,
managementareaid,
managementaddress,
eventtime
),
ORDER BY eventtime
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval"="20",
"max_batch_rows"="300000",
"max_batch_size"="209715200",
"strict_mode"="false",
"format"="json",
"jsonpaths" = "[\"$.id\",\"$.accountId\",\"$.typeId\",\"$.name\",\"$.legalPerson\",\"$.telephone\",\"$.registerAddress\",\"$.address\",\"$.postal\",\"$.busLicenseFileName\",\"$.taxRegCertificateFileName\",\"$.orgCodeFileName\",\"$.taxNumber\",\"$.bankName\",\"$.remark\",\"$.bidCount\",\"$.packageCount\",\"$.serviceId\",\"$.statusFlag\",\"$.submitId\",\"$.auditId\",\"$.auditTime\",\"$.insertTime\",\"$.updateTime\",\"$.deleteTime\",\"$.mark\",\"$.version\",\"$.tag\",\"$.firstSuccessAuditTime\",\"$.isThreeInOne\",\"$.bankAccountNumber\",\"$.financePayAmount\",\"$.auditType\",\"$.systemRemark\",\"$.sysModifyDate\",\"$.provinceId\",\"$.cityId\",\"$.areaId\",\"$.platform\",\"$.managementProvinceId\",\"$.managementCityId\",\"$.managementAreaId\",\"$.managementAddress\",\"$.eventTime\"]",
"strip_outer_array" = "false"
)
FROM KAFKA
(
"kafka_broker_list"="10.0.15.130:9092,10.0.15.131:9092,10.0.15.132:9092",
"kafka_topic"="ods_Homedo_t_Agent_Agent",
"kafka_partitions" = "0,1",
"kafka_offsets" = "3158,3160"
);
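
Once the job is created, it helps to confirm that it is actually running before checking the data. A minimal sketch, assuming the job name from the CREATE ROUTINE LOAD statement above and a MySQL client session connected to Doris:

```sql
-- Show the state and progress of the Routine Load job created above.
-- The job name is the one given in CREATE ROUTINE LOAD (database.job_name).
SHOW ROUTINE LOAD FOR dwd_homedo_real.dwd_agent_agent_test1;
```

If the job is paused or cancelled, the output should indicate the reason, which is worth checking before investigating ordering problems.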

Tips

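Two key points:

1. When creating the table, add the following property. This demo uses Datetime, meaning that rows are ordered by a timestamp.

"function_column.sequence_type" = 'Datetime'

With this property set, Doris automatically creates a hidden column at table creation time and uses it for ordering.

The hidden column can be inspected with the following commands: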

-- Make hidden columns visible in the current session
mysql> SET show_hidden_columns=true;
Query OK, 0 rows affected (0.00 sec)
-- Inspect the table schema
-- The hidden column __DORIS_SEQUENCE_COL__ is listed, with type DATETIME
mysql> desc dwd_homedo_real.`dwd_agent_agent_test1` all;
+-----------------------+---------------+---------------------------+---------------+------+-------+---------+---------+---------+
| IndexName | IndexKeysType | Field | Type | Null | Key | Default | Extra | Visible |
+-----------------------+---------------+---------------------------+---------------+------+-------+---------+---------+---------+
| dwd_agent_agent_test1 | UNIQUE_KEYS | id | BIGINT | Yes | true | NULL | | true |
| | | accountid | BIGINT | Yes | false | NULL | REPLACE | true |
| | | typeid | BIGINT | Yes | false | NULL | REPLACE | true |
| | | name | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | legalperson | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | telephone | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | registeraddress | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | address | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | postal | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | buslicensefilename | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | taxregcertificatefilename | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | orgcodefilename | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | taxnumber | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | bankname | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | remark | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | bidcount | BIGINT | Yes | false | NULL | REPLACE | true |
| | | packagecount | DECIMAL(18,2) | Yes | false | NULL | REPLACE | true |
| | | serviceid | BIGINT | Yes | false | NULL | REPLACE | true |
| | | statusflag | BIGINT | Yes | false | NULL | REPLACE | true |
| | | submitid | BIGINT | Yes | false | NULL | REPLACE | true |
| | | auditid | BIGINT | Yes | false | NULL | REPLACE | true |
| | | audittime | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | inserttime | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | updatetime | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | deletetime | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | mark | BIGINT | Yes | false | NULL | REPLACE | true |
| | | version | BIGINT | Yes | false | NULL | REPLACE | true |
| | | tag | BIGINT | Yes | false | NULL | REPLACE | true |
| | | firstsuccessaudittime | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | isthreeinone | BIGINT | Yes | false | NULL | REPLACE | true |
| | | bankaccountnumber | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | financepayamount | DECIMAL(18,2) | Yes | false | NULL | REPLACE | true |
| | | audittype | BIGINT | Yes | false | NULL | REPLACE | true |
| | | systemremark | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | sysmodifydate | BIGINT | Yes | false | NULL | REPLACE | true |
| | | provinceid | BIGINT | Yes | false | NULL | REPLACE | true |
| | | cityid | BIGINT | Yes | false | NULL | REPLACE | true |
| | | areaid | BIGINT | Yes | false | NULL | REPLACE | true |
| | | platform | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | managementprovinceid | BIGINT | Yes | false | NULL | REPLACE | true |
| | | managementcityid | BIGINT | Yes | false | NULL | REPLACE | true |
| | | managementareaid | BIGINT | Yes | false | NULL | REPLACE | true |
| | | managementaddress | VARCHAR(500) | Yes | false | NULL | REPLACE | true |
| | | eventtime | DATETIME | Yes | false | NULL | REPLACE | true |
| | | __DORIS_SEQUENCE_COL__ | DATETIME | Yes | false | NULL | REPLACE | false |
+-----------------------+---------------+---------------------------+---------------+------+-------+---------+---------+---------+
45 rows in set (0.01 sec)

2. The Routine Load task must use ORDER BY to specify which source column maps to the sequence column. This example uses ORDER BY eventtime.
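
To check that the two settings work together, one option is to compare eventtime with the hidden sequence column for a given key after some data has been loaded. This is only a sketch: the key value 1 is hypothetical, and it assumes that the hidden column can be selected by name once show_hidden_columns is enabled (if your Doris version does not accept that, SELECT * should list the hidden column as well).

```sql
-- Make hidden columns visible in the current session.
SET show_hidden_columns = true;

-- For one key, compare the business timestamp with the hidden sequence column.
-- id = 1 is a hypothetical key; replace it with a key that received several updates.
SELECT id, eventtime, __DORIS_SEQUENCE_COL__
FROM dwd_homedo_real.dwd_agent_agent_test1
WHERE id = 1;
```

If ORDER BY eventtime is taking effect, the row returned for the key should be the one with the largest eventtime among the messages sent for that key, regardless of the order in which they arrived within a batch.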