


为什么要用 rabbitMQ?
1. 流量削峰

2. 应用解耦

3. 异步处理

RabbitMQ 核心部分

各个名词介绍
RabbitMQ 工作原理



安装
rabbitmq.com/download.html
erlang 和 rabbitMQ 版本匹配:
https://www.cnblogs.com/gne-hwz/p/10714013.html
安装:https://blog.csdn.net/almahehe/article/details/75390572
(建议看尚硅谷视频进行快速安装)
安装之后,可以访问 ip:15672 ,查看发送消息的端口(5672)和用户。

简单队列模式
生产者代码
- 项目依赖:
<dependencies> <dependency> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.25</version> <scope>test</scope> </dependency> </dependencies>
|
- 生产者代码:
japackage com.atguigu.rabbitmq.one;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Producer { private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("59.110.171.189"); factory.setUsername("admin"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null); String message = "hello world";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("消息发送完毕"); } }
|
如果运行报超时错误,需要打开云服务器的安全组 5672 端口。
(参考博客:https://www.cnblogs.com/jxearlier/p/11920825.html)
消费者代码
package com.atguigu.rabbitmq.one;
import com.rabbitmq.client.*;
import java.io.IOException; import java.util.concurrent.TimeoutException;
/** * @author LiFang * @version 1.0 * @since 2021/11/29 15:04 * 消费者:接收消息 */ public class Consumer { //队列名称 public static final String QUEUE_NAME = "hello";
//接收消息 public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("59.110.171.189"); factory.setUsername("admin"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明 接收消息 DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(message); }; //取消消息时的回调 CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断"); }; /** * 消费者 消费消息 * 1.消费哪个队列 * 2. 消费成功之后是否要自动应答,true代表自动应答,false代表手动应答。 * 3. 消费者未成功消费的回调。 * 4. 消费者取消消费的回调 */ channel.basicConsume(QUEUE_NAME, true,deliverCallback,cancelCallback); } }
|
运行结果:

测试生产者和消费者代码:
- 先运行消费者代码,发现没有消息,再运行生产者代码,发送消息,再看消费者代码控制台,此时已经接收到消息。
工作队列模式
轮训分发消息

- 抽取连接工厂工具类:
package com.atguigu.rabbitmq.utils;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException;
/** * @author LiFang * @version 1.0 * @since 2021/11/29 19:48 * 连接工厂创建信道的工具类 */ public class RabbitMqUtils { public static Channel getChannel() throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("59.110.171.189"); factory.setUsername("admin"); factory.setPassword("123"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
|
- 工作线程代码:(消费者)
package com.atguigu.rabbitmq.two;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery;
import java.io.IOException; import java.util.concurrent.TimeoutException;
/** * @author LiFang * @version 1.0 * @since 2021/11/29 19:55 * 这是一个工作线程(相当于之前的消费者) */ public class Worker01 { //队列名称 public static final String QUEUE_NAME = "hello";
//接收消息 的工作线程 public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); //消息的接收 DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("接收到的消息:" + new String(message.getBody())); }; //消息接收被取消时 执行 CancelCallback cancelCallback = (consumerTag) -> { System.out.println(consumerTag + "消费者取消消费接口回调逻辑"); }; /* * 消费者 消费消息 * 1.消费哪个队列 * 2. 消费成功之后是否要自动应答,true代表自动应答,false代表手动应答。 * 3. 消费者未成功消费的回调。 * 4. 消费者取消消费的回调 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
|
- 启动两个工作线程(消费者)
前提是在 idea 设置允许方法多个并行运行:


- 生产者代码:
package com.atguigu.rabbitmq.two;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.Channel;
import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException;
/** * @author LiFang * @version 1.0 * @since 2021/11/29 21:04 * 生产者 发送大量消息 */ public class Task01 { public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); /* * 生成一个队列 * 参数;1.队列名称 * 2.队列里面的消息是否持久化(磁盘),默认消息存储在内存中(不持久化false) * 3.该队列是否只供一个消费者进行消费,是否消息独有,true只允许一个消费者进行消费,默认是false(可以多个消费者消费) 4. 是否自动删除,最后一个消费者断开连接后,该队列是否自动删除,true自动删除,false不自动删除 5.其他参数(延迟消息......) */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("发送消息完成:" + message); } } }
|
- 测试:启动生产者

可以看见消费者轮循接收消息:


消息应答
自动应答
不建议使用,仅适用在消费者可以高效并以某种速率能够处理这些消息的情况。
手动应答
消息应答的方法:

批量处理 Multiple
手动应答的好处:可以批量应答,并减少网络拥堵。

但是批量应答可能会丢失消息。所以尽量不要批量应答,将 multiple 设置为 false。
消息自动重新入队


测试:
- 生产者:
package com.atguigu.rabbitmq.three;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.Channel;
import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException;
/** * @author LiFang * @version 1.0 * @since 2021/12/1 14:46 * 消息在手动应答时不丢失,放回队列中重新消费 */ public class Task2 { //队列名称 public static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); //声明队列 channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes()); System.out.println("生产者发出消息:" + message); } } }
|
- 两个消费者(消息手动应答):
消费者一:
package com.atguigu.rabbitmq.three;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.atguigu.rabbitmq.utils.SleepUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
/** * @author LiFang * @version 1.0 * @since 2021/12/1 14:58 * 消息在手动应答时不丢失,放回队列中重新消费 */ public class Work03 { //队列名称 public static final String TASK_QUEUE_NAME = "ack_queue";
//接收消息 public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); System.out.println("C1等待接收消息处理时间较短"); DeliverCallback deliverCallback = (consumerTag, message) -> { //沉睡1s SleepUtils.sleep(1); System.out.println("接收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8)); //手动应答(通过信道) /*参数: 1. 消息的标记 tag 2. 是否批量应答 false:不批量应答信道中的消息,true:批量 */ channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; //采用手动应答 boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag -> { System.out.println(consumerTag + "消费者取消消费接口的回调逻辑"); })); } }
|
消费者二:
package com.atguigu.rabbitmq.three;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.atguigu.rabbitmq.utils.SleepUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
/** * @author LiFang * @version 1.0 * @since 2021/12/1 14:58 * 消息在手动应答时不丢失,放回队列中重新消费 */ public class Work04 { //队列名称 public static final String TASK_QUEUE_NAME = "ack_queue";
//接收消息 public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); System.out.println("C2等待接收消息处理时间较短"); DeliverCallback deliverCallback = (consumerTag, message) -> { //沉睡1s SleepUtils.sleep(30); System.out.println("接收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8)); //手动应答(通过信道) /*参数: 1. 消息的标记 tag 2. 是否批量应答 false:不批量应答信道中的消息,true:批量 */ channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; //采用手动应答 boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag -> { System.out.println(consumerTag + "消费者取消消费接口的回调逻辑"); })); } }
|
测试步骤:
① 先启动 task2,创建 ack_queue 队列;(在 ip:15672 的 queue 列表中可以看到目前拥有的队列)
② 启动 work02,work03 接收消息(消费者);
③ 发消息:在 task2 控制台输入 aa,bb,cc,dd,ee,ff,可以看到 work2 和 work3 是轮训接收消息;如果到 work03 应该接收消息 ee 时,work03 突然挂掉,此时 ee 会被转发给 work02 中的 C1,这时 C1 会接收到 ee,因此消息不会丢失,这说明了 rabbitmq 有手动应答的能力,只要没有收到消息,就不会手动应答,从而将消息放回队列。而队列又再次将消息传递给 C1 进行重新消费,从而导致 ee 并没有丢失。
队列持久化
如果存在同名未被持久化的队列,则需要先删除原先的未被持久化的队列,再重新生成一个持久化队列。
boolean durable = true; channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
|
生成一个持久化队列之后,在 rabbitmq 控制台中这个队列的 features 属性会出现 D(代表持久化)。

消息持久化
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
|
不公平分发
int prefetchCount = 1; channel.basicQos(prefetchCount);
|


预取值

int prefetchCount = 5; channel.basicQos(prefetchCount);
|
发布确认原理

在信道之后开启发布确认:
单个发布确认

package com.atguigu.rabbitmq.four;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.Channel;
import java.io.IOException; import java.util.UUID; import java.util.concurrent.TimeoutException;
public class ConfireMessage { public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException { ConfireMessage.publicMessageIndividually(); }
public static void publicMessageIndividually() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); channel.confirmSelect(); long begin = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String massage = i + ""; channel.basicPublish("", queueName, null, massage.getBytes()); boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("消息发送成功"); } } long end = System.currentTimeMillis(); System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时"+(end - begin)+"ms"); } }
|
批量发布确认

public static void publicMessageBatch() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); channel.confirmSelect(); long begin = System.currentTimeMillis(); int batchSize = 100; for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("", queueName, null, message.getBytes()); if (i % batchSize == 0) { channel.waitForConfirms(); } } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms"); }
|
异步发布确认


public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
ConfireMessage.publicMessageAsync(); }
public static void publicMessageAsync() throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); channel.confirmSelect(); long begin = System.currentTimeMillis(); ConfirmCallback ackCallback = (deliveryTag, multiple) -> { System.out.println("确认的消息" + deliveryTag); }; ConfirmCallback nackCallback = (deliveryTag, multiple) -> { System.out.println("未确认的消息" + deliveryTag); }; channel.addConfirmListener(ackCallback, nackCallback); for (int i = 0; i < MESSAGE_COUNT; i++) { String massage = "消息" + i; channel.basicPublish("", queueName, null, massage.getBytes());
} long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms"); }
|
如何处理异步未确认消息

