SpringBoot整合RabbitMq

SpringBoot整合RabbitMQ

一、引入Pom

1
2
3
4
5
<!--RabbitMq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

RabbitAutoConfiguration:自动生效

自动配置类:自动配置

  1. RabbitTemplate:
  2. AmqpAdmin:
  3. CachingConnectionFactory:
  4. MessagingTemplateConfiguration:

二、配置连接,启动RabbitMq

  1. 开启RabbitMQ

    1
    @EnableRabbit
  2. 连接配置RabbitMQ (绑定配置类)

    1
    2
    @ConfigurationProperties(prefix = "spring.rabbitmq")
    public class RabbitPropertie
  3. application.yaml

    1
    2
    3
    4
    5
    6
    7
    spring:
    rabbitmq:
    host: localhost
    port: 5672 #高级消息队列端口
    virtual-host: my_vhost #虚拟主机地址
    username: guest
    password: guest

三、使用

3.1 AmqpAdmin

AmqpAdmin提供的管理方法

3.1.1 创建交换机

Exchange:交换机种类接口
交换机种类

1
2
3
4
//创建直接交换机 名字、是否持久化(重启是否存在)、自动删除、其他参数
public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
super(name, durable, autoDelete, arguments);
}
1
2
3
4
5
6
@Test
void createExchange() {
DirectExchange directExchange = new DirectExchange("javaDirectExchange", true, false, null);
amqpAdmin.declareExchange(directExchange);
log.debug("exchange 创建成功 " + directExchange.getName());
}

3.1.2 创建队列

1
2
//创建队列 名字、是否可持久化、是否排他的(只允许一个连接连接队列)、自动删除、其他参数
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete,@Nullable Map<String, Object> arguments)
1
2
3
4
5
6
@Test
void createQueue() {
Queue queue = new Queue("java.queue", true, false, false, null);
amqpAdmin.declareQueue(queue);
log.debug("创建队列 " + queue.getName());
}

3.1.3 绑定队列和交换机

1
2
//目的地(交换机或者队列名称)、目的地类型(交换机还是队列)、交换机、路由键、其他参数
public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,@Nullable Map<String, Object> arguments)
1
2
3
4
5
@Test
void bindQueueWithExchange() {
Binding javaDirectExchange = new Binding("java.queue", Binding.DestinationType.QUEUE, "javaDirectExchange", "java.queue", null);
amqpAdmin.declareBinding(javaDirectExchange);
log.debug("创建绑定 " + "绑定目的地是:" + javaDirectExchange.getDestination() + " 绑定类型是:" + javaDirectExchange.getDestinationType() + " 绑定的路由键是:" + javaDirectExchange.getRoutingKey() + " 绑定的交换机是:" + javaDirectExchange.getExchange());

3.2 RabbitTemplate

RabbitTemplate提供的部分消息方法

3.2.1 发送消息

1
2
//交换机名称、路由键、发送的消息内容
public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException
  1. 发送一般消息
    1
    2
    3
    4
    5
    6
    @Test
    void sendMsg() {
    String hello_word = new String("hello word");
    rabbitTemplate.convertAndSend("javaDirectExchange", "java.queue", hello_word);
    log.debug("消息发送成功!"+hello_word);
    }
    2.发送对象(默认Java序列化机制) —对象必须实现序列化接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//消息对象
@Data
@TableName("oms_order_return_reason")
@AllArgsConstructor
@NoArgsConstructor
public class OrderReturnReasonEntity implements Serializable {
private static final long serialVersionUID = 1L;

/**
* id
*/
@TableId
private Long id;
/**
* 退货原因名
*/
private String name;
/**
* 排序
*/
private Integer sort;
/**
* 启用状态
*/
private Integer status;
/**
* create_time
*/
private Date createTime;

}
1
2
3
4
5
6
7
@Test
void sendMsg() {
//发送对象
OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity(11L, "退货了", 1, 0, new Date());
rabbitTemplate.convertAndSend("javaDirectExchange", "java.queue", orderReturnReasonEntity);
log.debug("对象消息发送成功!" + orderReturnReasonEntity);
}

配置序列化方法:

  1. 自动配置类中注入RabbitTemplate时

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Bean
    @ConditionalOnSingleCandidate(ConnectionFactory.class)
    @ConditionalOnMissingBean(RabbitOperations.class)
    public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate();
    //使用配置方法进行配置
    configurer.configure(template, connectionFactory);
    return template;
    }
  2. 配置类中配置方法 RabbitTemplateConfigurer
    模板类配置类中配置方法

    消息类型转化器设置到配置类中

    rabbitTemplateConfigure配置类注入方法

​ 容器中不存在就不对RabbiteTemplate进行设置消息类型转化器,RabbiteTemplate使用默认的消息类型转化器(成员变量)SimpleMessageConverterprivate MessageConverter messageConverter = new SimpleMessageConverter();

  1. 使用模板类发送消息的时候默认使用

    消息发送转化源码过程

  2. 自定义序列化方法

    根据自动配置源码,只需要给容器中放入自定义的消息类型转化器,这样RabbitTemplateConfigure就可以在构造的过程中设置到本身的消息类型转化器,并且在使用配置方法的时候设置到RabbitTemplate中,实现自定义消息序列化方法。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @Configuration
    public class RabbitMqConfig {
    /**
    * 设置消息类型转化器
    * @return jackson的消息类型转化器
    */
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
    }
    }

