消息队列-④高可用篇
消息队列-④高可用篇
学习核心
消息队列-高可用篇
- 基于Kafka框架作为学习沉淀
- Kafka 高可用
- 多副本机制(*)
- 多副本机制核心
- 多副本下的写入机制
- 副本同步机制和优化(理解)
- 同步机制流程
- 潜在问题分析和同步机制优化
- 多副本机制(*)
- Kafka 高可用常见题型
- Kafka 高可用
学习资料
多副本机制(*)
1.多副本机制核心
Kafka受欢迎有一个很大的原因是它天然提供了容灾解决方案,可以应对机器故障等各种异常,这些异常很多时候是无法预防的,比如机房断电,机器硬盘损坏,甚至之前出现过的天津机房爆炸事故。
Kafka是通过副本机制来实现容灾,这种机制下即使发生了一定的异常,也可以保证系统正常运作和数据准确性。从扩展层面来说,有了多副本,就有如下优势:
- 高可用性:如果 Leader 副本所在的 Broker 宕机,Kafka 会自动从其它副本中选取一个新的 Leader,确保服务的持续性
- 容灾:多备份情况下数据丢失风险变小,即使部分副本数据丢失,只要有一个副本是完整的,数据就不会丢失
- 读性能提升:默认情况下,虽然数据的多个副本可以分布在不同的 broker 上,但是Kafka都是Leader提供读写,如果特定业务需要,也可以让消费者从Follower上读数据,增加读的并发度
Kafka 副本核心概念
Replica
:Replica是指Kafka集群中的一个副本,它可以是Leader副本或者Follower副本的一种。每个分区都有多个副本,其中一个是Leader副本,其余的是Follower副本。每个副本都保存了分区的完整数据,以保证数据的可靠性和高可用性
Leader
:Leader是指Kafka集群中的一个分区副本,它负责处理该分区的所有读写请求。Leader副本是唯一可以自主向分区写入数据的副本,它将写入的数据都会同步到所有的Follower副本中,以保证数据的可靠性和一致性
Follower
:Follower是指Kafka集群中的一个分区副本,Follower副本不能直接向分区写入数据,它只能从Leader副本中复制数据,并将数据同步到本地的副本中,以保证数据的可靠性和一致性。在Leader副本挂掉的时候,Follower副本有机会被选举为新的leader副本从而保证分区的可用性
副本机制的引入意味着每次数据写入时,数据不只是写入一台Broker,最终还会同步到其它Broker作为副本备份
如何创建多副本(针对用户创建的主题)
副本个数是在主题创建时指定:
- 如果是开发或者测试环境,可以只指定副本数量为1,此时意味着它没有备份
- 如果是生产环境,一般而言都是设置3个副本或者更多,这样可以实现容灾切换
创建topic,指定多副本
构建集群环境(3个broker),创建一个副本个数设置为2,有2个分区 的Topic,命令如下
./kafka-topics.sh --create --bootstrap-server localhost:9092 --topic tp-xxx --partitions 2 --replication-factor 2
每个分区配备了两个副本,数据不管写入指定的Broker,还会按照一定顺序相应同步到其他Broker。例如此处TopicA-Partition0 不仅被写入Broker10,相应地会同步一个副本到Broker11,以此类推
分区的副本数必须小于等于Broker数量,例如Borker只有一个,却想为分片创建2个副本,就会报错(例如单机版本下为分区创建副本)
# 进入容器
docker exec -it kafka-test /bin/bash
# 进入Kafka bin目录,输入相关指令进行测试
cd /opt/kafka/bin
# 为分片创建多副本
./kafka-topics.sh --create --bootstrap-server localhost:9092 --topic tp-xxx --partitions 2 --replication-factor 2
-- output(命令执行异常)
Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
[2024-08-22 07:40:28,134] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
(kafka.admin.TopicCommand$)
为Kafka内部主题配置多副本
上述案例中,在创建Topic时就可以指定多副本,但是Kafka还有一些非用户创建的主题,也就是内部主题这种主题不是由用户创建的,该如何指定多副本呢?
此处关注一个核心的内部主题:_consumer_offsets
(消费位移主题)
- 用于存储消费者的偏移量(offsets),以便消费者能够在重新启动或故障恢复后继续从上次的位置继续消费消息
- 每个消费者组都会在这个主题中有相应的记录,以存储其消费的偏移量
通过设置配置参数进行调整:如果是基于Docker方式安装的Kafka,对应配置文件在/opt/kafka/config/server.properties
该主题的副本数可以通过offsets.topic.replication.factor
参数配置,这个配置默认是1,也就是说如果不主动配置的话,这个消费偏移主题就只有1个副本,那么如果这个副本坏了,将丢失消费者的偏移提交数据,消费者重启后就无法从上次的位置消费消息。
因此为了更高可靠性,如果生产环节有多个broker,这个参数offsets.topic.replication.factor
需要大于1(例如常见的就是配置为3)
2.多副本下的写入机制
副本写入机制
Kafka的做法是选择其中一个副本作为Leader,Leader就相当于这个副本集合对外的代表,剩余的副本就是数据的备份。对于生产者而言,它只用和Leader打交道,Leader再和其它副本进行数据同步。
如何定义写入成功?
写入机制中涉及到一个"写入配置"(由生产侧的acks配置决定),数据是否写入成功取决于写入策略的配置(不同写入策略的答案是不一样的)
acks | 行为 | 可靠 | 性能 |
---|---|---|---|
0 | 生产者在发送消息后不会等待来自服务器的确认,所以生产者实际是不知道消息是否成功,也就无从去重试,生产可靠性是最低的 | 不可靠 | 最高 |
1 | 生产者会在消息发送后等待主节点的确认,但不会等待所有副本的确认 | 相对可靠 | 较高 |
all/-1 | 只有在所有副本都成功写入消息后,生产者才会收到确认。这确保了消息的可靠性,但延迟显然是最高的 | 可靠 | 稍低 |
// 配置生产者属性
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置acks参数
props.put(ProducerConfig.ACKS_CONFIG,"all");// 设置为"0","1"或 "all"
// 创建Kafka生产者
KafkaProducer<String,String> producer = new KafkaProducer<>(props);
基于上述分析,选择写入策略为all,可以保证进入kafka的存储数据基本不会丢失,一个副本挂掉之后也会有其他副本顶上,这便是kafka的可靠性所在。
但同样地,有得必有失,选择all的可靠性虽然会很高,但任何一个副本对应的机器,只要挂掉,就无法写入了,举个极端点的例子,一个Kafka集群有1024台机器,如果1台出问题了,整个集群的写入就不可用了,这会不会太严苛了?(因为all要求所有副本都成功写入,生产者才会受到确认,一旦集群的某个节点挂掉,这种模式下的写入操作就出现异常了)
实际上,这里的**“所有副本”是有条件的,所有是指跟上节奏、在ISR集合里的副本**,跟不上节奏的副本就会被剔除在外,这样在保证可靠性的同时,也有一定的容错性
ISR概念
一个分区会有多个副本,为了实现更好的管理,可以将Kafka中的副本划分成不同的集合:
(1)AR (Assigned Replicas):AR是指分区的所有副本(包括Leader副本和Follower副本),即整体的集合
(2)ISR (in-Sync Replicas):ISR是指与Leader副本保持同步的副本集合。ISR中的副本与Leader副本保持同步,即它们已经复制了Leader副本中的所有数据,或者与Leader副本之间的数据差异不超过一定的阈值(Follower副本能够落后Leader副本的最长时间间隔)。并且ISR副本集合是动态变化的,不是一成不变的。
(3)OSR (Out-of-Sync Replicas):OSR是指与Leader副本不同步的副本集合。OSR中的副本与Leader副本之间的数据差异超过了一定的阈值,或者它们还没有复制Leader副本中的所有数据。除非开启了Unclean选举,否则OSR中的副本不能被选举为新的Leader副本。简单来说,OSR集合=AR集合-ISR集合。
ISR其实就是跟上节奏的副本,也可以直接看作生效的副本。这个生效状态是随时变化的:
- 每个Partition都会由Leader 动态维护一个与自己基本保持同步的ISR列表
- 所谓动态维护,就是说如果一个Follower比一个Leader落后超过了给定阈值(默认是10s),则Leader会将其从ISR中移除。如果OSR列表内的Folower副本重新追上了Leader副本的进度,那么就将其添加到ISR列表当中
此处的“阈值”是指时间维度的落后(可能一般场景下会认为是落后多少条数据,但此处是基于时间维度概念),它的机制分析如下:
- (1)同步成功时,也就是通过拉取完全跟上Leader的LEO时,会记录本次更新时间为lastCaughtUpTimeMs
- (2)会定时检查是否是否跟上节奏
- 县体规则:确认
当前时间减去lastCaughtUpTimeMs
有没有超过设定的國值(默认是10s),如果超过则认为落后于ISR副本了
- 县体规则:确认
Leader 挂掉怎么办?
=》当Leader挂掉,Kafka就会从ISR列表中选择第一个副本升级为Leader,以保证分区的正常运行。除非开启了Unclean选举,否则OSR中的副本不能被选举为新的Leader副本
副本同步机制(了解)
有了副本之后,就可以将一份数据,备份到不同机器上,即使部分机器出现问题,数据也能找回来,具备可靠性和容灾性。但这里还有两个问题,需要去思考:
- (1)数据是如何从Leader同步到Follower,是推还是拉?
- (2)多副本下哪些数据对外可见?
1.数据怎么同步?("拉"、"HW")
Kafka中,数据是先写入Kafka Leader,然后由follower主动拉取Leader的数据进行同步
怎么拉?=》如果每次拉所有数据的话成本就太高了,因此是根据某个偏移来拉,此处引入LEO的概念
LEO是指下一条要写入的位置,根据LEO,Leader就知道某个Follower数据同步到哪里。根据所有ISR副本的LEO,实际就能知道目前数据的同步情况,在所有ISR副本中都同步的数据,才算是真正落地的数据,是不是描述起来比较绕?所以Kafka抽象了一个叫HW的概念来表示这些真正落地的数据。
对外展示:什么是HW?
HW全称是High Watermark,相当于是一条高水位线
在高水位线之下的消息,都已经被所有的ISR副本复制,属于已经落地的消息,这些确认的数据,才是可以对外展示的,也就是说这些高水位之下的消息才可以被消费者拉取到
任何等于或高于高水位线的消息,都可能还没有被所有的ISR副本复制,因此属于未落地消息,不会被消费者读取到
结合图示分析:
- (1)HW目前是在3号位置
- (2)在3号位置之前的0,1,2位置,是对外可见的,属于已落地消息
- (3)在3号位置及以后的3,4号位置,对外是不可见的,也就是无法拉取到
- (4)最后一条数据是在4号位置,下一条数据应该写入到5号位置,所以5号位置就是LEO所在
2.数据同步流程(拉模式)
结合上述分析,数据同步最后就体现在HW上,那么以更新HW为核心目的,重新梳理一遍数据同步过程。
举一个相对直观的例子,来说明 Kafka 副本同步的全流程,初始前置条件如下:
(1)有一个主题TopicA
,为了简单设定只有1个分片Partition 0
(2)主题A有两个副本,副本1(Leader副本)和副本2(Follower副本)
(3)副本1在Broker11上,副本2在Broker12上
(3)副本1和副本2的HW和LEO都处于10这个位置
具体如下图,消息同步分析过程说明如下:(此处副本简称为Leader、Follower)
(1)Follower 尝试主动拉取同步,发现此时数据是最新的,则无任何操作
(2)当有新数据写入Leader(Leader的LEO更新为11),Follower 再次尝试拉取,拉取成功,同步更新LEO(Follower的LEO更新为11)
(3)Follower 再次尝试拉取,Leader 感知到自己和副本的LEO相比之前更高了(比HW高),进而引起Leader副本的HW发生变化,因此更新HW(Leader的HW更新为11)并反馈,随后Follower接收反馈相应更新HW(Follower的HW更新为11)
(4)最终数据同步到最新状态(可以看到,其又回到初始态的概念状态,类似地多条消息发送也是基于此流程分析)
结合上述流程分析可知,消息同步都是由Follower主动同步拉取触发的,这些操作都是在Follower“拉”的时候做出的响应,基于此概念去理解相应的同步流程:
- Follower 拉数据,发现没有,不做任何操作
- Follower 拉数据,发现有新数据写入,拉取数据并同步更新LEO
- Follower 拉数据,Leader 感知 LEO 变高(对比现有的HW)则更新HW并反馈,Follower 接收反馈同步更新HW
- Follower 拉数据,发现已是最新,不做任何操作
“拉取模式”是Kafka很老版本(在0.11版本之前)的做法,整体思路和方式大体ok,但是在某些场景下会存在数据丢失的风险。结合上图所示的【步骤3】会发现,Leader、Follower的HW更新是先后发生的,此处存在时间差,因此存在数据丢失的风险
3.同步机制优化
问题推演
结合上述”拉模式“的同步流程分析可知,由于Leader、Follower的HW更新是先后发生的,由于时间差攻击进而可能存在数据丢失的风险。结合图示推演可能存在的场景问题,分析说明如下:
- 【1】Broker11 更新HW并反馈
- 【2】Broker12 还没来得及同步更新HW就发生宕机,重启恢复后导致丢失了当前位置以及该位置之后的所有消息(例如图示的消息B)
- 正常场景分析:如果Broker12 再次拉取,是可以从Broker11中重新把消息拉回来,然后继续正常操作
- 【3】异常场景发生:Broker12 拉取时发现Broker11(Leader)发生宕机,Broker12竞争成为新的Leader
- 【4】Broker11 重启恢复后发现自己成为了Follower,于是主动拉取新Leader(Broker12)的消息,此时发现对比HW位置不对劲(就会将HW调到和Leader一样的位置),于是彻底丢失了消息B
从结果分析来看,虽然Broker11和Broker12最终虽然达到数据一致的状态,但实际上却丢失了数据。针对这类问题,Kafka提出了Leader Epoch
方案,做了相应的升级优化
可以简单理解为:传统的拉模式场景中由于“Leader、Follower的HW先后更新存在时间差”因此可能导致“消息丢失问题”,因此引入Epoch机制
,让“Follower宕机时重启恢复”的日志截断机制不依赖于HW(所以Leader、Follower的HW先后更新的时间差问题并不影响原有的消息同步),而是依赖于Leader的<Epoch,Offset>
(<时代版本,对应时代版本首条消息的偏移量>
)
Epoch 记录的是“时代”版本号信息,可以理解为当出现Leader选举时版本就会增加(用于记录Leader更新换代的版本号),Offset则是记录对应时代的首条消息的记录的偏移量
优化思路
从上述场景问题分析可知,其实核心问题在于【步骤2】Broker12宕机重启恢复后HW截断了自己当前位置以及之后的消息,进而导致消息丢失。而后面的步骤中Broker11宕机后重启恢复成为Follower也就无法拉取到”丢失的消息B“。因此可以考虑切入点在【步骤2】中,如何不让HW从当前位置开始截断?
Leader Epoch 概念
Leader Epoch 可以认为是leader的版本,它由两部分数据组成<Epoch,Offset>
:
- Epoch(纪元):一个单调增加的版本号(当触发Leader选举则Epoch就会新增1,以表示一个新的纪元),小版本号的leader被认定是过期leader
- Offset(起始位移):Leader 副本在该Epoch值上写入的首条消息的位移
举例说明:假设有两个Leader Epoch<0,0>
、 <1,120>
:表示版本号是0,这个版本的 Leader 从位移0开始保存消息,一共保存了 120 条消息。之后,Leader 发生了变更,版本号增加到 1,新版本的起始位移是 120
Kafka Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,同时它还会定期地将这些信息持久化到一个 checkpoint 文件中。当 Leader 副本写入消息到磁盘时,Broker 会尝试更新这部分缓存。如果该 Leader 是首次写入消息,那么 Broker 会向缓存中增加一个 Leader Epoch 条目,否则就不做更新。这样,每次有 Leader 变更时,新的 Leade副本会查询这部分缓存,取出对应的 Leader Epoch 的起始位移,以避免数据丢失和不一致的情况
概念说明
(1)引入任期概念
(2)记录任期内第一个偏移位置
(3)每个副本有每届任期信息
(4)恢复为Leader,则不用截断
(5)恢复回Follower时根据Leader返回的任期第一个偏移位置截断,因为既然它是Leader了,肯定它是处于最佳状态,信任它即可,而不根据自己的HW截断
结合上述场景问题分析,此处对优化流程的场景进行说明:核心关注【步骤2】,它不会直接截断当前位置以及之后的消息,而是先向Leader确认偏移来决定截断的位置
- 【1】Broker11 更新HW并反馈
- 【2】Broker12 还没来得及同步更新HW就发生宕机,Broker12 重启恢复为Follower,此时它就会向Leader确认偏移,进而消息B得到保留
- 【3】异常场景发生:Broker12 拉取时发现Broker11(Leader)发生宕机,Broker12竞争成为新的Leader
- 【4】Broker11 重启恢复后发现自己成为了Follower,于是主动拉取时也去确认新Leader(Broker12)的偏移(根据Leader返回的任期第一个偏移位置截断),Broker11最新的Epoch是0,Leader 在相应位置的数据信息也是Epoch为0时写入的,因此不用截断
PS:图示中的 0=>0 、1=>2 指的是Epoch向量,记录任期变化:(当前Epoch)=>(当前Epoch起始位置(当前Epoch首条消息的位移))
基于上述流程分析,可以看到截断操作并不考虑HW,而是通过Epoch截断,解决了数据丢失问题。可以结合下图所示,忽略HW的变化,关注Epoch截断的流程
todo:Epoch流程还是相对比较绕,面试时候说实话一般也不会问太细,大体上理解这个流程,能简单说明白Epoch是表示时代,记录了时代起始偏移,通过时代起始偏移来判断是否需要截断,这样截断逻辑就可以和HW解耦了,解决了一些同步过程中的问题。