且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

ActiveMQ(14):Destination(目的地)高级特性

更新时间:2022-09-29 23:35:09

一、Wildcards

Wildcards用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展。ActiveMQ支持以下三种wildcards:

 1:“.” 用于作为路径上名字间的分隔符

 2:“*” 用于匹配路径上的任何名字

 3:">" 用于递归地匹配任何以这个名字开始的destination


示例,设想你有如下两个destinations

  PRICE.STOCK.NASDAQ.IBM (IBM在NASDAQ的股价)

  PRICE.STOCK.NYSE.SUNW (SUN在纽约证券交易所的股价)

那么:

 1:PRICE.> :匹配任何产品的价格变动

 2:PRICE.STOCK.> :匹配任何产品的股票价格变动

 3:PRICE.STOCK.NASDAQ.* :匹配任何在NASDAQ下面的产品的股票价格变动

 4:PRICE.STOCK.*.IBM:匹配任何IBM的产品的股票价格变动

二、组合列队Composite Destinations

组合队列允许用一个虚拟的destination代表多个destinations。这样就可以通过composite destinations在一个操作中同时向多个queue发送消息。

2.1 客户端实现的方式

在composite destinations中,多个destination之间采用“,”分割。例如:

1
2
3
    Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C");
  
  Destination destination = session.createQueue("my-queue,my-queue2");

如果你希望使用不同类型的destination,那么需要加上前缀如queue:// 或topic://,例如:

1
    Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");

2.2 在conf/activemq.xml中的broker下配置实现

1
2
3
4
5
6
7
8
9
10
11
12
<destinationInterceptors>
    <virtualDestinationInterceptor>
      <virtualDestinations>
        <compositeQueue name="MY.QUEUE">
          <forwardTo>
            <queue physicalName="my-queue" />
          <queue physicalName="my-queue2" />
          </forwardTo>
            </compositeQueue>
        </virtualDestinations>
    </virtualDestinationInterceptor>
</destinationInterceptors>

再java代码发送的时候,队列的的名字就用MY.QUEUQ

三、Configure Startup Destinations

如果需要在ActiveMQ启动的时候,创建Destination的话,可以如下配置conf/activemq.xml的broker下:

1
2
3
4
<destinations>
    <queue physicalName="FOO.BAR" />
  <topic physicalName="SOME.TOPIC" />
</destinations>

四、Delete Inactive Destinations

一般情况下,ActiveMQ的queue在不使用之后,可以通过web控制台或是JMX方式来删除掉。当然,也可以通过配置,使得broker可以自动探测到无用

的队列(一定时间内为空的队列)并删除掉,回收响应资源。可以如下配置conf/activemq.xml:

1
2
3
4
5
6
7
8
9
<broker schedulePeriodForDestinationPurge="10000">
    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000"/>
      </policyEntries>
    </policyMap>
    </destinationPolicy>
</broker>

说明:

  schedulePeriodForDestinationPurge:设置多长时间检查一次,这里是10秒,默认为0

  inactiveTimoutBeforeGC:设置当Destination为空后,多长时间被删除,这里是30秒,默认为60

  gcInactiveDestinations: 设置删除掉不活动队列,默认为false

五、Destination Options

队列选项是给consumer在JMS规范之外添加的功能特性,通过在队列名称后面使用类似URL的语法添加多个选项。包括:

 1:consumer.prefetchSize,consumer持有的未确认最大消息数量,默认值 variable

 2:consumer.maximumPendingMessageLimit:用来控制非持久化的topic在存在慢消费者的情况下,丢弃的数量,默认0

 3:consumer.noLocal :默认false

 4:consumer.dispatchAsync :是否异步分发 ,默认true

 5:consumer.retroactive:是否为回溯消费者 ,默认false

 6:consumer.selector:Jms的Selector,默认null

 7:consumer.exclusive:是否为独占消费者 ,默认false

 8:consumer.priority:设置消费者的优先级,默认0


使用示例:

1
2
queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false&consumer.prefetchSize=10");
consumer = session.createConsumer(queue);

六、虚拟Destinations(Visual Destinations)

6.1 简介

虚拟Destinations用来创建逻辑Destinations,客户端可以通过它来生产和消费消息,它会把消息映射到物理Destinations。

ActiveMQ支持两种方式:

 1:虚拟主题(Virtual Topics)

 2:组合Destinations(Composite Destinations)


