RabbitMQ
Overview
消息队列(Message queue)是一种进程间通信方式。发送者的发出的消息会保存在队列中,直到接收者取走消息。正因为其保存消息的特点,自然而然的支持异步。但同时对于接收者来说,也必须轮询消息队列才能够获取到消息。 消息队列最常用于异步调用、系统解耦、消峰。
RabbitMQ是最常用的消息队列之一,其支持丰富的插件,监控UI友好,且支持分布部署。 RabbitMQ被看作为消息的代理人,其抽象出虚拟连接channel在消息发送者和接收者之间传输消息。其内部连接方式如下:
通过在发送方将
channel连接到exchange(交换机),交换机路由(routing)消息到队列。而队列需要在消费方声明,并且绑定到对应的交换机上,然后消费方从队列中接收消息。

这里需要注意的是,在发送方连接的交换机将同一条消息路由到多个队列,而对于消费方而言始终只能从一个队列中获取消息。这也是所有消息队列的通病。通常的解决方案是通过topic来将多个消息分配到同一个队列。
以下是RabbitMQ最常用的一些例子,开始之前默认我们已经按照官方文档在本地安装了RabbitMQ,这里使用的是docker的3.8.19-management镜像,management的镜像会附带UI管理页面。
Publish/Subscribe
以下将展示使用RabbitMQ的Go客户端将一条消息同时发送给两个接收者,并分别输出。结构图如下:

Producer
首先我们需要获取RabbitMQ的Go客户端:
然后引入工具包,并添加部分准备代码:
然后通过安装时设置的用户名和密码连接服务器:
通过连接获取一个通道:
逻辑上对MQ的操作都是在通道上进行的,比如现在我们需要在通道上声明一个交换机,先暂时忽略选项参数:
需要注意的是,交换机的声明是幂等的,如果不设置交换机名称,系统将会自动设置一个名称。如果已经存在相同名称的交换机,然而选项不同,则会返回错误。 此时则需要将原来的交换机删除才能重新创建,可用通过客户端删除,然后重新创建:
最后再将消息通过指定的交换机发送,同样先忽略选项参数:
完整的发送方的代码如下:
Consumer
在消费方我们需要声明一个队列,并将其绑定到交换机HelloWorkExchange上。 同样我们需要先获取连接和通道:
然后声明一个消费方处理的队列,并将其绑定到交换机上,这里我们设置队列的名称为空字符串,让RabbitMQ为我们自动生成名称:
通常交换机的声明应该由发送方声明,但是我们不确定消费方绑定队列到交换机时,交换机是否已经声明,并且如果没有提前声明交换机就绑定队列的话会报错。所以在绑定之前我们需要先声明交换机。并且由于交换机的声明是等幂的,我们不用担心此处的声明与发送方的声明会产生冲突:
此时通过将队列绑定到交换机上,逻辑上的channel已经连接了发送方和消费方。在消费方通过channel便可以消费消息:
完整的消费方代码如下:
Run
我们分别使用两个终端来运行consumer.go以模仿两个消费方:
然后再运行peoducer.go发送信息:
可以看到两个消费方都能收到消息:
然后通过命令rabbitmqctl list_bindings可以查看到RabbitMQ中的绑定情况:
可以看到,HelloWorkExchange交换机绑定了两个队列。
Routing&Topics
上面的例子我们使用的交换机类型是fanout,从名字也能看出来,这是一种将消息发送到所有绑定队列的方式。RabbitMQ还提供了根据路由来发送消息到指定队列的方式——Routing,以及根据类型匹配到队列的方式——Topics。
从之前的例子我们可以看到,在绑定fanout类型时我们设置的routine key为空:
当交换机类型是fanout时,即使是设置了routine key也会被忽略。只有当类型为direct和topics时才会根据routing key来匹配消息到队列。
Direct exchange
Direct exchange会通过binding key来匹配发送消息的routing key,然后将消息放到对应的队列中被消费方消费。需要注意的是,消息的routing key需要在发送的时候指定。结构图如下:

Producer
我们先修改交换机名称:
对于发送方,依旧是通过连接获取channel,然后声明交换机。但是交换机的类型需要修改为direct:
此外在发送方发送消息时,需要指定routing key。为了更直观,我们这里将routing key使用系统参数传入:
完整代码如下:
Consumer
在接收方则也需要修改交换机的声明:
然后将队列绑定到交换机,并设置路由。这里我们为了更直观,也将路由从系统参数传入:
接收方完整代码如下:
Run
然后在两个终端分别启动不同路由的消费者:
运行发送方发送消息:
可以看到error类型的消息只有Consumer2接收到了,而info类型的消息连个消费方都接收到了:
Topic exchange
Topic exchange可以将由.定界的routing key按照模式匹配到对应的队列。topic exchange接收的路由,因由两个以上的词组成,并以.分隔开,最大长度为255 bytes。仅一个词的routing key不会被topic exchange路由到任何队列。以下符号可以进行匹配:
* :能够匹配任意一个词
# :能够匹配任意0个或多个词
例如如下的绑定:

