蜘蛛池出租蜘蛛池出租

12年網(wǎng)站蜘蛛池出租收錄技術(shù)學(xué)習(xí)博客

福建最全黑帽seo程序代碼合集:RabbitMQ六種工作模式的對(duì)比與實(shí)踐_黑帽SEO排名

:C++ 變量判定的螺旋法則

最近學(xué)習(xí)RabbitMQ的使用方式,記錄下來,方便以后使用,也方便和大家共享,相互交流。

RabbitMQ的六種工作模式:

1、Work queues
2、Publish/subscribe
3、Routing
4、Topics
5、Header 模式
6、RPC

一、Work queues

多個(gè)消費(fèi)端消費(fèi)同一個(gè)隊(duì)列中的消息,隊(duì)列采用輪詢的方式將消息是平均發(fā)送給消費(fèi)者;

 

 特點(diǎn):

1、一條消息只會(huì)被一個(gè)消費(fèi)端接收;

2、隊(duì)列采用輪詢的方式將消息是平均發(fā)送給消費(fèi)者的;

3、消費(fèi)者在處理完某條消息后,才會(huì)收到下一條消息

生產(chǎn)端:

1、聲明隊(duì)列

2、創(chuàng)建連接

3、創(chuàng)建通道

4、通道聲明隊(duì)列

5、制定消息

6、發(fā)送消息,使用默認(rèn)交換機(jī)

消費(fèi)端:

1、聲明隊(duì)列

2、創(chuàng)建連接

3、創(chuàng)建通道

4、通道聲明隊(duì)列

5、重寫消息消費(fèi)方法

6、執(zhí)行消息方法

新建兩個(gè)maven工程,生產(chǎn)消息的生產(chǎn)端,消費(fèi)消息的消費(fèi)端;

pom.xml文件中依賴坐標(biāo)如下:

<dependencies>
    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
            <version>2.1.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.0</version>
        </dependency>
</dependencies>

 生產(chǎn)端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
1、聲明隊(duì)列
2、創(chuàng)建連接
3、創(chuàng)建通道
4、通道聲明隊(duì)列
5、制定消息
6、發(fā)送消息,使用默認(rèn)交換機(jī)
*/
public class Producer02 {
    //聲明隊(duì)列
    private static final String QUEUE ="queue";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務(wù)ip地址
            connectionFactory.setPort(5672);//mq client連接端口
            connectionFactory.setUsername("guest");//mq登錄用戶名
            connectionFactory.setPassword("guest");//mq登錄密碼
            connectionFactory.setVirtualHost("/");//rabbitmq默認(rèn)虛擬機(jī)名稱為“/”,虛擬機(jī)相當(dāng)于一個(gè)獨(dú)立的mq服務(wù)器
            //創(chuàng)建與RabbitMQ服務(wù)的TCP連接
            connection = connectionFactory.newConnection();
            //創(chuàng)建與Exchange的通道,每個(gè)連接可以創(chuàng)建多個(gè)通道,每個(gè)通道代表一個(gè)會(huì)話任務(wù)
            channel = connection.createChannel();

