Yu-Lin Chen created KAFKA-17769:
-----------------------------------

             Summary: Fix flaky PlaintextConsumerSubscriptionTest.testSubscribeInvalidTopicCanUnsubscribe
                 Key: KAFKA-17769
                 URL: https://issues.apache.org/jira/browse/KAFKA-17769
             Project: Kafka
          Issue Type: Bug
          Components: clients, consumer
            Reporter: Yu-Lin Chen
            Assignee: Yu-Lin Chen

4 flaky runs out of 110 trunk builds in the past 2 weeks. ([Report Link|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=1728584869905&search.startTimeMin=1726156800000&search.tags=trunk&search.timeZoneId=Asia%2FTaipei&tests.container=kafka.api.PlaintextConsumerSubscriptionTest&tests.test=testSubscribeInvalidTopicCanUnsubscribe(String%2C%20String)%5B3%5D])

This issue can be reproduced locally within 50 loops.

([Oct 4 2024 at 10:35:49 CST|https://ge.apache.org/s/o4ir4xtitsu52/tests/task/:core:test/details/kafka.api.PlaintextConsumerSubscriptionTest/testSubscribeInvalidTopicCanUnsubscribe(String%2C%20String)%5B3%5D?top-execution=1]):
{code:java}
org.apache.kafka.common.KafkaException: Failed to close kafka consumer
    at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1249)
    at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1204)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1718)
    at kafka.api.IntegrationTestHarness.$anonfun$tearDown$3(IntegrationTestHarness.scala:249)
    at kafka.api.IntegrationTestHarness.$anonfun$tearDown$3$adapted(IntegrationTestHarness.scala:249)
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619)
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:935)
    at kafka.api.IntegrationTestHarness.tearDown(IntegrationTestHarness.scala:249)
    at java.lang.reflect.Method.invoke(Method.java:566)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at java.util.ArrayList.forEach(ArrayList.java:1541)
    at java.util.ArrayList.forEach(ArrayList.java:1541)
Caused by: org.apache.kafka.common.errors.InvalidRequestException: MemberId can't be empty.
{code}

{*}Root Cause{*}:
The following heartbeat requests can be in flight simultaneously in the new consumer:
- The first heartbeat, which gets the memberId from the group coordinator (memberEpoch = 0)
- The next heartbeat after an unsubscribe event (memberEpoch = -1)

If the first heartbeat is still in flight when unsubscribe() is called, the client has not yet received its memberId, so the second heartbeat is sent with an empty memberId. The coordinator does not accept this request (memberId = empty, memberEpoch = -1).

This corner case occurs only when unsubscribe() is called shortly after the first poll().
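
The race described in the root cause can be sketched as a small standalone model. This is only an illustration under stated assumptions, not the actual Kafka client or coordinator code: the class name {{HeartbeatRaceSketch}}, the {{validate}} and {{run}} helpers, and the member id {{member-1}} are all hypothetical, and the validation rule is simplified to the single check that produces the error seen in the stack trace.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the heartbeat race. Not Kafka source code.
class HeartbeatRaceSketch {
    static final int JOIN_EPOCH = 0;    // memberEpoch of the first (joining) heartbeat
    static final int LEAVE_EPOCH = -1;  // memberEpoch of the heartbeat sent after unsubscribe

    // Assumed coordinator-side rule: a joining heartbeat may carry an empty
    // memberId, but a leave heartbeat must identify the member.
    static String validate(String memberId, int memberEpoch) {
        if (memberEpoch == LEAVE_EPOCH && memberId.isEmpty()) {
            return "InvalidRequestException: MemberId can't be empty.";
        }
        return "OK";
    }

    // Replays the two heartbeats. The flag says whether the response to the
    // first heartbeat reached the client before unsubscribe() was called.
    static List<String> run(boolean firstResponseArrivedBeforeUnsubscribe) {
        List<String> results = new ArrayList<>();
        String memberId = ""; // unknown until the first heartbeat response arrives

        // 1. First poll(): join heartbeat with empty memberId and epoch 0.
        results.add(validate(memberId, JOIN_EPOCH));
        if (firstResponseArrivedBeforeUnsubscribe) {
            memberId = "member-1"; // hypothetical id assigned by the coordinator
        }

        // 2. unsubscribe(): leave heartbeat with whatever memberId the client has.
        results.add(validate(memberId, LEAVE_EPOCH));
        return results;
    }

    public static void main(String[] args) {
        System.out.println("response received first:    " + run(true));
        System.out.println("first heartbeat in flight:  " + run(false));
    }
}
```

In this model, {{run(true)}} accepts both heartbeats, while {{run(false)}} rejects the second one with the same message as the "Caused by" line above. It suggests one possible direction for a fix (not sending the leave heartbeat until the memberId is known), but that is an assumption rather than something stated in this report.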