且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

如何包装一个JMS到WebSphere MQ桥使用请求 - 应答模式同步调用?

更新时间:2023-12-03 14:11:34

。我使用标准的EJB3与JAX-WS批注和标准JMS。

在code到目前为止,我已经写了符合要求如下。这是一个无状态会话Bean与Bean管理的事务(BMT)由于使用非标准的容器管理事务(CMT)是导致某种挂就可以了,我相信,因为我试图把两种JMS的相互作用在相同的事务,因为它们在同样的方法,以便通知我必须为与JMS队列的每个交互开始和结束交易。我使用的WebLogic此解决方案。而且我也codeD的MDB基本上消耗从队列端点JMS / Pergunta的信息以及JMS放置一个响应消息/ Resposta队列我这样做是为了模拟在这个问题上的MQ侧的预期行为。实际上,在真实的情景我们可能对主机甚至其他的Java应用程序处理的消息,并放置在响应队列的响应有些COBOL应用程序。

如果有人需要尝试这个code基本上所有你需要的是有一个容器J2EE5并配置2队列,JNDI名称:JMS / Pergunta和jms / Resposta ​​ P>

在EJB / web服务code:

  @Stateless
@TransactionManagement(TransactionManagementType.BEAN)
@WebService(NAME =DJOWebService)
公共类DJOSessionBeanWS实现DJOSessionBeanWSLocal {    记录器记录= Logger.getLogger(DJOSessionBeanWS.class.getName());    @Resource
    SessionContext被EJBContext的;    //定义JMS连接工厂。
    公共最后静态字符串JMS_FACTORY =weblogic.jms.ConnectionFactory;    //定义请求队列
    公共最后静态字符串QUEUE_PERG =JMS / Pergunta    //定义响应队列
    公共最后静态字符串QUEUE_RESP =JMS / Resposta
    上下文CTX;
    的QueueConnectionFactory qconFactory;    / **
     *默认构造函数。
     * /
    公共DJOSessionBeanWS(){
        log.info(Construtor DJOSessionBeanWS);
    }    @WebMethod(operationName =processaMensagem)
    公共字符串processaMensagem(字符串mensagemEntrada,字符串idUnica)
    {
        //获取UserTransaction的引用,因为这是一个BMT EJB。
        UserTransaction的UT = ejbContext.getUserTransaction();
        尝试{            CTX =新的InitialContext();
            //获取工厂任何交易之前,它是WebLogic资源。
            qconFactory =(的QueueConnectionFactory)ctx.lookup(JMS_FACTORY);
            log.info(得到的QueueConnectionFactory);
            ut.begin();
            QueueConnection QCon大会= qconFactory.createQueueConnection();
            QueueSession的qsession = qcon.createQueueSession(假,Session.AUTO_ACKNOWLEDGE);
            队列适量=(队列)(新的InitialContext()查找(JMS / Pergunta)。);
            TextMessage的消息= qsession.createTextMessage(这是一个请求消息);
            message.setJMSCorrelationID(idUnica);
            qsession.createSender(QS)。发送(消息);
            ut.commit();
            qcon.close();
            //必须完成,并开始一个新的事务,我决定也得到新的参考,如果这真的是需要所有JMS相关的对象,不知道
            ut.begin();
            QueueConnection queuecon = qconFactory.createQueueConnection();
            队列qreceive =(队列)(新的InitialContext()查找(JMS / Resposta)。);
            QueueSession的QueueSession的= queuecon.createQueueSession(假,Session.AUTO_ACKNOWLEDGE);
            字符串messageSelector =JMSCorrelationID ='+ idUnica +';
            //创建阙接收器并设置消息选择从响应队列只得到相关消息。
                    QueueReceiver时QR = queuesession.createReceiver(qreceive,messageSelector);
            queuecon.start();
            //设置超时时间,以保持等待响应...
            TextMessage的tresposta =(TextMessage的)qr.receive(10000);
            如果(tresposta!= NULL)
            {
                ut.commit();
                queuecon.close();
                返回(tresposta.toString());
            }
            其他{
                // commints反正..不具有反应虽然
                ut.commit();
                queuecon.close();
                log.info(空回复,通过超时返回..);
                返回没有得到消息效应初探;
            }        }赶上(例外五){
            log.severe(发生意外错误==>>中+ e.getMessage());
            e.printStackTrace();
            尝试{
                ut.commit();
            }赶上(例外前){
                ex.printStackTrace();
            }
            回归后的一些其他错误执行==&GT错误提交事务;+ e.getMessage();
        }    }
}

这是code,对于它嘲笑这个问题的MQ侧的MDB。我在我的测试中有一个视频下载片段模拟和测试在客户端的超时验证解决方案,但它不是在这个版本present。

  / **
 *模拟摆脱请求队列消息并发布在响应队列一个新的。
 * /
@MessageDriven(
        activati​​onConfig = {@Activati​​onConfigProperty(
                propertyName的=destinationType,为PropertyValue =的javax.jms.Queue
        )}
        mappedName =JMS / Pergunta)
公共类ConsomePerguntaPublicaRespostaMDB实现消息监听{    记录器记录= Logger.getLogger(ConsomePerguntaPublicaRespostaMDB.class.getName());    //定义JMS连接工厂。
    公共最后静态字符串JMS_FACTORY =weblogic.jms.ConnectionFactory;    //定义队列德resposta
    公共最后静态字符串QUEUE_RESP =JMS / Resposta
    上下文CTX;
    的QueueConnectionFactory qconFactory;    / **
     *默认构造函数。
     * /
    公共ConsomePerguntaPublicaRespostaMDB(){
        log.info(Executou construtor ConsomePerguntaPublicaRespostaMDB);
        尝试{
            CTX =新的InitialContext();
        }赶上(NamingException的E){
            e.printStackTrace();
        }
    }    / **
     * @see的MessageListener的onMessage#(消息)
     * /
    公共无效的onMessage(消息消息){
        log.info(Recuperou mensagem DA菲拉JMS / FilaPergunta,executando ConsomePerguntaPublicaRespostaMDB.onMessage);
        TextMessage的TM =(TextMessage的)消息;        尝试{
            log.info(Mensagem recebida没有的onMessage ==>>中+ tm.getText());            // PEGA ID达mensagem呐菲拉德pergunta第亚罗士打corretamente呐菲拉德resposta。
             串idMensagem = tm.getJMSCorrelationID();
             log.info(ID德mensagem阙血清USADA呐resposta ==>>中+ idMensagem);            qconFactory =(的QueueConnectionFactory)ctx.lookup(JMS_FACTORY);
            log.info(Inicializou contexto JNDIË申娜查找做的QueueConnectionFactory weblogic的COM Sucesso的Enviando mensagem。);
            QueueConnection QCon大会= qconFactory.createQueueConnection();
            QueueSession的qsession = qcon.createQueueSession(假,Session.AUTO_ACKNOWLEDGE);
            排队长龙=(队列)(ctx.lookup(JMS / Resposta));
            TextMessage的tmessage = qsession.createTextMessage(Mensagem JMS对postar呐菲拉德resposta ......);
            tmessage.setJMSCorrelationID(idMensagem);
            qsession.createSender(队列)。发送(tmessage);
        }赶上(ē的JMSException){
            log.severe(ERRO没有的onMessage ==>>中+ e.getMessage());
            e.printStackTrace();
        }赶上(NamingException的E){
            log.severe(ERRO没有查找==>>中+ e.getMessage());
            e.printStackTrace();
        }    }}

[] S

I am just dealing with a new scenario for me, which I believe might be common to some :)..

As per requirements I need to build a user experience to be like a synchronous on-line transaction for a web service call, which actually delegates the call to a IBM MQ Series using an asynchronous JMS-MQ Bridge.

The client calls the web service and than his message should be published in a JMS queue on the App server which will be delivered to WebSphere MQ and than after processing a response will delivered back to App server in a FIXED JMS queue endpoint.

The requirement deals with this transaction that will need to time out in case WebSphere MQ does not delivery the response in a defined amount of time, than the web service should send a time-out signal to client and ignore this transaction.

The sketch of the problem follows.

I need to block the request on the web service until the response arrives or time-out.

Than I am looking for some open library to help me on this task. Or the only solution is blocking a thread and keep pooling for the response? Maybe I could implement some block with a listener to be notified when the response arrives?

A bit of discussion would be very helpful for me now to try to clear my ideas on this. Any suggestions?

I have a sketch that I hope will help clearing the picture ;)

after a couple of days coding I got to a solution for this. I am using standard EJB3 with JAX-WS annotations and Standard JMS.

The code I have written so far to meet the requirements follows. It is a Stateless Session Bean with bean managed transaction(BMT) as using standart container managed transaction (CMT) was causing some kind of hang on it, I believe because I was trying to put both JMS interactions in the same transaction as they are in the same method so notice I had to start and finish transactions for each interaction with the JMS queues. I am using weblogic for this solution. And I have also coded an MDB which basically consumes the message from queue endpoint jms/Pergunta and places a response message on the jms/Resposta queue I did this to mock the expected behavior on the MQ side of this problem. Actually in a real scenario we would probably have some COBOL application on the mainframe or even other java application dealing with the messages and placing the response on the response queue.

If someone need to try this code basically all you need is to have a container J2EE5 and configure 2 queues with jndi names: jms/Pergunta and jms/Resposta.

The EJB/Webservice code:

@Stateless
@TransactionManagement(TransactionManagementType.BEAN)
@WebService(name="DJOWebService")
public class DJOSessionBeanWS implements DJOSessionBeanWSLocal {

    Logger log = Logger.getLogger(DJOSessionBeanWS.class.getName());

    @Resource
    SessionContext ejbContext;

    // Defines the JMS connection factory.
    public final static String JMS_FACTORY = "weblogic.jms.ConnectionFactory";

    // Defines request queue
    public final static String QUEUE_PERG = "jms/Pergunta";