            //通道綁定隊(duì)列
            /**
             * 聲明隊(duì)列,如果Rabbit中沒有此隊(duì)列將自動(dòng)創(chuàng)建
             * param1:隊(duì)列名稱
             * param2:是否持久化
             * param3:隊(duì)列是否獨(dú)占此連接
             * param4:隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列
             * param5:隊(duì)列參數(shù)
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE,true,false,false,null);//通道綁定郵件隊(duì)列

            for(int i = 0;i<10;i++){
                String message = new String("mq 發(fā)送消息。。。");
                /**
                  * 消息發(fā)布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機(jī))將消息轉(zhuǎn)發(fā)到指定的消息隊(duì)列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機(jī),消息將發(fā)送給默認(rèn)交換機(jī),每個(gè)隊(duì)列也會(huì)綁定那個(gè)默認(rèn)的交換機(jī),但是不能顯示綁定或解除綁定
                  * 默認(rèn)的交換機(jī),routingKey等于隊(duì)列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish("",QUEUE,null,message.getBytes("utf-8"));
                System.out.println("mq消息發(fā)送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

消費(fèi)端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
1、聲明隊(duì)列
2、創(chuàng)建連接
3、創(chuàng)建通道
4、通道聲明隊(duì)列
5、重寫消息消費(fèi)方法
6、執(zhí)行消息方法
*/
public class Consumer02 {
    private static final String QUEUE ="queue";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道綁定隊(duì)列
            /**
             * 聲明隊(duì)列,如果Rabbit中沒有此隊(duì)列將自動(dòng)創(chuàng)建
             * param1:隊(duì)列名稱
             * param2:是否持久化
             * param3:隊(duì)列是否獨(dú)占此連接
             * param4:隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列
             * param5:隊(duì)列參數(shù)
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE,true,false,false,null);//通道綁定郵件隊(duì)列

            //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消費(fèi)者接收消息調(diào)用此方法
                  * @param consumerTag 消費(fèi)者的標(biāo)簽,在channel.basicConsume()去指定
                  * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志
                    (收到消息失敗后是否需要重新發(fā)送)
                  * @param properties
                  * @param body
                  * @throws IOException
                 * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交換機(jī)
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    envelope.getDeliveryTag();
                    String msg = new String(body,"utf-8");
                    System.out.println("mq收到的消息是:"+msg );
                }

            };
            System.out.println("消費(fèi)者啟動(dòng)成功!");
            channel.basicConsume(QUEUE,true,consumer);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

生產(chǎn)端啟動(dòng)后,控制臺(tái)打印信息如下:

 RabbitMQ中的已有消息:

 queue中的消息正是生產(chǎn)端發(fā)送的消息:

 二、Publish/subscribe 模式

這種模式又稱為發(fā)布訂閱模式,相對(duì)于Work queues模式,該模式多了一個(gè)交換機(jī),生產(chǎn)端先把消息發(fā)送到交換機(jī),再由交換機(jī)把消息發(fā)送到綁定的隊(duì)列中,每個(gè)綁定的隊(duì)列都能收到由生產(chǎn)端發(fā)送的消息。

發(fā)布訂閱模式:

1、每個(gè)消費(fèi)者監(jiān)聽自己的隊(duì)列;

2、生產(chǎn)者將消息發(fā)給broker,由交換機(jī)將消息轉(zhuǎn)發(fā)到綁定此交換機(jī)的每個(gè)隊(duì)列,每個(gè)綁定交換機(jī)的隊(duì)列都將接收
到消息

應(yīng)用場(chǎng)景:用戶通知,當(dāng)用戶充值成功或轉(zhuǎn)賬完成系統(tǒng)通知用戶,通知方式有短信、郵件多種方法;

生產(chǎn)端:

1、聲明隊(duì)列,聲明交換機(jī)

2、創(chuàng)建連接

3、創(chuàng)建通道

4、通道聲明交換機(jī)

5、通道聲明隊(duì)列

6、通過通道使隊(duì)列綁定到交換機(jī)

7、制定消息

8、發(fā)送消息

消費(fèi)端:

1、聲明隊(duì)列,聲明交換機(jī)

2、創(chuàng)建連接

3、創(chuàng)建通道

4、通道聲明交換機(jī)

5、通道聲明隊(duì)列

6、通過通道使隊(duì)列綁定到交換機(jī)

7、重寫消息消費(fèi)方法

8、執(zhí)行消息方法

Publish/subscribe 模式綁定兩個(gè)消費(fèi)端,因此需要有兩個(gè)消費(fèi)端,一個(gè)郵件消費(fèi)端,一個(gè)短信消費(fèi)端;

生產(chǎn)端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer01 {
    //聲明兩個(gè)隊(duì)列和一個(gè)交換機(jī)
    //Publish/subscribe發(fā)布訂閱模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務(wù)ip地址
            connectionFactory.setPort(5672);//mq client連接端口
            connectionFactory.setUsername("guest");//mq登錄用戶名
            connectionFactory.setPassword("guest");//mq登錄密碼
            connectionFactory.setVirtualHost("/");//rabbitmq默認(rèn)虛擬機(jī)名稱為“/”,虛擬機(jī)相當(dāng)于一個(gè)獨(dú)立的mq服務(wù)器
            //創(chuàng)建與RabbitMQ服務(wù)的TCP連接
            connection = connectionFactory.newConnection();
            //創(chuàng)建與Exchange的通道,每個(gè)連接可以創(chuàng)建多個(gè)通道,每個(gè)通道代表一個(gè)會(huì)話任務(wù)
            channel = connection.createChannel();
            //通道綁定交換機(jī)
            /**
              * 參數(shù)明細(xì)
              * 1、交換機(jī)名稱
              * 2、交換機(jī)類型,fanout、topic、direct、headers
              */
            //Publish/subscribe發(fā)布訂閱模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
            //通道綁定隊(duì)列
            /**
             * 聲明隊(duì)列,如果Rabbit中沒有此隊(duì)列將自動(dòng)創(chuàng)建
             * param1:隊(duì)列名稱
             * param2:是否持久化
             * param3:隊(duì)列是否獨(dú)占此連接
             * param4:隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列
             * param5:隊(duì)列參數(shù)
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊(duì)列
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短信隊(duì)列
            //交換機(jī)和隊(duì)列綁定
            /**
             * 參數(shù)明細(xì)
             * 1、隊(duì)列名稱
             * 2、交換機(jī)名稱
             * 3、路由key
             */
            //Publish/subscribe發(fā)布訂閱模式
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,"");
            channel.queueBind(QUEUE_SMS,EXCHANGE,"");
            for(int i = 0;i<10;i++){
                String message = new String("mq 發(fā)送消息。。。");
                /**
                  * 消息發(fā)布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機(jī))將消息轉(zhuǎn)發(fā)到指定的消息隊(duì)列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機(jī),消息將發(fā)送給默認(rèn)交換機(jī),每個(gè)隊(duì)列也會(huì)綁定那個(gè)默認(rèn)的交換機(jī),但是不能顯示綁定或解除綁定
                  * 默認(rèn)的交換機(jī),routingKey等于隊(duì)列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                //Publish/subscribe發(fā)布訂閱模式
                channel.basicPublish(EXCHANGE,"",null,message.getBytes());
                System.out.println("mq消息發(fā)送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

郵件消費(fèi)端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer01 {
    //Publish/subscribe發(fā)布訂閱模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道綁定交換機(jī)
            /**
              * 參數(shù)明細(xì)
              * 1、交換機(jī)名稱
              * 2、交換機(jī)類型,fanout、topic、direct、headers
              */
            //Publish/subscribe發(fā)布訂閱模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
            //通道綁定隊(duì)列
            /**
             * 聲明隊(duì)列,如果Rabbit中沒有此隊(duì)列將自動(dòng)創(chuàng)建
             * param1:隊(duì)列名稱
             * param2:是否持久化
             * param3:隊(duì)列是否獨(dú)占此連接
             * param4:隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列
             * param5:隊(duì)列參數(shù)
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊(duì)列
            //交換機(jī)和隊(duì)列綁定
            /**
             * 參數(shù)明細(xì)
             * 1、隊(duì)列名稱
             * 2、交換機(jī)名稱
             * 3、路由key
             */
            //Publish/subscribe發(fā)布訂閱模式
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,"");
            //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
            DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
              * 消費(fèi)者接收消息調(diào)用此方法
              * @param consumerTag 消費(fèi)者的標(biāo)簽,在channel.basicConsume()去指定
              * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志
                (收到消息失敗后是否需要重新發(fā)送)
              * @param properties
              * @param body
              * @throws IOException
              * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
              */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機(jī)
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                envelope.getDeliveryTag();
                String msg = new String(body,"utf-8");
                System.out.println("mq收到的消息是:"+msg );
            }
            };
            System.out.println("消費(fèi)者啟動(dòng)成功!");
            channel.basicConsume(QUEUE_EMAIL,true,consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

短信消費(fèi)端的代碼如下:

package xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer01 {
    //Publish/subscribe發(fā)布訂閱模式
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道綁定交換機(jī)
            /**
              * 參數(shù)明細(xì)
              * 1、交換機(jī)名稱
              * 2、交換機(jī)類型,fanout、topic、direct、headers
              */
            //Publish/subscribe發(fā)布訂閱模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
            //通道綁定隊(duì)列
            /**
             * 聲明隊(duì)列,如果Rabbit中沒有此隊(duì)列將自動(dòng)創(chuàng)建
             * param1:隊(duì)列名稱
             * param2:是否持久化
             * param3:隊(duì)列是否獨(dú)占此連接
             * param4:隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列
             * param5:隊(duì)列參數(shù)
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短信隊(duì)列
            //交換機(jī)和隊(duì)列綁定
            /**
             * 參數(shù)明細(xì)
             * 1、隊(duì)列名稱
             * 2、交換機(jī)名稱
             * 3、路由key
             */
            //Publish/subscribe發(fā)布訂閱模式
            channel.queueBind(QUEUE_SMS,EXCHANGE,"");
            DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
              * 消費(fèi)者接收消息調(diào)用此方法
              * @param consumerTag 消費(fèi)者的標(biāo)簽,在channel.basicConsume()去指定
              * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志
                (收到消息失敗后是否需要重新發(fā)送)
              * @param properties
              * @param body
              * @throws IOException
              * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
              */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機(jī)
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                envelope.getDeliveryTag();
                String msg = new String(body,"utf-8");
                System.out.println("mq收到的消息是:"+msg );
            }

            };
            System.out.println("消費(fèi)者啟動(dòng)成功!");
            channel.basicConsume(QUEUE_SMS,true,consumer);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

