Guozhang Wang created KAFKA-8135:
------------------------------------

             Summary: Kafka Producer deadlocked on flush call with intermittent broker unavailability
                 Key: KAFKA-8135
                 URL: https://issues.apache.org/jira/browse/KAFKA-8135
             Project: Kafka
          Issue Type: Improvement
          Components: clients
    Affects Versions: 2.1.0
            Reporter: Guozhang Wang


In KIP-91 we added the config {{delivery.timeout.ms}} to replace {{retries}}, with a default value of 2 minutes. We've observed that when it is set to MAX_VALUE (e.g. in Kafka Streams, when EOS is turned on), the {{producer.flush}} call can sometimes block while its destination brokers are undergoing transient unavailability:

{code}
java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@10.0.2/Native Method)
    - parking to wait for  <0x00000006aeb21a00> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.park(java.base@10.0.2/Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@10.0.2/Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@10.0.2/Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@10.0.2/Unknown Source)
    at java.util.concurrent.CountDownLatch.await(java.base@10.0.2/Unknown Source)
    at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:693)
    at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1066)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:259)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:520)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:470)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:458)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
{code}

Even after the broker went back to normal, producers would still be blocked. One suspicion is that when the broker is not able to handle a request in time, the response is somehow dropped inside the Sender, and hence whoever is waiting on that response would be blocked forever.
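
For context, here is a minimal sketch of a plain producer configured the way the description above assumes (delivery timeout set to MAX_VALUE; the broker address and topic name are hypothetical):

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class UnboundedFlushExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker address and topic name are hypothetical.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // The problematic setting: with an effectively unbounded delivery timeout,
        // flush() waits until every in-flight batch either completes or fails.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value"));
            // If the destination broker becomes unavailable around here, this call
            // can block indefinitely, as in the stack trace above.
            producer.flush();
        }
    }
}
{code}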

We've observed such scenarios when 1) the broker transiently failed for a while, 2) the network was transiently partitioned, and 3) a bad broker config (e.g. ACLs) left the broker unable to handle requests for a while.
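
For a plain producer client (this does not help the Streams case above, where the flush happens inside {{RecordCollectorImpl}}), one possible application-side way to bound the wait, sketched here only as an illustration and not as a fix, is to keep the futures returned by {{send()}} and wait on them with a timeout instead of calling an unbounded {{flush()}}:

{code}
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BoundedFlushExample {
    /**
     * Waits on the futures returned by producer.send() with a timeout, so that
     * broker unavailability surfaces as a TimeoutException instead of blocking
     * the calling thread forever. The 2-minute bound is an arbitrary choice.
     */
    static void awaitSendsWithTimeout(List<Future<RecordMetadata>> inFlight)
            throws InterruptedException, ExecutionException {
        for (Future<RecordMetadata> future : inFlight) {
            try {
                future.get(2, TimeUnit.MINUTES); // bounded wait instead of flush()
            } catch (TimeoutException e) {
                // Batch still unacknowledged: decide whether to retry, abort, or alert.
            }
        }
    }
}
{code}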





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
