RabbitMQ通过TTL和DLX实现延时队列

2022-07-30,,,

RabbitMQ实现延时队列

  • 一、介绍
    • 1.TTL
      • 如何设置TTL(2种方式):
    • 2.Dead Letter Exchanges
  • 二、实现延时队列的思路
  • 三、SpringBoot+RabbitMQ实现延时队列
    • 1.RabbitMQConfig配置类
    • 2.消费者类
    • 3.生产者
    • 测试

一、介绍

RabbitMQ本身是没有直接支持延迟队列功能,但是可以通过TTL和DLX模拟出延迟队列的功能。 通过RabbitMQ的两个特性来曲线实现延迟队列:Time To Live(TTL) 和 Dead Letter Exchanges(DLX)

1.TTL

TTL是MQ中一个消息或者队列的属性,表明一条消息或者队列中所有消息或者队列的最大存活时间,单位是毫秒。如果一条消息设置了TTL属性,或者进入了设置TTL的队列,如果这条消息在TTL内的时间未被消费则该条消息则变成死信,如果配置了消息的TTL和队列的TTL则较小的那个值会被使用。
消息的TTL才是实现延迟任务的关键。

如何设置TTL(2种方式):

1.一种是创建队列的时候设置队列的“x-message-ttl”属性。
2.另一种是针对每条消息设置TTL。

注意
第一种如果设置了队列的TTL,如果消息过期则被进入死信队列;
而第二种即使消息过期也不会马上被丢弃, 因为消息是否过期是在即将投递到消费者之前被判定的。此外,如果不设置TTL则表示消息永远不会过期,消息过期则变成死信。

2.Dead Letter Exchanges

DLX就是死信交换机。
一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。
①.一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
②. 上面的消息的TTL到了,消息过期了。
③. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列(即延时队列)中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

注意
即使一个消息比在同一队列中的其他消息提前过期,提前过期的也不会优先进入死信队列,它们还是按照入库的顺序让消费者消费。如果第一进去的消息过期时间是1小时,那么死信队列的消费者也许等1小时才能收到第一个消息。只有当过期的消息到了队列的顶端(队首),才会被真正的丢弃或者进入死信队列。

所以在考虑使用RabbitMQ来实现延迟任务队列的时候,需要确保业务上每个任务的延迟时间是一致的。如果遇到不同的任务类型需要不同的延时的话,需要为每一种不同延迟时间的消息建立单独的消息队列。

二、实现延时队列的思路

延时队列,就是想要消息延迟多久被处理。
TTL则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就万事大吉了,因为里面的消息都是希望被立即处理的消息。

三、SpringBoot+RabbitMQ实现延时队列

1.RabbitMQConfig配置类

创建延时队列、死信队列、延时交换机、死信交换机
我们使用一个普通的队列来当做延时队列,设置TTL,当消息过期后会进入死信队列。

  1. 将延时队列与延时交换机进行绑定并设置路由key
  2. 将死信队列与私信交换机进行绑定bing设置路由key
  3. 并在延时队列中新建一个Map,设置当前队列绑定的死信交换机、当前队列的死信路由key、队列的TTL等参数。

视图:

 /**
     * 实现延时队列的思路:
     * 生产者生产消息
     * 通过 延时队列路由key  经过  延时交换机   发送到  延时队列
     * 延时队列中的消息过期以后
     * 通过 死信队列路由key 经过  死信交换机  进入死信队列
     * 消费者监听死信队列进行消费
     */ // 延时交换机 @Bean("delayExchange") public DirectExchange delayExchange(){ return new DirectExchange("delayExchange"); } // 死信交换机 @Bean("deadLetterExchange") public DirectExchange deadLetterExchange(){ return new DirectExchange("deadLetterExchange"); } /**
     * 死信队列A            用于接收延时10s处理的消息
     * @return
     */ @Bean("deadLetterQueueA") public Queue deadLetterQueueA(){ return new Queue("deadLetterQueueA"); } /**
     * 延时队列A        延时6s
     * 并绑定到对应的死信交换机
     * @return
     */ @Bean("delayQueueA") public Queue delayQueueA(){ Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange    这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", "deadLetterExchange"); // x-dead-letter-routing-key  这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", "deadLetterQueueAroutingkey"); // x-message-ttl  声明队列的TTL  单位是毫秒 1000*6 args.put("x-message-ttl", 6000); return QueueBuilder.durable("delayQueueA").withArguments(args).build(); } /**
     * 将延时队列A与延时交换机绑定   并指定延时队列路由
     * @param queue
     * @param exchange
     * @return
     */ @Bean public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue, @Qualifier("delayExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("delayQueueAroutingkey"); } /**
     * 将死信队列 与 死信交换机绑定   指定死信队列路由
     * @param queue
     * @param exchange
     * @return
     */ @Bean public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("deadLetterQueueAroutingkey"); } 

2.消费者类

消费者直接监听死信队列即可!死信队列中的消息就是延时的消息。

@Component @Slf4j public class RabbitMQConsumer { @RabbitListener(queues = "deadLetterQueueA") public void receiveA(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("当前时间:{},死信队列A收到消息:{}", new Date().toString(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } 

3.生产者

在控制层调用RabbitTemplate发送消息到延时队列。

@RestController public class RabbitController { @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/delayQueue") public void test(){ rabbitTemplate.convertAndSend("delayExchange","delayQueueAroutingkey",LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); System.out.println("延时队列已发送"); } } 

测试

测试成功

参考文章【RabbitMQ】一文带你搞定RabbitMQ延迟队列
这篇文章写得太好了!致敬!

本文地址:https://blog.csdn.net/DreamsArchitects/article/details/108268770

《RabbitMQ通过TTL和DLX实现延时队列.doc》

下载本文的Word格式文档,以方便收藏与打印。