rocketmq 启动_rocketmq启动顺序
Rocketmq的k8s配置(1nameserv + 1brocker)
RockerMQ在k8s的部署有两种方式, 一种是使用operator 在k8s集群中部署,可参考 operation项目 ; 一种是编写简单的k8s配置文件,在rocketmq的docker项目中有提供模板。
rocketmq 启动_rocketmq启动顺序
rocketmq 启动_rocketmq启动顺序
这里我们希望使用单机版k8s部署一套低配置rockerMQ, 仅启动一个nameserv和1个broker,我们将使用 rocketmq-docker项目 提供的模板来完成。
apiVersion: apps/v1
kind: Deployment
metadata:
name: rocketmq
spec:
replicas: 1
selector:
matchLabels:
template:
metadata:
labels:
spec:
containers:
- name: b比如我个线程消费了offset为0的消息,那么offsetTable中的offset更新为1roker
command: ["sh","mqbroker", "-n","localhost:9876"]
imagePullPolicy: IfNotPresent
ports:
- containerPort: 10909
- containerPort: 101
env:
- name: JAVA_OPT
value: -server -XX:ParallelGCThreads=1
name: brokeroptlogs
- mountPath: /home/rocketmq/store
name: brokeroptstor在多线程并发消费的场景下e
- name: namesrv
command: ["sh","mqnamesrv"]
imagePullPolicy: IfNotPresent
ports:
- containerPort: 9876
name: namesrvoptlogs
volumes:
- name: brokeroptlogs
emptyDir: {}
- name: brokeroptstore
emptyDir: {}
- name: namesrvoptlogs
emptyDir: {}
emptyDir: {}
apiVersion: v1
kind: Serv
metadata:
name: rocketmqserv
spec:
type: NodePort
ports:
- name: namesrv
targetPort: 9876
nodePort: 32000
selector:
notes: 签名异常问题
Caused by: org.apache.rocketmq.aclmon.AclException: [10015:signature-failed] unable to calculate a request signature. error=Algorithm HmacSHA1 not ailable
手动方案I,在tool.sh 中${JAVA_HOME}/jre/lib/ext后加上ext文件夹的路径(jdk路径)
最终方案: 手动的方式,很不方便,经过检查,实际问题是由于路径上的${JAVA_HOME}变量为空,导致无法找到etx路径。所以,我们通过k8s的方式传入JAVA_HOME环境便令就可以了。如下图:
3.2 添加组
组 可以用来实现消费的loadbalance,同一组的消费者分享所有的读队列。
创建组使用updateSubGroup 命令,所需参数如下:
执行命令新建一个授权服务的消费组
./mqadmin updateSubGroup -b localhost:101 -n localhost:9876 -g GID_authorize
执行结果:
RocketMQ的事务消息
image: apaccketmq/rocketmq:4.6.0RocketMQ的事务消息,是指发送消息和其他需要同时成功或同时失败。比如银行转账,A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银行账户扣除一万元”这个作同时成功或者同时失败。
我的集群部署在一台机器上,所以我改变了tcp端口并启动成功,如果你在不同机器上进行主从部署并且启动失败,你应该访问rocketmq错误日志获取更多信息。RocketMQ采用两阶段提交的方式实现事务消息,TransactionMQProducer处理上面情况的流程是,先发一个“准备从B银行账户增加一万元”的消息,发送成功后做从A银行账户扣除一万元的作,根据作结果是否成功,确定之前的“准备从B银行账户增加一万元”的消息是做commit还是rollback,具体流程如下:
1)发送方向RocketMQ发送“待确认”消息。
2)RocketMQ将收到的“待确认”消息持久化成功后,向发送方回复消息已经发送成功,此时阶段消息发送完成。
4)发送方根据本地执行结果向RocketMQ发送二次确认(Commit或是Rollback)消息,RocketMQ收到Commit状态则将阶段消息标记为可投递,方将能够收到该消息;收到Rollback状态则删除阶段的消息,方接收不到该消息。
5)如果出现异常情况,步骤4)提交的二次确认最终未到达RocketMQ,在经过固定时间段后将对“待确认”消息发起回查请求。
6)发送方收到消息回查请求后(如果发送一阶段消息的Producer不能工作,回查请求将被发送到和Producer在同一个Group里的其他Producer),通过检查对应消息的本地执行结果返回Commit或Roolback状态。
7)RocketMQ收到回查请求后,按照步骤4)的逻辑处理。
上面的逻辑似乎很好地实现了事务消息功能,它也是RocketMQ之前的版本实现事务消息的逻辑。
但是因为RocketMQ依赖将数据顺序写到磁盘这个特征来提高性能,步骤4)却需要更改阶段消息的状态,这样会造成磁盘Catch的页过多,降低系统的性能。所以RocketMQ在4.x的版本中将这部分功能去除。系统中的一些上层Class都还在,用户可以根据实际需求实现自己的事务功能。
客户端有三个类来支持用户实现事务消息,
个类是LocalTransaction-Executer,用来实例化步骤3)的逻辑,根据情况返回LocalTransactionState.ROLLBACK_MESSAGE或者
LocalTransactionState.COMMIT_MESSAGE状态。
第二个类是TransactionMQProducer,它的用法和DefaultMQProducer类似,要通过它启动一个Producer并发消息,但是比DefaultMQProducer多设置本地事务处理函数和回查状态函数。
第三个类是TransactionCheckListener,实现步骤5)中MQ的回查请求,返回LocalTransactionState.ROLLBACK_MESSAGE或者LocalTransactionState.COMMIT_MESSAGE
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
1.事务消息发送及提交:
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit作生成消息索引,消息对消费者可见)。
2.补偿流程:
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”。
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态。
(3) 根据本地事务状态,重新Commit或者Rollback。
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
在RocketMQ事务消息的主要流程中,一阶段的消息如何对用户不可见。其中,事务消息相对普通消息的特点就是一阶段发送的消息对用户是不可见的。那么,如何做到写入消息但是对用户不可见呢?RocketMQ事务消息的做法是:如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未该主题,故消费端无法消费half类型的消息。然后二阶段会显示执行提交或者回滚half消息(逻辑删除)。当然,为了防止二阶段作失败,RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。
在RocketMQ中,消息在服务端的存储结构如下,每条消息都会有对应的索引信息,Consumer通 过ConsumeQueue这个二级索引来读取消息实体内容,其流程如下:
RocketMQ的具体实现策略是:写入的如果事务消息,对消息的Topic和Queue等属性进行替换,同时将原来的Topic和Queue信息存储到消息的属性中,正因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列,消费者无法感知消息的存在,不会消费。其实改变消息主题是RocketMQ的常用“套路”,回想一下延时消息的实现机制。RMQ_SYS_TRANS_HALF_TOPIC
在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。先说Rollback的情况。对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。但是区别于这条消息没有确定状态(Pending状态,事务悬而未决),需要一个作来标识这条消息的最终状态。RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op作。Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。
RocketMQ将Op消息写入到全局一个特定的Topic中通过源码中的方法—
在执行二阶段Commit作时,需要构建出Half消息的索引。一阶段的Half消息由于是写到一个特殊的Topic,所以二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入作来生成一条对用户可见的消息。所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入流程。
Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint(记录那些事务消息的状态是确定的)。
值得注意的是,rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息。
TxConsumer类实现
rocketmq 101端口的ip怎么修改
- mountPatConsumeMessageOrderlyServ#processConsumeResulth: /home/rocketmq/logs我想架设个奇迹SF 可是从来没接触过 不知道该怎么下手?架设SF需要具备什么自己的PC机可以做但是确定你是固定的IP 申请条 50M 左右的宽带
mq的cluster不显示丛机
本来的集群消费,就会变成像广播消费一样重复消费,并发生混乱设置RocketMQ集群。
sle不可见且不我正在尝试Build一个Rocket问题:consumer从broker拉取的待消费消息时批量的(默认情况下pullBatchSize=32),并发消费时,offset的更新不是按大小顺序的,比如拉取消息m1到m10,m1可能是消费完成的,那提交的offset的正确性如何保证?m10 offset的更新不会导致m1会误认为已消费完成。MQ集群,其中包含一个名称,一个主和两个从。
RocketMQ-禁用TLSv1.0
首先从内存中移除该消息队列的消息进度,然后调用 comPullFromWhere 从磁盘中读取该消息队列的消费进度,创建一个PullRequest对象。检测命令:连接上证明tsl1协议打开
默认情况下是失败3次重试,可通过retryTimesWhenSendFailed定义重试次数;openssl s_client -connect 127.0.0.1:9876 -tls1
修改两个地方
1、启动配置参数
mq启动的配置文件中加入以下参数(启动namesrv和Broker服务 #mqnamesrv和mqbroker启动文件分别调用了runserver.sh和runbroker.sh文件)
2、ja配置文件中修改参数(JAVA_HOME/jre/lib/security/ja.security配置文件)
添加一个 TLSv1, 就可以了
修改完成后重启mq服务,再次检测
openssl s_client -connect 127.0.0.1:9876 -tls1
openssl命令详细
转自:
RocketMQ消费者消息队列负载均衡
先从整体流程上简单梳理一下消息队列负载的过程。
进行负载均衡是在RebalanceServ线程中启动的,一个MQInstance持有一个RebalanceServ实现,并随着MQInstance的启动而启动。
从上面可以看出,MQinstance遍历已注册的消费者,对消费者执行doRebalance方法。
上面是遍历信息对每个主题的队列进行重新负载。接下来将执行 rebalanceByTopic 方法,会根据广播模式或集群模式分别采用不同的方法进行处理。在此处,只解释集群模式下的方法。
对该主题下的队列信息和该消费组内当前所有的消费者ID进行排序,确保一个消费组的成员看到的顺序是一致的,防止同一个消费队列不会被多个消费者消息队列负载由Rebalance线程默认每隔20s进行一次消息队列负载,获取主题队列信息mqSet与消费组当前所有消费者cidAll,然后按照某一种负载算法进行队列分配,分配原则为同一个消费者可以分配多个消息消息队列,同一个消息消费队列同一时间只会分配给一个消费者。此时,可以计算当前消费者分配到消息队列,对比原先的负载队列与当前的分配队列。如果新队列中不包含原来的队列,则停止原先队列消息消费并移除,如果原先队列中不包含新分配队列则创建PullRequest。分配。
allocateResult 记录的是当前消费者的所分配的消息队列
调用 updateProcessQueueTableInRebalance 对比消息队列是否发生变化
然后通过 removeUnnecessaryMessageQueue方法判断是否该mq从缓存中移除。
之后,开始遍历本次负载分配给该消费者的消息队列结合mqSet。如果processQueueTable中没有包含该消息队返回 Null列,表示这是本次新增加的消息队列。
从上面看出,主要有三种计算消息进度的方法,有些大同小异。
首先从磁盘中获取该消息队列的消费进度,如果大于0,说明该消息队列已经被消费过了,下次消费从该位置继续消费。如果等于-1,尝试去作消息存储时间戳作为消费者启动的时间戳,如果能找到则返回找到的偏移量,找不到则返回0;如果小于-1,则说明该消息进度文件中存储了错误的偏移量,返回-1。
在该方法的,会调用 dispatchPullRequest 方法,将PullRequest加入到PullMessageServ中,以唤醒PullMessageServ线程,进行消息拉取。
到这里,消费者负载均衡方面就结束了。
RocketMQ原理解析
RocketMQ原理解析
Name是没有状态的,即Name中的Broker和topic等状态信息(通过其他角色上报获取)都是保存在内存中的,不会持久化存储(可通过配置实现),集群可以横向扩展。主要功能如下:
a.接收Broker(和sle)启动时的注册路由信息;
b.为producer和consumer提供路由服务,即通过topic名字获取所有broker的路由信息;
c.接收broker发送的心跳信息,如果心跳的时间戳过期Name关闭与broker的连接。
Broker向Name注册topic配置信息,配置信息格式如下:
Broker的消息存储
Rocketmq的消息的存储是由consumeQueue和 commitLog 配合完成的,commitLog保存消息的物理数据,consumeQueue是消息的逻辑队列,类似于索引,存储的是指向物理存储的地址。在一个Broker上,只有一个commitLog,所有consumeQueue共享同一个commitLog。
如topic的名字是Topic-Lance,配置的读写队列有queue-1和queue-2,那么Topic-Lance和queue-1组成一个consumeQueue,Topic-Lance和queue-2组成另一个consumeQueue。
如broker-A(包含queue-0,queue-1,queue-2), broker-B(包含queue-0,queue-1)两台broker机器都配置了Topic-Lance,那么broker启动的时候,注册到Name的Topic-Lance的路由有broker-A-queue-0,broker-A-queue-1,broker-A-queue-2,broker-B-queue-0,broker-B-queue-1共5个consumeQueue。
为了提高读写性能,commitLog采取顺序写,随机读(通过pagecache机制批量从磁盘读取到内存,加速后续的读取速度),consumeQueue大部分读入内存(如果consumeQueue因为重启等因素丢失,可以通过commitLog重建)
a.Prod获取该主题下的队列信息和该消费组内当前所有的消费者ID。每个DefaultMQPushConsumerImpl都持有一个单独的RebalanceImpl对象。ucer发送消息时(必须制定topic),首先从本地的Producer中获取topic->broker的路由信息,如果没有,则从nameserver中获取topic->broker路由,并缓存到本地;
c.Producer定时将Producer的group信息发送到对应的broker上;
d.Producer发送消息到Master的broker上,通过Broker的主从copy到sle的broker上。
发送实现轮询方式:
a.向Name注册Consumer;
b.定时从Name获取topic路由信息;
c.定时清理下线的broker;
d.向所有broker发送心跳;
e.动态调整消费线程池;
f.负责负载均衡服务RebalanceServ。
RocketMQ是基于pull模式拉取消息,consumer做负载均衡并通过长轮询向broker拉消息,长轮询拉取消息后回调MessageListener接口实现完成消费。
关于RocketMQ长轮询可参考:
RocketMQ默认保存3天,commit log刷盘间隔,默认1秒
......
1.Exception的情况,一般重复16次 10s、30s、1mins、2mins、3mins等,可以通过设置transactionCheckMax设置;
2.超时情况(Consumer说明:端没有返回CONSUME_SUCCESS,也没有返回RECONSUME_LATER),MQ会无限制的发送给Consumer端,默认超时时间时15分钟。
rocketmqcontroller模式主从无法切换
高可用性设计,主故障检测。
1、高可用性设计rocketMQ的broker端中,offset的是以json的形式持久化到磁盘文件中,文件路径为${user.home}/store/config/consumerOffset.json。其内容示例如下::RocketMQ的架构中设计了Controller的主从模式,旨在提供高可用性和容错能力。主负责处理集群管理相DefaultMQPullConsumerImpl#shutdown关的请求,而备用则处于备援状态,接收主的状态同步。
2、主故障检测:RocketMQ的备用Controller会周期性地检测主的健康状况。一旦发现主出现故障,备用将会立即启动,并接管主的工作。
Centos7 自己写了一个ja程序的jar包,需要写一个脚本,然后设置成开机自启动,应该怎么做?
- name: namesrvoptstore首先: centos 7默认 /etc/rc.local是没有执行权限的,所以:
第二步:把如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit作时,出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制,称为“回查”。Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。你要开机启动的脚本加到这个文件。保存就可以了
RocketMQ的消息重试
步: chmod+x /etc/rc.local对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时并处理消费失败的情况,避免阻塞现象的发生。
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:
如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。
2)配置方式
消费失败后,重试配置方式集群消费方式下,消息消费失败后期望消息重试,需要在消息接口的实现中明确进行配置
(三种方式任选一种):
抛出异常
消费失败后,不重试配置方式
集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回ConsumeConcurrentlyStatus.CONSUME_SUCC3.发现ESS,此后这条消息将不会再重试。
自定义消息重试次数
消息队列 RocketMQ 允许 Consumer 启动的时候设置重试次数,重试时间间隔将按照如下策略:
重试次数小于等于 16 次,则重试时间间隔同上表描述。
重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时。
注意:
消息重试次数的设置对相同 Group ID 下的所有 Consumer 实例有效。
如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了
MaxReconsumeTimes,那么该配置对两个 Consumer 实例均生效。
配置采用覆盖的方式生效,即启动的 Consumer 实例会覆盖之前的启动实例的配置。
获取消息重试次数
消费者收到消息后,可按照如下方式获取消息的重试次数:
RocketMQ之offset确认机制
TransactionalMessageUtil.buildOpTopic();这个Topic是一个内部的Topic(像Half消息的Topic一样),不会被用户消费。Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到Half消息进行后续的回查作。本文要探讨的offset指的是上图中的Queue Offset。
为了保存消费的消费进度,避免重复消费,我们需要将offset保存下来。
针对集群消费,offset保存在broker,在客户端使用RemoteBrokerOffsetStore。
针对广播消费,offset保存在本地,在客户端使用LocalFileOffsetStore。
,比较重要的一点是,保存的offset指的是下一条消息的offset,而不是消费完一条消息的offset。
比如,你消费了上图中个Queue的offset为0的消息,其实保存的offset为1,表示下次我从offset=1的位置进行消费。
在broker端,通过ConsumerOffsetMar中的offsetTable来保存Topic下各个ConsumerGroup的消费进度。
从offsetTable的双层Map结构也是能够看出,我上面说的消费进度,细指为ConsumerGroup在Topic下每个queue的消费进度。
offsetTable毕竟只是内存结构,因此ConsumerOffsetMar继承了ConfigMar实现了持久化功能。
实现了encode,decode,configFilePath三个模板方法。用于指定序列化,反序列化的逻辑以及保存位置
其中序列化,反序列化的逻辑很简单,就是使用到了我们的FastJson。
保存文件名为consumerOffset.json。
broker启动时从本地文件加载
org.apache.rocketmq.broker.BrokerController#initialize
定时触发,持久化到磁盘
org.apache.rocketmq.broker.BrokerController#initialize
BrokerController#shutdown
用于consumer定时同步offset
拉取消息时会顺带确认offset没错,就这么简单,哈哈,好运
事务回查触发,暂不深入研究
本文只讨论PUSH模式的集群消费,本地的offset缓存到RemoteBrokerOffsetStore的offsetTable中,定期同步到broker。
因为consumer每次重启都会重新拉取offset,只是一个临时存储,因此RemoteBrokerOffsetStore的offsetTable的设计没有像ConsumerOffsetMar那么复杂。
consumer启动后会进行次rebalance,并且之后都会定期rebalance。
在rebalance分配好messagequeue之后,会根据messagequeue生成processqueue进行消息拉取。
而在进行消息拉取前,有一个关键的作, 拉取对应messagequeue的offset 。
RebalanceImpl#updateProcessQueueTableInRebalance
其中获取消息拉取初始位置有三种策略
CONSUME_FROM_LAST_OFFSET 的offset
CONSUME_FROM_FIRST_OFFSET 个offset
CONSUME_FROM_TIMESP 根据时间戳获取offset
但是从源码中可以看出来,实际上的逻辑和我们想象的有点不同,上面三个的逻辑的触发前提是,从broker拉取不到offset进度。
这应该是为了防止重复消费以及少消费,毕竟rocketmq是业务相关的mq。
在consumer端,针对offsetTable的更新,当然通过消费消息触发。
ConsumeMessageConcurrentlyServ#processConsumeResult
针对并发消费的offset,更新值来源于ProcessQueue#removeMessage方法
removeMessage的逻辑,用到了滑动窗口的算法。
比如10条消息,offset为 0 - 9。
然后我第二个线程消费了offset为5的消息,removeMessage返回的offset还是为1
只有前面的消息全被消费了,窗口才会滑动
顺序消费,暂不研究。
最终的offset以broker为准,因此本地的offset要定期持久化到offset。
主要持久化逻辑在persistAll和persist方法。
org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persist
persistAll和persist逻辑大致相同,核心逻辑都是通过updateConsumeOffsetToBroker持久化到broker。
触发持久化逻辑的时机有以下4个
MQInstance#startScheduledTask
DefaultMQPushConsumerImpl#shutdown
当一个queue不再属于当前consumer的时候,需要同步进步给broker,以便于新拿到queue的consumer从未消费的消息开始拉取
RebalancePullImpl#removeUnnecessaryMessageQueue
拉取消息的时候会顺带commit offset
DefaultMQPushConsumerImpl#pullMessage
PullMessageProcessor#processRequest
正常情况下,消息消费失败不会影响窗口滑动,因为针对消费失败的消息,client会进行sendback。
sendback之后,消息经过延迟之后会发往Topic=%RETRY%{CONSUMERGROUP}的Retry队列
ConsumeMessageConcurrentlyServ#processConsumeResult
而sendMessageBack失败的消息,会重新封装成另一个ConsumeRequest在本地再次消费。
这些失败的消息会从之前的consumeRequest移除,因此也就影响到了ProcessQueue#removeMessage的返回值。
但是这是一个优化,重试之后窗口大概率上还是会正常滑动。
如何保证并发消费提交偏移量正确?
基于TreeMap的滑动窗口
如何保证消息消费不丢失?
滑动窗口+broker远端保存+sendback+本地重试兜底
如果broker保存了offset
那么从对应offset重新拉取消息
如果broker没有保存offset,或者其他情况丢失
那么根据配置的策略,从对应的offset开始拉取
版权声明:图片、内容均来源于互联网 如有侵权联系836084111@qq.com 删除