序列化成Json的消息内容

3.2.2 监听消息内容 @RabbitListener 方法 + 类上

必须开启@EnableRabbit、队列必须存在

  1. 方法上,表明当前方法处理消息队列中某种消息

  2. 类上,表明当前类监听这个消息队列,结合@RabbitHandler进行处理不同消息类型(重载方法:white_check_mark:)

1
@RabbitListener(queues = {"队列名称",....})//监听多个队列
  1. 监听消息

    监听消息参数类型:

    1. OBject
    2. Message(org.springframework.amqp.core.Message):包含消息的详细信息
    3. 写目标需要的消息类型(Spring会自动转化)
    4. Channel(com.rabbitmq.client):当前传输数据的通道
    1
    2
    3
    4
    @RabbitListener(queues = {"java.queue"})
    public void reviveMsg(Object msg) {
    System.out.println("接受到消息......内容是:" + msg + " 类型是:" + msg.getClass());
    }
  2. 发送消息

    1
    2
    3
    4
    5
    6
    7
    @Test
    void sendMsg() {
    //发送对象
    OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity(11L, "退货了", 1, 0, new Date());
    rabbitTemplate.convertAndSend("javaDirectExchange", "java.queue", orderReturnReasonEntity);
    log.debug("对象消息发送成功!" + orderReturnReasonEntity);
    }
  3. 查看监听获取的消息内容

    1.Object接收结果

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    接受到消息......内容是:
    (
    Body:'[B@27942ebe(byte[75])' //消息体内容(字节数组未打印)
    MessageProperties [
    headers={__TypeId__=com.gulimall.gulimallorder.entity.OrderReturnReasonEntity},
    contentType=application/json,
    contentEncoding=UTF-8,
    contentLength=0,
    receivedDeliveryMode=PERSISTENT,
    priority=0,
    redelivered=false,
    receivedExchange=javaDirectExchange,
    receivedRoutingKey=java.queue,
    deliveryTag=1,
    consumerTag=amq.ctag-6neAUPCvXJWN4PenBct0jQ,
    consumerQueue=java.queue
    ]
    )
    类型是:
    class org.springframework.amqp.core.Message

    2.Message接受结果(上述相同)

    3.目标消息对象OrderReturnReasonEntity

    1
    2
    3
    4
    5
    6
    7
    8
    9
    接受到消息......内容是:
    OrderReturnReasonEntity(
    id=11,
    name=退货了,
    sort=1,
    status=0,
    createTime=Thu Jun 16 16:02:39 CST 2022
    )
    类型是:class com.gulimall.gulimallorder.entity.OrderReturnReasonEntity

    Queue:队列多人监听,只要收到,队列删除消息,并且每次只能有一个人消费

    分布式场景下:

    1. 多个微服务监听一个消息队列,这个时候谁会消费掉消息?还是都可以收到消息?:同一个消息只能一个客户端接收到
    1. 收到消息,进行业务处理(高耗时)--此时能否正常接受其他消息?:消息处理完成一个,才会接受下一个消息进行处理!
    

3.2.3 监听消息 @RabbitHandler 方法上(Linstener类 + Handler方法 处理不同消息)

标注@RabbitHandler类表明监听哪个队列,在类中方法中标注@RabbitHandler可以根据不同消息类型进行处理消息

接受消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* <p>
* Description: 消息队列接受测试
* </p>
* <p>PackageName: com.gulimall.gulimallorder.service.impl</p>
* <p>ClassName: RabbitServiceHandler</p>
*
* @author <a href="mail to: byz0825@outlook.com" rel="nofollow">BaiYZ</a>
* @since 2022-06-16 16:18:29
*/
@Service
@RabbitListener(queues = {"java.queue"})
public class RabbitServiceHandler {
@RabbitHandler
public void receiveMsg(OrderEntity orderEntity){
System.out.println(orderEntity);
}
@RabbitHandler
public void receiveMsg(OrderReturnReasonEntity orderReturnReasonEntity){
System.out.println(orderReturnReasonEntity);
}
}

发送消息:

1
2
3
4
5
6
7
8
9
10
11
12
@Test
void sendMsg() {
//发送orderReturnReasonEntity对象
OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity(11L, "退货了", 1, 0, new Date());
rabbitTemplate.convertAndSend("javaDirectExchange", "java.queue", orderReturnReasonEntity);
log.debug("OrderReturnReasonEntity对象消息发送成功!" + orderReturnReasonEntity);
OrderEntity orderEntity = new OrderEntity();
//发送orderReturnReasonEntity对象
orderEntity.setBillReceiverEmail("ssafsfafsfa");
rabbitTemplate.convertAndSend("javaDirectExchange", "java.queue", orderEntity);
log.debug("OrderEntity对象消息发送成功!" + orderEntity);
}