六、Routing 模式

前言

前面 Publish/Subscribe 模式,fanout 交换机无脑的把数据分发到每一个连接的队列里,这一章节,对消息进行定向分发。

模型

可以看到,使用的是 direct 交换机,把消息用 routingKey 分类,送到不同的队列里,然后不同的消费者分别去处理相应类别的消息。

一、生产者(Producer)

直接上代码,将不同级别的日志,送往不同 routingKey 下的所有队列里:

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace rabbitmq
{
    public class Producer
    {
        public Producer()
        {
            //1,创建工厂
            var factory = new ConnectionFactory();
            factory.HostName = "127.0.0.1";

            //2,使用工厂创建 连接
            var connection = factory.CreateConnection();

            //3,使用连接创建通道
            var channel = connection.CreateModel();

            //4,创建一个交换机
            channel.ExchangeDeclare(
                exchange: "exchangeOne",
                type: ExchangeType.Direct,
                durable: false,
                autoDelete: false,
                arguments: null
                );

            //6,发布消息到队列
            for (int i = 0; i < 5; i++)
            {
                string msg = $"日志内容:{i}";
                var body = Encoding.UTF8.GetBytes(msg);

                string routingKey = RandomMsgLevel().ToString();
                //创建队列并获取队列名
                string queueName = channel.QueueDeclare().QueueName;

                //把队列绑定到交换机(routingKey表示绑定在哪个路径下)
                channel.QueueBind(
                    queue: queueName,
                    exchange: "exchangeOne",
                    routingKey:routingKey,
                    arguments:null
                    );

                channel.BasicPublish(
                   exchange: "exchangeOne",
                   routingKey: routingKey,
                   basicProperties:null,
                   body:body); 
            }

            Console.WriteLine("");
        }

        private MsgLevel RandomMsgLevel()
        {
            var random = new Random();
            int i = random.Next(0,3);

            if (i <= 2) return MsgLevel.Debug;
            if (i <= 5) return MsgLevel.Info;
            //if (i <= 7) return MsgLevel.Warning;
            //if (i <= 10) return MsgLevel.Error;

            return MsgLevel.Error;
        }

        private enum MsgLevel
        {
            Debug = 0,
            Info = 1,
            Warning = 2,
            Error = 3,
        }
    }
}

程序中,声明了交换机,声明的队列,然后将他们绑定起来,绑定的时候,使用了 RoutingKey。

下面是交换机列表,和我们声明的交换机 “exchangeOne” :

下面是该交换机绑定的队列:

交换机 “exchangeOne” 下面,有 2 个路径(Routing),RoutingKey 分别是 “Info” 和 “Debug”,“Info” 路径下绑定了 3 个队列,“Debug” 路径下绑定了 2 个队列。

从上图可以看出,这并不符合一开始模型图的要求。但是方法已经确定了,就是队列通过 RoutingKey 绑定到交换机,只要在代码中,把已有的队列再绑定一次,比如:

channel.QueueBind(
     queue: queueName,
     exchange: "exchangeOne",
     routingKey: "Info",
     arguments: null);

已经绑定过 “Debug” Routing 的队列,再次绑定一次 “Info” Routing,就实现了一个队列即接收 Debug 消息,又接收 Info 消息。

小结:
1,队列可以多次绑定到不同的 RoutingKey 
2,队列绑定到重复的 RoutingKey 不会报错,相当于字典重复添加已有的话就不做动作就好了。

二、消费者(Consumer)

