image.png
image.png
image.png

为什么要用 rabbitMQ?

1. 流量削峰

image.png

2. 应用解耦

image.png

3. 异步处理

image.png

RabbitMQ 核心部分

image.png

各个名词介绍

RabbitMQ 工作原理

image.png
image.png
image.png

安装

rabbitmq.com/download.html
erlang 和 rabbitMQ 版本匹配:
https://www.cnblogs.com/gne-hwz/p/10714013.html
安装:https://blog.csdn.net/almahehe/article/details/75390572
(建议看尚硅谷视频进行快速安装)
安装之后,可以访问 ip:15672 ,查看发送消息的端口(5672)和用户。
image.png

简单队列模式

生产者代码

  1. 项目依赖:
<dependencies>
<!--指定jdk编译版本-->
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<!--rabbitmq依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
<!--操作文件流的依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
</dependencies>
  1. 生产者代码:
japackage com.atguigu.rabbitmq.one;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/11/28 22:15
* 生产者 :发消息
*/
public class Producer {
//队列名称
private static final String QUEUE_NAME = "hello";

//发消息
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置工厂ip 连接rabbitmq的队列
factory.setHost("59.110.171.189");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("123");
//创建连接
Connection connection = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
/**
* 生成一个队列
* 参数;1.队列名称
* 2.队列里面的消息是否持久化(磁盘),默认消息存储在内存中(不持久化false)
* 3.该队列是否只供一个消费者进行消费,是否消息独有,true只允许一个消费者进行消费,默认是false(可以多个消费者消费)
4. 是否自动删除,最后一个消费者断开连接后,该队列是否自动删除,true自动删除,false不自动删除
5.其他参数(延迟消息......)
*/

channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//发消息
String message = "hello world";
/**
* 发送一个消息
* 1. 发送到哪个交换机
* 2. 路由的key值是哪个,本次是队列的名称
* 3. 其他参数信息
* 4. 发送消息的消息体
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕");
}
}

如果运行报超时错误,需要打开云服务器的安全组 5672 端口。
(参考博客:https://www.cnblogs.com/jxearlier/p/11920825.html

消费者代码

package com.atguigu.rabbitmq.one;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/11/29 15:04
* 消费者:接收消息
*/
public class Consumer {
//队列名称
public static final String QUEUE_NAME = "hello";

//接收消息
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("59.110.171.189");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明 接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(message);
};
//取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
/**
* 消费者 消费消息
* 1.消费哪个队列
* 2. 消费成功之后是否要自动应答,true代表自动应答,false代表手动应答。
* 3. 消费者未成功消费的回调。
* 4. 消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME, true,deliverCallback,cancelCallback);
}
}

运行结果:
image.png

测试生产者和消费者代码:

  1. 先运行消费者代码,发现没有消息,再运行生产者代码,发送消息,再看消费者代码控制台,此时已经接收到消息。

工作队列模式

轮训分发消息

image.png

  1. 抽取连接工厂工具类:
package com.atguigu.rabbitmq.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/11/29 19:48
* 连接工厂创建信道的工具类
*/
public class RabbitMqUtils {
public static Channel getChannel() throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("59.110.171.189");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
  1. 工作线程代码:(消费者)
package com.atguigu.rabbitmq.two;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/11/29 19:55
* 这是一个工作线程(相当于之前的消费者)
*/
public class Worker01 {
//队列名称
public static final String QUEUE_NAME = "hello";

//接收消息 的工作线程
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//消息的接收
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到的消息:" + new String(message.getBody()));
};
//消息接收被取消时 执行
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
};
/*
* 消费者 消费消息
* 1.消费哪个队列
* 2. 消费成功之后是否要自动应答,true代表自动应答,false代表手动应答。
* 3. 消费者未成功消费的回调。
* 4. 消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
  1. 启动两个工作线程(消费者)

前提是在 idea 设置允许方法多个并行运行:
image.png
image.png

  1. 生产者代码:
package com.atguigu.rabbitmq.two;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/11/29 21:04
* 生产者 发送大量消息
*/
public class Task01 {
public static final String QUEUE_NAME = "hello";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
/*
* 生成一个队列
* 参数;1.队列名称
* 2.队列里面的消息是否持久化(磁盘),默认消息存储在内存中(不持久化false)
* 3.该队列是否只供一个消费者进行消费,是否消息独有,true只允许一个消费者进行消费,默认是false(可以多个消费者消费)
4. 是否自动删除,最后一个消费者断开连接后,该队列是否自动删除,true自动删除,false不自动删除
5.其他参数(延迟消息......)
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成:" + message);
}
}
}
  1. 测试:启动生产者

image.png
可以看见消费者轮循接收消息:
image.png
image.png

消息应答

自动应答

不建议使用,仅适用在消费者可以高效并以某种速率能够处理这些消息的情况。

手动应答

消息应答的方法:

image.png

批量处理 Multiple

手动应答的好处:可以批量应答,并减少网络拥堵。
image.png
但是批量应答可能会丢失消息。所以尽量不要批量应答,将 multiple 设置为 false。

消息自动重新入队

image.png
image.png
测试:

  1. 生产者:
package com.atguigu.rabbitmq.three;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/1 14:46
* 消息在手动应答时不丢失,放回队列中重新消费
*/
public class Task2 {
//队列名称
public static final String TASK_QUEUE_NAME = "ack_queue";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//声明队列
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes());
System.out.println("生产者发出消息:" + message);
}
}
}
  1. 两个消费者(消息手动应答):

