Hi,

We are using the RabbitMQ source connector with exactly-once guarantees. For 
this to work, per the official Flink documentation, we supply a correlation ID 
with each published message, run the job with a parallelism of one, make the 
Flink job the single/only consumer of the queue in question, and have 
checkpointing enabled.

The following behavior of the RMQSource seems strange to us. When a job is 
restarted from a checkpoint and there are unacked messages on the RabbitMQ 
queue for messages processed during the previous checkpoint interval, those 
messages stay unacked until the job either finishes or is restarted again. 
Only when the connection to RabbitMQ is eventually closed (because the job 
finishes or is restarted) are the unacked messages requeued, to be redelivered 
once the next connection is established.

Looking at the source code, messages are ACKed by the RMQSource only after a 
checkpoint completes 
(MessageAcknowledgingSourceBase::notifyCheckpointComplete).
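
For context, that ACK-on-checkpoint flow can be modeled with plain collections 
(a simplified stand-in, not the actual MessageAcknowledgingSourceBase code; 
all names here are illustrative):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the checkpoint/ACK bookkeeping: delivery tags are
// buffered per checkpoint and only ACKed once that checkpoint completes.
public class CheckpointAckModel {
    final List<Long> sessionIds = new ArrayList<>();       // tags since last checkpoint
    final Map<Long, List<Long>> pending = new HashMap<>(); // checkpointId -> tags
    final List<Long> acked = new ArrayList<>();            // stand-in for channel.basicAck

    void snapshotState(long checkpointId) {
        // move the tags collected in this interval into the pending bucket
        pending.put(checkpointId, new ArrayList<>(sessionIds));
        sessionIds.clear();
    }

    void notifyCheckpointComplete(long checkpointId) {
        List<Long> tags = pending.remove(checkpointId);
        if (tags != null) {
            acked.addAll(tags); // the real source calls channel.basicAck per tag
        }
    }
}
```

The consequence of this design: a delivery tag that never makes it into 
sessionIds is never ACKed at all, which is the crux of the issue below.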

Also, looking at the source code of RMQSource::setMessageIdentifier() (on the 
master branch; the ACK semantics do not appear to have changed since 1.11.2), 
it is clear that if an RMQ message carries a correlation ID that has already 
been handled, that message is skipped and not processed further. It is also 
clear that skipped messages are not added to the sessionIds list of messages 
targeted for ACK to RMQ. I believe all successfully consumed RMQ messages 
should be ACKed, regardless of whether the message is ignored or processed by 
Flink; RMQ needs to know that the consumer considers the message handled.

The following code is from RMQSource::setMessageIdentifier(). Note the return 
before sessionIds.add():

  ...
  if (!addId(correlationId)) {
    // we have already processed this message
    return false;
  }
}
sessionIds.add(deliveryTag);
  ...
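
A minimal stand-in (plain collections, not the real connector classes; method 
and field names are mine) reproduces the effect, alongside one possible 
reordering in which the delivery tag is recorded for ACK even when the 
message is deduplicated:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DedupAckModel {
    final Set<String> processedIds = new HashSet<>(); // stand-in for addId()
    final List<Long> sessionIds = new ArrayList<>();  // tags to ACK at next checkpoint

    // Current behavior: a duplicate returns before the tag reaches sessionIds,
    // so the redelivered message is never ACKed and stays unacked on the queue.
    boolean current(String correlationId, long deliveryTag) {
        if (!processedIds.add(correlationId)) {
            // we have already processed this message
            return false;
        }
        sessionIds.add(deliveryTag);
        return true;
    }

    // Possible fix: always record the tag for ACK; only skip the downstream
    // processing of duplicates via the return value.
    boolean suggested(String correlationId, long deliveryTag) {
        sessionIds.add(deliveryTag);
        return processedIds.add(correlationId);
    }
}
```

With the current ordering, a redelivery of an already-seen correlation ID 
leaves sessionIds untouched; with the reordering, both deliveries get ACKed 
while only the first is processed.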

Directly related to the above, I also noticed that RMQ connections are leaked 
on internal job restarts. From the Flink log (this stack trace is from 1.11.2):

2020-11-18 10:08:25,118 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask [] - Error during disposal of stream operator.
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:228) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:303) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.ChannelN.basicCancel(ChannelN.java:1294) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicCancel(AutorecoveringChannel.java:482) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:192) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]

AlreadyClosedException is not handled by RMQSource::close(). This results in 
an RMQ connection thread being left behind somewhere. I triggered three 
restarts of the job in a row and saw one new connection added to the pile of 
connections after each restart. I triggered the restarts by killing the active 
connection to RMQ through the RMQ admin GUI (management plugin; see the 
exception details above).
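
A sketch of a more defensive teardown (hypothetical names; the real fix would 
live in RMQSource::close(), and the interfaces below are minimal stand-ins 
for the RabbitMQ client types): each step is guarded so that an exception 
from basicCancel does not prevent the connection itself from being released:

```java
import java.util.ArrayList;
import java.util.List;

public class SafeCloseSketch {
    // Minimal stand-ins for the RabbitMQ client types touched during close().
    interface Channel { void basicCancel() throws Exception; }
    interface Connection { void close() throws Exception; }

    // Guard each teardown step independently so a failure in one step
    // (e.g. the channel already being closed after a forced disconnect)
    // does not skip the remaining cleanup.
    static List<String> safeClose(Channel channel, Connection connection) {
        List<String> log = new ArrayList<>();
        try {
            channel.basicCancel();
            log.add("cancel ok");
        } catch (Exception e) {
            log.add("cancel failed: " + e.getMessage()); // log and continue
        }
        try {
            connection.close();
            log.add("connection closed");
        } catch (Exception e) {
            log.add("close failed: " + e.getMessage());
        }
        return log;
    }
}
```

With this structure, an AlreadyClosedException thrown while cancelling the 
consumer would be logged, and the connection would still be torn down instead 
of being leaked.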

I also tried to kill one of the leaked connections, but a new one is instantly 
created in its place. The stack trace when doing this (1.11.2):

2020-11-18 10:27:51,715 ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler [] - An unexpected connection driver error occured
java.lang.NoClassDefFoundError: com/rabbitmq/client/AMQP$Connection$CloseOk$Builder
at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:800) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:753) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:237) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:623) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
at java.lang.Thread.run(Unknown Source) [?:?]

I have verified that com/rabbitmq/client/AMQP$Connection$CloseOk$Builder is 
included in the job JAR:

less <my-flink-jar> | egrep 'AMQP\$Connection\$CloseOk\$Builder'
-rw----     2.0 fat      818 bl defN 20-Nov-11 16:17 com/rabbitmq/client/AMQP$Connection$CloseOk$Builder.class

So, to sum up: it looks like there is a bug in the ACK handling when 
correlation IDs are used. This breaks the exactly-once guarantee of the 
RMQSource, since unacked messages are requeued after a reconnect to RMQ and 
may thus be processed more than once.

Also, the clean-up logic of the RMQSource seems buggy.

Does my reasoning make sense to you?

Best Regards,
Thomas Eckestad
