[ https://issues.apache.org/jira/browse/KAFKA-17769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chia-Ping Tsai resolved KAFKA-17769. ------------------------------------ Resolution: Fixed > Fix flaky > PlaintextConsumerSubscriptionTest.testSubscribeInvalidTopicCanUnsubscribe > ----------------------------------------------------------------------------------- > > Key: KAFKA-17769 > URL: https://issues.apache.org/jira/browse/KAFKA-17769 > Project: Kafka > Issue Type: Bug > Components: clients, consumer > Reporter: Yu-Lin Chen > Assignee: Yu-Lin Chen > Priority: Major > Labels: flaky-test, integration-test, kip-848-client-support > Fix For: 4.0.0 > > > 4 flaky out of 110 trunk builds in past 2 weeks. ([Report > Link|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=1728584869905&search.startTimeMin=1726156800000&search.tags=trunk&search.timeZoneId=Asia%2FTaipei&tests.container=kafka.api.PlaintextConsumerSubscriptionTest&tests.test=testSubscribeInvalidTopicCanUnsubscribe(String%2C%20String)%5B3%5D]) > This issue can be reproduced in my local within 50 loops. > > ([Oct 4 2024 at 10:35:49 > CST|https://ge.apache.org/s/o4ir4xtitsu52/tests/task/:core:test/details/kafka.api.PlaintextConsumerSubscriptionTest/testSubscribeInvalidTopicCanUnsubscribe(String%2C%20String)%5B3%5D?top-execution=1]): > {code:java} > org.apache.kafka.common.KafkaException: Failed to close kafka consumer > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1249) > > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1204) > > at > org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1718) > > at > kafka.api.IntegrationTestHarness.$anonfun$tearDown$3(IntegrationTestHarness.scala:249) > > at > kafka.api.IntegrationTestHarness.$anonfun$tearDown$3$adapted(IntegrationTestHarness.scala:249) > > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619) > at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617) > at scala.collection.AbstractIterable.foreach(Iterable.scala:935) > at > kafka.api.IntegrationTestHarness.tearDown(IntegrationTestHarness.scala:249) > > at java.lang.reflect.Method.invoke(Method.java:566) > at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at > java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) > > at > java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658) > > at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at > java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) > > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) > > at > java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) > > at > java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) > > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) > at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) > > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) > > at > java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) > > at > java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) > > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) > at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) > > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) > > at > java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) > > at > java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) > > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) > at java.util.ArrayList.forEach(ArrayList.java:1541) > at java.util.ArrayList.forEach(ArrayList.java:1541) > Caused by: org.apache.kafka.common.errors.InvalidRequestException: MemberId > can't be empty. {code} > {*}Root Cause{*}: > The following hearbeat requests might happen in flight simultaneously in new > consumer: > - The first heartbeat, which will get memberId from group coordinator > (memberEpoch = 0) > - The next heartbeat after an unsubscribe event (memberEpoch = -1) > The second heartbeat will be sent with empty memberId while the first > heartbeat is still in flight. And the coordinator won't accept it (memberId = > empty, memberEpoch = -1 ). > This corner case occurs only when unsubscribe() is called shortly after the > first poll. -- This message was sent by Atlassian Jira (v8.20.10#820010)