消费者一:

package com.atguigu.rabbitmq.three;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.atguigu.rabbitmq.utils.SleepUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/1 14:58
* 消息在手动应答时不丢失,放回队列中重新消费
*/
public class Work03 {
//队列名称
public static final String TASK_QUEUE_NAME = "ack_queue";

//接收消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1等待接收消息处理时间较短");
DeliverCallback deliverCallback = (consumerTag, message) -> {
//沉睡1s
SleepUtils.sleep(1);
System.out.println("接收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
//手动应答(通过信道)
/*参数:
1. 消息的标记 tag
2. 是否批量应答 false:不批量应答信道中的消息,true:批量
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
//采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag -> {
System.out.println(consumerTag + "消费者取消消费接口的回调逻辑");
}));
}
}

消费者二:

package com.atguigu.rabbitmq.three;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.atguigu.rabbitmq.utils.SleepUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/1 14:58
* 消息在手动应答时不丢失,放回队列中重新消费
*/
public class Work04 {
//队列名称
public static final String TASK_QUEUE_NAME = "ack_queue";

//接收消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C2等待接收消息处理时间较短");
DeliverCallback deliverCallback = (consumerTag, message) -> {
//沉睡1s
SleepUtils.sleep(30);
System.out.println("接收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
//手动应答(通过信道)
/*参数:
1. 消息的标记 tag
2. 是否批量应答 false:不批量应答信道中的消息,true:批量
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
//采用手动应答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag -> {
System.out.println(consumerTag + "消费者取消消费接口的回调逻辑");
}));
}
}

测试步骤:
① 先启动 task2,创建 ack_queue 队列;(在 ip:15672 的 queue 列表中可以看到目前拥有的队列)
② 启动 work02,work03 接收消息(消费者);
③ 发消息:在 task2 控制台输入 aa,bb,cc,dd,ee,ff,可以看到 work2 和 work3 是轮训接收消息;如果到 work03 应该接收消息 ee 时,work03 突然挂掉,此时 ee 会被转发给 work02 中的 C1,这时 C1 会接收到 ee,因此消息不会丢失,这说明了 rabbitmq 有手动应答的能力,只要没有收到消息,就不会手动应答,从而将消息放回队列。而队列又再次将消息传递给 C1 进行重新消费,从而导致 ee 并没有丢失。image.png

队列持久化

如果存在同名未被持久化的队列,则需要先删除原先的未被持久化的队列,再重新生成一个持久化队列。

//声明队列
boolean durable = true; //在生产者中,需要让queue进行持久化
channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);

生成一个持久化队列之后,在 rabbitmq 控制台中这个队列的 features 属性会出现 D(代表持久化)。
image.png

消息持久化

//设置生产者发送消息为持久化消息(要求保存到磁盘上MessageProperties.PERSISTENT_TEXT_PLAIN)
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

不公平分发

//在消费者中接收消息之前设置不公平分发
int prefetchCount = 1;
channel.basicQos(prefetchCount);

image.png
image.png

预取值

image.png

int prefetchCount = 5;
channel.basicQos(prefetchCount);

发布确认原理

image.png

在信道之后开启发布确认:

//信道开启发布确认
channel.confirmSelect();

单个发布确认

image.png

package com.atguigu.rabbitmq.four;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/1 21:32
* 发布确认模式:
* 使用的时间 比较哪种确认方式是最好的
* 1.单个确认
* 2.批量确认
* 3.异步批量确认
*/
public class ConfireMessage {
//批量发消息的个数
public static final int MESSAGE_COUNT = 1000;

public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
//1. 单个确认
ConfireMessage.publicMessageIndividually(); //发布1000个单独确认消息,耗时29726ms
}

//单个确认
public static void publicMessageIndividually() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
//用信道声明队列
channel.queueDeclare(queueName, true, false, false, null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
//批量发消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String massage = i + "";
channel.basicPublish("", queueName, null, massage.getBytes());
//单个消息就马上进行发布确认
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时"+(end - begin)+"ms");
}
}

批量发布确认

image.png

//批量发布确认
public static void publicMessageBatch() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
//用信道声明队列
channel.queueDeclare(queueName, true, false, false, null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
//批量确认消息大小
int batchSize = 100;
//批量发布消息, 批量发布确认
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//发布确认
if (i % batchSize == 0) {
channel.waitForConfirms();
}
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms");
}

异步发布确认

image.png
image.png

//批量发消息的个数
public static final int MESSAGE_COUNT = 1000;

public static void main(String[] args) throws InterruptedException, TimeoutException, IOException {
//1. 单个确认
// ConfireMessage.publicMessageIndividually(); //发布1000个单独确认消息,耗时29726ms
// 2. 批量确认
// ConfireMessage.publicMessageBatch(); //发布1000个批量确认消息,耗时761ms(弊端:无法确认哪个消息未被确认)
// 3. 异步确认
ConfireMessage.publicMessageAsync(); //发布1000个异步确认消息,耗时181ms
}
//异步发布确认
public static void publicMessageAsync() throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
//用信道声明队列
channel.queueDeclare(queueName, true, false, false, null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
//消息确认成功,回调函数
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
System.out.println("确认的消息" + deliveryTag);
};
//消息确认失败,回调函数
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
System.out.println("未确认的消息" + deliveryTag);
};
//准备消息的监听器,监听哪些消息成功了,哪些消息失败了
channel.addConfirmListener(ackCallback, nackCallback); //异步通知
//异步发布确认
for (int i = 0; i < MESSAGE_COUNT; i++) {
String massage = "消息" + i;
channel.basicPublish("", queueName, null, massage.getBytes());

}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms");
}

如何处理异步未确认消息

image.png
上述异步确认有两个线程:

  • 发消息的线程
  • 监听器的线程

两个线程之间交互,只能用并发链路式队列(可以在确认发布与发布线程之间进行消息传递)

//异步发布确认
public static void publicMessageAsync() throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
//用信道声明队列
channel.queueDeclare(queueName, true, false, false, null);
//开启发布确认
channel.confirmSelect();
/*
线程安全有序的哈希表,适用于高并发的情况
1. 轻松的将序号与消息进行关联
2. 轻松批量删除条目,只要给序号
3.支持高并发(多线程)
*/
ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();

//消息确认成功,回调函数
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
//2. 删除已经确认的消息 剩下的就是未确认的消息
if(multiple){
//如果是批量确认,就去批量删除
ConcurrentNavigableMap<Long,String> confirmed = outstandingConfirms.headMap(deliveryTag);
confirmed.clear();
}else{
//如果是单个确认,就去单个删除
outstandingConfirms.remove(deliveryTag);
}
System.out.println("确认的消息" + deliveryTag);
};
//消息确认失败,回调函数
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
//3. 打印未确认的消息有哪些
String message = outstandingConfirms.get(deliveryTag);
System.out.println("未确认的消息是:"+message+":::::未确认的消息tag:" + deliveryTag);
};
//准备消息的监听器,监听哪些消息成功了,哪些消息失败了
channel.addConfirmListener(ackCallback, nackCallback); //异步通知
//开始时间
long begin = System.currentTimeMillis();
//批量发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
// 1. 此处记录下所有要发送的消息 消息的总和(每发一次消息就记录一次)
outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
channel.basicPublish("", queueName, null, message.getBytes());

}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms");
}

