Spring RabbitMQ 延迟队列

2023-05-27,,

一、说明

在实际业务场景中可能会用到延时消息发送,例如异步回调失败时的重发机制。 RabbitMQ本身不具有延时消息队列的功能,但是可以通过rabbitmq-delayed-message-exchange来实现(也可以通过TTL(Time To Live)、DLX(Dead Letter Exchanges)特性实现,我们主要讲解通过延迟插件来实现的方法)。利用RabbitMQ的这种特性,应该可以实现很多现实中的业务,我们可以发挥想象。

二、安装插件

RabbitMQ的安装请参考我的文章“RabbitMQ安装与使用”,这里我们重点讲插件的安装。

首先到http://www.rabbitmq.com/community-plugins.html网页下载适合的“rabbitmq_delayed_message_exchange插件”。下载完成后将它放到RabbitMQ插件安装目录({rabbitmq-server}/plugins/),然后执行命令rabbitmq-plugins enable rabbitmq_delayed_message_exchange启用插件,执行命令rabbitmq-plugins disable rabbitmq_delayed_message_exchange也可以关闭插件。具体过程可以查看参考文档2。

三、spring集成RabbitMQ

1、maven配置

    <dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-amqp</artifactId>
    <version>1.6.6.RELEASE</version>
    <exclusions>
    <exclusion>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>4.1.6.RELEASE</version>
    </exclusion>
    </exclusions>
    </dependency>
    <dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.6.6.RELEASE</version>
    <exclusions>
    <exclusion>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>4.1.6.RELEASE</version>
    </exclusion>
    <exclusion>
    <groupId>org.springframework</groupId>
    <artifactId>spring-messaging</artifactId>
    <version>4.1.6.RELEASE</version>
    </exclusion>
    <exclusion>
    <groupId>org.springframework</groupId>
    <artifactId>spring-tx</artifactId>
    <version>4.1.6.RELEASE</version>
    </exclusion>
    <exclusion>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.1.6.RELEASE</version>
    </exclusion>
    </exclusions>
    </dependency>

说明:实现延迟队列需要Spring在4.1以上,spring-amqp在1.6以上。

2、xml配置

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:util="http://www.springframework.org/schema/util" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.1.xsd
    http://www.springframework.org/schema/tx
    http://www.springframework.org/schema/tx/spring-tx.xsd
    http://www.springframework.org/schema/aop
    http://www.springframework.org/schema/aop/spring-aop.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd
    http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">
    <context:property-placeholder location="classpath:rmq-config.properties" ignore-unresolvable="true"/>
    <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <property name="host" value="${rabbitmq.host}" />
    <property name="port" value="${rabbitmq.port}" />
    <property name="username" value="${rabbitmq.username}" />
    <property name="password" value="${rabbitmq.password}" />
    <property name="channelCacheSize" value="${rabbitmq.channel.cacheSize}" />
    </bean>
    <bean id="orderConsumer" class="com.xxx.rmq.OrderConsumer"></bean>
    <bean id="messageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter" />
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
    <rabbit:admin connection-factory="connectionFactory" />
    <!-- 延迟消息start -->
    <rabbit:topic-exchange name="delay_exchange" delayed="true">
    <rabbit:bindings>
    <rabbit:binding queue="delay_queue" pattern="order.delay.notify" />
    </rabbit:bindings>
    </rabbit:topic-exchange>
    <rabbit:queue name="delay_queue" durable="true" auto-declare="true" auto-delete="false" />
    <rabbit:template id="delayMsgTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" exchange="delay_exchange" />
    <rabbit:listener-container connection-factory="connectionFactory" channel-transacted="false" acknowledge="auto" message-converter="jsonMessageConverter">
    <rabbit:listener queues="delay_queue" ref="orderConsumer" method="delayMsg" />
    </rabbit:listener-container>
    <!-- 延迟消息end -->
    </beans>

说明:spring-rabbit-1.6.xsd必须是1.6及以上版本,否则会报“元素 'rabbit:topic-exchange' 中不允许出现属性 'delayed'”错误。具体请查看参考文档3。

四、延迟队列的使用

1、发送消息Producer

    import net.sf.json.JSONObject;
    import org.apache.commons.lang.StringUtils;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    /**
    *
    * @author Horace
    * @version 创建时间:2016年10月26日 下午6:34:31
    */
    @Service
    public class MessageProducerServiceImpl implements MessageProducerService{
    @Autowired
    private AmqpTemplate delayMsgTemplate;
    @Override
    public void delayMsg(JSONObject msg,int delay) {
    // TODO Auto-generated method stub
    final int xdelay= delay*1000;
    delayMsgTemplate.convertAndSend("order.delay.notify", (Object) msg,
    new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message)
    throws AmqpException {
    // TODO Auto-generated method stub
    message.getMessageProperties().setDelay(xdelay);
    return message;
    }
    });
    }
    }

2、异步接收消息Consumer

    import net.sf.json.JSONObject;
    import org.apache.commons.lang.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    /**
    *
    * @author Horace
    * @version 创建时间:2016年10月26日 下午2:48:14
    */
    public class OrderConsumer {
    private static Logger logger = LoggerFactory.getLogger(OrderConsumer.class);
    @Autowired
    private MessageProducerService messageProducerService;
    public void delayMsg(Object obj) {
    logger.info("[延时消息]" + obj);
    if (obj != null) {
    JSONObject notifyJson = JSONObject.fromObject(obj);
    String notifyUrl = notifyJson.getString("notifyUrl");
    String notifyContent = notifyJson.getString("notifyContent");
    String result = HttpUtil.postMessage(notifyUrl, notifyContent);
    if (StringUtils.isBlank(result)) { // 通知失败 进入重发机制
    int newNotifyCount = notifyJson.getInt("notifyCount") + 1; //已经通知的次数
    if (newNotifyCount < 5) {
    notifyJson.put("notifyCount", newNotifyCount);
    int spacingInterval = getSpacingInterval(newNotifyCount);
    messageProducerService
    .delayMsg(notifyJson, spacingInterval);
    } else {
    logger.info("通知5次都失败,等待后台手工处理!");
    }
    }
    }
    }
    /**
    * 重复通知间隔时间(单位为秒)
    * @param notifyCount 已经通知的次数
    * @return
    */
    private int getSpacingInterval(int notifyCount) {
    // TODO Auto-generated method stub
    int spacingInterval = 0;
    switch (notifyCount) {
    case 1:
    spacingInterval = 10;
    break;
    case 2:
    spacingInterval = 20;
    break;
    case 3:
    spacingInterval = 30;
    break;
    case 4:
    spacingInterval = 60;
    break;
    case 5:
    spacingInterval = 90;
    break;
    default:
    break;
    }
    return spacingInterval;
    }
    }

Spring RabbitMQ 延迟队列的相关教程结束。

《Spring RabbitMQ 延迟队列.doc》

下载本文的Word格式文档,以方便收藏与打印。