【Linux存储系列教程】kafka消息队列(中间件)
一、关于kafka
1.kafka是什么?
kafka
是一个多分区、多副本且基于zookeeper
协调的分布式消息系统。也是一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
2.kafka的作用
- 消息系统
kafka
具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、可扩展性、可恢复性等功能。- 提供消息顺序性保障、回溯消费功能
- 存储系统
- 支持将消息持久化到磁盘
- 流处理平台
- 提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作
3.kafka基本结构
producer
生产者- 发送消息的一方
- 生产者负责创建消息,然后将其投递 到
kafka
中
consumer
消费者- 消费者,也就是接收消息的一方。
- 消费者连接到Kafka上并接收消息,进而进行相应的业务逻辑处理
consumer Group(CG)
消费者组- 消费者组,由多个
consumer
组成。 - 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费
- 消费者组之间互不影响
- 消费者组,由多个
broker
kafka服务器节点或实例- 一个
kafka
集群由多个broker
组成 - 一个
broker
可以容纳多个topic
- 一个
topic
主题- Kafka中的消息以主题为单位进行归类,
- 生产者负责将消息发送到特定的主题(发送到
Kafka
集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费
partition
分区- 主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区
Topic-Partition
。同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志Log
文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量offset
。offset
是消息在分区中的唯一标识,Kafka
通过它来保证消息在分区内的顺序性,不过offset
并不跨越分区,也就是说,Kafka
保证的是分区有序而不是主题有序。
- 主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区
replica
副本Kafka
为分区引入了多副本Replica
机制,通过增加副本数量可以提升容灾能力。同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是一主多从的关系,其中leader
副本负责处理读写请求,follower
副本只负责与leader副本的消息同步。副本处于不同的broker
中,当leader
副本出现故障时,从follower
副本中重新选举新的leader副本对外提供服务。Kafka
通过多副本机制实现了故障的自动转移,当Kafka
集群中某个broker
失效时仍然能保证服务可用。
二、kafka的复制机制
Kafka
使用ISR
的方式进行数据复制,则有效地权衡了数据可靠性和性能之间的关系。
AR、ISR、OSR、HW、LEO
分区中的所有副本统称为AR
Assigned Replicas
。所有与leader副本保持一定程度同步的副本(包括leader
副本在内)组成ISRIn-Sync Replicas
,ISR集合是AR集合中的一个子集。消息会先发送到leader
副本,然后follower
副本才能从leader
副本中拉取消息进行同步,同步期间内follower
副本相对于leader
副本而言会有一定程度的滞后。
与
leader
副本同步滞后过多的副本不包括leader副本组成OSROut-of-Sync Replicas
,由此可见,AR=ISR+OSR
。在正常情况下,所有的follower
副本都应该与leader
副本保持一定程度的同步,即AR=ISR
,OSR
集合为空。
leader
副本负责维护和跟踪ISR集合中所有follower
副本的滞后状态,当follower
副本落后太多或失效时,leader
副本会把它从ISR集合中剔除。如果OSR集合中有follower
副本追上了leader
副本,那么leader
副本会把它从OSR集合转移至ISR集合。默认情况下,当leader
副本发生故障时,只有在ISR集合中的副本才有资格被选举为新的leader
,而在OSR集合中的副本则没有任何机会不过这个原则也可以通过修改相应的参数配置来改变。
ISR与HW和LEO也有紧密的关系。HW是
High Watermark
的缩写,俗称高水位,它标识了一个特定的消息偏移量offset
,消费者只能拉取到这个offset
之前的消息。
LEO是
Log End Offset
的缩写,它标识当前日志文件中下一条待写入消息的offset
,分区ISR集合中的每个副本都会维护自身的LEO,而ISR集合中最小的LEO即为分区的HW,对消费者而言只能消费HW之前的消息。
三、kafka消息确认机制 ACK
producer
发送消息后,leader
将消息同步给follower
,然后返回ACK
给producer
,表示消息已收到,此时才可以继续发送下一条消息。
kafka
提供了以下3种ACK
级别:0
:leader
接收到消息马上返回ack
,此时可能还没有写入磁盘,可能丢失数据1
:leader
将消息写入磁盘后,马上返回ack
,此时可能还没同步follower
,同样可能丢失数据-1(all)
:leader
和follower
都将数据写入磁盘后,返回ack
。但是如果在写入磁盘后,ack
尚未发送,此时leader
发生故障,会导致数据写入重复
四、kafka的partition分区方式
consumer
采用pull
方式主动从broker
拉取数据,此时会传入timeout
参数,如果当前没有数据可消费,consumer
会等待一段时间,直到timeout
超期才返回
1
个topic
有多个partition
,1
个consumer-group
有多个consumer
,这其中就涉及到partition
的分配问题。
kafka
提供2
种分配方式:Range
和RoundRobin
range
- 原理是将
partition数
/consumer数
,来决定每个consumer
分配几个partition
。如果除不尽,则前面几个consumer
会多1
个partition
。
- 原理是将
RoundRobin
- 轮询每个
consumer
,逐一分配
- 轮询每个
五、kafka集群部署
在部署
kafka
之前,需要提前部署zookeeper
!点我跳转到zookeeper
部署教程。
1.部署jdk环境
过程省略,请查看
zookeeper
部署教程:https://www.wsjj.top/archives/107
2.安装kafka
A.下载kafka
如果提示没有
wget
命令,请安装yum install -y wget
[root@zk1 ~]# wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz
B.安装kafka
[root@zk1 ~]# tar xf kafka_2.13-3.3.1.tgz -C /usr/local/
[root@zk1 ~]# ls /usr/local/
bin games jdk1.8.0_191 lib libexec share zookeeper
etc include kafka_2.13-3.3.1 lib64 sbin src zookeeper37
[root@zk1 ~]# mv /usr/local/kafka_2.13-3.3.1 /usr/local/kafka33
C.配置环境变量
[root@zk1 ~]# vim /etc/profile
export JAVA_HOME=/usr/local/jdk1.8.0_191
export KAFKA_HOME=/usr/local/kafka33
export PATH=$PATH:$JAVA_HOME/bin:$KAFKA_HOME/bin #自行修改上一期教程添加的字段
[root@zk1 ~]# source /etc/profile
D.创建kafka
日志目录
[root@zk1 ~]# mkdir /usr/local/kafka33/kafka-logs
E.修改kafka
配置文件
[root@zk1 ~]# vim /usr/local/kafka33/config/server.properties
#配置文件并不完整,以下仅展示修改的地方。
broker.id=0 #默认保持0即可
listeners=PLAINTEXT://192.168.140.10:9092 #去掉注释,填写本机IP
log.dirs=/usr/local/kafka33/kafka-logs #修改日志存放路径
zookeeper.connect=192.168.140.10:2181,192.168.140.11:2181,192.168.140.12:2181 #指定zookeeper地址,我们有三台。
F.把kafka
目录拷贝给另外2台机器
[root@zk1 ~]# for i in 11 12; do scp -r /usr/local/kafka33/ root@192.168.140.$i:/usr/local/; done
G.拷贝环境文件给另外2台机器
[root@zk1 ~]# for i in 11 12
> do
> scp -r /etc/profile root@192.168.140.$i:/etc/
> done
H.修改另外2台服务器上的配置文件
[root@zk2 ~]# vim /usr/local/kafka33/config/server.properties
broker.id=1 #修改ID,不和之前的重复即可
listeners=PLAINTEXT://192.168.140.11:9092 #修改监听IP为本机
[root@zk3 ~]# vim /usr/local/kafka33/config/server.properties
broker.id=2 #修改ID,不和之前的重复即可
listeners=PLAINTEXT://192.168.140.12:9092 #修改监听IP为本机
重新加载环境配置文件
[root@zk2 ~]# source /etc/profile
[root@zk3 ~]# source /etc/profile
I.启动服务
[root@zk1 ~]# kafka-server-start.sh -daemon /usr/local/kafka33/config/server.properties
[root@zk2 ~]# kafka-server-start.sh -daemon /usr/local/kafka33/config/server.properties
[root@zk3 ~]# kafka-server-start.sh -daemon /usr/local/kafka33/config/server.properties
J.查看是否成功运行
[root@zk1 ~]# jps
3170 Jps
2182 QuorumPeerMain
3035 Kafka
[root@zk2 ~]# jps
2770 Kafka
2853 Jps
1966 QuorumPeerMain
[root@zk3 ~]# jps
2632 Kafka
2715 Jps
1838 QuorumPeerMain
3.在zookeeper
中查看kafka
的注册信息
[root@zk1 ~]# /usr/local/zookeeper37/bin/zkCli.sh
Connecting to localhost:2181
[zk: localhost:2181(CONNECTED) 1] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
#可以看到多了很多目录
[zk: localhost:2181(CONNECTED) 2] get /brokers/ids/0
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.140.10:9092"],"jmx_port":-1,"port":9092,"host":"192.168.140.10","version":5,"timestamp":"1683542941484"}
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.140.11:9092"],"jmx_port":-1,"port":9092,"host":"192.168.140.11","version":5,"timestamp":"1683543144766"}
[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/2
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.140.12:9092"],"jmx_port":-1,"port":9092,"host":"192.168.140.12","version":5,"timestamp":"1683543145833"}
4.测试topic
操作
[root@zk1 ~]# kafka-topics.sh --create --topic test1 --replication-factor 1 --partitions 1 --bootstrap-server 192.168.140.10:9092
Created topic test1.
[root@zk1 ~]# kafka-topics.sh --list --bootstrap-server 192.168.140.10:9092
test1 #查看当前系统所有topic
[root@zk1 ~]# kafka-topics.sh --describe --topic test1 --bootstrap-server 192.168.140.10:9092
Topic: test1 TopicId: kJ1ilOuQTMOJO8RQasCRrA PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
#查看详细信息
[root@zk1 ~]# kafka-topics.sh --delete --topic test1 --bootstrap-server 192.168.140.10:9092
#删除topic
5.测试生产者、消费者
如果您这里报错,请检查刚刚是否已经删除了
topic
[root@zk1 ~]# kafka-console-producer.sh --broker-list 192.168.140.10:9092 --topic test1 #生产者
>1
>2
>3
>4
>5
>6
>
[root@zk1 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.140.10:9092 --topic test1 --from-beginning #消费者
1
2
3
4
5
6