Dynamically writing Flink data to different Kafka Topics based on a field
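The first idea that comes to mind is to add several Sinks, and Flink does support multiple Sinks.
But if the data has to be written to a large number of Topics, the code becomes very redundant.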
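Flink offers an advanced serialization schema for this case: FlinkKafkaProducer accepts an implementation of the KafkaSerializationSchema interface.
This schema lets you serialize the Key and Value separately and also override the target Topic, so a single FlinkKafkaProducer can send data to multiple Topics.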
Implementing the KafkaSerializationSchema interface
Requirement: dynamically Sink each record to the Topic named by the value of its topic field.
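```java
package com.xxx.flink.utils;

import cn.analysys.ark.streaming.utils.JSONUtil;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;

// Routes each JSON record to the Topic named by its "topic" field.
// Declared public so that classes in other packages (such as Main) can use it.
public class MyKafkaSerialization implements KafkaSerializationSchema<String> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
        // JSONUtil is a project-internal helper that parses the string into a JsonNode.
        JsonNode jsonNode = JSONUtil.stringToJson(s);
        // Assumes every record carries a "topic" field; get() returns null when it is missing.
        String topic = jsonNode.get("topic").asText();
        // No key is set; the value is the original record encoded as UTF-8 bytes.
        return new ProducerRecord<byte[], byte[]>(topic, s.getBytes(StandardCharsets.UTF_8));
    }
}
```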
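The schema above assumes that every record has a topic field. As a minimal sketch of how a missing or empty field could be handled, the helper below falls back to a default Topic. It is kept free of Flink and Jackson so that it runs on its own: `TopicRouter`, `resolveTopic` and the regular expression are hypothetical names introduced here for illustration, and the regex is only a stand-in for real JSON parsing (it would also match a nested topic field).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink or of the original code.
final class TopicRouter {

    // Matches "topic": "<non-empty value>"; a sketch, not a full JSON parser.
    private static final Pattern TOPIC_FIELD =
            Pattern.compile("\"topic\"\\s*:\\s*\"([^\"\\\\]+)\"");

    private TopicRouter() {
    }

    // Returns the value of the topic field, or defaultTopic when the record
    // is null or has no non-empty topic field.
    static String resolveTopic(String json, String defaultTopic) {
        if (json == null) {
            return defaultTopic;
        }
        Matcher matcher = TOPIC_FIELD.matcher(json);
        return matcher.find() ? matcher.group(1) : defaultTopic;
    }
}
```

Inside serialize, the same idea would presumably be expressed by null-checking the result of jsonNode.get("topic") before calling asText(), and building the ProducerRecord with the fallback Topic when the check fails.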
Flink Sink Diff Kafka Topic Demo
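```java
package com.xxx.flink.app;

import com.xxx.flink.utils.MyKafkaSerialization;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class Main {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // Constant is a project class holding the Kafka settings (not shown here;
        // assumed to live in this package).
        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", Constant.KAFKA_BOOTSTRAP_SERVERS);
        kafkaProperties.setProperty("enable.auto.commit", Constant.KAFKA_ENABLE_AUTO_COMMIT);
        kafkaProperties.setProperty("group.id", Constant.KAFKA_GROUP_ID);

        // Source Topic name. The original does not show where `topic` is defined,
        // so this value is only a placeholder.
        String topic = "source_topic";
        DataStream<String> ds = env.addSource(
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), kafkaProperties)
                        .setStartFromGroupOffsets());

        // The default Topic is left empty because MyKafkaSerialization picks the Topic per record.
        // EXACTLY_ONCE relies on Kafka transactions that are committed on checkpoints.
        ds.addSink(new FlinkKafkaProducer<>("", new MyKafkaSerialization(), kafkaProperties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute();
    }
}
```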