kafka查看消费者组 kafka查看消费者组消费情况

Kafka面试题

RabbitMQ和Kafka的主要区别如下:

Kafka是分布式发布-消息系统,它最初是由LinkedIn公司开发的,之后成为Apache项目的一部分,Kafka分区并发: Kafka 的 Topic 可以分成多个 Partition,每个 Paritition 类似于一个队列,保证数据有序。同一个 Group 下的不同 Consumer 并发消费 Paritition,分区实际上是调优 Kafka 并行度的最小单元,因此,可以说,每增加一个 Paritition 就增加了一个消费并发。是一个分布式,可划分的,冗余备份的持久性的日志服务,它主要用于处理流式数据。

kafka查看消费者组 kafka查看消费者组消费情况kafka查看消费者组 kafka查看消费者组消费情况


kafka查看消费者组 kafka查看消费者组消费情况


kafka查看消费者组 kafka查看消费者组消费情况


消息发送

解决方案

1.配置ack=all/-1,tries > 1,unclean.leader.election.enable = false

producer发送完消息,等待follower同步完成再返回,如果异常则重试,副本数量可能影响吞吐量

不允许选举ISR的副本作为leader

2.配置min.insync.replicas>1

消费

先commit再处理消息,如果处理消息的时候异常了,但offset已经提交了,这条消息对于消费者来说丢失了

减少刷盘的间隔

kafka如何保证不重复消费又不丢失数据

2.第二条,每次写入数据的时候,要求 Leader 写入成功以外,至少一个 ISR 里的 Follower 也写成功。

pull模式

push模式

缺点:速率固定,忽略了consumer的消费能力,可能导致拒绝服务或者网络阻塞等情况

1. Broker注册 Broker是分布式部署并且相互之间相互,但是需要有一个注册系统能够将整个集群中的Broker管理起来 /brokers/ids

2. Topic注册 在Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护 /borkers/topics

3. 生产者负载均衡 由于同一个Topic消息会被分区并将其分布在多个Broker上,因此,生产者需要将消息合理地发送到这些分布式的Broker上,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。

4. 消费者负载均衡 与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。

5. 分区与消费者 的关系 在Kafka中,规定了每个消息分区 只能被同组的一个消费者进行消费,因此,需要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时上,例如:

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]

其中,[broker_id-partition_id]就是一个 消息分区 的标识,内容就是该 消息分区 上 消费者的Consumer ID。

6. 消息消费进度Offset 记录 在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。Offset在Zookeeper中由一个专门进行记录,其路径为:

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]

内容就是Offset的值。

7. 消费者注册 消费者在初始化启动时加入消费者分组的步骤如下

注册到消费者分组。每个消费者启动时,都会到Zookeeper的指定下创建一个属于自己的消费者,例如/consumers/[group_id]/ids/[consumer_id],完成创建后,消费者就会将自己的Topic信息写入该临时。

Kafka不基于内存,而是硬盘存储,因此消息堆积能力更强

1.顺序写磁盘(相比磁盘的随机写快很多)。如果你是追加文件末尾按照顺序的方式来写数据的话,那么这种磁盘顺序写的性能基本上可以跟写内存的性能本身也是不多的。

2.利用Page Cache(页高速缓冲存储器,简称页高缓)空中接力的方式来实现高效读写,作系统本身有一层缓存,叫做page cache,是在内存里的缓存,我们也可以称之为os cache,意思就是作系统自己管理的缓存。原理就是Page Cache可以把磁盘中的数据缓存到内存中,把对磁盘的访问改为对内存的访问。

3.零拷贝 零拷贝技术是一种避免CPU将数据从一块存储拷贝到另一块存储的技术。Kafka使用零拷贝技术将数据直接从磁盘到网卡设备缓冲区中,而不需要经过应用程序的转发。

通常应用程序将磁盘上的数据传送至网卡需要经过4步:

-CPU会将数据从内核模式到用户模式下的缓冲区;

-调用write(),将数据从用户模式下到内核模式下的Socket缓冲区;

-将数据从内核模式的Socket缓冲区到网卡设备。

上面的步骤中,第2、3步将数据从内核模式经过用户模式再绕回内核模式,浪费了两次过程。采用零拷贝技术,Kafka可以直接请求内核把磁盘中的数据到Socket缓冲区,而不用再经过用户模式

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有consumer如何达成一致,来分配 Topic 的每个分区。

Rebalance 的触发条件有3个

Rebalance 发生时,Group 下所有 consumer 实例都会协调在一起共同参与,kafka 能够保证尽量达到最公平的分配。但是 Rebalance 过程对 consumer group 会造成比较的影响。在 Rebalance 的过程中 consumer group 下的所有消费者实例都会停止工作,等待 Rebalance 过程完成。

GroupCoordinator(协调者):协调消费者组完成消费者Rebalance的重要组件,每一个broker都会启动一个GroupCoodinator,Kafka 按照消费者组的名称将其分配给对应的GroupCoodinator进行管理;每一个GroupCoodinator只负责管理一部分消费者组,而非集群中全部的消费者组。通常是partition的leader的broker

如果C1消费消息超时,出入rebalance,重新分配后该消息被其他消费者消费,此时C1消费完成提交offset,导致错误

ISR :In-Sync Replicas 副本同步队列

AR :Assigned Replicas 所有副本

ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数两个维度, 当前的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。

在 Kafka 中,每个 主题分区下的每条消息都被赋予了一个的 ID 数值,用于标识它在分区中的位置。这个 ID 数值,就被称为位移,或者叫偏移量。一旦消息被写入到分区日志,它的位移值将不能被修改。

auto.offset.reset:消费规则,默认earliest 。

latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

Kafka 副本当前分为副本和追随者副本。只有Leader副本才能 对外提供读写服务,响应s端的请求。Follower 副本只是采用拉(PULL)的方 式,被动地同步Leader副本中的数据,并且在Leader副本所在的 Broker 宕机后,随时准备应聘 Leader 副本。

kafka在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式(比Zookeeper Queue的方式更高效)通知需为此作出响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。

分区的 Leader 副本选举对用户是完全透明的,它是由 Controller 完成的。你需要回答的是,在哪些场景下,需要执行分区 Leader 选举。每一种场景对应于一种选举策略。当前,Kafka 有 4 种分区 Leader 选举策略。

集群 partition 备份 Kafka 支持设置针对每个 partition 备份,可以将 partition 备份到不同的 broker 上,其中 leader partition 负责读写,其他 follower 仅负责同步,当 leader 挂掉后会从 follower 中选取新的 leader 。

消息消费顺序 一个 partition 同一时刻在一个 consumer group 中只能有一个 consumer 实例在消费,从而保证了消费顺序。consumer group 中的 consumer 实例的数量不能比一个 topic 中的 partition 的数量多,否则,多出来的 consumer 无法消费到消息。Kafka 的消息在单个 partition 上是可以保证顺序的,但是在整体上无法保证顺序消费

消息消费模式 关于消费模式,Kafka 通过 消费组的概念可以灵活设置。如常见的 队列模式 即 所有的 consumer 在同一个 consumer group 下。发布模式 则设置多个 consumer group 进行消费即可

acks:消息的确认机制,默认值是0。

acks=0:如果设置为0,生产者不会等待kafka的响应。

acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证。

ClickHouse物化视图丢数据问题排查

在ClickHouse中,物化视图一般起到数据的预处理(聚合)的作用,可以理解为是对原表的insert动作,定义额外的执行处理逻辑。相当于触发器的功能。

我们目前的部署方式是,1分片1副本,只有两台机器。物化视图及kafka引擎表都是建在其中一台上。

Kafka------->Kafka引擎表----------->物化视图(1)-------->ODS事实表----------->物化视图(2)----------------->统一的事实表

目前出现的问题是:ods部分数据,在统一的事实表中不存在,存在数据丢失。

首先想到的是,物化视图(2)出现问题,查看是否出现异常。

1、查看clickho一般的消费代码是这样的use日志

ERROR日志中未见明显异常。

INFearliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费O日志中相关的日志如下:

看到这日志基本就定位出来问题了。我们topic默认是10个分区,此处kafka消费者信息提示只是消费:5,6,7,8,9这5个分区。推测同分组下还有别的消费者。

2、查看kafka topic的分组消费情况

发现同组下确实是有别的消费者。

两个都部署了kafka引擎表及物化视图(1),但是物化视图(2)只有在一台机器上有部署。导致部分数据丢失,没有直接到统一的事实表中。

rabbitmq和kafka的区别

一旦上述数据写入作完成,应用程序必须调用KafkaProducer的commitTransaction方法或者abortTransaction方法以结束当前事务。

1、消息协议:RabbitMQ使用AMQP(高级消息队列协议),而Kafka使用其自定义的协议。AMQP是一种标准协议,可以提供更强的互作性,但Kafka的自定义协议可能具有更高的性能。

kafka出现消息重复消费的原因:

2、消息格式:RabbitMQ支持多种消息格式,如JSON、XML等,而Kafka只支持二进制格式。这使得RabbitMQ在处理复杂消息时更为灵活。

3、息持久性:RabbitMQ支持消息的持久化,可以将消息存储在磁盘上,以确保消息不会在崩溃时丢失。而Kafka也支持消息的持久化,但它的设计目标是为了实现高吞吐量,因此可能会牺牲一些持久化性能。

4、消息确认机制:RabbitMQ支持消息的确认机制,可以确保消息已经被消费者接收。而Kafka使用基于消费者的组的确认机制,只有在消费者组中的所有消费者都成功消费消息时,才会确认消息已经消费。

5、可扩展性:Kafka比RabbitMQ更具有可扩展性,可以更容易地添加更多的以扩展消息处理能力。

消息队列系统的功能:

1、消息发送:应用程序可以将消息发送到消息队列中,以便另一个应用程序在需要时读取它。

2、消息接收:应用程序可以从消息队列中接收消息,以便进行处理。

3、消息存储:消息队列系统可以将消息存储在队列中,以便在需要时进行读取或处理。

4、消息传递:消息队列系统可以确保消息在发送和接收之间可靠地传递,并处理任何传输错误或丢失。

5、消息处理:应用程序可以读取消息并处理它,以便进行后续作。

kafka入门:一个开源的、轻量级、高吞吐、高可用的分布式消息系统

3) 当流数据到达时能够被及时处理。

随着信息技术的快速发展及互联网用户规模的急剧增长,计算机所存储的信息量正呈爆炸式增长,目前数据量已进入大规模和超大规模的海量数据时代, 如何高效地存储、分析、处理和挖掘海量数据 已成为技术研究领域的热点和难点问题。而 如何采集和运营管理、分析这些数据 也是大数据处理中一个至关重要的组成环节,这就需要相应的基础设施对其提供支持。针对这个需求,当前业界已有很多开源的消息系统应运而生,kafka就是一款当然非常流行的消息系统。

Kafka是一款开源的、轻量级的、分布式、可分区和具有备份的(Replicated)、基于ZooKeeper协调管理的分布式流平台的功能强大的消息系统。作为一个流式处理平台,必须具备以下3个关键特性:

1) 能够允许发布和流数据。

2) 存储流数据时提供相应的容错机制。

消息流系统kafka的基本结构包括生产者和消费者,以及kafka集群。

生产者负责生产消息,将消息写入Kafka集群;消费者从Kafka集群中拉取消息。

消息是Kafka通信的基本单位 ,由一个 固定长度的消息头 和一个 可变长度的消息体 构成。

Kafka将 一组消息 抽象归纳为一个主题(Topic),也就是说,一个主题是对消息的一个分类。 生产者将消息指定主题发送到kafka集群,消费者主题或主题的某些分区进行消费。

Kafka将一组消息归纳为一个主题,而 每个主题又被分成一个或多个分区(Partition) 。每个分区由一系列有序、不可变的消息组成,是一个有序队列。 每个分区在物理上对应为一个文件夹 ,分区的命名规则为主题名称后接“—”连接符,之后再接分区编号,分区编号从0开始,编号值为分区的总数减1。

分区使得Kafka在并发处理上变得更加容易,理论上来说,分区数越多吞吐量越broker的刷盘高,但这要根据集群实际环境及业务场景而定。同时,分区也是Kafka保证消息被顺序消费以及对消息进行负载均衡的基础。

疑问和 :分区如何保证消息被顺序消费?每个分区内的消息是有序的,但不同分区间如何保证?猜测是分区从存储空间上比较大,分区个数少。顺序消费的主要因素在分区内的消息,分区间的可以忽略。高吞吐率顺序写磁盘估计也是这个原因。

Kafka只能保证一个分区之内消息的有序性,并不能保证跨分区消息的有序性。 每条消息被追加到相应的分区中,是顺序写磁盘,因此效率非常高,这是Kafka高吞吐率的一个重要保证 。同时与传统消息系统不同的是,Kafka并不会立即删除已被消费的消息,由于磁盘的限制消息也不会一直被存储,因此 Kafka提供两种删除老数据的策略 ,一是基于消息已存储的时间长度,二是基于分区的大小。这两种策略都能通过配置文件进行配置。

每个分区又有一至多个副本(Replica),分区的副本分布在集群的不同上,以提高可用性。

从存储角度上分析,分区的每个副本在逻辑上抽象为一个日志(Log)对象,即分区的副本与日志对象是一一对应的。每个主题对应的 分区数 可以在Kafka启动时所加载的配置文件中配置,也可以在创建主题时指定。当然,客户端还可以在主题创建后修改主题的分区数。

为什么副本要分Leader和Follower? 如果没有Leader副本,就需要所有的副本都同时负责读/写请求处理,同时还得保证这些副本之间数据的一致性,设有n个副本则需要有n×n条通路来同步数据,这样数据的一致性和有序性就很难保证。

为解决这个问题,Kafka选择分区的一个副本为Leader,该分区其他副本为Follower,只有 Leader副本 才负责处理客户端 读/写请求 ,Follower副本从Leader副本同步数据。

引入Leader副本后客户端只需与Leader副本进行交互,这样数据一致性及顺序性就有了保证。Follower副本从Leader副本同步消息,对于n个副本只需n-1条通路即可,这样就使得系统更加简单而高效。

副本Follower与Leader的角色并不是固定不变的,如果Leader失效,通过相应的选举算法将从其他Follower副本中选出新的Leader副本。

疑问 :leader副本和follower副本是如何选出来的?通过zookeeper选举的嘛?

Kafka在ZooKeeper中动态维护了一个 ISR(In-sync Replica) ,即保存同步的副本列表,该列表中保存的是与Leader副本保持消息同步的所有副本对应的id。 如果一个Follower副本宕机或是落后太多 ,则该Follower副本将 从ISR列表中移除 。 本书用宕机 来特指某个失效的情景,包括但不限于被关闭,如被人为关闭或是发生物理故障、心跳检测过期、网络延迟、进程崩溃等。

任何发布到分区的消息会被直接追加到日志文件的尾部(分区目录下以“.log”为文件名后缀的数据文件),而每条 消息 在日志文件中的位置都会对应一个按序递增的 偏移量 。偏移量是一个分区下严格有序的 逻辑值 ,它并不表示消息在磁盘上的物理位置。由于Kafka几乎不允许对消息进行随机读写,因此Kafka并没有提供额外索引机制到存储偏移量。

消费者可以通过控制消息偏移量来对消息进行消费 ,如消费者可以指定消费的起始偏移量。 为了保证消息被顺序消费,消费者已消费的消息对应的偏移量也需要保存 。需要说明的是,消费者对消息偏移量的作并不会影响消息本身的偏移量。旧版消费者将消费偏移量保存到ZooKeeper当中, 而新版消费者是将消费偏移量保存到Kafka内部一个主题当中。 当然,消费者也可以自己在外部系统保存消费偏移量,而无需保存到Kafka中。

推测 :一个主题有多个分区,一个分区有多个副本。一个主题(一类消息)有多个分区(消息被分段),一个分区(每段消息)有多个副本(每段消息的副本数)。消息一旦发给kafka,就会分配一个偏移量,在多个副本中的偏移量是一样的。这样的话,消费者通过偏移量消费时对于多个副本就没有异性。

Kafka集群由一个或多个Kafka实例构成,每一个Kafka实例称为(Broker),通常也称为Kafka(Kafka)。在生产环境中Kafka集群一般包括一台或多台,我们可以在一台上配置一个或多个。 每一个都有的标识id,这个id是一个非负整数 。在一个Kafka集群中,每增加一个就需要为这个配置一个与该集群中其他不同的id, id值可以选择任意非负整数即可,只要保证它在整个Kafka集群中,这个id就是的名字,也就是在启动时配置的broker.id对应的值。

生产者(Producer)负责将消息发送给,也就是向Kafka发送消息的客户端。

消费者(Comsumer)以拉取(pull)方式拉取数据,它是消费的客户端。在Kafka中 每一个消费者都属于一个特定消费组 (ConsumerGroup),可以为每个消费者指定一个消费组,以groupId代表消费组名称,通过group.id配置设置。 如果不指定消费组 ,则该消费者属于默认消费组test-consumer-group。

每个消费者有一个全局的id ,通过配置项client.id指定, 如果客户端没有指定消费者的id, Kafka会自动为该消费者生成一个全局的id,格式为${groupId}-${hostName}-${timestamp}-${UUID前8位字符}。 同一个主题的一条消息只能被同一个消费组下某一个消费者消费 ,但不同消费组的消费者可同时消费该消息。 消费组是Kafka用来实现对一个主题消息进行广播和单播的手段 ,实现消息广播只需指定各消费者均属于不同的消费组,消息单播则只需让各消费者属于同一个消费组。

推论: kafka消息是按照消息类型(主题),在一个消费者组中只能消费一次。也就是一个消费者组只消费一类型的消息。如果某个服务要消费一类消息,必须将自己置为不同的消费者组。

Kafka利用ZooKeeper保存相应元数据信息, Kafka元数据信息包括如信息、Kafka集群信息、旧版消费者信息及其消费偏移量信息、主题信息、分区状态信息、分区副本分配方案信息、动态配置信息等。 Kafka在启动或运行过程当中会在ZooKeeper上创建相应 来保存元数据信息, Kafka通过机制在这些注册相应来元数据的变化 ,从而由ZooKeeper负责管理维护Kafka集群,同时通过ZooKeeper我们能够很方便地对Kafka集群进行水平扩展及数据迁移。

Kafka 设计详解之队列

在 上文 中我们介绍了 Kafka 日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段。的网络通信,本文打算详细分析 Kafka 的核心 — 队列 的设计和实现,来对 Kafka 进行更深一步的了解。

队列是一种先进先出(FIFO)的数据结构,它是 Kafka 中最重要的部分,负责收集生产者生产的消息,并将这些消息传递给消费者。要实现一个队列有多种方式,Kafka 作为一个消息队列中间件,在设计队列时主要要考虑两个问题:

乍一看到这个问题,我们会想,内存的读取速度远快于磁盘,如果追求性能,内存也充足的话,当然是将生产者产生的消息数据写到内存(比如用一个数组或者链表来存储队列数据),供消费者消费。真的是这样吗?

下面我们依次分析下写内存和写磁盘文件的优缺点,首先,内存的优点是读写速度非常快,但是,如果我们的目标是设计「大数据量」下的「高吞吐量」的消息队列,会有以下几个问题:

接下来我们来分析一下磁盘,写磁盘文件方式存储队列数据的优点就是能规避上述内存的缺点,但其有很的缺点,就是读写速度慢,如果纯依靠磁盘,那消息队列肯定做不到「高吞吐量」这个目标。

分析了内存跟磁盘的优缺点,好像我们还是只能选写内存,但我们忽视了磁盘的两个情况:一是磁盘慢是慢在随机读写,如果是顺序读写,他的速度能达到 600MB/1.必须要求至少一个 Follower 在 ISR 列表里。sec(RAID-5 磁盘阵列),并不慢,如果我们尽可能地将数据的读写设计成顺序的,可以大大提升性能。二是 现代的作系统会(尽可能地)将磁盘里的文件进行缓存 。

有了作系统级别的文件缓存,那用磁盘存储队列数据的方式就变得有优势了。首先,磁盘文件的数据会有文件缓存,所以不必担心随机读写的性能;其次,同样是使用内存,磁盘文件使用的是作系统级别的内存,相比于在 Ja 内存堆中存储队列,它没有 GC 问题,也没有 Ja 对象的额外内存开销,更可以规避应用重启后的内存 load 数据耗时的问题,而且,文件缓存是作系统提供的,因为我们只要简单的写磁盘文件,系统复杂性大大降低。

因此,Kafka 直接使用磁盘来存储消息队列的数据。

