Kyle Kingsbury created KAFKA-17734:
--------------------------------------

             Summary: KafkaConsumer.close(0) can block indefinitely in ConsumerNetworkClient.poll
                 Key: KAFKA-17734
                 URL: https://issues.apache.org/jira/browse/KAFKA-17734
             Project: Kafka
          Issue Type: Bug
          Components: clients, consumer
    Affects Versions: 3.8.0
         Environment: openjdk version "17.0.12" 2024-07-16
                      OpenJDK Runtime Environment (build 17.0.12+7-Ubuntu-1ubuntu222.04)
                      OpenJDK 64-Bit Server VM (build 17.0.12+7-Ubuntu-1ubuntu222.04, mixed mode, sharing)
            Reporter: Kyle Kingsbury

This might be related to KAFKA-17263, but I've got a slightly different stacktrace. With the official Java Kafka client library, version 3.8.0, calls to `consumer.close()` can stall indefinitely even though I've provided a zero-second timeout. I've tried spawning a separate thread which calls consumer.wakeup() after one second just to make sure the client gets woken up, and that doesn't seem to work either.

Jstack indicates the thread calling consumer.close() is stuck on AbstractCoordinator.maybeLeaveGroup -> ConsumerNetworkClient.poll, which in turn is stuck on a future completion handler, which goes *back* into the AbstractCoordinator, which in turn hits onLeaderElected, which eventually issues ConsumerNetworkClient.poll(), and that looks to ground out in a blocking `select()` call.

It would be really nice if there were a way to reliably release client resources--threadpools, network connections, etc--that didn't do in-line network IO.

{{"jepsen worker 24" #24101 prio=5 os_prio=0 cpu=6680.43ms elapsed=10760.37s tid=0x00007462200059e0 nid=0x4909a runnable [0x00007474133fd000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.EPoll.wait(java.base@17.0.12/Native Method)
    at sun.nio.ch.EPollSelectorImpl.doSelect(java.base@17.0.12/EPollSelectorImpl.java:118)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(java.base@17.0.12/SelectorImpl.java:129)
    - locked <0x00000002dcbe20d0> (a sun.nio.ch.Util$2)
    - locked <0x00000002dcbe2080> (a sun.nio.ch.EPollSelectorImpl)
    at sun.nio.ch.SelectorImpl.select(java.base@17.0.12/SelectorImpl.java:141)
    at org.apache.kafka.common.network.Selector.select(Selector.java:878)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:469)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:595)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:252)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:243)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:165)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.ensureFreshMetadata(ConsumerNetworkClient.java:176)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.updateGroupSubscription(ConsumerCoordinator.java:557)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeaderElected(ConsumerCoordinator.java:636)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onLeaderElected(AbstractCoordinator.java:766)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:117)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:670)
    - locked <0x00000002dcbe2228> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:631)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1310)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1285)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:616)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:428)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:313)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:322)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:1181)
    - locked <0x00000002dcbe2228> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:1131)
    - locked <0x00000002dcbe2228> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:986)
    at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.lambda$close$3(LegacyKafkaConsumer.java:1135)
    at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer$$Lambda$466/0x0000747234d46198.run(Unknown Source)
    at org.apache.kafka.common.utils.Utils.swallow(Utils.java:1042)
    at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.close(LegacyKafkaConsumer.java:1135)
    at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.close(LegacyKafkaConsumer.java:1104)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1777)
    at jepsen.redpanda.client$close_consumer_BANG_$fn__7227.invoke(client.clj:212)
    at jepsen.redpanda.client$close_consumer_BANG_.invokeStatic(client.clj:211)
    at jepsen.redpanda.client$close_consumer_BANG_.invoke(client.clj:200)
    at jepsen.redpanda.workload.queue.Client.close_BANG_(queue.clj:782)
    at jepsen.client.Validate.close_BANG_(client.clj:81)
    at jepsen.generator.interpreter.ClientWorker.close_BANG_(interpreter.clj:69)
    at jepsen.generator.interpreter.ClientWorker.invoke_BANG_(interpreter.clj:47)
    at jepsen.generator.interpreter$spawn_worker$fn__13745$fn__13746.invoke(interpreter.clj:140)
    at jepsen.generator.interpreter$spawn_worker$fn__13745.invoke(interpreter.clj:123)
    at clojure.core$binding_conveyor_fn$fn__5842.invoke(core.clj:2047)
    at clojure.lang.AFn.call(AFn.java:18)
    at java.util.concurrent.FutureTask.run(java.base@17.0.12/FutureTask.java:264)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.12/ThreadPoolExecutor.java:1136)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.12/ThreadPoolExecutor.java:635)
    at java.lang.Thread.run(java.base@17.0.12/Thread.java:840)}}

--
This message was sent by Atlassian Jira (v8.20.10#820010)
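
The report asks for a way to release client resources without blocking on in-line network IO. Until the client offers one, a caller-side mitigation might look like the sketch below: run the close on a daemon helper thread and stop waiting after a hard deadline. This is only an illustration, not part of the Kafka client. The class `BoundedClose` and the method `closeWithDeadline` are hypothetical names, and `AutoCloseable` stands in for the consumer so the sketch needs nothing beyond the JDK. It assumes no other thread is using the consumer at the same time. It also does not free anything if the close hangs: the helper thread stays stuck in the blocking call and keeps holding its sockets. It only lets the calling thread move on.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the Kafka client library.
final class BoundedClose {
    private BoundedClose() {}

    /**
     * Runs resource.close() on a daemon thread and waits at most `deadline`.
     * Returns true if close() finished (normally or by throwing) in time,
     * false if it was still running when the deadline passed.
     */
    static boolean closeWithDeadline(AutoCloseable resource, Duration deadline) {
        CountDownLatch done = new CountDownLatch(1);
        Thread closer = new Thread(() -> {
            try {
                resource.close();
            } catch (Exception e) {
                // A failed close still counts as "finished" for the caller.
            } finally {
                done.countDown();
            }
        }, "bounded-close");
        // Daemon, so a close that never returns cannot keep the JVM alive.
        closer.setDaemon(true);
        closer.start();
        try {
            return done.await(deadline.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the caller's interrupt status and report "not finished".
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With a real consumer the call would be something like `BoundedClose.closeWithDeadline(() -> consumer.close(Duration.ZERO), Duration.ofSeconds(5))`. A `false` return means the helper thread is probably still parked in the blocking `select()` shown in the trace above.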