On 3/11/24 16:54, Ciaran wrote:
Hi all,

I'm taking a look at using ProtonJ2 to talk to Azure Service Bus and I've
successfully gotten everything up and running, with and without websockets,
now I'm turning my attention to the 'unhappy path' of network drop-outs and
misconfigurations leading to connection failures.

Is it possible to have a behaviour where:

    - If an initial connection fails, the error is immediately propagated
    and background reconnect attempts are not attempted.
    - If an initial connection was successful then background reconnect
    attempts are attempted.

>From looking through the api docs and reading the  code, I believed that
configuring the following would exhibit the behaviours described.

options.reconnectOptions().reconnectEnabled(true);

options.reconnectOptions().maxInitialConnectionAttempts(1);

options.reconnectOptions().useReconnectBackOff();

*However, when connecting to Azure Service Bus with a deliberately invalid
password, it appears as though the reconnect logic is continuing in the
background (exceptions spamming my log ), not aborting on first connection
attempt. *Is this the expected behaviour?

I took a brief look at the code and can see that 'connectionEstablished' in
the ClientConnection class is being called even though the connection can't
possibly succeed as the authentication is wrong, is that expected behaviour
of the internals?

I've tried to summarise the code I have below, I've removed error handling,
resource cleanup for clarity:

    final String serverHost = "MY_HOST.servicebus.windows.net";
    final int serverPort = 443;
    final String address = "testq";
    final String user = "MY_USER";
    final String password= "MY_PASSWORD_BUT_INVALIDATED_FOR_THIS_TEST";
    final boolean useSSL = true;
    final boolean useWebSockets = true;

     final Client client = Client.create();
    ConnectionOptions options = new
ConnectionOptions().user(user).password(password);
    options.sslOptions().sslEnabled(useSSL);
    options.transportOptions().useWebSockets(useWebSockets);
    options.transportOptions().webSocketPath("/$servicebus/websocket");
    options.reconnectOptions().reconnectEnabled(true);
    options.reconnectOptions().maxInitialConnectionAttempts(1);
    options.reconnectOptions().useReconnectBackOff();
    Connection connection = client.connect(serverHost, serverPort, options);
    ReceiverOptions ro = new ReceiverOptions();
    ro.autoAccept(false);
    Receiver receiver = connection.openReceiver( address, ro);

My exception / warning logs are contained at the end of the email, the
second email repeats until a ClientOperationTimedOutException is thrown
with the message 'Link open timed out waiting for remote to respond'.

Obviously, I could seperate the code into one client that does not have
reconnect logic enabled to determine if a connection is even *possible*
before engaging my proper client, but it seemed like the API offerred the
behaviour I was after?

Thank you for your patience if you made it this far
- Cj.

The client re-connection logic treats a SASL authentication error as a terminal state and will not continue reconnect attempts if it receives a SASL outcome that indicates anything other than a temporary failure state so it should be stopping the reconnect if it was actually failing to authenticate.  This is true on the first attempt as well as on subsequent attempts to other hosts if you add more than one and it has failed to reach any of the preceding hosts while attempt to recover the connection.

From the error logs attached it doesn't appear as though SASL authentication is your issue though, at least in so much as it isn't logging anything indicating SASL authentication as the error. Instead the connection appears to be failing because the remote has sent a response to the Receiver attach that does not include the initial delivery count value as required by the specification.

You can capture more information about the AMQP frames being sent and received by enabling frame tracing, see the docs for how to do that.

https://github.com/apache/qpid-protonj2/blob/main/protonj2-client-docs/Configuration.md#logging



---- Exception / logs below -------