刚才我们已经决定用磁盘文件来存储队列数据,那么要如何选择数据结构呢?一般情况下,如果需要查找数据并随机访问,我们会用 B+ 树来存储数据,但其时间复杂度是 O(log N),由于我们设计的是消息队列,我们可以完全顺序的写收到的生产者消息,消费者消费时,只要记录下消费者当前消费的位置,往后消费就可以了,这样可以对文件尽可能的进行顺序读写,同时,时间复杂度是O(1)。其实,这跟我们写日志的方式很像,每条日志顺序 append 到日志文件。

之前我们已经确定采用直接顺序写磁盘文件的方式来存储队列数据,下面我们来剖析下具体的实现细节。

现在我们知道一个队列(Log)是由多个队列段文件(LogSegment)组成的,那么 Kafka 是如何将这些文件逻辑上连接从而组成一条有序队列的呢?在生成每个队列段文件时,Kafka 用该段的初始位移来对其命名,如在新建一个队列时,会初始化个队列段文件,那么其文件名就是0,设每个段的大小是固定值 L,那么第二个段文件名就是 L,第 N 个就是 (N - 1) L。这样,我们就可以根据文件名对段文件进行排序,排序后的顺序就是整个队列的逻辑顺序。

了解了队列的基本实现,下面我们就来分析下队列的核心作—读和写。

写作发生在生产者向队列生产消息时,在上篇文章讲网络通信时我们已经说到,所有的客户端请求会根据协议转到一个 Handler 来具体处理,负责写作的 Handler 叫 ProducerHandler,整个写请求的流程如下:

之前我们说过,如果是顺序写,由于省掉了磁头寻址的时间,磁盘的性能还是很高的,我们看到 Kakfa 队列是以顺序方式写的,所以性能很高。但是,如果一台 Kafka 有很多个队列,而硬盘的磁头是有限的,所以还是得在不同的队列直接来回切换寻址,性能会有所下降。

队列的读作发送在消费者消费队列数据时,由于队列是线性的,只需要记录消费者上次消费到了哪里(offset),接下去消费就好了。那么首先会有一个问题,由谁来记消费者到底消费到哪里了?

一般情况下,我们会想到让服务端来记录各个消费者当前的消费位置,当消费者来拉数据,根据记录的消费位置和队列的当前位置,要么返回新的待消费数据,要么返回空。让服务端记录消费位置,当遇到网络异常时会有一些问题,比如服务端将消息发给消费者后,如果网络异常消费者没有收到消息,那么这条消息就被「跳过」了,当然我们可以借鉴二阶段提交的思想,服务端将消息发送给消费者后,标记状态为「已发送」,等消费者消费成功后,返回一个 ack 给服务端,服务端再将其标记为「成功消费」。不过这样设计还是会有一个问题,如果消费者没有返回 ack 给服务端,此时这条消息可能在已经被消费也可能还没被消费,服务端无从得知,只能根据人为策略跳过(可能会漏消息)或者重发(可能存在重复数据)。另一个问题是,如果有很多消费者,服务端需要记录每条消息的每个消费者的消费状态,这在大数据的场景下,非常消耗性能和内存。

Kafka 将每个消费者的消费状态记录在消费者本身(隔一段时间将消费状态同步到 zookeeper),每次消费者要拉数据,就给服务端传递一个 offset,告诉服务端从队列的哪个位置开始给我数据,以及一个参数 length,告诉服务端最多给我多大的数据(批量顺序读数据,更高性能),这样就能使服务端的设计复杂度大大降低。当然这解决不了一致性的问题,不过消费者可以根据自己程序特点,更灵活地处理事务。

下面就来分析整个读的流程:

分布式系统中不可避免的会遇到一致性问题,主要是两块:生产者与队列服务端之间的一致性问题、消费者与队列服务端之间的一致性问题,下面依次展开。

当生产者向服务端投递消息时,可能会由于网络或者其他问题失败,如果要保证一致性,需要生产者在失败后重试,不过重试又会导致消息重复的问题,一个解决方案是每个消息给一个的 id,通过服务端的主动去重来避免重复消息的问题,不过这一机制目前 Kafka 还未实现。目前 Kafka 提供配置,供用户不同场景下选择允许漏消息(失败后不重试)还是允许重复消息(失败后重试)。

由于在消费者里我们可以自己控制消费位置,就可以更灵活的进行个性化设计。如果我们在拉取到消息后,先增加 offset,然后再进行消息的后续处理,如果在消息还未处理完消费者就挂掉,就会存在消息遗漏的问题;如果我们在拉取到消息后,先进行消息处理,处理成功后再增加 offset,那么如果消息处理一半消费者挂掉,会存在重复消息的问题。要做到完全一致,的办法是将 offset 的存储与消费者放一起,每消费一条数据就将 offset+1。

本文介绍了 Kafka 的队列实现以及其读写过程。Kafka 认为作系统级别的文件缓存比 Ja 的堆内存更省空间和高效,如果生产者消费者之间比较「和谐」的话,大部分的读写作都会落在文件缓存,且在顺序读写的情况下,硬盘的速度并不慢,因此选择直接写磁盘文件的方式存储队列。在队列的读写过程中,Kafka 尽可能地使用顺序读写,并使用零拷贝来优化性能。,Kafka 让消费者自己控制消费位置,提供了更加灵活的数据消费方式。

一文解密Kafka,Kafka源码设计与实现原理剖析,真正的通俗易懂

2.端口: Config / server.properties 文件包含端口 ID,可以查到正在侦听端口 9092,因此直接指定它。

Apache Kafka (简称Kafka )最早是由Linkedln开源出来的分布式消息系统,现在是Apache旗下的一个子项目,并且已经成为开册、领域应用最广泛的消息系统之 Kafka社区也非常活跃,从 版本开始, Kafka 的标语已经从“一个高吞吐量、分布式的消息系统”改为“一个分布式的流平台”

每个log entry格式为"4个字节的数字N表示消息的长度" + "N个字节的消息内容";每个日志都有一个offset来的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置..每个partition在物理存储层面,有多个log file组成(称为segment).segment file的命名为"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.

关于Kafka,我打算从入门开始讲起,一直到它的底层实现逻辑个原理以及源码,建议大家花点耐心,从头开始看,相信会对你有所收获。

作为 个流式数据平台,最重要的是要具备下面 个特点

消息系统:

消息系统 也叫作消息队列)主要有两种消息模型:队列和发布订Kafka使用消费组( consumer group )统 上面两种消息模型 Kafka使用队列模型时,它可以将处理 作为平均分配给消费组中的消费者成员

下面我们会从 个角度分析Kafka 的几个基本概念,并尝试解决下面 个问题

