[ 
https://issues.apache.org/jira/browse/KAFKA-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703381#comment-16703381
 ] 

ASF GitHub Bot commented on KAFKA-4615:
---------------------------------------

Mogztter closed pull request #2395: KAFKA-4615: Use a timeout when polling the request in AdminClient
URL: https://github.com/apache/kafka/pull/2395
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 33089d107a9..f2dc5030c6b 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -45,7 +45,7 @@ class AdminClient(val time: Time,
     var future: RequestFuture[ClientResponse] = null
 
     future = client.send(target, request)
-    client.poll(future)
+    client.poll(future, requestTimeoutMs)
 
     if (future.succeeded())
       future.value().responseBody()
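
To illustrate what the one-line change above does, here is a minimal Java sketch of a bounded poll. It does not use any Kafka classes: `BoundedPoll` and its `poll` method are hypothetical stand-ins built on `java.util.concurrent.CompletableFuture`, assumed here only to mimic the contract "wait at most timeoutMs, then report whether the future completed".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedPoll {
    // Hypothetical stand-in for a timed poll (not the Kafka API): block for
    // at most timeoutMs and return true only if the future completed in time.
    static boolean poll(CompletableFuture<?> future, long timeoutMs) {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return false; // still pending: the caller can give up or retry
        } catch (ExecutionException e) {
            // the future completed with an error, which still counts as done
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
        return future.isDone();
    }

    public static void main(String[] args) {
        // A request that never receives a response, as when the cluster is down.
        CompletableFuture<String> neverAnswered = new CompletableFuture<>();
        // A request whose response has already arrived.
        CompletableFuture<String> answered = CompletableFuture.completedFuture("ok");

        System.out.println(poll(neverAnswered, 100)); // prints "false"
        System.out.println(poll(answered, 100));      // prints "true"
    }
}
```

With an unbounded wait, the first call in `main` would never return; with the bound, the caller regains control and can inspect the future's state.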


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> AdminClient.send function poll without timeout
> ----------------------------------------------
>
>                 Key: KAFKA-4615
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4615
>             Project: Kafka
>          Issue Type: Bug
>          Components: admin
>    Affects Versions: 0.10.1.1
>         Environment: Red Hat Enterprise Linux Server release 7.1
>            Reporter: Guillaume Grossetie
>            Priority: Major
>
> I'm using the AdminClient to fetch the consumer offsets of my topics.
> When the Kafka cluster is unavailable, I can see the following messages:
> {code}
> 2017-01-11 05:36:45,432 WARN  [kafka-producer-network-thread | producer-1] o.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 64444 : {metrics_app_ida_ido=UNKNOWN_TOPIC_OR_PARTITION}
> 2017-01-11 05:36:45,569 WARN  [kafka-producer-network-thread | producer-1] o.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 64445 : {metrics_app_ida_ido=UNKNOWN_TOPIC_OR_PARTITION}
> 2017-01-11 05:36:45,673 WARN  [kafka-producer-network-thread | producer-1] o.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 64446 : {metrics_app_ida_ido=UNKNOWN_TOPIC_OR_PARTITION}
> 2017-01-11 05:36:45,776 WARN  [kafka-producer-network-thread | producer-1] o.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 64447 : {metrics_app_ida_ido=UNKNOWN_TOPIC_OR_PARTITION}
> 2017-01-11 05:36:45,880 WARN  [kafka-producer-network-thread | producer-1] o.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 64448 : {metrics_app_ida_ido=UNKNOWN_TOPIC_OR_PARTITION}
> 2017-01-11 05:36:45,984 WARN  [kafka-producer-network-thread | producer-1] o.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 64449 : {metrics_app_ida_ido=UNKNOWN_TOPIC_OR_PARTITION}
> 2017-01-11 05:36:46,091 WARN  [kafka-producer-network-thread | producer-1] o.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 64450 : {metrics_app_ida_ido=UNKNOWN_TOPIC_OR_PARTITION}
> 2017-01-11 05:36:46,198 WARN  [kafka-producer-network-thread | producer-1] o.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 64451 : {metrics_app_ida_ido=UNKNOWN_TOPIC_OR_PARTITION}
> // ...
> {code}
> The problem is that the request never completes.
> Four hours later, my process is still stuck and the request is still pending.
> Here's a jstack dump of my Java process:
> {code}
> $ jstack  -l 19841
> 2017-01-11 10:32:58
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.91-b14 mixed mode):
> "Attach Listener" #12 daemon prio=9 os_prio=0 tid=0x00007f7c5c001000 nid=0x6c0b waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>    Locked ownable synchronizers:
>         - None
> "DestroyJavaVM" #11 prio=5 os_prio=0 tid=0x00007f7c90008800 nid=0x4d88 waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>    Locked ownable synchronizers:
>         - None
> "pool-1-thread-1" #10 prio=5 os_prio=0 tid=0x00007f7c90468800 nid=0x4d96 runnable [0x00007f7c78adc000]
>    java.lang.Thread.State: RUNNABLE
>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>         - locked <0x00000000c52cbc40> (a sun.nio.ch.Util$2)
>         - locked <0x00000000c52cbc30> (a java.util.Collections$UnmodifiableSet)
>         - locked <0x00000000c52caf88> (a sun.nio.ch.EPollSelectorImpl)
>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>         at org.apache.kafka.common.network.Selector.select(Selector.java:454)
>         at org.apache.kafka.common.network.Selector.poll(Selector.java:277)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>         at kafka.admin.AdminClient.kafka$admin$AdminClient$$send(AdminClient.scala:49)
>         at kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminClient.scala:61)
>         at kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminClient.scala:58)
>         at scala.collection.immutable.List.foreach(List.scala:381)
>         at kafka.admin.AdminClient.sendAnyNode(AdminClient.scala:58)
>         at kafka.admin.AdminClient.findAllBrokers(AdminClient.scala:87)
>         at kafka.admin.AdminClient.listAllGroups(AdminClient.scala:96)
>         at kafka.admin.AdminClient.listAllConsumerGroups(AdminClient.scala:111)
>         at org.indusbox.idatha.kafka.lag.provider.KafkaLagConsumerOffsetsTopicProvider.fetch(KafkaLagConsumerOffsetsTopicProvider.java:65)
>         at org.indusbox.idatha.kafka.lag.KafkaLagPollingReporter.run(KafkaLagPollingReporter.java:49)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>    Locked ownable synchronizers:
>         - <0x00000000c51c7090> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "kafka-producer-network-thread | producer-1" #9 daemon prio=5 os_prio=0 tid=0x00007f7c9032f800 nid=0x4d95 runnable [0x00007f7c78dde000]
>    java.lang.Thread.State: RUNNABLE
>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>         - locked <0x00000000c51a2a28> (a sun.nio.ch.Util$2)
>         - locked <0x00000000c51a2a18> (a java.util.Collections$UnmodifiableSet)
>         - locked <0x00000000c51a1bd0> (a sun.nio.ch.EPollSelectorImpl)
>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>         at org.apache.kafka.common.network.Selector.select(Selector.java:454)
>         at org.apache.kafka.common.network.Selector.poll(Selector.java:277)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
>         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
>         at java.lang.Thread.run(Thread.java:745)
>    Locked ownable synchronizers:
>         - None
> "Service Thread" #7 daemon prio=9 os_prio=0 tid=0x00007f7c90140000 nid=0x4d91 runnable [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>    Locked ownable synchronizers:
>         - None
> "C1 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007f7c9013d800 nid=0x4d90 waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>    Locked ownable synchronizers:
>         - None
> "C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007f7c9013a800 nid=0x4d8f waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>    Locked ownable synchronizers:
>         - None
> "Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007f7c90139000 nid=0x4d8e runnable [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
>    Locked ownable synchronizers:
>         - None
> "Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007f7c90105800 nid=0x4d8d in Object.wait() [0x00007f7c7a24e000]
>    java.lang.Thread.State: WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         - waiting on <0x00000000c52c5168> (a java.lang.ref.ReferenceQueue$Lock)
>         at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
>         - locked <0x00000000c52c5168> (a java.lang.ref.ReferenceQueue$Lock)
>         at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
>         at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
>    Locked ownable synchronizers:
>         - None
> "Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007f7c90101000 nid=0x4d8c in Object.wait() [0x00007f7c801f8000]
>    java.lang.Thread.State: WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         - waiting on <0x00000000c52c5198> (a java.lang.ref.Reference$Lock)
>         at java.lang.Object.wait(Object.java:502)
>         at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
>         - locked <0x00000000c52c5198> (a java.lang.ref.Reference$Lock)
>         at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)
>    Locked ownable synchronizers:
>         - None
> "VM Thread" os_prio=0 tid=0x00007f7c900f9800 nid=0x4d8b runnable
> "GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007f7c9001d800 nid=0x4d89 runnable
> "GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007f7c9001f800 nid=0x4d8a runnable
> "VM Periodic Task Thread" os_prio=0 tid=0x00007f7c90142800 nid=0x4d92 waiting on condition
> JNI global references: 287
> {code}
> As you can see, my thread is still running in {{org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)}}.
> I think we should use the poll method with a timeout and give up after a short period of time:
> {code}public boolean poll(RequestFuture<?> future, long timeout){code}
> Ref: https://github.com/apache/kafka/blob/75469a3b602c26ea81d6fc0a409d39d321195ea4/core/src/main/scala/kafka/admin/AdminClient.scala#L46
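
For callers stuck on an affected version, one possible caller-side mitigation is to run the blocking call on a worker thread and stop waiting after a deadline. The sketch below is an assumption-laden illustration, not part of Kafka: `TimedCall` and `callWithTimeout` are hypothetical names, and the "stuck request" is simulated with `Thread.sleep` rather than a real AdminClient call. Note that `Future.cancel(true)` only interrupts the worker; whether the underlying call actually stops depends on how it reacts to interruption, so this bounds the caller's wait rather than guaranteeing the request is abandoned.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedCall {
    // Hypothetical helper: run a blocking call on a daemon worker thread and
    // give up after timeoutMs. Returns Optional.empty() if no result arrived.
    static <T> Optional<T> callWithTimeout(Callable<T> blockingCall, long timeoutMs) {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "timed-call");
            t.setDaemon(true); // a stuck call must not keep the JVM alive
            return t;
        });
        Future<T> result = worker.submit(blockingCall);
        try {
            return Optional.ofNullable(result.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            result.cancel(true); // best effort: interrupts the worker thread
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            result.cancel(true);
            return Optional.empty();
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Simulates a request that never returns, as in the thread dump above.
        Optional<String> stuck = callWithTimeout(() -> {
            Thread.sleep(Long.MAX_VALUE);
            return "unreachable";
        }, 200);
        System.out.println(stuck.isPresent()); // prints "false"

        // A call that answers promptly is passed through unchanged.
        Optional<String> fast = callWithTimeout(() -> "groups", 200);
        System.out.println(fast.get()); // prints "groups"
    }
}
```

A timeout inside the client, as proposed in this issue, remains the cleaner fix because it lets the client release the pending request itself.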



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
