且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

ActiveMQ私有云、公有云以及Docker环境高可用集群方案汇总

更新时间:2022-09-26 14:01:10

ActiveMQ软件概述

ActiveMQ提供消息队列服务。

ActiveMQ高可用原理

ActiveMQ高可用由三部分组成。

1.ActiveMQ的master-slave

两个运行的ActiveMQ instance如果同时使用一套持久化存储,那么这两个ActiveMQ instance就会构成master-slave关系。持久化数据放在一个单独的文件系统目录上或者放在一个共享的文件系统目录上,这个目录中会有一个lock锁文件。谁先启动ActiveMQ instance谁就会抢占这个锁,谁抢占了这个锁谁就是master,slave运行在standby状态,只有master服务停止或者中断后,slave就会立刻抢占这个锁,成为新的master,而另一个ActiveMQ instance启动后无法抢占这个锁,会以slave方式运行。

2.ActiveMQ的networkConnectors

ActiveMQ的networkConnectors可以实现多个mq之间进行路由,假设有两个mq,分别为brokerA和brokerB,当有一条消息发送到brokerA的队列queueName中,brokerA中队列queueName的消息就会路由到brokerB的队列queueName上,反之brokerB的消息也会路由到brokerA。    
networkConnectors可以配置成静态的(固定的)也可以配置成动态的,取决于ActiveMQ instance运行的网络环境。

3.ActiveMQ的failover客户端连接协议

与其说是协议不如说是策略。failover传输是指在客户端连接列表中配置多个连接配置信息(称之为URIs list),当list中的某一个连接信息不可用时,就会尝试下一个连接信息,直到找到可用的连接信息。

ActiveMQ集群环境

ActiveMQ可以运行在主机内,也可以运行在虚拟机内、更可以运行在Docker中。但无论运行在哪种平台中,都需要考虑集群容错问题。例如集群节点应该尽可能地位于不同的host上,避免host发生故障导致整个集群不可用的情况。    
根据公有云、私有云环境的不同和ActiveMQ实现集群方式的不同分为两种集群方案。

1.私有云环境下的ActiveMQ高可用集群方案

在私有云环境下,网络是受自己控制的,ActiveMQ集群可以运行在同一个局域网内,利用局域网内特有的一些局域网协议实现集群,这些协议包括多播、VRRP等。多播能使一个或多个多播源只把数据包发送给特定的多播组,而只有加入该多播组的主机才能接收到数据包。一些分布在各处的进程需要以组的方式协同工作,组中的进程通常要给其他所有的成员发送消息。即有这样的一种方法能够给一些明确定义的组发送消息,这些组的成员数量虽然很多,但是与整个网络规模相比却很小。给这样一个组发送消息称为多点点播送,简称多播。多播技术可以用于自动发现,因此它能实现集群中的自动发现节点功能。借助多播技术,可以使用动态发现实现集群节点的自动添加和移除。

2.公有云环境下的ActiveMQ高可用集群方案

在公有云环境中,往往不是受自己控制的,多台云主机也可能不在同一个局域网内,此时就不能利用多播技术实现集群应用。除了利用多播的自动发现实现集群自动发现节点外,同样可以使用静态配置的方式,告诉集群内的每一个节点它的的对等节点有谁。当集群内的某个节点发生故障时,静态配置的节点信息可能发生改变,导致集群内存在一个或多个不可用的节点地址,从而导致集群对于客户端而言不可用。或者当集群内的某个节点故障恢复后,不能动态的告知它的对等节点它已恢复,同样可能造成集群对客户端而言不可用或者无法提供相应的服务标准。这种情况下可以使用一种特殊的方式配置集群与客户端的连接,虽然客户端的集群节点配置是静态的,但是客户端可以通过某种方式智能迅速的判断集群中的节点是否可用,从而实现高可用。    
公有云环境+docker环境还需要考虑多主机环境下的容器间通信问题。

ActiveMQ集群环境配置之私有云环境配置