以上三种发布确认速度对比:

  • 单独发布消息:同步等待确认,简单,但吞吐量非常有限。
  • 批量发布消息:批量同步等待确认,简单,合理的吞吐量,一旦出现问题,很难推断出是哪条出现了问题
  • 异步处理:最佳性能和资源利用,在出现错误的情况下,可以很好的控制,但是实现起来稍微难些。

交换机

交换机的作用

image.png

交换机的类型

  • 直接(direct)== 路由类型
  • 主题(topic)
  • 标题(headers)(企业不常用)
  • 扇出(fanout)== 发布订阅类型
  • 无名类型(默认类型),通常用空串进行识别

image.png

临时队列

不带有持久化,一旦断开消费者的连接,队列将被自动删除。

创建临时队列:

String queueName = channel.queueDeclare().getQueue();

image.png

绑定

就是交换机与队列之间的捆绑关系。
image.png

发布订阅模式(扇出模式 fanout)

类似广播,两个 routingkey 相同
image.png

  1. 生产者
package com.atguigu.rabbitmq.five;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/2 9:59
* 发消息:交换机
*/
public class EmitLog {
public static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发出消息:" + message);
}
}
}
  1. 两个消费者
package com.atguigu.rabbitmq.five;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/2 9:35
*/
public class ReceiveLogs01 {
//交换机的名称
public static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//声明一个队列 临时队列 (生成一个临时队列,队列的名称是随机的,当消费者断开与队列的连接的时候,队列就自动删除)
String queueName = channel.queueDeclare().getQueue();
//绑定交换机与队列
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("等待接收消息,把接收的消息打印在屏幕上。。。。");
//接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("ReceiveLogs01控制台接收到消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume(queueName, true, deliverCallback,consumerTag->{});
}
}
package com.atguigu.rabbitmq.five;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/2 9:35
*/
public class ReceiveLogs02 {
//交换机的名称
public static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//声明一个队列 临时队列 (生成一个临时队列,队列的名称是随机的,当消费者断开与队列的连接的时候,队列就自动删除)
String queueName = channel.queueDeclare().getQueue();
//绑定交换机与队列
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("等待接收消息,把接收的消息打印在屏幕上。。。。");
//接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("ReceiveLogs02控制台接收到消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume(queueName, true, deliverCallback,consumerTag->{});
}
}

