MQ

优势:

高内聚低耦合

问题:

优势二:“

优势三:

相关概念:

首先写一个连接单元类:

package com.hmdp.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {

    public static final String HOST_ADDRESS = "192.168.253.128";

    public static Connection getConnection() throws Exception {

        // 定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        // 设置服务地址
        factory.setHost(HOST_ADDRESS);

        // 端口
        factory.setPort(5672);

        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("123456");

        // 通过工程获取连接
        Connection connection = factory.newConnection();

        return connection;
    }



    public static void main(String[] args) throws Exception {

        Connection con = ConnectionUtil.getConnection();

        // amqp://guest@192.168.200.100:5672/
        System.out.println(con);

        con.close();

    }

}

模拟生产者:

package com.hmdp.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

//    public  void main(String[] args) throws Exception {
//
//        // 创建连接工厂
//        ConnectionFactory connectionFactory = new ConnectionFactory();
//
//        // 设置主机地址
//        connectionFactory.setHost("192.168.253.128");
//
//        // 设置连接端口号:默认为 5672
//        connectionFactory.setPort(5672);
//
//        // 虚拟主机名称:默认为 /
//        connectionFactory.setVirtualHost("/");
//
//        // 设置连接用户名;默认为guest
//        connectionFactory.setUsername("guest");
//
//        // 设置连接密码;默认为guest
//        connectionFactory.setPassword("123456");
//
//        // 创建连接
//        Connection connection = connectionFactory.newConnection();
//
//        // 创建频道
//        Channel channel = connection.createChannel();
//
//        // 声明(创建)队列
//        // queue      参数1:队列名称
//        // durable    参数2:是否定义持久化队列,当 MQ 重启之后还在
//        // exclusive  参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列
//        // autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除
//        // arguments  参数5:队列其它参数
//        channel.queueDeclare("simple_queue", true, false, false, null);
//
//        // 要发送的信息
//        String message = "你好;小兔子!";
//
//        // 参数1:交换机名称,如果没有指定则使用默认Default Exchange
//        // 参数2:路由key,简单模式可以传递队列名称
//        // 参数3:配置信息
//        // 参数4:消息内容
//        channel.basicPublish("", "simple_queue", null, message.getBytes());
//
//        System.out.println("已发送消息:" + message);
//
//        // 关闭资源
//        channel.close();
//        connection.close();
//
//    }
public static final String QUEUE_NAME = "work_queue";//声明一个名为 "work_queue" 的队列

    public static void main(String[] args) throws Exception {
        //连接上mq
        Connection connection = ConnectionUtil.getConnection();
        //创建一个信道
        Channel channel = connection.createChannel();

//        队列名称:work_queue
//        持久化设置:true(队列会在 RabbitMQ 重启后保留)
//        非独占队列:false(允许多个连接访问该队列)
//        非自动删除:false(即使没有消费者,队列也不会自动删除)
//        没有额外参数:null
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        for (int i = 1; i <= 10; i++) {

            String body = i+"hello rabbitmq~~~";

            channel.basicPublish("",QUEUE_NAME,null,body.getBytes());

        }

        channel.close();

        connection.close();

    }
}

消费者写两个:

package com.hmdp.utils;

import com.rabbitmq.client.*;

import java.io.IOException;

import static com.hmdp.utils.Consumer2.QUEUE_NAME;


