且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

RabbitMQ详解(二)

更新时间:2022-09-17 08:18:10

一、RabbitMQ架构浅析

二、收发“hello world”

三、任务分发机制


一、RabbitMQ架构浅析

1.MQ架构图  

RabbitMQ详解(二)  

RabbitMQ Server: 也叫broker server,是一种传输服务,维护一条从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输。

但是这个保证也不是100%的保证,但是对于普通的应用来说这已经足够了。当然对于商业系统来说,可以再做一层数据一致性的guard,就可以彻底保证系统的一致性了。

Client A & B: 也叫Producer,数据的发送方。createmessages and publish (send) them to a broker server (RabbitMQ).

一个Message有两个部分:payload(有效载荷)和label(标签)。payload顾名思义就是传输的数据。label是exchange的名字或者说是一个tag,它描述了payload,而且RabbitMQ也是通过这个label来决定把这个Message发给哪个Consumer。AMQP仅仅描述了label,而RabbitMQ决定了如何使用这个label的规则。

Client 1,2,3:也叫Consumer,数据的接收方。Consumersattach to a broker server (RabbitMQ) and subscribe to a queue。把queue比作是一个有名字的邮箱。当有Message到达某个邮箱后,RabbitMQ把它发送给它的某个订阅者即Consumer。当然可能会把同一个Message发送给很多的Consumer。在这个Message中,只有payload,label已经被删掉了。对于Consumer来说,它是不知道谁发送的这个信息的。就是协议本身不支持。但是当然了如果Producer发送的payload包含了Producer的信息就另当别论了。


2.channle和connection

Connection: 就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。程序的起始处就是建立这个TCP连接。

Channels: 虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。

对于OS来说,建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。

对于Producer或者Consumer来说,可以并发的使用多个Channel进行Publish或者Receive。 

 

3.ack确认机制

如果Message被某个consumer消费了,那么该Message就会被从queue中移除。//当然也可以让同一到个Message发送到很多Consumer

如果没有被任何consumer消费,那么这个Message会被Cache,不会被丢弃。数据被consumer正确的Consumer收到时,数据就会被从queue中删除

正确的收到:使用ack机制实现//可以显式在程序中去ack,也可以自动的ack。如果数据没有被ack:rabbitmq server会把该消息传输到下一个consumer

如果这个app忘记了ack。那么rabbitmq server不会再发送数据给它。因为server认为这个consumer的处理能力有限

使用ack也可以起到一定的限流的作用:在consumer处理完成数据后发送ack,甚至在额外的延时后发送ack,将有效的balance consumer的load

当然对于实际的例子,比如我们可能会对某些数据进行merge,比如merge 4s内的数据,然后sleep 4s后再获取数据。特别是在监听系统的state,我们不希望所有的state实时的传递上去,而是希望有一定的延时。这样可以减少某些IO,而且终端用户也不会感觉到。


4.Reject a message 

有两种方式,第一种的Reject可以让RabbitMQ Server将该Message 发送到下一个Consumer。第二种是从queue中立即删除该Message。


5.Creating a queue

Consumer和Procuder都可以通过 queue.declare 创建queue。对于某个Channel来说,Consumer不能declare一个queue,却订阅其他的queue。当然也可以创建私有的queue。这样只有app本身才可以使用这个queue。queue也可以自动删除,被标为auto-delete的queue在最后一个Consumer unsubscribe后就会被自动删除。那么如果是创建一个已经存在的queue呢?那么不会有任何的影响。需要注意的是没有任何的影响,也就是说第二次创建如果参数和第一次不一样,那么该操作虽然成功,但是queue的属性并不会被修改。

那么谁应该负责创建这个queue呢?是Consumer,还是Producer?

如果queue不存在,当然Consumer不会得到任何的Message。但是如果queue不存在,那么Producer Publish的Message会被丢弃。所以,还是为了数据不丢失,Consumer和Producer都try to create the queue!反正不管怎么样,这个接口都不会出问题。

