且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

Oozie 使用ActiveMQ实现 JMS通知

更新时间:2022-08-30 15:45:58

一,介绍

提交给Oozie的作业,作业在运行过程中的状态会发生变化如:执行成功了,或者失败了……Oozie能够监控这些作业状态的改变并且将这些消息发送到JMS消息服务器。这里,使用ActiveMQ作为JMS消息服务器。

Oozie supports publishing notifications to a JMS Provider for job status changes and SLA met and miss events.

For Oozie to send/receive messages, a JMS-compliant broker should be installed. Apache ActiveMQ is a popular JMS-compliant broker usable for this purpose.

 

二,配置Oozie以允许支持消息服务

需要修改oozie-site.xml并添加若干配置进去,具体可参考官网Notifications Configuration

按照官网给出的步骤进行配置即可。

由于我用的是Cloudera Hadoop,故可以在其管理界面直接进行配置。如下图:

Oozie 使用ActiveMQ实现 JMS通知

 

Oozie 使用ActiveMQ实现 JMS通知

修改Oozie配置之后,需要重启生效。

 

三,安装ActiveMQ接收消息

参考ActiveMQ官方网站,安装及配置。

启动ActiveMQ,过一段时间后,若有作业提交给Oozie,Oozie执行后会给ActiveMQ发消息。还未弄清楚到底在什么情况下,Oozie会向ActiveMQ发送消息???

如下图:ActiveMQ收到的消息:

Oozie 使用ActiveMQ实现 JMS通知

topicName 即为userName,可从Oozie的配置文件oozie-default.xml中看出:

Oozie 使用ActiveMQ实现 JMS通知

 

4,编写ActiveMQ Client程序从Borker中获取消息

实现javax.jms.MessageListener接口,建立连接代码如下:

Oozie 使用ActiveMQ实现 JMS通知
        OozieClient oc = new OozieClient("http://192.168.121.35:11000/oozie");
        JMSConnectionInfo jmsInfo = oc.getJMSConnectionInfo();

        Properties jndiProperties = jmsInfo.getJNDIProperties();
        Context jndiContext = new InitialContext(jndiProperties);

        String connectionFactoryName = (String) jndiContext.getEnvironment()
                .get("connectionFactoryNames");
        ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext
                .lookup(connectionFactoryName);
        Connection connection = connectionFactory.createConnection();

        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
Oozie 使用ActiveMQ实现 JMS通知

 

当有新消息到达时,自动调用MessageListener的onMessage()方法获取到消息。

Oozie 使用ActiveMQ实现 JMS通知
    @Override
    public void onMessage(Message message) {
        try {
            if (message.getStringProperty(JMSHeaderConstants.MESSAGE_TYPE)
                    .equals(org.apache.oozie.client.event.Event.MessageType.SLA
                            .name())) {
                SLAMessage slaMessage = JMSMessagingUtils
                        .getEventMessage(message);
                String id = slaMessage.getId();
                String noti = slaMessage.getNotificationMessage();
                System.out.println(id + " : " + noti);
                // Further processing
            } else if (message.getStringProperty(JMSHeaderConstants.APP_TYPE)
                    .equals(AppType.WORKFLOW_JOB.name())) {
                WorkflowJobMessage wfJobMessage = JMSMessagingUtils
                        .getEventMessage(message);
                String jobId = wfJobMessage.getId();//获得作业的ID
                Date startTime = wfJobMessage.getStartTime();//获得作业的启动时间
                System.out.println(jobId + startTime);
                // Further processing
            }
Oozie 使用ActiveMQ实现 JMS通知

 

5,测试

给Oozie提交workflow作业,在MyEclipse控制台中看到,该作业的ID和启动时间已经查询到了。

Oozie 使用ActiveMQ实现 JMS通知

 

6,总结

由于原来当作业提交之后,不断地向Oozie Server发HTTP请求(每隔10秒一次)的方式来查询作业是否执行完成,然后Oozie Server以JSON格式的字符串返回该JobID相关的执行结果,这种情况有两个问题:①同步的HTTP轮询方式,效率低下,耦合严重; ②需要自己解析JSON,处理异常情况(比如,执行失败的作业Json字符串中的EndTime字段为null)

而现在引入ActiveMQ后,异步获取消息,查询作业的执行结果变得更加容易了。而且还可以使用Oozie提供的一些更加高级的特性了。比如:SLA(Service Level Agreement)

 

本文转自hapjin博客园博客,原文链接:http://www.cnblogs.com/hapjin/p/5420799.html,如需转载请自行联系原作者