Flink Jira Bot updated FLINK-22946:
-----------------------------------
    Labels: stale-blocker  (was: )

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as a Blocker but is unassigned, and neither it nor its Sub-Tasks have been updated for 1 day. I have gone ahead and marked it "stale-blocker". If this ticket is a Blocker, please either assign yourself or give an update. Afterwards, please remove the label, or the issue will be deprioritized in 7 days.

> Network buffer deadlock introduced by unaligned checkpoint
> ----------------------------------------------------------
>
>                 Key: FLINK-22946
>                 URL: https://issues.apache.org/jira/browse/FLINK-22946
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.13.0, 1.13.1
>            Reporter: Guokuai Huang
>            Priority: Blocker
>              Labels: stale-blocker
>         Attachments: Screen Shot 2021-06-09 at 6.39.47 PM.png, Screen Shot 2021-06-09 at 7.02.04 PM.png
>
>
> We recently encountered a deadlock when using unaligned checkpoints. Below are two thread stacks that cause the deadlock:
> {code:java}
> "Channel state writer Join(xxxxxx) (34/256)#1":
>     at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296)
>     - waiting to lock <0x00000007296dfa90> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
>     at org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399)
>     at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200)
>     - locked <0x00000007296bc450> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.write(ChannelStateCheckpointWriter.java:173)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.writeInput(ChannelStateCheckpointWriter.java:131)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$write$0(ChannelStateWriteRequest.java:63)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$785/722492780.accept(Unknown Source)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$buildWriteRequest$2(ChannelStateWriteRequest.java:93)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest$$Lambda$786/1360749026.accept(Unknown Source)
>     at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:212)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:82)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:59)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl$$Lambda$253/502209879.run(Unknown Source)
>     at java.lang.Thread.run(Thread.java:745){code}
> {code:java}
> "Join(xxxxxx) (34/256)#1":
>     at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.notifyBufferAvailable(BufferManager.java:296)
>     - waiting to lock <0x00000007296bc450> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.fireBufferAvailableNotification(LocalBufferPool.java:507)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:494)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:460)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
>     at org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue.addExclusiveBuffer(BufferManager.java:399)
>     at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.recycle(BufferManager.java:200)
>     - locked <0x00000007296dfa90> (a org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:182)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
>     at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:156)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:95)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:96)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$226/1801850008.runDefaultAction(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$577/1653738667.run(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.lang.Thread.run(Thread.java:745){code}
>
> The root cause of this problem is that unaligned checkpoints make it possible that, *under the same input gate, multiple input channels may recycle network buffers at the same time.*
>
> Previously, network buffer recycling could only happen serially between the input channels of the same input gate, because each sub-task processes its input data serially and an input gate belongs to exactly one sub-task. When unaligned checkpoints are enabled, each input channel takes a snapshot of its state when it receives the checkpoint barrier, and network buffers may be recycled in the process.
>
> Unfortunately, *the current network buffer recycling mechanism does not take into account the situation where multiple input channels recycle network buffers at the same time.* The following code block, from org.apache.flink.runtime.io.network.partition.consumer.BufferManager$AvailableBufferQueue, causes a deadlock when multiple input channels under the same input gate recycle network buffers at the same time:
>
> !Screen Shot 2021-06-09 at 7.02.04 PM.png!
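>
> The two thread dumps form a classic lock-ordering cycle: each thread holds one channel's AvailableBufferQueue lock while recycling, and the recycling path tries to hand a buffer to the other channel, which requires the other channel's lock. Below is a minimal, Flink-independent sketch of that pattern; the class, field, and method names are illustrative only and are not the actual Flink code:
> {code:java}
> // Minimal model of the deadlock above. Names are illustrative, NOT real Flink classes.
> public class BufferRecyclingDeadlockSketch {
>
>     static class Channel {
>         final Object bufferQueueLock = new Object(); // stands in for AvailableBufferQueue
>         Channel peer;                                // the other channel of the same gate
>
>         // Recycling: take our own queue lock, then hand an excess buffer back to the
>         // peer, which needs the peer's queue lock.
>         void recycle() {
>             synchronized (bufferQueueLock) {          // like BufferManager.recycle()
>                 sleepQuietly(100);                    // widen the race window
>                 peer.notifyBufferAvailable();         // like notifyBufferAvailable() on the peer
>             }
>         }
>
>         void notifyBufferAvailable() {
>             synchronized (bufferQueueLock) {          // blocks if the peer holds it and waits on us
>                 // add the returned buffer to this channel's queue
>             }
>         }
>     }
>
>     static void sleepQuietly(long millis) {
>         try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
>     }
>
>     public static void main(String[] args) {
>         Channel a = new Channel();
>         Channel b = new Channel();
>         a.peer = b;
>         b.peer = a;
>
>         // Task thread and channel state writer thread recycle "at the same time":
>         new Thread(a::recycle, "task-thread").start();
>         new Thread(b::recycle, "channel-state-writer").start();
>         // Both threads now typically end up waiting for each other's bufferQueueLock
>         // forever -- the same cycle shown in the two thread dumps above.
>     }
> }
> {code}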
>
> The fix for this problem is quite straightforward. Here are two possible solutions:
>
> *1. Case-by-case solution.* In the deadlock above, input channel A (holding lock A) hands its released network buffer to input channel B (while waiting for lock B), and input channel B (holding lock B) hands its released network buffer to input channel A (while waiting for lock A). So, when an input channel releases a network buffer, it could first check whether it is itself still waiting for a network buffer and, if so, allocate the buffer directly to itself. This avoids the situation where different input channels exchange network buffers.
>
> *2. A straightforward solution.* The input channel only needs to hold the lock during recycle while removing the network buffer from the bufferQueue; the subsequent operations do not need it. Therefore, it is enough to move the call to Buffer::recycleBuffer outside the bufferQueue lock to avoid the deadlock (a minimal sketch of this idea follows below).
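>
> A minimal sketch of option 2, assuming a simplified recycle() path; the field and helper names below are illustrative and not the actual BufferManager code. The decision about the buffer is made while holding the bufferQueue lock, but the Buffer::recycleBuffer call happens only after the synchronized block has been exited:
> {code:java}
> // Illustrative sketch of option 2 -- NOT the actual BufferManager code.
> import java.util.ArrayDeque;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Queue;
>
> class RecycleOutsideLockSketch {
>
>     interface Buffer { void recycleBuffer(); }   // stands in for Flink's Buffer
>
>     private final Object bufferQueueLock = new Object();
>     private final Queue<Buffer> bufferQueue = new ArrayDeque<>();
>     private int numRequiredBuffers = 2;          // illustrative threshold
>
>     void recycle(Buffer buffer) {
>         List<Buffer> buffersToRelease = new ArrayList<>();
>         synchronized (bufferQueueLock) {
>             if (bufferQueue.size() < numRequiredBuffers) {
>                 bufferQueue.add(buffer);         // keep the buffer for this channel
>             } else {
>                 buffersToRelease.add(buffer);    // excess buffer, release it later
>             }
>         }
>         // Recycling happens outside the lock, so even if it eventually takes
>         // another channel's bufferQueue lock, no lock cycle can form.
>         for (Buffer excess : buffersToRelease) {
>             excess.recycleBuffer();
>         }
>     }
> }
> {code}
> Because recycleBuffer() is now called without holding the bufferQueue lock, a thread never holds two channels' queue locks at once, so the cycle from the thread dumps cannot occur.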