Flume Load Balancing with KafkaSource and KafkaSink

The Flume version used here is 1.6.0.

This post configures Flume load balancing for the scenario Kafka Source -> Avro -> Avro -> Kafka Sink.


Kafka Source to Kafka Sink

Structure

[Figure: Flume load balancing with KafkaSource and KafkaSink]

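Configuration

Agent node configuration

flume_analysys_loadbalance_agent.conf

The hostname and port of each sink must be the IP and port of the target machine (a load balance node).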

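Tip: the interceptor must be configured! In Flume 1.6.0 the Kafka source puts a topic header holding the source topic on every event, and the Kafka sink publishes to the topic named in that header in preference to its own topic setting. The static interceptor in this configuration overwrites the header with the target topic (preserveExisting = false), so events are not written back to the source topic.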

agentX.sources = sX
agentX.channels = chX
agentX.sinks = sk0 sk1 sk2

agentX.sources.sX.channels = chX

agentX.sources.sX.type = org.apache.flume.source.kafka.KafkaSource
agentX.sources.sX.zookeeperConnect = ark1:2181/kafka
agentX.sources.sX.topic = post_66f27704d3162247
agentX.sources.sX.groupId = flume
agentX.sources.sX.kafka.consumer.timeout.ms = 100

# Configure interceptors
agentX.sources.sX.interceptors = i1
agentX.sources.sX.interceptors.i1.type = static
agentX.sources.sX.interceptors.i1.key = topic
agentX.sources.sX.interceptors.i1.preserveExisting = false
agentX.sources.sX.interceptors.i1.value = flume-test01

agentX.channels.chX.type = memory
agentX.channels.chX.capacity = 10000
agentX.channels.chX.transactionCapacity = 10000
agentX.channels.chX.keep-alive = 60

# Configure sinks
agentX.sinks.sk0.channel = chX
agentX.sinks.sk0.type = avro
agentX.sinks.sk0.hostname = 10.0.15.182
agentX.sinks.sk0.port = 44441
agentX.sinks.sk1.channel = chX
agentX.sinks.sk1.type = avro
agentX.sinks.sk1.hostname = 10.0.15.181
agentX.sinks.sk1.port = 44441
agentX.sinks.sk2.channel = chX
agentX.sinks.sk2.type = avro
agentX.sinks.sk2.hostname = 10.0.15.133
agentX.sinks.sk2.port = 44441

# Configure loadbalance
agentX.sinkgroups = g1
agentX.sinkgroups.g1.sinks = sk0 sk1 sk2
agentX.sinkgroups.g1.processor.type = load_balance
agentX.sinkgroups.g1.processor.backoff=true
agentX.sinkgroups.g1.processor.selector=round_robin
#agentX.sinkgroups.g1.processor.selector=random
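
With processor.backoff = true, the load-balancing sink processor temporarily blacklists a sink that fails, and the blacklist period grows exponentially while the sink stays unresponsive. The Flume User Guide documents processor.selector.maxTimeOut (milliseconds, default 30000) as the upper bound of that period. A minimal sketch of capping it follows; the value 10000 is an illustrative assumption, not a setting from the original configuration:

```properties
# Assumed value for illustration: cap the backoff of a failed sink at 10 s
# (the documented default is 30000 ms).
agentX.sinkgroups.g1.processor.selector.maxTimeOut = 10000
```

A lower cap lets a recovered load balance node rejoin the rotation sooner, at the cost of more retries against a node that is still down.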

Load balance node 0 configuration

flume_loadbalance_collector0.conf

The IP configured for bind is this machine's own IP.

agent0.sources = s0
agent0.channels = ch0
agent0.sinks = sk0

agent0.sources.s0.channels = ch0
agent0.sources.s0.type = avro
agent0.sources.s0.bind = 10.0.15.182
agent0.sources.s0.port = 44441

agent0.channels.ch0.type = memory
agent0.channels.ch0.capacity = 10000
agent0.channels.ch0.transactionCapacity = 10000
agent0.channels.ch0.keep-alive = 60

#sink0
agent0.sinks.sk0.type = org.apache.flume.sink.kafka.KafkaSink
agent0.sinks.sk0.topic = flume-test01
agent0.sinks.sk0.brokerList = 10.0.15.130:9092,10.0.15.131:9092,10.0.15.132:9092
agent0.sinks.sk0.requiredAcks = 1
agent0.sinks.sk0.batchSize = 20
agent0.sinks.sk0.channel = ch0

Load balance node 1 configuration

flume_loadbalance_collector1.conf

The IP configured for bind is this machine's own IP.

agent1.sources = s1
agent1.channels = ch1
agent1.sinks = sk1

agent1.sources.s1.channels = ch1
agent1.sources.s1.type = avro
agent1.sources.s1.bind = 10.0.15.181
agent1.sources.s1.port = 44441

agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 10000
agent1.channels.ch1.transactionCapacity = 10000
agent1.channels.ch1.keep-alive = 60

#sink1
agent1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sk1.topic = flume-test01
agent1.sinks.sk1.brokerList = 10.0.15.130:9092,10.0.15.131:9092,10.0.15.132:9092
agent1.sinks.sk1.requiredAcks = 1
agent1.sinks.sk1.batchSize = 20
agent1.sinks.sk1.channel = ch1

Load balance node 2 configuration

flume_loadbalance_collector2.conf

The IP configured for bind is this machine's own IP.

agent2.sources = s2
agent2.channels = ch2
agent2.sinks = sk2

agent2.sources.s2.channels = ch2
agent2.sources.s2.type = avro
agent2.sources.s2.bind = 10.0.15.133
agent2.sources.s2.port = 44441

agent2.channels.ch2.type = memory
agent2.channels.ch2.capacity = 1000
agent2.channels.ch2.transactionCapacity = 100
agent2.channels.ch2.keep-alive = 60

#sink2
agent2.sinks.sk2.type = org.apache.flume.sink.kafka.KafkaSink
agent2.sinks.sk2.topic = flume-test01
agent2.sinks.sk2.brokerList = 10.0.15.130:9092,10.0.15.131:9092,10.0.15.132:9092
agent2.sinks.sk2.requiredAcks = 1
agent2.sinks.sk2.batchSize = 20
agent2.sinks.sk2.channel = ch2
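
All three load balance nodes use requiredAcks = 1, which per the Flume 1.6.0 User Guide waits for the partition leader only; the documented alternatives are 0 (never wait for acknowledgement) and -1 (wait for all replicas). If losing messages on a leader failure is a concern, a stricter variant could look like the sketch below. It is shown for agent2 and is an assumption, not part of the original setup:

```properties
# Assumed variant: wait for all replicas to acknowledge each write.
# Slower than requiredAcks = 1, but safer when a partition leader fails.
agent2.sinks.sk2.requiredAcks = -1
```

The same change would apply to sk0 on agent0 and sk1 on agent1 if all three nodes should behave alike.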

Startup commands

Startup order: load balance nodes -> agent node

Starting load balance node 0
./flume-ng agent -c /opt/software/flume/apache-flume-1.6.0-bin/conf -f /opt/software/flume/apache-flume-1.6.0-bin/jobs/flume_loadbalance_collector0.conf -n agent0 -Dflume.root.logger=INFO,console

Starting load balance node 1
./flume-ng agent -c /opt/software/flume/apache-flume-1.6.0-bin/conf -f /opt/software/flume/apache-flume-1.6.0-bin/jobs/flume_loadbalance_collector1.conf -n agent1 -Dflume.root.logger=INFO,console

Starting load balance node 2
./flume-ng agent -c /opt/software/flume/apache-flume-1.6.0-bin/conf -f /opt/software/flume/apache-flume-1.6.0-bin/jobs/flume_loadbalance_collector2.conf -n agent2 -Dflume.root.logger=INFO,console

Starting the agent node
./flume-ng agent -c /opt/soft/apache-flume-1.6.0-bin/conf -f /opt/soft/apache-flume-1.6.0-bin/jobs/flume_analysys_loadbalance_agent.conf -n agentX -Dflume.root.logger=INFO,console