上述异步确认有两个线程:
两个线程之间交互,只能用并发链路式队列(可以在确认发布与发布线程之间进行消息传递)。
public static void publicMessageAsync() throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); channel.confirmSelect();
ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback ackCallback = (deliveryTag, multiple) -> { if(multiple){ ConcurrentNavigableMap<Long,String> confirmed = outstandingConfirms.headMap(deliveryTag); confirmed.clear(); }else{ outstandingConfirms.remove(deliveryTag); } System.out.println("确认的消息" + deliveryTag); }; ConfirmCallback nackCallback = (deliveryTag, multiple) -> { String message = outstandingConfirms.get(deliveryTag); System.out.println("未确认的消息是:"+message+":::::未确认的消息tag:" + deliveryTag); }; channel.addConfirmListener(ackCallback, nackCallback); long begin = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = "消息" + i; outstandingConfirms.put(channel.getNextPublishSeqNo(),message); channel.basicPublish("", queueName, null, message.getBytes());
} long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms"); }
|
以上三种发布确认速度对比:
- 单独发布消息:同步等待确认,简单,但吞吐量非常有限。
- 批量发布消息:批量同步等待确认,简单,合理的吞吐量,一旦出现问题,很难推断出是哪条出现了问题
- 异步处理:最佳性能和资源利用,在出现错误的情况下,可以很好的控制,但是实现起来稍微难些。
交换机
交换机的作用

