微系列:5、在Centos系统中,搭建Kafka集群

一、准备基础环境

1、需安装Java环境并配置环境变量  https://jdk.java.net/java-se-ri/11

2、下载kafka_2.13-2.6.1压缩包  http://kafka.apache.org/downloads

3、配置防火墙,开放相关端口

二、修改配置文件

进入kafka目录下的config文件夹下,修改配置文件server.properties内容为:.

# broker的id号,同一个集群中每个节点设置为不同的idbroker.id=0# 监听协议及地址listeners=SASL_PLAINTEXT://192.168.0.1:9092# 认证鉴权security.inter.broker.protocol=SASL_PLAINTEXT sasl.enabled.mechanisms=PLAINsasl.mechanism.inter.broker.protocol=PLAIN# 数据目录log.dirs=/home/kafka/kafka-logs# zookeeper地址zookeeper.connect=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181

三、启动服务脚本

修改启动脚本,配置认证的用户名密码

编辑bin目录中kafka-server-start.sh,加入以下启动参数

创建topic、producer、consumer的脚本都需要加入以下参数

if [  "x$KAFKA_OPTS" ]; then export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_2.13-2.6.1/config/kafka_server_jaas.conf"fi

kafka_server_jaas.conf文件内容如下

KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="chexiaotong" user_admin="chexiaotong" user_kafka="chexiaotong";};KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="chexiaotong";};
然后进入bin目录,使用如下脚本启动Kafka节点./kafka-server-start.sh -daemon /home/kafka/kafka_2.13-2.6.1/config/server.properties

其它节点配置启动类似

四、设置开机启动

创建文件/etc/rc.d/init.d/kafka,写入以下内容

#!/bin/bash
export JAVA_HOME=/usr/local/jdk-11export PATH=$JAVA_HOME/bin:$PATH
## kafka  ---  this script is used to start and stop kafka## chkconfig:   - 80 12# description:  kafka# processname: kafka
STARTEXEC=/home/kafka/kafka_2.13-2.6.1/bin/kafka-server-start.shSTOPEXEC=/home/kafka/kafka_2.13-2.6.1/bin/kafka-server-stop.shCONF=/home/kafka/kafka_2.13-2.6.1/config/server.propertiescase $1 in          start)               $STARTEXEC -daemon $CONF              ;;          stop)              $STOPEXEC              ;;          status)              jps              ;;          restart)              $STOPEXEC              $STARTEXEC -daemon $CONF              ;;          *)              echo "require start|stop|status|restart"              ;;esac

添加可执行权限

chmod +x /etc/rc.d/init.d/kafka

注册为系统服务

chkconfig --add kafka

添加开机自启动

chkconfig kafka on

其他命令使用

创建topic

./kafka-topics.sh --create --bootstrap-server 192.168.0.1:9092 --replication-factor 1 --partitions 1 --topic test

发布消息

./kafka-console-producer.sh --producer.config ../config/producer.properties --bootstrap-server 192.168.0.1:9092 --topic test

消费消息

./kafka-console-consumer.sh --consumer.config ../config/consumer.properties --bootstrap-server 192.168.0.1:9092 --topic test --from-beginning

五、Kafka其他相关概念

kafka实例(broker),或者说服务器节点。

消息(event records也叫record或者message),一条消息有key,value,timestamp和一些可选的headers。

生产者(producer),发布消息。

消费者(comsumer),订阅消息。

主题(topic),用于消息归类。概念上类似文件系统的文件夹,消息是这个文件夹中的文件,或者可以理解为类似于别的消息系统的队列。

分区(partition),主题是分区的,一个主题可以有多个分区,可以分布在不同的broker中,kafka保证单个分区的消息是有序的。

副本(replica),为了容错和高可用,每个主题可以被复制。复制的对象是分区,也就是说分区可以被复制为多个,统称为副本,副本数量可配置。

日志(log) ,存储消息的地方,分区的具体实现,日志持久化到文件系统。

主题与分区示意图 

微系列:5、在Centos系统中,搭建Kafka集群

基本脚本使用

启动

bin/zookeeper-server-start.sh config/zookeeper.properties

创建主题,创建一个名字叫test,单个分区,1个副本的主题

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

查看主题

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

发送消息

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

消费消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning