Kyle Kingsbury created KAFKA-17734:
--------------------------------------

             Summary: KafkaConsumer.close(0) can block indefinitely in ConsumerNetworkClient.poll
                 Key: KAFKA-17734
                 URL: https://issues.apache.org/jira/browse/KAFKA-17734
             Project: Kafka
          Issue Type: Bug
          Components: clients, consumer
    Affects Versions: 3.8.0
         Environment: openjdk version "17.0.12" 2024-07-16
OpenJDK Runtime Environment (build 17.0.12+7-Ubuntu-1ubuntu222.04)
OpenJDK 64-Bit Server VM (build 17.0.12+7-Ubuntu-1ubuntu222.04, mixed mode, sharing)
            Reporter: Kyle Kingsbury


This might be related to KAFKA-17263, but I've got a slightly different stack trace.

With the official Java Kafka client library, version 3.8.0, calls to `consumer.close()` can stall indefinitely even though I've provided a zero-second timeout. I've also tried spawning a separate thread that calls `consumer.wakeup()` after one second, just to make sure the client gets woken up, and that doesn't seem to work either. jstack indicates that the thread calling `consumer.close()` is stuck in `AbstractCoordinator.maybeLeaveGroup` -> `ConsumerNetworkClient.pollNoWakeup` -> `ConsumerNetworkClient.poll`. That poll fires a pending future-completion handler (for a JoinGroup response), which goes *back* into the `AbstractCoordinator`, reaches `onLeaderElected`, and eventually issues another `ConsumerNetworkClient.poll()` (via `ensureFreshMetadata` and `awaitMetadataUpdate`). That nested poll appears to bottom out in a blocking `select()` call.
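To make the re-entrancy in that chain concrete, here is a small stdlib-only model. It is not Kafka code: `ReentrantPollModel`, `poll(long)` and `simulateClose` are hypothetical names. The assumption that the nested metadata wait is not bounded by `close()`'s timeout (modeled as `Long.MAX_VALUE`) is inferred from the indefinite stall, not confirmed from the client source.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model, NOT Kafka code: an outer poll with a small budget fires a
// queued completion handler inline, and that handler starts its own nested poll.
class ReentrantPollModel {
    private final Deque<Runnable> pendingCompletions = new ArrayDeque<>();
    // Timeout (ms) requested by every poll call, in call order (outermost first).
    private final List<Long> requestedTimeoutsMs = new ArrayList<>();

    void enqueueCompletion(Runnable handler) {
        pendingCompletions.add(handler);
    }

    void poll(long timeoutMs) {
        requestedTimeoutsMs.add(timeoutMs);
        // Completion handlers run on the caller's thread, so any wait they start
        // nests inside this call (cf. firePendingCompletedRequests in the trace).
        Runnable handler;
        while ((handler = pendingCompletions.poll()) != null) {
            handler.run();
        }
        // A real client would now block in select() for up to timeoutMs.
    }

    // Simulates close() with a budget: a late JoinGroup-style response is already
    // queued, and its handler needs a nested metadata wait.
    static List<Long> simulateClose(long closeBudgetMs, boolean propagateBudget) {
        ReentrantPollModel client = new ReentrantPollModel();
        // Assumption: without propagation the nested wait is effectively unbounded.
        long nestedTimeoutMs = propagateBudget ? closeBudgetMs : Long.MAX_VALUE;
        client.enqueueCompletion(() -> client.poll(nestedTimeoutMs));
        client.poll(closeBudgetMs); // stands in for maybeLeaveGroup -> pollNoWakeup
        return client.requestedTimeoutsMs;
    }

    public static void main(String[] args) {
        System.out.println(simulateClose(0, false)); // [0, 9223372036854775807]
        System.out.println(simulateClose(0, true));  // [0, 0]
    }
}
```

The zero budget only governs the outermost call. Whatever a completion handler starts from inside it is bounded only by the timeout that handler picks. Passing the remaining budget down (the `propagateBudget = true` case) is one way such a nested wait could be capped.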

It would be really nice if there were a way to reliably release client resources (thread pools, network connections, etc.) that didn't do in-line network I/O.
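Until something like that exists, one caller-side mitigation is to bound how long the caller waits rather than how long `close()` runs. The following is a stdlib-only sketch. `BoundedClose` and `closeWithDeadline` are hypothetical names. The helper does not actually release a stuck client's resources. It only stops the calling thread from blocking on them.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, NOT part of the Kafka client API: runs close() on a daemon
// thread and stops waiting once the deadline passes. A close() that is still
// stuck is abandoned rather than guaranteed to stop, so whatever it holds
// (threads, sockets, buffers) may stay allocated until it eventually returns.
final class BoundedClose {
    private BoundedClose() {}

    // Returns true if close() completed within the deadline, false if abandoned.
    static boolean closeWithDeadline(AutoCloseable resource, Duration deadline)
            throws InterruptedException {
        ExecutorService closer = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-close");
            t.setDaemon(true); // a hung close() must not keep the JVM alive
            return t;
        });
        try {
            Future<?> pending = closer.submit(() -> {
                resource.close();
                return null;
            });
            try {
                pending.get(deadline.toMillis(), TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                // Best effort: interrupt the closing thread. Whether the blocked
                // call honors the interrupt depends on the resource.
                pending.cancel(true);
                return false;
            } catch (ExecutionException e) {
                throw new IllegalStateException("close() failed", e.getCause());
            }
        } finally {
            closer.shutdown(); // accept no new tasks; the worker exits when close() returns
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for a consumer whose close() hangs, as described in this report.
        AutoCloseable stuck = () -> Thread.sleep(Long.MAX_VALUE);
        System.out.println(closeWithDeadline(stuck, Duration.ofMillis(200))); // false
        System.out.println(closeWithDeadline(() -> { }, Duration.ofSeconds(1))); // true
    }
}
```

With a real consumer, one might call `BoundedClose.closeWithDeadline(() -> consumer.close(Duration.ZERO), Duration.ofSeconds(5))`. `KafkaConsumer.close(Duration)` is the client's own method; the wrapper is not. This is only safe if no other thread is using the consumer at the same time, since `KafkaConsumer` is not thread-safe.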

{{"jepsen worker 24" #24101 prio=5 os_prio=0 cpu=6680.43ms elapsed=10760.37s tid=0x00007462200059e0 nid=0x4909a runnable  [0x00007474133fd000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPoll.wait(java.base@17.0.12/Native Method)
        at sun.nio.ch.EPollSelectorImpl.doSelect(java.base@17.0.12/EPollSelectorImpl.java:118)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(java.base@17.0.12/SelectorImpl.java:129)
        - locked <0x00000002dcbe20d0> (a sun.nio.ch.Util$2)
        - locked <0x00000002dcbe2080> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(java.base@17.0.12/SelectorImpl.java:141)
        at org.apache.kafka.common.network.Selector.select(Selector.java:878)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:469)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:595)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:252)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:243)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:165)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.ensureFreshMetadata(ConsumerNetworkClient.java:176)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.updateGroupSubscription(ConsumerCoordinator.java:557)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeaderElected(ConsumerCoordinator.java:636)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onLeaderElected(AbstractCoordinator.java:766)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:117)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:670)
        - locked <0x00000002dcbe2228> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:631)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1310)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1285)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:616)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:428)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:313)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:322)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:1181)
        - locked <0x00000002dcbe2228> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:1131)
        - locked <0x00000002dcbe2228> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:986)
        at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.lambda$close$3(LegacyKafkaConsumer.java:1135)
        at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer$$Lambda$466/0x0000747234d46198.run(Unknown Source)
        at org.apache.kafka.common.utils.Utils.swallow(Utils.java:1042)
        at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.close(LegacyKafkaConsumer.java:1135)
        at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.close(LegacyKafkaConsumer.java:1104)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1777)
        at jepsen.redpanda.client$close_consumer_BANG_$fn__7227.invoke(client.clj:212)
        at jepsen.redpanda.client$close_consumer_BANG_.invokeStatic(client.clj:211)
        at jepsen.redpanda.client$close_consumer_BANG_.invoke(client.clj:200)
        at jepsen.redpanda.workload.queue.Client.close_BANG_(queue.clj:782)
        at jepsen.client.Validate.close_BANG_(client.clj:81)
        at jepsen.generator.interpreter.ClientWorker.close_BANG_(interpreter.clj:69)
        at jepsen.generator.interpreter.ClientWorker.invoke_BANG_(interpreter.clj:47)
        at jepsen.generator.interpreter$spawn_worker$fn__13745$fn__13746.invoke(interpreter.clj:140)
        at jepsen.generator.interpreter$spawn_worker$fn__13745.invoke(interpreter.clj:123)
        at clojure.core$binding_conveyor_fn$fn__5842.invoke(core.clj:2047)
        at clojure.lang.AFn.call(AFn.java:18)
        at java.util.concurrent.FutureTask.run(java.base@17.0.12/FutureTask.java:264)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.12/ThreadPoolExecutor.java:1136)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.12/ThreadPoolExecutor.java:635)
        at java.lang.Thread.run(java.base@17.0.12/Thread.java:840)}}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
