优势:
高内聚低耦合
问题:
优势二:“
”
优势三:
相关概念:
1,工作模式(Work Queues):
首先写一个连接单元类:
package com.hmdp.utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static final String HOST_ADDRESS = "192.168.253.128";
public static Connection getConnection() throws Exception {
// 定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址
factory.setHost(HOST_ADDRESS);
// 端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("123456");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
public static void main(String[] args) throws Exception {
Connection con = ConnectionUtil.getConnection();
// amqp://guest@192.168.200.100:5672/
System.out.println(con);
con.close();
}
}
模拟生产者:
package com.hmdp.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
// public void main(String[] args) throws Exception {
//
// // 创建连接工厂
// ConnectionFactory connectionFactory = new ConnectionFactory();
//
// // 设置主机地址
// connectionFactory.setHost("192.168.253.128");
//
// // 设置连接端口号:默认为 5672
// connectionFactory.setPort(5672);
//
// // 虚拟主机名称:默认为 /
// connectionFactory.setVirtualHost("/");
//
// // 设置连接用户名;默认为guest
// connectionFactory.setUsername("guest");
//
// // 设置连接密码;默认为guest
// connectionFactory.setPassword("123456");
//
// // 创建连接
// Connection connection = connectionFactory.newConnection();
//
// // 创建频道
// Channel channel = connection.createChannel();
//
// // 声明(创建)队列
// // queue 参数1:队列名称
// // durable 参数2:是否定义持久化队列,当 MQ 重启之后还在
// // exclusive 参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列
// // autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除
// // arguments 参数5:队列其它参数
// channel.queueDeclare("simple_queue", true, false, false, null);
//
// // 要发送的信息
// String message = "你好;小兔子!";
//
// // 参数1:交换机名称,如果没有指定则使用默认Default Exchange
// // 参数2:路由key,简单模式可以传递队列名称
// // 参数3:配置信息
// // 参数4:消息内容
// channel.basicPublish("", "simple_queue", null, message.getBytes());
//
// System.out.println("已发送消息:" + message);
//
// // 关闭资源
// channel.close();
// connection.close();
//
// }
public static final String QUEUE_NAME = "work_queue";//声明一个名为 "work_queue" 的队列
public static void main(String[] args) throws Exception {
//连接上mq
Connection connection = ConnectionUtil.getConnection();
//创建一个信道
Channel channel = connection.createChannel();
// 队列名称:work_queue
// 持久化设置:true(队列会在 RabbitMQ 重启后保留)
// 非独占队列:false(允许多个连接访问该队列)
// 非自动删除:false(即使没有消费者,队列也不会自动删除)
// 没有额外参数:null
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
for (int i = 1; i <= 10; i++) {
String body = i+"hello rabbitmq~~~";
channel.basicPublish("",QUEUE_NAME,null,body.getBytes());
}
channel.close();
connection.close();
}
}
消费者写两个:
package com.hmdp.utils;
import com.rabbitmq.client.*;
import java.io.IOException;
import static com.hmdp.utils.Consumer2.QUEUE_NAME;
public class Consumer1 {
// public static void main(String[] args) throws Exception {
//
// // 1.创建连接工厂
// ConnectionFactory factory = new ConnectionFactory();
//
// // 2. 设置参数
// factory.setHost("192.168.253.128");
// factory.setPort(5672);
// factory.setVirtualHost("/");
// factory.setUsername("guest");
// factory.setPassword("123456");
//
// // 3. 创建连接 Connection
// Connection connection = factory.newConnection();
//
// // 4. 创建Channel,频道对象
// Channel channel = connection.createChannel();
//
// // 5. 创建队列
// // 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建
// // 参数1. queue:队列名称
// // 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在
// // 参数3. exclusive:是否独占。
// // 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
// // 参数5. arguments:其它参数。
//// channel.queueDeclare("simple_queue",true,false,false,null);
//
// // 接收消息
// DefaultConsumer consumer = new DefaultConsumer(channel){
//
// // 回调方法,当收到消息后,会自动执行该方法
// // 参数1. consumerTag:标识
// // 参数2. envelope:获取一些信息,交换机,路由key...
// // 参数3. properties:配置信息
// // 参数4. body:数据
// @Override
// public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//
// System.out.println("consumerTag:"+consumerTag);
// System.out.println("Exchange:"+envelope.getExchange());
// System.out.println("RoutingKey:"+envelope.getRoutingKey());
// System.out.println("properties:"+properties);
// System.out.println("body:"+new String(body));
//
// }
//
// };
//
// // 参数1. queue:队列名称
// // 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息
// // 参数3. callback:回调对象
// // 消费者类似一个监听程序,主要是用来监听消息
// channel.basicConsume("simple_queue",true,consumer);
//
// }
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();//创建一个通道
channel.queueDeclare(QUEUE_NAME,true,false,false,null);//创建队列
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1 body:"+new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
还有Consumer2只是名字不同,让生产者产生消息,消费者去接受消息:
可以看到两个消费者是竞争状态
一条消息只被一个消费者程序消费:
2,发布订阅模式(Publish/Subscribe):
多个消费端监听同一个队列还是竞争的关系
交换机:
X就是交换机
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
**发布订阅模式与工作队列模式的区别:**
– 工作队列模式本质上是绑定默认交换机
– 发布订阅模式绑定指定交换机
– 监听同一个队列的消费端程序彼此之间是竞争关系
– 绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息
生产者把消息给交换机,每个队列绑定了交换机就能收到交换机发来的消息,然后队列上就有了消息,消费者消费指定队列上的消息
路由模式:
也就是消息队列和路由键进行一个绑定,在发送消息的时候要指定交换机和路由键
package com.hmdp.RabbitMq;
import com.hmdp.utils.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class ProducerRouting {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();//创建连接
Channel channel = connection.createChannel();//创建通道
String exchangeName = "test_direct";//交换机名称
// 创建交换机,指定交换机类型BuiltinExchangeType.DIRECT
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
// 创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
// 声明(创建)队列
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 队列绑定交换机
// 队列1绑定error
channel.queueBind(queue1Name,exchangeName,"error");//error是路由键,不同的队列可以绑定相同的路由键
// 队列2绑定info error warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
//绑定了四个路由键
String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";
// 发送消息
channel.basicPublish(exchangeName,"error",null,message.getBytes());
System.out.println(message);
// 释放资源
channel.close();
connection.close();
}
}
package com.hmdp.RabbitMq;
import com.hmdp.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerRouting1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
channel.queueDeclare(queue1Name,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("Consumer1 将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
在生产者中创建队列,创建路由键,交换机,将队列名,交换机,路由键进行绑定,这样生产消息后,消息就会通过路由键发送给对应的队列
主题模式(Topics):
举例子:
进阶篇: