[ https://issues.apache.org/jira/browse/KAFKA-17769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17888408#comment-17888408 ]

Lianet Magrans commented on KAFKA-17769:
----------------------------------------

Hey [~Yu-Lin Chen], thanks for digging into this. I don't quite get the root 
cause you describe, though. The expectation is that the new consumer does not 
send a HB request while a previous one is still in flight:

[https://github.com/apache/kafka/blob/167e2f71f051fe7672265d1b5ae008622d3ea526/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java#L80]

(called from 
[https://github.com/apache/kafka/blob/167e2f71f051fe7672265d1b5ae008622d3ea526/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java#L186])

Isn't that the case?
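
For readers following along, the guard linked above works roughly like this. This is a simplified sketch with hypothetical names (`HeartbeatGuard`, `onSendAttempt`, `onResponseReceived`), not Kafka's actual `RequestState` implementation:

```java
// Simplified sketch of the in-flight guard: a new heartbeat may only be
// sent once the previous one has completed. Names are illustrative and
// do not match Kafka's internal classes exactly.
public class HeartbeatGuard {
    private boolean requestInFlight = false;

    // True only when no heartbeat is currently awaiting a response.
    public synchronized boolean canSendRequest() {
        return !requestInFlight;
    }

    // Called when a heartbeat is handed to the network layer.
    public synchronized void onSendAttempt() {
        requestInFlight = true;
    }

    // Called when a response (success or failure) arrives.
    public synchronized void onResponseReceived() {
        requestInFlight = false;
    }
}
```

If this guard is consulted before every heartbeat, the second (leave) heartbeat should be deferred until the first one's response has arrived, which is why the described overlap would be surprising.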

 

> Fix flaky 
> PlaintextConsumerSubscriptionTest.testSubscribeInvalidTopicCanUnsubscribe
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-17769
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17769
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Yu-Lin Chen
>            Assignee: Yu-Lin Chen
>            Priority: Major
>
> 4 flaky failures out of 110 trunk builds in the past 2 weeks. ([Report 
> Link|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=1728584869905&search.startTimeMin=1726156800000&search.tags=trunk&search.timeZoneId=Asia%2FTaipei&tests.container=kafka.api.PlaintextConsumerSubscriptionTest&tests.test=testSubscribeInvalidTopicCanUnsubscribe(String%2C%20String)%5B3%5D])
> This issue can be reproduced locally within 50 loops.
>  
> ([Oct 4 2024 at 10:35:49 
> CST|https://ge.apache.org/s/o4ir4xtitsu52/tests/task/:core:test/details/kafka.api.PlaintextConsumerSubscriptionTest/testSubscribeInvalidTopicCanUnsubscribe(String%2C%20String)%5B3%5D?top-execution=1]):
> {code:java}
> org.apache.kafka.common.KafkaException: Failed to close kafka consumer
>     at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1249)
>     at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1204)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1718)
>     at kafka.api.IntegrationTestHarness.$anonfun$tearDown$3(IntegrationTestHarness.scala:249)
>     at kafka.api.IntegrationTestHarness.$anonfun$tearDown$3$adapted(IntegrationTestHarness.scala:249)
>     at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619)
>     at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:935)
>     at kafka.api.IntegrationTestHarness.tearDown(IntegrationTestHarness.scala:249)
>     at java.lang.reflect.Method.invoke(Method.java:566)
>     at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>     at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
>     at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>     at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>     at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>     at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>     at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>     at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>     at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>     at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>     at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>     at java.util.ArrayList.forEach(ArrayList.java:1541)
>     at java.util.ArrayList.forEach(ArrayList.java:1541)
> Caused by: org.apache.kafka.common.errors.InvalidRequestException: MemberId can't be empty. {code}
> {*}Root Cause{*}:
> Two heartbeat requests can be in flight simultaneously in the new consumer:
>  - The first heartbeat, which obtains the memberId from the group coordinator 
> (memberEpoch = 0)
>  - The next heartbeat, triggered by an unsubscribe event (memberEpoch = -1)
> The second heartbeat is sent with an empty memberId while the first 
> heartbeat is still in flight, and the coordinator rejects it (memberId = 
> empty, memberEpoch = -1).
> This corner case occurs only when unsubscribe() is called shortly after the 
> first poll.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