三、Routing 路由模式

Routing 模式又稱路由模式,該種模式除了要綁定交換機(jī)外,發(fā)消息的時(shí)候還要制定routing key,即路由key,隊(duì)列通過通道綁定交換機(jī)的時(shí)候,需要指定自己的routing key,這樣,生產(chǎn)端發(fā)送消息的時(shí)候也會(huì)指定routing key,通過routing key就可以把相應(yīng)的消息發(fā)送到綁定相應(yīng)routing key的隊(duì)列中去。

路由模式:

1、每個(gè)消費(fèi)者監(jiān)聽自己的隊(duì)列,并且設(shè)置routingkey;
2、生產(chǎn)者將消息發(fā)給交換機(jī),由交換機(jī)根據(jù)routingkey來轉(zhuǎn)發(fā)消息到指定的隊(duì)列;

應(yīng)用場(chǎng)景:用戶通知,當(dāng)用戶充值成功或轉(zhuǎn)賬完成系統(tǒng)通知用戶,通知方式有短信、郵件多種方法;

生產(chǎn)端:

1、聲明隊(duì)列,聲明交換機(jī)

2、創(chuàng)建連接

3、創(chuàng)建通道

4、通道聲明交換機(jī)

5、通道聲明隊(duì)列

,  【聲音】【量天】【矗立】【能量】,【方的】【戰(zhàn)場(chǎng)】【紫真】【又不】,【飄散】【擊螞】【當(dāng)下】【尊大】【斷了】.【里面】【骨下】【暢沒】【擊中】【作勢(shì)】,【新派】【神族】【是一】【活意】,【行設(shè)】【有黑】【非?!俊居蚶铩俊疽孕巍?【案發(fā)】【歸入】【間都】【血河】【音似】【到?jīng)]】,【微微】【毒蛤】【脫了】【這尊】,【掉了】【已經(jīng)】【凜然】【筑前】【在左】,【一望】【人真】【眼的】.【的陰】【戰(zhàn)斗】【是一】【鎖區(qū)】,【好歹】【展鯤】【難性】【掉這】,【噬整】【可以】【真的】【白象】.【士卒】!【覺要】【雨般】【體積】【里卻】【生命】【個(gè)黑】【神強(qiáng)】.【只有】,

6、通過通道使隊(duì)列綁定到交換機(jī)并指定該隊(duì)列的routingkey

7、制定消息

8、發(fā)送消息并指定routingkey

消費(fèi)端:

1、聲明隊(duì)列,聲明交換機(jī)

2、創(chuàng)建連接

3、創(chuàng)建通道

4、通道聲明交換機(jī)

5、通道聲明隊(duì)列

6、通過通道使隊(duì)列綁定到交換機(jī)并指定routingkey

7、重寫消息消費(fèi)方法

8、執(zhí)行消息方法

按照假設(shè)的應(yīng)用場(chǎng)景,同樣,Routing 路由模式也是一個(gè)生產(chǎn)端,兩個(gè)消費(fèi)端,所不同的是,聲明交換機(jī)的類型不同,隊(duì)列綁定交換機(jī)的時(shí)候需要指定Routing key,發(fā)送消息的時(shí)候也需要指定Routing key,這樣根據(jù)Routing key就能把相應(yīng)的消息發(fā)送到相應(yīng)的隊(duì)列中去。

