消息的可靠传输是面试必问的问题之一,保证消息的可靠传输主要在生产端开启 comfirm 模式,RabbitMQ 开启持久化,消费端关闭自动 ack 模式。本文将详解SpringBoot整合RabbitMQ如何实现消息可靠传输,需要的可以参考一下
环境配置
SpringBoot
整合 RabbitMQ
实现消息的发送。
1.添加 maven
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.添加 application.yml 配置文件
spring:
rabbitmq:
host: 192.168.3.19
port: 5672
username: admin
password: xxxx
3.配置交换机、队列以及绑定
@Bean
public DirectExchange myExchange() {
DirectExchange directExchange = new DirectExchange("myExchange");
return directExchange;
}
@Bean
public Queue myQueue() {
Queue queue = new Queue("myQueue");
return queue;
}
@Bean
public Binding binding() {
return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey");
}
4.生产发送消息
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public String send(String message) {
rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message);
System.out.println("【发送消息】" + message)
return "【send message】" + message;
}
5.消费者接收消息
@RabbitListener(queuesToDeclare = @Queue("myQueue"))
public void process(String msg, Channel channel, Message message) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = new Date();
String time = sdf.format(date);
System.out.println("【接收信息】" + msg + " 当前时间" + time);
6.调用生产端发送消息 hello
,控制台输出:
【发送消息】hello
【接收信息】hello 当前时间2022-05-12 10:21:14
说明消息已经被成功接收。
消息丢失分析
一条消息的从生产到消费,消息丢失可能发生在以下几个阶段:
- 生产端丢失: 生产者无法传输到
RabbitMQ
- 存储端丢失:
RabbitMQ
存储自身挂了 - 消费端丢失:存储由于网络问题,无法发送到消费端,或者消费挂了,无法发送正常消费
RabbitMQ
从生产端、储存端、消费端都对可靠性传输做很好的支持。
生产阶段
生产阶段通过请求确认机制,来确保消息的可靠传输。当发送消息到 RabbitMQ 服务器 之后,RabbitMQ 收到消息之后,给发送返回一个请求确认,表示RabbitMQ 服务器已成功的接收到了消息。
配置application.yml
spring:
rabbitmq:
# 消息确认机制 生产者 -> 交换机
publisher-confirms: true
# 消息返回机制 交换机 -> 队列
publisher-returns: true
配置
@Configuration
@Slf4j
public class RabbitConfig {
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("【correlationData】:" + correlationData);
log.info("【ack】" + ack);
log.info("【cause】" + cause);
if (ack) {
log.info("【发送成功】");
} else {
log.info("【发送失败】correlationData:" + correlationData + " cause:" + cause);
}
}
});
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.warn("【消息发送失败】");
log.info("【message】" + message);
log.info("【replyCode】" + replyCode);
}
});
return rabbitTemplate;
}
}
消息从 生产者 到 交换机, 有confirmCallback
确认模式。发送消息成功后消息会调用方法confirm(CorrelationData correlationData, boolean ack, String cause)
,根据 ack
判断消息是否发送成功。
消息从 交换机 到 队列,有returnCallback
退回模式。
发送消息 product message
控制台输出如下:
【发送消息】product message
【接收信息】product message 当前时间2022-05-12 11:27:56
【correlationData】:null
【ack】true
【cause】null
【发送成功】
生产端模拟消息丢失
这里有两个方案:
- 发送消息后立马关闭 broke,后者把网络关闭,但是broker关闭之后控制台一直就会报错,发送消息也报500错误。
- 发送不存在的交换机:
// myExchange 修改成 myExchangexxxxx
rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);
结果:
【correlationData】:null
【ack】false
【cause】channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchangexxxxx' in vhost '/', class-id=60, method-id=40)
【发送失败】
当发送失败可以对消息进行重试
交换机正确,发送不存在的队列:
交换机接收到消息,返回成功通知,控制台输出:
【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]
【ack】true
【cause】null
【发送成功】
交换机没有找到队列,返回失败信息:
【消息发送失败】
【message】product message
【replyCode】312
RabbitMQ
开启队列持久化,创建的队列和交换机默认配置是持久化的。首先把队列和交换机设置正确,修改消费监听的队列,使得消息存放在队列里。
修改队列的持久化,修改成非持久化:
@Bean
public Queue myQueue() {
Queue queue = new Queue("myQueue",false);
return queue;
}
发送消息之后,消息存放在队列中,然后重启 RabbitMQ
,消息不存在了。
设置队列持久化:
@Bean
public Queue myQueue() {
Queue queue = new Queue("myQueue",true);
return queue;
}
重启之后,队列的消息还存在。
消费端
消费端默认开始 ack
自动确认模式,当队列消息被消费者接收,不管有没有被消费端消息,都自动删除队列中的消息。所以为了确保消费端能成功消费消息,将自动模式改成手动确认模式:
修改application.yml 文件
spring:
rabbitmq:
# 手动消息确认
listener:
simple:
acknowledge-mode: manual
消费接收消息之后需要手动确认:
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
@RabbitListener(queuesToDeclare = @Queue("myQueue"))
public void process(String msg, Channel channel, Message message) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = new Date();
String time = sdf.format(date);
System.out.println("【接收信息】" + msg + " 当前时间" + time);
System.out.println(message.getMessageProperties().getDeliveryTag());
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
e.printStackTrace();
}
}
如果不添加:
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
发送两条消息
消息被接收后,没有确认,重新放到队列中:
重启项目,之后,队列的消息会发送到消费者,但是没有 ack 确认,还是继续会放回队列中。
加上 channel.basicAck
之后,再重启项目
队列消息就被删除了
basicAck
方法最后一个参数 multiple
表示是删除之前的队列。
multiple
设置成 true
,把后面的队列都清理掉了
到此这篇关于SpringBoot+RabbitMQ实现消息可靠传输详解的文章就介绍到这了,更多相关SpringBoot RabbitMQ消息可靠传输内容请搜索编程学习网以前的文章希望大家以后多多支持编程学习网!
本文标题为:SpringBoot+RabbitMQ实现消息可靠传输详解
基础教程推荐
- springboot自定义starter方法及注解实例 2023-03-31
- JDK数组阻塞队列源码深入分析总结 2023-04-18
- Java数据结构之对象比较详解 2023-03-07
- Java并发编程进阶之线程控制篇 2023-03-07
- Java实现查找文件和替换文件内容 2023-04-06
- Java文件管理操作的知识点整理 2023-05-19
- ConditionalOnProperty配置swagger不生效问题及解决 2023-01-02
- java基础知识之FileInputStream流的使用 2023-08-11
- Java实现线程插队的示例代码 2022-09-03
- java实现多人聊天系统 2023-05-19