交换机的类型
- 直接(direct)== 路由类型
- 主题(topic)
- 标题(headers)(企业不常用)
- 扇出(fanout)== 发布订阅类型
- 无名类型(默认类型),通常用空串进行识别

临时队列
不带有持久化,一旦断开消费者的连接,队列将被自动删除。
创建临时队列:
String queueName = channel.queueDeclare().getQueue();
|

绑定
就是交换机与队列之间的捆绑关系。

发布订阅模式(扇出模式 fanout)
类似广播,两个 routingkey 相同

- 生产者
package com.atguigu.rabbitmq.five;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.Channel;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Scanner; import java.util.concurrent.TimeoutException;
public class EmitLog { public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发出消息:" + message); } } }
|
- 两个消费者
package com.atguigu.rabbitmq.five;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class ReceiveLogs01 { public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("等待接收消息,把接收的消息打印在屏幕上。。。。"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("ReceiveLogs01控制台接收到消息:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(queueName, true, deliverCallback,consumerTag->{}); } }
|
package com.atguigu.rabbitmq.five;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class ReceiveLogs02 { public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("等待接收消息,把接收的消息打印在屏幕上。。。。"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("ReceiveLogs02控制台接收到消息:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(queueName, true, deliverCallback,consumerTag->{}); } }
|



直接交换机(路由模式 direct)
两个 routingkey 不相同

可以多重绑定。
生产者发消息给队列,直接交换机通过不同 routingkey 路由到相应的队列,然后消费者接收指定日志。
- 发消息
package com.atguigu.rabbitmq.six;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.Channel;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Scanner; import java.util.concurrent.TimeoutException;
public class DirectLogs { public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发出消息:" + message); } } }
|
- 接收消息
package com.atguigu.rabbitmq.six;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class ReceiveLogsDirect01 { public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare("console",false,false,false,null); channel.queueBind("console",EXCHANGE_NAME,"info"); channel.queueBind("console",EXCHANGE_NAME,"warning"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("ReceiveLogs01控制台接收到消息:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume("console", true, deliverCallback,consumerTag->{});
} }
|
package com.atguigu.rabbitmq.six;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class ReceiveLogsDirect02 { public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare("disk",false,false,false,null); channel.queueBind("disk",EXCHANGE_NAME,"error"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("ReceiveLogs02控制台接收到消息:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume("disk", true, deliverCallback,consumerTag->{});
} }
|


主题交换机(Topic)
规范:




主题交换机(实战)

- 消费者
package com.atguigu.rabbitmq.seven;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class ReceiveLogsTopic01 { public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = "Q1"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*"); System.out.println("等待接收消息。。。。。"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(new String(message.getBody(), StandardCharsets.UTF_8)); System.out.println("接收队列:" + queueName + "绑定键:" + message.getEnvelope().getRoutingKey()); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); }
}
|
package com.atguigu.rabbitmq.seven;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class ReceiveLogsTopic02 { public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = "Q2"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit"); channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#"); System.out.println("等待接收消息。。。。。"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(new String(message.getBody(), StandardCharsets.UTF_8)); System.out.println("接收队列:" + queueName + "绑定键:" + message.getEnvelope().getRoutingKey()); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); }
}
|
- 生产者
package com.atguigu.rabbitmq.seven;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.Channel;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.security.spec.ECField; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException;
public class EmitLogTopic { public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel();
Map<String, String> bindingKeyMap = new HashMap<>(); bindingKeyMap.put("quick.orange.rabbit", "被队列Q1Q2接收到"); bindingKeyMap.put("lazy.orange.elephant", "被队列Q1Q2接收到"); bindingKeyMap.put("lazy.pink.rabbit", "被队列Q1接收到"); bindingKeyMap.put("quick.brown.fox", "被队列Q2接收到"); bindingKeyMap.put("quick.orange.male.rabbit", "虽然满足两个绑定但只被队列Q2接收一次"); bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃"); bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词不匹配任何绑定定会丢弃"); bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配Q2");
for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) { String routingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发出消息" + message); } } }
|
先启动消费者,再启动生产者。



死信队列



package com.atguigu.rabbitmq.eight;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException;
public class Consumer01 { public static final String NORMAL_EXCHANGE = "normal_exchange"; public static final String DEAD_EXCHANGE = "dead_exchange"; public static final String NORMAL_QUEUE = "normal_queue"; public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); arguments.put("x-dead-letter-routing-key", "lisi");
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); channel.queueDeclare(DEAD_QUEUE, false, false, false, null); channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); System.out.println("等待接收消息........."); DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); if (msg.equals("info5")) { System.out.println("此消息被C1拒绝的" + msg); channel.basicReject(message.getEnvelope().getDeliveryTag(), false); } else { System.out.println("Consumer01接收的消息" + msg); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> { }); } }
|
package com.atguigu.rabbitmq.eight;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException;
public class Consumer02 { public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); System.out.println("等待接收消息........."); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer02接收的消息" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(DEAD_QUEUE, false, deliverCallback, consumerTag -> {
}); } }
|
package com.atguigu.rabbitmq.eight;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Producer { public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); for (int i = 1; i < 11; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes()); } } }
|
测试步骤:
- 运行消费者:会发现普通和死信交换机已经绑定各自的队列。


