【Linux存储系列教程】kafka消息队列(中间件)

一、关于kafka

1.kafka是什么?

kafka是一个多分区、多副本且基于zookeeper协调的分布式消息系统。也是一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。

2.kafka的作用

  • 消息系统
    • kafka具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、可扩展性、可恢复性等功能。
    • 提供消息顺序性保障、回溯消费功能
  • 存储系统
    • 支持将消息持久化到磁盘
  • 流处理平台
    • 提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作

3.kafka基本结构

kafka01

  • producer生产者
    • 发送消息的一方
    • 生产者负责创建消息,然后将其投递 到kafka
  • consumer消费者
    • 消费者,也就是接收消息的一方。
    • 消费者连接到Kafka上并接收消息,进而进行相应的业务逻辑处理
  • consumer Group(CG)消费者组
    • 消费者组,由多个consumer组成。
    • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费
    • 消费者组之间互不影响
  • brokerkafka服务器节点或实例
    • 一个kafka集群由多个broker组成
    • 一个broker可以容纳多个topic
  • topic主题
    • Kafka中的消息以主题为单位进行归类,
    • 生产者负责将消息发送到特定的主题(发送到Kafka集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费
  • partition分区
    • 主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区Topic-Partition。同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志Log文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量offsetoffset是消息在分区中的唯一标识,Kafka通过它来保证消息在分区内的顺序性,不过offset并不跨越分区,也就是说,Kafka保证的是分区有序而不是主题有序。
  • replica副本
    • Kafka为分区引入了多副本Replica机制,通过增加副本数量可以提升容灾能力。同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是一主多从的关系,其中leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步。副本处于不同的broker中,当leader副本出现故障时,从follower副本中重新选举新的leader副本对外提供服务。Kafka通过多副本机制实现了故障的自动转移,当Kafka集群中某个broker失效时仍然能保证服务可用。

二、kafka的复制机制

Kafka使用ISR的方式进行数据复制,则有效地权衡了数据可靠性和性能之间的关系。

AR、ISR、OSR、HW、LEO

分区中的所有副本统称为ARAssigned Replicas。所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISRIn-Sync Replicas,ISR集合是AR集合中的一个子集。消息会先发送到leader副本,然后follower副本才能从leader副本中拉取消息进行同步,同步期间内follower副本相对于leader副本而言会有一定程度的滞后。

leader副本同步滞后过多的副本不包括leader副本组成OSROut-of-Sync Replicas,由此可见,AR=ISR+OSR。在正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步,即AR=ISROSR集合为

leader副本负责维护和跟踪ISR集合中所有follower副本的滞后状态,当follower副本落后太多或失效时,leader副本会把它从ISR集合中剔除。如果OSR集合中有follower副本追上leader副本,那么leader副本会把它从OSR集合转移至ISR集合。默认情况下,当leader副本发生故障时,只有在ISR集合中的副本才有资格被选举为新的leader,而在OSR集合中的副本则没有任何机会不过这个原则也可以通过修改相应的参数配置来改变

ISR与HW和LEO也有紧密的关系。HW是High Watermark的缩写,俗称高水位,它标识了一个特定的消息偏移量offset,消费者只能拉取到这个offset之前的消息。

LEO是Log End Offset的缩写,它标识当前日志文件中下一条待写入消息的offset,分区ISR集合中的每个副本都会维护自身的LEO,而ISR集合中最小的LEO即为分区的HW,对消费者而言只能消费HW之前的消息。

三、kafka消息确认机制 ACK

producer发送消息后,leader将消息同步给follower,然后返回ACKproducer,表示消息已收到,此时才可以继续发送下一条消息。

  • kafka提供了以下3种ACK级别:
    • 0leader接收到消息马上返回ack,此时可能还没有写入磁盘,可能丢失数据
    • 1leader将消息写入磁盘后,马上返回ack,此时可能还没同步follower,同样可能丢失数据
    • -1(all)leaderfollower都将数据写入磁盘后,返回ack。但是如果在写入磁盘后,ack尚未发送,此时leader发生故障,会导致数据写入重复

四、kafka的partition分区方式

consumer采用pull方式主动从broker拉取数据,此时会传入timeout参数,如果当前没有数据可消费,consumer会等待一段时间,直到timeout超期才返回
1topic有多个partition1consumer-group有多个consumer,这其中就涉及到partition的分配问题。

  • kafka提供2种分配方式:RangeRoundRobin
    • range
      • 原理是将partition数/consumer数,来决定每个consumer分配几个partition。如果除不尽,则前面几个consumer会多1partition
    • RoundRobin
      • 轮询每个consumer,逐一分配

五、kafka集群部署

在部署kafka之前,需要提前部署zookeeper点我跳转到zookeeper部署教程。

1.部署jdk环境

过程省略,请查看zookeeper部署教程:https://www.wsjj.top/archives/107

2.安装kafka

A.下载kafka

如果提示没有wget命令,请安装yum install -y wget

[root@zk1 ~]# wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz

B.安装kafka

[root@zk1 ~]# tar xf kafka_2.13-3.3.1.tgz -C /usr/local/
[root@zk1 ~]# ls /usr/local/
bin  games    jdk1.8.0_191      lib    libexec  share  zookeeper
etc  include  kafka_2.13-3.3.1  lib64  sbin     src    zookeeper37
[root@zk1 ~]# mv /usr/local/kafka_2.13-3.3.1 /usr/local/kafka33

C.配置环境变量

[root@zk1 ~]# vim /etc/profile
export JAVA_HOME=/usr/local/jdk1.8.0_191
export KAFKA_HOME=/usr/local/kafka33
export PATH=$PATH:$JAVA_HOME/bin:$KAFKA_HOME/bin	#自行修改上一期教程添加的字段

[root@zk1 ~]# source /etc/profile

D.创建kafka日志目录

[root@zk1 ~]# mkdir /usr/local/kafka33/kafka-logs

E.修改kafka配置文件

[root@zk1 ~]# vim /usr/local/kafka33/config/server.properties
#配置文件并不完整,以下仅展示修改的地方。
broker.id=0		#默认保持0即可
listeners=PLAINTEXT://192.168.140.10:9092	#去掉注释,填写本机IP
log.dirs=/usr/local/kafka33/kafka-logs	#修改日志存放路径
zookeeper.connect=192.168.140.10:2181,192.168.140.11:2181,192.168.140.12:2181	#指定zookeeper地址,我们有三台。

F.把kafka目录拷贝给另外2台机器

[root@zk1 ~]# for i in 11 12; do scp -r /usr/local/kafka33/ root@192.168.140.$i:/usr/local/; done

G.拷贝环境文件给另外2台机器

[root@zk1 ~]# for i in 11 12
> do
> scp -r /etc/profile root@192.168.140.$i:/etc/
> done

H.修改另外2台服务器上的配置文件

[root@zk2 ~]# vim /usr/local/kafka33/config/server.properties
broker.id=1	#修改ID,不和之前的重复即可
listeners=PLAINTEXT://192.168.140.11:9092	#修改监听IP为本机
[root@zk3 ~]# vim /usr/local/kafka33/config/server.properties
broker.id=2	#修改ID,不和之前的重复即可
listeners=PLAINTEXT://192.168.140.12:9092	#修改监听IP为本机
重新加载环境配置文件
[root@zk2 ~]# source /etc/profile
[root@zk3 ~]# source /etc/profile

I.启动服务

[root@zk1 ~]# kafka-server-start.sh -daemon /usr/local/kafka33/config/server.properties
[root@zk2 ~]# kafka-server-start.sh -daemon /usr/local/kafka33/config/server.properties
[root@zk3 ~]# kafka-server-start.sh -daemon /usr/local/kafka33/config/server.properties

J.查看是否成功运行

[root@zk1 ~]# jps
3170 Jps
2182 QuorumPeerMain
3035 Kafka
[root@zk2 ~]# jps
2770 Kafka
2853 Jps
1966 QuorumPeerMain
[root@zk3 ~]# jps
2632 Kafka
2715 Jps
1838 QuorumPeerMain

3.在zookeeper中查看kafka的注册信息

[root@zk1 ~]# /usr/local/zookeeper37/bin/zkCli.sh
Connecting to localhost:2181
[zk: localhost:2181(CONNECTED) 1] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
#可以看到多了很多目录
[zk: localhost:2181(CONNECTED) 2] get /brokers/ids/0
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.140.10:9092"],"jmx_port":-1,"port":9092,"host":"192.168.140.10","version":5,"timestamp":"1683542941484"}
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.140.11:9092"],"jmx_port":-1,"port":9092,"host":"192.168.140.11","version":5,"timestamp":"1683543144766"}
[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/2
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.140.12:9092"],"jmx_port":-1,"port":9092,"host":"192.168.140.12","version":5,"timestamp":"1683543145833"}

4.测试topic操作

[root@zk1 ~]# kafka-topics.sh --create --topic test1 --replication-factor 1 --partitions 1 --bootstrap-server 192.168.140.10:9092
Created topic test1.
[root@zk1 ~]# kafka-topics.sh --list --bootstrap-server 192.168.140.10:9092 
test1	#查看当前系统所有topic
[root@zk1 ~]# kafka-topics.sh --describe --topic test1 --bootstrap-server 192.168.140.10:9092
Topic: test1	TopicId: kJ1ilOuQTMOJO8RQasCRrA	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: test1	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
#查看详细信息
[root@zk1 ~]# kafka-topics.sh --delete --topic test1 --bootstrap-server 192.168.140.10:9092
#删除topic

5.测试生产者、消费者

如果您这里报错,请检查刚刚是否已经删除了topic

[root@zk1 ~]# kafka-console-producer.sh --broker-list 192.168.140.10:9092 --topic test1	#生产者
>1
>2
>3
>4
>5
>6
>
[root@zk1 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.140.10:9092 --topic test1 --from-beginning	#消费者
1
2
3
4
5
6