# ActiveMQ references    
http://activemq.apache.org/version-5-run-broker.html     
http://activemq.apache.org/topologies.html     
http://activemq.apache.org/version-5-topologies.html     
http://activemq.apache.org/clustering.html     
http://activemq.apache.org/replicated-message-store.html     
http://activemq.apache.org/shared-file-system-master-slave.html     
http://activemq.apache.org/failover-transport-reference.html     
http://activemq.apache.org/configuring-version-5-transports.html     
http://dgd2010.blog.51cto.com/1539422/1680244     
http://activemq.apache.org/version-5-performance-tuning.html     
http://activemq.apache.org/networks-of-brokers.html     
# SharedFile System Master Slave and Dynamic Discovery Clustering Design     
192.168.1.241 server1.51devops.com  activemq master,activemq cluster A     
192.168.1.242 server2.51devops.com  activemq slave     
192.168.1.243 server3.51devops.com  nfs server,activemq cluster B     
# add new disk to 192.168.1.243     
fdisk /dev/sdb     
n     
p     
1

w    
mkfs.xfs /dev/sdb1     
mkdir -p /data     
mount /dev/sdb1 /data     
mkdir /data/ActivemqSharedBrokerData     
# install nfs on 192.168.1.243     
yum -y install nfs-utils nfs-utils-lib     
chkconfig --levels 235 nfs on     
cat >/etc/exports<<eof     
/data  192.168.1.0/255.255.255.0(rw,no_root_squash,no_all_squash,sync)     
eof     
exportfs -a     
exportfs -r     
http://www.linuxquestions.org/questions/linux-networking-3/rpc-statd-not-running-872348/     
/etc/init.d/rpcbind start     
/etc/init.d/nfslock start     
/etc/init.d/nfs restart     
# end install nfs on 192.168.1.243     
# mount nfs on 192.168.1.241 192.168.1.242     
yum -y install nfs-utils nfs-utils-lib     
mkdir /data     
mount -t nfs -o rw 192.168.1.243:/data /data     
# 192.168.1.243:/data on /data type nfs (rw,vers=4,addr=192.168.1.243,clientaddr=192.168.1.241)     
# install java     
yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel     
# install activemq     
wget -c http://www.apache.org/dist/activemq/KEYS     
gpg --import KEYS     
wget -c https://www.apache.org/dist/activemq/5.13.1/apache-activemq-5.13.1-bin.tar.gz.asc     
wget -c http://apache.fayea.com/activemq/5.13.1/apache-activemq-5.13.1-bin.tar.gz     
gpg --verify apache-activemq-5.13.1-bin.tar.gz.asc     
tar zxf apache-activemq-5.13.1-bin.tar.gz     
mv apache-activemq-5.13.1 /usr/local/activemq     
ls /usr/local/activemq     
cd /usr/local/activemq     
# end install activemq     
# on 192.168.1.241    

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<broker xmlns="http://activemq.apache.org/schema/core"     
dataDirectory="${activemq.data}"     
brokerName="192.168.1.241" useJmx="true" advisorySupport="false"     
persistent="true" deleteAllMessagesOnStartup="false"     
useShutdownHook="false" schedulerSupport="true">     
<networkConnectors>     
    <networkConnector uri="multicast://default" />     
</networkConnectors>     
<transportConnectors>     
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default" />     
</transportConnectors>     
<persistenceAdapter>     
  <kahaDB directory="/data/ActivemqSharedBrokerData"/>     
</persistenceAdapter>

# end on 192.168.1.241     
# on 192.168.1.242     
vim conf/activemq.xml     

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<broker xmlns="http://activemq.apache.org/schema/core"     
dataDirectory="${activemq.data}"     
brokerName="192.168.1.242" useJmx="true" advisorySupport="false"     
persistent="true" deleteAllMessagesOnStartup="false"     
useShutdownHook="false" schedulerSupport="true">     
<networkConnectors>     
    <networkConnector uri="multicast://default" />     
</networkConnectors>     
<transportConnectors>     
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default" />     
</transportConnectors>     
<persistenceAdapter>     
  <kahaDB directory="/data/ActivemqSharedBrokerData"/>     
</persistenceAdapter>

# end on 192.168.1.242     
# on 192.168.1.241 and 192.168.1.242     
cd /usr/local/activemq     
bin/activemq start     
bin/activemq status     
bin/activemq stop     
true > data/activemq.log     
bin/activemq restart     
sleep 2     
tail -n30 data/activemq.log     
# end on 192.168.1.241 and 192.168.1.242     
# on 192.168.1.243     
cd /usr/local/activemq     
vim conf/activemq.xml    