queue对load balance的处理是完美的。对于多个Consumer来说,RabbitMQ 使用循环的方式(round-robin)的方式均衡的发送给不同的Consumer。


6.Exchanges

从架构图可以看出,Procuder Publish的Message进入了Exchange。接着通过“routing keys”, RabbitMQ会找到应该把这个Message放到哪个queue里。queue也是通过这个routing keys来做的绑定。

有三种类型的Exchanges:direct, fanout,topic。 每个实现了不同的路由算法(routing algorithm)。

Direct exchange: 如果 routing key 匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。

Fanout exchange: 会向响应的queue广播。

Topic exchange: 对key进行模式匹配,比如ab*可以传递到所有ab*的queue。

7.Virtual hosts

每个virtual host本质上都是一个RabbitMQ Server,拥有它自己的queue,exchagne,和bings rule等等。这保证了你可以在多个不同的application中使用RabbitMQ。

二、收发“hello world”

python --version //用python2的 安装python2-pika 

1.发送消息

1
2
3
4
5
6
7
8
9
10
11
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

=====================================================================

建立连接->创建channel->创建名字为hello的队列->发送消息->关闭连接

从架构图可以看出,Producer只能发送到exchange,它是不能直接发送到queue的。现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。routing_key就是指定的queue名字。

关闭连接

[root@node112 test]# rabbitmqctl list_queues //查看已经发送的队列

Listing queues ...

Hello 1 //被消费后,会变成0

...done.


2.接受消息

1
2
3
4
5
6
7
8
9
10
11
12
13
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
channel.start_consuming()

=====================================================================

建立连接->创建channel->创建名字为hello的队列->消费消息->关闭连接

subscribe了。在这之前,需要声明一个回调函数来处理接收到的数据。

3.运行测试

$ python send.py  

[x] Sent 'Hello World!'

send.py 每次运行完都会停止。注意:现在数据已经存到queue里了。接收它:

$ python receive.py  

[*] Waiting for messages. To exit press CTRL+C  

[x] Received 'Hello World!'  


三、任务分发机制

RabbitMQ Server将queue的Message发送给不同的Consumer以处理计算密集型的任务

1.任务分发机制


new_task.py //发送者

====================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/usr/bin/env python  
import pika  
import sys  
connection = pika.BlockingConnection(pika.ConnectionParameters(  
host='localhost'))  
channel = connection.channel()  
channel.queue_declare(queue='task_queue', durable=True)    
message = ' '.join(sys.argv[1:]) or "Hello World!"  
channel.basic_publish(exchange='',  
routing_key='task_queue',  
body=message,  
properties=pika.BasicProperties(  
delivery_mode = 2# make message persistent  
))  
print " [x] Sent %r" % (message,)  
connection.close()

worker.py //收集者

===================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#!/usr/bin/env python  
import pika  
import time    
connection = pika.BlockingConnection(pika.ConnectionParameters(  
host='localhost'))  
channel = connection.channel()  
channel.queue_declare(queue='task_queue', durable=True)  
print ' [*] Waiting for messages. To exit press CTRL+C'    
def callback(ch, method, properties, body):  
print " [x] Received %r" % (body,)  
time.sleep( body.count('.') )  
print " [x] Done"  
ch.basic_ack(delivery_tag = method.delivery_tag)    
channel.basic_qos(prefetch_count=1)  
channel.basic_consume(callback,  
queue='task_queue')
channel.start_consuming()

2.Round-robin循环分发

RabbitMQ对于load较大的情况,可以通过增加consumer和多创建VirtualHost解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Consumer1:# python worker.py 
Consumer2:# python worker.py 
Producer:#[root@node112 test]# for i in First Second Third Fourth Fifth ; do python new_task.py $i messages  ; done
 [x] Sent 'First messages'
 [x] Sent 'Second messages'
 [x] Sent 'Third messages'
 [x] Sent 'Fourth messages'
 [x] Sent 'Fifth messages'
