Itamar Benjamin created KAFKA-10114:
---------------------------------------

             Summary: Kafka producer stuck after broker crash
                 Key: KAFKA-10114
                 URL: https://issues.apache.org/jira/browse/KAFKA-10114
             Project: Kafka
          Issue Type: Bug
          Components: producer 
    Affects Versions: 2.4.1, 2.3.1
            Reporter: Itamar Benjamin


Today two of our Kafka brokers crashed (in a cluster of 3 brokers), and producers 
were not able to send new messages. After the brokers came back up, all producers 
resumed sending data except for a single one.

At the beginning, the producer rejected all new messages with a TimeoutException:

 
{code:java}
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for incoming-mutable-RuntimeIIL-1:120000 ms has passed since batch creation
{code}
 

Then, after some time, the exception changed to:

 
{code:java}
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
{code}
 

 

jstack shows the kafka-producer-network-thread is waiting to get a producer id:

 
{code:java}
"kafka-producer-network-thread | producer-1" #767 daemon prio=5 os_prio=0 
cpu=63594017.16ms elapsed=1511219.38s tid=0x00007fffd8353000 nid=0x4fa4 
sleeping [0x00007ff55c177000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(java.base@11.0.1/Native Method)
        at org.apache.kafka.common.utils.Utils.sleep(Utils.java:296)
        at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:41)
        at 
org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:565)
        at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:306)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
        at java.lang.Thread.run(java.base@11.0.1/Thread.java:834)   Locked 
ownable synchronizers:
        - None
{code}
 

Digging into maybeWaitForProducerId(): it waits until some broker is ready 
(the awaitNodeReady function), which in turn calls leastLoadedNode() on 
NetworkClient. That method iterates over all brokers and checks whether a request 
can be sent to each of them using canSendRequest().
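
For context, the wait-and-retry pattern implied by the stack trace can be sketched as follows. This is a minimal, self-contained illustration only; ProducerIdWaitSketch, findReadyNode() and the 100 ms backoff are placeholders of mine, not the actual Sender code:
{code:java}
// Stand-alone sketch of the wait/sleep loop seen in the stack trace above.
// findReadyNode() is a hypothetical stand-in for the awaitNodeReady() /
// leastLoadedNode() path; this is NOT the real Kafka source.
public class ProducerIdWaitSketch {

    // Stands in for NetworkClient.leastLoadedNode(): returns null when
    // canSendRequest() is false for every broker.
    static String findReadyNode() {
        return null;
    }

    public static void main(String[] args) throws InterruptedException {
        long retryBackoffMs = 100;
        while (true) {
            String node = findReadyNode();
            if (node != null) {
                // here the real sender would send InitProducerId to the node
                break;
            }
            // No broker is "ready", so sleep and try again -- this is the
            // TIMED_WAITING (sleeping) state visible in the jstack output.
            Thread.sleep(retryBackoffMs);
        }
    }
}
{code}
As long as no node is ever reported ready, this loop never exits, which matches the producer being stuck indefinitely.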

This is the code for canSendRequest():

 
{code:java}
return connectionStates.isReady(node, now) &&
       selector.isChannelReady(node) &&
       inFlightRequests.canSendMore(node);
{code}
 

 

Using some debugging tools, I saw this expression always evaluates to false 
because the last part (canSendMore) is false.

 

This is the code for canSendMore:
{code:java}
public boolean canSendMore(String node) {
    Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
    return queue == null || queue.isEmpty() ||
        (queue.peekFirst().send.completed() &&
         queue.size() < this.maxInFlightRequestsPerConnection);
}
{code}
 

 

I verified that
{code:java}
queue.peekFirst().send.completed()
{code}
is true, and that leads to the live lock: since the request queues are full for 
all nodes, a new request to check broker availability and reconnect to a broker 
cannot be submitted.
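
To make the failure mode concrete, here is a small self-contained re-creation of that expression. CanSendMoreSketch, the MAX_IN_FLIGHT value and the Boolean standing in for send.completed() are illustrative assumptions of mine, not the real InFlightRequests class:
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

public class CanSendMoreSketch {
    static final int MAX_IN_FLIGHT = 5; // assumed max.in.flight.requests.per.connection

    // Same shape as InFlightRequests.canSendMore(), with Boolean standing in
    // for request.send.completed().
    static boolean canSendMore(Deque<Boolean> queue) {
        return queue == null
                || queue.isEmpty()
                || (queue.peekFirst() && queue.size() < MAX_IN_FLIGHT);
    }

    public static void main(String[] args) {
        Deque<Boolean> queue = new ArrayDeque<>();
        for (int i = 0; i < MAX_IN_FLIGHT; i++)
            queue.add(true); // every send has "completed", but no response was ever received

        // Prints false: the queue is full, so the size check short-circuits the
        // expression even though the head request's send has completed.
        System.out.println(canSendMore(queue));
    }
}
{code}
With the queue full of completed-but-unanswered requests, canSendMore() stays false, leastLoadedNode() never returns a node, and maybeWaitForProducerId() keeps sleeping forever.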

 


