七、Topic 模式

前言

前面章节,用 direct 交换机实现了单播、多播的功能,但是看的出来是通过手动绑定,不是那么灵活。这一章,topic 交换机将会告诉我们,什么叫灵活,而且通过简单、合理的设计, topic 交换机可以完全替代 direct 、fanout 交换机。

模型

使用 topic 交换机的时候,RoutingKey 不能随便取,必须是由 ‘*’ 或者 ‘#’ 分隔的关键词列表。topic 交换机根据关键词对消息进行分发。一些合理的 RoutingKey 如:“stock.usd.nyse”,“nyse.vnw”,“quick.orange.rabbit”。RoutingKey 可以有多个关键词,只要最长不超过 256 个字节就行。

当关键词只有一个的时候,比如 “*.orange.*”,那么效果等同于 RoutingKey 为 “orange” 的 direct 交换机。对于 “*” 和 “#” 两种分隔符:

*(star)可以且仅可以代替一个关键词
#(hash)可以代替零个或者多个关键词

如模型图所示,假如我们要对一些动物进行分类,Q1 的绑定 key 是 “*.orange.*“,Q2 的绑定 key 是 “*.*.rabbit” 和 “lazy.#“。总结一下就是:

Q1 关心的是所有颜色为 “orange” 的动物
Q2 关心的是兔子(是个兔子都要,不管颜色、速度),和所有运动缓慢的动物(只要跑的慢就都要)

如果一个消息的 RoutingKey 是 “quick.orange.rabbit“,那么 Q1、Q2 都会收到这条消息,因为它既符合 Q1 的颜色,也符合 Q2 的兔子。对于 “lazy.orange.elephant“, 也是同理,符合 Q1 的颜色,符合 Q2 的 lazy。

对于 “quick.orange.fox“,只有 Q1 会收到消息,Q2 收不到消息,因为它既不不符合 Q2 的兔子,也不符合 Q2 的lazy。对于 “lazy.pink.rabbit” 只会被送到 Q2 一次,虽然它同时满足 Q2 的两个关键词。

对于 “quick.brown.fox” 不匹配任何队列,则会被丢弃。

如果一个消息的 RoutingKey 是 “orange”,那么它会被丢弃,虽然它包含了 Q1 想要的关键词,但是它的长度不对,前面说到了,*(star)可以且仅可以代表一个关键词。同理 “quick.orange.male.rabbit” 也会被丢弃,虽然它满足 Q2 的兔子关键词。

注意,对于 “lazy.orange.male.rabbit“,不满足 Q1,但是满足 Q2,为什么满足 Q2 ?是因为#(hash)可以代表多个关键词,这个 key 满足了 Q2 的 “lazy.#“,Q2 会收到这条消息。

举了这么多的例子,从各个方面,说清楚了 topic 交换机的关键词匹配机制。

topic 交换机可以表现出其他交换机的特性:

1,如果一个队列的绑定 key 为 "#"(hash),那么它会无视 RoutingKey 收到所有的数据。这就是 fanout 交换机。
2,如果绑定 key 不含 "*"(star)、"#"(hash),那么 topic 交换机表现的就像一个 direct 交换机。

一、生产者(producer)

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Producer
{
    class Program
    {
        static void Main(string[] args)
        {
            //1,声明工厂
            var factory = new ConnectionFactory();

            //2,声明连接
            var connection = factory.CreateConnection();

            //3,声明通道
            var channel = connection.CreateModel();

            //4,声明交换机
            channel.ExchangeDeclare(
                 exchange: "topic_animal",
                 type: ExchangeType.Topic);

            //5,发布消息
            string msg = "怎么也飞不出,这花花的世界";
            var body = Encoding.UTF8.GetBytes(msg);
            channel.BasicPublish(
                exchange: "topic_animal",
                routingKey: "lazy",
                body: body);
            Console.WriteLine($"消息已发布,routingKey:lazy,内容:{msg}");

            msg = "各位客官请接受我,诚挚的祝福";
            body = Encoding.UTF8.GetBytes(msg);
            channel.BasicPublish(
                exchange: "topic_animal",
                routingKey: "orange.fast",
                body: body);
            Console.WriteLine($"消息已发布,routingKey:orange.fast,内容:{msg}");

            msg = "照顾好我七舅老爷!";
            body = Encoding.UTF8.GetBytes(msg);
            channel.BasicPublish(
                exchange: "topic_animal",
                routingKey: "book.orange.rabbit",
                body: body);
            Console.WriteLine($"消息已发布,routingKey:book.orange.rabbit,内容:{msg}");
            Console.ReadKey();
        }
    }
}

二、消费者(consumer)

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    public class Consumer
    {
        public Consumer(string routingKey)
        {
            //1,声明工厂
            var factory = new ConnectionFactory();

            //2,声明连接
            var connection = factory.CreateConnection();

            //3,声明通道
            var channel = connection.CreateModel();

            //4,声明交换机
            channel.ExchangeDeclare(
                 exchange: "topic_animal",
                 type: ExchangeType.Topic);

            //5,创建队列
            string queueName = channel.QueueDeclare().QueueName;

            //6,绑定队列到交换机
            channel.QueueBind(
                queue:queueName,
                exchange: "topic_animal",
                routingKey: routingKey);

            //7,创建消费者
            var consumer = new EventingBasicConsumer(channel);
            //注册消费事件
            consumer.Received += (object sender, BasicDeliverEventArgs e) =>
            {
                string msg = Encoding.UTF8.GetString(e.Body.ToArray());
                Console.WriteLine($"接收到消息:{msg},RoutingKey:{e.RoutingKey}");
            };

            //8,把消费者挂在队列上开始消费
            channel.BasicConsume(
                queue:queueName,
                autoAck:true,
                consumer:consumer);

            Console.WriteLine($"队列名:{queueName} ,RoutingKey:{routingKey} ,开始接收到消息");
        }
    }
}

消费者创建后,就可以看到绑定关系:

先启动消费者(这里启动 3 个),在启动生产者,可以看到消息分发的结果:

经过以上验证,可以发现 RoutingKey 由消费者和生产者共同制定,然后按照规则,双向匹配即可。

 

以上就是 Topic 模式的内容。

发表评论

Powered by WordPress | Theme Revised from Doo

苏ICP备18047621号

Copyright © 2017-2024 追光者博客