单节点部署三台Kafka、启动及测试
下载地址
http://archive.cloudera.com/kafka/kafka/4/kafka-2.2.1-kafka4.1.0.tar.gz
解压
1
| tar -zxvf kafka-2.2.1-kafka4.1.0.tar.gz -C ../app
|
部署Kafka之前,检测Zookeeper是ok的
1 2
| [zk: localhost:2181(CONNECTED) 0] ls / [zktest, tunan, zookeeper, kafka]
|
编辑config/server.properties文件
1 2 3 4 5 6 7 8
| #broker.id=0 #log.dirs=/tmp/kafka-logs
broker.id=0 host.name=hadoop port=9090 log.dirs=/home/hadoop/tmp/kafka-logs00 zookeeper.connect=hadoop:2181/kafka
|
复制Kafka文件夹为三份
1 2 3
| cp -R kafka_2.11-2.2.1-kafka-4.1.0/ kafka01 cp -R kafka_2.11-2.2.1-kafka-4.1.0/ kafka02 mv kafka_2.11-2.2.1-kafka-4.1.0/ kafka03
|
修改kafka02的config/server.properties文件
1 2 3 4 5
| broker.id=2 host.name=hadoop port=9092 log.dirs=/home/hadoop/tmp/kafka-logs02 zookeeper.connect=hadoop:2181/kafka
|
启动(三台都需要输入命令)
1
| bin/kafka-server-start.sh -daemon config/server.properties
|
创建topic
1 2 3 4 5 6
| ./kafka-topics.sh \ --create \ --zookeeper hadoop:2181/kafka \ --partitions 3 \ --replication-factor 2 \ --topic test
|
查看指定topic的状况
1 2 3 4
| ./kafka-topics.sh \ --describe \ --zookeeper hadoop:2181/kafka \ --topic test
|
测试
启动生产者
1 2 3
| ./kafka-console-producer.sh \ --broker-list hadoop:9090,hadoop:9091,hadoop:9092 \ --topic test
|
启动消费者
1 2 3 4
| bin/kafka-console-consumer.sh \ --bootstrap-server hadoop:9090,hadoop:9091,hadoop:9092 \ --from-beginning \ --topic test
|
发送数据
接收数据
关闭kafka集群(每台都要执行)
修改kafka-server-stop.sh
1 2 3
| PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}') 修改为 PIDS=$(jps -lm | grep -i 'kafka'| awk '{print $1}')
|
命令详解:
使用jps -lm命令列出所有的java进程,然后通过管道,利用grep -i ‘kafka.Kafka’命令将kafka进程筛出来,最后再接一管道命令,利用awk将进程号取出来。
分别执行kafka-server-stop.sh