消息中间件系列三:使用RabbitMq原生Java客户端进行消息通信(消费者(接收方)自动确认模式、消费者(接收方)自行确认模式、生产者(发送方)确认模式)

2023-03-10,,

准备工作:

1)安装RabbitMQ,参考文章:消息中间件系列二:RabbitMQ入门(基本概念、RabbitMQ的安装和运行)

2.)分别新建名为OriginalRabbitMQProducer和OriginalRabbitMQConsumer的maven工程

在pom.xml文件里面引入如下依赖:

    <dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.0.0</version>
</dependency>

说明:5系列的版本最好使用JDK8及以上, 低于JDK8可以使用4.x(具体的版本号到Maven的中央仓库查)的版本

一、消费者(接收方)自动确认模式

 前面有谈到消费者收到的每一条消息都必须进行确认,消息的确认机制分为自动确认和消费者自行确认。下面我们来看一下自动确认的示例:

示例1:交换器是direct

1. 在工程OriginalRabbitMQProducer新建一个一个direct的生产者

package study.demo.normal;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import java.io.IOException;
import java.util.concurrent.TimeoutException; /**
*
* @Description: 交换器是direct的生产者 路由键完全匹配时,消息才投放到对应队列
* @author leeSmall
* @date 2018年9月15日
*
*/
public class DirectProducer { //定义交换器的名字
private final static String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory(); //设置要连接的RabbitMQ服务器的地址
factory.setHost("127.0.0.1"); //设置用户名 这里使用缺省的
//factory.setUsername(..); //设置连接断开 这里使用缺省的
//factory.setPort(); //设置虚拟主机 这里使用缺省的
//factory.setVirtualHost(); //2.通过连接工厂创建一个连接
Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的
Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //定义一组路由键
String[]routingKeys = {"error","info","warning"}; //发布消息的交换器上
for(int i=0;i<3;i++){
//路由键
String routingKey = routingKeys[i]; //要发送的消息
String message = "Hello world_"+(i+1); /**
* 发送消息到交换器上
* 参数1:交换器的名字
* 参数2:路由键
* 参数3:BasicProperties
* 参数4:要发送的消息
*/
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
System.out.println("Sent "+routingKey+":"+message); } channel.close();
connection.close(); } }

2. 在工程OriginalRabbitMQConsumer新建一个direct的只消费error日志的消费者

package study.demo.normal;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException; /**
*
* @Description: 交换器是direct 只消费error日志的消费者
* @author leeSmall
* @date 2018年9月15日
*
*/
public class ConsumerError { //定义交换器的名字
private static final String EXCHANGE_NAME = "direct_logs";
// private static final String EXCHANGE_NAME = "fanout_logs_1"; public static void main(String[] argv) throws IOException, TimeoutException {
//1.创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置要连接的RabbitMQ服务器的地址
factory.setHost("127.0.0.1"); //2.通过连接工厂创建一个连接
Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的
Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //5.声明随机队列
String queueName = channel.queueDeclare().getQueue(); //6.声明一个只消费错误日志的路由键error
String routingKey = "error"; //7.队列通过路由键绑定到交换器上
channel.queueBind(queueName,EXCHANGE_NAME,routingKey); System.out.println("Waiting message......."); //8.设置一个监听器监听消费消息
Consumer consumerB = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body,"UTF-8");
System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
}
};
     //9.自动确认:autoAck参数为true
channel.basicConsume(queueName,true,consumerB);
} }

3. 启动消费者ConsumerError,再启动生产者DirectProducer,查看效果

启动消费者ConsumerError:

启动生产者DirectProducer:

查看消费者ConsumerError现在的状况:

可以看到消费者只消费了error级别的消息,这是因为在direct模式下,消费者只定义了路由键routingKey = "error";

4. 在工程OriginalRabbitMQConsumer新建一个direct的消费所有日志的消费者

