Flink 获取 Kafka Metadata

Flink 消费 Kafka 数据时,有些场景需要获取 Kafka Partition、Offset

可以使用 JSONKeyValueDeserializationSchema 反序列化器获取 Kafka 数据的 Metadata 信息


代码片段

1
2
3
public static DataStream<ObjectNode> getWithSchema(StreamExecutionEnvironment env, String topic) {
return env.addSource(new FlinkKafkaConsumer<>(topic, new JSONKeyValueDeserializationSchema(true), kafkaProperties).setStartFromGroupOffsets());
}

JSONKeyValueDeserializationSchema 反序列化器获得的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
{
"value":[
{
"xwhen":1628419647453,
"xcontext":{
"$lib":"JS",
"$debug":2,
"$is_login":false,
"$first_visit_language":"zh-cn",
"$first_visit_time":"2021-08-08 18:47:27.453",
"$lib_version":"4.5.5.1",
"$platform":"JS"
},
"appid":"66f27704d3162247",
"xwho":"JSe81d8850b1da3aa471658c6f99ac1a0fe81d",
"xwhat":"$profile_set_once"
}
],
"metadata":{
"offset":0,
"topic":"ubt_data_collection_202105",
"partition":1
}
}