Flume Application Configuration

The Flume version used here is 1.6.0.

Below are Flume configurations for several common usage scenarios.


Netcat Source Sink To Console

For testing: lines typed into a console (via netcat) are printed to the agent's console by the logger sink.

agent1.sources = kafkaSource
agent1.channels = mc1
agent1.sinks = avro-sink

agent1.sources.kafkaSource.channels = mc1
agent1.sinks.avro-sink.channel = mc1

#source
agent1.sources.kafkaSource.type = netcat
agent1.sources.kafkaSource.bind = localhost
agent1.sources.kafkaSource.port = 44444

#channel1
agent1.channels.mc1.type = memory
agent1.channels.mc1.capacity = 10000
agent1.channels.mc1.transactionCapacity = 10000
agent1.channels.mc1.keep-alive = 60

#sink1
agent1.sinks.avro-sink.type = logger
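A quick way to exercise this agent, assuming Flume is installed under /opt/software/flume/apache-flume-1.6.0-bin as in the startup commands below, and that the config above is saved as jobs/netcat-console.conf (an illustrative file name): start the agent, then feed it lines with nc; each line shows up as an event on the agent's console via the logger sink.

# Start the agent (config file name is illustrative)
./flume-ng agent -c /opt/software/flume/apache-flume-1.6.0-bin/conf -f /opt/software/flume/apache-flume-1.6.0-bin/jobs/netcat-console.conf -n agent1 -Dflume.root.logger=INFO,console

# In a second terminal, send test lines to the netcat source
nc localhost 44444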

Kafka Source Sink To Console

For testing: consume data from Kafka and print it to the console.

agent1.sources = logsource
agent1.channels = mc1
agent1.sinks = avro-sink

agent1.sources.logsource.channels = mc1
agent1.sinks.avro-sink.channel = mc1

#source
agent1.sources.logsource.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.logsource.zookeeperConnect = cdh9:2181/kafka
agent1.sources.logsource.topic = ubtDemo
agent1.sources.logsource.groupId = flume
agent1.sources.logsource.kafka.consumer.timeout.ms = 100

#channel1
agent1.channels.mc1.type = memory
agent1.channels.mc1.capacity = 10000
agent1.channels.mc1.transactionCapacity = 10000
agent1.channels.mc1.keep-alive = 60

#sink1
agent1.sinks.avro-sink.type = logger

Startup command:

./flume-ng agent -c /opt/software/flume/apache-flume-1.6.0-bin/conf -f /opt/software/flume/apache-flume-1.6.0-bin/jobs/flume-kafka-source1.conf -n agent1 -Dflume.root.logger=INFO,console
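To see output from the logger sink, push a few test messages into the ubtDemo topic, for example with the console producer that ships with Kafka. The broker address cdh9:9092 is an assumption here, taken from the Kafka sink example in the next section; with 0.8.x/0.9.x clients the --broker-list form applies.

# Produce a few test messages into the source topic
./kafka-console-producer.sh --broker-list cdh9:9092 --topic ubtDemo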

Kafka Source Sink To Kafka

Consume data from Kafka and sink it back to Kafka, i.e. forward/relay messages.

Our scenario: the source Kafka version is too old and cannot easily be upgraded, so Flume is used to adapt messages between the old and new Kafka versions.

Flume 1.6.0 has a bug when the Kafka Source and Kafka Sink are used together:

The configured sink topic name is overridden by the source topic name, so data is never sunk to the target Kafka topic; instead it loops back into the source Kafka topic.
Adding an interceptor makes the sink topic name take effect and resolves the problem.
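The underlying cause is that the Kafka Source stamps each event with a topic header carrying the source topic, and the Kafka Sink lets that header take precedence over its own topic setting. Overriding the header with a static interceptor on the source therefore redirects the events; a minimal sketch (the target topic name sinkTopic is just an example):

# Static interceptor: overwrite the per-event "topic" header set by the Kafka Source
agent1.sources.kafkaSource.interceptors = i1
agent1.sources.kafkaSource.interceptors.i1.type = static
agent1.sources.kafkaSource.interceptors.i1.preserveExisting = false
agent1.sources.kafkaSource.interceptors.i1.key = topic
agent1.sources.kafkaSource.interceptors.i1.value = sinkTopic

The full agent configuration below includes the same interceptor stanza.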

agent1.sources = kafkaSource
agent1.channels = mc1
agent1.sinks = avro-sink

agent1.sources.kafkaSource.channels = mc1
agent1.sinks.avro-sink.channel = mc1

#source
agent1.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafkaSource.zookeeperConnect = cdh9:2181/kafka
agent1.sources.kafkaSource.topic = ubtDemo
agent1.sources.kafkaSource.groupId = flume
agent1.sources.kafkaSource.kafka.consumer.timeout.ms = 100

#Add an interceptor that overrides the per-event "topic" header set by the Kafka Source,
#so the sink topic name takes effect instead of data looping back to the source topic
agent1.sources.kafkaSource.interceptors = i1
agent1.sources.kafkaSource.interceptors.i1.type = static
agent1.sources.kafkaSource.interceptors.i1.key = topic
agent1.sources.kafkaSource.interceptors.i1.preserveExisting = false
agent1.sources.kafkaSource.interceptors.i1.value = sinkTopic

#channel1
agent1.channels.mc1.type = memory
agent1.channels.mc1.capacity = 10000
agent1.channels.mc1.transactionCapacity = 10000
agent1.channels.mc1.keep-alive = 60

#sink1
agent1.sinks.avro-sink.type = org.apache.flume.sink.kafka.KafkaSink
#agent1.sinks.avro-sink.topic = flumetest
agent1.sinks.avro-sink.brokerList = cdh9:9092
agent1.sinks.avro-sink.requiredAcks = 1
agent1.sinks.avro-sink.batchSize = 20

Startup command:

./flume-ng agent -c /opt/software/flume/apache-flume-1.6.0-bin/conf -f /opt/software/flume/apache-flume-1.6.0-bin/jobs/flume-kafka-source-kafka-sink.conf -n agent1 -Dflume.root.logger=INFO,console
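To verify the relay end to end, produce into the source topic and then consume from the target topic. The commands below use the console tools shipped with Kafka; the connection strings are taken from the config above and may need adjusting for your clusters (on a newer target cluster, --bootstrap-server replaces the old --zookeeper consumer option).

# Produce test messages into the source topic
./kafka-console-producer.sh --broker-list cdh9:9092 --topic ubtDemo

# Consume from the target topic to confirm events are being forwarded
./kafka-console-consumer.sh --zookeeper cdh9:2181/kafka --topic sinkTopic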