且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

Python3 通过 pika 连接 RabbitMQ 的基本用法

更新时间:2022-04-03 19:37:16

【RabbitMQ 服务器】

1
2
3
4
5
6
# 在 vhosttest 里面有 exchangetest 和 queuetest 通过 rkeytest 绑定
Broker: 192.168.0.xx
virtual host: vhosttest
Exchange: exchangetest 
Queue: queuetest 
Routing key: rkeytest


【Python 环境】

1
2
3
OS: Windows 10
Python: 3.6.3 x64
pika: 0.11.2


【查看队列状态】

1
2
3
4
5
6
7
8
9
# 通过浏览器查看队列状态
http://192.168.0.xx:15672/api/queues/vhosttest/queuetest 
 
# 通过命令行查看队列状态
curl -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  |  jq
 
# 通过命令行查看队列长度
curl -s -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  | \
    jq '.backing_queue_status.len'


【send.py】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#encoding: utf-8
#author: walker
#date: 2018-01-31
#summary: 发送方/生产者
 
import os, sys, time
import pika
 
def Main():
    credentials = pika.PlainCredentials("test""test")
    parameters = pika.ConnectionParameters(host="192.168.0.xx"
                                            virtual_host='vhosttest'
                                            credentials=credentials)
    connection = pika.BlockingConnection(parameters)    # 连接 RabbitMQ
 
    channel = connection.channel()          # 创建频道
     
    queue = channel.queue_declare(queue='queuetest')     # 声明或创建队列
     
    while True:  # 循环向队列中发送信息
        message = time.strftime('%H:%M:%S', time.localtime())
        channel.basic_publish(exchange='exchangetest',  
                                routing_key='rkeytest',
                                body=message)
         
        print('send message: %s' %  message)     
 
        while True:   
            # 检查队列,以重新得到消息计数
            queue = channel.queue_declare(queue='queuetest', passive=True)     
            messageCount = queue.method.message_count
            print('messageCount: %d' % messageCount)
            if messageCount < 100:
                break
            time.sleep(1)
     
    # 关闭连接
    connection.close()
 
if __name__ == '__main__':
    Main()


【recv.py】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#encoding: utf-8
#author: walker
#date: 2018-01-31
#summary: 接收方/消费者
 
import os, sys, time
import pika
 
# 接收处理消息的回调函数
def ConsumerCallback (channel, method, properties, body):
    print("Received %s" % body)
 
 
def Main():
    credentials = pika.PlainCredentials("test""test")
    parameters = pika.ConnectionParameters(host="192.168.0.xx"
                                            virtual_host='vhosttest'
                                            credentials=credentials)
    connection = pika.BlockingConnection(parameters)    # 连接 RabbitMQ
     
    channel = connection.channel()          # 创建频道
     
    queue = channel.queue_declare(queue='queuetest')     # 声明或创建队列
     
    # no_ack=True 开启自动确认,不然消费后的消息会一直留在队列里面
    # no_ack = no_manual_ack = auto_ack;不手动应答,开启自动应答模式
    channel.basic_consume(ConsumerCallback, queue='queuetest', no_ack=True)
    print('Wait Message ...')
     
    channel.start_consuming()
 
if __name__ == '__main__':
    Main()


【相关阅读】

本文转自walker snapshot博客51CTO博客,原文链接http://blog.51cto.com/walkerqt/2067244如需转载请自行联系原作者

RQSLT