博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ消息队列(五):Routing 消息路由 2[原]
阅读量:6075 次
发布时间:2019-06-20

本文共 9558 字,大约阅读时间需要 31 分钟。

上一篇文章使用的是Direct的Exchange,但是没有指定Queue的名字,这样只能是先运行Consumer之后,Producer在运行发消息Consumer才能收到,否则先运行Producer发送消息,在运行Consumer是收不到之前Producer发送的消息,因为Queue的名字像是这样的:amq.gen-X-XSTaseUmil42zrawBVsw都是临时,如果Consumer关闭之后,这个Queue就会自动被RabbitMQ删掉。

如果想创建可以先执行Producer的Direct的Exchnage呢?因为在实际工作中我们可能需要发送端有消息就会一直发给接收端,不管接收端是否已经运行。如果我们需要指定名称的Queue,并且使用Direct的Exchange方式,我们需要使用Binding的方式。上一篇和第一篇文章中都解释了绑定的含义:绑定其实就是关联了exchange和queue。

多个routing key指定同一个queue,不管如何指定routing key的名字,发送端发送一次信息,接收端按启动顺序循环执行接收,每次接收一个消息。例子:

Producer.cs

1 ///  2         /// 多个routing key指定同一个queue 3         /// 指定Queue的名称,好处就是可以持久化Queue 4         ///  5         ///  6         /// SendDemo51.exe direct_custom_routing_key_hello1 7         /// SendDemo51.exe direct_custom_routing_key_hello2 8         /// 不管如何指定routing key的名字,发送端发送一次信息,接收端按启动顺序循环执行每次接收一个消息。 9         /// 10         static void Main(string[] args)11         {12             if (args.Length < 1)13             {14                 Console.Error.WriteLine("请指定一个新的Routing Key名称", Environment.GetCommandLineArgs()[0]);15                 Environment.ExitCode = 1;16                 return;17             }18             var factory = new ConnectionFactory() { HostName = "localhost" };19             using (var connection = factory.CreateConnection())20             {21                 using (var channel = connection.CreateModel())22                 {23                     const string EXCHANGE_NAME = "direct_logs";24                     channel.ExchangeDeclare(EXCHANGE_NAME, "direct");//Direct :如果 routing key 匹配, 那么Message就会被传递到相应的queue中。25 26                     const string QUEUE_NAME = "direct_same_queue_name_hello";//使用我们自己指定Queue的名称27                     bool durable = true;28                     channel.QueueDeclare(QUEUE_NAME, durable, false, false, null);29 30                     var routingKey = args[0];//指定Routing Key的名称31                     channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);//通过绑定将指定的Queue名称、不同的RoutingKey名称使用Direct的Exchange方式进行关联32 33                     var message = "Hello World! " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");34 35                     var body = Encoding.UTF8.GetBytes(message);36 37                     var properties = channel.CreateBasicProperties();38                     properties.SetPersistent(true);//需要持久化Message,即在Publish的时候指定一个properties39 40                     channel.BasicPublish(EXCHANGE_NAME, routingKey, properties, body);41                     Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);42 43                     Console.Read();44                 }45             }46         }
Producer.cs

Consumer.cs

1 ///  2     /// 多个routing key指定同一个queue 3     /// 指定Queue的名称,好处就是可以持久化Queue 4     /// ReceiveDemo51.exe direct_custom_routing_key_hello1 5     /// ReceiveDemo51.exe direct_custom_routing_key_hello2 6     /// 不管如何指定routing key的名字,发送端发送一次信息,接收端按启动顺序循环执行每次接收一个消息。 7     ///  8     class Program 9     {10         static void Main(string[] args)11         {12             if (args.Length < 1)13             {14                 Console.Error.WriteLine("请指定一个新的Routing Key名称", Environment.GetCommandLineArgs()[0]);15                 Environment.ExitCode = 1;16                 return;17             }18             var factory = new ConnectionFactory() { HostName = "localhost" };19             using (var connection = factory.CreateConnection())20             {21                 using (var channel = connection.CreateModel())22                 {23                     const string EXCHANGE_NAME = "direct_logs";24                     channel.ExchangeDeclare(EXCHANGE_NAME, "direct");25 26                     const string QUEUE_NAME = "direct_same_queue_name_hello";//使用我们自己指定Queue的名称27                     bool durable = true;28                     channel.QueueDeclare(QUEUE_NAME, durable, false, false, null);29 30                     string routingKey = args[0];//指定Routing Key的名称31                     channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);//通过绑定将指定的Queue名称、不同的RoutingKey名称使用Direct的Exchange方式进行关联32                     33                     Console.WriteLine(" [*] Waiting for messages. " + "To exit press CTRL+C");34 35                     var consumer = new QueueingBasicConsumer(channel);36                     channel.BasicConsume(QUEUE_NAME, true, consumer);37 38                     while (true)39                     {40                         var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();41 42                         var body = ea.Body;43                         var message = Encoding.UTF8.GetString(body);44                         var routingkey = ea.RoutingKey;45                         Console.WriteLine(" [x] Received '{0}':'{1}'", routingkey, message);46                     }47                 }48             }49         }50     }
Consumer.cs

 多个Queue关联同一个routing key,多个queue通过routing key可以同时得到发送的内容,例子如下:

Producer.cs 

