Java开发中的消息队列相关问题处理

作者:陆金龙    发表时间:2023-01-08 07:53   

关键词:RabbitMQ高可用  Kafka高可用  消息队列重复消费  消息丢失  消息队列积压  

内容基于石杉码农学院的石杉老师的视频整理。

1.理解集群、分布式

是指多台服务器集中在一起,实现同一业务。

分布是指将不同的业务,分布在不同的机器上执行。

分布式的每一个节点,都可以做集群,而集群并不一定就是分布式的。

2.如何保证消息队列的高可用性

如何保证消息队列的高可用。如果MQ挂了怎么办?

2.1 RabbitMQ实现高可用

RabbitMQ:不是分布式的。

RabbitMQ有单机模式(生产环境不会这么用),普通集群模式,镜像集群模式。

普通集群模式原理:多台机器部署多个RabbitMQ实例,队列分配到集群的某台机器上(机器A),这台机器包含queue元数据和实际的消息队列数据,其他机器上只包含queue元数据,其他机器需要从机器A拉数据。

特点:1.能提高消费的吞吐量。2.在Rabbit集群内部产生大量的数据传输。3.高可用性没有保障,如果机器A挂了,消息队列就不可用了。

镜像集群模式:多台机器部署多个RabbitMQ实例,每个机器上实例都有queue的完整数据。任何一个节点宕机了,consumer都可以到其他节点上消费数据。实质上保障了可用性。

缺点:不是分布式的。如果queue的数量很大,可能大到机器容量无法容纳。

如何使用:新增一个镜像集群的一个策略,要求数据同步到所有节点。

2.2 Kafka实现高可用

Kafka:是分布式的。

在支持高可用以前的版本,Kafka的分布式实现如下图所示,如果有一台机器宕机,会导致一部分数据的丢失。

kafka8以后,提供了副本机制,支持高可用。

每个partition有多个副本,分布在多台机器上,其中一个副本被选举为leader,其他机器为follower。写数据统一往leader上写,由leader同步到follower上。一台leader机器宕机,kafka会将这个partition的follower中的一台选举为新的leader。

3.如何保障消息消费时的重幂等性

幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。

如何避免消息队列消息的重复消费?

消息会出现重复发送的情况。

3.1 处理kafka的消息重复消费

kafka的消息消费过程示意图如下:

为什么会出现重复发送?

消费者不是消费完一条数据就立马提交offset的,而是定时定期提交一次offset。有可能是消费几条数据后,再提交一次最新那条数据的offset。

假设消费者在上一次提交offset后,依次接收到了offset为113、114的两条数据,准备完成两条数据处理后提交offset=114,但还未提交时消费者所在服务器被重启了,这时可能有113的数据已经完成处理,114的数据未完成处理。

重启后,消息队列基于已经提交的offset,将offset之后的数据进行重发,也就是将offset为113、114的数据又发送一遍。实际情况是113的数据消费者已经处理完毕,这样这条数据就发生了重复消费。

怎么处理重复发送问题?

思路1:在某个地方记录已经处理过的数据的id,内存set、redis等。接收到重复数据后丢弃掉。

思路2:基于数据库的唯一键,重复插入数据会报错,不会导致数据库出现重复数据。

4.处理消息丢失

怎么会出现消息丢失?

1.生产者写消息的过程,没有到mq在网络传输中就丢失。

2.消息写到mq,消费者没来得及消费,mq自己挂掉了。

3.消费者接收到消息,但没来得及处理,消费者挂了。

4.1 RabbitMQ处理消息丢失

1.生产者丢失情况

有事务模式和confirm模式。一般使用confirm模式,因为它是异步的,可以保证高的吞吐量。

(1)使用事务channel.txSelect,报错后执行回滚,重新发送。这种是同步的,导致阻塞。(2)channel设置confirm模式:发送完消息后不用等待。rabbitmq收到消息后会回调生产者的一个接口,告知已经收到消息。如果在rabbitmq接收消息时报错了,就会回调生产者接口告知失败。生产者在接收失败的回调中重复消息。

