Yu-Lin Chen created KAFKA-17769:
-----------------------------------

             Summary: Fix flaky PlaintextConsumerSubscriptionTest.testSubscribeInvalidTopicCanUnsubscribe
                 Key: KAFKA-17769
                 URL: https://issues.apache.org/jira/browse/KAFKA-17769
             Project: Kafka
          Issue Type: Bug
          Components: clients, consumer
            Reporter: Yu-Lin Chen
            Assignee: Yu-Lin Chen


The test was flaky in 4 out of 110 trunk builds over the past 2 weeks ([Report Link|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=1728584869905&search.startTimeMin=1726156800000&search.tags=trunk&search.timeZoneId=Asia%2FTaipei&tests.container=kafka.api.PlaintextConsumerSubscriptionTest&tests.test=testSubscribeInvalidTopicCanUnsubscribe(String%2C%20String)%5B3%5D]).

This issue can be reproduced locally within 50 loops.
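Here is a rough sanity check on these two numbers. It rests on one assumption: every run fails independently at the CI rate of 4/110, which was not measured locally. Under that assumption, the chance of at least one failure in 50 loops is 1 - (1 - 4/110)^50, or about 84%. That is consistent with reproducing the failure within 50 loops. The class and method names below are illustrative only.

```java
public class FlakyRate {
    // Probability of at least one failure in `runs` independent runs,
    // given a per-run failure probability `p`.
    static double atLeastOneFailure(double p, int runs) {
        return 1.0 - Math.pow(1.0 - p, runs);
    }

    public static void main(String[] args) {
        // Assumption: the CI flaky rate (4 of 110 trunk builds) also applies locally.
        double p = 4.0 / 110.0;
        System.out.printf("per-run failure rate: %.4f%n", p);
        System.out.printf("P(at least one failure in 50 runs): %.3f%n", atLeastOneFailure(p, 50));
    }
}
```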
 
Stack trace from a failed run ([Oct 4 2024 at 10:35:49 CST|https://ge.apache.org/s/o4ir4xtitsu52/tests/task/:core:test/details/kafka.api.PlaintextConsumerSubscriptionTest/testSubscribeInvalidTopicCanUnsubscribe(String%2C%20String)%5B3%5D?top-execution=1]):
{code:java}
org.apache.kafka.common.KafkaException: Failed to close kafka consumer
	at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1249)
	at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1204)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1718)
	at kafka.api.IntegrationTestHarness.$anonfun$tearDown$3(IntegrationTestHarness.scala:249)
	at kafka.api.IntegrationTestHarness.$anonfun$tearDown$3$adapted(IntegrationTestHarness.scala:249)
	at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619)
	at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:935)
	at kafka.api.IntegrationTestHarness.tearDown(IntegrationTestHarness.scala:249)
	at java.lang.reflect.Method.invoke(Method.java:566)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
	at java.util.ArrayList.forEach(ArrayList.java:1541)
	at java.util.ArrayList.forEach(ArrayList.java:1541)
Caused by: org.apache.kafka.common.errors.InvalidRequestException: MemberId can't be empty.
{code}
{*}Root Cause{*}:
In the new consumer, the following two heartbeat requests may be in flight at the same time:
 - The first heartbeat, which obtains the memberId from the group coordinator (memberEpoch = 0)
 - The next heartbeat, sent after an unsubscribe event (memberEpoch = -1)

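The second heartbeat can be sent while the first is still in flight. In that case the memberId has not arrived yet, so the second heartbeat carries an empty memberId. The coordinator does not accept this combination (memberId = empty, memberEpoch = -1). It rejects the request with InvalidRequestException ("MemberId can't be empty."), which is the cause reported in the stack trace above.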
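The rejection can be illustrated with a simplified, hypothetical model of the coordinator-side check. `HeartbeatRequest` and `validate` are illustrative names, not Kafka's actual classes. The sketch models only the two epochs described in this issue. It assumes, as the error message suggests, that a leave heartbeat (memberEpoch = -1) must carry a non-empty memberId, while the first heartbeat (memberEpoch = 0) does not have one yet.

```java
import java.util.Optional;

public class HeartbeatValidation {
    static final int JOIN_EPOCH = 0;   // first heartbeat: no memberId assigned yet
    static final int LEAVE_EPOCH = -1; // heartbeat sent after unsubscribe()

    // Hypothetical, simplified stand-in for a consumer group heartbeat request.
    static final class HeartbeatRequest {
        final String memberId;
        final int memberEpoch;

        HeartbeatRequest(String memberId, int memberEpoch) {
            this.memberId = memberId;
            this.memberEpoch = memberEpoch;
        }
    }

    // Simplified coordinator check (assumption): a leave heartbeat must name
    // the member that is leaving. Returns the error message, or empty if accepted.
    static Optional<String> validate(HeartbeatRequest request) {
        if (request.memberEpoch == LEAVE_EPOCH && request.memberId.isEmpty()) {
            return Optional.of("MemberId can't be empty.");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Both requests carry an empty memberId, as in the race described above.
        HeartbeatRequest first = new HeartbeatRequest("", JOIN_EPOCH);
        HeartbeatRequest leave = new HeartbeatRequest("", LEAVE_EPOCH);
        System.out.println("first: " + validate(first).orElse("accepted"));
        System.out.println("leave: " + validate(leave).orElse("accepted"));
    }
}
```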

This corner case occurs only when unsubscribe() is called shortly after the first poll(), before the response to the first heartbeat has been received.
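The timing dependency can be sketched with a hypothetical client-side model. `Member`, `sendFirstHeartbeat` and `leaveHeartbeatMemberId` are illustrative names, not the new consumer's real internals. A `CompletableFuture` stands in for the in-flight first heartbeat. The memberId is learned only when that future completes, while the leave heartbeat is built from whatever memberId is known at the moment unsubscribe() runs.

```java
import java.util.concurrent.CompletableFuture;

public class HeartbeatRace {
    // Hypothetical client-side membership state (not Kafka's real classes).
    static final class Member {
        volatile String memberId = ""; // empty until the first heartbeat response arrives

        // Models the first heartbeat (memberEpoch = 0): the memberId is stored
        // only when the coordinator's response completes the returned future.
        CompletableFuture<String> sendFirstHeartbeat() {
            CompletableFuture<String> response = new CompletableFuture<>();
            response.thenAccept(id -> memberId = id);
            return response;
        }

        // Models the heartbeat sent after unsubscribe() (memberEpoch = -1):
        // it uses the memberId known right now, which may still be empty.
        String leaveHeartbeatMemberId() {
            return memberId;
        }
    }

    public static void main(String[] args) {
        // Case 1: unsubscribe() while the first heartbeat is still in flight.
        Member early = new Member();
        CompletableFuture<String> inFlight = early.sendFirstHeartbeat();
        String sentEarly = early.leaveHeartbeatMemberId(); // response not received yet
        inFlight.complete("member-1");                      // response arrives too late
        System.out.println("early leave memberId='" + sentEarly + "'");

        // Case 2: unsubscribe() after the first heartbeat response has arrived.
        Member late = new Member();
        late.sendFirstHeartbeat().complete("member-2");
        System.out.println("late leave memberId='" + late.leaveHeartbeatMemberId() + "'");
    }
}
```

In case 1 the leave heartbeat goes out with an empty memberId, the combination the coordinator rejects. In case 2 the memberId is already known.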



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