- 测试队列达到最大长度,关闭消费者 1 和 2,开启生产者:消息会积压在队列中,消费者 1 所在的普通队列消息限制有 6 条,剩下的 4 条会进入消费者 2 所在的死信队列。如下图所示:

- 取消普通队列的最大长度限制,测试消息 ttl 过期:关闭消费者 1 和 2,开启生产者发送消息。(消息会因为没人接收,会在 ttl 时间内积压在普通队列中, ttl 过期后,消息会进入死信队列中。)
生产者:
package com.atguigu.rabbitmq.eight;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Producer { public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); for (int i = 1; i < 11; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes()); } } }
|
- 测试消息被拒:
开启消费者 1 和 2,再开启生产者。


延迟队列(基于死信队列)


整合 SpringBoot
实现延迟队列:
- 依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.1</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.atguigu.rabbitmq</groupId> <artifactId>springboot-rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>springboot-rabbitmq</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- https: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.6.1</version> </dependency> <!-- https: <!--web服务器,可以自启动--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.6.1</version> </dependency> <!--快速进行json转换--> <!-- https: <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.78</version> </dependency> <!-- https: <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>3.0.0</version> </dependency> <!-- https: <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>3.0.0</version> </dependency> <!-- https: <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <version>2.4.0</version> <scope>test</scope> </dependency> <!-- https: <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.22</version> <scope>provided</scope> </dependency> </dependencies>
<build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
</project>
|
- yml 配置文件:
spring: rabbitmq: host: 59.110.171.189 port: 5672 username: admin password: 123
|
- swagger 配置类:
package com.atguigu.rabbitmq.springbootrabbitmq.config;
import org.springframework.context.annotation.Bean; import springfox.documentation.builders.ApiInfoBuilder; import springfox.documentation.service.ApiInfo; import springfox.documentation.service.Contact; import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket;
public class SwaggerConfig { @Bean public Docket webApiConfig() { return new Docket(DocumentationType.SWAGGER_2) .groupName("webApi") .apiInfo(webApiInfo()) .select() .build(); }
private ApiInfo webApiInfo() { return new ApiInfoBuilder() .title("rabbitmq 接口文档") .description(" 本文档描述了 rabbitmq 微服务接口定义") .version("1.0") .contact(new Contact("enjoy6288", "http://atguigu.com", "1846015350@qq.com")) .build(); } }
|

