This setup uses Flume 1.6.0.
It configures Flume load balancing for the scenario Kafka Source -> Avro Sink -> Avro Source -> Kafka Sink.
Kafka Source to Kafka Sink
Architecture

Configuration
Agent node configuration
flume_analysys_loadbalance_agent.conf
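The hostname and port of each sink point to the target (load balance) machine.
Tip: the interceptor must be configured! In Flume 1.6 the Kafka source tags each event with a topic header holding the source topic, and the Kafka sink on the load balance nodes prefers that header over its configured topic. The static interceptor overwrites the header with the intended destination topic.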
agentX.sources = sX
agentX.channels = chX
agentX.sinks = sk0 sk1 sk2

agentX.sources.sX.channels = chX

agentX.sources.sX.type = org.apache.flume.source.kafka.KafkaSource
agentX.sources.sX.zookeeperConnect = ark1:2181/kafka
agentX.sources.sX.topic = post_66f27704d3162247
agentX.sources.sX.groupId = flume
agentX.sources.sX.kafka.consumer.timeout.ms = 100

# Configure interceptors
agentX.sources.sX.interceptors = i1
agentX.sources.sX.interceptors.i1.type = static
agentX.sources.sX.interceptors.i1.key = topic
agentX.sources.sX.interceptors.i1.preserveExisting = false
agentX.sources.sX.interceptors.i1.value = flume-test01

agentX.channels.chX.type = memory
agentX.channels.chX.capacity = 10000
agentX.channels.chX.transactionCapacity = 10000
agentX.channels.chX.keep-alive = 60

# Configure sinks
agentX.sinks.sk0.channel = chX
agentX.sinks.sk0.type = avro
agentX.sinks.sk0.hostname = 10.0.15.182
agentX.sinks.sk0.port = 44441
agentX.sinks.sk1.channel = chX
agentX.sinks.sk1.type = avro
agentX.sinks.sk1.hostname = 10.0.15.181
agentX.sinks.sk1.port = 44441
agentX.sinks.sk2.channel = chX
agentX.sinks.sk2.type = avro
agentX.sinks.sk2.hostname = 10.0.15.133
agentX.sinks.sk2.port = 44441

# Configure loadbalance
agentX.sinkgroups = g1
agentX.sinkgroups.g1.sinks = sk0 sk1 sk2
agentX.sinkgroups.g1.processor.type = load_balance
agentX.sinkgroups.g1.processor.backoff=true
agentX.sinkgroups.g1.processor.selector=round_robin
#agentX.sinkgroups.g1.processor.selector=random
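To make the load_balance settings in the agent config concrete, here is a minimal Python sketch of round-robin selection with backoff. It is a simplified model, not Flume's implementation: the class and function names are made up for illustration, and the fixed backoff delay is an assumption chosen to keep the example short.

```python
class RoundRobinSelector:
    """Simplified model of a round-robin sink selector with backoff (not Flume's code)."""

    def __init__(self, sinks, backoff_seconds=1.0):
        self.sinks = list(sinks)
        self.backoff_seconds = backoff_seconds  # assumed fixed delay, for illustration only
        self.next_index = 0
        self.blocked_until = {}  # sink name -> time at which it may be tried again

    def candidates(self, now):
        """Sinks to try for one delivery, in round-robin order, skipping backed-off sinks."""
        n = len(self.sinks)
        order = [self.sinks[(self.next_index + i) % n] for i in range(n)]
        self.next_index = (self.next_index + 1) % n
        return [s for s in order if self.blocked_until.get(s, 0.0) <= now]

    def report_failure(self, sink, now):
        """Take a failed sink out of rotation until the backoff period has passed."""
        self.blocked_until[sink] = now + self.backoff_seconds


def deliver(selector, send, now):
    """Try each candidate sink until one accepts the batch; return its name or None."""
    for sink in selector.candidates(now):
        if send(sink):
            return sink
        selector.report_failure(sink, now)
    return None
```

With sinks sk0, sk1, sk2 and sk1 unreachable, deliveries rotate over sk0 and sk2 only, and sk1 is retried once its backoff period has passed. With backoff disabled, a dead sink would be tried again on every rotation.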
Load balance node 0 configuration
flume_loadbalance_collector0.conf
bind is set to the local machine's IP.
agent0.sources = s0
agent0.channels = ch0
agent0.sinks = sk0

agent0.sources.s0.channels = ch0
agent0.sources.s0.type = avro
agent0.sources.s0.bind = 10.0.15.182
agent0.sources.s0.port = 44441

agent0.channels.ch0.type = memory
agent0.channels.ch0.capacity = 10000
agent0.channels.ch0.transactionCapacity = 10000
agent0.channels.ch0.keep-alive = 60

#sink0
agent0.sinks.sk0.type = org.apache.flume.sink.kafka.KafkaSink
agent0.sinks.sk0.topic = flume-test01
agent0.sinks.sk0.brokerList = 10.0.15.130:9092,10.0.15.131:9092,10.0.15.132:9092
agent0.sinks.sk0.requiredAcks = 1
agent0.sinks.sk0.batchSize = 20
agent0.sinks.sk0.channel = ch0
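The Kafka sink here is where the agent's static interceptor pays off. The Python sketch below models the interaction under two assumptions about Flume 1.6: the Kafka source puts the source topic into a topic header, and the Kafka sink uses a topic header, when present, instead of its configured topic. The function names are hypothetical and only model the behaviour; they are not Flume APIs.

```python
def static_interceptor(headers, key, value, preserve_existing):
    """Model of a static interceptor: set headers[key] = value unless an existing value is preserved."""
    result = dict(headers)
    if preserve_existing and key in result:
        return result
    result[key] = value
    return result


def resolve_sink_topic(headers, configured_topic):
    """Assumed Kafka sink behaviour: a 'topic' header, if present, wins over the configured topic."""
    return headers.get("topic", configured_topic)


# Event headers as they leave the Kafka source (assumed to carry the source topic).
from_source = {"topic": "post_66f27704d3162247"}

without_interceptor = resolve_sink_topic(from_source, "flume-test01")
with_interceptor = resolve_sink_topic(
    static_interceptor(from_source, "topic", "flume-test01", preserve_existing=False),
    "flume-test01",
)

print(without_interceptor)  # post_66f27704d3162247
print(with_interceptor)     # flume-test01
```

Under these assumptions, events without the interceptor would be written back to the source topic rather than to flume-test01, and preserveExisting = true would leave the source topic in place as well.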
Load balance node 1 configuration
flume_loadbalance_collector1.conf
bind is set to the local machine's IP.
agent1.sources = s1
agent1.channels = ch1
agent1.sinks = sk1

agent1.sources.s1.channels = ch1
agent1.sources.s1.type = avro
agent1.sources.s1.bind = 10.0.15.181
agent1.sources.s1.port = 44441

agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000
agent1.channels.ch1.transactionCapacity = 10000
agent1.channels.ch1.keep-alive = 60

#sink1
agent1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sk1.topic = flume-test01
agent1.sinks.sk1.brokerList = 10.0.15.130:9092,10.0.15.131:9092,10.0.15.132:9092
agent1.sinks.sk1.requiredAcks = 1
agent1.sinks.sk1.batchSize = 20
agent1.sinks.sk1.channel = ch1
Load balance node 2 configuration
flume_loadbalance_collector2.conf
bind is set to the local machine's IP.
agent2.sources = s2
agent2.channels = ch2
agent2.sinks = sk2

agent2.sources.s2.channels = ch2
agent2.sources.s2.type = avro
agent2.sources.s2.bind = 10.0.15.133
agent2.sources.s2.port = 44441

agent2.channels.ch2.type = memory
agent2.channels.ch2.capacity = 1000
agent2.channels.ch2.transactionCapacity = 100
agent2.channels.ch2.keep-alive = 60

#sink2
agent2.sinks.sk2.type = org.apache.flume.sink.kafka.KafkaSink
agent2.sinks.sk2.topic = flume-test01
agent2.sinks.sk2.brokerList = 10.0.15.130:9092,10.0.15.131:9092,10.0.15.132:9092
agent2.sinks.sk2.requiredAcks = 1
agent2.sinks.sk2.batchSize = 20
agent2.sinks.sk2.channel = ch2
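The three load balance configs differ only in the agent index, the bind IP, and (for node 2) the channel sizes. If you would rather generate them than copy and edit by hand, a small Python sketch could look like this; COLLECTOR_TEMPLATE and render_collector are hypothetical helpers, not part of Flume.

```python
# Template for one load balance node; the values mirror the configs in this post.
COLLECTOR_TEMPLATE = """\
agent{i}.sources = s{i}
agent{i}.channels = ch{i}
agent{i}.sinks = sk{i}

agent{i}.sources.s{i}.channels = ch{i}
agent{i}.sources.s{i}.type = avro
agent{i}.sources.s{i}.bind = {bind_ip}
agent{i}.sources.s{i}.port = 44441

agent{i}.channels.ch{i}.type = memory
agent{i}.channels.ch{i}.capacity = {capacity}
agent{i}.channels.ch{i}.transactionCapacity = {transaction_capacity}
agent{i}.channels.ch{i}.keep-alive = 60

agent{i}.sinks.sk{i}.type = org.apache.flume.sink.kafka.KafkaSink
agent{i}.sinks.sk{i}.topic = flume-test01
agent{i}.sinks.sk{i}.brokerList = 10.0.15.130:9092,10.0.15.131:9092,10.0.15.132:9092
agent{i}.sinks.sk{i}.requiredAcks = 1
agent{i}.sinks.sk{i}.batchSize = 20
agent{i}.sinks.sk{i}.channel = ch{i}
"""


def render_collector(i, bind_ip, capacity=10000, transaction_capacity=10000):
    """Return the config text for load balance node i, bound to bind_ip."""
    return COLLECTOR_TEMPLATE.format(
        i=i,
        bind_ip=bind_ip,
        capacity=capacity,
        transaction_capacity=transaction_capacity,
    )
```

For example, render_collector(2, "10.0.15.133", capacity=1000, transaction_capacity=100) reproduces the settings of node 2, and the result can be written to flume_loadbalance_collector2.conf.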
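Startup commands
Startup order: load balance nodes -> agent node, so that the Avro sources are already listening when the agent's Avro sinks connect.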
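If the startup is scripted, one way to respect this order is to check that every load balance node accepts connections on its Avro port before launching the agent. The Python sketch below assumes the IPs and port from the configs above; port_open and wait_for_all are hypothetical helper names, and the retry count and delay are arbitrary.

```python
import socket
import time

# Avro source endpoints of the three load balance nodes (taken from the configs above).
COLLECTORS = [("10.0.15.182", 44441), ("10.0.15.181", 44441), ("10.0.15.133", 44441)]


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def wait_for_all(endpoints, retries=30, delay=1.0):
    """Poll until every endpoint accepts connections, or give up after the given retries."""
    for _ in range(retries):
        if all(port_open(host, port) for host, port in endpoints):
            return True
        time.sleep(delay)
    return False
```

Calling wait_for_all(COLLECTORS) and starting agentX only when it returns True keeps the order above. An open port only shows that the Avro source is listening, not that the Kafka sink behind it is healthy.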
Start load balance node 0
./flume-ng agent -c /opt/software/flume/apache-flume-1.6.0-bin/conf -f /opt/software/flume/apache-flume-1.6.0-bin/jobs/flume_loadbalance_collector0.conf -n agent0 -Dflume.root.logger=INFO,console
Start load balance node 1
./flume-ng agent -c /opt/software/flume/apache-flume-1.6.0-bin/conf -f /opt/software/flume/apache-flume-1.6.0-bin/jobs/flume_loadbalance_collector1.conf -n agent1 -Dflume.root.logger=INFO,console
Start load balance node 2
./flume-ng agent -c /opt/software/flume/apache-flume-1.6.0-bin/conf -f /opt/software/flume/apache-flume-1.6.0-bin/jobs/flume_loadbalance_collector2.conf -n agent2 -Dflume.root.logger=INFO,console
Start the agent node
./flume-ng agent -c /opt/soft/apache-flume-1.6.0-bin/conf -f /opt/soft/apache-flume-1.6.0-bin/jobs/flume_analysys_loadbalance_agent.conf -n agentX -Dflume.root.logger=INFO,console