消息队列在项目中的应用总结

前言

写这篇文章的目的是这样的,我自己做的项目是用到了MQ来完成最终一致性,并且我发现了我的代码的逻辑和网上的那些商城的逻辑是不同的。然后面试的时候,似乎面试官问我的都是网上的那些商城的逻辑,然后顺着他们的逻辑走,问我会出现什么问题,或者是出现了某某问题应该怎么解决,然后我在这里栽了两次跟头了,所以这篇博客就需要好好梳理下别人到底是怎么做的,然后把他们遇到的问题。

为了之后的叙述方便,下文以A服务代替下单服务,B服务代替库存服务,C服务代替优惠券服务,D服务代替积分服务。如果不使用MQ的话,正常的逻辑就是A服务会调用BCD,并且等待BCD完成之后继续执行。(注意,这里没有用到分布式事务,所以会出现一堆问题,下面来解决)

为什么要用消息队列

消息队列的三大特性,无非就是削峰、解耦和异步。

解耦

以上面的为例,目前A服务需要调用BCD这三个服务。随着时间的推进,可能之后还会有EFG服务,也需要被A服务调用,那么这个时候如果你是写代码的,该怎么办呢?简单,我继续在BCD后面接着调用呗。这就造成了A服务与这些服务之间造成了强耦合

这还没完,除了每次新增服务需要修改代码之外,还需要考虑其它服务的成功与否。比如A服务在调用E服务的时候,发现E服务失败了,那么它该怎么办呢?

这时MQ的好处就凸显出来了,A服务只需要发送消息给消息队列,就可以不用管了。然后接下来的BCDEFG这些服务订阅相关的消息,然后进行处理就可以了,如果你又新增了一个服务,那这个服务只需要也去订阅这个消息就可以了,不用和A发生关系,A甚至都不需要知道这些服务的存在。

异步

还是A服务调用BCD这三个服务的场景。如果B服务需要消耗40ms,C服务需要消耗50ms,D服务需要消耗60ms,如果A服务是按照顺序调用它们的话,那么消耗的时间就是150ms。当然实际中我们肯定不可能这么做,而是用的线程池、异步编排等技术来让这些服务并发的调用。

如果你要使用消息队列,那么就相当于,A服务只需要去往MQ发送消息(这个速度是非常非常快的,<10ms),然后直接就可以给用户返回了。之后的各个服务会自己去消息队列中取数据,然后进行处理,此时花费的时间再久也不关心了。

削峰

这个就最好理解了。要是请求非常大,那么A也只是把消息发给消息队列,之后别的BCD服务,只会用它们的最大速度来处理消息,并不会导致系统崩溃。

消息队列的缺点

上来先说了消息队列的三个优点,那它的缺点呢?

  1. 平白无故多了一个组件,那么势必会造成可用性的降低。
  2. 系统的复杂度势必要提高了,比如你的消息的重复性消费问题、消息的丢失问题等等等等。
  3. 面试中问的最多的一个问题(其实我自己做的系统里是不会出现这个问题的),就是当A成功了之后,它就直接返回了。但是如果BCD这些服务有一个或者多个它失败了,这个该怎么解决呢?

小结

消息队列有它的好处,也会不可避免让整个系统更加复杂。但是如果实际中我们可以解决掉消息队列的这些缺点(这些问题也是面试官特别爱问的问题),就可以开心使用了。

主流消息队列产品的横向对比

我实际中使用的消息队列产品是rabbitMQ,当时选择它最主要的目的就是因为市面上关于它的资料众多。

实际上rabbitMQ它的吞吐量是不高的(当然对于我这个小项目是绰绰有余的),但是它的延迟是最低的,又有自己的管理界面,非常方便。

Kafka是apache软件基金会下的开源的实现,而且社区活跃度很高,基本上apache就是它的金字招牌,吞吐量和可用性都非常好。

而RocketMQ是阿里旗下的开源产品,用java编写,也具有高可用性、高吞吐量和低延迟性。

消息队列问题的解决

如何保证消息的时序性?

发送消息的时候,发送的是消息1,消息2,消息3,那如果消费者要消费,然后取出的顺序是213,这种情况怎么办呢?

对于时序性很重要的数据,我们可以让一个queue对应一个消费者,也就是每一个queue对应一个消费者。

对,就是这么简单。然后我自己的实际项目中,就只有一个queue,所以我自然是完全不需要担心时序性的问题。然后被面试官问的一脸懵逼。

如何保证消息的可靠性呢?

我们先来捋一捋消息的传递过程,然后看看在哪些步骤中可能会发生丢失。

  1. 生产者向消息队列发送了消息。
  2. 消息队列收到消息,存在内存中。
  3. 消费者需要这个消息,所以消息队列会把消息发送给消费者。