package study.demo.normal;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException; /**
*
* @Description: 交换器是direct 消费所有日志的消费者
* @author leeSmall
* @date 2018年9月15日
*
*/
public class ConsumerAll {
//定义交换器的名字
private static final String EXCHANGE_NAME = "direct_logs";
// private static final String EXCHANGE_NAME = "fanout_logs_1"; public static void main(String[] argv) throws IOException,
InterruptedException, TimeoutException { //1.创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置要连接的RabbitMQ服务器的地址
factory.setHost("127.0.0.1"); //2.通过连接工厂创建一个连接
Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的
Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //5.声明随机队列
String queueName = channel.queueDeclare().getQueue(); //6.定义一组路由键消费所有日志
String[]routingKeys = {"error","info","warning"}; //7.队列通过路由键绑定到交换器上
for(String routingKey:routingKeys){
//队列和交换器的绑定
channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
}
System.out.println("Waiting message......."); //8.设置一个监听器监听消费消息
Consumer consumerA = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body,"UTF-8");
System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
}
};
//9.自动确认:autoAck参数为true
channel.basicConsume(queueName,true,consumerA);     }   }

5. 启动消费者ConsumerAll,再启动生产者DirectProducer,查看效果

启动消费者ConsumerAll:

启动生产者DirectProducer:

查看消费者ConsumerAll的状态:

 可以看到消费者ConsumerAll消费了生产者DirectProducer产生的所有消息,这是因为在direct模式下,消费者定义了和生产者一样个数的路由键String[]routingKeys = {"error","info","warning"};

示例2:交换器是fanout

1. 在工程OriginalRabbitMQProducer新建一个交换器是fanout的生产者

package study.demo.normal;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import java.io.IOException;
import java.util.concurrent.TimeoutException; /**
*
* @Description: 交换器是fanout的生产者 可以理解为广播,会把所有消息投放到绑定到这个交换器上的队列上
* @author leeSmall
* @date 2018年9月15日
*
*/
public class FanoutProducer { private final static String EXCHANGE_NAME = "fanout_logs_1"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory(); //设置要连接的RabbitMQ服务器的地址
factory.setHost("127.0.0.1"); //设置用户名 这里使用缺省的
//factory.setUsername(..); //设置连接断开 这里使用缺省的
//factory.setPort(); //设置虚拟主机 这里使用缺省的
//factory.setVirtualHost(); //2.通过连接工厂创建一个连接
Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的
Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //定义一组路由键
String[]routingKeys = {"error","info","warning"}; //发布消息的交换器上
for(int i=0;i<3;i++){
//路由键
String routingKey = routingKeys[i]; //要发送的消息
String message = "Hello world_"+(i+1); /**
* 发送消息到交换器上
* 参数1:交换器的名字
* 参数2:路由键
* 参数3:BasicProperties
* 参数4:要发送的消息
*/
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());
System.out.println("Sent "+routingKey+":"+message); } channel.close();
connection.close(); } }

2. 在工程OriginalRabbitMQConsumer新建一个fanout的只消费error日志的消费者

package study.demo.normal;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException; /**
*
* @Description: 交换器是fanout,只消费error日志的消费者
* @author leeSmall
* @date 2018年9月15日
*
*/
public class ConsumerError { //定义交换器的名字
//private static final String EXCHANGE_NAME = "direct_logs";
private static final String EXCHANGE_NAME = "fanout_logs_1"; public static void main(String[] argv) throws IOException, TimeoutException {
//1.创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置要连接的RabbitMQ服务器的地址
factory.setHost("127.0.0.1"); //2.通过连接工厂创建一个连接
Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的
Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //5.声明随机队列
String queueName = channel.queueDeclare().getQueue(); //6.声明一个只消费错误日志的路由键error
String routingKey = "error"; //7.队列通过路由键绑定到交换器上
channel.queueBind(queueName,EXCHANGE_NAME,routingKey); System.out.println("Waiting message......."); //8.设置一个监听器监听消费消息
Consumer consumerB = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body,"UTF-8");
System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
}
};
     //9.自动确认:autoAck参数为true
     channel.basicConsume(queueName,true,consumerA); } }

3. 启动消费者ConsumerError,再启动生产者DirectProducer,查看效果

启动消费者ConsumerError:

启动生产者FanoutProducer:

查看消费者ConsumerError现在的状况:

可以看到消费者消费了所有级别的消息,这是因为在fanout模式下,虽然消费者只定义了路由键routingKey = "error",但是因为fanut是广播模式,会把所有消息投放到绑定到这个交换器上的队列上

