Lianet Magrans created KAFKA-18569:
--------------------------------------

             Summary: New consumer close may wait on unneeded FindCoordinator
                 Key: KAFKA-18569
                 URL: https://issues.apache.org/jira/browse/KAFKA-18569
             Project: Kafka
          Issue Type: Bug
          Components: consumer
            Reporter: Lianet Magrans
             Fix For: 4.0.0


A flaky test revealed that, when closing, the new consumer may wait for an 
unsent FindCoordinator request to go out, even after the commit/leaveGroup 
stages of close are done.

This can happen because the CoordinatorRequestManager poll keeps attempting 
FindCoordinator while the coordinator is unknown, even during consumer close. 
The window is after the consumer has completed the commit/leave attempts 
(the only steps in close that require a coordinator) and before the network 
shutdown that stops polling the managers, here:

[https://github.com/apache/kafka/blob/5c20aa187aa8f51af4270d7d1b0db4963b0cd10b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1343]
 

If the unneeded FindCoordinator is generated and the brokers are down (as can 
happen in the flaky test), the consumer waits for that request unnecessarily 
here:
[https://github.com/apache/kafka/blob/5c20aa187aa8f51af4270d7d1b0db4963b0cd10b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L327]

I expect we shouldn't block the close on a FindCoordinator request if the 
consumer already completed the commit/leave attempts.

One option is to "signal close" to the CoordinatorRequestManager after 
consumer.close completes commit/leave, so that it does not generate any more 
requests on poll (similar to what is already done for the 
CommitRequestManager with the CommitOnCloseEvent):

[https://github.com/apache/kafka/blob/5c20aa187aa8f51af4270d7d1b0db4963b0cd10b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1335]
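
To make the idea concrete, here is a minimal sketch of the proposed behaviour. It is not the real Kafka code: the class CoordinatorCloseSketch, its nested Manager, and the signalClose() method are hypothetical names used only for illustration, and the real CoordinatorRequestManager.poll has a different signature and returns request objects rather than strings.

```java
import java.util.Optional;

// Hypothetical, simplified model of the proposal; not the actual Kafka classes.
public final class CoordinatorCloseSketch {

    // Stand-in for CoordinatorRequestManager, reduced to the state that matters here.
    static final class Manager {
        private boolean coordinatorKnown = false;
        private boolean closing = false;

        // Assumed hook: invoked once the commit/leave stages of close are done.
        void signalClose() {
            closing = true;
        }

        // Returns the name of the request to send on this poll, if any.
        Optional<String> poll() {
            if (closing || coordinatorKnown) {
                // Nothing left in close needs a coordinator, so do not look one up.
                return Optional.empty();
            }
            return Optional.of("FindCoordinator");
        }
    }

    public static void main(String[] args) {
        Manager manager = new Manager();
        // Coordinator unknown and not closing: a lookup is generated.
        System.out.println(manager.poll().isPresent()); // prints "true"
        manager.signalClose();
        // Coordinator still unknown, but closing: no new request to wait on.
        System.out.println(manager.poll().isPresent()); // prints "false"
    }
}
```

With a flag like this, the network thread would have no unsent FindCoordinator to wait for during shutdown, even if the brokers are down.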
 

This fix should allow this test to be enabled reliably for the new consumer:

https://github.com/apache/kafka/blob/5c20aa187aa8f51af4270d7d1b0db4963b0cd10b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala#L404

Without the fix, the test is flaky (fails locally after a few repeated runs, 
fails in CI).

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
