Flink: Dynamically Sinking to Different Kafka Topics

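The goal: have Flink write each record to a Kafka topic chosen dynamically from one of the record's fields.

The obvious approach is to add several sinks, and Flink does support multiple sinks in one job.

But when the data has to be written to a large number of topics, that approach makes the code very repetitive.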

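Flink offers a more advanced serialization schema for this case: FlinkKafkaProducer accepts an implementation of the KafkaSerializationSchema interface.

This schema lets you serialize the key and the value separately and also override the target topic for each record, so a single FlinkKafkaProducer can send data to multiple topics.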


Implementing the KafkaSerializationSchema interface

The requirement implemented here: dynamically sink each record to the topic named by the value of its topic field.

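package com.xxx.flink.utils;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Routes each JSON record to the Kafka topic named by its "topic" field.
// Public so that the job class in com.xxx.flink.app can use it.
public class MyKafkaSerialization implements KafkaSerializationSchema<String> {

    // The original called an in-house JSONUtil.stringToJson helper; plain Jackson is used here instead.
    // Static, so the mapper is not serialized together with the schema instance.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        try {
            JsonNode jsonNode = MAPPER.readTree(element);
            // Assumes every record carries a "topic" field; get() returns null when it is missing.
            String topic = jsonNode.get("topic").asText();
            // No key is set; the whole JSON string is sent as the value.
            return new ProducerRecord<>(topic, element.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new IllegalArgumentException("Record is not valid JSON: " + element, e);
        }
    }
}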
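
The schema above assumes that every record carries a topic field; a record without one makes serialize fail. One possible way to handle this is to fall back to a fixed topic. The sketch below shows only that decision in plain Java, with a Map standing in for the parsed JSON fields so that it runs without Flink or Jackson. The names TopicResolver, resolveTopic and "unrouted-events" are hypothetical and do not come from the original code.

```java
import java.util.HashMap;
import java.util.Map;

class TopicResolver {

    // Returns the record's own topic, or fallbackTopic when the field is missing or blank.
    static String resolveTopic(Map<String, String> fields, String fallbackTopic) {
        String topic = fields.get("topic");
        if (topic == null || topic.trim().isEmpty()) {
            return fallbackTopic;
        }
        return topic.trim();
    }

    public static void main(String[] args) {
        Map<String, String> routed = new HashMap<>();
        routed.put("topic", "orders");

        Map<String, String> unrouted = new HashMap<>();
        unrouted.put("id", "42");

        // "unrouted-events" is a made-up fallback topic name.
        System.out.println(resolveTopic(routed, "unrouted-events"));   // prints "orders"
        System.out.println(resolveTopic(unrouted, "unrouted-events")); // prints "unrouted-events"
    }
}
```

Inside serialize, the same check could be written against the JsonNode (for example with jsonNode.hasNonNull("topic")) before building the ProducerRecord. Whether a fallback topic or a hard failure is the better choice depends on how much you trust the upstream data.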
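
package com.xxx.flink.app;

import com.xxx.flink.utils.MyKafkaSerialization;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class Main {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(3);

        // Constant (connection settings) and topic (the source topic name) belong to the
        // author's project and are not shown in this post.
        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", Constant.KAFKA_BOOTSTRAP_SERVERS);
        kafkaProperties.setProperty("enable.auto.commit", Constant.KAFKA_ENABLE_AUTO_COMMIT);
        kafkaProperties.setProperty("group.id", Constant.KAFKA_GROUP_ID);

        DataStream<String> ds = env.addSource(
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), kafkaProperties)
                        .setStartFromGroupOffsets());

        // The default topic is left empty because MyKafkaSerialization sets the topic on every record.
        // EXACTLY_ONCE relies on Kafka transactions and only takes effect when checkpointing is enabled.
        ds.addSink(new FlinkKafkaProducer<>(
                "", new MyKafkaSerialization(), kafkaProperties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute();
    }
}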
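
One detail to watch with Semantic.EXACTLY_ONCE: according to the Flink Kafka connector documentation, FlinkKafkaProducer sets transaction.timeout.ms to 1 hour by default, while Kafka brokers cap it at 15 minutes by default through transaction.max.timeout.ms. Either the broker limit has to be raised or the producer timeout lowered. The sketch below takes the second route and builds a separate producer configuration, leaving out the consumer-only settings group.id and enable.auto.commit. The class name ProducerConfigSketch, the method name and the broker address are assumptions made for illustration.

```java
import java.util.Properties;

class ProducerConfigSketch {

    // Kafka brokers default transaction.max.timeout.ms to 15 minutes.
    static final long BROKER_DEFAULT_MAX_TXN_TIMEOUT_MS = 15 * 60 * 1000L;

    // Builds producer-only settings, keeping the transaction timeout within the broker default.
    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("transaction.timeout.ms", String.valueOf(BROKER_DEFAULT_MAX_TXN_TIMEOUT_MS));
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Properties props = producerProperties("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object would be passed to the FlinkKafkaProducer constructor in place of the shared kafkaProperties. If your brokers already allow a longer transaction timeout, this adjustment is not needed.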