4. 在工程OriginalRabbitMQConsumer新建一个fanout的消费所有日志的消费者

package study.demo.normal;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException; /**
*
* @Description: 交换器是fanout 消费所有日志的消费者
* @author leeSmall
* @date 2018年9月15日
*
*/
public class ConsumerAll {
//定义交换器的名字
//private static final String EXCHANGE_NAME = "direct_logs";
private static final String EXCHANGE_NAME = "fanout_logs_1"; public static void main(String[] argv) throws IOException,
InterruptedException, TimeoutException { //1.创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置要连接的RabbitMQ服务器的地址
factory.setHost("127.0.0.1"); //2.通过连接工厂创建一个连接
Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的
Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //5.声明随机队列
String queueName = channel.queueDeclare().getQueue(); //6.定义一组路由键消费所有日志
String[]routingKeys = {"error","info","warning"}; //7.队列通过路由键绑定到交换器上
for(String routingKey:routingKeys){
//队列和交换器的绑定
channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
}
System.out.println("Waiting message......."); //8.设置一个监听器监听消费消息
Consumer consumerA = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body,"UTF-8");
System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
}
};
     //9.自动确认:autoAck参数为true
     channel.basicConsume(queueName,true,consumerA); }
}

5. 启动消费者ConsumerAll,再启动生产者DirectProducer,查看效果

启动消费者ConsumerAll:

启动生产者FanoutProducer:

查看消费者ConsumerAll的状态:

 可以看到消费者ConsumerAll消费了生产者DirectProducer产生的所有消息,这是因为在fanout模式下,消费者定义了和生产者一样个数的路由键String[]routingKeys = {"error","info","warning"};

三、消费者(接收方)自行确认模式

1. 在工程OriginalRabbitMQProducer新建一个消费者自行确认生产者

package study.demo.consumerconfirm;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import java.io.IOException;
import java.util.concurrent.TimeoutException; /**
*
* @Description: 消费者自行确认生产者
* @author leeSmall
* @date 2018年9月15日
*
*/
public class ConsumerConfirmProducer { //交换器
private final static String EXCHANGE_NAME = "direct_cc_confirm_1"; //路由键
private final static String ROUTE_KEY = "error"; public static void main(String[] args) throws IOException, TimeoutException,
InterruptedException { //1.创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory(); //设置要连接的RabbitMQ服务器的地址
factory.setHost("127.0.0.1"); //设置用户名 这里使用缺省的
//factory.setUsername(..); //设置连接断开 这里使用缺省的
//factory.setPort(); //设置虚拟主机 这里使用缺省的
//factory.setVirtualHost(); //2.通过连接工厂创建一个连接
Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的
Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //发布消息的交换器上
for(int i=0;i<10;i++){
//要发送的消息
String message = "Hello world_"+(i+1); /**
* 发送消息到交换器上
* 参数1:交换器的名字
* 参数2:路由键
* 参数3:BasicProperties
* 参数4:要发送的消息
*/
channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,message.getBytes());
System.out.println("Sent "+ROUTE_KEY+":"+message); } channel.close();
connection.close(); } }

2. 在工程OriginalRabbitMQConsumer新建一个消费者自行确认消费者

package study.demo.consumerconfirm;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException; /**
*
* @Description: 消费者自行确认消费者
* @author leeSmall
* @date 2018年9月15日
*
*/
public class ClientConsumerAck { private static final String EXCHANGE_NAME = "direct_cc_confirm_1"; public static void main(String[] argv) throws IOException, TimeoutException {
//1.创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置要连接的RabbitMQ服务器的地址
factory.setHost("127.0.0.1"); //2.通过连接工厂创建一个连接
Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的
Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //5.声明队列
String queueName = "consumer_confirm";
channel.queueDeclare(queueName,false,false,
false,null); //6.声明一个只消费错误日志的路由键error
String routingKey = "error"; //7.队列通过路由键绑定到交换器上
channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
System.out.println("Waiting message......."); //8.设置一个监听器监听消费消息
Consumer consumerB = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body,"UTF-8");
System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
//消费者自行确认
this.getChannel().basicAck(envelope.getDeliveryTag(),false);
}
}; //9.消费者自行确认:autoAck参数为false
channel.basicConsume(queueName,false,consumerB);
} } 