消息由生产者发布到 fk 集群后,会被消费者消费 消息的消费模型有两种:推送模型( pu和拉取模型( pull 基于推送模型的消息系统,由消息记录消费者的消费状态 消息在将消息推送到消费者后 标记这条消息为已消费

但这种方式无法很好地保证消息的处理语义 比如,消息把消息发送出去后,当消费进程挂掉或者由于网络原因没有收到这条消息时,就有可能造成消息丢失(因为消息已经 这条消息标记为自己消费了,但实际上这条消息并没有被实际处理) 如果要保证消息的处理语义,消息发送完消息后,要设置状态为“已发送”,只有收到消费者的确认请求后才更新为“已消费”,这就需要在消息中记录所有消息的消费状态,这种做法也是不可取的

Kafka每个主题的多个分区日志分布式地存储在Kafka集群上,同时为了故障容错,每个分区都会以副本的方式到多个消息上 其中一个会作为主副本( Leader ),其 作为备份副本( Follower ,也叫作从副本)

主副本会负责所有的客户端读写作,备份副本仅仅从主副本同步数据 当主副本 IH 现在故障时,备份副本中的 副本会被选择为新的主副本 因为每个分区的副本中只有主副本接受读写,所以每个服务端都会作为某些分区的主副本,以及另外一些分区的备份副本这样Kafka集群的所有服务端整体上对客户端是负载均衡的

消息系统通常由生产者「pro ucer 消费者( co sumer )和消息( broke 大部分组成,生产者会将消息写入消息,消费者会从消息中读取消息 对于消息而言,生产者和消费者都属于客户端:生产者和消费者会发送客户端请求给服务端,服务端的处理分别是存储消息和获取消息,服务端返回响应结果给客户端

新的生产者应用程序使用 af aP oduce 对象代表 个生产者客户端进程 生产者要发送消息,并不是直接发送给 务端 ,而是先在客户端 消息放入队列 然后 一个 息发送线程从队列中消息,以 盐的方式发送消息给服务端 Kafka的记 集器( Reco dACCUl'lUlato )负责缓存生产者客户端产生的消息,发送线程( Sende )负责读取 集器的批 过网络发送给服务端为了保证客户端 络请求 快速 应, Kafka 用选择器( Selecto 络连接 读写 理,使网络连接( Netwo kCl i.ent )处理客户端 络请求

追加消息到记录收集器时按照分区进行分组,并放到batches中,每个分区的队列都保存了将发送到这个分区对应上的 记录,客户端的发送线程可 只使用 Sende 线程迭 batches的每个分区,获取分区对应的主剧本,取出分区对应的 列中的批记录就可以发送消息了

消息发送线程有两种消息发送方式 按照分区直接发送 按照分区的目标发迭 设有两台, 题有 个分区,那么每台就有 个分区 ,消息发送线程迭代batches的每个分 接往分区的主副本发送消息,总共会有 个请求 所示,我 先按照分区的主副本进行分组, 属于同 个的所有分区放在一起,总共只有两个请求做法可以大大减少网络的开销

消息系统由生产者 存储系统和消费者组成 章分析了生产者发送消息给服务端的过程,本章分析消费者从服务端存储系统读取生产者写入消息的过程 首先我 来了解消费者的 些基础知识

作为分布式的消息系统, Kafka支持多个生产者和多个消费者,生产者可以将消息发布到集群中不同的不同分区上;「肖费者也可以消费集群中多个的多个分区上的消息 写消息时,多个生产者可以 到同 个分区 读消息时,如果多个消费者同时读取 个分区,为了保证将日志文件的不同数据分配给不同的消费者,需要采用加锁 同步等方式,在分区级别的日志文件上做些控制

相反,如果约定“同 个分区只可被 个消费者处理”,就不需要加锁同步了,从而可提升消费者的处理能力 而且这也并不违反消息的处理语义:原先需要多个消费者处理,现在交给一个消费者处理也是可以的 3- 给出了 种最简单的消息系统部署模式,生产者的数据源多种多样,它们都统写人Kafka集群 处理消息时有多个消费者分担任务 ,这些消费者的处理逻辑都相同, 每个消费者处理的分区都不会重复

因为分区要被重新分配,分区的所有者都会发生变 ,所以在还没有重新分配分区之前 所有消费者都要停止已有的拉取钱程 同时,分区分配给消费者都会在ZK中记录所有者信息,所以也要先删ZK上的数据 只有和分区相关的 所有者 拉取线程都释放了,才可以开始分配分区

如果说在重新分配分区前没有释放这些信息,再平衡后就可能造成同 个分区被多个消费者所有的情况 比如分区Pl 原先归消费者 所有,如果没有释放拉取钱程和ZK,再平衡后分区Pl 被分配给消费者 了,这样消费者 和消费者 就共享了分区Pl ,而这显然不符合 fka 中关于“一个分区只能被分配给 个消费者”的限制条件 执行再平衡作的步骤如下

消费者重新加入消费组,在分配到分区的前后,都会对消费者的拉取工作产生影响 消费者发送“加入组请求”之前要停止拉取消息,在收到“加入组响应”中的分区之后要重新开始拉取消息时,为了能够让客户端应用程序感知消费者管理的分区发生变化,在加入组前后,客户端还可以设置自定义的“消费者再平衡”,以便对分区的变化做出合适的处理

三、Kafaka的基本作

在 Kafka 中,用一个文件夹存储一条消息队列,成为一个 Log,每条消息队列由多个文件组成,每个文件称为一个 LogSegment,每当一个 LogSegment 的大小到达阈值,系统就会重新生成一个 LogSegment;当旧的 LogSegment 过期需要清理时(虽然磁盘空间相对于内存会宽裕很多,我们可以保存更长时间的消息数据,比如一周,以供消费者更灵活的使用,但还是需要定期清理太老的数据),系统会根据清理策略删除这些文件。

在启动Kafka之前,需要启动zookeeper,否则会报错!相关的启动指令如下:

在此配置中,只有一个 ZooKeeper 和 id 实例。 配置步骤如下:(注意,以下过程中的topicName表示创建主题的名称,可以自己定义。)

(1)创建Kafka主题

bin/kafka-topics.shacks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。 --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topicName

创建主题后,会在 Kafka 终端窗口中获取通知,并在 config / server.properties 文件中的“/ tmp / kafka-logs /"中指定创建主题的日志。

(2)启动生产者以发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicName

生产者命令行客户端需要两个主要参数:

1.列表(broker-list): 要发送邮件的列表。 这种情况下,只有一个。

生产者在 config / producer.properties 文件中指定默认生产者属性。

(3)启动消费者以接收消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName --from-beginning

消费者在config / consumer.properties 文件中指定了默认消费者属性。 打开一个新终端并键入以下消息消息语法。

(4)在生产者终端输入数据测试

生产者将等待消息的输入并发布到 Kafka 集群。 默认情况下,每行数据都作为新消息发布。在生产者终端输入数据,这些数据都会在消费者终端显示。

Kafka总结

throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)

consumer 采用 pull(拉)模式从 broker 中读取数据。

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。

pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数

据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有

数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。

分区中的所有副本统称为 AR(Assigned Replicas)。所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)组成ISR(In-Sync Replicas),ISR 是 AR 中的一个子集。

可以通过分区策略体现消息顺序性。分区策略有轮询策略、随机策略、按消息键保序策略。

处理顺序 :->序列化器->分区器

消息在通过 send() 方法发往 broker 的过程中,有可能需要经过(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。

一般来说如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区。开发者可以继承AbstractPartitionAssignor实现自定义消费策略,从而实现同一消费组内的任意消费者都可以消费主题的所有分区。

当前消费者需要提交的消费位移是offset+1

在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题__consumer_offsets 中。

Kafka 中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互。每个主题又可以分为一个或多个分区。不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件。

Log 和 LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件。

每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。

日志压缩(Log Compaction):针对每个消息的 key 进行整合,对于有相同 key 的不同 value 值,只保留一个版本。

在 Kafka 集群中会有一个或多个 broker,其中有一个 broker 会被选举为(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的 leader 副本出现故障时,由负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR 发生变化时,由负责通知所有broker更新其元数据信息。当使用 kafka-topics.sh 脚本为某个 topic 增加分区数量时,同样还是由负责分区的重新分配。

Kafka 中有多种延时作,比如延时生产,还有延时拉取(DelayedFetch)、延时数据删除(DelayedDeleteRecords)等。

延时作创建之后会被加入延时作管理器(DelayedOperationPurgatory)来做专门的处理。延时作有可能会超时,每个延时作管理器都会配备一个定时器(SystemTimer)来做超时管理,定时器的底层就是采用时间轮(TimingWheel)实现的。

为了实现生产者的幂等性,Kafka 为此引入了 producer id(以下简称 PID)和序列号(sequence number)这两个概念。

Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在

初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。

Kafka中的事务可以使应用程序将消费消息、生产消息、提交消费位移当作原子作来处理,同时成功或失败,即使该生产或消费会跨多个分区。

生产者必须提供的transactionalId,启动后请求事务协调器获取一个PID,transactionalId与PID一一对应。

每次发送数据给前,需要先向事务协调器发送AddPartitionsToTxnRequest,事务协调器会将该存于__transaction_state内,并将其状态置为BEGIN。

在处理完 AddOffsetsToTxnRequest 之后,生产者还会发送 TxnOffsetC副本指定必须写作成功的最小副本数量,如果不能满足这个最小值,则生产者引发一个异常(NotEnoughReplicash或者NotEnoughReplicashAfterAppend)ommitRequest 请求给 GroupCoordinator,从而将本次事务中包含的消费位移信息 offsets 存储到主题 __consumer_offsets 中

在发送延时消息的时候并不是先投递到要发送的真实主题(real_topic)中,而是先投递到一些 Kafka 内部的主题(delay_topic)中,这些内部主题对用户不可见,然后通过一个自定义的服务拉取这些内部主题中的消息,并将满足条件的消息再投递到要发送的真实的主题中,消费者所的还是真实的主题。

Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所

有 topic 的分区副本分配和 leader 选举等工作。Controller 的管理工作都是依赖于 Zookeeper 的。

什么是kafka

Kafka最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的特性就是可以实时处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低时延的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为开源项目。

消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务水平和最关键指标之一。

基本工作流程如上图所示,其中:

我们看上面的架构图中,producer就是生产者,是数据的入口。注意看图中的红色箭头,Producer在写入数据的时候 永远的找leader ,不会直接将数据写入follower!那leader怎么找呢?写入的流程又是什么样的呢?我们看下图:

发送的流程就在图中已经说明了,就不单独在文字列出来了!需要注意的一点是,消息写入leader后,follower是主动的去leader进行同步的!producer采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证 同一分区 内的数据是有序的!写入示意图如下:

上面说到数据会写入到不同的分区,那kafka为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:

熟悉负载均衡的朋友应该知道,当我们向某个发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的,那在kafka中,如果某个topic有多个partition,producer又怎么知道该将数据发往哪个partition呢?kafka中有几个原则:

保证消息不丢失是一个消息队列中间件的基本保证,那producer在向kafka写入消息的时候,怎么保证消息不丢失呢?其实上面的写入流程图中有描述出来,那就是通过ACK应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为 0 、 1 、 all 。

要注意的是,如果往不存在的topic写数据,能不能写入成功呢?kafka会自动创建topic,分区和副本的数量根据默认配置都是1。

Producer将数据写入kafka后,集群就需要对数据进行保存了!kafka将数据保存在磁盘,可能在我们的一般的认知里,写入磁盘是比较耗时的作,不适合这种高并发的组件。Kafka初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。

前面说过了每个topic都可以分为一个或多个partition,如果你觉得topic比较抽象,那partition就是比较具体的东西了!Partition在上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。

上面说到log文件就实际是存储message的地方,我们在producer往kafka写入的也是一条一条的message,那存储在log中的message是什么样子的呢?消息主要包含消息体、消息大小、offset、压缩类型……等等!我们重点需要知道的是下面三个:

无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢?

需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能!

消息存储在log文件后,消费者就可以进行消费了。在讲消息队列通信的两种模式的时候讲到过点对点模式和发布模式。Kafka采用的是点对点的模式,消费者主动的去kafka集群拉取消息,与producer相同的是,消费者在拉取消息的时候也是 找leader 去拉取。

多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id!同一个消费组者的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!!!如下图:

图示是消费者组内的消费者小于partition数量的情况,所以会出现某个消费者消费多个partition数据的情况,消费的速度也就不及只处理一个partition的消费者的处理速度!如果是消费者组的消费者多于partition的数量,那会不会出现多个消费者消费同一个partition的数据呢?上面已经提到过不会出现这种情况!多出来的消费者不消费任何partition的数据。所以在实际的应用中,建议 消费者组的consumer的数量与partition的数量一致 !

kafka使用文件存储消息(append only log),这就直接决定kafka在性能上依赖文件系统的本身特性.且无论任何OS下,对文件系统本身的优化是非常艰难的.文件缓存/直接内存映射等是常用的手段.因为kafka是对日志文件进行append作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数.对于kafka而言,较高性能的磁盘,将会带来更加直接的性能提升.

除磁盘IO之外,我们还需要考虑网络IO,这直接关系到kafka的吞吐量问题.kafka并没有提供太多高超的技巧;对于producer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker;对于consumer端也是一样,批量fetch多条消息.不过消息量的大小可以通过配置文件来指定.对于kafka broker端,似乎有个sendfile系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换(这里涉及到"磁盘IO数据"/"内核内存"/"进程内存"/"网络缓冲区",多者之间的数据copy).

其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,因此启用消息压缩机制是一个良好的策略;压缩需要消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑.可以将任何在网络上传输的消息都经过压缩.kafka支持gzip/snappy等多种压缩方式

kafka集群中的任何一个broker,都可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(请参看zookeeper中的信息). 当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何"路由层".

异步发送,将多条消息暂且在客户端buffer起来,并将他们批量发送到broker;小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率;不过这也有一定的隐患,比如当producer失效时,那些尚未发送的消息将会丢失。

其他JMS实现,消息消费的位置是有prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态.这就要求JMS broker需要太多额外的工作.在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的.当消息被consumer接收之后,consumer可以在本地保存消息的offset,并间歇性的向zookeeper注册offset.由此可见,consumer客户端也很轻量级。

kafka中consumer负责维护消息的消费记录,而broker则不关心这些,这种设计不仅提高了consumer端的灵活性,也适度的减轻了broker端设计的复杂度;这是和众多JMS prodiver的区别.此外,kafka中消息ACK的设计也和JMS有很大不同,kafka中的消息是批量(通常以消息的条数或者ck的尺寸为单位)发送给consumer,当消息消费成功后,向zookeeper提交消息的offset,而不会向broker交付ACK.或许你已经意识到,这种"宽松"的设计,将会有"丢失"消息/"消息重发"的危险.

Kafka提供3种消息传输一致性语义:最多1次,最少1次,恰好1次。

最少1次:可能会重传数据,有可能出现数据被重复处理的情况;

最多1次:可能会出现数据丢失情况;

恰好1次:并不是指真正只传输1次,只不过有一个机制。确保不会出现“数据被重复处理”和“数据丢失”的情况。

at most once: 消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中consumer进程失效(crash),导致部分消息未能继续处理.那么此后可能其他consumer会接管,但是因为offset已经提前保存,那么新的consumer将不能fetch到offset之前的消息(尽管它们尚没有被处理),这就是"at most once".

"Kafka Cluster"到消费者的场景中可以采取以下方案来得到“恰好1次”的一致性语义:

最少1次+消费者的输出中额外增加已处理消息编号:由于已处理消息编号的存在,不会出现重复处理消息的情况。

kafka中,replication策略是基于partition,而不是topic;kafka将每个partition数据到多个server上,任何一个partition有一个leader和多个follower(可以没有);备份的个数可以通过broker配置文件来设定。leader处理所有的read-write请求,follower需要和leader保持同步.Follower就像一个"consumer",消费消息并保存在本地日志中;leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当所有的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它,这种同步策略,就要求follower和leader之间必须具有良好的网络环境.即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可.

选择follower时需要兼顾一个问题,就是新leader server上所已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,需要考虑到"负载均衡",partition leader较少的broker将会更有可能成为新的leader.

获取消息时,需要指定offset和ck尺寸,offset用来表示消息的起始位置,ck size用来表示获取消息的总长度(间接的表示消息的条数).根据offset,可以找到此消息所在segment文件,然后根据segment的最小offset取值,得到它在file中的相对位置,直接读取输出即可.

kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变更并作出相应的动作(比如consumer失效,触发负载均衡等)

Broker node registry: 当一个kafka broker启动后,首先会向zookeeper注册自己的信息(临时znode),同时当broker和zookeeper断开连接时,此znode也会被删除.

Broker Topic Registry: 当一个broker启动时,会向zookeeper注册自己持有的topic和partitions信息,仍然是一个临时znode.

Consumer and Consumer group: 每个consumer客户端被创建时,会向zookeeper注册自己的信息;此作用主要是为了"负载均衡".一个group中的多个consumer可以交错的消费一个topic的所有partitions;简而言之,保证此topic的所有partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每个consumer上.

Consumer id Registry: 每个consumer都有一个的ID(host:uuid,可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息.

Consumer offset Tracking: 用来跟踪每个consumer目前所消费的partition中的offset.此znode为持久,可以看出offset跟group_id有关,以表明当group中一个消费者失效,其他consumer可以继续消费.

Partition Owner registry: 用来标记partition正在被哪个consumer消费.临时znode。此表达了"一个partition"只能被group下一个consumer消费,同时当group下某个c为什么要使用 kafka,为什么要使用消息队列onsumer失效,那么将会触发负载均衡(即:让partitions在多个consumer间均衡消费,接管那些"游离"的partitions)

当consumer启动时,所触发的作:

A) 首先进行"Consumer id Registry";

B) 然后在"Consumer id Registry"册一个watch用来当前group中其他consumer的"lee"和"join";只要此znode path下列表变更,都会触发此group下consumer的负载均衡.(比如一个consumer失效,那么其他consumer接管partitions).

C) 在"Broker id registry"下,注册一个watch用来broker的存活情况;如果broker列表变更,将会触发所有的groups下的consumer重新balance.

总结:

Ka如果是协调者发生故障,服务端会有自己的故障容错机制,选出管理消费组所有消费者的新协调者节,点消费者客户端没有权利做这个工作,它能做的只是等待一段时间,查询服务端是否已经选出了新的协调如果消费者查到现在已经有管理协调者的协调,就会连接这个新协调节,哉由于这个协调是服务端新选出来的,所以每个消费者都应该重新连接协调fka的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。

如果leaders永远不会down的话我们就不需要followers了!一旦leader down掉了,需要在followers中选择一个新的leader.但是followers本身有可能延时太久或者crash,所以必须选择高质量的follower作为leader.必须保证,一旦一个消息被提交了,但是leader down掉了,新选出的leader必须可以提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据所有副本的状况动态的选择最适合的作为leader.Kafka并不是使用这种方法。

Kafka动态维护了一个同步状态的副本的(a set of in-sync replicas),简称ISR,在这个中的都是和leader保持高度一致的,任何一条消息必须被这个中的每个读取并追加到日志中了,才回通知外部这个消息已经被提交了。因此这个中的任何一个随时都可以被选为leader.ISR在ZooKeeper中维护。ISR中有f+1个,就可以允许在f个down掉的情况下不会丢失消息并正常提供服。ISR的成员是动态的,如果一个被淘汰了,当它重新达到“同步中”的状态时,他可以重新加入ISR.这种leader的选择方式是非常快速的,适合kafka的应用场景。

一个邪恶的想法:如果所有都down掉了怎么办?Kafka对于数据不会丢失的保证,是基于至少一个是存活的,一旦所有都down了,这个就不能保证了。

实际应用中,当所有的副本都down掉时,必须及时作出反应。可以有以下两种选择:

这是一个在可用性和连续性之间的权衡。如果等待ISR中的恢复,一旦ISR中的起不起来或者数据都是了,那集群就永远恢复不了了。如果等待ISR意外的恢复,这个的数据就会被作为线上数据,有可能和真实的数据有所出入,因为有些数据它可能还没同步到。Kafka目前选择了第二种策略,在未来的版本中将使这个策略的选择可配置,可以根据场景灵活的选择。

这种窘境不只Kafka会遇到,几乎所有的分布式数据系统都会遇到。

以上仅仅以一个topic一个分区为例子进行了讨论,但实际上一个Kafka将会管理成千上万的topic分区.Kafka尽量的使所有分区均匀的分布到集群所有的上而不是集中在某些上,另外主从关系也尽量均衡这样每个几点都会担任一定比例的分区的leader.

优化leader的选择过程也是很重要的,它决定了系统发生故障时的空窗期有多久。Kafka选择一个作为“controller”,当发现有down掉的时候它负责在游泳分区的所有中选择新的leader,这使得Kafka可以批量的高效的管理所有分区的主从关系。如果controller down掉了,活着的中的一个会备切换为新的controller.

对于某个分区来说,保存正分区的"broker"为该分区的"leader",保存备份分区的"broker"为该分区的"follower"。备份分区会完全正分区的消息,包括消息的编号等附加属性值。为了保持正分区和备份分区的内容一致,Kafka采取的方案是在保存备份分区的"broker"上开启一个消费者进程进行消费,从而使得正分区的内容与备份分区的内容保持一致。一般情况下,一个分区有一个“正分区”和零到多个“备份分区”。可以配置“正分区+备份分区”的总数量,关于这个配置,不同主题可以有不同的配置值。注意,生产者,消费者只与保存正分区的"leader"进行通信。

Kafka允许topic的分区拥有若干副本,这个数量是可以配置的,你可以为每个topic配置副本的数量。Kafka会自动在每个副本上备份数据,所以当一个down掉时数据依然是可用的。

Kafka的副本功能不是必须的,你可以配置只有一个副本,这样其实就相当于只有一份数据。

创建副本的单位是topic的分区,每个分区都有一个leader和零或多个followers.所有的读写作都由leader处理,一般分区的数量都比broker的数量多的多,各分区的leader均匀的分布在brokers中。所有的followers都leader的日志,日志中的消息和顺序都和leader中的一致。followers向普通的consumer那样从leader那里拉取消息并保存在自己的日志文件中。

许多分布式的消息系统自动的处理失败的请求,它们对一个是否着(alive)”有着清晰的定义。Kafka判断一个是否活着有两个条件:

符合以上条件的准确的说应该是“同步中的(in sync)”,而不是模糊的说是“活着的”或是“失败的”。Leader会所有“同步中”的,一旦一个down掉了,或是卡住了,或是延时太久,leader就会把它移除。至于延时多久算是“太久”,是由参数决定的,怎样算是卡住了,怎是由参数replica.lag.time.max.ms决定的。

只有当消息被所有的副本加入到日志中时,才算是“committed”,只有committed的消息才会发送给consumer,这样就不用担心一旦leader down掉了消息会丢失。Producer也可以选择是否等待消息被提交的通知,这个是由参数acks决定的。

Kafka保证只要有一个“同步中”的,“committed”的消息就不会丢失。

消息中间件之Kafka

Producer: 生产者,发送消息的一方。生产者负责创建消息,然后将其发送到 Kafka 上。

Consumer: 消费者,接受消息的一方。消费者连接到 Kafka 上并接收消息,进而进行相应的业务逻辑处理。

Consumer Group: 消费者,一个消费者组可以包含一个或者多个消费者。使用多分区 + 多消费者的方式,可以极大提高下游系统处理速度。同一消费者组中的消费者不会重复消费消息,不同的消费者组之间不会互相影响,都能收到全部消息。kafka就是通过消费组来实现P2P模式和广播模式的。

Broker: Kafka 。

Topic: Kafka中的消息维度,一个Topic类似一个queue。生产者将消息发送到特定的Topic,消费者通过Topic进行消费。

Partition: 分区,分区是属于Topic逻辑概念下的一个分区,每个分区只属于一个Topic,一个Topic通常有多个分区,每个分区包含的消息是不同的,分区在存储层面可以看做一个可追加的日志文件,消息在被追加到分区日志文件时,会分配一个特定的便宜了(offset)。

Offset: 分区中的消息的标识,用它来保证消息在分区内的顺序性,offset不跨分区,也就是说,Kafka保证消息在分区内的有序性,不保证消息在Topic下的有序性

Replication: 副本,是Kafka保证数据高可用的方式。同一Partition的数据可以在多个Broker(kafka)上存在多个副本,通常只有主副本提供读写服务,当主副本发生故障,Kafka会在Controller的管理下,选择新的副本作为主副本提供读写服务

Follower: 从副本,相对于主副本,从副本只同步主副本数据,不提供读写服务。

Record: 写入kafka中的消息,每个消息包含了key、value和timestamp。

生产者-消费者是一种设计模式,是在生产者和消费者之间添加一个中间件来达到解耦的目的。

Zookeeper是一个成熟的分布式协调服务,它可以为分布式服务提供分布式配置服务、同步服务和命C1 消费分区 4, 5, 6名注册等能力。任何分布式服务都需要一种协调任务的方法,Kafka使用Zookeeper来进行任务协调,也有一些其他技术具有自己的内置任务协调机制。

Kafka将Broker、Topic和Partitin的元数据存储在Zookeeper上。

Kafka使用Zookeeper完成以下功能:

Controller是从Broker中选举出来的,负责分区 Leader 和 Follower 的管理。当某个分区的 leader 副本发生变化,由Controller负责为该分区选举新的 leader 副本。当某个分区的同步副本发生变化时,由Controller负责通知所有Broker更新元数据信息。

Controller的选举依赖Zookeeper-调用read(),将数据从磁盘到内核模式的缓冲区;,成功竞选为的Broker会在Zookeeper中创建一个/controller临时。

选举过程: Broker首先尝试读取/controller中的brokerid值,如果brokerid值不为-1,表示已经存在Broker当选Controller,否则尝试创建/controller,创建成功后将当前brokerid写入/controller,作为 activeControllerId

主要职责: controller选举出来作为整个Broker集群的管理者,管理所有集群信息和元数据。

Kafka 的网络通信模型是基于 NIO 的Reactor 多线程模型来设计的。其中包含一个Acceptor线程用于处理连接,多个 Processor 线程 select 和 read socket 请求,一个Processor 由包含多个 Handler 线程处理请求并响应。

顺序写:

零拷贝:

PageCache: producer 生成消息到 Broker 时,Broker 会使用 pwrite() 系统调用,按偏移量写入数据。写入时,会先写入 page cache。Consumer 消费消息时,Broker会使用sendfile() 系统调用,零拷贝的将数据从 page cache 传输到 Broker 的 Socket Buffer,通过网络传输。因此当Kafka的生产速率和消费速率相不大时,就能几乎只靠 page cache 的读写完成整个生产-消费过程,磁盘访问非常少

网络模型: Kafka基于NIO,采用Reactor线程模型,实现了自己的RPC通信。 一个Acceptor线程处理新的连接,多个Processor线程select 和 read socket请求,多个Handler线程处理请求并响应(I/O多路复用)。

批量与压缩: Kafka Producer 向 Broker 发送消息不是一条一条发送,而是按批发送。且roducer、Broker 和 Consumer 使用相同的压缩算法,在 producer 向 Broker 写入数据,Consumer 向 Broker 读取数据时甚至可以不用解压缩,最终在 Consumer Poll 到消息时才解压,这样节省了大量的网络和磁盘开销。

文件结构:

Kafka 消息是以 Topic 为单位进行归类,各个 Topic 之间是彼此的,互不影响。每个 Topic 又可以分为一个或多个分区。每个分区各自存在一个记录消息数据的日志文件。

Kafka 每个分区日志在物理上实际按大小被分成多个 Segment。

index 采用稀疏索引,这样每个 index 文件大小有限,Kafka 采用mmap的方式,直接将 index 文件映射到内存,这样对 index 的作就不需要作磁盘 IO。

Kafka 充分利用二分法来查找对应 offset 的消息位置

和其他消息队列相比,Kafka的优势在哪里?

队列模型了解吗?Kafka 的消息模型知道吗?

Kafka 如何保证消息不重复消费?

解决方案:

参考1: Kafka性能篇:为何Kafka这么"快"?

参考2: Kafka原理篇:图解kakfa架构原理

版权声明:图片、内容均来源于互联网 如有侵权联系836084111@qq.com 删除