一、准备基础环境
1、需安装Java环境并配置环境变量 https://jdk.java.net/java-se-ri/11
2、下载kafka_2.13-2.6.1压缩包 http://kafka.apache.org/downloads
3、配置防火墙,开放相关端口
二、修改配置文件
进入kafka目录下的config文件夹下,修改配置文件server.properties内容为:.
# broker的id号,同一个集群中每个节点设置为不同的idbroker.id=0# 监听协议及地址listeners=SASL_PLAINTEXT://192.168.0.1:9092# 认证鉴权security.inter.broker.protocol=SASL_PLAINTEXTsasl.enabled.mechanisms=PLAINsasl.mechanism.inter.broker.protocol=PLAIN# 数据目录log.dirs=/home/kafka/kafka-logs# zookeeper地址zookeeper.connect=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
三、启动服务脚本
修改启动脚本,配置认证的用户名密码
编辑bin目录中kafka-server-start.sh,加入以下启动参数
创建topic、producer、consumer的脚本都需要加入以下参数
if [ "x$KAFKA_OPTS" ]; thenexport KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_2.13-2.6.1/config/kafka_server_jaas.conf"fi
kafka_server_jaas.conf文件内容如下
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="chexiaotong"user_admin="chexiaotong"user_kafka="chexiaotong";};KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="chexiaotong";};然后进入bin目录,使用如下脚本启动Kafka节点./kafka-server-start.sh -daemon /home/kafka/kafka_2.13-2.6.1/config/server.properties
其它节点配置启动类似
四、设置开机启动
创建文件/etc/rc.d/init.d/kafka,写入以下内容
#!/bin/bashexport JAVA_HOME=/usr/local/jdk-11export PATH=$JAVA_HOME/bin:$PATH## kafka --- this script is used to start and stop kafka## chkconfig: - 80 12# description: kafka# processname: kafkaSTARTEXEC=/home/kafka/kafka_2.13-2.6.1/bin/kafka-server-start.shSTOPEXEC=/home/kafka/kafka_2.13-2.6.1/bin/kafka-server-stop.shCONF=/home/kafka/kafka_2.13-2.6.1/config/server.propertiescase $1 instart)$STARTEXEC -daemon $CONF;;stop)$STOPEXEC;;status)jps;;restart)$STOPEXEC$STARTEXEC -daemon $CONF;;*)echo "require start|stop|status|restart";;esac
添加可执行权限
chmod +x /etc/rc.d/init.d/kafka注册为系统服务
chkconfig --add kafka添加开机自启动
chkconfig kafka on其他命令使用
创建topic
./kafka-topics.sh --create --bootstrap-server 192.168.0.1:9092 --replication-factor 1 --partitions 1 --topic test发布消息
./kafka-console-producer.sh --producer.config ../config/producer.properties --bootstrap-server 192.168.0.1:9092 --topic test消费消息
./kafka-console-consumer.sh --consumer.config ../config/consumer.properties --bootstrap-server 192.168.0.1:9092 --topic test --from-beginning五、Kafka其他相关概念
kafka实例(broker),或者说服务器节点。
消息(event records也叫record或者message),一条消息有key,value,timestamp和一些可选的headers。
生产者(producer),发布消息。
消费者(comsumer),订阅消息。
主题(topic),用于消息归类。概念上类似文件系统的文件夹,消息是这个文件夹中的文件,或者可以理解为类似于别的消息系统的队列。
分区(partition),主题是分区的,一个主题可以有多个分区,可以分布在不同的broker中,kafka保证单个分区的消息是有序的。
副本(replica),为了容错和高可用,每个主题可以被复制。复制的对象是分区,也就是说分区可以被复制为多个,统称为副本,副本数量可配置。
日志(log) ,存储消息的地方,分区的具体实现,日志持久化到文件系统。
主题与分区示意图

基本脚本使用
启动
bin/zookeeper-server-start.sh config/zookeeper.properties创建主题,创建一个名字叫test,单个分区,1个副本的主题
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
查看主题
bin/kafka-topics.sh --list --bootstrap-server localhost:9092发送消息
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test消费消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning