[ 
https://issues.apache.org/jira/browse/KAFKA-17769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-17769.
------------------------------------
    Resolution: Fixed

> Fix flaky PlaintextConsumerSubscriptionTest.testSubscribeInvalidTopicCanUnsubscribe
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-17769
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17769
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Yu-Lin Chen
>            Assignee: Yu-Lin Chen
>            Priority: Major
>              Labels: flaky-test, integration-test, kip-848-client-support
>             Fix For: 4.0.0
>
>
> 4 flaky failures out of 110 trunk builds in the past 2 weeks. ([Report 
> Link|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=1728584869905&search.startTimeMin=1726156800000&search.tags=trunk&search.timeZoneId=Asia%2FTaipei&tests.container=kafka.api.PlaintextConsumerSubscriptionTest&tests.test=testSubscribeInvalidTopicCanUnsubscribe(String%2C%20String)%5B3%5D])
> This issue can be reproduced locally within 50 loops.
>  
> ([Oct 4 2024 at 10:35:49 
> CST|https://ge.apache.org/s/o4ir4xtitsu52/tests/task/:core:test/details/kafka.api.PlaintextConsumerSubscriptionTest/testSubscribeInvalidTopicCanUnsubscribe(String%2C%20String)%5B3%5D?top-execution=1]):
> {code:java}
> org.apache.kafka.common.KafkaException: Failed to close kafka consumer
>     at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1249)
>     at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1204)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1718)
>     at kafka.api.IntegrationTestHarness.$anonfun$tearDown$3(IntegrationTestHarness.scala:249)
>     at kafka.api.IntegrationTestHarness.$anonfun$tearDown$3$adapted(IntegrationTestHarness.scala:249)
>     at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619)
>     at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:935)
>     at kafka.api.IntegrationTestHarness.tearDown(IntegrationTestHarness.scala:249)
>     at java.lang.reflect.Method.invoke(Method.java:566)
>     at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>     at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
>     at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>     at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>     at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>     at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>     at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>     at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>     at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>     at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
>     at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
>     at java.util.ArrayList.forEach(ArrayList.java:1541)
>     at java.util.ArrayList.forEach(ArrayList.java:1541)
> Caused by: org.apache.kafka.common.errors.InvalidRequestException: MemberId can't be empty.
> {code}
> {*}Root Cause{*}: 
> The following heartbeat requests may be in flight simultaneously in the new 
> consumer:
>  - The first heartbeat, which obtains the memberId from the group coordinator 
> (memberEpoch = 0)
>  - The next heartbeat, sent after an unsubscribe event (memberEpoch = -1)
> The second heartbeat is sent with an empty memberId while the first 
> heartbeat is still in flight, so the coordinator rejects it (memberId = 
> empty, memberEpoch = -1).
> This corner case occurs only when unsubscribe() is called shortly after the 
> first poll.
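
The ordering described in the root cause can be illustrated with a small, self-contained model. This is only a sketch, not Kafka code: the class `HeartbeatRaceSketch`, its methods, and the id `member-1` are hypothetical names invented for illustration, and the coordinator check is reduced to the single rule stated above (an empty memberId with memberEpoch = -1 is rejected).

```java
public class HeartbeatRaceSketch {
    static final int JOIN_EPOCH = 0;   // first heartbeat, asks the coordinator for a memberId
    static final int LEAVE_EPOCH = -1; // heartbeat sent after unsubscribe()

    // Hypothetical stand-in for the coordinator-side validation.
    static String coordinatorHandle(String memberId, int memberEpoch) {
        if (memberEpoch == LEAVE_EPOCH && memberId.isEmpty()) {
            return "InvalidRequestException: MemberId can't be empty.";
        }
        return "OK";
    }

    // Models unsubscribe() being called shortly after the first poll.
    // joinResponseArrivedFirst == false means the first heartbeat is still
    // in flight when the leave heartbeat is built.
    static String leaveAfterFirstPoll(boolean joinResponseArrivedFirst) {
        String memberId = ""; // the client starts without a memberId
        coordinatorHandle(memberId, JOIN_EPOCH); // first heartbeat is sent
        if (joinResponseArrivedFirst) {
            memberId = "member-1"; // response processed: memberId is now known
        }
        // The leave heartbeat carries whatever memberId the client has right now.
        return coordinatorHandle(memberId, LEAVE_EPOCH);
    }

    public static void main(String[] args) {
        System.out.println(leaveAfterFirstPoll(true));
        System.out.println(leaveAfterFirstPoll(false));
    }
}
```

In this model the leave heartbeat is accepted only when the response to the first heartbeat has been processed before it is built. When the first heartbeat is still in flight, the leave heartbeat carries an empty memberId and is rejected with the same message that appears in the stack trace.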



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
