RabbitMQ入门介绍

项目中用到了,之前只是简单的使用,现在需要好好的复习一波。

简介

rabbitmq是时下一款热门的消息中间件。有着高可靠,易扩展和丰富的功能等特性。

消息队列中间件(Message Queue Middleware,简称MQ)一般具有两种模式,一种是P2P,另一种就是发布和订阅模式。

消息中间价另外一大好处是,它可以帮助你暂时存储消息,这样就达成了良好的异步消息传递。

MQ的优点:

  • 解耦
  • 存储
  • 削峰
  • 顺序保证(部分)
  • 异步通信
  • 可恢复性

rabbitmq优点总结:

  • 可靠性,通过持久化,传输确认和发布确认等机制来保证。
  • 路由灵活
  • 扩展性强
  • 可用性高
  • 支持多种协议
  • 提供了易用的界面

入门

相关概念

MQ本质上是一个生产者和消费者的队列,能够接受数据,存储数据和转发数据。

生产者生产的消息,可以分成两个部分:payload+label,其中的payload一般都是json数据。然后在存入队列的时候,label就会被丢弃。自然消费者在消费的时候,只会受到一个payload,并没有label。

而我们可以把消息队列本身看成是一个Broker,那么整体流程如下图所示:

image-20200723144217768

rabbitmq的队列可以有多个消费者,彼此之间通过轮询来平均分配。而rabbitmq本身也不支持队列层面的广播。

消息本身其实首先应该交给Exchange交换器,并且由交换器来把消息转发给队列。

RoutingKey路由键,在消息交给exchange的时候,会指定一个routingkey,交换器会根据它来决定应该把消息交给谁。

BindingKey绑定建,exchange通过这个key和队列进行关联。

所以,当生产者把消息给路由器的时候,会把routingkey交给路由器,路由器则需要根据自己的bindkey和routing是否匹配来决定应该把消息交给哪个队列。

交换器类型

  • fanout:把发给交换器的消息,发送给所有绑定了该交换器的队列。
  • direct:只有bindkey和routingkey相同的时候,才发送。
  • topic:稍微复杂一点,就是支持用*#来进行模糊匹配。
  • header:不使用路由键来匹配,而是根据header属性进行匹配,性能很差,没人用,忘了它的存在吧。

具体的过程

有了上面的概念的理解,现在可以详细总结出流程了。

生产者:

  1. 生产者连接到Broker,建立连接并且开启信道。
  2. 生产者需要声明一个交换器,设置相关属性。设置一个队列并且设置相关属性。如果声明的已经存在了就什么都不做并且成功返回。
  3. 通过路由键将交换器和队列进行绑定。
  4. 生产者发送消息给broker,broker有能力根据消息来辨别出应该给那个队列,如果队列存在则把信息存入,不存在则根据配置选择是直接把信息丢弃还是退回给生产者。
  5. 关闭信道和连接。

消费者:

  1. 消费者连接到Broker,建立连接并且开启信道。
  2. 消费者请求队列中的信息,设置好相关的回调函数。
  3. 消费者会接受到信息,并且回送给broker一个ack代表自己受到了。
  4. broker会在消息队列中删除这个已经被确认的消息。
  5. 关闭信道和连接。

其中,一个连接就是一个tcp的连接,而连接之上还有一个信道,本质上就是一个带着唯一ID的虚拟连接。所以其实多个线程,只需要一个连接,然后线程在连接上有各自的channel即可。

开发

消费端的确认与拒绝

消费者在订阅队列的时候,指定好是不是自动确认的,一般我们都使用手动确认。这样服务器在接受到我们的ack之后,才会将消息删除。所以在这种情况下,消息在消息队列中就有两种状态:一种是还没投递的,另外一种是投递了但是还没被确认的。如果消费者断开连接,那么这些没被确认的就会重新进入队列,投递给下一个消费者(当然有可能是之前的那个)。同时rabbitmq并没有对未确认的消息设置过期时间,这意味着可以保存消息非常久。

同样的,消费者可以拒绝这条消息,并且可以有两种方式来处理这个被拒绝的消息:直接丢弃或者是再次放入队列以便发送给下一个。

进阶

消息流向

生产者生产完消息,并且将消息发送给了交换器,如果此时交换器并没有找到相关的路由键能够匹配上,那么可以根据用户的设置选择是将消息返回给生产者或者丢弃。

如果用户设置了返回给生产者,那么生产者需要进行相关的回调操作。如果生产者比较懒,或者出于一些原因并不想写回调逻辑,可以使用备份交换器,即当一个交换器发现没有队列能够匹配的时候,它把消息交给另外一个交换器(备份交换器),而这个备胎则是来者不拒,所有的消息都会放到队列中去。其实备份交换器和普通交换器一模一样,就是将类型设置为fanout而已。

TTL

可以对队列设置TTL,这样整个队列中的消息都有过期时间,也可以单独给消息设置TTL。如果有多个TTL,那么以时间短的为准。

死信队列

当一个队列过期,或者被拒绝了,或者当队列达到最大长度了,那么消息就会被发送到这个队列中。具体的实现就是有一个死信交换器,通过这个交换器可以把信息发送给死信队列。

