The Flume version used here is 1.6.0.
Below are Flume configurations for several common usage scenarios.
netcat Source Sink To Console
For testing: lines typed into the console are printed back to the console.
```
agent1.sources = kafkaSource
agent1.channels = mc1
agent1.sinks = avro-sink

agent1.sources.kafkaSource.channels = mc1
agent1.sinks.avro-sink.channel = mc1

# source
agent1.sources.kafkaSource.type = netcat
agent1.sources.kafkaSource.bind = localhost
agent1.sources.kafkaSource.port = 44444

# channel1
agent1.channels.mc1.type = memory
agent1.channels.mc1.capacity = 10000
agent1.channels.mc1.transactionCapacity = 10000
agent1.channels.mc1.keep-alive = 60

# sink1
agent1.sinks.avro-sink.type = logger
```
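The netcat example can be exercised end to end. Assuming the install paths used later in this post, and assuming the config above is saved as `jobs/flume-netcat.conf` (that file name is my assumption, not from the original), the agent can be started and then fed from a second terminal:

```shell
# Start the agent (paths as used elsewhere in this post; the config
# file name flume-netcat.conf is an assumption)
./flume-ng agent -c /opt/software/flume/apache-flume-1.6.0-bin/conf \
  -f /opt/software/flume/apache-flume-1.6.0-bin/jobs/flume-netcat.conf \
  -n agent1 -Dflume.root.logger=INFO,console

# In a second terminal, send a line to the netcat source; the source
# replies "OK" and the logger sink prints the event on the agent console
echo "hello flume" | nc localhost 44444
```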
Kafka Source Sink To Console
For testing: consumes data from Kafka and prints it to the console.
```
agent1.sources = logsource
agent1.channels = mc1
agent1.sinks = avro-sink

agent1.sources.logsource.channels = mc1
agent1.sinks.avro-sink.channel = mc1

# source
agent1.sources.logsource.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.logsource.zookeeperConnect = cdh9:2181/kafka
agent1.sources.logsource.topic = ubtDemo
agent1.sources.logsource.groupId = flume
agent1.sources.logsource.kafka.consumer.timeout.ms = 100

# channel1
agent1.channels.mc1.type = memory
agent1.channels.mc1.capacity = 10000
agent1.channels.mc1.transactionCapacity = 10000
agent1.channels.mc1.keep-alive = 60

# sink1
agent1.sinks.avro-sink.type = logger
```
Start command:
```
./flume-ng agent -c /opt/software/flume/apache-flume-1.6.0-bin/conf -f /opt/software/flume/apache-flume-1.6.0-bin/jobs/flume-kafka-source1.conf -n agent1 -Dflume.root.logger=INFO,console
```
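To feed test data into the `ubtDemo` topic while the agent is running, the console producer that ships with Kafka can be used. The broker address matches the sink config later in this post; the Kafka install path is an assumption:

```shell
# Produce a few test messages into the source topic (the Kafka install
# path is an assumption; broker and topic names match the configs here)
/opt/software/kafka/bin/kafka-console-producer.sh \
  --broker-list cdh9:9092 --topic ubtDemo
# Type messages, one per line; each should appear in the Flume agent's
# console output via the logger sink
```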
Kafka Source Sink To Kafka
Consumes data from Kafka and sinks it back into Kafka, i.e. message forwarding.
Our scenario: the source Kafka cluster runs an old version that is inconvenient to upgrade, so Flume adapts messages between the low and high Kafka versions.
Flume 1.6.0 has a bug when a Kafka Source and a Kafka Sink are used together:
the Kafka Source stamps each event with a `topic` header holding the source topic name, and the Kafka Sink routes by that header whenever it is present. The configured sink topic is therefore ignored, and events loop back into the source Kafka topic instead of reaching the target topic.
Adding an interceptor that overwrites the `topic` header with the sink topic name works around the problem.
```
agent1.sources = kafkaSource
agent1.channels = mc1
agent1.sinks = avro-sink

agent1.sources.kafkaSource.channels = mc1
agent1.sinks.avro-sink.channel = mc1

# source
agent1.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafkaSource.zookeeperConnect = cdh9:2181/kafka
agent1.sources.kafkaSource.topic = ubtDemo
agent1.sources.kafkaSource.groupId = flume
agent1.sources.kafkaSource.kafka.consumer.timeout.ms = 100

# Static interceptor: overwrite the "topic" header set by the Kafka Source
# so events are routed to the sink topic instead of looping back
agent1.sources.kafkaSource.interceptors = i1
agent1.sources.kafkaSource.interceptors.i1.type = static
agent1.sources.kafkaSource.interceptors.i1.key = topic
agent1.sources.kafkaSource.interceptors.i1.preserveExisting = false
agent1.sources.kafkaSource.interceptors.i1.value = sinkTopic

# channel1
agent1.channels.mc1.type = memory
agent1.channels.mc1.capacity = 10000
agent1.channels.mc1.transactionCapacity = 10000
agent1.channels.mc1.keep-alive = 60

# sink1
agent1.sinks.avro-sink.type = org.apache.flume.sink.kafka.KafkaSink
#agent1.sinks.avro-sink.topic = flumetest
agent1.sinks.avro-sink.brokerList = cdh9:9092
agent1.sinks.avro-sink.requiredAcks = 1
agent1.sinks.avro-sink.batchSize = 20
agent1.sinks.avro-sink.channel = mc1
```
Start command:
```
./flume-ng agent -c /opt/software/flume/apache-flume-1.6.0-bin/conf -f /opt/software/flume/apache-flume-1.6.0-bin/jobs/flume-kafka-source-kafka-sink.conf -n agent1 -Dflume.root.logger=INFO,console
```
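To confirm that events actually land in the target topic rather than looping back into `ubtDemo`, Kafka's console consumer can be pointed at it. The topic name `sinkTopic` matches the interceptor value in the config above; the Kafka install path is an assumption:

```shell
# Consume from the target topic; with the interceptor in place, the
# forwarded events should show up here, not back in ubtDemo
/opt/software/kafka/bin/kafka-console-consumer.sh \
  --zookeeper cdh9:2181/kafka --topic sinkTopic --from-beginning
```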