单节点部署三台Kafka

单节点部署三台Kafka、启动及测试


下载地址

http://archive.cloudera.com/kafka/kafka/4/kafka-2.2.1-kafka4.1.0.tar.gz

解压

1
tar -zxvf kafka-2.2.1-kafka4.1.0.tar.gz -C ../app

部署Kafka之前,检测Zookeeper是ok的

1
2
[zk: localhost:2181(CONNECTED) 0] ls /
[zktest, tunan, zookeeper, kafka]

编辑config/server.properties文件

1
2
3
4
5
6
7
8
#broker.id=0
#log.dirs=/tmp/kafka-logs

broker.id=0
host.name=hadoop
port=9090
log.dirs=/home/hadoop/tmp/kafka-logs00
zookeeper.connect=hadoop:2181/kafka

复制Kafka文件夹为三份

1
2
3
cp -R kafka_2.11-2.2.1-kafka-4.1.0/ kafka01
cp -R kafka_2.11-2.2.1-kafka-4.1.0/ kafka02
mv kafka_2.11-2.2.1-kafka-4.1.0/ kafka03

修改kafka02的config/server.properties文件

1
2
3
4
5
broker.id=2
host.name=hadoop
port=9092
log.dirs=/home/hadoop/tmp/kafka-logs02
zookeeper.connect=hadoop:2181/kafka

启动(三台都需要输入命令)

1
bin/kafka-server-start.sh -daemon config/server.properties

创建topic

1
2
3
4
5
6
./kafka-topics.sh \
--create \
--zookeeper hadoop:2181/kafka \
--partitions 3 \
--replication-factor 2 \
--topic test

查看指定topic的状况

1
2
3
4
./kafka-topics.sh \
--describe \
--zookeeper hadoop:2181/kafka \
--topic test

测试

启动生产者

1
2
3
./kafka-console-producer.sh \
--broker-list hadoop:9090,hadoop:9091,hadoop:9092 \
--topic test

启动消费者

1
2
3
4
bin/kafka-console-consumer.sh \
--bootstrap-server hadoop:9090,hadoop:9091,hadoop:9092 \
--from-beginning \
--topic test

发送数据

1
2
3
>a
>a
...

接收数据

1
2
3
a
a
...

关闭kafka集群(每台都要执行)

修改kafka-server-stop.sh

1
2
3
PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
修改为
PIDS=$(jps -lm | grep -i 'kafka'| awk '{print $1}')

命令详解:
使用jps -lm命令列出所有的java进程,然后通过管道,利用grep -i ‘kafka.Kafka’命令将kafka进程筛出来,最后再接一管道命令,利用awk将进程号取出来。

分别执行kafka-server-stop.sh