I've changed KafkaFetcher (our subclass, GKafkaFetcher) to enter and exit the synchronized block for each record. This inverted the behavior: now the Legacy Source Thread waits for the checkpoint lock, while the Source task thread holds it and is requesting a memory segment.

"Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199 WAITING on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
    at sun.misc.Unsafe.park(Native Method)
    - waiting on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
    ...

"Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=203 BLOCKED on java.lang.Object@2af646cc owned by "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199
    at com.gim.fsp.util.flink.GKafkaFetcher.emitRecordsWithTimestamps(GKafkaFetcher.java:54)
    - blocked on java.lang.Object@2af646cc
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)

Thanks,
Alexey

________________________________
From: Roman Khachatryan <ro...@apache.org>
Sent: Monday, March 22, 2021 1:36 AM
To: ChangZhuo Chen (陳昌倬) <czc...@czchen.org>
Cc: Alexey Trenikhun <yen...@msn.com>; Flink User Mail List <user@flink.apache.org>
Subject: Re: Checkpoint fail due to timeout

Thanks for sharing the thread dump. It shows that the source thread is indeed back-pressured: the checkpoint lock is held by a thread that is trying to emit but is unable to acquire any free buffers.

The lock is per task, so there can be several locks per TM.

@ChangZhuo Chen (陳昌倬), in the thread you mentioned it is most likely the same issue (but I can't tell for sure without a full thread dump).

Regards,
Roman

On Tue, Mar 16, 2021 at 3:00 PM ChangZhuo Chen (陳昌倬) <czc...@czchen.org> wrote:
>
> On Tue, Mar 16, 2021 at 02:32:54AM +0000, Alexey Trenikhun wrote:
> > Hi Roman,
> > I took a thread dump:
> > "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> >     - blocked on java.lang.Object@5366a0e2
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> >
> > "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> >     at sun.misc.Unsafe.park(Native Method)
> >     - waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> >     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> >     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> >     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> >     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> >
> > Is it the checkpoint lock? Is the checkpoint lock per task or per TM? I see multiple threads in SynchronizedStreamTaskActionExecutor.runThrowing blocked on different objects.
>
> Hi,
>
> This call stack is similar to our case as described in [0]. Maybe they
> are the same issue?
>
> [0] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790 8793 CC65 B0CD EC27 5D5B
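A simplified way to picture the change Alexey describes at the top of this thread is to compare holding the checkpoint lock for a whole batch with taking it once per record. The Java sketch below is hypothetical: it is not Flink's KafkaFetcher or the GKafkaFetcher code, and `Rec`, `currentOffset` and the `Consumer` passed as `out` are illustrative stand-ins for the Kafka record, the partition offset state and the source context. In both variants the offset update sits in the same critical section as the emission, which is what the checkpoint lock of a legacy SourceFunction is meant to guarantee.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model; not Flink's KafkaFetcher/GKafkaFetcher code.
// Rec, currentOffset and the Consumer "out" stand in for the Kafka record,
// the partition offset state and the source context, respectively.
class CheckpointLockGranularity {

    /** A fetched record with its offset (hypothetical stand-in for a Kafka record). */
    static final class Rec {
        final long offset;
        final String value;

        Rec(long offset, String value) {
            this.offset = offset;
            this.value = value;
        }
    }

    private final Object checkpointLock = new Object();
    private long currentOffset = -1L; // offset of the last emitted record

    /** Coarse: the lock is held for the whole batch. */
    void emitBatch(List<Rec> batch, Consumer<String> out) {
        synchronized (checkpointLock) {
            for (Rec r : batch) {
                out.accept(r.value); // if out blocks (no free buffers), the lock stays held
            }
            if (!batch.isEmpty()) {
                currentOffset = batch.get(batch.size() - 1).offset;
            }
        }
    }

    /** Fine: the lock is released between records, so other threads can take it in between. */
    void emitPerRecord(List<Rec> batch, Consumer<String> out) {
        for (Rec r : batch) {
            synchronized (checkpointLock) {
                out.accept(r.value);      // a blocking emit still holds the lock for this record
                currentOffset = r.offset; // emission and offset update stay atomic per record
            }
        }
    }

    /** What a checkpoint would snapshot; it must take the same lock. */
    long snapshotOffset() {
        synchronized (checkpointLock) {
            return currentOffset;
        }
    }

    public static void main(String[] args) {
        List<Rec> batch = List.of(new Rec(10, "a"), new Rec(11, "b"), new Rec(12, "c"));
        List<String> emitted = new ArrayList<>();
        CheckpointLockGranularity fetcher = new CheckpointLockGranularity();
        fetcher.emitPerRecord(batch, emitted::add);
        System.out.println(emitted + " offset=" + fetcher.snapshotOffset());
    }
}
```

Releasing the lock between records lets other lock users, such as the task's mailbox thread, get in between records, but it does not create free network buffers: whichever thread holds the lock when the output runs out of buffers still blocks, and a checkpoint that needs the lock waits behind it. That is consistent with both dumps in this thread, where only the roles of the two threads are swapped.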
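On the question of which lock each thread is waiting for: since the lock is per task, each task's source threads contend on their own `java.lang.Object@...` monitor, and the standard `java.lang.management` API can list every BLOCKED thread together with the monitor it wants and that monitor's current owner. This is a sketch under the assumption that it runs inside the JVM being inspected (for example, a TaskManager); connecting to a remote JVM over JMX is not shown, and `BlockedThreadReport` is a hypothetical helper name.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// Sketch: for every BLOCKED thread in this JVM, report the monitor it is waiting to
// enter and the thread that currently owns that monitor. BlockedThreadReport is a
// hypothetical helper; only standard java.lang.management calls are used.
class BlockedThreadReport {

    /** Returns one line per thread that is BLOCKED entering a synchronized block or method. */
    static List<String> blockedOnMonitors(ThreadInfo[] infos) {
        List<String> lines = new ArrayList<>();
        for (ThreadInfo info : infos) {
            if (info == null || info.getThreadState() != Thread.State.BLOCKED) {
                continue; // WAITING threads (e.g. parked on a CompletableFuture) are skipped
            }
            lines.add("\"" + info.getThreadName() + "\" Id=" + info.getThreadId()
                    + " waits for " + info.getLockName()
                    + " owned by \"" + info.getLockOwnerName() + "\" Id=" + info.getLockOwnerId());
        }
        return lines;
    }

    public static void main(String[] args) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // Lock name and owner are reported even without the locked-monitor details.
        for (String line : blockedOnMonitors(mx.dumpAllThreads(false, false))) {
            System.out.println(line);
        }
    }
}
```

Applied to the first dump Alexey posted, such a report would pair the mailbox thread (Id=200) with the Legacy Source Thread (Id=202) as the owner of `java.lang.Object@5366a0e2`; WAITING threads such as the one parked in `LocalBufferPool.requestMemorySegmentBlocking` still have to be read from the full dump.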