Python之RabbitMQ操作

2023-06-12,,

RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message。

实现的协议:AMQP。
 
术语(Jargon)
 
P,Producing,制造和发送信息的一方。
Queue,消息队列。
C,Consuming,接收消息的一方。
 
RabbitMQ安装

 安装配置epel源
$ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 安装erlang
$ yum -y install erlang 安装RabbitMQ
$ yum -y install rabbitmq-server

安装rabbitmq API

 pip install pika
or
easy_install pika
or
源码 https://pypi.python.org/pypi/pika

使用API操作RabbitMQ

基于Queue实现生产者消费者模型

 #!/usr/bin/env python
import pika # ######################### 生产者 ######################### connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
 #!/usr/bin/env python
import pika # ########################## 消费者 ########################## connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body):
print(" [x] Received %r" % body) channel.basic_consume(callback,
queue='hello',
no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

1、acknowledgment 消息不丢失(订阅端消息不丢失)

no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

 #!/usr/bin/env python
# time:
# Auto:PANpan
# func:
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
channel=connection.channel()#创建频道,通过频道操作rabbitmq
channel.queue_declare(queue='hai')#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错 def callback(ch,method,properties,body):
print("[x] Received %r" %body)#打印获得消息的内容
ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息 channel.basic_consume(callback,queue='hai',no_ack=False)
#获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息,
# 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认,
# 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失 print('[*]Waiting for messages to exit press CTRL+C')
channel.start_consuming()

消费者

 #!/usr/bin/env python
# time:
# Auto:PANpan
# func:
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
channel=connection.channel()#创建频道,通过频道操作rabbitmq
channel.queue_declare(queue='hai')#创建一个MQ队列,名称为 hai
channel.basic_publish(exchange='',routing_key='hai',body='hello world')
#向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容
print("[x] Sent 'hello world' ")
connection.close()

生产者

2、durable   消息不丢失(服务端消息不丢失)

 # time:
# Auto:PANpan
# func:
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
channel=connection.channel()#创建频道,通过频道操作rabbitmq
channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错.durable=True是让消息持久化 def callback(ch,method,properties,body):
print("[x] Received %r" %body)#打印获得消息的内容
ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息 channel.basic_consume(callback,queue='hai',no_ack=True)
#获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息,
# 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认,
# 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失 print('[*]Waiting for messages to exit press CTRL+C')
channel.start_consuming()

消费者

 #!/usr/bin/env python
# time:
# Auto:PANpan
# func:
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
channel=connection.channel()#创建频道,通过频道操作rabbitmq
channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai
channel.basic_publish(exchange='',routing_key='hai',body='hello world',
properties=pika.BasicProperties(delivery_mode=2,))#该语句作用为告诉rabbit服务器将消息持久化
#向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容
print("[x] Sent 'hello world' ")
connection.close()

生产者

3、消息获取顺序

默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照索引排列

 #!/usr/bin/env python
# time:
# Auto:PANpan
# func:
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
channel=connection.channel()#创建频道,通过频道操作rabbitmq
channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错.durable=True是让消息持久化 def callback(ch,method,properties,body):
print("[x] Received %r" %body)#打印获得消息的内容
ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息
channel.basic_qos(prefetch_count=1)#客户端按顺序去取,默认为奇偶数
channel.basic_consume(callback,queue='hai',no_ack=True)
#获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息,
# 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认,
# 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失 print('[*]Waiting for messages to exit press CTRL+C')
channel.start_consuming()

消费者

 #!/usr/bin/env python
# time:
# Auto:PANpan
# func:
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
channel=connection.channel()#创建频道,通过频道操作rabbitmq
channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai
channel.basic_publish(exchange='',routing_key='hai',body='hello world',
properties=pika.BasicProperties(delivery_mode=2,))#该语句作用为告诉rabbit服务器将消息持久化
#向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容
print("[x] Sent 'hello world' ")
connection.close()

生产者

4、发布订阅

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

exchange type = fanout

 #!/usr/bin/env python
# time:
# Auto:PANpan
# func:
import pika
import sys
connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接消息队列服务器
channel=connection.channel()#创建频道,通过频道对rabbitmq进行操作 channel.exchange_declare(exchange='logs',type='fanout')#创建exchange,名称为logs,若果该消息队列已经创建,可以省略
message=''.join(sys.argv[1:]) or "info: Hello Wrold" channel.basic_publish(exchange='logs',routing_key='',body=message)#将消息添加今年队列
print('[x] sent %r'%message)
connection.close()

发布者

 #!/usr/bin/env python
# time:
# Auto:PANpan
# func:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8')) #连接消息队列服务器
channel = connection.channel()#创建频道,通过频道对rabbitmq进行操作 channel.exchange_declare(exchange='logs',#创建exchange,名称为logs
type='fanout')#type='fanout'作用为凡是和exchange相关联的队列,在用户给exchange发消息时,所有关联队列都会受到消息 result=channel.queue_declare(exclusive=True)#不指定队列名,有系统随机创建
queue_name=result.method.queue channel.queue_bind(exchange='logs',queue=queue_name)#将exchange和当前的消息队列做一个绑定
print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch,method,properties,body):
print('[x] %r' %body) channel.basic_consume(callback,queue=queue_name,no_ack=True)#在队列中获取消息 channel.start_consuming()

