五、Publish/Subscribe 模式

前言

在上一章的 Work queue 模式 中,将消息分配给多个消费者进行处理,处理速度快的获取的消息次数就多。

在这一章,Publish/Subscribe 模式又是另外一个场景:广播场景。就像广播电台,谁有收音机,就可以收到电台。张三用收音机听新闻联播不影响李四也用收音机听新闻联播。

模型

通过模型图,可以看到,生产者(Producer)好像是把相同的消息,塞到了两个不同的队列里,难道有多少消费者就需要多少队列吗?生产者怎么知道有多少个队列?

从这个模式开始,就要明确 “交换机(exchange)” 的概念了,在之前的模式种,没有专门声明交换机,为了便于理解,在模型图中也省略了交换机。实际上生产者并不是直接把消息发布到了队列里,而是把消息发布到了默认的 “direct” 交换机,“direct” 交换机把消息塞到了相应队列名的队列中。

在所有的模式中,生产者都是不直接接触队列的,生产者将消息交给交换机,消息的分配由交换机管理。

RabbitMQ 一共有 4 种交换机:direct、fanout、headers、topic。Publish/Subscribe 模式使用的是 fanout 交换机。

一、生产者(Producer)

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;

namespace Simplest_Queue
{
    class Producer
    {
        public Producer()
        {
            //1,创建连接工厂
            var factory = new ConnectionFactory();
            factory.HostName = "127.0.0.1";

            //2,使用工厂创建新连接
            var connection = factory.CreateConnection();

            //3,使用连接创建通道
            var channel = connection.CreateModel();

            //4,声明交换机类型
            channel.ExchangeDeclare(
                exchange:"broadcast",      //定义交换机名
                type:ExchangeType.Fanout   //交换机类型
                );

            //5,声明通道参数
            //channel.QueueDeclare(
            //    queue: "Hello Queue",  //队列的名称
            //    durable: false,        //队列是否在代理(broker)重启后继续存在
            //    exclusive: false,      //队列是否在声明者断开连接时被删除
            //    autoDelete: false,     //队列是否在最后一个订阅者取消订阅时被删除
            //    arguments: null
            //    );

            //6,向队列发布消息
            for (int i = 0; i < 100; i++)
            {
                string msg = $"Task {i}";
                var body = Encoding.UTF8.GetBytes(msg);

                channel.BasicPublish(
                exchange: "broadcast",    //指定交换机
                routingKey: "",           //routingKey为空
                basicProperties: null,
                body: body                //消息
                );
            }
        }
    }
}

第 4 步是之前的模式没有的,作用就是声明使用哪一种交换机,RabbitMQ 默认有几个交换机名,但是一般都会给交换机取一个对应场景的名字便于记忆和区分。代码执行完这一步,就可以在 UI 里看到新增的名为 “broadcast” 的 fanout 交换机。

第 5 步全部注销掉了,使用 fanout 交换机,队列的生命周期由交换机管理。就像广播电台到点了就广播,有没有人听那是另外一回事,没有听到的人也就听不到了,不会保留。

第 6 步传入了交换机的名字,并且 routingKey 是空字符串,routingKey 其实就是交换机到队列的映射,RabbitMQ 里叫 “Binding(绑定)”,是一个意思。这里无需指定这个映射,因为生产者根本不知道也不关心消费者的队列,由交换机打理后面的事情。

执行程序,然后在 127.0.0.1:15672  查看队列里的内容:

队列为空,难道消息没有发布出去?不是的,是因为没有消费者!打开 “broadcast” 交换机的详情,可以看到没有任何绑定:

接下来创建消费者,看看具体情况。

二、消费者(Consumer)

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace Simplest_Queue
{
    class Consumer
    {
        public Consumer()
        {
            //1,创建连接工厂
            var factory = new ConnectionFactory();
            factory.HostName = "127.0.0.1";

            //2,使用工厂创建新连接
            var connection = factory.CreateConnection();

            //3,使用连接创建通道
            var channel = connection.CreateModel();

            //4,声明交换机
            channel.ExchangeDeclare(
                exchange:"broadcast",      //交换机名
                type:ExchangeType.Fanout   //交换机类型
                );

            //5,绑定队列
            string queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(
                queue:queueName,
                exchange:"broadcast",
                routingKey:""
                );

            ////5,声明通道参数
            //channel.QueueDeclare(
            //    queue: "Hello Queue",  //队列的名称
            //    durable: false,        //队列是否在代理(broker)重启后继续存在
            //    exclusive: false,      //队列是否在声明者断开连接时被删除
            //    autoDelete: false,     //队列是否在最后一个订阅者取消订阅时被删除
            //    arguments: null
            //    );
            ////增加一条通道设定,保证消费者都能获取到消息
            //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            //6,创建通道的消费者
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, e) =>
            {
                Thread.Sleep(2000);
                var body = e.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine("Received:{0}",message);

                channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
            };

            //6,把消费者放在通道上开始消费
            channel.BasicConsume(
                queue: queueName,
                autoAck: false,
                consumer:consumer
                ) ;

            Console.ReadKey();
        }
    }
}

消费者跟生产者一样,不需要声明队列但是要声明交换机。消费者多一步就是把要消费的队列绑定到交换机上。消费者的个数往往是不确定的,所以这里绑定交换机的时候,使用 RabbitMQ 的方法自动生成队列名,既方便又避免了队列名冲突。从这里就可以回答开头问的问题,确实是有多少消费者,就有多少个队列。

通过上面的代码执行后可以看到队列绑定到了交换机上,启动多少个消费者,就有多少行绑定,下面是启动了 3 个:

创建的队列的队列名也是类似于 GUID 一样的形式和作用:

三、执行程序

1,先启动生产者,再启动消费者

消费者收不到任何消息。因为消费者还没来得及启动,生产者已经生产完了,而没有消费者的消息,fanout 交换机会直接丢弃掉。

2,先启动消费者(启动 3 个),再启动生产者

可以看到消费者收到了消息,但是处理消息的速度很慢(2s 的延时)。

此时关掉生产者,不影响消费者继续消费,并且队列里的数据依然保留,随着消费者的消费逐步减少。

关掉其中某个消费者,对应的队列消失,fanout 对应的绑定也消失,没关闭的继续消费。

 

以上就是 Publish/Subscribe 模式的内容。

发表评论

Powered by WordPress | Theme Revised from Doo

苏ICP备18047621号

Copyright © 2017-2024 追光者博客