1
2
3
4
5
6
7
8
9
10
11
<broker xmlns="http://activemq.apache.org/schema/core"     
dataDirectory="${activemq.data}"     
brokerName="192.168.1.243" useJmx="true" advisorySupport="false"     
persistent="true" deleteAllMessagesOnStartup="false"     
useShutdownHook="false" schedulerSupport="true">     
<networkConnectors>     
    <networkConnector uri="multicast://default" />     
</networkConnectors>     
<transportConnectors>     
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default" />     
</transportConnectors>

bin/activemq start     
bin/activemq status     
bin/activemq stop     
true > data/activemq.log     
bin/activemq restart     
sleep 2     
tail -n30 data/activemq.log     

ActiveMQ集群环境配置之共有云环境配置

公有云环境的配置与私有云环境的差异就在必须将动态配置换成静态配置。    

1
2
3
<networkConnectors>     
    <networkConnector uri="static:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/>   
</networkConnectors>

ActiveMQ集群环境配置之共有云+Docker环境配置

在Docker的加入后就有很大的不同了。首先Docker的配置文件(如/etc/hosts)中必须灵活的指定主机名称与IP地址的映射关系,这样使得ActiveMQ的配置文件更加灵活,不依赖于IP地址、具有更好的应用范围。其次要解决主机间容器通信的问题,每一个主机上运行的Docker容器之间必须能相互访问。    
如果想使用Docker,建议使用Linux 3.10版本以上的内核的操作系统,这样的Linux内核在CentOS7、Ubuntu14以上都支持。     
此处以CentOS7 1511为例,跨主机网络互通靠docker native plugin中的overlay实现。此方案中会用到一个kv存储,这个kv存储可以使用Consul、etcd等实现,此处用Consul实现。     
yum -y update     
history -c && shutdown -r now     
uname -a     
# Linux localhost.localdomain 3.10.0-327.10.1.el7.x86_64 #1 SMP Tue Feb 16 17:03:50 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux     
# set FQDN hostname use edit file or hostname command or nmtui command     
vim /etc/hostname     
setenforce 0     
# for service docker     
# Refer: https://www.docker.com/     
# Refer: https://docs.docker.com/linux/step_one/     
which curl 2>/dev/null || yum -y -q install curl     
curl -fsSL https://get.docker.com/gpg | gpg --import     
curl -fsSL https://get.docker.com/ | sh     
# for program docker-enter     
# Refer: http://dockerpool.com/static/books/docker_practice/container/enter.html     
which curl > /dev/null || apt-get -qq install -y curl     
# # cd /tmp; curl https://www.kernel.org/pub/linux/utils/util-linux/v2.24/util-linux-2.24.tar.gz | tar -zxf-; cd util-linux-2.24;     
# cd /tmp; wget -q https://www.kernel.org/pub/linux/utils/util-linux/v2.24/util-linux-2.24.tar.gz; tar xzvf util-linux-2.24.tar.gz     
# cd util-linux-2.24     
# ./configure --without-ncurses && make nsenter     
# cp nsenter '/usr/local/bin'     
which nsenter     
cd     
which wget 2>/dev/null || yum -y -q install wget     
wget -P ~ https://github.com/yeasy/docker_practice/raw/master/_local/.bashrc_docker;     
echo "[ -f ~/.bashrc_docker ] && . ~/.bashrc_docker" >> ~/.bashrc; source ~/.bashrc     
service docker start     
docker version     
rpm -ql docker-engine     
# if there are only two docker hosts commmunicated each other, then nameserver set to each other     
# if there are more than 3 docker hosts, then first node's nameserver set to last one, second set to first one, third set to second one     
vim /etc/resolv.conf     
vim /usr/lib/systemd/system/docker.service     
-H tcp://0.0.0.0:2376 -H unix:///var/run/docker.sock --cluster-store=consul://consul.service.dc1.consul.:8500 --cluster-advertise=eno16777728:2376     
systemctl daemon-reload     
systemctl restart docker     
systemctl status docker -l     
docker network create -d overlay interconnection     
docker network ls     
# after this, start a docker container with "--net interconnection", then containers running on different hosts can communited each other.     
mkdir -p /data/docker/activemq/data     
mkdir -p /data/docker/activemq/data/kahadb     
mkdir -p /data/docker/activemq/log-master     
mkdir -p /data/docker/activemq/log-slave     
mkdir -p /data/docker/activemq/conf