订阅者

5、关键字发送

exchange type = direct

之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

 #!/usr/bin/env python
# time:
# Auto:PANpan
# func:
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.11.138'))
channel = connection.channel() channel.exchange_declare(exchange='direct_logs',
type='direct') #severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
#message = ' '.join(sys.argv[2:]) or 'Hello World!'
severity='info'
message='test'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

发布者

 #!/usr/bin/env python
# time:
# Auto:PANpan
# func:
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='10.0.0.8'))
channel = connection.channel() channel.exchange_declare(exchange='direct_logs',
type='direct')#设置exchange类型为direct result = channel.queue_declare(exclusive=True) #创建随机队列
queue_name = result.method.queue # severities = sys.argv[1:]
# if not severities:
# sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
# sys.exit(1)
severities=['error']
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)#绑定关键字 print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

订阅在1

 #!/usr/bin/env python
# time:
# Auto:PANpan
# func:
#!/usr/bin/env python
# time:
# Auto:PANpan
# func:
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='10.0.0.8'))
channel = connection.channel() channel.exchange_declare(exchange='direct_logs',
type='direct') result = channel.queue_declare( )
#声明queue,确认要从中接收message的queue
#queue_declare函数是幂等的,可运行多次,但只会创建一次
#若可以确信queue是已存在的,则此处可省略该声明,如producer已经生成了该queue
#但在producer和consumer中重复声明queue是一个好的习惯
#例如: channel.queue_declare(queue='hello')
queue_name = result.method.queue # severities = sys.argv[1:]
# if not severities:
# sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
# sys.exit(1)
severities=['error','info']
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

订阅在2

6、模糊匹配

exchange type = topic

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

# 表示可以匹配 0 个 或 多个 单词
*  表示只能匹配 一个 单词

 #!/usr/bin/env python
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='topic_logs',
type='topic') result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1) for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

订阅者

 #!/usr/bin/env python
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='topic_logs',
type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

发布者

注:

订阅/发布Demo
 
发送消息给多个订阅者
核心思想:消息发送给exchange,每个接收方创建匿名Queue绑定到exchange,exchange发送消息给每个接收方。
 
Exchanges
 
在RabbitMQ完整的模型中,消息只能发送给一个exchange。
exchange一方面接收消息,另一方面push给queues。
 

exchange类型
> rabbitmqctl list_exchanges
direct
topic
headers
fanout 广播消息给已知队列
 

Python之RabbitMQ操作的相关教程结束。

《Python之RabbitMQ操作.doc》

下载本文的Word格式文档,以方便收藏与打印。