验证:
Consumer1:
[root@node112 test]# python worker.py 
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second messages'
[x] Done
[x] Received 'Fourth messages'
[x] Done
Consumer2:
[root@node112 test]# python worker.py 
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First messages'
[x] Done
[x] Received 'Third messages'
[x] Done
[x] Received 'Fifth messages'
[x] Done

默认情况下,RabbitMQ 会顺序的分发每个Message。当每个收到ack后,会将该Message删除,然后将下一个Message分发到下一个Consumer。这种分发方式叫做round-robin。


3.消息确认

no-ack:Consumer收到消息后,RabbitMQ Server会立即把这个message标记为完成,然后从queue中退出 //

ack:数据被接收并且被处理后(RabbitMQ Server收到ACK)才会去安全的删除数据

如果Consumer退出了但是没有发送ack,RabbitMQ会把这个Message发送到下一个Consumer。保证在Consumer异常退出的情况下数据不会丢失。

这里并没有用到超时机制。RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来做数据处理。

默认情况下,消息确认是打开的(enabled)。

1
2
3
4
5
6
7
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)  
time.sleep( body.count('.') )  
print " [x] Done"  
ch.basic_ack(delivery_tag = method.delivery_tag)  
channel.basic_consume(callback,  
queue='hello')

这样即使你通过Ctr-C中断了worker.py,那么Message也不会丢失了,它会被分发到下一个Consumer。

如果忘记了ack,那么后果很严重。当Consumer退出时,Message会重新分发。然后RabbitMQ会占用越来越多的内存,由于RabbitMQ会长时间运行,因此这个“内存泄漏”是致命的。去调试这种错误,可以通过一下命令打印un-acked Messages:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged  


4.消息持久化

将queue和Message持久化

队列持久化:channel.queue_declare(queue='hello', durable=True)  

再次强调,Producer和Consumer都应该去创建这个queue,尽管只有一个地方的创建是真正起作用的:

接下来,需要持久化Message,即在Publish的时候指定一个properties,方式如下:

1
2
3
4
5
6
channel.basic_publish(exchange='',  
routing_key="task_queue",  
body=message,  
properties=pika.BasicProperties(  
delivery_mode = 2# make message persistent  
))

防止数据丢失:

1.Consumer在数据处理结束后发送ack,这样RabbitMQ Server会认为Message Deliver 成功。

2.持久化queue,可以防止RabbitMQ Server 重启或者crash引起的数据丢失。

3.持久化Message,理由同上。

但是数据依然存在丢失的风险。//例如在存储到磁盘的时间过程中

RabbitMQ并不是为每个Message都做fsync:它可能仅仅是把它保存到Cache里,还没来得及保存到物理磁盘上。

方案:把每次的publish放到一个transaction中。这个transaction的实现需要user defined codes。

或者在{系统panic/异常重启/断电}时,给各个应用留出时间去flash cache,保证每个应用都能exit gracefully。


5.公平分发

默认状态下,RabbitMQ将第n个Message分发给第n个Consumer。当然n是取余后的。它不管Consumer是否还有unacked Message,只是按照这个默认机制进行分发。

那么如果有个Consumer工作比较重,那么就会导致有的Consumer基本没事可做,有的Consumer却是毫无休息的机会。那么,RabbitMQ是如何处理这种问题呢?

通过 basic.qos 方法设置prefetch_count=1 。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。 设置方法如下:

channel.basic_qos(prefetch_count=1)  


注意:这种方法可能会导致queue满。当然,这种情况下你可能需要添加更多的Consumer,或者创建更多的virtualHost来细化你的设计。

转自:http://blog.csdn.net/column/details/rabbitmq.html

官网:http://www.rabbitmq.com

http://blog.csdn.net/anzhsoft/article/details/19563091











本文转自MT_IT51CTO博客,原文链接:http://blog.51cto.com/hmtk520/2051211,如需转载请自行联系原作者