Hi,

We are using the RabbitMQ source connector (RMQSource) with exactly-once guarantees. As the official Flink documentation requires, we supply a correlation ID with each published message, we run the source with a parallelism of one, the Flink job is the single consumer of the queue in question, and checkpointing is enabled.
The following behavior of the RMQSource seems strange to us. Suppose a job is restarted from a checkpoint, and the RabbitMQ queue still holds unacked messages that were processed in the previous checkpoint interval. Those messages then stay unacked until the job either finishes or is restarted again. When the connection to RabbitMQ is later closed (because the job finished or was restarted), the unacked messages are requeued and delivered again once the next connection is established.

Looking at the source code, the RMQSource acks messages after a checkpoint completes (MessageAcknowledgingSourceBase::notifyCheckpointComplete). The code in RMQSource::setMessageIdentifier() (on the master branch; the ACK semantics do not seem to have changed since 1.11.2) also makes two things clear. First, if an RMQ message carries a correlation ID that has already been handled, the message is skipped and not processed further. Second, skipped messages are not added to the sessionIds list of delivery tags that are to be acked to RMQ.

I believe every successfully consumed RMQ message should be acked, whether Flink processes it or ignores it. RMQ needs to know that the consumer considers the message handled. The following code is from RMQSource::setMessageIdentifier(). Note the return before sessionIds.add():

    ...
            if (!addId(correlationId)) {
                // we have already processed this message
                return false;
            }
        }
        sessionIds.add(deliveryTag);
    ...

Directly related to the above, I have also noticed that RMQ connections are leaked on internal job restarts. From the Flink log (this stack trace is from 1.11.2):

2020-11-18 10:08:25,118 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask [] - Error during disposal of stream operator.
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:228) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:303) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.ChannelN.basicCancel(ChannelN.java:1294) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicCancel(AutorecoveringChannel.java:482) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:192) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]

The AlreadyClosedException is not handled by RMQSource::close(), and as a result an RMQ connection thread is left behind somewhere. I triggered three restarts of the job in a row and saw one new connection added to the pile of connections for each restart. I triggered each restart by killing the active connection to RMQ from the RMQ admin GUI (the management plugin; see the exception details above). I also tried to kill one of the leaked connections, but a new one was created instantly.
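To illustrate the cleanup problem, here is a minimal, stdlib-only Java sketch of a close sequence that keeps going when one step fails. It is not the RMQSource code: CleanupSketch and its Step interface are hypothetical, and IllegalStateException stands in for com.rabbitmq.client.AlreadyClosedException, which is unchecked (it extends ShutdownSignalException, a RuntimeException). Assuming RMQSource::close() only catches IOException around basicCancel(), channel.close() and connection.close(), an AlreadyClosedException from basicCancel() escapes before connection.close() is ever reached.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not Flink code): run every cleanup step, even if an
// earlier one throws, so the connection is always closed last.
class CleanupSketch {
    /** One cleanup step, e.g. basicCancel, channel.close or connection.close. */
    interface Step {
        void run() throws Exception;
    }

    /**
     * Runs all steps in order. A failure in one step, checked or unchecked,
     * is recorded but does not skip the remaining steps. The first failure is
     * rethrown at the end with any later ones attached as suppressed.
     */
    static void closeAll(List<Step> steps) throws Exception {
        Exception first = null;
        for (Step step : steps) {
            try {
                step.run();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Step> steps = List.of(
            // Stand-in for basicCancel() on a channel whose connection was force-closed.
            () -> {
                log.add("cancel consumer");
                throw new IllegalStateException("connection is already closed");
            },
            () -> log.add("close channel"),
            () -> log.add("close connection"));
        try {
            closeAll(steps);
        } catch (Exception e) {
            log.add("rethrown: " + e.getMessage());
        }
        System.out.println(log);
    }
}
```

Running main prints [cancel consumer, close channel, close connection, rethrown: connection is already closed]: every step runs even though the first one fails, and the original failure is still reported rather than swallowed.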
The traceback logged when killing a leaked connection (1.11.2):

2020-11-18 10:27:51,715 ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler [] - An unexpected connection driver error occured
java.lang.NoClassDefFoundError: com/rabbitmq/client/AMQP$Connection$CloseOk$Builder
    at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:800) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:753) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:237) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:623) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
    at java.lang.Thread.run(Unknown Source) [?:?]

I have verified that com/rabbitmq/client/AMQP$Connection$CloseOk$Builder is included in the job jar:

    less <my-flink-jar> | egrep 'AMQP\$Connection\$CloseOk\$Builder'
    -rw---- 2.0 fat 818 bl defN 20-Nov-11 16:17 com/rabbitmq/client/AMQP$Connection$CloseOk$Builder.class

So, to sum up: it looks like there is a bug regarding ACKs when using correlation IDs.
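To make the ACK issue concrete, here is a minimal, stdlib-only Java model of the bookkeeping described earlier. RmqAckModel and its methods are hypothetical and heavily simplified: only one checkpoint is in flight, and it assumes (modeled loosely on MessageAcknowledgingSourceBase) that dedup IDs are dropped once the checkpoint covering them completes. The ackSkippedDuplicates flag switches between the current behavior (early return before sessionIds.add()) and the proposed one (ack skipped duplicates too).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of RMQSource ACK bookkeeping. Not Flink code.
class RmqAckModel {
    private final boolean ackSkippedDuplicates;
    // Correlation IDs not yet covered by a completed checkpoint (dedup state).
    private final Set<String> dedupIds = new HashSet<>();
    // Delivery tags to ack when the next checkpoint completes.
    private final List<Long> sessionIds = new ArrayList<>();
    // Correlation IDs emitted downstream, in order, so duplicates are visible.
    final List<String> emitted = new ArrayList<>();

    RmqAckModel(boolean ackSkippedDuplicates, Collection<String> restoredIds) {
        this.ackSkippedDuplicates = ackSkippedDuplicates;
        this.dedupIds.addAll(restoredIds);
    }

    /** Handles one delivery, mirroring the early return in setMessageIdentifier(). */
    void onDelivery(long deliveryTag, String correlationId) {
        if (!dedupIds.add(correlationId)) {
            // Already processed, so the message is skipped. With the current
            // behavior we return without recording the tag: it is never acked.
            if (ackSkippedDuplicates) {
                sessionIds.add(deliveryTag); // proposed: ack skipped duplicates too
            }
            return;
        }
        sessionIds.add(deliveryTag);
        emitted.add(correlationId);
    }

    /**
     * Simulates notifyCheckpointComplete(): returns the acked delivery tags and,
     * as a simplifying assumption, drops the dedup IDs the checkpoint covered
     * (all of them here, since only one checkpoint is in flight).
     */
    List<Long> checkpointComplete() {
        List<Long> acked = new ArrayList<>(sessionIds);
        sessionIds.clear();
        dedupIds.clear();
        return acked;
    }

    public static void main(String[] args) {
        for (boolean fix : new boolean[] {false, true}) {
            // After a restore, "a" is in the dedup state but was never acked, so
            // RabbitMQ redelivers it on the new channel under a new delivery tag.
            RmqAckModel model = new RmqAckModel(fix, List.of("a"));
            model.onDelivery(1, "a");
            model.onDelivery(2, "b");
            List<Long> acked = model.checkpointComplete();
            // If tag 1 was not acked, the next reconnect requeues "a" once more.
            if (!acked.contains(1L)) {
                model.onDelivery(3, "a");
            }
            System.out.println("ackSkippedDuplicates=" + fix
                + " acked=" + acked + " emitted=" + model.emitted);
        }
    }
}
```

With ackSkippedDuplicates=false, main prints acked=[2] emitted=[b, a]: the redelivered "a" is never acked, and once its ID has left the dedup state, the next redelivery is emitted a second time even though "a" was already processed before the restart. With the flag set it prints acked=[1, 2] emitted=[b].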
The missing ACKs break the exactly-once guarantee of the RMQSource: unacked messages are requeued after reconnecting to RMQ and might therefore be processed more than once. The cleanup logic of the RMQSource also seems buggy.

Does my reasoning make sense to you?

Best regards,
Thomas Eckestad