前言
前面 Publish/Subscribe 模式,fanout 交换机无脑的把数据分发到每一个连接的队列里,这一章节,对消息进行定向分发。
模型
可以看到,使用的是 direct 交换机,把消息用 routingKey 分类,送到不同的队列里,然后不同的消费者分别去处理相应类别的消息。
一、生产者(Producer)
直接上代码,将不同级别的日志,送往不同 routingKey 下的所有队列里:
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace rabbitmq { public class Producer { public Producer() { //1,创建工厂 var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; //2,使用工厂创建 连接 var connection = factory.CreateConnection(); //3,使用连接创建通道 var channel = connection.CreateModel(); //4,创建一个交换机 channel.ExchangeDeclare( exchange: "exchangeOne", type: ExchangeType.Direct, durable: false, autoDelete: false, arguments: null ); //6,发布消息到队列 for (int i = 0; i < 5; i++) { string msg = $"日志内容:{i}"; var body = Encoding.UTF8.GetBytes(msg); string routingKey = RandomMsgLevel().ToString(); //创建队列并获取队列名 string queueName = channel.QueueDeclare().QueueName; //把队列绑定到交换机(routingKey表示绑定在哪个路径下) channel.QueueBind( queue: queueName, exchange: "exchangeOne", routingKey:routingKey, arguments:null ); channel.BasicPublish( exchange: "exchangeOne", routingKey: routingKey, basicProperties:null, body:body); } Console.WriteLine(""); } private MsgLevel RandomMsgLevel() { var random = new Random(); int i = random.Next(0,3); if (i <= 2) return MsgLevel.Debug; if (i <= 5) return MsgLevel.Info; //if (i <= 7) return MsgLevel.Warning; //if (i <= 10) return MsgLevel.Error; return MsgLevel.Error; } private enum MsgLevel { Debug = 0, Info = 1, Warning = 2, Error = 3, } } }
程序中,声明了交换机,声明的队列,然后将他们绑定起来,绑定的时候,使用了 RoutingKey。
下面是交换机列表,和我们声明的交换机 “exchangeOne” :
下面是该交换机绑定的队列:
交换机 “exchangeOne” 下面,有 2 个路径(Routing),RoutingKey 分别是 “Info” 和 “Debug”,“Info” 路径下绑定了 3 个队列,“Debug” 路径下绑定了 2 个队列。
从上图可以看出,这并不符合一开始模型图的要求。但是方法已经确定了,就是队列通过 RoutingKey 绑定到交换机,只要在代码中,把已有的队列再绑定一次,比如:
channel.QueueBind( queue: queueName, exchange: "exchangeOne", routingKey: "Info", arguments: null);
已经绑定过 “Debug” Routing 的队列,再次绑定一次 “Info” Routing,就实现了一个队列即接收 Debug 消息,又接收 Info 消息。
小结: 1,队列可以多次绑定到不同的 RoutingKey 2,队列绑定到重复的 RoutingKey 不会报错,相当于字典重复添加已有的话就不做动作就好了。
二、消费者(Consumer)
写到消费者的时候发现,消费的时候,需要指定消费哪个具体的队列吧,但是队列是生产者创建的,消费者不知道名字,那消费个啥。
所以生产者的代码要改一下,不声明、绑定队列,只往指定交换机、指定路由里发布消息:
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace rabbitmq { public class Producer { public Producer() { //1,创建工厂 var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; //2,使用工厂创建 连接 var connection = factory.CreateConnection(); //3,使用连接创建通道 var channel = connection.CreateModel(); //4,创建一个交换机 channel.ExchangeDeclare( exchange: "exchangeOne", type: ExchangeType.Direct, durable: false, autoDelete: false, arguments: null ); //6,发布消息到队列 for (int i = 0; i < 5; i++) { string msg = $"日志内容:{i}"; var body = Encoding.UTF8.GetBytes(msg); string routingKey = RandomMsgLevel().ToString(); //创建队列并获取队列名 //string queueName = channel.QueueDeclare().QueueName; //把队列绑定到交换机(routingKey表示绑定在哪个路径下) //channel.QueueBind( // queue: queueName, // exchange: "exchangeOne", // routingKey:routingKey, // arguments:null // ); channel.BasicPublish( exchange: "exchangeOne", routingKey: routingKey, basicProperties:null, body:body); if(routingKey != "Info") { channel.BasicPublish( exchange: "exchangeOne", routingKey: "Info", basicProperties:null, body:body); } Console.WriteLine($"{i} 已发布到 {routingKey} 和 Info"); } Console.WriteLine(""); } } }
上面代码中,每一份消息,都发布到了 “Info” 路由里,满足了开篇模型图的要求。
消费者自己带着队列来到系统里,订阅对应的路由。交换机就会把生产者新发布的消息送到绑定(订阅)的队列里。代码如下:
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace rabbitmq { public class Producer { public Consumer() { //1,创建工厂 var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; //2,使用工厂创建 连接 var connection = factory.CreateConnection(); //3,使用连接创建通道 var channel = connection.CreateModel(); //4,声明交换机 channel.ExchangeDeclare( exchange: "exchangeOne", type: ExchangeType.Direct ); //5,声明队列 string queueName = channel.QueueDeclare().QueueName; //6,声明要消费的队列 channel.QueueBind( queue: queueName, exchange: "exchangeOne", routingKey: "Info"); //7,创建消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, e) => { var body = e.Body.ToArray(); string msg = Encoding.UTF8.GetString(body); Console.WriteLine($"接收到消息:{msg}"); Thread.Sleep(5000); channel.BasicAck(e.DeliveryTag, false); }; //8,把消费者挂到通道中开始消费 channel.BasicConsume( queue:queueName, autoAck:false, consumer:consumer); } } }
这里的消费者,指定消费 “Info” 路由里的东西,所以实际的执行结果如下:
(发布到 Info和 Info 是打日志的效果,实际只发布一次到 Info,见生产者代码)
————————–
以上就是 Routing 模式的内容,总结一下,有以下几个点:
1,生产者和消费者都能声明交换机(exchange)、路由(RoutingKey)、队列(Queue),只要相互匹配上,消费者就能消费队列里的数据。
2,“谁声明谁负责” 原则,虽然可以重复声明,但其实只有第一次声明有效且归属权属于第一次声明者。比如队列,只能由第一次声明的对象进行释放,这个在 Simplest queue 模式 中写过。
3,“RoutingKey” 对 “Direct” 交换机有效,对 “fanout” 交换机无效。所以 “fanout” 交换机的 “RoutingKey” 参数值经常为空字符串,即便写了也没啥用。
三、疑问
1,刚开始的生产者,自己创建队列的代码,程序执行后,factory、connection、channel 对象都保持不释放(断点打在 Console.WriteLine()),但是过一段时间(3 分钟左右),队列就会消失,连同里面的消息一起。像是被 RabbitMQ 给清除掉了,不明白为啥会这样。