- 声明队列配置文件:
package com.atguigu.rabbitmq.springbootrabbitmq.config;
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
import java.util.HashMap; import java.util.Map;
@Configuration public class TtlQueueConfig { public static final String X_EXCHANGE = "X"; public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; public static final String DEAD_LETTER_QUEUE = "QD";
@Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); }
@Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); }
@Bean("queueA") public Queue queueA() { Map<String, Object> arguments = new HashMap<>(3); arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); arguments.put("x-dead-letter-routing-key", "YD"); arguments.put("x-message-ttl", 10000); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); }
@Bean("queueB") public Queue queueB() { Map<String, Object> arguments = new HashMap<>(3); arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); arguments.put("x-dead-letter-routing-key", "YD"); arguments.put("x-message-ttl", 40000); return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); }
@Bean("queueD") public Queue queueD() { return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); }
@Bean public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueA).to(xExchange).with("XA"); }
@Bean public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueB).to(xExchange).with("XB"); }
@Bean public Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) { return BindingBuilder.bind(queueD).to(yExchange).with("YD"); } }
|
- 消费者:接收消息
package com.atguigu.rabbitmq.springbootrabbitmq.consumer;
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j @Component public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD") public void receiveD(Message message, Channel channel) { String msg = new String(message.getBody()); log.info("当前时间:{},收到死信队列的消息:{}", new Date().toString(), msg); } }
|
- 发送消息:Controller
package com.atguigu.rabbitmq.springbootrabbitmq.controller;
import io.swagger.annotations.ApiModelProperty; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}") public void sendMsg(@PathVariable String message) { log.info("当前时间:{},发送一条消息给两个ttl队列:{}", new Date().toString(), message); rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列" + message); rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列" + message); } }
|
访问:http://localhost:8080/ttl/sendMsg/嘻嘻嘻
控制台打印结果:

延迟队列优化:

增加一个 QC 普通队列声明后并绑定交换机 XC。
队列配置 中添加:
public static final String QUEUE_C = "QC";
@Bean("queueC") public Queue queueC() { Map<String, Object> arguments = new HashMap<>(2); arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); arguments.put("x-dead-letter-routing-key", "YD"); return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build(); }
@Bean public Binding queueCBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueC).to(xExchange).with("XC"); }
|
添加 Controller 发消息控制器:
@GetMapping("/sendExpireMsg/{message}/{ttlTime}") public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) { log.info("当前时间:{},发送一条时长{}毫秒,ttl信息给队列QC:{}", new Date().toString(), ttlTime, message); rabbitTemplate.convertAndSend("X", "XC", message, msg -> { msg.getMessageProperties().setExpiration(ttlTime); return msg; }); }
|
测试:
- http://localhost:8080/ttl/sendExpireMsg/你好 1/20000
- http://localhost:8080/ttl/sendExpireMsg/你好 2/2000
结果:

