且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

阿里云物联网平台之独享实例下的amqp客户端连接

更新时间:2022-04-01 23:46:32

1、首先找到控制***享实例

阿里云物联网平台之独享实例下的amqp客户端连接

2、找到接入域名

阿里云物联网平台之独享实例下的amqp客户端连接

3、Java Sample

import java.net.URI;
import java.util.Hashtable;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AmqpJavaClientDemo {

private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class);

public static void main(String[] args) throws Exception {
    //参数说明,请参见上一篇文档:AMQP客户端接入说明。
    String accessKey = "";
    String accessSecret = "";
    String consumerGroupId ="";
    String iotInstanceId = "";
    long timeStamp = System.currentTimeMillis();
    //签名方法:支持hmacmd5,hmacsha1和hmacsha256
    String signMethod = "hmacsha1";
    //控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。
    //建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。
    String clientId = "112233";

    //UserName组装方法,请参见上一篇文档:AMQP客户端接入说明。
    String userName = clientId + "|authMode=aksign"
            + ",signMethod=" + signMethod
            + ",timestamp=" + timeStamp
            + ",authId=" + accessKey
            + ",iotInstanceId=" + iotInstanceId
            + ",consumerGroupId=" + consumerGroupId
            + "|";
    //password组装方法,请参见上一篇文档:AMQP客户端接入说明。
    String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;
    String password = doSign(signContent,accessSecret, signMethod);
    //按照qpid-jms的规范,组装连接URL。
    String connectionUrl = "failover:(amqps://iot-cn-******.amqp.iothub.aliyuncs.com:5671?amqp.idleTimeout=80000)"
            + "?failover.reconnectDelay=30";

    Hashtable<String, String> hashtable = new Hashtable<String, String>();
    hashtable.put("connectionfactory.SBCF",connectionUrl);
    hashtable.put("queue.QUEUE", "default");
    hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
    Context context = new InitialContext(hashtable);
    ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
    Destination queue = (Destination)context.lookup("QUEUE");
    // Create Connection
    Connection connection = cf.createConnection(userName, password);
    ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
    // Create Session
    // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()
    // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    connection.start();
    // Create Receiver Link
    MessageConsumer consumer = session.createConsumer(queue);
    consumer.setMessageListener(messageListener);
}

private static MessageListener messageListener = new MessageListener() {
    @Override
    public void onMessage(Message message) {
        try {
            byte[] body = message.getBody(byte[].class);
            String content = new String(body);
            String topic = message.getStringProperty("topic");
            String messageId = message.getStringProperty("messageId");
            logger.info("receive message"
                    + ", topic = " + topic
                    + ", messageId = " + messageId
                    + ", content = " + content);
            //如果创建Session选择的是Session.CLIENT_ACKNOWLEDGE,这里需要手动ACK。
            //message.acknowledge();
            //如果要对收到的消息做耗时的处理,请异步处理,确保这里不要有耗时逻辑。
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
};

private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
    /**
     * 连接成功建立。
     */
    @Override
    public void onConnectionEstablished(URI remoteURI) {
        logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
    }

    /**
     * 尝试过最大重试次数之后,最终连接失败。
     */
    @Override
    public void onConnectionFailure(Throwable error) {
        logger.error("onConnectionFailure, {}", error.getMessage());
    }

    /**
     * 连接中断。
     */
    @Override
    public void onConnectionInterrupted(URI remoteURI) {
        logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
    }

    /**
     * 连接中断后又自动重连上。
     */
    @Override
    public void onConnectionRestored(URI remoteURI) {
        logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
    }

    @Override
    public void onInboundMessage(JmsInboundMessageDispatch envelope) {}

    @Override
    public void onSessionClosed(Session session, Throwable cause) {}

    @Override
    public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}

    @Override
    public void onProducerClosed(MessageProducer producer, Throwable cause) {}
};

/**
 * password签名计算方法,请参见上一篇文档:AMQP客户端接入说明。
 */
private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
    SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
    Mac mac = Mac.getInstance(signMethod);
    mac.init(signingKey);
    byte[] rawHmac = mac.doFinal(toSignString.getBytes());
    return Base64.encodeBase64String(rawHmac);
}

}

4、确认客户端在线

阿里云物联网平台之独享实例下的amqp客户端连接