vim /data/docker/activemq/conf/activemq.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
      <value>file:${activemq.conf}/credentials.properties</value>
    </property>
  </bean>
  <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery" lazy-init="false" scope="singleton" init-method="start" destroy-method="stop"></bean>
  <broker xmlns="http://activemq.apache.org/schema/core" dataDirectory="${activemq.data}" brokerName="localhost" useJmx="true" advisorySupport="false" persistent="true" deleteAllMessagesOnStartup="false" useShutdownHook="false" >
    <networkConnectors>
      <networkConnector uri="static:(tcp://server1-activemq-01-master:61616,tcp://server2-activemq-02-master:61616)"/> 
    </networkConnectors>
    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic="&gt;">
            <pendingMessageLimitStrategy>
              <constantPendingMessageLimitStrategy limit="1000" />
            </pendingMessageLimitStrategy>
          </policyEntry>
        </policyEntries>
      </policyMap>
    </destinationPolicy>
    <managementContext>
      <managementContext createConnector="false" />
    </managementContext>
    <persistenceAdapter>
      <kahaDB directory="/data/activemq/kahadb" />
    </persistenceAdapter>
    <systemUsage>
      <systemUsage>
        <memoryUsage>
          <memoryUsage percentOfJvmHeap="70" />
        </memoryUsage>
        <storeUsage>
          <storeUsage limit="100 gb" />
        </storeUsage>
        <tempUsage>
          <tempUsage limit="50 gb" />
        </tempUsage>
      </systemUsage>
    </systemUsage>
    <transportConnectors>
      <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
      <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
      <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
      <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
      <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" />
    </transportConnectors>
    <shutdownHooks>
      <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
    </shutdownHooks>
  </broker>
  <import resource="jetty.xml" />
</beans>

docker run --restart="always" --name='server1-activemq-01-master' --net interconnection -d --hostname=server1-activemq-01-master \    
-e 'ACTIVEMQ_NAME=amqp-srv1-master' \     
-e 'ACTIVEMQ_REMOVE_DEFAULT_ACCOUNT=true' \     
-e 'ACTIVEMQ_ADMIN_LOGIN=admin' -e 'ACTIVEMQ_ADMIN_PASSWORD=your_password' \     
-e 'ACTIVEMQ_WRITE_LOGIN=producer_login' -e 'ACTIVEMQ_WRITE_PASSWORD=producer_password' \     
-e 'ACTIVEMQ_READ_LOGIN=consumer_login' -e 'ACTIVEMQ_READ_PASSWORD=consumer_password' \     
-e 'ACTIVEMQ_JMX_LOGIN=jmx_login' -e 'ACTIVEMQ_JMX_PASSWORD=jmx_password' \     
-e 'ACTIVEMQ_STATIC_TOPICS=topic1;topic2;topic3' \     
-e 'ACTIVEMQ_STATIC_QUEUES=queue1;queue2;queue3' \     
-e 'ACTIVEMQ_MIN_MEMORY=1024' -e  'ACTIVEMQ_MAX_MEMORY=4096' \     
-e 'ACTIVEMQ_ENABLED_SCHEDULER=true' \     
-v /data/docker/activemq/data:/data/activemq \     
-v /data/docker/activemq/data/kahadb:/data/activemq/kahadb \     
-v /data/docker/activemq/log-master:/var/log/activemq \     
-v /data/docker/activemq/conf/activemq.xml:/opt/activemq/conf/activemq.xml \     
-p 8161:8161 \     
-p 61616:61616 \     
-p 61613:61613 \     
webcenter/activemq