延迟队列(基于插件)
进入 rabbitmq 安装目录下的 plugins 目录 ,cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
执行命令让该插件生效:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
然后重启 rabbitmq:systemctl restart rabbitmq-server
会发现交换机多了一个新类型,意味着延迟消息将由交换机来完成,而不是队列。

原来的情况:基于死信

现在:基于延迟插件

代码架构:

- 配置类
package com.atguigu.rabbitmq.springbootrabbitmq.config;
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
import java.util.HashMap; import java.util.Map;
@Configuration public class DelayedQueueConfig { public static final String DELAYED_QUEUE_NAME = "delayed.name"; public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@Bean public Queue delayedQueue() { return QueueBuilder.durable(DELAYED_QUEUE_NAME).build(); }
@Bean public CustomExchange delayedExchange() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments); }
@Bean public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange) { return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
|
- Controller 生产者:
@GetMapping("/sendDelayMsg/{message}/{delayTime}") public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) { log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayed.queue:{}", new Date().toString(), delayTime, message); rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> { msg.getMessageProperties().setDelay(delayTime); return msg; }); }
|
- 消费者:
package com.atguigu.rabbitmq.springbootrabbitmq.consumer;
import com.atguigu.rabbitmq.springbootrabbitmq.config.DelayedQueueConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j @Component public class DelayQueueConsumer { @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME) public void receiveDelayQueue(Message message) { String msg = new String(message.getBody()); log.info("当前时间:{},收到延迟队列的消息:{}", new Date().toString(), msg); } }
|
测试:
发起请求:http://localhost:8080/ttl/sendDelayMsg/com on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/com on baby2/2000


发布确认高级


回调接口 : (若交换机收不到消息)
- 配置类
package com.atguigu.rabbitmq.springbootrabbitmq.config;
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; public static final String CONFIRM_ROUTING_KEY = "key1";
@Bean public DirectExchange confirmExchange() { return new DirectExchange(CONFIRM_EXCHANGE_NAME); }
@Bean public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); }
@Bean public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) { return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); } }
|
- 生产者:发消息
package com.atguigu.rabbitmq.springbootrabbitmq.controller;
import com.atguigu.rabbitmq.springbootrabbitmq.config.ConfirmConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
@Slf4j @RestController @RequestMapping("/confirm") public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message) { CorrelationData correlationData = new CorrelationData("1"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, message, correlationData); log.info("发送消息内容为:{}", message); } }
|
- 消费者:
package com.atguigu.rabbitmq.springbootrabbitmq.consumer;
import com.atguigu.rabbitmq.springbootrabbitmq.config.ConfirmConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Slf4j @Component public class Consumer { @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME) public void receiveConfirmMessage(Message message) { String msg = new String(message.getBody()); log.info("接收到的队列confirm.queue消息:{}", msg); } }
|
- 回调接口
package com.atguigu.rabbitmq.springbootrabbitmq.config;
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j @Component public class MyCallBack implements RabbitTemplate.ConfirmCallback {
@Autowired private RabbitTemplate rabbitTemplate;
@PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); }
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("交换机已经收到id为:{}的消息", id); } else { log.error("交换机还未收到id为:{}的消息,由于原因:{}", id, cause); } } }
|

spring: rabbitmq: host: 59.110.171.189 port: 5672 username: admin password: 123 publisher-confirm-type: correlated # 消息确认机制
|
- 发送请求 : http://localhost:8080/confirm/sendMessage/大家好 1

- 测试交换机收不到消息:在发送消息中,将交换机名字后面拼接上"123",再次启动,发送请求: http://localhost:8080/confirm/sendMessage/大家好 1
会得到:

- 测试队列收不到消息
package com.atguigu.rabbitmq.springbootrabbitmq.controller;
import com.atguigu.rabbitmq.springbootrabbitmq.config.ConfirmConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;
@Slf4j @RestController @RequestMapping("/confirm") public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message) { CorrelationData correlationData1 = new CorrelationData("1"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, message, correlationData1); log.info("发送消息内容为:{}", message);
CorrelationData correlationData2 = new CorrelationData("2"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY+"2", message, correlationData2); log.info("发送消息内容为:{}", message); } }
|