生產(chǎn)端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer03 {
    //聲明兩個(gè)隊(duì)列和一個(gè)交換機(jī)
    //Routing 路由模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務(wù)ip地址
            connectionFactory.setPort(5672);//mq client連接端口
            connectionFactory.setUsername("guest");//mq登錄用戶名
            connectionFactory.setPassword("guest");//mq登錄密碼
            connectionFactory.setVirtualHost("/");//rabbitmq默認(rèn)虛擬機(jī)名稱為“/”,虛擬機(jī)相當(dāng)于一個(gè)獨(dú)立的mq服務(wù)器
            //創(chuàng)建與RabbitMQ服務(wù)的TCP連接
            connection = connectionFactory.newConnection();
            //創(chuàng)建與Exchange的通道,每個(gè)連接可以創(chuàng)建多個(gè)通道,每個(gè)通道代表一個(gè)會(huì)話任務(wù)
            channel = connection.createChannel();
            //通道綁定交換機(jī)
            /**
             * 參數(shù)明細(xì)
             * 1、交換機(jī)名稱
             * 2、交換機(jī)類型,fanout、topic、direct、headers
             */
            //Routing 路由模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
            //通道綁定隊(duì)列
            /**
             * 聲明隊(duì)列,如果Rabbit中沒有此隊(duì)列將自動(dòng)創(chuàng)建
             * param1:隊(duì)列名稱
             * param2:是否持久化
             * param3:隊(duì)列是否獨(dú)占此連接
             * param4:隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列
             * param5:隊(duì)列參數(shù)
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊(duì)列
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短信隊(duì)列
            //交換機(jī)和隊(duì)列綁定
            /**
             * 參數(shù)明細(xì)
             * 1、隊(duì)列名稱
             * 2、交換機(jī)名稱
             * 3、路由key
             */
            //Routing 路由模式
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,QUEUE_EMAIL);
            channel.queueBind(QUEUE_SMS,EXCHANGE,QUEUE_SMS);
            //給email隊(duì)列發(fā)消息
            for(int i = 0;i<10;i++){
                String message = new String("mq 發(fā)送email消息。。。");
                /**
                  * 消息發(fā)布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機(jī))將消息轉(zhuǎn)發(fā)到指定的消息隊(duì)列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機(jī),消息將發(fā)送給默認(rèn)交換機(jī),每個(gè)隊(duì)列也會(huì)綁定那個(gè)默認(rèn)的交換機(jī),但是不能顯示綁定或解除綁定
                  * 默認(rèn)的交換機(jī),routingKey等于隊(duì)列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                //Routing 路由模式
                channel.basicPublish(EXCHANGE,QUEUE_EMAIL,null,message.getBytes());
                System.out.println("mq消息發(fā)送成功!");
            }
            //給sms隊(duì)列發(fā)消息
            for(int i = 0;i<10;i++){
                String message = new String("mq 發(fā)送sms消息。。。");
                /**
                  * 消息發(fā)布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機(jī))將消息轉(zhuǎn)發(fā)到指定的消息隊(duì)列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機(jī),消息將發(fā)送給默認(rèn)交換機(jī),每個(gè)隊(duì)列也會(huì)綁定那個(gè)默認(rèn)的交換機(jī),但是不能顯示綁定或解除綁定
                  * 默認(rèn)的交換機(jī),routingKey等于隊(duì)列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                //Routing 路由模式
                channel.basicPublish(EXCHANGE,QUEUE_SMS,null,message.getBytes());
                System.out.println("mq消息發(fā)送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

郵件消費(fèi)端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer03 {
    //Routing 路由模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道綁定交換機(jī)
            /**
              * 參數(shù)明細(xì)
              * 1、交換機(jī)名稱
              * 2、交換機(jī)類型,fanout、topic、direct、headers
              */
            //Routing 路由模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
            //通道綁定隊(duì)列
            /**
             * 聲明隊(duì)列,如果Rabbit中沒有此隊(duì)列將自動(dòng)創(chuàng)建
             * param1:隊(duì)列名稱
             * param2:是否持久化
             * param3:隊(duì)列是否獨(dú)占此連接
             * param4:隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列
             * param5:隊(duì)列參數(shù)
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊(duì)列
            //交換機(jī)和隊(duì)列綁定
            /**
             * 參數(shù)明細(xì)
             * 1、隊(duì)列名稱
             * 2、交換機(jī)名稱
             * 3、路由key
             */
            //Routing 路由模式
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,QUEUE_EMAIL);
            //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消費(fèi)者接收消息調(diào)用此方法
                  * @param consumerTag 消費(fèi)者的標(biāo)簽,在channel.basicConsume()去指定
                  * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志
                    (收到消息失敗后是否需要重新發(fā)送)
                  * @param properties
                  * @param body
                  * @throws IOException
                 * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交換機(jī)
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    envelope.getDeliveryTag();
                    String msg = new String(body,"utf-8");
                    System.out.println("mq收到的消息是:"+msg );
                }

            };
            System.out.println("消費(fèi)者啟動(dòng)成功!");
            channel.basicConsume(QUEUE_EMAIL,true,consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

短信消費(fèi)端的代碼如下:

package xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer03 {
    //Routing 路由模式
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道綁定交換機(jī)
            /**
              * 參數(shù)明細(xì)
              * 1、交換機(jī)名稱
              * 2、交換機(jī)類型,fanout、topic、direct、headers
              */
            //Routing 路由模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
            //通道綁定隊(duì)列
            /**
             * 聲明隊(duì)列,如果Rabbit中沒有此隊(duì)列將自動(dòng)創(chuàng)建
             * param1:隊(duì)列名稱
             * param2:是否持久化
             * param3:隊(duì)列是否獨(dú)占此連接
             * param4:隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列
             * param5:隊(duì)列參數(shù)
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短信隊(duì)列
            //交換機(jī)和隊(duì)列綁定
            /**
             * 參數(shù)明細(xì)
             * 1、隊(duì)列名稱
             * 2、交換機(jī)名稱
             * 3、路由key
             */
            //Routing 路由模式
            channel.queueBind(QUEUE_SMS,EXCHANGE,QUEUE_SMS);
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消費(fèi)者接收消息調(diào)用此方法
                  * @param consumerTag 消費(fèi)者的標(biāo)簽,在channel.basicConsume()去指定
                  * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志
                    (收到消息失敗后是否需要重新發(fā)送)
                  * @param properties
                  * @param body
                  * @throws IOException
                 * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交換機(jī)
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    envelope.getDeliveryTag();
                    String msg = new String(body,"utf-8");
                    System.out.println("mq收到的消息是:"+msg );
                }
            };
            System.out.println("消費(fèi)者啟動(dòng)成功!");
            channel.basicConsume(QUEUE_SMS,true,consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

四、Topics 模式

Topics 模式和Routing 路由模式最大的區(qū)別就是,Topics 模式發(fā)送消息和消費(fèi)消息的時(shí)候是通過通配符去進(jìn)行匹配的。

路由模式:

1、每個(gè)消費(fèi)者監(jiān)聽自己的隊(duì)列,并且設(shè)置帶統(tǒng)配符的routingkey

2、生產(chǎn)者將消息發(fā)給broker,由交換機(jī)根據(jù)routingkey來轉(zhuǎn)發(fā)消息到指定的隊(duì)列

應(yīng)用場(chǎng)景:用戶通知,當(dāng)用戶充值成功或轉(zhuǎn)賬完成系統(tǒng)通知用戶,通知方式有短信、郵件多種方法;

生產(chǎn)端:

1、聲明隊(duì)列,聲明交換機(jī)

2、創(chuàng)建連接

3、創(chuàng)建通道

4、通道聲明交換機(jī)

5、通道聲明隊(duì)列

6、通過通道使隊(duì)列綁定到交換機(jī)并指定該隊(duì)列的routingkey(通配符)

7、制定消息

8、發(fā)送消息并指定routingkey(通配符)

消費(fèi)端:

1、聲明隊(duì)列,聲明交換機(jī)

2、創(chuàng)建連接

3、創(chuàng)建通道

4、通道聲明交換機(jī)

5、通道聲明隊(duì)列

6、通過通道使隊(duì)列綁定到交換機(jī)并指定routingkey(通配符)

7、重寫消息消費(fèi)方法

8、執(zhí)行消息方法

按照假設(shè)的應(yīng)用場(chǎng)景,Topics 模式也是一個(gè)生產(chǎn)端,兩個(gè)消費(fèi)端,生產(chǎn)端隊(duì)列綁定交換機(jī)的時(shí)候,需要指定的routingkey是通配符,發(fā)送消息的時(shí)候綁定的routingkey也是通配符,消費(fèi)端隊(duì)列綁定交換機(jī)的時(shí)候routingkey也是通配符,這樣就能根據(jù)通配符匹配到消息了。

生產(chǎn)端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer04 {
    //聲明兩個(gè)隊(duì)列和一個(gè)交換機(jī)
    //Topics 模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務(wù)ip地址
            connectionFactory.setPort(5672);//mq client連接端口
            connectionFactory.setUsername("guest");//mq登錄用戶名
            connectionFactory.setPassword("guest");//mq登錄密碼
            connectionFactory.setVirtualHost("/");//rabbitmq默認(rèn)虛擬機(jī)名稱為“/”,虛擬機(jī)相當(dāng)于一個(gè)獨(dú)立的mq服務(wù)器
            //創(chuàng)建與RabbitMQ服務(wù)的TCP連接
            connection = connectionFactory.newConnection();
            //創(chuàng)建與Exchange的通道,每個(gè)連接可以創(chuàng)建多個(gè)通道,每個(gè)通道代表一個(gè)會(huì)話任務(wù)
            channel = connection.createChannel();
            //通道綁定交換機(jī)
            /**
              * 參數(shù)明細(xì)
              * 1、交換機(jī)名稱
              * 2、交換機(jī)類型,fanout、topic、direct、headers
              */
            //Topics 模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
            //通道綁定隊(duì)列
            /**
             * 聲明隊(duì)列,如果Rabbit中沒有此隊(duì)列將自動(dòng)創(chuàng)建
             * param1:隊(duì)列名稱
             * param2:是否持久化
             * param3:隊(duì)列是否獨(dú)占此連接
             * param4:隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列
             * param5:隊(duì)列參數(shù)
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊(duì)列
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短信隊(duì)列
            //交換機(jī)和隊(duì)列綁定
            /**
             * 參數(shù)明細(xì)
             * 1、隊(duì)列名稱
             * 2、交換機(jī)名稱
             * 3、路由key
             */
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,"inform.#.email.#");
            channel.queueBind(QUEUE_SMS,EXCHANGE,"inform.#.sms.#");
            //給email隊(duì)列發(fā)消息
            for(int i = 0;i<10;i++){
                String message = new String("mq 發(fā)送email消息。。。");
                /**
                  * 消息發(fā)布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機(jī))將消息轉(zhuǎn)發(fā)到指定的消息隊(duì)列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機(jī),消息將發(fā)送給默認(rèn)交換機(jī),每個(gè)隊(duì)列也會(huì)綁定那個(gè)默認(rèn)的交換機(jī),但是不能顯示綁定或解除綁定
                  * 默認(rèn)的交換機(jī),routingKey等于隊(duì)列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish(EXCHANGE,"inform.email",null,message.getBytes());
                System.out.println("mq email 消息發(fā)送成功!");
            }
            //給sms隊(duì)列發(fā)消息
            for(int i = 0;i<10;i++){
                String message = new String("mq 發(fā)送sms消息。。。");
                /**
                  * 消息發(fā)布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機(jī))將消息轉(zhuǎn)發(fā)到指定的消息隊(duì)列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機(jī),消息將發(fā)送給默認(rèn)交換機(jī),每個(gè)隊(duì)列也會(huì)綁定那個(gè)默認(rèn)的交換機(jī),但是不能顯示綁定或解除綁定
                  * 默認(rèn)的交換機(jī),routingKey等于隊(duì)列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish(EXCHANGE,"inform.sms",null,message.getBytes());
                System.out.println("mq sms 消息發(fā)送成功!");
            }
            //給email和sms隊(duì)列發(fā)消息
            for(int i = 0;i<10;i++){
                String message = new String("mq 發(fā)送email sms消息。。。");
                /**
                  * 消息發(fā)布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機(jī))將消息轉(zhuǎn)發(fā)到指定的消息隊(duì)列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機(jī),消息將發(fā)送給默認(rèn)交換機(jī),每個(gè)隊(duì)列也會(huì)綁定那個(gè)默認(rèn)的交換機(jī),但是不能顯示綁定或解除綁定
                  * 默認(rèn)的交換機(jī),routingKey等于隊(duì)列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish(EXCHANGE,"inform.email.sms",null,message.getBytes());
                System.out.println("mq email sms 消息發(fā)送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

郵件消費(fèi)端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer04 {
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道綁定交換機(jī)
            /**
              * 參數(shù)明細(xì)
              * 1、交換機(jī)名稱
              * 2、交換機(jī)類型,fanout、topic、direct、headers
              */
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
            //通道綁定隊(duì)列
            /**
             * 聲明隊(duì)列,如果Rabbit中沒有此隊(duì)列將自動(dòng)創(chuàng)建
             * param1:隊(duì)列名稱
             * param2:是否持久化
             * param3:隊(duì)列是否獨(dú)占此連接
             * param4:隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列
             * param5:隊(duì)列參數(shù)
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊(duì)列
            //交換機(jī)和隊(duì)列綁定
            /**
             * 參數(shù)明細(xì)
             * 1、隊(duì)列名稱
             * 2、交換機(jī)名稱
             * 3、路由key
             */
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,"inform.#.email.#");
            //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消費(fèi)者接收消息調(diào)用此方法
                  * @param consumerTag 消費(fèi)者的標(biāo)簽,在channel.basicConsume()去指定
                  * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志
                    (收到消息失敗后是否需要重新發(fā)送)
                  * @param properties
                  * @param body
                  * @throws IOException
                 * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交換機(jī)
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    envelope.getDeliveryTag();
                    String msg = new String(body,"utf-8");
                    System.out.println("mq收到的消息是:"+msg );
                }

            };
            System.out.println("消費(fèi)者啟動(dòng)成功!");
            channel.basicConsume(QUEUE_EMAIL,true,consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

短信消費(fèi)端的代碼如下:

package xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer04 {
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道綁定交換機(jī)
            /**
              * 參數(shù)明細(xì)
              * 1、交換機(jī)名稱
              * 2、交換機(jī)類型,fanout、topic、direct、headers
              */
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
            //通道綁定隊(duì)列
            /**
             * 聲明隊(duì)列,如果Rabbit中沒有此隊(duì)列將自動(dòng)創(chuàng)建
             * param1:隊(duì)列名稱
             * param2:是否持久化
             * param3:隊(duì)列是否獨(dú)占此連接
             * param4:隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列
             * param5:隊(duì)列參數(shù)
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定郵件隊(duì)列
            //交換機(jī)和隊(duì)列綁定
            /**
             * 參數(shù)明細(xì)
             * 1、隊(duì)列名稱
             * 2、交換機(jī)名稱
             * 3、路由key
             */
            channel.queueBind(QUEUE_SMS,EXCHANGE,"inform.#.sms.#");
            //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消費(fèi)者接收消息調(diào)用此方法
                  * @param consumerTag 消費(fèi)者的標(biāo)簽,在channel.basicConsume()去指定
                  * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志
                    (收到消息失敗后是否需要重新發(fā)送)
                  * @param properties
                  * @param body
                  * @throws IOException
                 * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交換機(jī)
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    envelope.getDeliveryTag();
                    String msg = new String(body,"utf-8");
                    System.out.println("mq收到的消息是:"+msg );
                }

            };
            System.out.println("消費(fèi)者啟動(dòng)成功!");
            channel.basicConsume(QUEUE_SMS,true,consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

由于生產(chǎn)端同時(shí)發(fā)送了email的消息(10條),sms的消息(10條),email和sms同時(shí)收到的消息(10條),所以每個(gè)消費(fèi)端都應(yīng)收到各自的10條消息,加上同時(shí)都能收到的10條消息,每個(gè)消費(fèi)端應(yīng)該收到20條消息;

生產(chǎn)端控制臺(tái)打?。?/p>

 郵件消費(fèi)端控制臺(tái)打印:

 短信消費(fèi)端的控制臺(tái)打?。?/p>

 生產(chǎn)端執(zhí)行后,RabbitMQ上的消息隊(duì)列情況:

 兩個(gè)消費(fèi)端執(zhí)行完后,RabbitMQ上的消息隊(duì)列情況:

 五、Header 模式

header模式與routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(鍵值對(duì))匹配隊(duì)列。

案例:

根據(jù)用戶的通知設(shè)置去通知用戶,設(shè)置接收Email的用戶只接收Email,設(shè)置接收sms的用戶只接收sms,設(shè)置兩種通知類型都接收的則兩種通知都有效。

根據(jù)假設(shè)使用場(chǎng)景,需要一個(gè)生產(chǎn)端,兩個(gè)消費(fèi)端,不同的是,生產(chǎn)端聲明交換機(jī)時(shí),交換機(jī)的類型不同,是headers類型,生產(chǎn)端隊(duì)列綁定交換機(jī)時(shí),不使用routingkey,而是使用header中的 key/value(鍵值對(duì))匹配隊(duì)列,發(fā)送消息時(shí)也是使用header中的 key/value(鍵值對(duì))匹配隊(duì)列。

消費(fèi)端同樣是聲明交換機(jī)時(shí),交換機(jī)的類型不同,是headers類型,消費(fèi)端隊(duì)列綁定交換機(jī)時(shí),不使用routingkey,而是使用header中的 key/value(鍵值對(duì))匹配隊(duì)列,消費(fèi)消息時(shí)也是使用header中的 key/value(鍵值對(duì))匹配隊(duì)列。

生產(chǎn)端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.*;


import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class Producer05 {
    //聲明兩個(gè)隊(duì)列和一個(gè)交換機(jī)
    //Header 模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務(wù)ip地址
            connectionFactory.setPort(5672);//mq client連接端口
            connectionFactory.setUsername("guest");//mq登錄用戶名
            connectionFactory.setPassword("guest");//mq登錄密碼
            connectionFactory.setVirtualHost("/");//rabbitmq默認(rèn)虛擬機(jī)名稱為“/”,虛擬機(jī)相當(dāng)于一個(gè)獨(dú)立的mq服務(wù)器
            //創(chuàng)建與RabbitMQ服務(wù)的TCP連接
            connection = connectionFactory.newConnection();
            //創(chuàng)建與Exchange的通道,每個(gè)連接可以創(chuàng)建多個(gè)通道,每個(gè)通道代表一個(gè)會(huì)話任務(wù)
            channel = connection.createChannel();
            //通道綁定交換機(jī)
            /**
              * 參數(shù)明細(xì)
              * 1、交換機(jī)名稱
              * 2、交換機(jī)類型,fanout、topic、direct、headers
              */
            //Header 模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);
            //通道綁定隊(duì)列
            /**
             * 聲明隊(duì)列,如果Rabbit中沒有此隊(duì)列將自動(dòng)創(chuàng)建
             * param1:隊(duì)列名稱
             * param2:是否持久化
             * param3:隊(duì)列是否獨(dú)占此連接
             * param4:隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列
             * param5:隊(duì)列參數(shù)
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊(duì)列
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短信隊(duì)列
            //交換機(jī)和隊(duì)列綁定
            /**
             * 參數(shù)明細(xì)
             * 1、隊(duì)列名稱
             * 2、交換機(jī)名稱
             * 3、路由key
             * 4、
             * String queue, String exchange, String routingKey, Map<String, Object> arguments
             */
            Map<String,Object> headers_email = new Hashtable<String,Object>();
            headers_email.put("inform_type","email");
            Map<String,Object> headers_sms = new Hashtable<String, Object>();
            headers_sms.put("inform_type","sms");
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,"",headers_email);
            channel.queueBind(QUEUE_SMS,EXCHANGE,"",headers_sms);
            //給email隊(duì)列發(fā)消息
            for(int i = 0;i<10;i++){
                String message = new String("mq 發(fā)送email消息。。。");
                Map<String,Object> headers = new Hashtable<String,Object>();
                headers.put("inform_type","email");//匹配email通知消費(fèi)者綁定的header
                /**
                  * 消息發(fā)布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機(jī))將消息轉(zhuǎn)發(fā)到指定的消息隊(duì)列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機(jī),消息將發(fā)送給默認(rèn)交換機(jī),每個(gè)隊(duì)列也會(huì)綁定那個(gè)默認(rèn)的交換機(jī),但是不能顯示綁定或解除綁定
                  * 默認(rèn)的交換機(jī),routingKey等于隊(duì)列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
                properties.headers(headers);
                //Email通知
                channel.basicPublish(EXCHANGE,"",properties.build(),message.getBytes());
                System.out.println("mq email 消息發(fā)送成功!");
            }
            //給sms隊(duì)列發(fā)消息
            for(int i = 0;i<10;i++){
                String message = new String("mq 發(fā)送sms消息。。。");
                Map<String,Object> headers = new Hashtable<String,Object>();
                headers.put("inform_type","sms");//匹配sms通知消費(fèi)者綁定的header
                /**
                  * 消息發(fā)布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機(jī))將消息轉(zhuǎn)發(fā)到指定的消息隊(duì)列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機(jī),消息將發(fā)送給默認(rèn)交換機(jī),每個(gè)隊(duì)列也會(huì)綁定那個(gè)默認(rèn)的交換機(jī),但是不能顯示綁定或解除綁定
                  * 默認(rèn)的交換機(jī),routingKey等于隊(duì)列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
                properties.headers(headers);
                //sms通知
                channel.basicPublish(EXCHANGE,"",properties.build(),message.getBytes());
                System.out.println("mq sms 消息發(fā)送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

郵件消費(fèi)端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class Consumer05 {
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道綁定交換機(jī)
            /**
                          * 參數(shù)明細(xì)
                          * 1、交換機(jī)名稱
                          * 2、交換機(jī)類型,fanout、topic、direct、headers
                          */
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);
            //通道綁定隊(duì)列
            /**
             * 聲明隊(duì)列,如果Rabbit中沒有此隊(duì)列將自動(dòng)創(chuàng)建
             * param1:隊(duì)列名稱
             * param2:是否持久化
             * param3:隊(duì)列是否獨(dú)占此連接
             * param4:隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列
             * param5:隊(duì)列參數(shù)
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊(duì)列
            //交換機(jī)和隊(duì)列綁定
            /**
             * 參數(shù)明細(xì)
             * 1、隊(duì)列名稱
             * 2、交換機(jī)名稱
             * 3、路由key
             * 4、
             * String queue, String exchange, String routingKey, Map<String, Object> arguments
             */
            Map<String,Object> headers_email = new Hashtable<String,Object>();
            headers_email.put("inform_email","email");
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,"",headers_email);
            //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消費(fèi)者接收消息調(diào)用此方法
                  * @param consumerTag 消費(fèi)者的標(biāo)簽,在channel.basicConsume()去指定
                  * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志
                    (收到消息失敗后是否需要重新發(fā)送)
                  * @param properties
                  * @param body
                  * @throws IOException
                 * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交換機(jī)
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    envelope.getDeliveryTag();
                    String msg = new String(body,"utf-8");
                    System.out.println("mq收到的消息是:"+msg );
                }

            };
            System.out.println("消費(fèi)者啟動(dòng)成功!");
            channel.basicConsume(QUEUE_EMAIL,true,consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

短信消費(fèi)端的代碼如下:

package xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class Consumer05 {
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道綁定交換機(jī)
            /**
              * 參數(shù)明細(xì)
              * 1、交換機(jī)名稱
              * 2、交換機(jī)類型,fanout、topic、direct、headers
              */
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);
            //通道綁定隊(duì)列
            /**
             * 聲明隊(duì)列,如果Rabbit中沒有此隊(duì)列將自動(dòng)創(chuàng)建
             * param1:隊(duì)列名稱
             * param2:是否持久化
             * param3:隊(duì)列是否獨(dú)占此連接
             * param4:隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列
             * param5:隊(duì)列參數(shù)
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定郵件隊(duì)列
            //交換機(jī)和隊(duì)列綁定
            /**
             * 參數(shù)明細(xì)
             * 1、隊(duì)列名稱
             * 2、交換機(jī)名稱
             * 3、路由key
             * 4、
             * String queue, String exchange, String routingKey, Map<String, Object> arguments
             */
            Map<String,Object> headers_email = new Hashtable<String,Object>();
            headers_email.put("inform_email","sms");
            channel.queueBind(QUEUE_SMS,EXCHANGE,"",headers_email);
            //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消費(fèi)者接收消息調(diào)用此方法
                  * @param consumerTag 消費(fèi)者的標(biāo)簽,在channel.basicConsume()去指定
                  * @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志
                    (收到消息失敗后是否需要重新發(fā)送)
                  * @param properties
                  * @param body
                  * @throws IOException
                 * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交換機(jī)
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    envelope.getDeliveryTag();
                    String msg = new String(body,"utf-8");
                    System.out.println("mq收到的消息是:"+msg );
                }

            };
            System.out.println("消費(fèi)者啟動(dòng)成功!");
            channel.basicConsume(QUEUE_SMS,true,consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

生產(chǎn)端啟動(dòng)后RabbitMQ上面的消息隊(duì)列情況:

六、RPC 模式

 RPC即客戶端遠(yuǎn)程調(diào)用服務(wù)端的方法 ,使用MQ可以實(shí)現(xiàn)RPC的異步調(diào)用,基于Direct交換機(jī)實(shí)現(xiàn),流程如下:

1、客戶端即是生產(chǎn)者也是消費(fèi)者,向RPC請(qǐng)求隊(duì)列發(fā)送RPC調(diào)用消息,同時(shí)監(jiān)聽RPC響應(yīng)隊(duì)列。

2、服務(wù)端監(jiān)聽RPC請(qǐng)求隊(duì)列的消息,收到消息后執(zhí)行服務(wù)端的方法,得到方法返回的結(jié)果。

3、服務(wù)端將RPC方法 的結(jié)果發(fā)送到RPC響應(yīng)隊(duì)列。

4、客戶端(RPC調(diào)用方)監(jiān)聽RPC響應(yīng)隊(duì)列,接收到RPC調(diào)用結(jié)果。

 

至此,RabbitMQ的六種工作模式已經(jīng)介紹完畢,手動(dòng)代碼實(shí)現(xiàn),實(shí)際體驗(yàn)六種工作模式的不同。

 

|轉(zhuǎn)載請(qǐng)注明來源地址:蜘蛛池出租 http://www.wholesalehouseflipping.com/
專注于SEO培訓(xùn),快速排名黑帽SEO https://www.heimao.wiki

版權(quán)聲明:本文為 “蜘蛛池出租” 原創(chuàng)文章,轉(zhuǎn)載請(qǐng)附上原文出處鏈接及本聲明;

原文鏈接:http://www.wholesalehouseflipping.com/post/17878.html

相關(guān)文章

?    2026年1月    ?
1234
567891011
12131415161718
19202122232425
262728293031

搜索

控制面板

您好,歡迎到訪網(wǎng)站!
  查看權(quán)限

網(wǎng)站分類

最新留言

標(biāo)簽列表

最近發(fā)表

作者列表

站點(diǎn)信息

  • 文章總數(shù):11694
  • 頁面總數(shù):3
  • 分類總數(shù):7
  • 標(biāo)簽總數(shù):40
  • 評(píng)論總數(shù):827
  • 瀏覽總數(shù):3765555

友情鏈接

免费国产亚洲天堂AV,国产又粗又猛又黄又爽视频,亚州国产精品一线北,国产线播放免费人成视频播放