    // Defines response queue
    public final static String QUEUE_RESP = "jms/Resposta";


    Context ctx;
    QueueConnectionFactory qconFactory;

    /**
     * Default constructor. 
     */
    public DJOSessionBeanWS() {
        log.info("Construtor DJOSessionBeanWS");
    }

    @WebMethod(operationName = "processaMensagem")
    public String processaMensagem(String mensagemEntrada, String idUnica)
    {
        //gets UserTransaction reference as this is a BMT EJB.
        UserTransaction ut = ejbContext.getUserTransaction();
        try {

            ctx = new InitialContext();
            //get the factory before any transaction it is a weblogic resource.
            qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
            log.info("Got QueueConnectionFactory");
            ut.begin();
            QueueConnection qcon = qconFactory.createQueueConnection();
            QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue qs = (Queue) (new InitialContext().lookup("jms/Pergunta"));
            TextMessage message = qsession.createTextMessage("this is a request message");
            message.setJMSCorrelationID(idUnica);
            qsession.createSender(qs).send(message);
            ut.commit();
            qcon.close();
            //had to finish and start a new transaction, I decided also get new references for all JMS related objects, not sure if this is REALLY required
            ut.begin();
            QueueConnection queuecon = qconFactory.createQueueConnection();
            Queue qreceive = (Queue) (new InitialContext().lookup("jms/Resposta"));
            QueueSession queuesession = queuecon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            String messageSelector = "JMSCorrelationID = '" + idUnica + "'";
            //creates que receiver and sets a message selector to get only related message from the response queue.
                    QueueReceiver qr = queuesession.createReceiver(qreceive, messageSelector);
            queuecon.start();
            //sets the timeout to keep waiting for the response...
            TextMessage tresposta = (TextMessage) qr.receive(10000);
            if(tresposta != null)
            {
                ut.commit();
                queuecon.close();
                return(tresposta.toString());
            }
            else{
                //commints anyway.. does not have a response though 
                ut.commit();
                queuecon.close();
                log.info("null reply, returned by timeout..");
                return "Got no reponse message.";
            }



        } catch (Exception e) {
            log.severe("Unexpected error occurred ==>> " + e.getMessage());
            e.printStackTrace();
            try {
                ut.commit();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
            return "Error committing transaction after some other error executing ==> " + e.getMessage();
        } 

    }
}   

And this is the code for the MDB which mocks the MQ side of this problem. I had a Thread.sleep fragment during my tests to simulate and test the timeout on the client side to validate the solution but it is not present in this version.

/**
 * Mock to get message from request queue and publish a new one on the response queue.
 */
@MessageDriven(
        activationConfig = { @ActivationConfigProperty(
                propertyName = "destinationType", propertyValue = "javax.jms.Queue"
        ) }, 
        mappedName = "jms/Pergunta")
public class ConsomePerguntaPublicaRespostaMDB implements MessageListener {

    Logger log = Logger.getLogger(ConsomePerguntaPublicaRespostaMDB.class.getName());

    // Defines the JMS connection factory.
    public final static String JMS_FACTORY = "weblogic.jms.ConnectionFactory";

    // Define Queue de resposta
    public final static String QUEUE_RESP = "jms/Resposta";


    Context ctx;
    QueueConnectionFactory qconFactory;



    /**
     * Default constructor. 
     */
    public ConsomePerguntaPublicaRespostaMDB() {
        log.info("Executou construtor ConsomePerguntaPublicaRespostaMDB");
        try {
            ctx = new InitialContext();
        } catch (NamingException e) {
            e.printStackTrace();
        }
    }

    /**
     * @see MessageListener#onMessage(Message)
     */
    public void onMessage(Message message) {
        log.info("Recuperou mensagem da fila jms/FilaPergunta, executando ConsomePerguntaPublicaRespostaMDB.onMessage");
        TextMessage tm = (TextMessage) message;

        try {
            log.info("Mensagem recebida no onMessage ==>> " + tm.getText());

            //pega id da mensagem na fila de pergunta para setar corretamente na fila de resposta.
             String idMensagem = tm.getJMSCorrelationID();
             log.info("Id de mensagem que sera usada na resposta ==>> " + idMensagem);

            qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
            log.info("Inicializou contexto jndi e deu lookup na QueueConnectionFactory do weblogic com sucesso. Enviando mensagem");
            QueueConnection qcon = qconFactory.createQueueConnection();
            QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = (Queue) (ctx.lookup("jms/Resposta"));
            TextMessage tmessage = qsession.createTextMessage("Mensagem jms para postar na fila de resposta...");
            tmessage.setJMSCorrelationID(idMensagem);
            qsession.createSender(queue).send(tmessage);
        } catch (JMSException e) {
            log.severe("Erro no onMessage ==>> " + e.getMessage());
            e.printStackTrace();
        }  catch (NamingException e) {
            log.severe("Erro no lookup ==>> " + e.getMessage());
            e.printStackTrace();
        }

    }

}

[]s