[ 
https://issues.apache.org/jira/browse/KAFKA-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235343#comment-17235343
 ] 

Tim Fox edited comment on KAFKA-10114 at 11/19/20, 1:17 PM:
------------------------------------------------------------

> We are still seeing this issue with version 2.6.0. Our app calls flush and it 
>hangs forever when brokers are down.

[Revised my answer as it was previously based on a misunderstanding of the 
current code]

Currently, KafkaProducer.flush() will hang forever if there are pending batches 
and the brokers have been lost and not restarted. Queued batches won't be timed 
out because there are no "ready" nodes, and the batch-expiry logic currently runs 
only after a ready node has been obtained.

The expectation is that flush() throws a TimeoutException if it does not 
complete successfully within delivery.timeout.ms.
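Until that behaviour lands, a caller-side guard is possible: run the flush on a separate thread and bound the wait. The sketch below is a hypothetical workaround, not Kafka code; the hanging Runnable stands in for producer::flush, and the 200 ms deadline is just for the demo (a real caller would mirror delivery.timeout.ms).

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FlushWithTimeout {
    // Hypothetical caller-side guard: run a flush on its own thread and bound
    // the wait, so a hung flush() cannot block the caller forever.
    static boolean flushWithTimeout(Runnable flush, long timeoutMs) {
        ExecutorService exec = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // let the JVM exit even if flush never returns
            return t;
        });
        try {
            exec.submit(flush).get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // flush did not complete within the deadline
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Stand-in for producer::flush that hangs, as in the reported scenario:
        Runnable hangingFlush = () -> {
            try { Thread.sleep(Long.MAX_VALUE); } catch (InterruptedException ignored) {}
        };
        System.out.println(flushWithTimeout(hangingFlush, 200));
    }
}
{code}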

From inspecting the code I am still unsure why record batches aren't being 
expired properly. [~kwadhwa], if you could provide a thread dump when the hang 
occurs and enable trace logging, that would help us diagnose the issue.

 



> Kafka producer stuck after broker crash
> ---------------------------------------
>
>                 Key: KAFKA-10114
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10114
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 2.3.1, 2.4.1
>            Reporter: Itamar Benjamin
>            Priority: Critical
>
> Today two of our Kafka brokers crashed (in a cluster of 3 brokers), and 
> producers were not able to send new messages. After the brokers started again, 
> all producers resumed sending data except for a single one.
> At the beginning the producer rejected all new messages with a TimeoutException:
>  
> {code:java}
>  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> incoming-mutable-RuntimeIIL-1:120000 ms has passed since batch creation
> {code}
>  
> Then after some time the exception changed to:
>  
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory 
> within the configured max blocking time 60000 ms.
> {code}
>  
>  
> A jstack dump shows the kafka-producer-network-thread is waiting to get a 
> producer id:
>  
> {code:java}
> "kafka-producer-network-thread | producer-1" #767 daemon prio=5 os_prio=0 
> cpu=63594017.16ms elapsed=1511219.38s tid=0x00007fffd8353000 nid=0x4fa4 
> sleeping [0x00007ff55c177000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>         at java.lang.Thread.sleep(java.base@11.0.1/Native Method)
>         at org.apache.kafka.common.utils.Utils.sleep(Utils.java:296)
>         at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:41)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:565)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:306)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>         at java.lang.Thread.run(java.base@11.0.1/Thread.java:834)
>    Locked ownable synchronizers:
>         - None
> {code}
>  
> Digging into maybeWaitForProducerId(), it waits until some broker is ready 
> (the awaitNodeReady function), which in turn calls leastLoadedNode() on 
> NetworkClient. This iterates over all brokers and checks whether a request can 
> be sent to each using canSendRequest().
> This is the code for canSendRequest():
>  
> {code:java}
> return connectionStates.isReady(node, now) && selector.isChannelReady(node) 
> && inFlightRequests.canSendMore(node)
> {code}
>  
>  
> Using some debugging tools I saw this expression always evaluates to false, 
> since the last part (canSendMore) is false. 
>  
> This is the code for canSendMore:
> {code:java}
> public boolean canSendMore(String node) {
>     Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
>     return queue == null || queue.isEmpty() ||
>            (queue.peekFirst().send.completed() &&
>             queue.size() < this.maxInFlightRequestsPerConnection);
> }
>  
>  
> I verified 
> {code:java}
> queue.peekFirst().send.completed()
> {code}
> is true, and that leads to the livelock: since the request queues are full for 
> all nodes, a new request to check broker availability and reconnect to it 
> cannot be submitted.
>  
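The livelock condition quoted above can be illustrated in isolation. The sketch below re-implements the quoted canSendMore() logic against a simplified stand-in for InFlightRequest; the InFlightRequest class and the MAX_IN_FLIGHT constant here are assumptions for illustration, not Kafka's actual types (5 is the documented default of max.in.flight.requests.per.connection).

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

public class CanSendMoreDemo {
    // Simplified stand-in for NetworkClient.InFlightRequest: we only model
    // whether the request's send has completed.
    static class InFlightRequest {
        final boolean sendCompleted;
        InFlightRequest(boolean sendCompleted) { this.sendCompleted = sendCompleted; }
    }

    // Assumed default of max.in.flight.requests.per.connection, for illustration.
    static final int MAX_IN_FLIGHT = 5;

    // Mirrors the quoted canSendMore() logic for a single node's queue.
    static boolean canSendMore(Deque<InFlightRequest> queue) {
        return queue == null || queue.isEmpty()
                || (queue.peekFirst().sendCompleted && queue.size() < MAX_IN_FLIGHT);
    }

    public static void main(String[] args) {
        Deque<InFlightRequest> queue = new ArrayDeque<>();
        for (int i = 0; i < MAX_IN_FLIGHT; i++) {
            // Sends completed, but responses never arrive (brokers are down),
            // so the requests stay queued as in-flight.
            queue.add(new InFlightRequest(true));
        }
        // The full queue blocks any new request, including the very metadata
        // request needed to notice a broker coming back: the livelock.
        System.out.println(canSendMore(queue));
    }
}
{code}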



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