3. 在工程OriginalRabbitMQConsumer新建一个消费者自行确认休眠不回复ack的消费者

package study.demo.consumerconfirm;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException; /**
*
* @Description: 消费者自行确认休眠不回复ack的消费者
* @author leeSmall
* @date 2018年9月15日
*
*/
public class ClientConsumerSlowAck { private static final String EXCHANGE_NAME = "direct_cc_confirm_1"; public static void main(String[] argv) throws IOException, TimeoutException {
//1.创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置要连接的RabbitMQ服务器的地址
factory.setHost("127.0.0.1"); //2.通过连接工厂创建一个连接
Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的
Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //5.声明队列
String queueName = "consumer_confirm";
channel.queueDeclare(queueName,false,false,
false,null); //6.声明一个只消费错误日志的路由键error
String routingKey = "error"; //7.队列通过路由键绑定到交换器上
channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
System.out.println("Waiting message......."); //8.设置一个监听器监听消费消息
Consumer consumerB = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
try {
//消费者自行确认时不回复ack,一直休眠
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(body,"UTF-8");
System.out.println("Accept:"+envelope.getRoutingKey()+":"+message);
//this.getChannel().basicAck(envelope.getDeliveryTag(),false);
}
}; //9.消费者自行确认:autoAck参数为false
channel.basicConsume(queueName,false,consumerB);
} } 

4. 分别启动消费者自行确认消费者ClientConsumerAck和消费者自行确认休眠不回复ack的消费者ClientConsumerSlowAck,再启动消费者自行确认生产者ConsumerConfirmProducer查看状态

启动消费者自行确认消费者ClientConsumerAck:

启动消费者自行确认休眠不回复ack的消费者ClientConsumerSlowAck

启动消费者自行确认生产者ConsumerConfirmProducer

查看消费者自行确认消费者ClientConsumerAck的状态:

查看消费者自行确认休眠不回复ack的消费者ClientConsumerSlowAck的状态:

查看RabbitMQ服务器上的队列情况:

可以看到队列consumer_confirm里面有5条消息未消费,这是因为消费者自行确认休眠不回复ack的消费者ClientConsumerSlowAck收到了这5条消息,但是没有向RabbitMQ服务器发送确认消息,RabbitMQMQ认为这5条消息还没有被消费就一直存在队列里面

下面停掉ClientConsumerSlowAck,查看ClientConsumerAck和RabbitMQ服务器里面队列consumer_confirm的状态

可以看到停掉ClientConsumerSlowAck以后,之前的5条消息被ClientConsumerAck消费了

5. 在工程OriginalRabbitMQConsumer 新建一个消费者自行确认拒绝消息的消费者

package study.demo.consumerconfirm;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException; /**
*
* @Description: 消费者自行确认拒绝消息的消费者
* @author leeSmall
* @date 2018年9月15日
*
*/
public class ClientConsumerReject { private static final String EXCHANGE_NAME = "direct_cc_confirm_1"; public static void main(String[] argv) throws IOException, TimeoutException {
//1.创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置要连接的RabbitMQ服务器的地址
factory.setHost("127.0.0.1"); //2.通过连接工厂创建一个连接
Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的
Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //5.声明队列
String queueName = "consumer_confirm";
channel.queueDeclare(queueName,false,false,
false,null); //6.声明一个只消费错误日志的路由键error
String routingKey = "error"; //7.队列通过路由键绑定到交换器上
channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
System.out.println("Waiting message......."); //8.设置一个监听器监听消费消息
Consumer consumerB = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
//消费者自行拒绝消息 参数requeue=true,让RabbitMQ服务器重新分发消息,requeue=false让RabbitMQ服务器移除消息
this.getChannel().basicReject(envelope.getDeliveryTag(),true);
System.out.println("Reject:"+envelope.getRoutingKey()
+":"+new String(body,"UTF-8"));
}
}; //9.消费者自行确认:autoAck参数为false
channel.basicConsume(queueName,false,consumerB);
} } 

6. 分别启动消费者自行确认消费者ClientConsumerAck和消费者自行确认拒绝消息的消费者ClientConsumerReject,再启动消费者自行确认生产者ConsumerConfirmProducer查看状态

