Based on your comments, I understand that the operator is unlikely to be causing the checkpoint issue. I need to debug further, but the symptoms have not recurred for my customer. I will close this as "Not a Problem". Thanks for the detailed explanations.
On Wed, Dec 6, 2023 at 8:22 PM Darin Amos <darin.a...@instacart.com.invalid> wrote:

> Just to confirm, I ran a test this morning using a large file with only a
> single file split that took over 10 minutes to process. My checkpoints
> still executed as expected every minute with minimal latency (sub second).
> Though I ran this with 1.15.4.
>
> What evidence do you have that this operator is causing the timeout?
> Typically this operator should handle the checkpoint barrier quickly and
> proceed with reading (ignoring alignment). In my early days I suspected a
> similar issue but it was other checkpoint alignment issues.
>
> Darin
>
> On Wed, Dec 6, 2023 at 4:13 AM Prabhu Joseph <prabhujose.ga...@gmail.com>
> wrote:
>
> > Thanks Darin for the details.
> >
> > Below is the problematic StreamTask, which has not processed the priority
> > event (checkpoint barrier) from the mailbox, and has been there throughout
> > the checkpoint timeout interval. And it is reading and collecting all the
> > records from the split. Not sure why the executor was idle during that
> > period, even though I could see the upstream source task checkpoint
> > acknowledgement message was received by the JobManager. The Flink Version
> > is 1.16.
> >
> >   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.readAndCollectRecord(ContinuousFileReaderOperator.java:400)
> >   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:364)
> >   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:240)
> >   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$$Lambda$1199/0x00007f3d2b07a858.run(Unknown Source)
> >   at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> >   at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> >   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:86)
> >   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.finish(ContinuousFileReaderOperator.java:466)
> >   at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$1827/0x00007f3d2482c058.run(Unknown Source)
> >   at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> >   at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finishOperator(StreamOperatorWrapper.java:239)
> >   at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferFinishOperatorToMailbox$3(StreamOperatorWrapper.java:213)
> >   at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$1825/0x00007f3d2482a900.run(Unknown Source)
> >   at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> >   at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> >   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97)
> >   at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:186)
> >   at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:152)
> >   at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:115)
> >   at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:600)
> >   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:559)
> >   at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$765/0x00007f3d326ee058.runDefaultAction(Unknown Source)
> >   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> >   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
> >   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
> >   at org.apache.flink.runtime.taskmanager.Task$$Lambda$1637/0x00007f3d2a9b1470.run(Unknown Source)
> >   at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> >   at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> >   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> >   at java.lang.Thread.run(java.base@11.0.21/Thread.java:829)
> >
> > On Wed, Dec 6, 2023 at 8:11 AM Darin Amos <darin.a...@instacart.com.invalid>
> > wrote:
> >
> > > I apologize, I was a little off with my description, it's been a while
> > > since I have looked at this code but I have refreshed myself.
> > >
> > > The line I referred to earlier was correct though. This operator only
> > > processes records in a file split while the operator is idle, meaning
> > > there are no more incoming file splits. After every read it checks if
> > > there are any incoming file splits before continuing to read from the
> > > split. If there is indeed a new inbound file split, the loop will exit
> > > and it will re-queue itself to continue processing records later. You
> > > can see that here
> > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L361>.
> > >
> > > When the loop is interrupted by a checkpoint barrier, snapshotState(...)
> > > is called and the reader grabs the state from the provided Format. In the
> > > normal case the state is simply the split offset (current progress
> > > indicator), in more complex scenarios you can create your own format
> > > class and provide whatever serializable state you desire. In my case we
> > > store additional metadata about the progress of the reader.
> > >
> > > On state restore, the operator calls loadSplit and it will call *reopen*
> > > on the format rather than open, passing the checkpoint state so you can
> > > continue from where you left off. You can see that here
> > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L407-L411>.
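
The snapshot-and-reopen flow described in the message above can be illustrated with a small, self-contained sketch. It does not use Flink's classes: ResumableSplitReader, currentState() and demo() are hypothetical names, and the state is assumed to be nothing more than the offset of the next record, which is the "normal case" mentioned in the message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a checkpointable format. This is NOT Flink's API;
// it only illustrates "state = current offset within the split".
class ResumableSplitReader {
    private final List<String> split; // records of one file split
    private int offset;               // index of the next record to read

    // Analogous to opening a split from the beginning.
    ResumableSplitReader(List<String> split) {
        this(split, 0);
    }

    // Analogous to reopening a split with restored checkpoint state.
    ResumableSplitReader(List<String> split, int restoredOffset) {
        this.split = split;
        this.offset = restoredOffset;
    }

    boolean reachedEnd() {
        return offset >= split.size();
    }

    String nextRecord() {
        return split.get(offset++);
    }

    // The progress indicator that would be stored in a checkpoint.
    int currentState() {
        return offset;
    }

    // Reads two records, takes a "checkpoint", then resumes in a fresh reader.
    static List<String> demo() {
        List<String> split = Arrays.asList("a", "b", "c", "d");
        List<String> out = new ArrayList<>();

        ResumableSplitReader first = new ResumableSplitReader(split);
        out.add(first.nextRecord());
        out.add(first.nextRecord());
        int checkpointed = first.currentState(); // taken when the barrier is handled

        // Simulated restore: a new reader continues from the stored offset.
        ResumableSplitReader restored = new ResumableSplitReader(split, checkpointed);
        while (!restored.reachedEnd()) {
            out.add(restored.nextRecord());
        }
        return out;
    }
}
```

Calling ResumableSplitReader.demo() yields each record exactly once across the simulated restart. A custom format that stores extra metadata, as mentioned above, would replace the int offset with its own serializable state type.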
> > >
> > > Cheers
> > >
> > > Darin
> > >
> > > On Tue, Dec 5, 2023 at 8:08 PM Darin Amos <darin.a...@instacart.com>
> > > wrote:
> > >
> > > > The way I understand this loop is that the ContinuousFileReaderOperator
> > > > only processes records in the background while the operator is idle,
> > > > i.e. while it's not receiving any records.
> > > >
> > > > At the very bottom of that loop here
> > > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L358C5-L358C6>
> > > > it exits if the executor is no longer idle, i.e. there are incoming
> > > > records.
> > > >
> > > > If you look here
> > > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L407>,
> > > > the operator supports checkpointable input splits, meaning it'll save
> > > > its place within a file split. This would only be possible if the
> > > > reader can be interrupted in the middle of a split. I have written
> > > > custom splits that do exactly this.
> > > >
> > > > Darin
> > > >
> > > > On Tue, Dec 5, 2023 at 11:31 AM Prabhu Joseph <prabhujose.ga...@gmail.com>
> > > > wrote:
> > > >
> > > >> This is the loop - code reference
> > > >> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L346>,
> > > >> where it fetches all records from the split, and only then does the
> > > >> MailboxProcessor get control to check other mail. This loop was
> > > >> introduced here
> > > >> <https://github.com/apache/flink/commit/1a69cb9fce629b0c458f5ea514d9ac8de008687f>.
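
The two readings of the loop discussed above differ in whether the read loop gives up control while other work is waiting. The toy model below shows the "exit when no longer idle and re-queue" behavior as described in the 8:08 PM message. It is only a sketch: the single-threaded Deque-based mailbox, IdleCheckingReader and the "checkpoint" mail are hypothetical stand-ins, not Flink's MailboxExecutor or its barrier handling.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy single-threaded mailbox. A hypothetical stand-in, not Flink's MailboxExecutor.
class IdleCheckingReader {
    final Deque<Runnable> mailbox = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();
    private final int totalRecords;
    private int next = 0;

    IdleCheckingReader(int totalRecords) {
        this.totalRecords = totalRecords;
    }

    // Reads records while no other mail is waiting; otherwise re-queues itself.
    void readLoop() {
        while (next < totalRecords) {
            log.add("record-" + next++);
            if (!mailbox.isEmpty()) {            // executor is no longer idle
                mailbox.addLast(this::readLoop); // continue later, behind the waiting mail
                return;
            }
        }
    }

    // Drains the mailbox in FIFO order, like a much simplified mailbox loop.
    void run() {
        while (!mailbox.isEmpty()) {
            mailbox.pollFirst().run();
        }
    }

    static List<String> demo() {
        IdleCheckingReader reader = new IdleCheckingReader(3);
        reader.mailbox.addLast(reader::readLoop);
        reader.mailbox.addLast(() -> reader.log.add("checkpoint")); // mail already waiting
        reader.run();
        return reader.log;
    }
}
```

In this model the waiting mail runs after the first record rather than after the whole split, so the log is record-0, checkpoint, record-1, record-2. If the idle check were removed, the "checkpoint" entry would only appear after all three records, which is the behavior the original report was concerned about.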
> > > >>
> > > >> On Tue, Dec 5, 2023 at 9:00 PM Darin Amos <darin.a...@instacart.com.invalid>
> > > >> wrote:
> > > >>
> > > >> > I thought for sure this was already the existing behavior with this
> > > >> > operator. Does it not check the mailbox executor after every record
> > > >> > read?
> > > >> >
> > > >> > On Tue, Dec 5, 2023 at 6:48 AM Prabhu Joseph (Jira) <j...@apache.org>
> > > >> > wrote:
> > > >> >
> > > >> > > Prabhu Joseph created FLINK-33753:
> > > >> > > -------------------------------------
> > > >> > >
> > > >> > >              Summary: ContinuousFileReaderOperator consume records as mini batch
> > > >> > >                  Key: FLINK-33753
> > > >> > >                  URL: https://issues.apache.org/jira/browse/FLINK-33753
> > > >> > >              Project: Flink
> > > >> > >           Issue Type: Improvement
> > > >> > >     Affects Versions: 1.18.0
> > > >> > >             Reporter: Prabhu Joseph
> > > >> > >
> > > >> > >
> > > >> > > The ContinuousFileReaderOperator reads and collects the records from a
> > > >> > > split in a loop. If the split size is large, then the loop will take
> > > >> > > more time, and then the mailbox executor won't have a chance to process
> > > >> > > the checkpoint barrier. This leads to checkpoint timing out.
> > > >> > > ContinuousFileReaderOperator could be improved to consume the records
> > > >> > > in a mini batch, similar to Hudi's StreamReadOperator
> > > >> > > (https://issues.apache.org/jira/browse/HUDI-2485).
> > > >> > >
> > > >> > > --
> > > >> > > This message was sent by Atlassian Jira
> > > >> > > (v8.20.10#820010)
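
The mini-batch idea proposed in FLINK-33753 can be sketched in the same toy setting: read at most a fixed number of records per mail, then re-enqueue the reader so that any mail queued in the meantime gets a turn. This is an illustration of the general technique only, not the Flink or Hudi implementation. MiniBatchReader, BATCH_SIZE (set to 2 here; the ticket does not specify a value) and the "barrier" mail are assumptions made for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of mini-batch consumption. All names are hypothetical.
class MiniBatchReader {
    static final int BATCH_SIZE = 2; // assumed value, chosen only for the demo

    final Deque<Runnable> mailbox = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();
    private final int totalRecords;
    private int next = 0;

    MiniBatchReader(int totalRecords) {
        this.totalRecords = totalRecords;
    }

    // Reads up to BATCH_SIZE records, then yields by re-enqueueing itself.
    void readBatch() {
        for (int i = 0; i < BATCH_SIZE && next < totalRecords; i++) {
            log.add("record-" + next++);
        }
        if (next < totalRecords) {
            mailbox.addLast(this::readBatch); // remaining records wait behind other mail
        }
    }

    static List<String> demo() {
        MiniBatchReader reader = new MiniBatchReader(5);
        reader.mailbox.addLast(reader::readBatch);
        reader.mailbox.addLast(() -> reader.log.add("barrier")); // stands in for a checkpoint barrier
        while (!reader.mailbox.isEmpty()) {
            reader.mailbox.pollFirst().run();
        }
        return reader.log;
    }
}
```

With five records and a batch size of two, the "barrier" mail is handled after the first batch instead of after the whole split. Unlike an idle check, the reader yields unconditionally after each batch, which bounds how long other mail can wait at the cost of some extra scheduling overhead.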