0%

最近公司需要使用RabbitMQ,但我之前一直使用的是ActiveMQ,对RabbitMQ进行了初步的学习,但是还不系统,自己做了一些小测试,怕自己以后忘了

一. 背景

  拿到代码以后,发现,生产者在向外发送消息时,指定了exchange(交换机)和routing key,但是没有指定queue(队列)也没有将queue(队列)绑定到exchange,刚开始因为不熟悉rabbitMQ,所有不知道怎么回事,后来知道了:消费者在消费消息时,需要声明队列(队列名字随便),并将声明的队列通过routing key绑定到exchange,这样才能接收到数据,因此,生产者方需要将exchange和routing key实现告知消费者方。

二. 代码实例:生产者方指定了exchange(交换机)和routing key,但是不指定queue(队列)也不将queue(队列)绑定到exchange,队列声明和绑定队列到exchange的工作由消费者方完成

  1. 生产者方

    ① 生产者方代码

复制代码

1 import java.io.IOException; 2 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.ConnectionFactory; 5
6 public class Producer { 7 private final static String QUEUE_NAME = “QUEUE8”;
8
9 public static void main(String[] args) throws IOException { 10
11 ConnectionFactory factory = new ConnectionFactory(); 12 factory.setHost(“localhost”); 13 factory.setPort(5672); 14 factory.setUsername(“guest”); 15 factory.setPassword(“guest”); 16
17 Connection connection = factory.newConnection(); 18 Channel channel = connection.createChannel(); 19
20 String message = “Hello World!”; 21
22 // 指定exchange和routing key,并发送消息到exchange
23 channel.basicPublish(“FILETOPIC”, “KEY.FILE”, null, message.getBytes()); 24 System.out.println(“ [x] Sent ‘“ + message + “‘“); 25
26 channel.close(); 27 connection.close(); 28 } 29 }

复制代码

    ② 生产者方代码运行后,可在rabbiteMQ managerment 管理界面看到相应exchange,如下图所示:

-————————————————————————————————————————————————————————————–

-————————————————————————————————————————————————————————————-

  2. 消费者方:消费者声明队列,队列名称随便起,并将该队列通过生产者指定的routing key绑定的其指定的exchange上

    ① 消费者方代码

复制代码

1 import com.rabbitmq.client.ConnectionFactory; 2 import com.rabbitmq.client.QueueingConsumer; 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5
6 public class Reqv { 7
8 private final static String QUEUE_NAME = “QUEUE8”;
9
10 public static void main(String[] argv) throws Exception { 11
12 ConnectionFactory factory = new ConnectionFactory(); 13 factory.setPort(5672); 14 factory.setUsername(“guest”); 15 factory.setPassword(“guest”); 16 factory.setHost(“localhost”); 17
18 Connection connection = factory.newConnection(); 19 Channel channel = connection.createChannel(); 20
21 // 声明队列
22 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 23 System.out.println(“ [*] Waiting for messages. To exit press CTRL+C”); 24
25 // 绑定队列到交换机
26 channel.queueBind(QUEUE_NAME, “FILETOPIC”, “KEY.FILE”); 27
28 QueueingConsumer consumer = new QueueingConsumer(channel); 29 channel.basicConsume(QUEUE_NAME, true, consumer); 30 while(true){ 31 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 32 String message = new String(delivery.getBody(),”UTF-8”); 33 System.out.println(“ 【[x] Received 】:” + message); 34 } 35 } 36 }

复制代码

    ② 运行效果,消费者方代码运行后,在rabbiteMQ managerment 管理界面可以看到声明的队列,并发现该队列已经绑定到了生产者指定的exchange上

-————————————————————————————————————————————————————–

-——————————————————————————————————————————————————————-

 

-————————————————————————————————————————————————————–

 三. 代码实例:生产者指定exchange和routing key,声明队列并将队列绑定到exchange,消费者只需从生产者绑定的队列消费即可

  1. 生产者

    ① 生产者代码

复制代码

1 import java.io.IOException; 2 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.ConnectionFactory; 5
6 public class Producer { 7 private final static String QUEUE_NAME = “QUEUE1”;
8
9 public static void main(String[] args) throws IOException { 10
11 ConnectionFactory factory = new ConnectionFactory(); 12 factory.setHost(“localhost”); 13 factory.setPort(5672); 14 factory.setUsername(“guest”); 15 factory.setPassword(“guest”); 16
17 Connection connection = factory.newConnection(); 18 Channel channel = connection.createChannel(); 19
20 String message = “Hello World!”; 21
22 // 声明队列
23 channel.queueDeclare(QUEUE_NAME, true, false, false, null); 24
25 // 绑定队列到交换机
26 channel.queueBind(QUEUE_NAME, “FILETOPIC”, “KEY.FILE”); 27
28 // 指定exchange和routing key,并发送消息到exchange
29 channel.basicPublish(“FILETOPIC”, “KEY.FILE”, null, message.getBytes()); 30 System.out.println(“ [x] Sent ‘“ + message + “‘“); 31
32 channel.close(); 33 connection.close(); 34 } 35 }

复制代码

    ② 生产者运行效果:生产者代码运行后,在rabbiteMQ managerment 管理界面可以看到生产者声明的队列,并发现该队列已经绑定到了生产者指定的exchange上,如下图所示:

  2. 消费者

    ① 消费者代码:消费者现在直接消费队列中的消息即可,既不用声明队列,也不用绑定

复制代码

1 import com.rabbitmq.client.ConnectionFactory; 2 import com.rabbitmq.client.QueueingConsumer; 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5
6 public class Reqv { 7
8 private final static String QUEUE_NAME = “QUEUE1”;
9
10 public static void main(String[] argv) throws Exception { 11
12 ConnectionFactory factory = new ConnectionFactory(); 13 factory.setPort(5672); 14 factory.setUsername(“guest”); 15 factory.setPassword(“guest”); 16 factory.setHost(“localhost”); 17
18 Connection connection = factory.newConnection(); 19 Channel channel = connection.createChannel(); 20
21 QueueingConsumer consumer = new QueueingConsumer(channel); 22 channel.basicConsume(QUEUE_NAME, true, consumer); 23 while(true){ 24 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 25 String message = new String(delivery.getBody(),”UTF-8”); 26 System.out.println(“ 【[x] Received 】:” + message); 27 } 28 } 29 }

复制代码

    ② 消费者运行效果:在rabbiteMQ managerment 管理界面可以看到,生产者指定的队列已经有了消费者