image.png
image.png
image.png

直接交换机(路由模式 direct)

两个 routingkey 不相同
image.png
可以多重绑定。
生产者发消息给队列,直接交换机通过不同 routingkey 路由到相应的队列,然后消费者接收指定日志。

  1. 发消息
package com.atguigu.rabbitmq.six;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/2 9:59
* 发消息:交换机
*/
public class DirectLogs {
public static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发出消息:" + message);
}
}
}
  1. 接收消息
package com.atguigu.rabbitmq.six;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/2 10:15
*/
public class ReceiveLogsDirect01 {
public static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明一个队列
channel.queueDeclare("console",false,false,false,null);
channel.queueBind("console",EXCHANGE_NAME,"info");
channel.queueBind("console",EXCHANGE_NAME,"warning");
//接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("ReceiveLogs01控制台接收到消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume("console", true, deliverCallback,consumerTag->{});

}
}
package com.atguigu.rabbitmq.six;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/2 10:15
*/
public class ReceiveLogsDirect02 {
public static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明一个队列
channel.queueDeclare("disk",false,false,false,null);
channel.queueBind("disk",EXCHANGE_NAME,"error");
//接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("ReceiveLogs02控制台接收到消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume("disk", true, deliverCallback,consumerTag->{});

}
}

image.png
image.png

主题交换机(Topic)

规范:
image.png
image.png
image.png
image.png

主题交换机(实战)

image.png

  1. 消费者
package com.atguigu.rabbitmq.seven;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/2 11:37
* 声明主题交换机 及相关队列
* <p>
* 消费者 C2
*/
public class ReceiveLogsTopic01 {
//交换机名称
public static final String EXCHANGE_NAME = "topic_logs";

//接收消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//声明队列
String queueName = "Q1";
channel.queueDeclare(queueName, false, false, false, null);
//交换机绑定 routingkey
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
System.out.println("等待接收消息。。。。。");
//接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
System.out.println("接收队列:" + queueName + "绑定键:" + message.getEnvelope().getRoutingKey());
};
//接收消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}

}
package com.atguigu.rabbitmq.seven;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/2 11:37
* 声明主题交换机 及相关队列
* <p>
* 消费者 C2
*/
public class ReceiveLogsTopic02 {
//交换机名称
public static final String EXCHANGE_NAME = "topic_logs";

//接收消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//声明队列
String queueName = "Q2";
channel.queueDeclare(queueName, false, false, false, null);
//交换机绑定 routingkey
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
System.out.println("等待接收消息。。。。。");
//接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody(), StandardCharsets.UTF_8));
System.out.println("接收队列:" + queueName + "绑定键:" + message.getEnvelope().getRoutingKey());
};
//接收消息
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}

}
  1. 生产者
package com.atguigu.rabbitmq.seven;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.spec.ECField;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/2 14:45
* 生产者
*/
public class EmitLogTopic {
public static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
/**
* 下图绑定关系如下:
* Q1--> 绑定的是: 中间带3个单词的字符串(*.orange*)
* Q2--> 绑定的是: 最后一个单词是rabbit的3个单词(*.*.rabbit)
* 第一个单词是lazy的多个单词(lazy.#)
*/
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit", "被队列Q1Q2接收到");
bindingKeyMap.put("lazy.orange.elephant", "被队列Q1Q2接收到");
bindingKeyMap.put("lazy.pink.rabbit", "被队列Q1接收到");
bindingKeyMap.put("quick.brown.fox", "被队列Q2接收到");
bindingKeyMap.put("quick.orange.male.rabbit", "虽然满足两个绑定但只被队列Q2接收一次");
bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词不匹配任何绑定定会丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配Q2");

for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
String routingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发出消息" + message);
}
}
}

先启动消费者,再启动生产者。
image.png
image.png
image.png

死信队列

image.png
image.pngimage.png

  • 消费者 1:
