kakfa-必知必会

概述

分布式流处理平台,它提供了一种高吞吐量、可持久化的消息传递系统.kafka本身没有管理配置和注册功能,所以kafka集群强依赖zookeeper,用于存储meta数据以及选举.

Kafka 的设计目标是提供高吞吐量分布式处理能力,而不是保证消息的顺序,很多时候消息的顺序问题要在消费者上进行处理.

kafka增加和减少服务器都会在Zookeeper节点上触发相应2的事件kafka系统会捕获这些事件,进行新一轮的负载均衡,客户端(consumer)也会捕获这些事件来进行新一轮的处理.

概念

producer/consumer

生产者和消费者,严格来说不属于kafka这个软件的一部分,但是kafka是一个消息队列系统,所以一定要有生产者和消费者的组成.

  • 生产者是向kafka 的topic push 消息,每条消息都被追加(append)到partition中的客户端应用程序,消息的追加属于顺序写磁盘(比随机写磁盘效率高,保证吞吐率).它可以指定topic或partition发送消息.
  • 消费者是从kafka topic中读取消息的客户端应用程序,它可以通过订阅 Topic 来指定它们感兴趣的消息来源。当消费者订阅一个 Topic 后,它可以选择从特定的 Partition 中读取消息.

Topic

可以理解为一个队列,一个topic里可以有多个partition

broker

一个kafka节点就是一个broker,replication也是以broker为单位存储,

Partition

  • partition是一个topic的逻辑片段或子分区

  • 每个topic可以被分为多个Partition,每个partition是一个有序且持久化的消息队列

  • 消息被发布到topic时,Kafka 会根据一定的策略将不同的消息均匀地分配到不同的 Partition 中

    每个消息都有一个键key,kafka使用这个key来决定将消息发送到哪个partition.key可以为空,如果为空就是用默认的分区策略: 轮询

    对于有键的消息,Kafka 使用的分区策略是根据消息键进行哈希计算,将具有相同键的消息分配到同一个 Partition 中,以保证具有相同键的消息被顺序地写入同一个 Partition

  • partition让kafka具备了水平扩展并行处理的能力.

    • 水平扩展

      将数据分散到多个partition中,并将这些partition分布在多个broker上.但数据量增加时,就可以简单地增加partition和broker的数量,提高整个系统的吞吐和容量.

    • 并行处理

      每个partition都是独立的消息队列,消费者可以并行地读取不同partition中的消息.这样多个消费者可以同时处理不同partition中的消息,实现并行处理.

      • 不同partition之间的消息是无序的,假如你的应用程序对消息的顺序有严格的要求,应该在消费者端对消息进行处理,比如消息中添加时间戳,消费者根据消息的时间戳对消息进行排序处理.
  • 消费者可以独立地从不同的partition中读取消息,实现高吞吐高并发.

    kafka写入同一个partition中的消息的有序的,而consumer对单一partition的读取自然也是有序的.

    但是如果有多个consumer并行地消费多个partition中时候,消息的顺序就无法保证.这是因为kafka只保证了partition内的顺序性而不保证partition之间的顺序性.

replication

kafka的副本机制提供了高可用容错性.每个partition可以有多个副本,其中一个选为leader,其余为follower.leader和follower都对应一个broker.

  • leader: 负责处理读写请求
  • follwer: 负责备份和故障转移.

offet

偏移量是一个与每条消息相关联的唯一标识符,用于表示消息在特定的partition中的位置.

消费者组

由多个消费者组成,共同消费一个或多个Topic中的消息.主要目的是实现负载均衡容错性

消费者组与offset

在消费者组中,Kafka 会自动管理消费者的偏移量。当消费者启动或重新启动时,它会向 Kafka 提交自己的偏移量,并从该偏移量处开始消费消息。Kafka 会跟踪每个消费者组在每个 Partition 上的偏移量,并确保每个消费者在消费时都从正确的位置开始。需要注意的是,消费者组中的每个消费者必须有一个唯一的消费者 ID,以便 Kafka 能够跟踪每个消费者的偏移量