启动消费者自行确认消费者ClientConsumerAck:

启动消费者自行确认拒绝消息的消费者ClientConsumerReject:

启动消费者自行确认生产者ConsumerConfirmProducer

查看消费者自行确认消费者ClientConsumerAck的状态:

查看消费者自行确认拒绝消息的消费者ClientConsumerReject的状态:

可以看到消息都被ClientConsumerAck消费了,这是因为消费者ClientConsumerReject拒绝了所有消息,这里要注意

this.getChannel().basicReject(envelope.getDeliveryTag(),true);

这段代码的basicReject的第二个参数requeue,参数requeue=true,让RabbitMQ服务器重新分发消息,requeue=false让RabbitMQ服务器移除消息

 requeue=false时,RabbitMQ服务器会删掉被ClientConsumerReject拒绝的消息,消费者ClientConsumerAck就不能消费所有消息了

四、生产者(发送方)确认模式

为什么要有个发送方确认模式?

生产者不知道消息是否真正到达RabbitMq,也就是说发布操作不返回任何消息给生产者。
AMQP协议层面为我们提供的事务机制解决了这个问题,但是事务机制本身也会带来问题:
1)严重的性能问题
2)使生产者应用程序产生同步
RabbitMQ团队为我们拿出了更好的方案,即采用发送方确认模式,该模式比事务更轻量,性能影响几乎可以忽略不计。

1. 在OriginalRabbitMQProducer工程新建一个生产者(发送方)确认同步模式的类

package study.demo.producerconfirm;

import java.io.IOException;
import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; /**
*
* @Description: 生产者(发送方)确认同步模式
* @author leeSmall
* @date 2018年9月16日
*
*/
public class ProducerConfirm { private final static String EXCHANGE_NAME = "producer_confirm";
private final static String ROUTE_KEY = "error"; public static void main(String[] args) throws IOException, TimeoutException,
InterruptedException {
//1.创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory(); //设置要连接的RabbitMQ服务器的地址
factory.setHost("127.0.0.1"); //设置用户名 这里使用缺省的
//factory.setUsername(..); //设置连接断开 这里使用缺省的
//factory.setPort(); //设置虚拟主机 这里使用缺省的
//factory.setVirtualHost(); //2.通过连接工厂创建一个连接
Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的
Channel channel = connection.createChannel();
//将信道设置为发送方确认
channel.confirmSelect(); //发布消息的交换器上
for(int i=0;i<2;i++){
String msg = "Hello "+(i+1);
channel.basicPublish(EXCHANGE_NAME,ROUTE_KEY,null,msg.getBytes());
//等待RabbitMQ返回消息确认消息已送达RabbitMQ服务器
if (channel.waitForConfirms()){
System.out.println("发送方同步确认: "+ROUTE_KEY+":"+msg);
}
} // 关闭频道和连接
channel.close();
connection.close();
} }

2. 在OriginalRabbitMQProducer工程新建一个生产者(发送方)确认异步模式的类