package com.atguigu.rabbitmq.eight;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/2 15:40
* 死信队列
* 消费者1
*/
public class Consumer01 {
//普通交换机的名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
//死信队列的名称
public static final String DEAD_EXCHANGE = "dead_exchange";
//普通队列的名称
public static final String NORMAL_QUEUE = "normal_queue";
//死信队列的名称
public static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//声明死信和普通交换机, 类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明普通队列
Map<String, Object> arguments = new HashMap<>();
//过期时间
// arguments.put("x-message-ttl",10000);
//正常队列设置死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key", "lisi");
//设置正常队列的长度的限制
// arguments.put("x-max-length", 19);
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
///////////////////////////////////////////////////
//声明死信队列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
//交换机与队列绑定
//绑定普通交换机与普通队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
//绑定死信交换机与死信队列
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收消息.........");
//接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
if (msg.equals("info5")) {
System.out.println("此消息被C1拒绝的" + msg);
//拒绝此消息,并且不放回队列中。因此成为死信
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
} else {
System.out.println("Consumer01接收的消息" + msg);
//不批量应答
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
//开启手动应答(如果不开启手动应答,就不存在拒绝了)
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {
});
}
}
  • 消费者 2:
package com.atguigu.rabbitmq.eight;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/2 15:40
* 死信队列
* 消费者2
*/
public class Consumer02 {
//死信队列的名称
public static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("等待接收消息.........");
//接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("Consumer02接收的消息" + new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume(DEAD_QUEUE, false, deliverCallback, consumerTag -> {

});
}
}
  • 生产者:
package com.atguigu.rabbitmq.eight;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/2 16:13
* 死信队列-生产者
*/
public class Producer {
//普通交换机的名称
public static final String NORMAL_EXCHANGE = "normal_exchange";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//死信时间 设置ttl时间
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
}
}
}

测试步骤:

  1. 运行消费者:会发现普通和死信交换机已经绑定各自的队列。

image.png
image.png

  1. 测试队列达到最大长度,关闭消费者 1 和 2,开启生产者:消息会积压在队列中,消费者 1 所在的普通队列消息限制有 6 条,剩下的 4 条会进入消费者 2 所在的死信队列。如下图所示:

image.png

  1. 取消普通队列的最大长度限制,测试消息 ttl 过期:关闭消费者 1 和 2,开启生产者发送消息。(消息会因为没人接收,会在 ttl 时间内积压在普通队列中, ttl 过期后,消息会进入死信队列中。)

生产者:

package com.atguigu.rabbitmq.eight;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/2 16:13
* 死信队列-生产者
*/
public class Producer {
//普通交换机的名称
public static final String NORMAL_EXCHANGE = "normal_exchange";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
//死信时间 设置ttl时间
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
}
}
}
  1. 测试消息被拒

开启消费者 1 和 2,再开启生产者。
image.png
image.png

延迟队列(基于死信队列)

image.png
image.png

整合 SpringBoot

实现延迟队列:

  1. 依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.atguigu.rabbitmq</groupId>
<artifactId>springboot-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-rabbitmq</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<!--web服务器,可以自启动-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.6.1</version>
</dependency>
<!--快速进行json转换-->
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger2 -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>3.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger-ui -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>3.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit-test -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<version>2.4.0</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
  1. yml 配置文件:
spring:
rabbitmq:
host: 59.110.171.189
port: 5672
username: admin
password: 123
  1. swagger 配置类:
package com.atguigu.rabbitmq.springbootrabbitmq.config;

import org.springframework.context.annotation.Bean;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/2 22:34
*/
public class SwaggerConfig {
@Bean
public Docket webApiConfig() {
return new Docket(DocumentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}

private ApiInfo webApiInfo() {
return new ApiInfoBuilder()
.title("rabbitmq 接口文档")
.description(" 本文档描述了 rabbitmq 微服务接口定义")
.version("1.0")
.contact(new Contact("enjoy6288", "http://atguigu.com",
"1846015350@qq.com"))
.build();
}
}

image.png

  1. 声明队列配置文件:
package com.atguigu.rabbitmq.springbootrabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/3 8:15
* TTL队列, 配置文件类代码
*/
@Configuration
public class TtlQueueConfig {
//普通交换机的名称
public static final String X_EXCHANGE = "X";
//死信交换机的名称
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
//普通队列的名称
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
//死信队列的名称
public static final String DEAD_LETTER_QUEUE = "QD";

//声明xExchange 别名
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}

@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}

//声明普通队列 ttl为10s
@Bean("queueA")
public Queue queueA() {
Map<String, Object> arguments = new HashMap<>(3);
//设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//设置死信 routing-key
arguments.put("x-dead-letter-routing-key", "YD");
//设置ttl 单位为ms
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}

//声明普通队列 ttl为40s
@Bean("queueB")
public Queue queueB() {
Map<String, Object> arguments = new HashMap<>(3);
//设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//设置死信 routing-key
arguments.put("x-dead-letter-routing-key", "YD");
//设置ttl 单位为ms
arguments.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}