可见,队列没有收到消息,也没有应答和确认。
若队列收不到消息
回退消息

spring: rabbitmq: host: 59.110.171.189 port: 5672 username: admin password: 123 publisher-confirm-type: correlated # 消息确认机制 publisher-returns: true # 发布确认机制(消息在交换机那若路由失败,则会回退消息给生产者)
|
回退接口:
@PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); }
@Override public void returnedMessage(ReturnedMessage returnedMessage) { log.error("消息{},被交换机{}退回,退回原因:{},路由key:{}", new String(returnedMessage.getMessage().getBody()), returnedMessage.getExchange(), returnedMessage.getReplyText(), returnedMessage.getRoutingKey()); }
|

备份交换机
添加一个交换机和两个队列。

- 配置类
package com.atguigu.rabbitmq.springbootrabbitmq.config;
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
public static final String BACKUP_QUEUE_NAME = "backup_queue";
public static final String WARNING_QUEUE_NAME = "warning_queue";
@Bean public DirectExchange confirmExchange() { return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true) .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME).build(); }
@Bean public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); }
@Bean public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) { return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); }
@Bean public FanoutExchange backupExchange() { return new FanoutExchange(BACKUP_EXCHANGE_NAME); }
@Bean public Queue backupQueue() { return QueueBuilder.durable(BACKUP_QUEUE_NAME).build(); }
@Bean public Queue warningQueue() { return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); }
@Bean public Binding backupQueueBindingBackupExchange(@Qualifier("backupExchange") FanoutExchange backupExchange, @Qualifier("backupQueue") Queue backupQueue) { return BindingBuilder.bind(backupQueue).to(backupExchange); }
@Bean public Binding warningQueueBindingBackupExchange(@Qualifier("backupExchange") FanoutExchange backupExchange, @Qualifier("warningQueue") Queue warningQueue) { return BindingBuilder.bind(warningQueue).to(backupExchange); } }
|
- 消费者(报警消费者)
package com.atguigu.rabbitmq.springbootrabbitmq.consumer;
import com.atguigu.rabbitmq.springbootrabbitmq.config.ConfirmConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component @Slf4j public class WarningConsumer { @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME) public void receiveWarningMsg(Message message) { String msg = new String(message.getBody()); log.error("报警发现不可路由消息:{}", msg); } }
|
发送请求:http://localhost:8080/confirm/sendMessage/大家好 1

备份交换机的优先级高于回退消息、
其他知识点
幂等性


优先级队列


实现优先级:
- 生产者:
package com.atguigu.rabbitmq.one;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException;
public class Producer { private static final String QUEUE_NAME = "hello1";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
Map<String, Object> arguments = new HashMap<>(); arguments.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, true, false, false, arguments); for (int i = 0; i < 11; i++) { String message = "info" + i; if (i == 5) { AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build(); channel.basicPublish("", QUEUE_NAME, properties, message.getBytes()); } else { channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); } }
System.out.println("消息发送完毕"); } }
|
启动生产者:


- 消费者 :
package com.atguigu.rabbitmq.one;
import com.atguigu.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.*;
import java.io.IOException; import java.util.Arrays; import java.util.concurrent.TimeoutException;
public class Consumer { public static final String QUEUE_NAME = "hello1";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(new String(message.getBody())); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断"); };
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
|
启动消费者,

惰性队列


惰性队列执行性能不太好,因此默认情况下不使用惰性队列,而使用正常队列。


- 惰性队列从磁盘上读取消息,因此消费消息比较慢,但是内存消耗较小,在内存中只存储一些索引。一旦需要消费这些消息时,惰性队列会通过内存中的索引,去读取磁盘中相应的消息,到内存,再消费消息。
- 正常队列从内存中读取消息,因此消费消息比较快,但是内存消耗较大。
rabbitmq 集群
集群原理

镜像队列(备份)

高可用负载均衡
若节点 1 宕机了,生产者需要连接节点 2 或节点 3。

出现问题:生产者无法变更 rabbitmq 的 ip,此时需要借助外力 Haproxy。
Haproxy 实现高可用 负载均衡(高并发)



联合交换机





联邦队列

两个不同地区数据同步。

Shovel




