[ https://issues.apache.org/jira/browse/FLINK-20244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17367701#comment-17367701 ]
Austin Cawley-Edwards commented on FLINK-20244:
-----------------------------------------------

Thanks for the ping [~cmick] + follow-up on this ticket. The issues you've decomposed make sense to me. As for splitting them up, I think it depends on whether the changes are small enough to fit into a single PR; multiple tickets + commits seem reasonable, though. I'll pass this off to [~fabian.paul] and [~arvid], as they're working more in this area and will know the procedures better than I do. Anyway, thanks for your continued contributions :)

> RMQSource does not ACK duplicated messages
> ------------------------------------------
>
>                 Key: FLINK-20244
>                 URL: https://issues.apache.org/jira/browse/FLINK-20244
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / RabbitMQ
>    Affects Versions: 1.11.2, 1.12.0
>            Reporter: Thomas Eckestad
>            Priority: Minor
>              Labels: auto-deprioritized-major
>
> *Background*
> The following has been observed when using RMQSource with exactly-once guarantees.
> When a job is restarted from a checkpoint and there were unacked messages referenced by that checkpoint at the time of the restart, those messages will be requeued by RMQ and resent to the restarted job, but they will not be ACKed. Later, when the connection to RMQ is closed (the job either finishes or is restarted), they will be requeued again.
> Looking at the source code, messages are ACKed by the RMQSource after a checkpoint is complete (MessageAcknowledgingSourceBase::notifyCheckpointComplete).
> Also, looking at RMQSource::setMessageIdentifier() (on the master branch; the ACK semantics do not seem to have changed since 1.11.2), it is clear that if an RMQ message carries a correlation ID that has already been handled, that message is skipped and not processed further. It is also clear that skipped messages are not added to the sessionIds list of messages targeted for ACK to RMQ.
> I believe all successfully consumed RMQ messages should be ACKed; it is irrelevant whether the message is ignored or processed by Flink. RMQ needs to know that the consumer considers the message handled OK.
> The following code is from RMQSource::setMessageIdentifier(). Note the return before sessionIds.add():
>
>     ...
>     if (!addId(correlationId)) {
>         // we have already processed this message
>         return false;
>     }
> }
> sessionIds.add(deliveryTag);
> ...
>
> Directly related to the above, I also noticed that RMQ connections are leaked at internal job restart. From the Flink log (this stack trace is from 1.11.2):
>
> 2020-11-18 10:08:25,118 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask [] - Error during disposal of stream operator.
> com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0)
> at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:228) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
> at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:303) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
> at com.rabbitmq.client.impl.ChannelN.basicCancel(ChannelN.java:1294) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
> at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicCancel(AutorecoveringChannel.java:482) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
> at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:192) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>
> AlreadyClosedException is not handled by RMQSource::close().
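One way to make duplicates ACKable is to record the delivery tag before the duplicate check rather than after it. The following is a minimal, self-contained model of that ordering, not the actual connector source: plain Java collections stand in for the connector's addId()/sessionIds state, and no RabbitMQ dependency is needed.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Self-contained model of the ACK bookkeeping in RMQSource.setMessageIdentifier().
// Field and method names mirror the description above; this is an illustration only.
public class AckOrderingSketch {
    // models MessageAcknowledgingSourceBase.addId(): set of seen correlation IDs
    final Set<String> processedIds = new HashSet<>();
    // delivery tags to ACK at the next completed checkpoint
    final Deque<Long> sessionIds = new ArrayDeque<>();

    /** Returns true if the message should be emitted downstream. */
    boolean setMessageIdentifier(String correlationId, long deliveryTag) {
        // Record the delivery tag BEFORE the duplicate check, so the message
        // is ACKed on the next completed checkpoint even if it is a duplicate.
        sessionIds.add(deliveryTag);
        if (!processedIds.add(correlationId)) {
            // Already processed: skip emitting, but the tag above still gets ACKed.
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        AckOrderingSketch src = new AckOrderingSketch();
        System.out.println(src.setMessageIdentifier("corr-1", 1L)); // true  (first delivery)
        System.out.println(src.setMessageIdentifier("corr-1", 2L)); // false (redelivery, skipped)
        System.out.println(src.sessionIds);                         // [1, 2] -> both tags ACKed
    }
}
```

With the ordering currently in the connector, the second call would return false without recording tag 2, and RMQ would requeue that delivery again on the next connection close.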
> This results in an RMQ connection thread somewhere being left behind. I triggered three restarts of the job in a row and noticed one new connection added to the pile of connections for each restart. I triggered the restarts by killing the active connection to RMQ using the RMQ admin GUI (management plugin, see above exception details).
> I also tried to kill one of the leaked connections, but a new one is instantly created when doing so. The traceback when doing this (1.11.2):
>
> 2020-11-18 10:27:51,715 ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler [] - An unexpected connection driver error occured
> java.lang.NoClassDefFoundError: com/rabbitmq/client/AMQP$Connection$CloseOk$Builder
> at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:800) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
> at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:753) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
> at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:237) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
> at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
> at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109) ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
> at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:623) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
> at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
> at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581) [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
> at java.lang.Thread.run(Unknown Source) [?:?]
>
> These problems are most probably present in more versions than 1.11.2 and 1.12.0.
>
> *Proposed fix (not verified)*
> The fix to ACK all messages should be simple: move sessionIds.add(deliveryTag) up a few rows, before the test that checks whether correlation IDs are used, in RMQSource::setMessageIdentifier().
> The fix for close() and the stale RMQ connections would be to ignore and log any exceptions thrown by channel.basicCancel(), channel.close() and connection.close(), so that a failure in channel.basicCancel() does not prevent channel.close() or connection.close() from being called. Or maybe just call connection.close(), which should close the channel as well.
> Another minor thing that could be improved, while working on this, is to leverage the support for ACKing multiple messages at once by calling channel.basicAck(deliveryTagMax, true). Since the RMQSource never NACKs a message and delivery tags are monotonically increasing, that should be doable, or?

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
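The exception-isolation idea for close() can be sketched as follows: each teardown step runs in its own try/catch, so a basicCancel() that throws AlreadyClosedException cannot prevent the later channel.close() / connection.close() calls from running (and thereby leak the connection). The Step interface and closeAll() helper below are illustrative stand-ins, not connector or RabbitMQ client code.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a defensive teardown for RMQSource.close(): isolate every step
// so one failure cannot skip the remaining cleanup. Hypothetical helper names.
public class DefensiveClose {
    interface Step { void run() throws Exception; }

    // Runs every step regardless of earlier failures; returns the error
    // messages that were swallowed (a real implementation would log them).
    static List<String> closeAll(Step... steps) {
        List<String> suppressed = new ArrayList<>();
        for (Step step : steps) {
            try {
                step.run();
            } catch (Exception e) {
                // Log and continue: a failed basicCancel() must not stop
                // channel.close() or connection.close() from being called.
                suppressed.add(e.getMessage());
            }
        }
        return suppressed;
    }

    public static void main(String[] args) {
        List<String> errors = closeAll(
            () -> { throw new IllegalStateException("connection is already closed"); }, // basicCancel
            () -> System.out.println("channel closed"),     // channel.close()
            () -> System.out.println("connection closed")); // connection.close()
        System.out.println(errors); // [connection is already closed]
    }
}
```

For the multi-ACK idea: com.rabbitmq.client.Channel#basicAck(long deliveryTag, boolean multiple) with multiple=true acknowledges all unacked deliveries on the channel up to and including deliveryTag in a single call, which matches the monotonically increasing delivery tags described above.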