2.mq自己挂掉情况

给rabbitmq开启持久化机制。

把消息设置成持久化的。数据不丢。mq服务器挂了,重启后从磁盘上恢复消息队列数据。

存在还没来得及持久化,mq就挂了的情况,还是会丢失。但这种概率比较小。

3.消费者丢失情况

打开autoAck的机制。消费者接收到数据后,会自动通知mq,说已经消费了这条数据。

消费者发送确认后,不巧数据没有处理完,消费者宕机,导致消息丢失。

这种情况,需要将autoAck关闭。消费者处理完数据后,再主动向rabbitmq发送ack。如果数据没有处理完,发生宕机,由于ack也没有发出去,mq会保留这条数据,以便重发。

4.2 Kafa处理消息丢失

1.生产者丢失情况,或mq自己挂掉情况。

分布式和leader选举机制下。leader还没有来得及将数据同步给follower就宕机了,将follower切换为leader后,新leader上没有最新的数据,导致数据丢失。

解决方案:

要求起码设置4个参数:

(1)给topic设置replication.factor参数,值必须大于1,要求每个partition必须有至少2个副本。

(2)在kafka服务端设置min.insync.replicas参数:这个值必须大于1,这个是要求leader感知到至少有一个follower在同步数据,这样leader挂了至少还有一个follower。

(3)在producer端设置acks=all,这个要求每条数据必须写入所有replica(副本)之后,才能认为是写入成功了。

(4)在producer端设置retries=MAX,这个要求一段写入失败,就无限重试。

进行上述的设置,可以保证生产者的数据必须写入并同步到所有副本,才能认为写入mq成功,否则生产者就会自动重新向mq发送这条数据,直到成功。这样在leader所在的broker发生故障后,leader切换时,数据不会丢失。

2.消费者丢失情况。

需要将自动提交offset的机制关闭。消费者处理完数据后,再手动向kafk提交offset。如果数据没有处理完,发生宕机,由于offset也没有发出去,mq会保留这条数据,以便重发。

5.保障消息顺序

需要保证消息顺序的场景:例如数据库的操作日志,先插入、再修改、再删除这类业务,顺序不能打乱。

顺序会错乱的俩场景

( 1 ) rabbitmq : 一个 queue , 多个 consumer 。

  

   解决: 拆分多个 queue , 每个 queue 一个 consumer ,不能乱序的消息依次写入同一个queue。或者就一个 queue 但是只对应一个 consumer , 然后这个 consumer 内部用内存队列做排队。

  

( 2 ) kafka :

问题1:一个 topic 多个partition, 一个 partition对应 一个 consumer ,也就是一个topic对应了多个consumer。

解决步骤1:生产者在写入的时候,可以指定一个key,比如指定某个订单的id为key。这个订单相关的数据,就会被分发到同一个partition中去。而写入同一个partition中的数据一定是有序的。

消费者从partition取出来的数据也是有序的。

问题2:或者一个topic一个partition, 一个 partition对应 一个 consumer , consumer 内部多线程。

解决步骤2:

分析:如果consumer对应单线程,吃力一条信息是几十ms,1秒钟只能处理几十条数据,吞吐量太低。所以consumer对应多个多线程是需要的。

解决方案:不能将consumer取到的数据随机分发给不同的线程处理。用consumer对应N个内存队列,每个队列对应一个线程。相同key的一组数据分发给同一个内存队列处理。

6.处理消息积压问题

如果consumer出现bug,导致服务器dangji宕机,MQ出现消息积压。

1)先修复 consumer 的问题, 确保其恢复消费速度, 然后将现有 consumer 都停掉。

2)新建一个 topic ,partition 是原来的 10 倍, 临时建立好原先 10 倍数量的queue。

3)写一个临时的分发数据的consumer程序,部署用来消费积压的数据。消费后不做耗时处理,直接写入临时建好的10倍数量的queue。 

4)临时征用10倍的机器来部署10批consumer,每一批consumer,消费一个临时queue数据(每一批consumer相当于正常情况下的处理速度)。