且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

ActiveMQ(02):JMS基本概念和模型

更新时间:2022-09-29 23:35:33

一、基本概念

1.1 JMS是什么

JMS Java Message Service,Java消息服务,是JavaEE中的一个技术。

1.2 JMS规范

JMS定义了Java中访问消息中间件的接口,并没有给予实现,实现JMS接口的消息中间件称为JMS Provider,例如ActiveMQ

1.3 JMS provider

实现JMS接口和规范的消息中间件

1.4 JMS message

JMS的消息,JMS消息由一下三部组成:

 1:消息头:每个消息头字段都有相应的getter和setter方法

 2:消息属性:如果需要除消息头字段以外的值,那么可以使用消息属性

 3:消息体:封装具体的消息数据

1.5 JMS producer

消息生产者,创建和发送JMS消息的客户端应用

1.6 JMS consumer

消息消费者,接收和处理JMS消息的客户端应用

消息的消费可以采用以下两种方法之一:

 1:同步消费:通过调用消费者的receive方法从目的地中显式提取消息,receive 方法可以一直阻塞到消息到达。

 2:异步消费:客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作

1.7 JMS domains

消息传递域,JMS规范中定义了两种消息传递域:

 1:点对点 (point-to-point,简写成PTP)消息传递域:

 2:发布/订阅消息传递域(publish/subscribe,简写:成pub/sub)


点对点消息传递域的特点如下:

 (1)每个消息只能有一个消费者

 (2)消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息。

  ActiveMQ(02):JMS基本概念和模型

发布/订阅消息传递域的特点如下:

 (1)每个消息可以有多个消费者

 (2)生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS 规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。

 ActiveMQ(02):JMS基本概念和模型

注意:在点对点消息传递域中,目的地被称为队列(queue);在发布/订阅消息传递域中,目的地被称为主题(topic)

1.8 接口简介

1.8.1 Connection factory

连接工厂,用来创建连接对象,以连接到JMS的provider

1.8.2 JMS Connection

封装了客户与JMS 提供者之间的一个虚拟的连接

1.8.3 JMS session

是生产和消费消息的一个单线程上下文会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。

1.8.4 Destination

消息发送到的目的地

1.8.5 Acknowledge

签收

1.8.6 Transaction

事务

1.8.7 JMS client

用来收发消息的Java应用

1.8.8 Non-JMS client

使用JMS provider本地API写的应用,用来替换JMS API实现收发消息的功能,通常会提供其他的一些特性,比如:CORBA、RMI等。

1.8.9 Administered objects

预定义的JMS对象,通常在provider规范中有定义,提供给JMS客户端来访问,比如: ConnectionFactory和Destination

二、JMS的消息结构

JMS消息的组成:消息头,属性和消息体

2.1 消息头

消息头包含消息的识别信息和路由信息,消息头包含一些标准的属性如下:

 1:JMSDestination:消息发送的目的地,由send方法设置

   消息发送的目的地,主要是指Queue和Topic,自动分配

 2:JMSDeliveryMode:传送模式,由send方法设置

   传送模式,有两种 :持久模式和非持久模式。

     一条持久性的消息应该被传送“一次仅仅一次”,这就意味者如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。

     一条非持久的消息最多会传送一次,这意味这服务器出现故障,该消息将永远丢失。自动分配

 3:JMSExpiration:消息过期时间,由send方法设置

   消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。

     如果timeToLive值等于零,则JMSExpiration 被设为零,表示该消息永不过期。

     如果timeToLive值不等于零,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。

 4:JMSPriority:消息优先级,由send方法设置

   从 0-9 十个级别,0-4 是普通消息,5-9 是加急消息。

   JMS不要求JMSProvider严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达。默认是4级。

 5:JMSMessageID:由send方法设置

   唯一识别每个消息的标识,由JMS Provider 产生。

 6:JMSTimestamp:由客户端设置

   一个JMS Provider在调用send()方法时自动设置的。它是消息被发送和消费者实际接收的时间差。

 7:JMSCorrelationID :由客户端设置

   用来连接到另外一个消息,典型的应用是在回复消息中连接到原消息。在大多数情况下,JMSCorrelationID用于将一条消息标记为对JMSMessageID标示的上一条消息的应答,不过,JMSCorrelationID可以是任何值,不仅仅是JMSMessageID。由开发者设置

 8:JMSReplyTo :由客户端设置

   提供本消息回复消息的目的地址。由开发者设置

 9:JMSType :由客户端设置

   消息类型的识别符。由开发者设置

 10:JMSRedelivered:由JMS Provider设置

   如果一个客户端收到一个设置了JMSRedelivered属性的消息,则表示可能客户端曾经在早些时候收到过该消息,但并没有签收(acknowledged)。如果该消息被重新传送,JMSRedelivered=true反之,JMSRedelivered =false。

