[https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354259#comment-16354259]
Nick Travers commented on KAFKA-4669:
-------------------------------------
Chiming in again to note that we're still running into this issue
intermittently. The failure mode is the same: a BufferUnderflowException with a
stack trace similar to the one I posted above.
For some additional context, when this occurs it ultimately leaves the JVM
unable to exit, because a thread is waiting on a latch that will never be
closed. Here's the hung thread:
{code:java}
"async-message-sender-0" #120 daemon prio=5 os_prio=0 tid=0x00007f30b4003000 nid=0x195a1 waiting on condition [0x00007f3105ce1000]
   java.lang.Thread.State: WAITING (parking)
        at jdk.internal.misc.Unsafe.park(java.base@9/Native Method)
        - parking to wait for <0x00000007852b1b68> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(java.base@9/LockSupport.java:194)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@9/AbstractQueuedSynchronizer.java:871)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@9/AbstractQueuedSynchronizer.java:1024)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@9/AbstractQueuedSynchronizer.java:1331)
        at java.util.concurrent.CountDownLatch.await(java.base@9/CountDownLatch.java:232)
        at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:61)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
        at com.squareup.core.concurrent.Futures2.getAll(Futures2.java:462)
        at com.squareup.kafka.ng.producer.KafkaProducer.sendMessageBatch(KafkaProducer.java:214)
        - locked <0x0000000728c71998> (a com.squareup.kafka.ng.producer.KafkaProducer)
        at com.squareup.kafka.ng.producer.BufferedKafkaProducer$AsyncBatchMessageSender.sendBatch(BufferedKafkaProducer.java:585)
        at com.squareup.kafka.ng.producer.BufferedKafkaProducer$AsyncBatchMessageSender.run(BufferedKafkaProducer.java:536)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@9/ThreadPoolExecutor.java:1167)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@9/ThreadPoolExecutor.java:641)
        at java.lang.Thread.run(java.base@9/Thread.java:844)
{code}
[Here is the latch|https://github.com/apache/kafka/blob/0.11.0.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java#L34]
that is still open in the ProduceRequestResult. I assume the network thread is
responsible for closing it, but if that thread crashes for whatever reason, it
never gets a chance to call CountDownLatch#countDown.
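If the completion path guarded the countDown with a finally block, waiters would be released even when response handling throws. A minimal sketch of that pattern, assuming nothing about the actual Kafka internals (handleResponse and LatchGuarantee are hypothetical names for illustration, not Kafka code):

```java
import java.util.concurrent.CountDownLatch;

public class LatchGuarantee {
    // Hypothetical sketch: guard completion with try/finally so the latch
    // closes even when the response handler throws.
    static void handleResponse(CountDownLatch done, Runnable handler) {
        try {
            handler.run(); // may throw, e.g. a correlation-id mismatch
        } finally {
            done.countDown(); // waiters are always released
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        try {
            handleResponse(done, () -> {
                throw new IllegalStateException("Correlation id mismatch");
            });
        } catch (IllegalStateException expected) {
            // handler failed, but the latch was still counted down
        }
        done.await(); // returns immediately instead of hanging forever
        System.out.println("latch closed despite handler failure");
    }
}
```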
Arguably we should be using a combination of daemon threads and the timed
version of Future#get, but this _feels_ like something that could be fixed in
the producer client, even if only to ensure that failed ProduceRequestResults
can eventually be GC'd, which can't happen while another thread is hung waiting
on the latch.
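As a client-side mitigation, a bounded wait can replace the unbounded Future#get so a crashed network thread can't wedge the caller forever. A minimal sketch under that assumption (awaitWithTimeout and TimedGetExample are hypothetical names, not a Kafka API):

```java
import java.util.concurrent.*;

public class TimedGetExample {
    // Returns true if the future completed in time, false on timeout,
    // instead of blocking forever like a bare Future#get().
    static boolean awaitWithTimeout(Future<?> f, long timeoutMs) throws Exception {
        try {
            f.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            f.cancel(true); // give up on the hung request so it can be GC'd
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // a daemon thread cannot block JVM exit
            return t;
        });

        // Simulates a ProduceRequestResult whose latch is never counted down
        // because the network thread crashed before completing the request.
        Future<?> neverCompletes = pool.submit(() -> {
            try {
                new CountDownLatch(1).await(); // no one calls countDown()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        System.out.println("completed=" + awaitWithTimeout(neverCompletes, 500));
        pool.shutdownNow();
    }
}
```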
cc: [~rsivaram] [~hachikuji]
> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
> -------------------------------------------------------------------------------------
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.9.0.1
> Reporter: Cheng Ju
> Assignee: Rajini Sivaram
> Priority: Critical
> Labels: reliability
> Fix For: 0.11.0.1, 1.0.0
>
>
> There is no try/catch in NetworkClient.handleCompletedReceives. If an
> exception is thrown after inFlightRequests.completeNext(source), then the
> corresponding RecordBatch's done will never get called, and
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked the 0.10 code and believe this bug exists in the 0.10 versions as well.
> A real case: first, a correlation-id mismatch exception occurs:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] (org.apache.kafka.clients.producer.internals.Sender.run:130) - Uncaught error in kafka producer I/O thread:
> java.lang.IllegalStateException: Correlation id for response (703766) does not match request (703764)
>         at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>         at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>         at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>         at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>         at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>         at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
>         at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>         at java.lang.Thread.run(Thread.java:745)
> client code
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)