docker run --restart="always" --name='server1-activemq-01-slave' --net interconnection -d --hostname=server1-activemq-01-slave \    
-e 'ACTIVEMQ_NAME=amqp-srv1-slave' \     
-e 'ACTIVEMQ_REMOVE_DEFAULT_ACCOUNT=true' \     
-e 'ACTIVEMQ_ADMIN_LOGIN=admin' -e 'ACTIVEMQ_ADMIN_PASSWORD=your_password' \     
-e 'ACTIVEMQ_WRITE_LOGIN=producer_login' -e 'ACTIVEMQ_WRITE_PASSWORD=producer_password' \     
-e 'ACTIVEMQ_READ_LOGIN=consumer_login' -e 'ACTIVEMQ_READ_PASSWORD=consumer_password' \     
-e 'ACTIVEMQ_JMX_LOGIN=jmx_login' -e 'ACTIVEMQ_JMX_PASSWORD=jmx_password' \     
-e 'ACTIVEMQ_STATIC_TOPICS=topic1;topic2;topic3' \     
-e 'ACTIVEMQ_STATIC_QUEUES=queue1;queue2;queue3' \     
-e 'ACTIVEMQ_MIN_MEMORY=1024' -e  'ACTIVEMQ_MAX_MEMORY=4096' \     
-e 'ACTIVEMQ_ENABLED_SCHEDULER=true' \     
-v /data/docker/activemq/data:/data/activemq \     
-v /data/docker/activemq/data/kahadb:/data/activemq/kahadb \     
-v /data/docker/activemq/log-slave:/var/log/activemq \     
-v /data/docker/activemq/conf/activemq.xml:/opt/activemq/conf/activemq.xml \     
-p 8171:8161 \     
-p 61626:61616 \     
-p 61623:61613 \     
webcenter/activemq

rm -rf /data/docker/activemq    
docker stop server1-activemq-01-master && docker rm server1-activemq-01-master     
docker stop server1-activemq-01-slave && docker rm server1-activemq-01-slave     
    
mkdir -p /data/docker/activemq/data     
mkdir -p /data/docker/activemq/data/kahadb     
mkdir -p /data/docker/activemq/log-master     
mkdir -p /data/docker/activemq/log-slave     
mkdir -p /data/docker/activemq/conf

docker run --restart="always" --name='server2-activemq-02-master' -d --hostname=server2-activemq-02-master \    
-e 'ACTIVEMQ_NAME=amqp-srv2-master' \     
-e 'ACTIVEMQ_REMOVE_DEFAULT_ACCOUNT=true' \     
-e 'ACTIVEMQ_ADMIN_LOGIN=admin' -e 'ACTIVEMQ_ADMIN_PASSWORD=your_password' \     
-e 'ACTIVEMQ_WRITE_LOGIN=producer_login' -e 'ACTIVEMQ_WRITE_PASSWORD=producer_password' \     
-e 'ACTIVEMQ_READ_LOGIN=consumer_login' -e 'ACTIVEMQ_READ_PASSWORD=consumer_password' \     
-e 'ACTIVEMQ_JMX_LOGIN=jmx_login' -e 'ACTIVEMQ_JMX_PASSWORD=jmx_password' \     
-e 'ACTIVEMQ_STATIC_TOPICS=topic1;topic2;topic3' \     
-e 'ACTIVEMQ_STATIC_QUEUES=queue1;queue2;queue3' \     
-e 'ACTIVEMQ_MIN_MEMORY=1024' -e  'ACTIVEMQ_MAX_MEMORY=4096' \     
-e 'ACTIVEMQ_ENABLED_SCHEDULER=true' \     
-v /data/docker/activemq/data:/data/activemq \     
-v /data/docker/activemq/data/kahadb:/data/activemq/kahadb \     
-v /data/docker/activemq/log-master:/var/log/activemq \     
-v /data/docker/activemq/conf/activemq.xml:/opt/activemq/conf/activemq.xml \     
-p 8161:8161 \     
-p 61616:61616 \     
-p 61613:61613 \     
webcenter/activemq

docker run --restart="always" --name='server2-activemq-02-slave' -d --hostname=server2-activemq-02-slave \    
-e 'ACTIVEMQ_NAME=amqp-srv2-slave' \     
-e 'ACTIVEMQ_REMOVE_DEFAULT_ACCOUNT=true' \     
-e 'ACTIVEMQ_ADMIN_LOGIN=admin' -e 'ACTIVEMQ_ADMIN_PASSWORD=your_password' \     
-e 'ACTIVEMQ_WRITE_LOGIN=producer_login' -e 'ACTIVEMQ_WRITE_PASSWORD=producer_password' \     
-e 'ACTIVEMQ_READ_LOGIN=consumer_login' -e 'ACTIVEMQ_READ_PASSWORD=consumer_password' \     
-e 'ACTIVEMQ_JMX_LOGIN=jmx_login' -e 'ACTIVEMQ_JMX_PASSWORD=jmx_password' \     
-e 'ACTIVEMQ_STATIC_TOPICS=topic1;topic2;topic3' \     
-e 'ACTIVEMQ_STATIC_QUEUES=queue1;queue2;queue3' \     
-e 'ACTIVEMQ_MIN_MEMORY=1024' -e  'ACTIVEMQ_MAX_MEMORY=4096' \     
-e 'ACTIVEMQ_ENABLED_SCHEDULER=true' \     
-v /data/docker/activemq/data:/data/activemq \     
-v /data/docker/activemq/data/kahadb:/data/activemq/kahadb \     
-v /data/docker/activemq/log-slave:/var/log/activemq \     
-v /data/docker/activemq/conf/activemq.xml:/opt/activemq/conf/activemq.xml \     
-p 8171:8161 \     
-p 61626:61616 \     
-p 61623:61613 \     
webcenter/activemq

