Flink 消费 Kafka 数据时,有些场景需要获取 Kafka Partition、Offset
可以使用 JSONKeyValueDeserializationSchema 反序列化器获取 Kafka 数据的 Metadata 信息
代码片段
1 2 3
| public static DataStream<ObjectNode> getWithSchema(StreamExecutionEnvironment env, String topic) { return env.addSource(new FlinkKafkaConsumer<>(topic, new JSONKeyValueDeserializationSchema(true), kafkaProperties).setStartFromGroupOffsets()); }
|
JSONKeyValueDeserializationSchema 反序列化器获得的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| { "value":[ { "xwhen":1628419647453, "xcontext":{ "$lib":"JS", "$debug":2, "$is_login":false, "$first_visit_language":"zh-cn", "$first_visit_time":"2021-08-08 18:47:27.453", "$lib_version":"4.5.5.1", "$platform":"JS" }, "appid":"66f27704d3162247", "xwho":"JSe81d8850b1da3aa471658c6f99ac1a0fe81d", "xwhat":"$profile_set_once" } ], "metadata":{ "offset":0, "topic":"ubt_data_collection_202105", "partition":1 } }
|