[ https://issues.apache.org/jira/browse/KAFKA-17769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yu-Lin Chen updated KAFKA-17769:
--------------------------------
    Description: 
4 flaky failures out of 110 trunk builds in the past 2 weeks. ([Report Link|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=1728584869905&search.startTimeMin=1726156800000&search.tags=trunk&search.timeZoneId=Asia%2FTaipei&tests.container=kafka.api.PlaintextConsumerSubscriptionTest&tests.test=testSubscribeInvalidTopicCanUnsubscribe(String%2C%20String)%5B3%5D])

This issue can be reproduced locally within 50 loops; the triggering sequence is sketched below.
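The failing sequence can be approximated with a small standalone program. This is only an illustrative Java sketch of what the parameterized test exercises with the new async consumer; the broker address, group id, and exact configs are assumptions, and the real test is the Scala kafka.api.PlaintextConsumerSubscriptionTest.
{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Hypothetical repro sketch, not the actual test code.
public class InvalidTopicUnsubscribeRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "repro-group");             // hypothetical group id
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");          // select the new async consumer
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("topic abc")); // invalid topic name (contains a space)
            try {
                // The first poll sends the initial heartbeat (memberEpoch = 0) that assigns a memberId.
                consumer.poll(Duration.ofMillis(100));
            } catch (InvalidTopicException expected) {
                // The test expects this error for the invalid topic name.
            }
            // Unsubscribing right away can queue a leave heartbeat (memberEpoch = -1)
            // before the memberId from the first heartbeat has arrived.
            consumer.unsubscribe();
        } // close() is where the test harness observes "MemberId can't be empty".
    }
}
{code}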
 
([Oct 4 2024 at 10:35:49 CST|https://ge.apache.org/s/o4ir4xtitsu52/tests/task/:core:test/details/kafka.api.PlaintextConsumerSubscriptionTest/testSubscribeInvalidTopicCanUnsubscribe(String%2C%20String)%5B3%5D?top-execution=1]):
{code:java}
org.apache.kafka.common.KafkaException: Failed to close kafka consumer
    at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1249)
    at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1204)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1718)
    at kafka.api.IntegrationTestHarness.$anonfun$tearDown$3(IntegrationTestHarness.scala:249)
    at kafka.api.IntegrationTestHarness.$anonfun$tearDown$3$adapted(IntegrationTestHarness.scala:249)
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619)
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:935)
    at kafka.api.IntegrationTestHarness.tearDown(IntegrationTestHarness.scala:249)
    at java.lang.reflect.Method.invoke(Method.java:566)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at java.util.ArrayList.forEach(ArrayList.java:1541)
    at java.util.ArrayList.forEach(ArrayList.java:1541)
Caused by: org.apache.kafka.common.errors.InvalidRequestException: MemberId can't be empty.
{code}
*Root Cause*:
The following heartbeat requests can be in flight simultaneously in the new consumer:
 - The first heartbeat, which obtains the memberId from the group coordinator (memberEpoch = 0)
 - The next heartbeat, triggered by an unsubscribe event (memberEpoch = -1)

The second heartbeat is sent with an empty memberId while the first heartbeat is still in flight, and the coordinator rejects it (memberId = empty, memberEpoch = -1), as sketched below.

This corner case occurs only when unsubscribe() is called shortly after the first poll().
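For context, the coordinator-side rejection corresponds to a rule along the following lines. This is a paraphrased Java sketch of the constraint implied by the error message, not the actual group coordinator code:
{code:java}
import org.apache.kafka.common.errors.InvalidRequestException;

// Hypothetical illustration of the validation behind the observed error.
public class HeartbeatValidationSketch {
    static void validateConsumerGroupHeartbeat(String memberId, int memberEpoch) {
        if (memberEpoch == 0) {
            // Joining: an empty memberId is allowed, the coordinator assigns one.
            return;
        }
        if (memberId == null || memberId.isEmpty()) {
            // Leaving (memberEpoch = -1) or continuing without the assigned memberId
            // is rejected; this is the InvalidRequestException seen in the teardown.
            throw new InvalidRequestException("MemberId can't be empty.");
        }
    }
}
{code}
A plausible fix direction, then, is for the client's heartbeat logic to delay the leave-group heartbeat until the response to the in-flight initial heartbeat (which carries the assigned memberId) has been processed.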

> Fix flaky PlaintextConsumerSubscriptionTest.testSubscribeInvalidTopicCanUnsubscribe
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-17769
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17769
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Yu-Lin Chen
>            Assignee: Yu-Lin Chen
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
