[ https://issues.apache.org/jira/browse/KAFKA-17769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yu-Lin Chen updated KAFKA-17769:
--------------------------------
    Description: 
4 flaky failures out of 110 trunk builds in the past 2 weeks. ([Report Link|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=1728584869905&search.startTimeMin=1726156800000&search.tags=trunk&search.timeZoneId=Asia%2FTaipei&tests.container=kafka.api.PlaintextConsumerSubscriptionTest&tests.test=testSubscribeInvalidTopicCanUnsubscribe(String%2C%20String)%5B3%5D])

This issue can be reproduced on my local machine within 50 loops. ([Oct 4 2024 at 10:35:49 CST|https://ge.apache.org/s/o4ir4xtitsu52/tests/task/:core:test/details/kafka.api.PlaintextConsumerSubscriptionTest/testSubscribeInvalidTopicCanUnsubscribe(String%2C%20String)%5B3%5D?top-execution=1]):

{code:java}
org.apache.kafka.common.KafkaException: Failed to close kafka consumer
    at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1249)
    at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.close(AsyncKafkaConsumer.java:1204)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1718)
    at kafka.api.IntegrationTestHarness.$anonfun$tearDown$3(IntegrationTestHarness.scala:249)
    at kafka.api.IntegrationTestHarness.$anonfun$tearDown$3$adapted(IntegrationTestHarness.scala:249)
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:619)
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:617)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:935)
    at kafka.api.IntegrationTestHarness.tearDown(IntegrationTestHarness.scala:249)
    at java.lang.reflect.Method.invoke(Method.java:566)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at java.util.ArrayList.forEach(ArrayList.java:1541)
    at java.util.ArrayList.forEach(ArrayList.java:1541)
Caused by: org.apache.kafka.common.errors.InvalidRequestException: MemberId can't be empty.
{code}

*Root Cause*:

The following heartbeat requests can be in flight simultaneously in the new consumer:
- The first heartbeat, which obtains the memberId from the group coordinator (memberEpoch = 0)
- The next heartbeat, sent after an unsubscribe event (memberEpoch = -1)

The second heartbeat is sent with an empty memberId while the first heartbeat is still in flight, and the coordinator won't accept it (memberId = empty, memberEpoch = -1).

This corner case occurs only when unsubscribe() is called shortly after the first poll(). The sketches below illustrate the conflicting heartbeat payloads and a scenario that can trigger the race.
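For illustration only, here is a minimal sketch of the two conflicting ConsumerGroupHeartbeat payloads, built with the generated ConsumerGroupHeartbeatRequestData message class. The group id, subscription, and exact set of populated fields are assumptions for readability, not values taken from the failing code path:

{code:java}
import java.util.List;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;

public class HeartbeatRaceSketch {
    public static void main(String[] args) {
        // Heartbeat #1: the joining heartbeat. The memberId is empty because the
        // group coordinator assigns it in the response; memberEpoch = 0 marks a new member.
        ConsumerGroupHeartbeatRequestData joinHeartbeat = new ConsumerGroupHeartbeatRequestData()
                .setGroupId("test-group")                      // hypothetical group id
                .setMemberId("")
                .setMemberEpoch(0)
                .setSubscribedTopicNames(List.of("topic"));    // hypothetical subscription

        // Heartbeat #2: the leave heartbeat triggered by unsubscribe(). memberEpoch = -1
        // means "leave the group", but the memberId is still empty because the response
        // to heartbeat #1 has not arrived yet, so the coordinator rejects this request with
        // InvalidRequestException: MemberId can't be empty.
        ConsumerGroupHeartbeatRequestData leaveHeartbeat = new ConsumerGroupHeartbeatRequestData()
                .setGroupId("test-group")
                .setMemberId("")
                .setMemberEpoch(-1);

        System.out.println(joinHeartbeat);
        System.out.println(leaveHeartbeat);
    }
}
{code}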
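And a minimal sketch of the scenario that triggers the corner case, using only the public consumer API. The bootstrap address, group id, and topic name are placeholders, and this approximates what the integration test does rather than reproducing the test itself:

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class UnsubscribeRaceSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "repro-group");             // placeholder group id
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");          // new consumer protocol (AsyncKafkaConsumer)
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("topic abc")); // invalid topic name (contains a space)
            try {
                // The first poll starts the joining heartbeat; the invalid topic name
                // may surface as an InvalidTopicException here or on a later poll.
                consumer.poll(Duration.ofMillis(100));
            } catch (InvalidTopicException expected) {
                // expected for the invalid topic name
            }
            // Calling unsubscribe() right away can queue the leave heartbeat
            // (memberEpoch = -1) while the joining heartbeat is still in flight.
            consumer.unsubscribe();
        } // close() may then fail with "MemberId can't be empty", as in the trace above
    }
}
{code}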
> Fix flaky PlaintextConsumerSubscriptionTest.testSubscribeInvalidTopicCanUnsubscribe
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-17769
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17769
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Yu-Lin Chen
>            Assignee: Yu-Lin Chen
>            Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)