写到消费者的时候发现,消费的时候,需要指定消费哪个具体的队列吧,但是队列是生产者创建的,消费者不知道名字,那消费个啥。
所以生产者的代码要改一下,不声明、绑定队列,只往指定交换机、指定路由里发布消息:

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace rabbitmq
{
    public class Producer
    {
        public Producer()
        {
            //1,创建工厂
            var factory = new ConnectionFactory();
            factory.HostName = "127.0.0.1";

            //2,使用工厂创建 连接
            var connection = factory.CreateConnection();

            //3,使用连接创建通道
            var channel = connection.CreateModel();

            //4,创建一个交换机
            channel.ExchangeDeclare(
                exchange: "exchangeOne",
                type: ExchangeType.Direct,
                durable: false,
                autoDelete: false,
                arguments: null
                );

            //6,发布消息到队列
            for (int i = 0; i < 5; i++)
            {
                string msg = $"日志内容:{i}";
                var body = Encoding.UTF8.GetBytes(msg);

                string routingKey = RandomMsgLevel().ToString();
                //创建队列并获取队列名
                //string queueName = channel.QueueDeclare().QueueName;
                //把队列绑定到交换机(routingKey表示绑定在哪个路径下)
                //channel.QueueBind(
                //    queue: queueName,
                //    exchange: "exchangeOne",
                //    routingKey:routingKey,
                //    arguments:null
                //   );

                channel.BasicPublish(
                   exchange: "exchangeOne",
                   routingKey: routingKey,
                   basicProperties:null,
                   body:body); 
                if(routingKey != "Info")
                {
                   channel.BasicPublish(
                      exchange: "exchangeOne",
                      routingKey: "Info",
                      basicProperties:null,
                      body:body); 
               }
               Console.WriteLine($"{i} 已发布到 {routingKey} 和 Info");
            }

            Console.WriteLine("");
        }
    }
}

上面代码中,每一份消息,都发布到了 “Info” 路由里,满足了开篇模型图的要求。

消费者自己带着队列来到系统里,订阅对应的路由。交换机就会把生产者新发布的消息送到绑定(订阅)的队列里。代码如下:

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace rabbitmq
{
    public class Producer
    {
        public Consumer()
        {
            //1,创建工厂
            var factory = new ConnectionFactory();
            factory.HostName = "127.0.0.1";

            //2,使用工厂创建 连接
            var connection = factory.CreateConnection();

            //3,使用连接创建通道
            var channel = connection.CreateModel();

            //4,声明交换机
            channel.ExchangeDeclare(
                exchange: "exchangeOne",
                type: ExchangeType.Direct
                );

            //5,声明队列
            string queueName = channel.QueueDeclare().QueueName;

            //6,声明要消费的队列
            channel.QueueBind(
                queue: queueName,
                exchange: "exchangeOne",
                routingKey: "Info");

            //7,创建消费者
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, e) =>
            {
                var body = e.Body.ToArray();
                string msg = Encoding.UTF8.GetString(body);
                Console.WriteLine($"接收到消息:{msg}");
                Thread.Sleep(5000);

                channel.BasicAck(e.DeliveryTag, false);
            };

            //8,把消费者挂到通道中开始消费
            channel.BasicConsume(
                queue:queueName,
                autoAck:false,
                consumer:consumer);
        }
    }
}

这里的消费者,指定消费 “Info” 路由里的东西,所以实际的执行结果如下:

(发布到 Info和 Info 是打日志的效果,实际只发布一次到 Info,见生产者代码)

————————–

以上就是 Routing 模式的内容,总结一下,有以下几个点:

1,生产者和消费者都能声明交换机(exchange)、路由(RoutingKey)、队列(Queue),只要相互匹配上,消费者就能消费队列里的数据。

2,“谁声明谁负责” 原则,虽然可以重复声明,但其实只有第一次声明有效且归属权属于第一次声明者。比如队列,只能由第一次声明的对象进行释放,这个在 Simplest queue 模式 中写过。

3,“RoutingKey” 对 “Direct” 交换机有效,对 “fanout” 交换机无效。所以 “fanout” 交换机的 “RoutingKey” 参数值经常为空字符串,即便写了也没啥用。

 

三、疑问

1,刚开始的生产者,自己创建队列的代码,程序执行后,factory、connection、channel 对象都保持不释放(断点打在 Console.WriteLine()),但是过一段时间(3 分钟左右),队列就会消失,连同里面的消息一起。像是被 RabbitMQ 给清除掉了,不明白为啥会这样。

 

发表评论

Powered by WordPress | Theme Revised from Doo

苏ICP备18047621号

Copyright © 2017-2024 追光者博客