RocketMq顺序消费

2023-05-10,,

部分内容出处   https://www.jianshu.com/p/453c6e7ff81c

rocketmq内部有4个默认的队里,在发送消息时,同一组的消息需要按照顺序,发送到相应的mq中,同一组消息按照顺序进行消费,不同组的消息可以并行的进行消费。

下面看一下producer的代码:

package com.alibaba.rocketmq.example.message.order;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException; import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List; /**
* @author : Jixiaohu
* @Date : 2018-04-19.
* @Time : 9:20.
* @Description :
*/
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException {
String groupName = "order_producer";
DefaultMQProducer producer = new DefaultMQProducer(groupName);
producer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
producer.start(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
String dateStr = sdf.format(new Date());
try {
for (int i = 1; i <= 3; i++) {
String body = dateStr + "Hello RoctetMq : " + i;
Message msg = new Message("Topic1", "Tag1", "Key" + i,
body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
Integer id = (Integer) o;
return list.get(id);
}
}, 0); //0是队列的下标
System.out.println(sendResult);
}
for (int i = 1; i <= 3; i++) {
String body = dateStr + "Hello RoctetMq : " + i;
Message msg = new Message("Topic1", "Tag1", "Key" + i,
body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
Integer id = (Integer) o;
return list.get(id);
}
}, 1); //1是队列的下标
System.out.println(sendResult);
}
for (int i = 1; i <= 3; i++) {
String body = dateStr + "Hello RoctetMq : " + i;
Message msg = new Message("Topic1", "Tag1", "Key" + i,
body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
Integer id = (Integer) o;
return list.get(id);
}
}, 2); //2是队列的下标
System.out.println(sendResult);
}
for (int i = 1; i <= 3; i++) {
String body = dateStr + "Hello RoctetMq : " + i;
Message msg = new Message("Topic1", "Tag1", "Key" + i,
body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
Integer id = (Integer) o;
return list.get(id);
}
}, 3); //3是队列的下标
System.out.println(sendResult);
}
for (int i = 1; i <= 3; i++) {
String body = dateStr + "Hello RoctetMq : " + i;
Message msg = new Message("Topic1", "Tag1", "Key" + i,
body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
Integer id = (Integer) o;
return list.get(id);
}
}, 4); //4是队列的下标
System.out.println(sendResult);
}
for (int i = 1; i <= 3; i++) {
String body = dateStr + "Hello RoctetMq : " + i;
Message msg = new Message("Topic1", "Tag1", "Key" + i,
body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
Integer id = (Integer) o;
return list.get(id);
}
}, 5); //5是队列的下标
System.out.println(sendResult);
}
} catch (RemotingException e) {
e.printStackTrace();
Thread.sleep(1000);
}
producer.shutdown();
}
}

这边发送多组消息,每组消息的顺序分别为1,2,3,

下面查看consumer1,和consumer2,因为要顺序消费,需要注意的是,这两个消费者的监听器是MessageListenerOrderly,两个的代码一样,我这边就只展示一个consumer的代码

package com.alibaba.rocketmq.example.message.order;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit; /**
* @author : Jixiaohu
* @Date : 2018-04-23.
* @Time : 9:35.
* @Description : 顺序消息消费
*/
public class Consumer1 { public Consumer1() throws Exception {
String groupName = "order_producer";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
/**
* 设置Consumer第一次启动是从队列头开始消费还是队列尾开始消费
* 非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅的主题,以及过滤的标签内容
consumer.subscribe("Topic1", "*");
//注册监听
consumer.registerMessageListener(new Listener());
consumer.start();
System.out.println("Consumer1 Started.");
} class Listener implements MessageListenerOrderly { private Random random = new Random(); @Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
// 设置自动提交
context.setAutoCommit(true);
for (MessageExt msg : list) {
System.out.println(msg + ",context" + new String(msg.getBody()));
}
try {
TimeUnit.SECONDS.sleep(random.nextInt(1));
} catch (InterruptedException e) {
e.printStackTrace();
} return ConsumeOrderlyStatus.SUCCESS;
}
} public static void main(String[] args) throws Exception {
new Consumer1();
} }

还是按照先启动consumer的顺序,在启动producer的顺序。

这边看一下控制台的信息

总共6组消息,broker-a上接收到4组消息,broker-b上接收到2组消息,同一组的3条消息会发送到同一个broker的同一个队列中,这样才能保证顺序消费,

下面看一下consumer1和consumer2的打印结果

由于顺序消费只能单线程,一个线程只能去一个队列获取数据。

可以看到,这边的queueid都是3个 3个打印,不会出现交替,下面看一下一组消息的消费顺序,

可以看到,消息是按照发送的顺序,进行消费,consumer2的打印结果也是类似的,不过consumer2消费了6条消息。

这样就实现了rocket的顺序消费,虽然实现了顺序消费,由于网络通信,会存在着重复数据的问题,

重复数据的问题,rocket不提供解决方案,让业务方自行解决,主要有两个方法:

    消费端处理消息的业务逻辑保持幂等性
    保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现

第1条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。

第1条解决方案,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率其实很小,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。

RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。

下面把consumer修改成多线程的模式,再次查看一下运行的结果:

package com.alibaba.rocketmq.example.message.thread;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit; /**
* @author : Jixiaohu
* @Date : 2018-04-23.
* @Time : 9:35.
* @Description : 顺序消息消费
*/
public class Consumer { public Consumer() throws Exception {
String groupName = "order_producer";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
/**
* 设置Consumer第一次启动是从队列头开始消费还是队列尾开始消费
* 非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //最小线程数
consumer.setConsumeThreadMin(10); //最大线程数
consumer.setConsumeThreadMax(20); //订阅的主题,以及过滤的标签内容
consumer.subscribe("Topic1", "*");
//注册监听
consumer.registerMessageListener(new Listener());
consumer.start();
System.out.println("Consumer Started.");
} class Listener implements MessageListenerConcurrently { private Random random = new Random(); @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) { for (MessageExt msg : list) {
System.out.println(msg + ",context" + new String(msg.getBody()));
}
try {
TimeUnit.SECONDS.sleep(random.nextInt(1));
} catch (InterruptedException e) {
e.printStackTrace();
} return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
} public static void main(String[] args) throws Exception {
new Consumer();
} }

同样先启动consumer,在启动producer,查看一下打印结果:

从打印结果,可以看出consumer消费并不能保证严格的顺序,多线程和顺序,只能保证其中的一个。producer能保证消息发送的顺序,不能保证消息消费的顺序,要保证消息消费的顺序,consumer端必须实现 MessageListenerOrderly 接口。

RocketMq顺序消费的相关教程结束。

《RocketMq顺序消费.doc》

下载本文的Word格式文档,以方便收藏与打印。