//死信队列
@Bean("queueD")
public Queue queueD() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}

//绑定交换机和队列
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}

@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}

@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
  1. 消费者:接收消息
package com.atguigu.rabbitmq.springbootrabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/3 9:01
* 队列ttl 消费者
*/
@Slf4j
@Component
public class DeadLetterQueueConsumer {

//接收消息
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列的消息:{}", new Date().toString(), msg);
}
}
  1. 发送消息:Controller
package com.atguigu.rabbitmq.springbootrabbitmq.controller;

import io.swagger.annotations.ApiModelProperty;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/3 8:47
* 发送延迟消息
* <p>
* http://localhost:8080/ttl/sendMsg/嘻嘻嘻
*/
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;

//开始发消息
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
log.info("当前时间:{},发送一条消息给两个ttl队列:{}", new Date().toString(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列" + message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列" + message);
}
}

访问:http://localhost:8080/ttl/sendMsg/嘻嘻嘻
控制台打印结果:
image.png

延迟队列优化:

image.png
增加一个 QC 普通队列声明后并绑定交换机 XC。
队列配置 中添加:

//--------------------------------优化延迟队列-------------------------------------
//普通队列的名称(为了优化延迟队列)
public static final String QUEUE_C = "QC";

//声明QC
@Bean("queueC")
public Queue queueC() {
Map<String, Object> arguments = new HashMap<>(2);
//设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//设置死信routing-key
arguments.put("x-dead-letter-routing-key", "YD");
return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
}

//绑定普通队列QC和交换机
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
//-----------------------------------优化延迟队列----------------------------------

添加 Controller 发消息控制器:

//开始发消息  消息ttl
@GetMapping("/sendExpireMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
log.info("当前时间:{},发送一条时长{}毫秒,ttl信息给队列QC:{}", new Date().toString(), ttlTime, message);
rabbitTemplate.convertAndSend("X", "XC", message, msg -> {
//发送消息的时候 延迟时长
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
});
}

测试:

  1. http://localhost:8080/ttl/sendExpireMsg/你好 1/20000
  2. http://localhost:8080/ttl/sendExpireMsg/你好 2/2000

结果:
image.png

延迟队列(基于插件)

进入 rabbitmq 安装目录下的 plugins 目录 ,cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
执行命令让该插件生效:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
然后重启 rabbitmq:systemctl restart rabbitmq-server
会发现交换机多了一个新类型,意味着延迟消息将由交换机来完成,而不是队列。
image.png

原来的情况:基于死信
image.png
现在:基于延迟插件
image.png
代码架构:
image.png

  1. 配置类
package com.atguigu.rabbitmq.springbootrabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/3 16:41
*/
@Configuration
public class DelayedQueueConfig {
//队列
public static final String DELAYED_QUEUE_NAME = "delayed.name";
//交换机
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
//routing-key
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

//声明队列
@Bean
public Queue delayedQueue() {
return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();
}

//声明交换机 基于插件
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct");
/**
* 1. 交换机的名称
* 2. 交换机的类型
* 3. 是否需要持久化
* 4. 是否需要自动删除
* 5. 其他的参数
*/
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
}

//绑定
@Bean
public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayedExchange") CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
  1. Controller 生产者:
//发消息  基于延迟插件
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayed.queue:{}", new Date().toString(), delayTime, message);
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {
//发送消息的时候 延迟时长 单位:ms
msg.getMessageProperties().setDelay(delayTime);
return msg;
});
}
  1. 消费者:
package com.atguigu.rabbitmq.springbootrabbitmq.consumer;

import com.atguigu.rabbitmq.springbootrabbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/3 19:36
* 消费者 基于插件的延迟消息
*/
@Slf4j
@Component
public class DelayQueueConsumer {
//监听消息
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayQueue(Message message) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到延迟队列的消息:{}", new Date().toString(), msg);
}
}

测试:
发起请求:http://localhost:8080/ttl/sendDelayMsg/com on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/com on baby2/2000
image.png
image.png

发布确认高级

image.png
image.png

回调接口 : (若交换机收不到消息)

  1. 配置类
package com.atguigu.rabbitmq.springbootrabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/3 20:15
* 配置类 发布确认 (高级)
*/
@Configuration
public class ConfirmConfig {
//交换机
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
//队列
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
//routing-key
public static final String CONFIRM_ROUTING_KEY = "key1";

//声明交换机
@Bean
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}

//声明队列
@Bean
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}

//绑定交换机和队列
@Bean
public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
@Qualifier("confirmExchange") DirectExchange confirmExchange) {
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
}
  1. 生产者:发消息
package com.atguigu.rabbitmq.springbootrabbitmq.controller;

