且构网

"Connection aborted" error when trying to send events to Azure Event Hub with the Java EventHubClient API

Updated: 2023-09-08 07:49:10

I am trying to write a Java publisher program that sends a single message to an Event Hub, but when I run it I get a "connection aborted" error. I am using the azure-eventhubs Maven dependency, version 0.9.0, as described in the guide linked below. The main method follows the link.

https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-java-ephcs-getstarted

public static void main(String[] args)
        throws ServiceBusException, ExecutionException, InterruptedException, IOException {
    final String namespaceName = "<namespace name>";
    final String eventHubName = "<created with same name as namespace>";
    final String sasKeyName = "RootManageSharedAccessKey";
    final String sasKey = "<primary key from shared access policies>";

    ConnectionStringBuilder connStr = new ConnectionStringBuilder(namespaceName, eventHubName, sasKeyName, sasKey);

    byte[] payloadBytes = "Test AMQP message from JMS, Yaay it works".getBytes("UTF-8");
    EventData sendEvent = new EventData(payloadBytes);

    EventHubClient ehClient = EventHubClient.createFromConnectionStringSync(connStr.toString());
    ehClient.sendSync(sendEvent);
}

Here is the stack trace of the error I get. Any help is appreciated.

  error[connection aborted]
  Exception in thread "main" com.microsoft.azure.servicebus.ServiceBusException: connection aborted
    at com.microsoft.azure.servicebus.ExceptionUtil.toException(ExceptionUtil.java:93)
    at com.microsoft.azure.servicebus.MessagingFactory.onConnectionError(MessagingFactory.java:187)
    at com.microsoft.azure.servicebus.amqp.ConnectionHandler.onTransportError(ConnectionHandler.java:105)
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:191)
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:309)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:276)
    at com.microsoft.azure.servicebus.MessagingFactory$RunReactor.run(MessagingFactory.java:340)
    at java.lang.Thread.run(Thread.java:745)

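Yes, this is almost certainly a firewall issue: the default AMQP transport connects over port 5671, which corporate firewalls often block. The code below switches the transport to AMQP over WebSockets (port 443) and routes the connection through an HTTP proxy, which should solve the connection issue when writing data to Event Hub. Note that the fluent ConnectionStringBuilder setters and TransportType are not part of version 0.9.0, so you will likely need to upgrade the azure-eventhubs dependency first.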

Hope this helps!

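// Needs imports for java.io.IOException, java.net.* and java.util.*, plus
// ConnectionStringBuilder and TransportType from com.microsoft.azure.eventhubs.
final ConnectionStringBuilder connStr = new ConnectionStringBuilder()
        .setNamespaceName("Your namespace name")
        .setEventHubName("Your eventHub Name")
        .setSasKeyName("Enter the SAS key name")
        .setSasKey("SAS Key details");

// Use AMQP over WebSockets (port 443) instead of plain AMQP (port 5671).
connStr.setTransportType(TransportType.AMQP_WEB_SOCKETS);

// Placeholders: replace with your proxy's host name and port.
final String proxyHost = "proxy information";
final int proxyPort = 8080;

// Route outgoing connections through the HTTP proxy.
ProxySelector.setDefault(new ProxySelector() {
    @Override
    public List<Proxy> select(URI uri) {
        List<Proxy> proxies = new LinkedList<>();
        proxies.add(new Proxy(Proxy.Type.HTTP,
                new InetSocketAddress(proxyHost, proxyPort)));
        return proxies;
    }

    @Override
    public void connectFailed(URI uri, SocketAddress sa, IOException ioe) {
        // trace and follow up on why proxy server is down
    }
});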
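
If you want to confirm the firewall diagnosis before changing the publisher, a plain TCP check can show which ports are reachable from your machine. This is only a rough sketch using the standard library: the class name PortCheck, the helper canConnect, the 5-second timeout and the host name are all placeholders I made up for illustration, not part of the Event Hubs SDK.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical placeholder: pass your own namespace host as the first argument.
        String host = args.length > 0 ? args[0] : "your-namespace.servicebus.windows.net";

        // 5671 is the AMQP-over-TLS port; 443 is used by AMQP over WebSockets.
        System.out.println("Port 5671 reachable: " + canConnect(host, 5671, 5000));
        System.out.println("Port 443 reachable:  " + canConnect(host, 443, 5000));
    }
}
```

If 5671 is unreachable, that is consistent with the firewall explanation. Keep in mind that on a network where all traffic must go through the proxy, the direct check on 443 can fail as well, even though the WebSockets transport would work once the ProxySelector is set.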