前言
前面章节,用 direct 交换机实现了单播、多播的功能,但是看的出来是通过手动绑定,不是那么灵活。这一章,topic 交换机将会告诉我们,什么叫灵活,而且通过简单、合理的设计, topic 交换机可以完全替代 direct 、fanout 交换机。
模型
使用 topic 交换机的时候,RoutingKey 不能随便取,必须是由 ‘*’ 或者 ‘#’ 分隔的关键词列表。topic 交换机根据关键词对消息进行分发。一些合理的 RoutingKey 如:“stock.usd.nyse”,“nyse.vnw”,“quick.orange.rabbit”。RoutingKey 可以有多个关键词,只要最长不超过 256 个字节就行。
当关键词只有一个的时候,比如 “*.orange.*”,那么效果等同于 RoutingKey 为 “orange” 的 direct 交换机。对于 “*” 和 “#” 两种分隔符:
*(star)可以且仅可以代替一个关键词 #(hash)可以代替零个或者多个关键词
如模型图所示,假如我们要对一些动物进行分类,Q1 的绑定 key 是 “*.orange.*“,Q2 的绑定 key 是 “*.*.rabbit” 和 “lazy.#“。总结一下就是:
Q1 关心的是所有颜色为 “orange” 的动物 Q2 关心的是兔子(是个兔子都要,不管颜色、速度),和所有运动缓慢的动物(只要跑的慢就都要)
如果一个消息的 RoutingKey 是 “quick.orange.rabbit“,那么 Q1、Q2 都会收到这条消息,因为它既符合 Q1 的颜色,也符合 Q2 的兔子。对于 “lazy.orange.elephant“, 也是同理,符合 Q1 的颜色,符合 Q2 的 lazy。
对于 “quick.orange.fox“,只有 Q1 会收到消息,Q2 收不到消息,因为它既不不符合 Q2 的兔子,也不符合 Q2 的lazy。对于 “lazy.pink.rabbit” 只会被送到 Q2 一次,虽然它同时满足 Q2 的两个关键词。
对于 “quick.brown.fox” 不匹配任何队列,则会被丢弃。
如果一个消息的 RoutingKey 是 “orange”,那么它会被丢弃,虽然它包含了 Q1 想要的关键词,但是它的长度不对,前面说到了,*(star)可以且仅可以代表一个关键词。同理 “quick.orange.male.rabbit” 也会被丢弃,虽然它满足 Q2 的兔子关键词。
注意,对于 “lazy.orange.male.rabbit“,不满足 Q1,但是满足 Q2,为什么满足 Q2 ?是因为#(hash)可以代表多个关键词,这个 key 满足了 Q2 的 “lazy.#“,Q2 会收到这条消息。
举了这么多的例子,从各个方面,说清楚了 topic 交换机的关键词匹配机制。
topic 交换机可以表现出其他交换机的特性:
1,如果一个队列的绑定 key 为 "#"(hash),那么它会无视 RoutingKey 收到所有的数据。这就是 fanout 交换机。 2,如果绑定 key 不含 "*"(star)、"#"(hash),那么 topic 交换机表现的就像一个 direct 交换机。
一、生产者(producer)
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace Producer { class Program { static void Main(string[] args) { //1,声明工厂 var factory = new ConnectionFactory(); //2,声明连接 var connection = factory.CreateConnection(); //3,声明通道 var channel = connection.CreateModel(); //4,声明交换机 channel.ExchangeDeclare( exchange: "topic_animal", type: ExchangeType.Topic); //5,发布消息 string msg = "怎么也飞不出,这花花的世界"; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish( exchange: "topic_animal", routingKey: "lazy", body: body); Console.WriteLine($"消息已发布,routingKey:lazy,内容:{msg}"); msg = "各位客官请接受我,诚挚的祝福"; body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish( exchange: "topic_animal", routingKey: "orange.fast", body: body); Console.WriteLine($"消息已发布,routingKey:orange.fast,内容:{msg}"); msg = "照顾好我七舅老爷!"; body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish( exchange: "topic_animal", routingKey: "book.orange.rabbit", body: body); Console.WriteLine($"消息已发布,routingKey:book.orange.rabbit,内容:{msg}"); Console.ReadKey(); } } }
二、消费者(consumer)
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace ConsoleApp1 { public class Consumer { public Consumer(string routingKey) { //1,声明工厂 var factory = new ConnectionFactory(); //2,声明连接 var connection = factory.CreateConnection(); //3,声明通道 var channel = connection.CreateModel(); //4,声明交换机 channel.ExchangeDeclare( exchange: "topic_animal", type: ExchangeType.Topic); //5,创建队列 string queueName = channel.QueueDeclare().QueueName; //6,绑定队列到交换机 channel.QueueBind( queue:queueName, exchange: "topic_animal", routingKey: routingKey); //7,创建消费者 var consumer = new EventingBasicConsumer(channel); //注册消费事件 consumer.Received += (object sender, BasicDeliverEventArgs e) => { string msg = Encoding.UTF8.GetString(e.Body.ToArray()); Console.WriteLine($"接收到消息:{msg},RoutingKey:{e.RoutingKey}"); }; //8,把消费者挂在队列上开始消费 channel.BasicConsume( queue:queueName, autoAck:true, consumer:consumer); Console.WriteLine($"队列名:{queueName} ,RoutingKey:{routingKey} ,开始接收到消息"); } } }
消费者创建后,就可以看到绑定关系:
先启动消费者(这里启动 3 个),在启动生产者,可以看到消息分发的结果:
经过以上验证,可以发现 RoutingKey 由消费者和生产者共同制定,然后按照规则,双向匹配即可。
以上就是 Topic 模式的内容。