Q1队列将接收所有橙色的动物,Q2队列将会接收所有懒惰的动物,以及所有兔子。
Producer
topic exchange跟direct exchange几乎一模一样,只需要将声明的交换机类型改为topic,然后在设置对应的路由即可。我们对Direct exchange中的代码做适当修改:
发送方完整代码如下:
Consumer
消费方完整代码如下:
Run
同样我们在两个不同的终端运行消费方的代码:
然后运行发送方代码:
可以看到输出如下:
RPC
RabbitMQ还经常用于实现RPC。客户端通过向一个队列发送一条消息,被服务端处理后在由另一个队列将结果返回给客户端。其中有几个问题需要注意:
请求的消息与返回的消息需要唯一对应
保证消息不丢失
多个服务端监听一个请求队列时,需要做负载均衡
而RabbitMQ很好的解决了这三个问题:
RabbitMQ提供了一套较完整的
amqp协议,可以在发送信息的中设置头部correlation_id,返回时通过将发送的correlation_id接收的correlation_id做比较便可以判断是否为请求返回的结果。amqp协议还提供了delivery_mode的字段,当被设置为persistent时,RabbitMQ 会将消息保存到磁盘中,即使是RabbitMQ 服务器重启,也不会造成消息丢失。(注:RabbitMQ 并不能完全保证消息不丢失,在接收消息但还没保存到磁盘时出现故障,RabbitMQ可能来不及将保存到磁盘中)设置RabbitMQ的Qos可以控制队列将消息推送到消费方的数量,
perfetch count可以设置消费方正在处理的消息的最大数量,perfetch size可以设置每次推送给消费方的消息数量,以此来实现负载均衡。
让我们使用RabbitMQ实现一个简单RPC调用,其架构图如下:

客户端需要创建一个接收消息的队列,供服务端返回消息
客户端请求时需要将唯一的
correlation_id发送到队列,并且附上接收返回的队列名称服务端获取消息并完成调用以后将结果发送到返回的队列中
客户端将返回的
correlation_id和发送的coorelation_id比较,以获取到正确的结果
让我们通过一个RPC调用计算Fibonacci数列的例子来看看RabbitMQ如何实现RPC。
Client
我们需要先声明一个消息队列用于接收返回信息:
然后准备一个唯一的correlation_id用于确定请求的返回信息:
将消息发送到指定队列中,我们这里将其命名为rpc_queue。并通过设置DeliveryMode将消息持久化,防止消息丢失:
需要注意的是,当消息发送方和接收方之间只有一个队列关联时,RabbitMQ允许发送方之间将队列名称作为路由,并为我们创建一个临时的交换机,然后将消息发送到对应队列。所以我们填写的交换机为空,然后将队列名称设置为路由。
最后,消息的发送方还需要接收replyTo队列中返回的消息:
客户端完整代码如下:
Server
在服务方需要准备一个计算Fibonacci数列的方法:
然后声明一个接收RPC消息的队列:
此时,我们需要在服务方维护Qos,以实现负载均衡。 prefetchCount设置每次从队列中取出消息的个数,prefetchSize设置可以预取出的消息个数。global将设置当前的配置是作用于整个Connection连接还是当前channel:
注:如果在没有设置Qos的情况下,一个队列绑定了多个消费方,RabbitMQ会采用轮询的方式,将消息逐个发送到所有消费方,并等到确认之后再次发送
然后便从rpc_queue队列中取出消息:
值得注意的是,这里我们并没有将auto-ack设置为true。因为我们需要在计算完程并返回到消息队列后,再确认消息,以防止消息丢失。接下来便将计算后将结果返回到replyTo队列中:
服务方完整代码如下:
Run
接下来先启动两个服务端:
然后再启动客户端(客户端将调用封装在了main()函数中,以便更直观的展示负载均衡):
查看两个服务端的执行结果,可以看到S1消费了三个耗时较短的方法,S2消费了耗时较长的fib(45):
Dead letter
dead letter被译为”死信“。是RabbitMQ中的一种消息机制。有时候我们在队列中的消息可能出现以下情况:
消息被接收方拒绝
消息在队列中存活时间超过设置的TTL时间
消息队列中的数量已满,发送方仍然在发送消息
那么这些消息会成为死信,如果不进行特殊处理,这些消息将会被丢弃。通常这类消息我们应对对其进行特殊的处理,而不是直接丢弃。 RabbitMQ允许将死信发送到指定队列,这些队列称之为”死信队列“。
让我们来实现一个系统,实现客户端发送消息通知服务端进行某操作。如果有死信的产生,则将死信转发到死信队列,并由专门的服务器发出预警。其结构图如下:

Producer
对于发送方,只需要将消息发送到MsgQueue并在消息头部设置过期时间Expiration即可。过期时间可以设置在队列上,具体可参考Time-To-Live and Expiration。但设置在消息上可以更精确控制每条消息的过期时间。 发送方完整代码如下:
Consumer1
消费者C1中则需要设置拒绝策略。这里我们直接将消息为2的信息拒绝:
C1中还需要设置MsgQueue需要转发的死信交换机:
C1完整代码如下:
Consumer2
对于C2我们需要将死信队列绑定到死信交换机。然后从死信队列获取消息处理。完整代码如下:
Run
我们在发送方设置的消息过期时间为3s,同时发送了六条消息。在C1中处理每条消息需要1s,并且会拒绝消息2。因为设置端消息过期时间为3s,所有消息5,6被接收时以及过期,会被直接发送到死信队列中。也就是说最后C1将会执行消息1,3,4。死信交换机将会收到消息2,5,6。我们在终端运行以下,来验证一下结果:
发送方运行:
可以看到最终的运行结果:
最后更新于
这有帮助吗?