配置文件

/path-to-kafka/config/server.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=9092 #当前kafka对外提供服务的端口,默认是9092
host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880 #取消息的最大字节数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口

log.dirs

Kafka 使用日志文件来持久化存储消息

每个 Topic 的每个 Partition 都有一个对应的日志文件,用于存储该 Partition 中的消息。

当消息被写入到 Kafka 中时,它们首先被追加到当前的日志文件中,当日志文件达到一定大小或时间限制时,会被切分为较小的日志段(log segments)。

要注意,这个路径存放的是持久化数据,也就是系统中真实的消息,实际生产中对该目录的处理应该小心,不能随意删除并做好备份.

log.dirs 参数可以配置一个或多个目录路径,用逗号分隔。Kafka 将按照指定的顺序在这些目录中创建日志文件和日志段。如果其中一个目录不可用(例如,磁盘故障),Kafka 会自动切换到下一个可用的目录进行日志的写入和读取。这样可以提高 Kafka 的可用性和容错性。

系统日志

默认位置: /path-to-kafka/logs

有别于log.dirs,前者保存kafka的消息数据,系统日志指kafka自身的运行日志,主要有:

  • server.log: kafka的运行日志
  • state-change.log: 用于记录 Kafka 服务器的状态变化信息。它包含了 Kafka 服务器的启动、关闭、分区分配、副本分配等重要事件的记录
  • controller.log: kafka>=0.8

系统角色与选举

kafka在0.8版本之前依赖zookeeper来选举.

  • Controller: kafka>=0.8,kafka引入了自己的选举机制,不再依赖zookeeper.负责管理整个集群的状态和元数据,包括leader选举,分区分配,副本管理,topic管理等人物,每个kafka集群只有一个controller节点,由kafka集群内部的选举机制自动选举产生.

  • Leader: 负责接收生产者push的消息以及消费者的pull请求.一个partition只能有一个leader.

    Controller 的选举由 Kafka 集群内部的选举机制自动完成,而 Leader 的选举是在分区级别进行的,由 Controller 负责管理。

    手动指定controller节点:

    设置 controller.quorum.voters 参数,将其值设置为你希望成为 Controller 的 Broker ID 列表。然后重启 Kafka 集群,选举机制将会在这些 Broker 中选出一个作为 Controller。

  • follower: 处于被动复制状态,作为leader的副本.同样是partition级别的角色.

常用命令

下面所说的新版旧版,新版:>=0.9;旧版:<=0.8

1、查看topic列表
1
/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
2、查看指定topic信息
1
/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic lx_test_topic --describe
3、修改topic分区
1
./kafka-topics.sh --zookeeper localhost:2181 -alter --partitions 4 --topic test
4、删除topic
1
./kafka-topics.sh --zookeeper localhost:2181  --topic test  --delete
5、创建topic
1
./kafka-topics.sh --zookeeper localhost:2181  --topic test --partitions 3 --replication-factor 1 --create
6、模拟客户端接收kafka消息
1
2
3
# --max-messages 10 是指定消费10条数据
# --from-beginning 指定从最开始消费(最老的数据),不配置,默认消费最新
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic MUC_ORG --max-messages 10
7、模拟客户端发送消息

直接用localhost可能会有报错,如果发生报错,可以将localhost改为IP地址

1
./kafka-console-producer.sh --broker-list localhost:9092 --topic MUC_ORG
8、查看分组列表
1
2
3
4
# 新版
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
# 旧版
./kafka-consumer-groups.sh --zookeeper 127.0.0.1:9092 --list
9、查看分组消费情况
1
2
3
4
# 新版0.9以上
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group testgroup --describe
# 旧版0.8以下
./kafka-consumer-groups.sh --zookeeper localhost:2181 --group testgroup --describe
10、topic消费offset
1
./kafka-consumer-offset-checker --zookeeper localhost :2181/kafka --group dev  --topic MUC_ORG

kakfa-必知必会
http://example.com/2024/02/03/kafka-must-know/
作者
Peter Pan
发布于
2024年2月3日
许可协议