前言
在上一章的 Work queue 模式 中,将消息分配给多个消费者进行处理,处理速度快的获取的消息次数就多。
在这一章,Publish/Subscribe 模式又是另外一个场景:广播场景。就像广播电台,谁有收音机,就可以收到电台。张三用收音机听新闻联播不影响李四也用收音机听新闻联播。
模型
通过模型图,可以看到,生产者(Producer)好像是把相同的消息,塞到了两个不同的队列里,难道有多少消费者就需要多少队列吗?生产者怎么知道有多少个队列?
从这个模式开始,就要明确 “交换机(exchange)” 的概念了,在之前的模式种,没有专门声明交换机,为了便于理解,在模型图中也省略了交换机。实际上生产者并不是直接把消息发布到了队列里,而是把消息发布到了默认的 “direct” 交换机,“direct” 交换机把消息塞到了相应队列名的队列中。
在所有的模式中,生产者都是不直接接触队列的,生产者将消息交给交换机,消息的分配由交换机管理。
RabbitMQ 一共有 4 种交换机:direct、fanout、headers、topic。Publish/Subscribe 模式使用的是 fanout 交换机。
一、生产者(Producer)
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; namespace Simplest_Queue { class Producer { public Producer() { //1,创建连接工厂 var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; //2,使用工厂创建新连接 var connection = factory.CreateConnection(); //3,使用连接创建通道 var channel = connection.CreateModel(); //4,声明交换机类型 channel.ExchangeDeclare( exchange:"broadcast", //定义交换机名 type:ExchangeType.Fanout //交换机类型 ); //5,声明通道参数 //channel.QueueDeclare( // queue: "Hello Queue", //队列的名称 // durable: false, //队列是否在代理(broker)重启后继续存在 // exclusive: false, //队列是否在声明者断开连接时被删除 // autoDelete: false, //队列是否在最后一个订阅者取消订阅时被删除 // arguments: null // ); //6,向队列发布消息 for (int i = 0; i < 100; i++) { string msg = $"Task {i}"; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish( exchange: "broadcast", //指定交换机 routingKey: "", //routingKey为空 basicProperties: null, body: body //消息 ); } } } }
第 4 步是之前的模式没有的,作用就是声明使用哪一种交换机,RabbitMQ 默认有几个交换机名,但是一般都会给交换机取一个对应场景的名字便于记忆和区分。代码执行完这一步,就可以在 UI 里看到新增的名为 “broadcast” 的 fanout 交换机。
第 5 步全部注销掉了,使用 fanout 交换机,队列的生命周期由交换机管理。就像广播电台到点了就广播,有没有人听那是另外一回事,没有听到的人也就听不到了,不会保留。
第 6 步传入了交换机的名字,并且 routingKey 是空字符串,routingKey 其实就是交换机到队列的映射,RabbitMQ 里叫 “Binding(绑定)”,是一个意思。这里无需指定这个映射,因为生产者根本不知道也不关心消费者的队列,由交换机打理后面的事情。
执行程序,然后在 127.0.0.1:15672 查看队列里的内容:
队列为空,难道消息没有发布出去?不是的,是因为没有消费者!打开 “broadcast” 交换机的详情,可以看到没有任何绑定:
接下来创建消费者,看看具体情况。
二、消费者(Consumer)
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace Simplest_Queue { class Consumer { public Consumer() { //1,创建连接工厂 var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; //2,使用工厂创建新连接 var connection = factory.CreateConnection(); //3,使用连接创建通道 var channel = connection.CreateModel(); //4,声明交换机 channel.ExchangeDeclare( exchange:"broadcast", //交换机名 type:ExchangeType.Fanout //交换机类型 ); //5,绑定队列 string queueName = channel.QueueDeclare().QueueName; channel.QueueBind( queue:queueName, exchange:"broadcast", routingKey:"" ); ////5,声明通道参数 //channel.QueueDeclare( // queue: "Hello Queue", //队列的名称 // durable: false, //队列是否在代理(broker)重启后继续存在 // exclusive: false, //队列是否在声明者断开连接时被删除 // autoDelete: false, //队列是否在最后一个订阅者取消订阅时被删除 // arguments: null // ); ////增加一条通道设定,保证消费者都能获取到消息 //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //6,创建通道的消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, e) => { Thread.Sleep(2000); var body = e.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine("Received:{0}",message); channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false); }; //6,把消费者放在通道上开始消费 channel.BasicConsume( queue: queueName, autoAck: false, consumer:consumer ) ; Console.ReadKey(); } } }
消费者跟生产者一样,不需要声明队列但是要声明交换机。消费者多一步就是把要消费的队列绑定到交换机上。消费者的个数往往是不确定的,所以这里绑定交换机的时候,使用 RabbitMQ 的方法自动生成队列名,既方便又避免了队列名冲突。从这里就可以回答开头问的问题,确实是有多少消费者,就有多少个队列。
通过上面的代码执行后可以看到队列绑定到了交换机上,启动多少个消费者,就有多少行绑定,下面是启动了 3 个:
创建的队列的队列名也是类似于 GUID 一样的形式和作用:
三、执行程序
1,先启动生产者,再启动消费者
消费者收不到任何消息。因为消费者还没来得及启动,生产者已经生产完了,而没有消费者的消息,fanout 交换机会直接丢弃掉。
2,先启动消费者(启动 3 个),再启动生产者
可以看到消费者收到了消息,但是处理消息的速度很慢(2s 的延时)。
此时关掉生产者,不影响消费者继续消费,并且队列里的数据依然保留,随着消费者的消费逐步减少。
关掉其中某个消费者,对应的队列消失,fanout 对应的绑定也消失,没关闭的继续消费。
以上就是 Publish/Subscribe 模式的内容。