rm -rf /data/docker/activemq    
docker stop server2-activemq-02-master && docker rm server2-activemq-02-master     
docker stop server2-activemq-02-slave && docker rm server2-activemq-02-slave     

cat /data/docker/activemq/log-master/activemq.log    
cat /data/docker/activemq/log-slave/activemq.log


客户端连接ActiveMQ集群

# About: "Caught: javax.jms.JMSSecurityException: User name [xxx] or password is invalid. javax.jms.JMSSecurityException: User name [xxx] or password is invalid."    
# Refer to https://github.com/disaster37/activemq/issues/15     
# At last, I find that only set "-e 'ACTIVEMQ_ADMIN_LOGIN=yourName' -e 'ACTIVEMQ_ADMIN_PASSWORD=yourPassword' \" like this self can login success, then I got a success!


Client connection example:    

1
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","your_password","failover:(tcp://192.168.1.241:61616,tcp://192.168.1.242:61616)?randomize=false");

完整的例子如下:

activemq hello world writen with java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import org.apache.activemq.ActiveMQConnectionFactory;
 
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
 
/**
 * Hello world!
 */
public class activemq5Failover {
 
    public static void main(String[] args) throws Exception {
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        Thread.sleep(1000);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        Thread.sleep(1000);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldProducer(), false);
        Thread.sleep(1000);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
    }
 
    public static void thread(Runnable runnable, boolean daemon) {
        Thread brokerThread = new Thread(runnable);
        brokerThread.setDaemon(daemon);
        brokerThread.start();
    }
 
    public static class HelloWorldProducer implements Runnable {
        public void run() {
            try {
                // Create a ConnectionFactory
//                Refer: http://activemq.apache.org/maven/apidocs/org/apache/activemq/ActiveMQConnectionFactory.html
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","your_password","failover:(tcp://192.168.1.241:61616,tcp://192.168.1.242:61616)?randomize=false");
                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();
 
                // Create a Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("TEST.FOO");
 
                // Create a MessageProducer from the Session to the Topic or Queue
                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
                // Create a messages
                String text = "Hello world! From: " + Thread.currentThread().getName() + " : " this.hashCode();
                TextMessage message = session.createTextMessage(text);
 
                // Tell the producer to send the message
                System.out.println("Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName());
                producer.send(message);
 
                // Clean up
                session.close();
                connection.close();
            catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
    }
 
    public static class HelloWorldConsumer implements Runnable, ExceptionListener {
        public void run() {
            try {
 
                // Create a ConnectionFactory
 
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","your_password","failover:(tcp://192.168.1.241:61616,tcp://192.168.1.242:61616)?randomize=false");
 
                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();
 
                connection.setExceptionListener(this);
 
                // Create a Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("TEST.FOO");
 
                // Create a MessageConsumer from the Session to the Topic or Queue
                MessageConsumer consumer = session.createConsumer(destination);
 
                // Wait for a message
                Message message = consumer.receive(1000);
 
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    System.out.println("Received: " + text);
                else {
                    System.out.println("Received: " + message);
                }
 
                consumer.close();
                session.close();
                connection.close();
            catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
 
        public synchronized void onException(JMSException ex) {
            System.out.println("JMS Exception occured.  Shutting down client.");
        }
    }
}

tag:activemq clustering,ActiveMQ 集群,ActiveMQ 负载均衡,ActiveMQ 主备,ActiveMQ 高可用    
--end--



本文转自 urey_pp 51CTO博客,原文链接:http://blog.51cto.com/dgd2010/1749983,如需转载请自行联系原作者