延迟队列

在电商项目,通过延迟队列达成的订单未支付则经过30分钟后自动处理(取消订单并且把锁定的库存解锁)。但是rabbitmq其实本身并没有这么一种队列,而是用的TLL+死信交换器来处理的。就是给一个消息设置一个TTL,然后当TTL时间到了之后,此时可以通过一个死信交换器交给另外一个队列,那个队列就可以进行相关的操作。

优先级队列

emm 没用过,也觉得没有什么用。

持久化

交换器可以持久化、队列也可以持久化、队列中的消息也可以持久化。交换器和队列是通过设置它们的durable来进行设置的,而消息则是在投递的时候指定的。当然有一个显然的地方:如果队列不是持久化的,而消息是持久化,没了队列,消息也就没了。

当然,代价就是性能比较低,因为要写入到磁盘中,所以如果可靠性不是那么重要的话,建议还是不要设置消息落盘。

持久化了就能保证消息不丢失吗?显然不是的,因为当broker收到消息之后,到消息落盘还是有一段时间的,只要在这段时间中断电,消息还是会丢失的。

生产者确认

生产者将消息发送出去之后,消息到底有没有到达服务器呢?因为之前的模式都是,生产者发送一条消息给服务器,然后服务器就只是单纯的接收消息,其它什么都不做。所以为了确保消息正确到达服务器,rabbitmq提供了两种措施:事务机制和发送方确认机制。

事务

通过信道开启一个事务,然后进行操作,之后进行事务的提交,如果事务提交成功,那么消息就一定到了rabbitmq中;如果失败了那么就调用相关的回滚函数来进行处理。

事务的底层就是通过和服务器进行通信,服务器如果回送了OK,那么就说明成功。

事务的缺点就是:性能太差了。

发送方确认

发送方其实就是生产者,简单来说就是一个轻量级的事务。生产者把信道设置成确认模式,这样只要这个任何在这个信道上传送的消息,都会有一个唯一的ID,并且当消息被投递到队列之后(注意!不是到达服务器),服务器就会回送一个关于这个ID的确认消息。如果要求持久化,可以当消息被投递到队列且成功保存到磁盘中之后,才发送确认消息。

那么它比事务轻量在哪里呢?事务需要确保成功提交,这样事务1在执行的时候,事务2是无法进行的。而这个机制则不同,它可以异步。就跟TCP一样,对一个包进行了确认,相当于对之前的所有的包进行了确认。

消费者

消费者之前就推荐了进行手动ACK,当然除了确认,消费者还需要注意几点。

消息分发

如果有多个消费者消费同一个队列,那么消息将是通过轮询的方法分发的。但是这个机制比较笨重,因为rabbitmq用的是取模运算来分配消息的,所以如果有些消费者消费的比较慢,就会导致消息的堆积了。

这个时候就可以用到QoS来限制信道中消费者能够拥有的最大未确认消息的数量。类似TCP中的滑动窗口机制。

消息顺序性

消息的顺序性是指,消费者消费到的消息,和发送者发送的消息的顺序是一致的。

那么rabbitmq能否保证呢?并不能。很简单,因为网络等原因,生产者发送给队列的消息本身就不是按顺序到达的。就算能保证顺序到,那优先级队列就是一个最好的反例。

当然这么说可能有点偏执,我们此时把范围稍微缩小点,已经在队列中落盘的消息,和消费者受到的消息的顺序,是一致的吗?

当然也不是啊。因为消费者可以拒绝某个消息,然后消息可以再次回到消息队列中。

所以如果我们如果要确保一致性,就需要自己实现一个全局序列号来实现,这个就和消息队列无关了。

弃用QueueingConsumer

这个消费者的底层用的是LinkedBlockingQueue,所以如果你一次性往队列中放了太多了的消息,那么这个队列会把这些消息全部发给消费者,消费者的LinkedBlockingQueue会无限膨胀,可能会有OOM问题发生。同样由于底层是链表,所以可能会死锁。

消息传输保证

消息中间件中消息的传输保证分成三个部分:

  • At most once:最多一次。也就是消息可能会丢失,但是绝对不会重复。
  • At least once:最少一次。消息绝对不丢失,但是可能会重复。
  • Exactly once:恰好一次。每条消息都肯定会被传输一次,也仅传输一次。

可惜,rabbitmq并不支持Exactly once。

下面两种情况,rabbitmq如何解决?

  • 生产者生产完消息,发送给MQ。MQ给出对应的回复表示消息自己收到了,但是这条确认消息由于网络等原因丢了。此时生产者没收到确认,那么它就会再次把消息发送给MQ,此时MQ中这条消息就会有两条了。
  • 消费者消费完消息,然后发送ack给MQ,由于网络等原因,这条消息也丢了。那么MQ可能会把这条消息再度放到消息队列中,之后又会有消费者收到这条消息,相当于一条信息重复消费了,怎么办?

答案是:rabbitmq解决不掉。你需要自己通过外部的全局唯一标识符(Twitter的雪花算法和美团的Leaf)来自己处理。