20:49:52,348  WARN SaslMechanismSelector:130 - Caught exception while
trying to create SASL mechanism MSSBCBS: No Matching SASL Mechanism with
name: MSSBCBS
20:49:52,652  WARN ClientTransportListener:63 - Caught problem during
incoming data processing: Sending peer attach had no initial delivery count
org.apache.qpid.protonj2.engine.exceptions.EngineFailedException: Sending
peer attach had no initial delivery count
at
org.apache.qpid.protonj2.engine.exceptions.ProtonExceptionSupport.createFailedException(ProtonExceptionSupport.java:63)
at
org.apache.qpid.protonj2.engine.exceptions.ProtonExceptionSupport.createFailedException(ProtonExceptionSupport.java:31)
at
org.apache.qpid.protonj2.engine.impl.ProtonEngine.engineFailed(ProtonEngine.java:306)
at
org.apache.qpid.protonj2.engine.impl.ProtonEngine.ingest(ProtonEngine.java:271)
at
org.apache.qpid.protonj2.engine.impl.ProtonEngine.ingest(ProtonEngine.java:54)
at
org.apache.qpid.protonj2.client.impl.ClientTransportListener.transportRead(ClientTransportListener.java:59)
at
org.apache.qpid.protonj2.client.transport.netty4.TcpTransport$NettyDefaultHandler.dispatchReadBuffer(TcpTransport.java:522)
at
org.apache.qpid.protonj2.client.transport.netty4.WebSocketTransport$NettyWebSocketTransportHandler.channelRead0(WebSocketTransport.java:239)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1338)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1387)
at
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
at
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by:
org.apache.qpid.protonj2.engine.exceptions.ProtocolViolationException:
Sending peer attach had no initial delivery count
at
org.apache.qpid.protonj2.engine.impl.ProtonSession.remoteAttach(ProtonSession.java:513)
at
org.apache.qpid.protonj2.engine.impl.ProtonConnection.handleAttach(ProtonConnection.java:537)
at
org.apache.qpid.protonj2.engine.impl.ProtonPerformativeHandler.handleAttach(ProtonPerformativeHandler.java:128)
at
org.apache.qpid.protonj2.engine.impl.ProtonPerformativeHandler.handleAttach(ProtonPerformativeHandler.java:43)
at org.apache.qpid.protonj2.types.transport.Attach.invoke(Attach.java:452)
at
org.apache.qpid.protonj2.engine.IncomingAMQPEnvelope.invoke(IncomingAMQPEnvelope.java:69)
at
org.apache.qpid.protonj2.engine.impl.ProtonPerformativeHandler.handleRead(ProtonPerformativeHandler.java:68)
at
org.apache.qpid.protonj2.engine.impl.ProtonEngineHandlerContext.invokeHandlerRead(ProtonEngineHandlerContext.java:187)
at
org.apache.qpid.protonj2.engine.impl.ProtonEngineHandlerContext.fireRead(ProtonEngineHandlerContext.java:147)
at
org.apache.qpid.protonj2.engine.impl.ProtonFrameLoggingHandler.handleRead(ProtonFrameLoggingHandler.java:101)
at
org.apache.qpid.protonj2.engine.impl.ProtonEngineHandlerContext.invokeHandlerRead(ProtonEngineHandlerContext.java:187)
at
org.apache.qpid.protonj2.engine.impl.ProtonEngineHandlerContext.fireRead(ProtonEngineHandlerContext.java:147)
at
org.apache.qpid.protonj2.engine.impl.ProtonFrameDecodingHandler$FrameBodyParsingStage.parse(ProtonFrameDecodingHandler.java:387)
at
org.apache.qpid.protonj2.engine.impl.ProtonFrameDecodingHandler$FrameSizeParsingStage.parse(ProtonFrameDecodingHandler.java:265)
at
org.apache.qpid.protonj2.engine.impl.ProtonFrameDecodingHandler.handleRead(ProtonFrameDecodingHandler.java:99)
at
org.apache.qpid.protonj2.engine.impl.ProtonEngineHandlerContext.invokeHandlerRead(ProtonEngineHandlerContext.java:199)
at
org.apache.qpid.protonj2.engine.impl.ProtonEngineHandlerContext.fireRead(ProtonEngineHandlerContext.java:132)
at
org.apache.qpid.protonj2.engine.impl.ProtonEnginePipeline.fireRead(ProtonEnginePipeline.java:301)
at
org.apache.qpid.protonj2.engine.impl.ProtonEngine.ingest(ProtonEngine.java:266)
... 34 more
20:49:52,654  WARN ClientTransportListener:63 - Caught problem during
incoming data processing: Cannot ingest data into an Engine that has been
shutdown or failed
org.apache.qpid.protonj2.engine.exceptions.EngineFailedException: Cannot
ingest data into an Engine that has been shutdown or failed
at
org.apache.qpid.protonj2.engine.exceptions.ProtonExceptionSupport.createFailedException(ProtonExceptionSupport.java:63)
at
org.apache.qpid.protonj2.engine.impl.ProtonEngine.checkShutdownOrFailed(ProtonEngine.java:422)
at
org.apache.qpid.protonj2.engine.impl.ProtonEngine.ingest(ProtonEngine.java:258)
at
org.apache.qpid.protonj2.engine.impl.ProtonEngine.ingest(ProtonEngine.java:54)
at
org.apache.qpid.protonj2.client.impl.ClientTransportListener.transportRead(ClientTransportListener.java:59)
at
org.apache.qpid.protonj2.client.transport.netty4.TcpTransport$NettyDefaultHandler.dispatchReadBuffer(TcpTransport.java:522)
at
org.apache.qpid.protonj2.client.transport.netty4.WebSocketTransport$NettyWebSocketTransportHandler.channelRead0(WebSocketTransport.java:239)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1338)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1387)
at
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
at
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by:
org.apache.qpid.protonj2.engine.exceptions.ProtocolViolationException:
Sending peer attach had no initial delivery count
at
org.apache.qpid.protonj2.engine.impl.ProtonSession.remoteAttach(ProtonSession.java:513)
at
org.apache.qpid.protonj2.engine.impl.ProtonConnection.handleAttach(ProtonConnection.java:537)
at
org.apache.qpid.protonj2.engine.impl.ProtonPerformativeHandler.handleAttach(ProtonPerformativeHandler.java:128)
at
org.apache.qpid.protonj2.engine.impl.ProtonPerformativeHandler.handleAttach(ProtonPerformativeHandler.java:43)
at org.apache.qpid.protonj2.types.transport.Attach.invoke(Attach.java:452)
at
org.apache.qpid.protonj2.engine.IncomingAMQPEnvelope.invoke(IncomingAMQPEnvelope.java:69)
at
org.apache.qpid.protonj2.engine.impl.ProtonPerformativeHandler.handleRead(ProtonPerformativeHandler.java:68)
at
org.apache.qpid.protonj2.engine.impl.ProtonEngineHandlerContext.invokeHandlerRead(ProtonEngineHandlerContext.java:187)
at
org.apache.qpid.protonj2.engine.impl.ProtonEngineHandlerContext.fireRead(ProtonEngineHandlerContext.java:147)
at
org.apache.qpid.protonj2.engine.impl.ProtonFrameLoggingHandler.handleRead(ProtonFrameLoggingHandler.java:101)
at
org.apache.qpid.protonj2.engine.impl.ProtonEngineHandlerContext.invokeHandlerRead(ProtonEngineHandlerContext.java:187)
at
org.apache.qpid.protonj2.engine.impl.ProtonEngineHandlerContext.fireRead(ProtonEngineHandlerContext.java:147)
at
org.apache.qpid.protonj2.engine.impl.ProtonFrameDecodingHandler$FrameBodyParsingStage.parse(ProtonFrameDecodingHandler.java:387)
at
org.apache.qpid.protonj2.engine.impl.ProtonFrameDecodingHandler$FrameSizeParsingStage.parse(ProtonFrameDecodingHandler.java:265)
at
org.apache.qpid.protonj2.engine.impl.ProtonFrameDecodingHandler.handleRead(ProtonFrameDecodingHandler.java:99)
at
org.apache.qpid.protonj2.engine.impl.ProtonEngineHandlerContext.invokeHandlerRead(ProtonEngineHandlerContext.java:199)
at
org.apache.qpid.protonj2.engine.impl.ProtonEngineHandlerContext.fireRead(ProtonEngineHandlerContext.java:132)
at
org.apache.qpid.protonj2.engine.impl.ProtonEnginePipeline.fireRead(ProtonEnginePipeline.java:301)
at
org.apache.qpid.protonj2.engine.impl.ProtonEngine.ingest(ProtonEngine.java:266)
... 34 more


--
Tim Bish


---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
For additional commands, e-mail: users-h...@qpid.apache.org

Reply via email to