2.2 消息属性

消息属性,包含以下三种类型的属性:

 1:应用程序设置和添加的属性,比如:

   Message.setStringProperty(“username”,username);

 2:JMS定义的属性

   使用“JMSX”作为属性名的前缀,connection.getMetaData().getJMSXPropertyNames(),方法返回所有连接支持的JMSX 属性的名字。

 3:JMS供应商特定的属性


JMS定义的属性如下:

 1:JMSXUserID:发送消息的用户标识,发送时提供商设置

 2:JMSXAppID:发送消息的应用标识,发送时提供商设置

 3:JMSXDeliveryCount:转发消息重试次数,第一次是1,第二次是2,… ,发送时提供商设置

 4:JMSXGroupID:消息所在消息组的标识,由客户端设置

 5:JMSXGroupSeq:组内消息的序号第一个消息是1,第二个是2,…,由客户端设置

 6:JMSXProducerTXID :产生消息的事务的事务标识,发送时提供商设置

 7:JMSXConsumerTXID :消费消息的事务的事务标识,接收时提供商设置

 8:JMSXRcvTimestamp :JMS 转发消息到消费者的时间,接收时提供商设置

 9:JMSXState:假定存在一个消息仓库,它存储了每个消息的单独拷贝,且这些消息从原始消息被发送时开始。每个拷贝的状态有:1(等待),2(准备),3(到期)或4(保留)。

   由于状态与生产者和消费者无关,所以它不是由它们来提供。它只和在仓库中查找消息相关,因此JMS没有提供这种API。由提供商设置

2.3 消息体

消息体,JMS API定义了5种消息体格式,也叫消息类型,可以使用不同形式发送接收数据,并可以兼容现有的消息格式。

包括:TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage


三、示例

3.1 发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void test2() throws Exception {
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","tcp://192.168.91.8:61616");
    Connection connection = connectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("my-queue");
    MessageProducer producer = session.createProducer(destination);
    for (int i = 0; i < 3; i++) {
        MapMessage message = session.createMapMessage();
    message.setIntProperty("orderId" + i, 1);
    message.setString("nro" + i, "X2015478464");
    producer.send(message);
    }
    session.commit();
    session.close();
    connection.close();
}

3.2 接收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void test2() throws Exception {
    ConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","tcp://192.168.91.8:61616");
    Connection connection = cf.createConnection();
    connection.start();
             
    Enumeration names = connection.getMetaData().getJMSXPropertyNames();
    while (names.hasMoreElements()) {
        String name = (String) names.nextElement();
    System.out.println("JMSX name = " + name);
    }
             
    final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("my-queue");
    MessageConsumer consumer = session.createConsumer(destination);
    int i=0;
    while(i<3) {
        MapMessage message = (MapMessage) consumer.receive();
    session.commit();
    System.out.println("收到消息:" + message.getString("nro" + i) + ", poperty : " + message.getStringProperty("orderId" + i));
    i++;
    }
    session.close();
    connection.close();
}

3.3 效果

ActiveMQ(02):JMS基本概念和模型

本文转自我爱大金子博客51CTO博客,原文链接http://blog.51cto.com/1754966750/1914850如需转载请自行联系原作者


我爱大金子