SpringBoot整合RabbitMq
SpringBoot整合RabbitMQ
一、引入Pom
1 | <!--RabbitMq--> |
RabbitAutoConfiguration:自动生效
自动配置类:自动配置
- RabbitTemplate:
- AmqpAdmin:
- CachingConnectionFactory:
- MessagingTemplateConfiguration:
二、配置连接,启动RabbitMq
开启RabbitMQ
1
连接配置RabbitMQ (绑定配置类)
1
2
public class RabbitPropertieapplication.yaml
1
2
3
4
5
6
7spring:
rabbitmq:
host: localhost
port: 5672 #高级消息队列端口
virtual-host: my_vhost #虚拟主机地址
username: guest
password: guest
三、使用
3.1 AmqpAdmin
3.1.1 创建交换机
Exchange:交换机种类接口
1 | //创建直接交换机 名字、是否持久化(重启是否存在)、自动删除、其他参数 |
1 |
|
3.1.2 创建队列
1 | //创建队列 名字、是否可持久化、是否排他的(只允许一个连接连接队列)、自动删除、其他参数 |
1 |
|
3.1.3 绑定队列和交换机
1 | //目的地(交换机或者队列名称)、目的地类型(交换机还是队列)、交换机、路由键、其他参数 |
1 |
|
3.2 RabbitTemplate
3.2.1 发送消息
1 | //交换机名称、路由键、发送的消息内容 |
- 发送一般消息2.发送对象(默认Java序列化机制) —对象必须实现序列化接口
1
2
3
4
5
6
void sendMsg() {
String hello_word = new String("hello word");
rabbitTemplate.convertAndSend("javaDirectExchange", "java.queue", hello_word);
log.debug("消息发送成功!"+hello_word);
}
1 | //消息对象 |
1 |
|
配置序列化方法:
自动配置类中注入RabbitTemplate时
1
2
3
4
5
6
7
8
9
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate();
//使用配置方法进行配置
configurer.configure(template, connectionFactory);
return template;
}配置类中配置方法 RabbitTemplateConfigurer
消息类型转化器设置到配置类中
容器中不存在就不对
RabbiteTemplate
进行设置消息类型转化器,RabbiteTemplate
使用默认的消息类型转化器(成员变量)SimpleMessageConverter
:private MessageConverter messageConverter = new SimpleMessageConverter();
使用模板类发送消息的时候默认使用
自定义序列化方法
根据自动配置源码,只需要给容器中放入自定义的消息类型转化器,这样RabbitTemplateConfigure就可以在构造的过程中设置到本身的消息类型转化器,并且在使用配置方法的时候设置到RabbitTemplate中,实现自定义消息序列化方法。
1
2
3
4
5
6
7
8
9
10
11
public class RabbitMqConfig {
/**
* 设置消息类型转化器
* @return jackson的消息类型转化器
*/
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
3.2.2 监听消息内容 @RabbitListener 方法 + 类上
必须开启@EnableRabbit、队列必须存在
方法上,表明当前方法处理消息队列中某种消息
类上,表明当前类监听这个消息队列,结合@RabbitHandler进行处理不同消息类型(重载方法:white_check_mark:)
1 | //监听多个队列 |
监听消息
监听消息参数类型:
- OBject
- Message(org.springframework.amqp.core.Message):包含消息的详细信息
- 写目标需要的消息类型(Spring会自动转化)
- Channel(com.rabbitmq.client):当前传输数据的通道
1
2
3
4
public void reviveMsg(Object msg) {
System.out.println("接受到消息......内容是:" + msg + " 类型是:" + msg.getClass());
}发送消息
1
2
3
4
5
6
7
void sendMsg() {
//发送对象
OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity(11L, "退货了", 1, 0, new Date());
rabbitTemplate.convertAndSend("javaDirectExchange", "java.queue", orderReturnReasonEntity);
log.debug("对象消息发送成功!" + orderReturnReasonEntity);
}查看监听获取的消息内容
1.Object接收结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20接受到消息......内容是:
(
Body:'[B@27942ebe(byte[75])' //消息体内容(字节数组未打印)
MessageProperties [
headers={__TypeId__=com.gulimall.gulimallorder.entity.OrderReturnReasonEntity},
contentType=application/json,
contentEncoding=UTF-8,
contentLength=0,
receivedDeliveryMode=PERSISTENT,
priority=0,
redelivered=false,
receivedExchange=javaDirectExchange,
receivedRoutingKey=java.queue,
deliveryTag=1,
consumerTag=amq.ctag-6neAUPCvXJWN4PenBct0jQ,
consumerQueue=java.queue
]
)
类型是:
class org.springframework.amqp.core.Message2.Message接受结果(上述相同)
3.目标消息对象
OrderReturnReasonEntity
1
2
3
4
5
6
7
8
9接受到消息......内容是:
OrderReturnReasonEntity(
id=11,
name=退货了,
sort=1,
status=0,
createTime=Thu Jun 16 16:02:39 CST 2022
)
类型是:class com.gulimall.gulimallorder.entity.OrderReturnReasonEntityQueue:队列多人监听,只要收到,队列删除消息,并且每次只能有一个人消费
分布式场景下:
1. 多个微服务监听一个消息队列,这个时候谁会消费掉消息?还是都可以收到消息?:同一个消息只能一个客户端接收到 1. 收到消息,进行业务处理(高耗时)--此时能否正常接受其他消息?:消息处理完成一个,才会接受下一个消息进行处理!
3.2.3 监听消息 @RabbitHandler 方法上(Linstener类 + Handler方法 处理不同消息)
标注@RabbitHandler类表明监听哪个队列,在类中方法中标注@RabbitHandler可以根据不同消息类型进行处理消息
接受消息:
1 | /** |
发送消息:
1 |
|