[ 
https://issues.apache.org/jira/browse/FLINK-20244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17367701#comment-17367701
 ] 

Austin Cawley-Edwards commented on FLINK-20244:
-----------------------------------------------

Thanks for the ping [~cmick] + follow-up on this ticket. The issues you've 
decomposed make sense to me – as for splitting them up, I think it depends on 
how big the changes are if they can fit into a single PR. Multiple tickets + 
commits seem reasonable though. I'll pass this off to [~fabian.paul] and 
[~arvid] though, as they're working more in this area and will know the 
procedures better than I. Anyway, thanks for your continued contributions :)

> RMQSource does not ACK duplicated messages
> ------------------------------------------
>
>                 Key: FLINK-20244
>                 URL: https://issues.apache.org/jira/browse/FLINK-20244
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors/ RabbitMQ
>    Affects Versions: 1.11.2, 1.12.0
>            Reporter: Thomas Eckestad
>            Priority: Minor
>              Labels: auto-deprioritized-major
>
> *Background*
> The following has been observed when using RMQSource with 
> exactly-once-guarantees.
> When a job is restarted from a checkpoint, and there were unacked messages 
> referenced by that checkpoint at time of the restart, those messages will be 
> requeued by RMQ and resent to the restarted job. But they will not be acked. 
> Later when the connection to RMQ is closed (the job either finishes or is 
> restarted) they will be requeued again.
> When looking at the source code, messages are ACK:ed by the RMQSource after a 
> checkpoint is complete 
> (MessageAcknowledgingSourceBase::notifyCheckpointComplete).
> Also, when looking at the source code in RMQSource::setMessageIdentifier() 
> (on the master branch, the ACK semantics does not seem to have changed since 
> 1.11.2) it is clear that if a RMQ message carries a correlation ID which has 
> already been handled, that message is skipped and not further processed. It 
> is also clear that skipped messages are not added to the sessionIds-list of 
> messages that are targeted for ACK to RMQ.
> I believe all successfully consumed RMQ messages should be ACK:ed, it is 
> irrelevant if the message is ignored or processed by Flink. RMQ needs to know 
> that the consumer considers the message as handled OK.
> The following code is from RMQSource::setMessageIdentifier(). Note the return 
> before sessionIds.add():
>  .
>  .
>  .
>  if (!addId(correlationId))
> { // we have already processed this message return false; }
> }
>  sessionIds.add(deliveryTag);
>  .
>  .
>  .
> Directly related to the above I also noticed that RMQ connections are leaked 
> at internal job restart. From the Flink log (this stack trace is from 1.11.2):
> 2020-11-18 10:08:25,118 ERROR 
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Error during 
> disposal of stream operator.
>  com.rabbitmq.client.AlreadyClosedException: connection is already closed due 
> to connection error; protocol method: 
> #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - 
> Closed via management plugin, class-id=0, method-id=0)
>  at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:228) 
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:303) 
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at com.rabbitmq.client.impl.ChannelN.basicCancel(ChannelN.java:1294) 
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at 
> com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicCancel(AutorecoveringChannel.java:482)
>  
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at 
> org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:192)
>  
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
> AlreadyClosedException is not handled by the RMQSource::close(). This results 
> in a RMQ connection thread somewhere being left behind. I triggered three 
> restarts of the job in a row and noticed one new connection added to the pile 
> of connections for each restart. I triggered the restart by killing the 
> active connection to RMQ using the RMQ admin GUI (management plugin, see 
> above exception details).
> I also tried to kill one of the leaked connections. But a new one is 
> instantly created when doing so. The traceback when doing this (1.11.2):
> 2020-11-18 10:27:51,715 ERROR 
> com.rabbitmq.client.impl.ForgivingExceptionHandler [] - An unexpected 
> connection driver error occured
>  java.lang.NoClassDefFoundError: 
> com/rabbitmq/client/AMQP$Connection$CloseOk$Builder
>  at 
> com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:800)
>  
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at 
> com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:753)
>  
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at 
> com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:237) 
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at 
> com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162)
>  
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109) 
> ~[blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:623) 
> [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) 
> [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at 
> com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581) 
> [blob_p-ca82946ac30c169a36c8631b9c5ed87eac51082c-df5535f2bff73c43c02410fbc19aca4a:?]
>  at java.lang.Thread.run(Unknown Source) [?:?]
> These problems are most probably present in more versions than 1.11.2 and 
> 1.12.0.
> *Proposed fix (not verified)*
> The fix to ACK all messages should be simple. Just move 
> sessionIds.add(deliveryTag) up a few rows before the test that checks if 
> correlation ID:s are used or not in RMQSource::setMessageIdentifier().
> The fix for close() and the stale RMQ connections would be to ignore and log 
> any exceptions thrown by channel.basicCancel(), channel.close() and 
> connection.close(). So that failing to call channel.basicCancel() does not 
> inhibit channel.close() or connection.close() from being called. Or maybe 
> just call connection.close() which should close the channel as well.
> Another minor thing that could be improved, while working on this, is to 
> leverage the support to ack multiple messages at once, by doing 
> channel.basicAck(ldeliveryTagMax, true);. Since the RMQSource never NACK:s a 
> message and deliveryTags are monotonically increasing that should be doable, 
> or?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to