总结一下可能发生问题的地方:

  • 生产者把消息发送给MQ的时候,由于网络等原因,消息丢掉了;或者是MQ自己的原因,也没有保存下来。
  • MQ把信息是放到内存中的,突然一个断电,MQ的内存里的消息就没了。
  • 在MQ发往消费者的途中,由于网络等原因,消息丢掉了。
  • 消费者收到了消息,正准备处理,自己挂掉了。

生产者到MQ丢失

rabbitMQ是支持事务功能的,只要生产者发送数据前首先打开事务,然后再发送数据。如果rabbitmq没有收到消息,就可以回滚,然后重新发送消息。而如果消息队列收到了消息,那么就进行事务的提交。但是事务的这个对吞吐量的影响非常大,所以不推荐这么做。

还有一种叫confirm模式。每次写消息会分配一个id,然后如果写到了rabbitmq中,就会回送你一个ack,告知消息成功。如果失败了,MQ会调用你的接口,你可以在接口中进行消息的重新发送等操作。

MQ挂掉导致的丢失

MQ本身也支持持久化到磁盘的。那么仅仅当MQ把消息持久化到磁盘中,才会给客户端回送一个ACK,那么就能保证不会丢失了。

消费者丢失

只有当消费者确确实实把消息消费完了,才会去向MQ回送一个ACK,这样就不怕了。

如何保证消息不被重复消费呢?

这个其实是保证不了的,而是要靠服务自己设计成幂等性的,然后来保证。重要的服务,你可以每次都把执行的操作放到数据库里,然后如果消息二次传递过来了,就去数据库里看看有没有,没有的话就需要执行,有的话就忽视这个消息。而如果是不怎么重要的消息,可以配合Redis来做,设置一个过期时间就可以了。

如何保证MQ本身的高可用呢?

显然MQ在生产环境中必然不可能是单机部署的,否则一挂掉所有的服务都跟着完蛋了。高可用性非常重要,所以下面单独介绍。

如何保证消息不积压呢?

实际中一旦出现消息的积压,其实是非常严重的问题,基本就是消费者都不消费数据了,或者说消费非常慢。

如果一旦真的发生这种情况了,最容易想到的就是立马去看看消费者是不是出了问题,要赶紧修复消费者的问题。但是这个不是好的选择,因为就算修复了消费者,那也需要不少时间才能把堆积起来的消息给消费掉。一般的话只能紧急临时扩容了,简单来说就是租用机器,然后把堆积的数据分发给这些机器,赶紧把堆积给减少下来,然后让消费者慢慢去消费完成。

如果积压的消息,因为TTL等原因丢失了,那么只能通过手动查数据库等方式把这些消息手动建立出来,然后自己发给MQ了。

RabbitMQ的高可用性

单机

这个就不解释了….

普通集群

多台机器,每台机器上面启动一个rabbitmq的实例。但是你的queue,其实只在一台机器上有。

我们假设生产者向A主机的queue发送了一条消息,当一个消费者去问另外一台主机B的rabbitmq实例索要数据的时候,因为这台B主机根本就没有数据,但是它能够知道谁有这条数据,于是B主机就会向A主机拉取这条数据,然后发送给消费者。

缺点显而易见了,集群的内部会有大量的数据传输,而且真正的数据其实就在一台主机上放着,只要那台主机完了,你的数据也就丢了。

镜像集群

上面的普通集群的缺点是数据其实只保存在了一台主机上面,那镜像集群的改进就是每个主机都有所有的数据,这样任何一台挂掉都不会影响集群的可用性。写的时候需要把所有的消息同步到每个节点中去,然后读的时候就可以随便读了。缺点么也是很明显,就是消息要同步到所有的机器上去,对于带宽的压力是很大的,而且这个不是分布式的,没有扩展性可言,由于每台机器都会有全部的数据,那么就会有明显的木桶效应——集群的总量取决于最垃圾的那台主机的MQ的容量。

我的项目中的逻辑梳理

我项目中是这样的,下订单服务会调用扣库存服务,也就是它们俩是紧紧耦合在一起的。下订单的服务会等扣库存的服务完成了之后,才会继续往下执行。

只要扣库存失败了,那么整个订单也无法下成功,会回滚。逻辑上只会出现一种问题,就是我扣库存成功了,然后我订单下去遇到了一个异常,此时是可以通过诸如seata这种开源第三方库来完成的,我的做法是,当你的库存成功且订单失败的情况下,让订单服务去MQ里面发送一条消息,这条消息会被发给库存服务,让库存服务自己有空了去把刚刚扣除的库存给加回来。