rabbit Mq 实现定向消费,设置Ip白名单

2022-07-25,,,,

 

rabbit Mq 实现定向消费设置Ip白名单

初衷:为了在生产环境调试生产的问题。但是本地启动生产环境,就会产生一些不必要的问题。本地启动生产环境。就会有可能消费生产环境的消息。为了解决这一问题。我提出三种实现方案:

方案一:Mq和spring集成的时候,做Ip白名单限制。在启动项目的时候就会检测本地的Ip是否属于配置的白名单Ip段(缺点:就是只能围绕)
方案二:在mq send 的时候带上特定的Ip. 然后在消费端进行判断,如果消费端不属于Ip白名单,那么直接再次放进mq,或者说抛异常。(缺点:直接放回mq,做法不好,每次放入顶端,抛异常感觉不错,但是得把Mq配置成支持事务的方式))
方案三:需要在rabbit mq 后台管理系统上面配置用户,且需要rabbit.confg 里面配置固定的白名单Ip   

这三种方案:我更倾向于方案三。但是现实往往是不允许你去这样做。

方案一:代码启动之后会在mq manager管理系统里展现出来。但是会因为没有设置消费者就会报错。但是不会影响其他的业务逻辑。不妨碍你的测试。如果你想测试关于Mq的消费,那么你就只能把自己的Ip设置在白名单之内。

package com.rabbitmq;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author Kevin
 * @version DCG 1.0
 * @date 2020/12/24 14:25
 */
@Configuration
@PropertySource("classpath:rabbitmq.properties")
@ComponentScan
@Slf4j
@Data
public class RabbitMqConfig{
    @Value("${consumers.Ip}")
    private String consumersIp;
    @Value("${mq.exchange}")
    private String exchange;
    @Value("${mq.queueName}")
    private String queueName;
    @Value("${routkey.consumer}")
    private String routkeyConsumer;
    @Value("${routkey.producer}")
    private String routkeyProducer;
    @Value("${current.env}")
    private String env;
    @Value("${mq.Listener.prefetch}")
    private String prefetch;
    @Value("${mq.Listener.concurrency}")
    private String concurrency;
    @Autowired
    RabbitmqResultConsumer rabbitmqResultConsumer;  //具体实现的消费者

    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    public DirectExchange directExchange() {
        // 支持持久化,长期不用补删除
        return new DirectExchange(exchange, true, false);
    }


    public Queue mqQueue() {
        // 支持持久化
        return new Queue(queueName, true);
    }


    @Bean
    public ThreadPoolTaskExecutor getThreadPoolTask(){
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(50);
        threadPoolTaskExecutor.setMaxPoolSize(100);
        threadPoolTaskExecutor.setQueueCapacity(10000);
        threadPoolTaskExecutor.setKeepAliveSeconds(300);
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        return threadPoolTaskExecutor;
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        try {
            //实例化容器
            container.setConnectionFactory(connectionFactory);
            //concurrency  设置消费者数量
            container.setConcurrentConsumers(Integer.valueOf(concurrency));
            //prefetch 限制每次发送多少数据
            container.setPrefetchCount(Integer.valueOf(prefetch));
            //自动确认,设置手动会让 消息重试失效,没有重试需求的随意
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            //设置线程池
            container.setTaskExecutor(getThreadPoolTask());
            container.setQueueNames(queueName);
            rabbitAdmin(connectionFactory).declareBinding(BindingBuilder.bind(mqQueue()).to(directExchange()).with(routkeyConsumer));
            // 白名单,如果本地Ip是属于我们设置的白名单里的。那么就正常的注册成功。
            // 如果本地IP不属于Ip白名单范围内。那么就不设置消费者,那么就会停止这个消费者:Stopping container from aborted consumer
            if (!includeIp()){
//                throw new RuntimeException("Mq services are not within the scope of the MQ whitelist,Please contact administrator");
                log.error("The current address, MQ is not supported. Please contact the administrator");
            }else{
                container.setMessageListener(rabbitmqResultConsumer);
            }
        }catch (RuntimeException e){
            log.error("Mq service client error message:【{}】",e.getMessage());
        }
        return container;
    }

    /**
     * 判断某个Ip是否属于自己定义的Ip段
     * @param ip
     * @param cidr
     * @return
     */
    public static boolean isInRange(String ip, String cidr) {
        String[] ips = ip.split("\\.");
        int ipAddr = (Integer.parseInt(ips[0]) << 24)
                | (Integer.parseInt(ips[1]) << 16)
                | (Integer.parseInt(ips[2]) << 8) | Integer.parseInt(ips[3]);
        int type = Integer.parseInt(cidr.replaceAll(".*/", ""));
        int mask = 0xFFFFFFFF << (32 - type);
        String cidrIp = cidr.replaceAll("/.*", "");
        String[] cidrIps = cidrIp.split("\\.");
        int cidrIpAddr = (Integer.parseInt(cidrIps[0]) << 24)
                | (Integer.parseInt(cidrIps[1]) << 16)
                | (Integer.parseInt(cidrIps[2]) << 8)
                | Integer.parseInt(cidrIps[3]);

        return (ipAddr & mask) == (cidrIpAddr & mask);
    }

    private boolean includeIp(){
        boolean flag = false;
        try {
            String ip = InetAddress.getLocalHost().getHostAddress();
            List<String> consumersList = Arrays.asList(StringUtils.split(consumersIp, ","));
            for (String e:consumersList){
                if (isInRange(ip,e)){
                    flag =  true;
                }
            }
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        return flag;
    }
}

 classpath:rabbitmq.properties:

#环境 远程连接地址
#direct exchanges
mq.exchange=

#routeKey(发送端)
routkey.producer=routkey
#routeKey(接收端)
routkey.consumer=routkey.consumer

# remote Mq Ip List(Ip 白名单)
consumers.Ip=222.222.222.0/24

#队列名称
mq.queueName=queue_name
#限制每次发送一条数据。
mq.Listener.prefetch=1
#同一个队列启动几个消费者
mq.Listener.concurrency=1

创建自己的消费者:



/**
 * 消息模板队列消费者
 */
@Slf4j
@Component
public class RabbitmqDcgCalcResultConsumer implements ChannelAwareMessageListener{
	
	@Override
	public void onMessage(Message msg, Channel arg1) {
		
		} catch (Exception e) {
			log.error("消费计算结果的Message发生错误",e);
		} finally {
			
		}
	}
}

好了这次就写到这里。  如果想知道方案二的实现方式。那么就在下面留言。

本文地址:https://blog.csdn.net/qq_37228713/article/details/111991401

《rabbit Mq 实现定向消费,设置Ip白名单.doc》

下载本文的Word格式文档,以方便收藏与打印。