Mule
ESB是一个基于Java的轻量级企业服务总线和集成平台,允许开发人员快速便利地连接多个应用,并支持应用间的数据交换。Mule
ESB支持集成现有系统而无论其底层采用何种技术,如JMS、Web
Services、JDBC、HTTP以及其他技术。
2. 整体结构
图 整体结构
从上图可见,Mule通过Transports/Connectors与外围的异构系统连接,提供Routing(路由)、Transaction
Management(事务管理)、Transformation(转换)、Message
Broker(消息代理)、Transportation Management(传输管理)、Security(安全)等核心模块。Mule可以单独使用,也可以架设在常用的应用服务器上。
图 架构简图
外围系统的服务请求通过Mule ESB的Transport接入,Mule通过Transformer进行数据的格式转换,然后经过Inbound
Router进行消息过滤(内部通过配置filter实现)后交给Mule的Component进行业务逻辑处理,处理后的结果通过Outbound Router确定传递给哪个接收方,然后通过Transformer进行数据格式转换,通过Transport连接至接收方,传递信息。
此图描述的是Mule中的一个典型场景的处理过程,涵盖了Mule中的各个关键组件。其中某些处理步骤不是必须的,如Inbound
Router、Transformer。后续可以看到一些其他场景的处理。
3. 功能
a. 服务中介
-
将业务逻辑和消息发送分离
- 屏蔽服务的消息格式和协议
- 提供任意位置的服务调用
- 提供协议桥接
b. 数据转换
- 在应用间交换不同格式的信息
- 操作消息的负载内容,包括加密、压缩和编码转换
- 在异构的传输协议的数据类型间格式化消息
c. 消息路由
- 基于消息内容和复杂规则路由消息
- 消息的过滤、聚合以及重新排列序号
d. 服务创建和托管
- 暴露端点、EJB、Spring
Bean以及POJO作为服务
- 作为轻量级的服务容器进行服务托管
Mule ESB中有一些基本的概念,理解这些基本概念后才能理解Mule的内部机制。从中也可以看到Mule解决问题的基本思路。
4. 基本概念
4.1 Model
Model表示托管各个服务的运行时环境。
图 Model
4.2 Service
Service是用来处理服务请求的基本单位,它调用各个组件进行服务请求的处理。
图 Service
4.3 Transport
Transport管理消息的接收和发送,数据转换的过程也是在Transport中通过调用Transformer完成的。
图 Transport
4.3.1 Connector
Connector用于管控特定协议的使用,如HTTP
Connector、JMS Connector等。
4.3.2 End-Point
Endpoint用于表示一种协议的特定使用方式,如listening/polling、从中读取、向指定地址写入等,定义了发送和接收消息的通道。Endpoint控制的是底层的实体在Connector中如何被使用。
Endpoint定义于Inbound和Outbound
Router中。
4.4 Transformer
Transformer用于转换消息的内容。
图 Transformer
4.5 Router
Router使用Filter基于消息中的属性信息进行消息的分发。
图 Router
Router在Service中的位置决定了Router的性质(inbound、outbound和response)和担任的角色(pass-through、aggregator等)。
4.6 Component
Component是Service的核心部件,是Service的业务逻辑的实现。
图 Component: implicit bridge component
Component可以是Java Class(POJO、Spring Bean)、Web Service、Script等。
Component可定义自己的生命周期:initialise、start、stop、dispose,不过需要实现Mule的LifeCycle接口。Mule
3.0版本开始提供@PostConstruct和@PreDestroy的注解,对应生命周期的initialise和dispose阶段,不需要实现Mule的LifeCycle接口了。
4.7 Flow(@since 3.0)
Flow是Mule
3.0新引入的,包含一个消息源(Message Source)和多个消息处理器组成的处理器链。
图 Flow
根据实际需求着重检查了一下Mule ESB的消息传递方式。Mule支持常用的几种消息传递方式,能够满足要求。
5. 消息传递方式
5.1 异步方式
异步方式是一种单向调用,调用者不需要获得响应。
图 Asynchronous
异步方式通过inbound和outbound
endpoint的exchange-pattern=”one-way”实现。
使用基本的Stdio Transport验证,通过标准输入传输字符串,将其原样传递给标准输出进行显示。相应配置如下:
xml 代码
- <service
name="echo">
-
<inbound>
-
<stdio:inbound-endpoint
system="IN"
exchange-pattern="one-way"
/>
-
</inbound>
-
-
<component>
-
<singleton-object
class="demo.mule.umo.StdIo"
/>
-
</component>
-
-
<outbound>
-
<pass-through-router>
-
<stdio:outbound-endpoint
system="OUT"
exchange-pattern="one-way"
/>
-
</pass-through-router>
-
</outbound>
- </service>
运行服务,控制台显示结果如下:
- Please enter: Hello, world!
- INFO 2010-12-07 19:21:18,877 [ConsoleConnector.dispatcher.1]
- org.mule.lifecycle.AbstractLifecycleManager: Initialising:
- 'ConsoleConnector.dispatcher.23255376'. Object is: StdioMessageDispatcher
- INFO 2010-12-07 19:21:18,877 [ConsoleConnector.dispatcher.1]
- org.mule.lifecycle.AbstractLifecycleManager: Starting:
- 'ConsoleConnector.dispatcher.23255376'. Object is: StdioMessageDispatcher
- Hello, world!
其中INFO输出是Mule第一次初始化相应Connector打印出来的,之后调用服务不会再次显示。
异步方式适用于简单的消息传递的场景。
5.2
请求-响应方式
请求-响应方式即请求方调用服务后,服务立即处理并返回响应结果,不需将消息再次传递。
图 Request-Response
请求-响应方式通过input
endpoint的exchange-pattern=”request-response”实现,相应配置如下:
xml
代码
- <strong>
-
<strong>
-
<model name="services">
-
<service name="echoService">
-
<inbound>
-
<inbound-endpoint
address="http://localhost:7007/services/Echo"
-
exchange-pattern="request-response">
-
<cxf:jaxws-service
/>
-
</inbound-endpoint>
-
</inbound>
-
<component>
-
<singleton-object
class="demo.mule.umo.Echo"
/>
-
</component>
-
</service>
-
</model>
-
</strong>
- </strong>
上边是通过service配置的,通过flow配置如下:
xml 代码
- <flow
name="EchoFlow">
- <inbound-endpoint
address="http://localhost:7007/services/Echo"
- exchange-pattern="request-response"
/>
- <cxf:jaxws-service
serviceClass="demo.mule.umo.Echo"
/>
- <component>
- <singleton-object
class="demo.mule.umo.Echo"
/>
- </component>
- </flow>
在浏览器中输入“http://localhost:7007/services/Echo/echo/text/hello,world”,浏览器中会显示“hello,world”的输出信息。
请求-响应方式适用于单次服务调用的场景。
5.3 同步方式
同步方式即请求方调用服务后,component将处理结果发送给另一个外部服务处理,并将处理结果反方向返回。
图 Synchronous
同步方式通过inbound和outbound
endpoint的exchange-pattern=”request-response”实现,相应配置如下:
xml 代码
- <flow
name="echo">
-
<inbound-endpoint
address="http://localhost:7007/services/Echo"
-
exchange-pattern="request-response"
/>
-
<cxf:jaxws-service
serviceClass="demo.mule.umo.Echo"
/>
-
<component>
-
<singleton-object
class="demo.mule.umo.StdIo"
/>
-
</component>
-
<vm:outbound-endpoint
path="vm"
exchange-pattern="request-response"
/>
- </flow>
- <flow
name="vm">
-
<vm:inbound-endpoint
path="vm"
exchange-pattern="request-response"
/>
-
<component>
-
<singleton-object
class="demo.mule.umo.Vm"
/>
-
</component>
-
<stdio:outbound-endpoint
system="OUT"
exchange-pattern="one-way"
/>
- </flow>
同步方式适用于通过Mule调用远程服务的场景。
5.4 异步请求-响应方式
异步请求-响应方式即请求方调用服务后不需要立即获得返回结果,component将请求发送给其他外围系统处理(可能有多个),全部处理完毕后通过指定的异步应答Router返回给请求方。
图
Asynchronous Request-Response
异步请求-响应方式通过在OutBound
Endpoint中增加reply-to以及增加async-reply节点实现,响应配置如下:
xml 代码
- <flow
name="echo">
-
<inbound-endpoint
address="http://localhost:7007/services/Echo"
-
exchange-pattern="request-response"
/>
-
<cxf:jaxws-service
serviceClass="demo.mule.umo.Echo"
/>
-
<component>
-
<singleton-object
class="demo.mule.umo.StdIo"
/>
-
</component>
-
<vm:outbound-endpoint
path="vm"
exchange-pattern="request-response"
/>
- </flow>
- <flow
name="vm">
-
<vm:inbound-endpoint
path="vm"
exchange-pattern="request-response"
/>
-
<component>
-
<singleton-object
class="demo.mule.umo.Vm"
/>
-
</component>
-
<stdio:outbound-endpoint
system="OUT"
exchange-pattern="one-way"
/>
- </flow>
异步请求-响应方式适用于请求需要被多个远程服务并行处理,结果需要汇总处理后返回的场景。
注:上述代码未运行通过,queue1和queue2获得了请求消息并正常处理,但返回至async-reply时抛出异常,暂未定位到问题。
后将collection-async-reply-router改为single-async-reply-router未报异常,代码示例如下:
xml 代码
- <em><service
name="async req-rep">
- <inbound>
- <stdio:inbound-endpoint
ref="stdioInEndpoint"
/>
- </inbound>
- <component
class="demo.mule.umo.Echo"
/>
- <outbound>
- <multicasting-router>
- <vm:outbound-endpoint
path="async.queue1"
exchange-pattern="one-way"
/>
- <vm:outbound-endpoint
path="async.queue2"
exchange-pattern="one-way"
/>
- <reply-to
address="vm://reply"
/>
- </multicasting-router>
- </outbound>
- <async-reply
timeout="5000"
failOnTimeout="true">
- <vm:inbound-endpoint
path="reply"
exchange-pattern="one-way"
/>
- <single-async-reply-router
/>
- </async-reply>
- </service></em>
等有空看看collection-async-reply-router吧,或者自定义router。