import com.atguigu.rabbitmq.springbootrabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/3 20:27
* 开始发消息
*/
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;

//发消息
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message) {
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, message, correlationData);
log.info("发送消息内容为:{}", message);
}
}
  1. 消费者:
package com.atguigu.rabbitmq.springbootrabbitmq.consumer;

import com.atguigu.rabbitmq.springbootrabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/3 20:33
* 接收消息
*/
@Slf4j
@Component
public class Consumer {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveConfirmMessage(Message message) {
String msg = new String(message.getBody());
log.info("接收到的队列confirm.queue消息:{}", msg);
}
}
  1. 回调接口
package com.atguigu.rabbitmq.springbootrabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/3 20:50
* 回调接口
*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
public void init() {
//注入 (需要将当前实现类注入到RabbitTemplate的ConfirmCallback函数式接口中)
rabbitTemplate.setConfirmCallback(this);
}

/**
* 交换机确认回调方法
* 1. 发消息 交换机接收到了 回调
* 1.1 correlationData 保存回调消息的id及相关信息
* 1.2 交换机收到消息 ack = true
* 1.3 cause null
* 2. 发消息 交换机接收失败 回调
* 2.1 correlationData 保存回调消息的id及相关信息
* 2.2 交换机收到消息 ack = false
* 2.3 cause 失败的原因
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("交换机已经收到id为:{}的消息", id);
} else {
log.error("交换机还未收到id为:{}的消息,由于原因:{}", id, cause);
}
}
}

image.png

spring:
rabbitmq:
host: 59.110.171.189
port: 5672
username: admin
password: 123
publisher-confirm-type: correlated # 消息确认机制
  1. 发送请求 : http://localhost:8080/confirm/sendMessage/大家好 1

image.png

  1. 测试交换机收不到消息:在发送消息中,将交换机名字后面拼接上"123",再次启动,发送请求: http://localhost:8080/confirm/sendMessage/大家好 1

会得到:
image.png

  1. 测试队列收不到消息
package com.atguigu.rabbitmq.springbootrabbitmq.controller;

import com.atguigu.rabbitmq.springbootrabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/3 20:27
* 开始发消息 生产者
*/
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;

//发消息
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message) {
CorrelationData correlationData1 = new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY, message, correlationData1);
log.info("发送消息内容为:{}", message);

CorrelationData correlationData2 = new CorrelationData("2");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY+"2", message, correlationData2);
log.info("发送消息内容为:{}", message);
}
}

image.png
可见,队列没有收到消息,也没有应答和确认。

若队列收不到消息

回退消息

image.png

spring:
rabbitmq:
host: 59.110.171.189
port: 5672
username: admin
password: 123
publisher-confirm-type: correlated # 消息确认机制
publisher-returns: true # 发布确认机制(消息在交换机那若路由失败,则会回退消息给生产者)

回退接口:

//注入
@PostConstruct
public void init() {
//注入 (需要将当前实现类注入到RabbitTemplate的ConfirmCallback函数式接口中)
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
/**
* 可以在当消息传递过程中,不可达目的地时将消息返回给生产者
* 只有不可到目的地时,才进行回退
*
* @param returnedMessage
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("消息{},被交换机{}退回,退回原因:{},路由key:{}",
new String(returnedMessage.getMessage().getBody()),
returnedMessage.getExchange(),
returnedMessage.getReplyText(),
returnedMessage.getRoutingKey());
}

image.png

备份交换机

添加一个交换机和两个队列。
image.png

  1. 配置类
package com.atguigu.rabbitmq.springbootrabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/3 20:15
* 配置类 发布确认 (高级)
*/
@Configuration
public class ConfirmConfig {
//交换机
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
//队列
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
//routing-key
public static final String CONFIRM_ROUTING_KEY = "key1";
// --------------------------备份交换机---------------------------------
//备份交换机
public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";

//备份队列
public static final String BACKUP_QUEUE_NAME = "backup_queue";

//报警队列
public static final String WARNING_QUEUE_NAME = "warning_queue";

//--------------------------------------------------------------------
//声明确认交换机(要转发到备份交换机)
@Bean
public DirectExchange confirmExchange() {
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME).build();
}

//声明队列
@Bean
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}

//绑定交换机和队列
@Bean
public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
@Qualifier("confirmExchange") DirectExchange confirmExchange) {
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}

//备份交换机
@Bean
public FanoutExchange backupExchange() {
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}

//备份队列
@Bean
public Queue backupQueue() {
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}

//报警队列
@Bean
public Queue warningQueue() {
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}

//绑定(备份交换机和备份队列)
@Bean
public Binding backupQueueBindingBackupExchange(@Qualifier("backupExchange") FanoutExchange backupExchange, @Qualifier("backupQueue") Queue backupQueue) {
return BindingBuilder.bind(backupQueue).to(backupExchange);
}