6.2 为何使用虚拟主题

ActiveMQ中,topic只有在持久订阅下才是持久化的。持久订阅时,每个持久订阅者,都相当于一个queue的客户端,它会收取所有消息。这种情况下存在两个问题:

 1:同一应用内consumer端负载均衡的问题:也即是同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个consumer都会获取所有消息。

    queue模式可以解决这个问题,但broker端又不能将消息发送到多个应用端。所以,既要发布订阅,又要让消费者分组,这个功能JMS规范本身是没有的。

  2:同一应用内consumer端failover的问题:由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高


 为了解决这两个问题,ActiveMQ中实现了虚拟Topic的功能

6.3 如何使用虚拟主题

1:对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.Orders,代码示例如下:

1
Topic destination = session.createTopic("VirtualTopic.myTest");

2:对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。

  例如Consumer.A.VirtualTopic.Orders,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.Orders说明是一个名称为B的客户端。可以在同一个应用里使用

  多个consumer消费此queue,则可以实现上面两个功能。


  又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。每个客户端相当于一个持久订阅者,而且这个客户端可以使用多

  个消费者共同来承担消费任务。代码示例如下:

1
    Destination destination = session.createQueue("Consumer.A.VirtualTopic.myTest");

注意:将destinationInterceptors注释掉


ActiveMQ(14):Destination(目的地)高级特性


java代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void topic() throws Exception {
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)");
         
  Connection connection = connectionFactory.createConnection();
         
  Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
          
  Destination destination = session.createTopic("VirtualTopic.myTest");
         
  MessageProducer producer = session.createProducer(destination);
  producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 设置DeliveryMode.PERSISTENT模式
         
  connection.start();
}
 
public void queue() throws Exception {
    ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)");
    Connection connection = cf.createConnection();
    connection.start();
         
    final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("Consumer.A.VirtualTopic.myTest");
    MessageConsumer consumer = session.createConsumer(destination);
    int i=0;
    while(i<3) {
        i++;
    TextMessage message = (TextMessage) consumer.receive();
    session.commit();
    System.out.println("收到消 息:" + message.getText());
    }
    session.close();
    connection.close();
}

3:默认虚拟主题的前缀是 :VirtualTopic.>

 自定义消费虚拟地址默认格式:Consumer.*.VirtualTopic.>

 自定义消费虚拟地址可以改,比如下面的配置就把它修改了。

 xml配置示例如下:

1
2
3
4
5
6
7
8
9
<broker xmlns="http://activemq.apache.org/schema/core">
    <destinationInterceptors>
        <virtualDestinationInterceptor>
            <virtualDestinations>
                <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
            </virtualDestinations>
        </virtualDestinationInterceptor>
    </destinationInterceptors>
</broker>

七、镜像Queues(Mirrored Queues)

7.1 简介

ActiveMQ中每个queue中的消息只能被一个consumer消费。然而,有时候你可能希望能够监视生产者和消费者之间的消息流。你可以通过使用Virtual Destinations 来建立一

个virtual queue 来把消息转发到多个queues中。但是 为系统中每个queue都进行如此的配置可能会很麻烦。

7.2 使用

ActiveMQ支持Mirrored Queues。Broker会把发送到某个queue的所有消息转发到一个名称类似的topic,因此监控程序只需要订阅这个mirrored queue topic。为了启用

Mirrored Queues,首先要将BrokerService的useMirroredQueues属性设置成true,然后可以通过destinationInterceptors设置其它属性,如mirror topic的前缀,缺省

是“VirtualTopic.Mirror.”。

比如修改后缀的配置示例如下conf/activemq.xml:

1
2
3
4
5
6
7
8
9
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}"
        useMirroredQueues="true"
    >
    ...
<destinationInterceptors>
    <mirroredQueue copyMessage="true" postfix=".qmirror" prefix=""/>
</destinationInterceptors>
    ...
</broker>

发送列队的时候,就会自动在topic里也发布

  ActiveMQ(14):Destination(目的地)高级特性


八、Per Destination Policies

ActiveMQ支持多种不同的策略,来单独配置每一个Destination它的属性很多,可以参见官方文档:

http://activemq.apache.org/per-destination-policies.html 

本文转自我爱大金子博客51CTO博客,原文链接http://blog.51cto.com/1754966750/1920339如需转载请自行联系原作者


我爱大金子