Guokuai Huang created FLINK-22946:
-------------------------------------

             Summary: Network buffer deadlock introduced by unaligned checkpoint
                 Key: FLINK-22946
                 URL: https://issues.apache.org/jira/browse/FLINK-22946
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.13.1, 1.13.0
            Reporter: Guokuai Huang
         Attachments: Screen Shot 2021-06-09 at 6.39.47 PM.png, Screen Shot 2021-06-09 at 7.02.04 PM.png

We recently encountered a deadlock when using unaligned checkpoint. Below are the two 
thread stacks involved in the deadlock:
```
"Channel state writer Join(xxxxxx) (34/256)#1":
	at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296)
	- waiting to lock <0x00000007296dfa90> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460)
	at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
	at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
	at org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399)
	at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200)
	- locked <0x00000007296bc450> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
	at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
	at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.write(ChannelStateCheckpointWriter.java:173)
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.writeInput(ChannelStateCheckpointWriter.java:131)
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$write$0(ChannelStateWriteRequest.java:63)
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$785/722492780.accept(Unknown Source)
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$buildWriteRequest$2(ChannelStateWriteRequest.java:93)
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$786/1360749026.accept(Unknown Source)
	at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:212)
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:82)
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:59)
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96)
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75)
	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl$$Lambda$253/502209879.run(Unknown Source)
	at java.lang.Thread.run(Thread.java:745)

"Join(xxxxxx) (34/256)#1":
	at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296)
	- waiting to lock <0x00000007296bc450> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460)
	at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
	at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
	at org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399)
	at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200)
	- locked <0x00000007296dfa90> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
	at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
	at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:95)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$226/1801850008.runDefaultAction(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$577/1653738667.run(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:745)
```

The root cause of this problem is that unaligned checkpoint makes it possible 
that, *under the same input gate, multiple input channels may recycle network 
buffers at the same time.*

Previously, network buffer recycling could only occur serially among the input 
channels under the same input gate, because each sub-task processes input data 
serially and an input gate belongs to only one sub-task. When unaligned 
checkpoint is enabled, each input channel takes a snapshot of its buffered data 
when it receives the checkpoint barrier, and network buffers may be recycled 
during that process.

Unfortunately, *the current network buffer recycling mechanism does not take 
into account the situation where multiple input channels recycle network 
buffers at the same time.* The following code block is the one that causes the 
deadlock when multiple input channels under the same input gate recycle network 
buffers at the same time.

!Screen Shot 2021-06-09 at 7.02.04 PM.png!
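To make the lock inversion concrete, here is a heavily simplified sketch of the pattern. The `Channel` class and its fields are hypothetical stand-ins, not the real Flink `BufferManager`/`AvailableBufferQueue` types: `recycle()` still holds its own bufferQueue lock when the buffer's release path calls `notifyBufferAvailable()` on another channel, which takes that channel's lock.

```java
import java.util.ArrayDeque;

// Hypothetical, simplified model of the locking pattern (NOT the real Flink
// classes). The bug shape: queue mutation AND the cross-channel notification
// both happen inside the same synchronized block.
class Channel {
    final Object bufferQueueLock = new Object();
    final ArrayDeque<String> bufferQueue = new ArrayDeque<>();
    Channel waitingChannel; // another channel waiting for a floating buffer

    void recycle(String buffer) {
        synchronized (bufferQueueLock) {      // holds this channel's lock...
            bufferQueue.add(buffer);
            String excess = bufferQueue.poll();
            if (waitingChannel != null) {
                // ...and, still inside it, takes the other channel's lock.
                // If that channel is concurrently doing the same toward us,
                // both threads block forever (A->B vs. B->A lock order).
                waitingChannel.notifyBufferAvailable(excess);
            }
        }
    }

    void notifyBufferAvailable(String buffer) {
        synchronized (bufferQueueLock) {
            bufferQueue.add(buffer);
        }
    }
}
```

Single-threaded this runs fine; the deadlock only appears when two threads recycle toward each other simultaneously, exactly as in the two stacks above.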

The solution to this problem is straightforward. Here are two possible 
solutions:
*1. Case-by-case solution.* Note that input channel A (which locked A) handed 
its released network buffer to input channel B (waiting to lock B), while input 
channel B (which locked B) handed its released network buffer to input channel 
A (waiting to lock A). So when an input channel releases a network buffer, it 
should first check whether it is itself waiting for a network buffer, and if 
so, allocate the buffer directly to itself. This avoids the situation where 
different input channels exchange network buffers.
*2. A straightforward solution.* The input channel only needs to hold the lock 
during recycle in order to remove the network buffer from the bufferQueue; the 
subsequent operations do not need it. Therefore, we only need to move 
Buffer::recycleBuffer outside the bufferQueue lock to avoid the deadlock.
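Solution 2 can be sketched as follows; this is a hedged illustration with hypothetical simplified types (the `BufferManagerSketch` class and its fields are not the actual Flink API or patch): the queue is still mutated under the bufferQueue lock, but the recycle call that may re-enter another channel's BufferManager is deferred until after the lock is released.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hedged sketch of solution 2 (hypothetical types, not the real Flink API):
// remove the excess buffer from the queue under the lock, but perform the
// actual Buffer::recycleBuffer call outside the synchronized block, so no
// thread ever holds one channel's queue lock while taking another channel's.
class BufferManagerSketch {
    private final Object bufferQueueLock = new Object();
    private final ArrayDeque<String> bufferQueue = new ArrayDeque<>();
    final List<String> recycledOutsideLock = new ArrayList<>();

    void recycle(String buffer) {
        String excess = null;
        synchronized (bufferQueueLock) {
            bufferQueue.add(buffer);
            if (bufferQueue.size() > 1) {  // keep at most one buffer queued
                excess = bufferQueue.poll();
            }
        }                                  // lock released here
        if (excess != null) {
            // May call into another channel's BufferManager, but we no
            // longer hold our own queue lock, so no lock-order cycle.
            recycledOutsideLock.add(excess);
        }
    }
}
```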



--
This message was sent by Atlassian Jira
(v8.3.4#803005)