1 ///  2         /// 多个Queue关联一个routing key,通过routingkey可以拿到多个queue里面的内容; 3         /// 如果得到消息的Queue不是指定名称的Queue(此时这个例子是通过routing key得到消息),那么它是不会自动从Queue中删除接收到的消息, 4         /// 只有是指定名称的Queue收到消息之后才会把Queue中的消息删除。 5         /// SendDemo52.exe direct_custom_queue_name_hello1 6         /// SendDemo52.exe direct_custom_queue_name_hello2 7         /// 发送端发送一次信息,多个接收端同时接收到消息 8         ///  9         /// 10         static void Main(string[] args)11         {12             if (args.Length < 1)13             {14                 Console.Error.WriteLine("请指定一个新的Queue名称", Environment.GetCommandLineArgs()[0]);15                 Environment.ExitCode = 1;16                 return;17             }18             var factory = new ConnectionFactory() { HostName = "localhost" };19             using (var connection = factory.CreateConnection())20             {21                 using (var channel = connection.CreateModel())22                 {23                     const string EXCHANGE_NAME = "direct_logs";24                     channel.ExchangeDeclare(EXCHANGE_NAME, "direct");//Direct :如果 routing key 匹配, 那么Message就会被传递到相应的queue中。25 26                     string queueName = args[0];//得到我们自己指定Queue的名称27                     channel.QueueDeclare(queueName, true, false, false, null);28 29                     const string ROUTING_KEY = "direct_same_routing_key";30                     channel.QueueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);//通过绑定将不同的Queue名称、相同的Routing Key名称采用Direct的Exchange方式进行关联31 32                     var message = "Hello World! " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");33 34                     var body = Encoding.UTF8.GetBytes(message);35 36                     var properties = channel.CreateBasicProperties();37                     properties.SetPersistent(true);//需要持久化Message,即在Publish的时候指定一个properties38 39                     channel.BasicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, body);40                     Console.WriteLine(" [x] Sent '{0}':'{1}'", ROUTING_KEY, message);41 42                     Console.Read();43                 }44             }45         }
Producer.cs

Consumer.cs

1 ///  2     /// 多个Queue关联一个routing key,通过routingkey可以拿到多个queue里面的内容; 3     /// 如果得到消息的Queue不是指定名称的Queue(此时这个例子是通过routing key得到消息),那么它是不会自动从Queue中删除接收到的消息, 4     /// 只有是指定名称的Queue收到消息之后才会把Queue中的消息删除。 5     /// ReceiveDemo52.exe direct_custom_queue_name_hello1 6     /// ReceiveDemo52.exe direct_custom_queue_name_hello2 7     /// 发送端发送一次信息,多个接收端同时接收到消息 8     ///  9     class Program10     {11         static void Main(string[] args)12         {13             if (args.Length < 1)14             {15                 Console.Error.WriteLine("请指定一个新的Queue名称", Environment.GetCommandLineArgs()[0]);16                 Environment.ExitCode = 1;17                 return;18             }19             var factory = new ConnectionFactory() { HostName = "localhost"  };20             using (var connection = factory.CreateConnection())21             {22                 using (var channel = connection.CreateModel())23                 {24                     const string EXCHANGE_NAME = "direct_logs";25                     channel.ExchangeDeclare(EXCHANGE_NAME, "direct");26                     27                     string queueName = args[0];//得到我们自己指定Queue的名称28                     channel.QueueDeclare(queueName, true, false, false, null);29 30                     const string ROUTING_KEY = "direct_same_routing_key";31                     channel.QueueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);//通过绑定将不同的Queue名称、相同的Routing Key名称采用Direct的Exchange方式进行关联32 33                     Console.WriteLine(" [*] Waiting for messages. " + "To exit press CTRL+C");34 35                     var consumer = new QueueingBasicConsumer(channel);36                     channel.BasicConsume(queueName, true, consumer);37 38                     while (true)39                     {40                         var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();41 42                         var body = ea.Body;43                         var message = Encoding.UTF8.GetString(body);44                         string routingKeyRe = ea.RoutingKey;45                         Console.WriteLine(" [x] Received '{0}':'{1}'", routingKeyRe, message);46                     }47                 }48             }49         }50     }
Consumer.cs

特别注意:

本例子是支持多个Queue名称同时发送和接收的,Consumer通过routing key可以拿到属于这个routing key里面的多个queue的内容;

如果Consumer一旦接收到消息,分两种情况:

1、接收的Consumer不是指定名称的Queue(此时这个Consumer是通过routing key得到Producer发送的消息),那么这个Consumer是不会自动从Queue中删除接收到的消息;

2、接收的Consumer是指定名称的Queue,那么这个Consumer是会自动从Queue中删除接收到的消息。

总结:只有当指定名称的Queue收到消息之后才会把Queue中的这条消息删除。

 

转载地址:http://hyxgx.baihongyu.com/

你可能感兴趣的文章
tomcat多应用之间如何共享jar
查看>>
Flex前后台交互,service层调用后台服务的简单封装
查看>>
MySQL入门12-数据类型
查看>>
Windows Azure 保留已存在的虚拟网络外网IP(云服务)
查看>>
修改字符集
查看>>
HackTheGame 攻略 - 第四关
查看>>
js删除数组元素
查看>>
带空格文件名的处理(find xargs grep ..etc)
查看>>
华为Access、Hybrid和Trunk的区别和设置
查看>>
centos使用docker下安装mysql并配置、nginx
查看>>
关于HTML5的理解
查看>>
需要学的东西
查看>>
Internet Message Access Protocol --- IMAP协议
查看>>
Linux 获取文件夹下的所有文件
查看>>
对 Sea.js 进行配置(一) seajs.config
查看>>
第六周
查看>>
解释一下 P/NP/NP-Complete/NP-Hard 等问题
查看>>
javafx for android or ios ?
查看>>
微软职位内部推荐-Senior Software Engineer II-Sharepoint
查看>>
sql 字符串操作
查看>>