package study.demo.producerconfirm;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException; /**
*
* @Description: 生产者(发送方)确认异步模式
* @author leeSmall
* @date 2018年9月16日
*
*/
public class ProducerConfirmAsync { private final static String EXCHANGE_NAME = "producer_confirm"; public static void main(String[] args) throws IOException, TimeoutException,
InterruptedException {
//1.创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory(); //设置要连接的RabbitMQ服务器的地址
factory.setHost("127.0.0.1"); //设置用户名 这里使用缺省的
//factory.setUsername(..); //设置连接断开 这里使用缺省的
//factory.setPort(); //设置虚拟主机 这里使用缺省的
//factory.setVirtualHost(); //2.通过连接工厂创建一个连接
Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的
Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //将信道设置为发送方确认
channel.confirmSelect(); //信道被关闭监听器 用于RabbitMQ服务器断线重连
//channel.addShutdownListener(); /**
* 生产者异步确认监听
* 参数deliveryTag代表了当前channel唯一的投递
* 参数multiple:false
*
*/
channel.addConfirmListener(new ConfirmListener() {
//RabbitMQ服务器确认收到消息
public void handleAck(long deliveryTag, boolean multiple)
throws IOException {
System.out.println("RabbitMQ服务器确认收到消息Ack deliveryTag="+deliveryTag
+"multiple:"+multiple);
} //RabbitMQ服务器由于自己内部出现故障没有收到消息
public void handleNack(long deliveryTag, boolean multiple)
throws IOException {
System.out.println("RabbitMQ服务没有收到消息Ack deliveryTag="+deliveryTag
+"multiple:"+multiple);
}
}); //生产者异步返回监听 这里和发布消息时的mandatory参数有关
//参数mandatory:mandatory=true,投递消息时无法找到一个合适的队列,把消息返回给生产者,mandatory=false 丢弃消息(缺省)
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText,
String exchange, String routingKey,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body);
System.out.println("replyText:"+replyText);
System.out.println("exchange:"+exchange);
System.out.println("routingKey:"+routingKey);
System.out.println("msg:"+msg);
}
}); //声明一组路由键
String[] routingKeys={"error","info","warning"};
//发送消息到交换器上
for(int i=0;i<3;i++){
String routingKey = routingKeys[i%3];
// 发送的消息
String message = "Hello World_"+(i+1)+("_"+System.currentTimeMillis()); //通过路由键把消息发送到交换器上
//参数mandatory: mandatory=true,投递消息时无法找到一个合适的队列,把消息返回给生产者,
// mandatory=false 丢弃消息(缺省)
channel.basicPublish(EXCHANGE_NAME, routingKey, false,
null, message.getBytes());
System.out.println("----------------------------------------------------");
System.out.println(" Sent Message: [" + routingKey +"]:'"+ message + "'");
//sleep一下让程序不快速结束 可以看到RabbitMQ服务器的响应
Thread.sleep(1000);
} // 关闭信道和连接
channel.close();
connection.close();
} }

3. 在OriginalRabbitMQConsumer工程新建一个发送方确认消费者

package study.demo.producerconfirm;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException; /**
*
* @Description: 发送方确认消费者
* @author leeSmall
* @date 2018年9月16日
*
*/
public class ProducerConfirmConsumer { private static final String EXCHANGE_NAME = "producer_confirm"; public static void main(String[] argv) throws IOException, TimeoutException {
//1.创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置要连接的RabbitMQ服务器的地址
factory.setHost("127.0.0.1"); //2.通过连接工厂创建一个连接
Connection connection = factory.newConnection(); //3.通过连接创建一个信道 信道是用来传送数据的
Channel channel = connection.createChannel(); //4.通过信道声明一个交换器 第一个参数时交换器的名字 第二个参数时交换器的种类
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //5.声明队列
String queueName = "producer_confirm";
channel.queueDeclare(queueName,false,false,
false,null); //6.声明一个只消费错误日志的路由键error
String routingKey = "error"; //7.队列通过路由键绑定到交换器上
channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
System.out.println("Waiting message......."); // 8.创建队列消费者 设置一个监听器监听消费消息
final Consumer consumerB = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println( "Received ["+ envelope.getRoutingKey() + "] "+message);
}
};
//9.消费者自动确认:autoAck参数为true
channel.basicConsume(queueName, true, consumerB);
} }

4. 启动发送方确认消费者ProducerConfirmConsumer,再分别启动生产者(发送方)确认同步模式的类ProducerConfirm和生产者(发送方)确认异步模式ProducerConfirmAsync

启动发送方确认消费者ProducerConfirmConsumer:

启动生产者(发送方)确认同步模式的类ProducerConfirm:

查看发送方确认消费者ProducerConfirmConsumer的状态:

启动生产者(发送方)确认异步模式ProducerConfirmAsync:

查看发送方确认消费者ProducerConfirmConsumer的状态:

示例代码获取地址

消息中间件系列三:使用RabbitMq原生Java客户端进行消息通信(消费者(接收方)自动确认模式、消费者(接收方)自行确认模式、生产者(发送方)确认模式)的相关教程结束。

《消息中间件系列三:使用RabbitMq原生Java客户端进行消息通信(消费者(接收方)自动确认模式、消费者(接收方)自行确认模式、生产者(发送方)确认模式).doc》

下载本文的Word格式文档,以方便收藏与打印。