public class Consumer1 {

//    public static void main(String[] args) throws Exception {
//
//        // 1.创建连接工厂
//        ConnectionFactory factory = new ConnectionFactory();
//
//        // 2. 设置参数
//        factory.setHost("192.168.253.128");
//        factory.setPort(5672);
//        factory.setVirtualHost("/");
//        factory.setUsername("guest");
//        factory.setPassword("123456");
//
//        // 3. 创建连接 Connection
//        Connection connection = factory.newConnection();
//
//        // 4. 创建Channel,频道对象
//        Channel channel = connection.createChannel();
//
//        // 5. 创建队列
//        // 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建
//        // 参数1. queue:队列名称
//        // 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在
//        // 参数3. exclusive:是否独占。
//        // 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
//        // 参数5. arguments:其它参数。
////        channel.queueDeclare("simple_queue",true,false,false,null);
//
//        // 接收消息
//        DefaultConsumer consumer = new DefaultConsumer(channel){
//
//            // 回调方法,当收到消息后,会自动执行该方法
//            // 参数1. consumerTag:标识
//            // 参数2. envelope:获取一些信息,交换机,路由key...
//            // 参数3. properties:配置信息
//            // 参数4. body:数据
//            @Override
//            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//
//                System.out.println("consumerTag:"+consumerTag);
//                System.out.println("Exchange:"+envelope.getExchange());
//                System.out.println("RoutingKey:"+envelope.getRoutingKey());
//                System.out.println("properties:"+properties);
//                System.out.println("body:"+new String(body));
//
//            }
//
//        };
//
//        // 参数1. queue:队列名称
//        // 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息
//        // 参数3. callback:回调对象
//        // 消费者类似一个监听程序,主要是用来监听消息
//        channel.basicConsume("simple_queue",true,consumer);
//
//    }
public static void main(String[] args) throws Exception {
    Connection connection = ConnectionUtil.getConnection();

    Channel channel = connection.createChannel();//创建一个通道

    channel.queueDeclare(QUEUE_NAME,true,false,false,null);//创建队列

    Consumer consumer = new DefaultConsumer(channel){

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

            System.out.println("Consumer1 body:"+new String(body));

        }

    };

    channel.basicConsume(QUEUE_NAME,true,consumer);

}

}

还有Consumer2只是名字不同,让生产者产生消息,消费者去接受消息:

可以看到两个消费者是竞争状态

一条消息只被一个消费者程序消费:

多个消费端监听同一个队列还是竞争的关系

X就是交换机

交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
**发布订阅模式与工作队列模式的区别:**

– 工作队列模式本质上是绑定默认交换机

– 发布订阅模式绑定指定交换机

– 监听同一个队列的消费端程序彼此之间是竞争关系

– 绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息

生产者把消息给交换机,每个队列绑定了交换机就能收到交换机发来的消息,然后队列上就有了消息,消费者消费指定队列上的消息

也就是消息队列和路由键进行一个绑定,在发送消息的时候要指定交换机和路由键

package com.hmdp.RabbitMq;

import com.hmdp.utils.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class ProducerRouting {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();//创建连接

        Channel channel = connection.createChannel();//创建通道

        String exchangeName = "test_direct";//交换机名称

        // 创建交换机,指定交换机类型BuiltinExchangeType.DIRECT
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);

        // 创建队列
        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";

        // 声明(创建)队列
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        // 队列绑定交换机
        // 队列1绑定error
        channel.queueBind(queue1Name,exchangeName,"error");//error是路由键,不同的队列可以绑定相同的路由键
        // 队列2绑定info error warning
        channel.queueBind(queue2Name,exchangeName,"info");
        channel.queueBind(queue2Name,exchangeName,"error");
        channel.queueBind(queue2Name,exchangeName,"warning");
        //绑定了四个路由键
        String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";

        // 发送消息
        channel.basicPublish(exchangeName,"error",null,message.getBytes());
        System.out.println(message);

        // 释放资源
        channel.close();
        connection.close();

    }
}
package com.hmdp.RabbitMq;

import com.hmdp.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class ConsumerRouting1 {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String queue1Name = "test_direct_queue1";

        channel.queueDeclare(queue1Name,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body:"+new String(body));
                System.out.println("Consumer1 将日志信息打印到控制台.....");

            }

        };

        channel.basicConsume(queue1Name,true,consumer);

    }

}

在生产者中创建队列,创建路由键,交换机,将队列名,交换机,路由键进行绑定,这样生产消息后,消息就会通过路由键发送给对应的队列

举例子:

博客内容均系原创,未经允许严禁转载!
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