Unfortunately, the checkpoint lock can't be changed, as it's part of the public API (though it will be eliminated with the new source API in FLIP-27).
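For anyone following the thread, here is a minimal sketch of what lock granularity means for a legacy source. It is plain Java, not Flink code: `CheckpointLockSketch`, `emitBatchUnderOneLock` and `emitRecordByRecord` are hypothetical names, and the `checkpointLock` field only stands in for the object a legacy source gets from `getCheckpointLock()`. It merely counts how often the emitting thread gives the lock up; it does not model buffers or back-pressure.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java stand-in; none of these names are Flink classes.
final class CheckpointLockSketch {
    // Plays the role of the lock object shared by the source and the task thread.
    private final Object checkpointLock = new Object();
    // Counts how many times the emitting thread acquired the lock.
    private int lockAcquisitions = 0;

    // Coarse variant: the whole batch is emitted inside one synchronized block,
    // so anything waiting on the lock can only run after the last record.
    void emitBatchUnderOneLock(List<String> batch, Consumer<String> output) {
        synchronized (checkpointLock) {
            lockAcquisitions++;
            for (String record : batch) {
                output.accept(record);
            }
        }
    }

    // Fine-grained variant: the lock is released after every record, which
    // gives a waiting thread a chance (not a guarantee) to get in between.
    void emitRecordByRecord(List<String> batch, Consumer<String> output) {
        for (String record : batch) {
            synchronized (checkpointLock) {
                lockAcquisitions++;
                output.accept(record);
            }
        }
    }

    int lockAcquisitions() {
        return lockAcquisitions;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("r1", "r2", "r3");
        List<String> emitted = new ArrayList<>();

        CheckpointLockSketch coarse = new CheckpointLockSketch();
        coarse.emitBatchUnderOneLock(batch, emitted::add);

        CheckpointLockSketch fine = new CheckpointLockSketch();
        fine.emitRecordByRecord(batch, emitted::add);

        System.out.println("coarse acquisitions: " + coarse.lockAcquisitions());
        System.out.println("fine acquisitions: " + fine.lockAcquisitions());
    }
}
```

Java monitors are not fair, so releasing the lock per record only creates more opportunities for the other thread; it also adds one lock round-trip per record, which is where the throughput cost comes from.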
Theoretically, the change you've made should improve checkpointing at the cost of throughput. Is that what you see?

However, the new stack traces seem strange to me, as emitting the checkpoint barrier doesn't require a buffer. I also don't see the source thread holding the checkpoint lock (something like "locked <0x000000002af646cc> (a java.lang.Object)").

Could you post or attach the full thread dump?

Regards,
Roman

On Tue, Mar 23, 2021 at 6:30 AM Alexey Trenikhun <yen...@msn.com> wrote:
>
> I've changed KafkaFetcher (GKafkaFetcher) to enter/exit synchronized block on
> each record, it inverted behavior - now Legacy Source thread waits for
> checkpointLock, while Source requesting memorySegment.
>
> "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199 WAITING on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
>     at sun.misc.Unsafe.park(Native Method)
>     - waiting on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
>     ...
>
> "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=203 BLOCKED on java.lang.Object@2af646cc owned by "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199
>     at com.gim.fsp.util.flink.GKafkaFetcher.emitRecordsWithTimestamps(GKafkaFetcher.java:54)
>     - blocked on java.lang.Object@2af646cc
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
>
> Thanks,
> Alexey
> ________________________________
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Monday, March 22, 2021 1:36 AM
> To: ChangZhuo Chen (陳昌倬) <czc...@czchen.org>
> Cc: Alexey Trenikhun <yen...@msn.com>; Flink User Mail List <user@flink.apache.org>
> Subject: Re: Checkpoint fail due to timeout
>
> Thanks for sharing the thread dump.
>
> It shows that the source thread is indeed back-pressured
> (checkpoint lock is held by a thread which is trying to emit but
> unable to acquire any free buffers).
>
> The lock is per task, so there can be several locks per TM.
>
> @ChangZhuo Chen (陳昌倬), in the thread you mentioned it is most likely
> the same issue (but I can't tell for sure without a full thread dump)
>
> Regards,
> Roman
>
> On Tue, Mar 16, 2021 at 3:00 PM ChangZhuo Chen (陳昌倬) <czc...@czchen.org> wrote:
> >
> > On Tue, Mar 16, 2021 at 02:32:54AM +0000, Alexey Trenikhun wrote:
> > > Hi Roman,
> > > I took thread dump:
> > > "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> > >     - blocked on java.lang.Object@5366a0e2
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> > >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> > >
> > > "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> > >     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> > >     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> > >     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> > >     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> > >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> > >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> > >
> > > Is it checkpoint lock? Is checkpoint lock per task or per TM? I see
> > > multiple threads in SynchronizedStreamTaskActionExecutor.runThrowing
> > > blocked on different Objects.
> >
> > Hi,
> >
> > This call stack is similar to our case as described in [0]. Maybe they
> > are the same issue?
> >
> > [0]
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html
> >
> >
> > --
> > ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> > http://czchen.info/
> > Key fingerprint = BA04 346D C2E1 FE63 C790 8793 CC65 B0CD EC27 5D5B