//绑定(备份交换机和报警队列)
@Bean
public Binding warningQueueBindingBackupExchange(@Qualifier("backupExchange") FanoutExchange backupExchange, @Qualifier("warningQueue") Queue warningQueue) {
return BindingBuilder.bind(warningQueue).to(backupExchange);
}
}
  1. 消费者(报警消费者)
package com.atguigu.rabbitmq.springbootrabbitmq.consumer;

import com.atguigu.rabbitmq.springbootrabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @author LiFang
* @version 1.0
* @since 2021/12/4 9:10
* 报警消费者
*/
@Component
@Slf4j
public class WarningConsumer {
//接收报警消息
@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
public void receiveWarningMsg(Message message) {
String msg = new String(message.getBody());
log.error("报警发现不可路由消息:{}", msg);
}
}

发送请求:http://localhost:8080/confirm/sendMessage/大家好 1
image.png

备份交换机的优先级高于回退消息、

其他知识点

幂等性

image.png
image.png

优先级队列

image.png
image.png
实现优先级:

  1. 生产者:
package com.atguigu.rabbitmq.one;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/11/28 22:15
* 生产者 :发消息
*/
public class Producer {
//队列名称
private static final String QUEUE_NAME = "hello1";

//发消息
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个连接工厂
// ConnectionFactory factory = new ConnectionFactory();
// //设置工厂ip 连接rabbitmq的队列
// factory.setHost("59.110.171.189");
// //用户名
// factory.setUsername("admin");
// //密码
// factory.setPassword("123");
// //创建连接
// Connection connection = factory.newConnection();
// //获取信道
// Channel channel = connection.createChannel();
Channel channel = RabbitMqUtils.getChannel();
/**
* 生成一个队列
* 参数;1.队列名称
* 2.队列里面的消息是否持久化(磁盘),默认消息存储在内存中(不持久化false)
* 3.该队列是否只供一个消费者进行消费,是否消息独有,true只允许一个消费者进行消费,默认是false(可以多个消费者消费)
4. 是否自动删除,最后一个消费者断开连接后,该队列是否自动删除,true自动删除,false不自动删除
5.其他参数(延迟消息......)
*/
Map<String, Object> arguments = new HashMap<>();
//官方允许是0-255之间。此处设置10. 允许优先级范围为0-10 不要设置过大 浪费CPU与内存
arguments.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
//发消息
for (int i = 0; i < 11; i++) {
String message = "info" + i;
if (i == 5) {
//设置优先级
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
} else {
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
}
/*
* 发送一个消息
* 1. 发送到哪个交换机
* 2. 路由的key值是哪个,本次是队列的名称
* 3. 其他参数信息
* 4. 发送消息的消息体
*/
System.out.println("消息发送完毕");
}
}

启动生产者:
image.png

image.png

  1. 消费者 :
package com.atguigu.rabbitmq.one;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;

/**
* @author LiFang
* @version 1.0
* @since 2021/11/29 15:04
* 消费者:接收消息
*/
public class Consumer {
//队列名称
public static final String QUEUE_NAME = "hello1";

//接收消息
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
// ConnectionFactory factory = new ConnectionFactory();
// factory.setHost("59.110.171.189");
// factory.setUsername("admin");
// factory.setPassword("123");
// Connection connection = factory.newConnection();
// Channel channel = connection.createChannel();
Channel channel = RabbitMqUtils.getChannel();
//声明 接收消息(成功后的回调)
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
};
//取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};
/*
* 消费者 消费消息
* 1.消费哪个队列
* 2. 消费成功之后是否要自动应答,true代表自动应答,false代表手动应答。
* 3. 消费者未成功消费的回调。
* 4. 消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}

启动消费者,
image.png

惰性队列

image.pngimage.png
惰性队列执行性能不太好,因此默认情况下不使用惰性队列,而使用正常队列。
image.png
image.png

  • 惰性队列从磁盘上读取消息,因此消费消息比较慢,但是内存消耗较小,在内存中只存储一些索引。一旦需要消费这些消息时,惰性队列会通过内存中的索引,去读取磁盘中相应的消息,到内存,再消费消息。
  • 正常队列从内存中读取消息,因此消费消息比较快,但是内存消耗较大。

rabbitmq 集群

集群原理

image.png

镜像队列(备份)

image.png

高可用负载均衡

若节点 1 宕机了,生产者需要连接节点 2 或节点 3。
image.png
出现问题:生产者无法变更 rabbitmq 的 ip,此时需要借助外力 Haproxy。

Haproxy 实现高可用 负载均衡(高并发)

image.png
image.png
image.png

联合交换机

image.png
image.png
image.png
image.png
image.png

联邦队列

image.png
两个不同地区数据同步。
image.png

Shovel

image.png
image.png
image.png
image.png
image.png