最近學(xué)習RabbitMQ的使用方式,記錄下來,方便以后使用,也方便和大家共享,相互交流。
RabbitMQ的六種工作模式:
1、Work queues
2、Publish/subscribe
3、Routing
4、Topics
5、Header 模式
6、RPC
一、Work queues
多個消費端消費同一個隊列中的消息,隊列采用輪詢的方式將消息是平均發(fā)送給消費者;
特點:
1、一條消息只會被一個消費端接收;
2、隊列采用輪詢的方式將消息是平均發(fā)送給消費者的;
3、消費者在處理完某條消息后,才會收到下一條消息
生產(chǎn)端:
1、聲明隊列
2、創(chuàng)建連接
3、創(chuàng)建通道
4、通道聲明隊列
5、制定消息
6、發(fā)送消息,使用默認交換機
消費端:
1、聲明隊列
2、創(chuàng)建連接
3、創(chuàng)建通道
4、通道聲明隊列
5、重寫消息消費方法
6、執(zhí)行消息方法
新建兩個maven工程,生產(chǎn)消息的生產(chǎn)端,消費消息的消費端;
pom.xml文件中依賴坐標如下:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> <version>2.1.0.RELEASE</version> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.0</version> </dependency> </dependencies>
生產(chǎn)端的代碼如下:
package com.xyfer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /* 1、聲明隊列 2、創(chuàng)建連接 3、創(chuàng)建通道 4、通道聲明隊列 5、制定消息 6、發(fā)送消息,使用默認交換機 */ public class Producer02 { //聲明隊列 private static final String QUEUE ="queue"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1");//mq服務(wù)ip地址 connectionFactory.setPort(5672);//mq client連接端口 connectionFactory.setUsername("guest");//mq登錄用戶名 connectionFactory.setPassword("guest");//mq登錄密碼 connectionFactory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服務(wù)器 //創(chuàng)建與RabbitMQ服務(wù)的TCP連接 connection = connectionFactory.newConnection(); //創(chuàng)建與Exchange的通道,每個連接可以創(chuàng)建多個通道,每個通道代表一個會話任務(wù) channel = connection.createChannel(); //通道綁定隊列 /** * 聲明隊列,如果Rabbit中沒有此隊列將自動創(chuàng)建 * param1:隊列名稱 * param2:是否持久化 * param3:隊列是否獨占此連接 * param4:隊列不再使用時是否自動刪除此隊列 * param5:隊列參數(shù) * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * */ channel.queueDeclare(QUEUE,true,false,false,null);//通道綁定郵件隊列 for(int i = 0;i<10;i++){ String message = new String("mq 發(fā)送消息。。。"); /** * 消息發(fā)布方法 * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉(zhuǎn)發(fā)到指定的消息隊列 * param3:消息包含的屬性 * param4:消息體 * 這里沒有指定交換機,消息將發(fā)送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定 * 默認的交換機,routingKey等于隊列名稱 */ //String exchange, String routingKey, BasicProperties props, byte[] body channel.basicPublish("",QUEUE,null,message.getBytes("utf-8")); System.out.println("mq消息發(fā)送成功!"); } } catch (Exception e) { e.printStackTrace(); } finally { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
消費端的代碼如下:
package com.xyfer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /* 1、聲明隊列 2、創(chuàng)建連接 3、創(chuàng)建通道 4、通道聲明隊列 5、重寫消息消費方法 6、執(zhí)行消息方法 */ public class Consumer02 { private static final String QUEUE ="queue"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connection = connectionFactory.newConnection(); channel = connection.createChannel(); //通道綁定隊列 /** * 聲明隊列,如果Rabbit中沒有此隊列將自動創(chuàng)建 * param1:隊列名稱 * param2:是否持久化 * param3:隊列是否獨占此連接 * param4:隊列不再使用時是否自動刪除此隊列 * param5:隊列參數(shù) * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * */ channel.queueDeclare(QUEUE,true,false,false,null);//通道綁定郵件隊列 //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body DefaultConsumer consumer = new DefaultConsumer(channel) { /** * 消費者接收消息調(diào)用此方法 * @param consumerTag 消費者的標簽,在channel.basicConsume()去指定 * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志 (收到消息失敗后是否需要重新發(fā)送) * @param properties * @param body * @throws IOException * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交換機 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); envelope.getDeliveryTag(); String msg = new String(body,"utf-8"); System.out.println("mq收到的消息是:"+msg ); } }; System.out.println("消費者啟動成功!"); channel.basicConsume(QUEUE,true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
生產(chǎn)端啟動后,控制臺打印信息如下:
RabbitMQ中的已有消息:
queue中的消息正是生產(chǎn)端發(fā)送的消息:
二、Publish/subscribe 模式
這種模式又稱為發(fā)布訂閱模式,相對于Work queues模式,該模式多了一個交換機,生產(chǎn)端先把消息發(fā)送到交換機,再由交換機把消息發(fā)送到綁定的隊列中,每個綁定的隊列都能收到由生產(chǎn)端發(fā)送的消息。
發(fā)布訂閱模式:
1、每個消費者監(jiān)聽自己的隊列;
2、生產(chǎn)者將消息發(fā)給broker,由交換機將消息轉(zhuǎn)發(fā)到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收
到消息
應(yīng)用場景:用戶通知,當用戶充值成功或轉(zhuǎn)賬完成系統(tǒng)通知用戶,通知方式有短信、郵件多種方法;
生產(chǎn)端:
1、聲明隊列,聲明交換機
2、創(chuàng)建連接
3、創(chuàng)建通道
4、通道聲明交換機
5、通道聲明隊列
6、通過通道使隊列綁定到交換機
7、制定消息
8、發(fā)送消息
消費端:
1、聲明隊列,聲明交換機
2、創(chuàng)建連接
3、創(chuàng)建通道
4、通道聲明交換機
5、通道聲明隊列
6、通過通道使隊列綁定到交換機
7、重寫消息消費方法
8、執(zhí)行消息方法
Publish/subscribe 模式綁定兩個消費端,因此需要有兩個消費端,一個郵件消費端,一個短信消費端;
生產(chǎn)端的代碼如下:
package com.xyfer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer01 { //聲明兩個隊列和一個交換機 //Publish/subscribe發(fā)布訂閱模式 private static final String QUEUE_EMAIL ="queueEmail"; private static final String QUEUE_SMS ="queueSms"; private static final String EXCHANGE = "messageChange"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1");//mq服務(wù)ip地址 connectionFactory.setPort(5672);//mq client連接端口 connectionFactory.setUsername("guest");//mq登錄用戶名 connectionFactory.setPassword("guest");//mq登錄密碼 connectionFactory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服務(wù)器 //創(chuàng)建與RabbitMQ服務(wù)的TCP連接 connection = connectionFactory.newConnection(); //創(chuàng)建與Exchange的通道,每個連接可以創(chuàng)建多個通道,每個通道代表一個會話任務(wù) channel = connection.createChannel(); //通道綁定交換機 /** * 參數(shù)明細 * 1、交換機名稱 * 2、交換機類型,fanout、topic、direct、headers */ //Publish/subscribe發(fā)布訂閱模式 channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT); //通道綁定隊列 /** * 聲明隊列,如果Rabbit中沒有此隊列將自動創(chuàng)建 * param1:隊列名稱 * param2:是否持久化 * param3:隊列是否獨占此連接 * param4:隊列不再使用時是否自動刪除此隊列 * param5:隊列參數(shù) * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * */ channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列 channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短信隊列 //交換機和隊列綁定 /** * 參數(shù)明細 * 1、隊列名稱 * 2、交換機名稱 * 3、路由key */ //Publish/subscribe發(fā)布訂閱模式 channel.queueBind(QUEUE_EMAIL,EXCHANGE,""); channel.queueBind(QUEUE_SMS,EXCHANGE,""); for(int i = 0;i<10;i++){ String message = new String("mq 發(fā)送消息。。。"); /** * 消息發(fā)布方法 * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉(zhuǎn)發(fā)到指定的消息隊列 * param3:消息包含的屬性 * param4:消息體 * 這里沒有指定交換機,消息將發(fā)送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定 * 默認的交換機,routingKey等于隊列名稱 */ //String exchange, String routingKey, BasicProperties props, byte[] body //Publish/subscribe發(fā)布訂閱模式 channel.basicPublish(EXCHANGE,"",null,message.getBytes()); System.out.println("mq消息發(fā)送成功!"); } } catch (Exception e) { e.printStackTrace(); } finally { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
郵件消費端的代碼如下:
package com.xyfer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer01 { //Publish/subscribe發(fā)布訂閱模式 private static final String QUEUE_EMAIL ="queueEmail"; private static final String EXCHANGE = "messageChange"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connection = connectionFactory.newConnection(); channel = connection.createChannel(); //通道綁定交換機 /** * 參數(shù)明細 * 1、交換機名稱 * 2、交換機類型,fanout、topic、direct、headers */ //Publish/subscribe發(fā)布訂閱模式 channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT); //通道綁定隊列 /** * 聲明隊列,如果Rabbit中沒有此隊列將自動創(chuàng)建 * param1:隊列名稱 * param2:是否持久化 * param3:隊列是否獨占此連接 * param4:隊列不再使用時是否自動刪除此隊列 * param5:隊列參數(shù) * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * */ channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列 //交換機和隊列綁定 /** * 參數(shù)明細 * 1、隊列名稱 * 2、交換機名稱 * 3、路由key */ //Publish/subscribe發(fā)布訂閱模式 channel.queueBind(QUEUE_EMAIL,EXCHANGE,""); //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body DefaultConsumer consumer = new DefaultConsumer(channel) { /** * 消費者接收消息調(diào)用此方法 * @param consumerTag 消費者的標簽,在channel.basicConsume()去指定 * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志 (收到消息失敗后是否需要重新發(fā)送) * @param properties * @param body * @throws IOException * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交換機 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); envelope.getDeliveryTag(); String msg = new String(body,"utf-8"); System.out.println("mq收到的消息是:"+msg ); } }; System.out.println("消費者啟動成功!"); channel.basicConsume(QUEUE_EMAIL,true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
短信消費端的代碼如下:
package xyfer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer01 { //Publish/subscribe發(fā)布訂閱模式 private static final String QUEUE_SMS ="queueSms"; private static final String EXCHANGE = "messageChange"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connection = connectionFactory.newConnection(); channel = connection.createChannel(); //通道綁定交換機 /** * 參數(shù)明細 * 1、交換機名稱 * 2、交換機類型,fanout、topic、direct、headers */ //Publish/subscribe發(fā)布訂閱模式 channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT); //通道綁定隊列 /** * 聲明隊列,如果Rabbit中沒有此隊列將自動創(chuàng)建 * param1:隊列名稱 * param2:是否持久化 * param3:隊列是否獨占此連接 * param4:隊列不再使用時是否自動刪除此隊列 * param5:隊列參數(shù) * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * */ channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短信隊列 //交換機和隊列綁定 /** * 參數(shù)明細 * 1、隊列名稱 * 2、交換機名稱 * 3、路由key */ //Publish/subscribe發(fā)布訂閱模式 channel.queueBind(QUEUE_SMS,EXCHANGE,""); DefaultConsumer consumer = new DefaultConsumer(channel) { /** * 消費者接收消息調(diào)用此方法 * @param consumerTag 消費者的標簽,在channel.basicConsume()去指定 * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志 (收到消息失敗后是否需要重新發(fā)送) * @param properties * @param body * @throws IOException * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交換機 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); envelope.getDeliveryTag(); String msg = new String(body,"utf-8"); System.out.println("mq收到的消息是:"+msg ); } }; System.out.println("消費者啟動成功!"); channel.basicConsume(QUEUE_SMS,true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
三、Routing 路由模式
Routing 模式又稱路由模式,該種模式除了要綁定交換機外,發(fā)消息的時候還要制定routing key,即路由key,隊列通過通道綁定交換機的時候,需要指定自己的routing key,這樣,生產(chǎn)端發(fā)送消息的時候也會指定routing key,通過routing key就可以把相應(yīng)的消息發(fā)送到綁定相應(yīng)routing key的隊列中去。
路由模式:
1、每個消費者監(jiān)聽自己的隊列,并且設(shè)置routingkey;
2、生產(chǎn)者將消息發(fā)給交換機,由交換機根據(jù)routingkey來轉(zhuǎn)發(fā)消息到指定的隊列;
應(yīng)用場景:用戶通知,當用戶充值成功或轉(zhuǎn)賬完成系統(tǒng)通知用戶,通知方式有短信、郵件多種方法;
生產(chǎn)端:
1、聲明隊列,聲明交換機
2、創(chuàng)建連接
3、創(chuàng)建通道
4、通道聲明交換機
5、通道聲明隊列
, 【聲音】【量天】【矗立】【能量】,【方的】【戰(zhàn)場】【紫真】【又不】,【飄散】【擊螞】【當下】【尊大】【斷了】.【里面】【骨下】【暢沒】【擊中】【作勢】,【新派】【神族】【是一】【活意】,【行設(shè)】【有黑】【非常】【域里】【以形】!【案發(fā)】【歸入】【間都】【血河】【音似】【到?jīng)]】,【微微】【毒蛤】【脫了】【這尊】,【掉了】【已經(jīng)】【凜然】【筑前】【在左】,【一望】【人真】【眼的】.【的陰】【戰(zhàn)斗】【是一】【鎖區(qū)】,【好歹】【展鯤】【難性】【掉這】,【噬整】【可以】【真的】【白象】.【士卒】!【覺要】【雨般】【體積】【里卻】【生命】【個黑】【神強】.【只有】,6、通過通道使隊列綁定到交換機并指定該隊列的routingkey
7、制定消息
8、發(fā)送消息并指定routingkey
消費端:
1、聲明隊列,聲明交換機
2、創(chuàng)建連接
3、創(chuàng)建通道
4、通道聲明交換機
5、通道聲明隊列
6、通過通道使隊列綁定到交換機并指定routingkey
7、重寫消息消費方法
8、執(zhí)行消息方法
按照假設(shè)的應(yīng)用場景,同樣,Routing 路由模式也是一個生產(chǎn)端,兩個消費端,所不同的是,聲明交換機的類型不同,隊列綁定交換機的時候需要指定Routing key,發(fā)送消息的時候也需要指定Routing key,這樣根據(jù)Routing key就能把相應(yīng)的消息發(fā)送到相應(yīng)的隊列中去。
生產(chǎn)端的代碼如下:
package com.xyfer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer03 { //聲明兩個隊列和一個交換機 //Routing 路由模式 private static final String QUEUE_EMAIL ="queueEmail"; private static final String QUEUE_SMS ="queueSms"; private static final String EXCHANGE = "messageChange"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1");//mq服務(wù)ip地址 connectionFactory.setPort(5672);//mq client連接端口 connectionFactory.setUsername("guest");//mq登錄用戶名 connectionFactory.setPassword("guest");//mq登錄密碼 connectionFactory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服務(wù)器 //創(chuàng)建與RabbitMQ服務(wù)的TCP連接 connection = connectionFactory.newConnection(); //創(chuàng)建與Exchange的通道,每個連接可以創(chuàng)建多個通道,每個通道代表一個會話任務(wù) channel = connection.createChannel(); //通道綁定交換機 /** * 參數(shù)明細 * 1、交換機名稱 * 2、交換機類型,fanout、topic、direct、headers */ //Routing 路由模式 channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT); //通道綁定隊列 /** * 聲明隊列,如果Rabbit中沒有此隊列將自動創(chuàng)建 * param1:隊列名稱 * param2:是否持久化 * param3:隊列是否獨占此連接 * param4:隊列不再使用時是否自動刪除此隊列 * param5:隊列參數(shù) * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * */ channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列 channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短信隊列 //交換機和隊列綁定 /** * 參數(shù)明細 * 1、隊列名稱 * 2、交換機名稱 * 3、路由key */ //Routing 路由模式 channel.queueBind(QUEUE_EMAIL,EXCHANGE,QUEUE_EMAIL); channel.queueBind(QUEUE_SMS,EXCHANGE,QUEUE_SMS); //給email隊列發(fā)消息 for(int i = 0;i<10;i++){ String message = new String("mq 發(fā)送email消息。。。"); /** * 消息發(fā)布方法 * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉(zhuǎn)發(fā)到指定的消息隊列 * param3:消息包含的屬性 * param4:消息體 * 這里沒有指定交換機,消息將發(fā)送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定 * 默認的交換機,routingKey等于隊列名稱 */ //String exchange, String routingKey, BasicProperties props, byte[] body //Routing 路由模式 channel.basicPublish(EXCHANGE,QUEUE_EMAIL,null,message.getBytes()); System.out.println("mq消息發(fā)送成功!"); } //給sms隊列發(fā)消息 for(int i = 0;i<10;i++){ String message = new String("mq 發(fā)送sms消息。。。"); /** * 消息發(fā)布方法 * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉(zhuǎn)發(fā)到指定的消息隊列 * param3:消息包含的屬性 * param4:消息體 * 這里沒有指定交換機,消息將發(fā)送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定 * 默認的交換機,routingKey等于隊列名稱 */ //String exchange, String routingKey, BasicProperties props, byte[] body //Routing 路由模式 channel.basicPublish(EXCHANGE,QUEUE_SMS,null,message.getBytes()); System.out.println("mq消息發(fā)送成功!"); } } catch (Exception e) { e.printStackTrace(); } finally { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
郵件消費端的代碼如下:
package com.xyfer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer03 { //Routing 路由模式 private static final String QUEUE_EMAIL ="queueEmail"; private static final String EXCHANGE = "messageChange"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connection = connectionFactory.newConnection(); channel = connection.createChannel(); //通道綁定交換機 /** * 參數(shù)明細 * 1、交換機名稱 * 2、交換機類型,fanout、topic、direct、headers */ //Routing 路由模式 channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT); //通道綁定隊列 /** * 聲明隊列,如果Rabbit中沒有此隊列將自動創(chuàng)建 * param1:隊列名稱 * param2:是否持久化 * param3:隊列是否獨占此連接 * param4:隊列不再使用時是否自動刪除此隊列 * param5:隊列參數(shù) * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * */ channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列 //交換機和隊列綁定 /** * 參數(shù)明細 * 1、隊列名稱 * 2、交換機名稱 * 3、路由key */ //Routing 路由模式 channel.queueBind(QUEUE_EMAIL,EXCHANGE,QUEUE_EMAIL); //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body DefaultConsumer consumer = new DefaultConsumer(channel) { /** * 消費者接收消息調(diào)用此方法 * @param consumerTag 消費者的標簽,在channel.basicConsume()去指定 * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志 (收到消息失敗后是否需要重新發(fā)送) * @param properties * @param body * @throws IOException * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交換機 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); envelope.getDeliveryTag(); String msg = new String(body,"utf-8"); System.out.println("mq收到的消息是:"+msg ); } }; System.out.println("消費者啟動成功!"); channel.basicConsume(QUEUE_EMAIL,true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
短信消費端的代碼如下:
package xyfer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer03 { //Routing 路由模式 private static final String QUEUE_SMS ="queueSms"; private static final String EXCHANGE = "messageChange"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connection = connectionFactory.newConnection(); channel = connection.createChannel(); //通道綁定交換機 /** * 參數(shù)明細 * 1、交換機名稱 * 2、交換機類型,fanout、topic、direct、headers */ //Routing 路由模式 channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT); //通道綁定隊列 /** * 聲明隊列,如果Rabbit中沒有此隊列將自動創(chuàng)建 * param1:隊列名稱 * param2:是否持久化 * param3:隊列是否獨占此連接 * param4:隊列不再使用時是否自動刪除此隊列 * param5:隊列參數(shù) * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * */ channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短信隊列 //交換機和隊列綁定 /** * 參數(shù)明細 * 1、隊列名稱 * 2、交換機名稱 * 3、路由key */ //Routing 路由模式 channel.queueBind(QUEUE_SMS,EXCHANGE,QUEUE_SMS); DefaultConsumer consumer = new DefaultConsumer(channel) { /** * 消費者接收消息調(diào)用此方法 * @param consumerTag 消費者的標簽,在channel.basicConsume()去指定 * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志 (收到消息失敗后是否需要重新發(fā)送) * @param properties * @param body * @throws IOException * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交換機 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); envelope.getDeliveryTag(); String msg = new String(body,"utf-8"); System.out.println("mq收到的消息是:"+msg ); } }; System.out.println("消費者啟動成功!"); channel.basicConsume(QUEUE_SMS,true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
四、Topics 模式
Topics 模式和Routing 路由模式最大的區(qū)別就是,Topics 模式發(fā)送消息和消費消息的時候是通過通配符去進行匹配的。
路由模式:
1、每個消費者監(jiān)聽自己的隊列,并且設(shè)置帶統(tǒng)配符的routingkey
2、生產(chǎn)者將消息發(fā)給broker,由交換機根據(jù)routingkey來轉(zhuǎn)發(fā)消息到指定的隊列
應(yīng)用場景:用戶通知,當用戶充值成功或轉(zhuǎn)賬完成系統(tǒng)通知用戶,通知方式有短信、郵件多種方法;
生產(chǎn)端:
1、聲明隊列,聲明交換機
2、創(chuàng)建連接
3、創(chuàng)建通道
4、通道聲明交換機
5、通道聲明隊列
6、通過通道使隊列綁定到交換機并指定該隊列的routingkey(通配符)
7、制定消息
8、發(fā)送消息并指定routingkey(通配符)
消費端:
1、聲明隊列,聲明交換機
2、創(chuàng)建連接
3、創(chuàng)建通道
4、通道聲明交換機
5、通道聲明隊列
6、通過通道使隊列綁定到交換機并指定routingkey(通配符)
7、重寫消息消費方法
8、執(zhí)行消息方法
按照假設(shè)的應(yīng)用場景,Topics 模式也是一個生產(chǎn)端,兩個消費端,生產(chǎn)端隊列綁定交換機的時候,需要指定的routingkey是通配符,發(fā)送消息的時候綁定的routingkey也是通配符,消費端隊列綁定交換機的時候routingkey也是通配符,這樣就能根據(jù)通配符匹配到消息了。
生產(chǎn)端的代碼如下:
package com.xyfer; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer04 { //聲明兩個隊列和一個交換機 //Topics 模式 private static final String QUEUE_EMAIL ="queueEmail"; private static final String QUEUE_SMS ="queueSms"; private static final String EXCHANGE = "messageChange"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1");//mq服務(wù)ip地址 connectionFactory.setPort(5672);//mq client連接端口 connectionFactory.setUsername("guest");//mq登錄用戶名 connectionFactory.setPassword("guest");//mq登錄密碼 connectionFactory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服務(wù)器 //創(chuàng)建與RabbitMQ服務(wù)的TCP連接 connection = connectionFactory.newConnection(); //創(chuàng)建與Exchange的通道,每個連接可以創(chuàng)建多個通道,每個通道代表一個會話任務(wù) channel = connection.createChannel(); //通道綁定交換機 /** * 參數(shù)明細 * 1、交換機名稱 * 2、交換機類型,fanout、topic、direct、headers */ //Topics 模式 channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC); //通道綁定隊列 /** * 聲明隊列,如果Rabbit中沒有此隊列將自動創(chuàng)建 * param1:隊列名稱 * param2:是否持久化 * param3:隊列是否獨占此連接 * param4:隊列不再使用時是否自動刪除此隊列 * param5:隊列參數(shù) * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * */ channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列 channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短信隊列 //交換機和隊列綁定 /** * 參數(shù)明細 * 1、隊列名稱 * 2、交換機名稱 * 3、路由key */ channel.queueBind(QUEUE_EMAIL,EXCHANGE,"inform.#.email.#"); channel.queueBind(QUEUE_SMS,EXCHANGE,"inform.#.sms.#"); //給email隊列發(fā)消息 for(int i = 0;i<10;i++){ String message = new String("mq 發(fā)送email消息。。。"); /** * 消息發(fā)布方法 * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉(zhuǎn)發(fā)到指定的消息隊列 * param3:消息包含的屬性 * param4:消息體 * 這里沒有指定交換機,消息將發(fā)送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定 * 默認的交換機,routingKey等于隊列名稱 */ //String exchange, String routingKey, BasicProperties props, byte[] body channel.basicPublish(EXCHANGE,"inform.email",null,message.getBytes()); System.out.println("mq email 消息發(fā)送成功!"); } //給sms隊列發(fā)消息 for(int i = 0;i<10;i++){ String message = new String("mq 發(fā)送sms消息。。。"); /** * 消息發(fā)布方法 * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉(zhuǎn)發(fā)到指定的消息隊列 * param3:消息包含的屬性 * param4:消息體 * 這里沒有指定交換機,消息將發(fā)送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定 * 默認的交換機,routingKey等于隊列名稱 */ //String exchange, String routingKey, BasicProperties props, byte[] body channel.basicPublish(EXCHANGE,"inform.sms",null,message.getBytes()); System.out.println("mq sms 消息發(fā)送成功!"); } //給email和sms隊列發(fā)消息 for(int i = 0;i<10;i++){ String message = new String("mq 發(fā)送email sms消息。。。"); /** * 消息發(fā)布方法 * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉(zhuǎn)發(fā)到指定的消息隊列 * param3:消息包含的屬性 * param4:消息體 * 這里沒有指定交換機,消息將發(fā)送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定 * 默認的交換機,routingKey等于隊列名稱 */ //String exchange, String routingKey, BasicProperties props, byte[] body channel.basicPublish(EXCHANGE,"inform.email.sms",null,message.getBytes()); System.out.println("mq email sms 消息發(fā)送成功!"); } } catch (Exception e) { e.printStackTrace(); } finally { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
郵件消費端的代碼如下:
package com.xyfer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer04 { private static final String QUEUE_EMAIL ="queueEmail"; private static final String EXCHANGE = "messageChange"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connection = connectionFactory.newConnection(); channel = connection.createChannel(); //通道綁定交換機 /** * 參數(shù)明細 * 1、交換機名稱 * 2、交換機類型,fanout、topic、direct、headers */ channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC); //通道綁定隊列 /** * 聲明隊列,如果Rabbit中沒有此隊列將自動創(chuàng)建 * param1:隊列名稱 * param2:是否持久化 * param3:隊列是否獨占此連接 * param4:隊列不再使用時是否自動刪除此隊列 * param5:隊列參數(shù) * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * */ channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列 //交換機和隊列綁定 /** * 參數(shù)明細 * 1、隊列名稱 * 2、交換機名稱 * 3、路由key */ channel.queueBind(QUEUE_EMAIL,EXCHANGE,"inform.#.email.#"); //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body DefaultConsumer consumer = new DefaultConsumer(channel) { /** * 消費者接收消息調(diào)用此方法 * @param consumerTag 消費者的標簽,在channel.basicConsume()去指定 * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志 (收到消息失敗后是否需要重新發(fā)送) * @param properties * @param body * @throws IOException * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交換機 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); envelope.getDeliveryTag(); String msg = new String(body,"utf-8"); System.out.println("mq收到的消息是:"+msg ); } }; System.out.println("消費者啟動成功!"); channel.basicConsume(QUEUE_EMAIL,true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
短信消費端的代碼如下:
package xyfer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer04 { private static final String QUEUE_SMS ="queueSms"; private static final String EXCHANGE = "messageChange"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connection = connectionFactory.newConnection(); channel = connection.createChannel(); //通道綁定交換機 /** * 參數(shù)明細 * 1、交換機名稱 * 2、交換機類型,fanout、topic、direct、headers */ channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC); //通道綁定隊列 /** * 聲明隊列,如果Rabbit中沒有此隊列將自動創(chuàng)建 * param1:隊列名稱 * param2:是否持久化 * param3:隊列是否獨占此連接 * param4:隊列不再使用時是否自動刪除此隊列 * param5:隊列參數(shù) * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * */ channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定郵件隊列 //交換機和隊列綁定 /** * 參數(shù)明細 * 1、隊列名稱 * 2、交換機名稱 * 3、路由key */ channel.queueBind(QUEUE_SMS,EXCHANGE,"inform.#.sms.#"); //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body DefaultConsumer consumer = new DefaultConsumer(channel) { /** * 消費者接收消息調(diào)用此方法 * @param consumerTag 消費者的標簽,在channel.basicConsume()去指定 * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志 (收到消息失敗后是否需要重新發(fā)送) * @param properties * @param body * @throws IOException * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交換機 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); envelope.getDeliveryTag(); String msg = new String(body,"utf-8"); System.out.println("mq收到的消息是:"+msg ); } }; System.out.println("消費者啟動成功!"); channel.basicConsume(QUEUE_SMS,true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
由于生產(chǎn)端同時發(fā)送了email的消息(10條),sms的消息(10條),email和sms同時收到的消息(10條),所以每個消費端都應(yīng)收到各自的10條消息,加上同時都能收到的10條消息,每個消費端應(yīng)該收到20條消息;
生產(chǎn)端控制臺打印:
郵件消費端控制臺打?。?/p>
短信消費端的控制臺打印:
生產(chǎn)端執(zhí)行后,RabbitMQ上的消息隊列情況:
兩個消費端執(zhí)行完后,RabbitMQ上的消息隊列情況:
五、Header 模式
header模式與routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(鍵值對)匹配隊列。
案例:
根據(jù)用戶的通知設(shè)置去通知用戶,設(shè)置接收Email的用戶只接收Email,設(shè)置接收sms的用戶只接收sms,設(shè)置兩種通知類型都接收的則兩種通知都有效。
根據(jù)假設(shè)使用場景,需要一個生產(chǎn)端,兩個消費端,不同的是,生產(chǎn)端聲明交換機時,交換機的類型不同,是headers類型,生產(chǎn)端隊列綁定交換機時,不使用routingkey,而是使用header中的 key/value(鍵值對)匹配隊列,發(fā)送消息時也是使用header中的 key/value(鍵值對)匹配隊列。
消費端同樣是聲明交換機時,交換機的類型不同,是headers類型,消費端隊列綁定交換機時,不使用routingkey,而是使用header中的 key/value(鍵值對)匹配隊列,消費消息時也是使用header中的 key/value(鍵值對)匹配隊列。
生產(chǎn)端的代碼如下:
package com.xyfer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.Hashtable; import java.util.Map; import java.util.concurrent.TimeoutException; public class Producer05 { //聲明兩個隊列和一個交換機 //Header 模式 private static final String QUEUE_EMAIL ="queueEmail"; private static final String QUEUE_SMS ="queueSms"; private static final String EXCHANGE = "messageChange"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1");//mq服務(wù)ip地址 connectionFactory.setPort(5672);//mq client連接端口 connectionFactory.setUsername("guest");//mq登錄用戶名 connectionFactory.setPassword("guest");//mq登錄密碼 connectionFactory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服務(wù)器 //創(chuàng)建與RabbitMQ服務(wù)的TCP連接 connection = connectionFactory.newConnection(); //創(chuàng)建與Exchange的通道,每個連接可以創(chuàng)建多個通道,每個通道代表一個會話任務(wù) channel = connection.createChannel(); //通道綁定交換機 /** * 參數(shù)明細 * 1、交換機名稱 * 2、交換機類型,fanout、topic、direct、headers */ //Header 模式 channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS); //通道綁定隊列 /** * 聲明隊列,如果Rabbit中沒有此隊列將自動創(chuàng)建 * param1:隊列名稱 * param2:是否持久化 * param3:隊列是否獨占此連接 * param4:隊列不再使用時是否自動刪除此隊列 * param5:隊列參數(shù) * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * */ channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列 channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短信隊列 //交換機和隊列綁定 /** * 參數(shù)明細 * 1、隊列名稱 * 2、交換機名稱 * 3、路由key * 4、 * String queue, String exchange, String routingKey, Map<String, Object> arguments */ Map<String,Object> headers_email = new Hashtable<String,Object>(); headers_email.put("inform_type","email"); Map<String,Object> headers_sms = new Hashtable<String, Object>(); headers_sms.put("inform_type","sms"); channel.queueBind(QUEUE_EMAIL,EXCHANGE,"",headers_email); channel.queueBind(QUEUE_SMS,EXCHANGE,"",headers_sms); //給email隊列發(fā)消息 for(int i = 0;i<10;i++){ String message = new String("mq 發(fā)送email消息。。。"); Map<String,Object> headers = new Hashtable<String,Object>(); headers.put("inform_type","email");//匹配email通知消費者綁定的header /** * 消息發(fā)布方法 * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉(zhuǎn)發(fā)到指定的消息隊列 * param3:消息包含的屬性 * param4:消息體 * 這里沒有指定交換機,消息將發(fā)送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定 * 默認的交換機,routingKey等于隊列名稱 */ //String exchange, String routingKey, BasicProperties props, byte[] body AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); properties.headers(headers); //Email通知 channel.basicPublish(EXCHANGE,"",properties.build(),message.getBytes()); System.out.println("mq email 消息發(fā)送成功!"); } //給sms隊列發(fā)消息 for(int i = 0;i<10;i++){ String message = new String("mq 發(fā)送sms消息。。。"); Map<String,Object> headers = new Hashtable<String,Object>(); headers.put("inform_type","sms");//匹配sms通知消費者綁定的header /** * 消息發(fā)布方法 * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉(zhuǎn)發(fā)到指定的消息隊列 * param3:消息包含的屬性 * param4:消息體 * 這里沒有指定交換機,消息將發(fā)送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定 * 默認的交換機,routingKey等于隊列名稱 */ //String exchange, String routingKey, BasicProperties props, byte[] body AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); properties.headers(headers); //sms通知 channel.basicPublish(EXCHANGE,"",properties.build(),message.getBytes()); System.out.println("mq sms 消息發(fā)送成功!"); } } catch (Exception e) { e.printStackTrace(); } finally { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
郵件消費端的代碼如下:
package com.xyfer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.Hashtable; import java.util.Map; import java.util.concurrent.TimeoutException; public class Consumer05 { private static final String QUEUE_EMAIL ="queueEmail"; private static final String EXCHANGE = "messageChange"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connection = connectionFactory.newConnection(); channel = connection.createChannel(); //通道綁定交換機 /** * 參數(shù)明細 * 1、交換機名稱 * 2、交換機類型,fanout、topic、direct、headers */ channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS); //通道綁定隊列 /** * 聲明隊列,如果Rabbit中沒有此隊列將自動創(chuàng)建 * param1:隊列名稱 * param2:是否持久化 * param3:隊列是否獨占此連接 * param4:隊列不再使用時是否自動刪除此隊列 * param5:隊列參數(shù) * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * */ channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列 //交換機和隊列綁定 /** * 參數(shù)明細 * 1、隊列名稱 * 2、交換機名稱 * 3、路由key * 4、 * String queue, String exchange, String routingKey, Map<String, Object> arguments */ Map<String,Object> headers_email = new Hashtable<String,Object>(); headers_email.put("inform_email","email"); channel.queueBind(QUEUE_EMAIL,EXCHANGE,"",headers_email); //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body DefaultConsumer consumer = new DefaultConsumer(channel) { /** * 消費者接收消息調(diào)用此方法 * @param consumerTag 消費者的標簽,在channel.basicConsume()去指定 * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志 (收到消息失敗后是否需要重新發(fā)送) * @param properties * @param body * @throws IOException * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交換機 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); envelope.getDeliveryTag(); String msg = new String(body,"utf-8"); System.out.println("mq收到的消息是:"+msg ); } }; System.out.println("消費者啟動成功!"); channel.basicConsume(QUEUE_EMAIL,true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
短信消費端的代碼如下:
package xyfer; import com.rabbitmq.client.*; import java.io.IOException; import java.util.Hashtable; import java.util.Map; import java.util.concurrent.TimeoutException; public class Consumer05 { private static final String QUEUE_SMS ="queueSms"; private static final String EXCHANGE = "messageChange"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connection = connectionFactory.newConnection(); channel = connection.createChannel(); //通道綁定交換機 /** * 參數(shù)明細 * 1、交換機名稱 * 2、交換機類型,fanout、topic、direct、headers */ channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS); //通道綁定隊列 /** * 聲明隊列,如果Rabbit中沒有此隊列將自動創(chuàng)建 * param1:隊列名稱 * param2:是否持久化 * param3:隊列是否獨占此連接 * param4:隊列不再使用時是否自動刪除此隊列 * param5:隊列參數(shù) * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments * */ channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定郵件隊列 //交換機和隊列綁定 /** * 參數(shù)明細 * 1、隊列名稱 * 2、交換機名稱 * 3、路由key * 4、 * String queue, String exchange, String routingKey, Map<String, Object> arguments */ Map<String,Object> headers_email = new Hashtable<String,Object>(); headers_email.put("inform_email","sms"); channel.queueBind(QUEUE_SMS,EXCHANGE,"",headers_email); //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body DefaultConsumer consumer = new DefaultConsumer(channel) { /** * 消費者接收消息調(diào)用此方法 * @param consumerTag 消費者的標簽,在channel.basicConsume()去指定 * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志 (收到消息失敗后是否需要重新發(fā)送) * @param properties * @param body * @throws IOException * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //交換機 String exchange = envelope.getExchange(); //路由key String routingKey = envelope.getRoutingKey(); envelope.getDeliveryTag(); String msg = new String(body,"utf-8"); System.out.println("mq收到的消息是:"+msg ); } }; System.out.println("消費者啟動成功!"); channel.basicConsume(QUEUE_SMS,true,consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
生產(chǎn)端啟動后RabbitMQ上面的消息隊列情況:
六、RPC 模式
RPC即客戶端遠程調(diào)用服務(wù)端的方法 ,使用MQ可以實現(xiàn)RPC的異步調(diào)用,基于Direct交換機實現(xiàn),流程如下:
1、客戶端即是生產(chǎn)者也是消費者,向RPC請求隊列發(fā)送RPC調(diào)用消息,同時監(jiān)聽RPC響應(yīng)隊列。
2、服務(wù)端監(jiān)聽RPC請求隊列的消息,收到消息后執(zhí)行服務(wù)端的方法,得到方法返回的結(jié)果。
3、服務(wù)端將RPC方法 的結(jié)果發(fā)送到RPC響應(yīng)隊列。
4、客戶端(RPC調(diào)用方)監(jiān)聽RPC響應(yīng)隊列,接收到RPC調(diào)用結(jié)果。
至此,RabbitMQ的六種工作模式已經(jīng)介紹完畢,手動代碼實現(xiàn),實際體驗六種工作模式的不同。
|轉(zhuǎn)載請注明來源地址:蜘蛛池出租 http://www.wholesalehouseflipping.com/
專注于SEO培訓(xùn),快速排名黑帽SEO https://www.heimao.wiki
