Based on your comments, I understand that the operator is not causing the
checkpoint issue. I need to debug further, but the symptoms are no longer
showing up for my customer, so I will close this as not a problem. Thanks
for the detailed explanations.

On Wed, Dec 6, 2023 at 8:22 PM Darin Amos <darin.a...@instacart.com.invalid>
wrote:

> Just to confirm, I ran a test this morning using a large file with only a
> single file split that took over 10 minutes to process. My checkpoints
> still executed as expected every minute with minimal latency (sub-second),
> though I ran this with 1.15.4.
>
> What evidence do you have that this operator is causing the timeout?
> Typically this operator should handle the checkpoint barrier quickly and
> proceed with reading (ignoring alignment). In my early days I suspected a
> similar issue, but it turned out to be other checkpoint alignment issues.
>
> Darin
>
> On Wed, Dec 6, 2023 at 4:13 AM Prabhu Joseph <prabhujose.ga...@gmail.com>
> wrote:
>
> > Thanks Darin for the details.
> >
> > Below is the problematic StreamTask, which has not processed the priority
> > event (checkpoint barrier) from the mailbox and has stayed in this state
> > throughout the checkpoint timeout interval while it reads and collects
> > all the records from the split. I am not sure why the executor was idle
> > during that period, even though I could see that the JobManager received
> > the checkpoint acknowledgement message from the upstream source task.
> > The Flink version is 1.16.
> >
> > at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.readAndCollectRecord(ContinuousFileReaderOperator.java:400)
> > at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.processRecord(ContinuousFileReaderOperator.java:364)
> > at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.lambda$new$0(ContinuousFileReaderOperator.java:240)
> > at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$$Lambda$1199/0x00007f3d2b07a858.run(Unknown Source)
> > at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> > at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:86)
> > at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.finish(ContinuousFileReaderOperator.java:466)
> > at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$1827/0x00007f3d2482c058.run(Unknown Source)
> > at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> > at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finishOperator(StreamOperatorWrapper.java:239)
> > at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferFinishOperatorToMailbox$3(StreamOperatorWrapper.java:213)
> > at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$1825/0x00007f3d2482a900.run(Unknown Source)
> > at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> > at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97)
> > at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:186)
> > at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:152)
> > at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:115)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:600)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:559)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$765/0x00007f3d326ee058.runDefaultAction(Unknown Source)
> > at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
> > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
> > at org.apache.flink.runtime.taskmanager.Task$$Lambda$1637/0x00007f3d2a9b1470.run(Unknown Source)
> > at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> > at java.lang.Thread.run(java.base@11.0.21/Thread.java:829)
> >
> > On Wed, Dec 6, 2023 at 8:11 AM Darin Amos <darin.a...@instacart.com.invalid>
> > wrote:
> >
> > > I apologize, I was a little off with my description. It's been a while
> > > since I looked at this code, but I have refreshed my memory.
> > >
> > > The line I referred to earlier was correct, though. This operator only
> > > processes records in a file split while the operator is idle, meaning
> > > there are no more incoming file splits. After every read it checks
> > > whether there are any incoming file splits before continuing to read
> > > from the split. If there is indeed a new inbound file split, the loop
> > > exits and the operator re-queues itself to continue processing records
> > > later. You can see that here
> > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L361>.
> > >
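The idle check described above can be modeled outside Flink. The following is only a minimal sketch, assuming a plain `ArrayDeque` as a stand-in for the task mailbox; the class `CooperativeReaderSketch`, the method `readWhileIdle`, and the event strings are hypothetical names for illustration and are not part of Flink's API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

public class CooperativeReaderSketch {
    // Hypothetical stand-in for the task mailbox; this is NOT Flink's MailboxExecutor.
    static final Queue<Runnable> mailbox = new ArrayDeque<>();
    static final List<String> events = new ArrayList<>();

    // Reads records until the split is exhausted or other mail is waiting.
    static void readWhileIdle(Iterator<String> split) {
        while (split.hasNext()) {
            events.add("record:" + split.next());
            if (!mailbox.isEmpty()) {
                // No longer idle: re-queue the reader behind the pending mail and yield.
                mailbox.add(() -> readWhileIdle(split));
                return;
            }
        }
        events.add("split-done");
    }

    static List<String> run() {
        mailbox.clear();
        events.clear();
        Iterator<String> split = List.of("a", "b", "c", "d").iterator();
        mailbox.add(() -> readWhileIdle(split));
        // Simulated checkpoint barrier that arrives while the split is being read.
        mailbox.add(() -> events.add("checkpoint-barrier"));
        // Simplified single-threaded mailbox loop.
        while (!mailbox.isEmpty()) {
            mailbox.poll().run();
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model the barrier is handled after the first record rather than after the whole split, because the reader yields as soon as it sees other mail waiting.
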
> > > When the loop is interrupted by a checkpoint barrier, snapshotState(...)
> > > is called and the reader grabs the state from the provided Format. In
> > > the normal case the state is simply the split offset (current progress
> > > indicator); in more complex scenarios you can create your own format
> > > class and provide whatever serializable state you desire. In my case we
> > > store additional metadata about the progress of the reader.
> > >
> > > On state restore, the operator calls loadSplit, which calls *reopen* on
> > > the format rather than open, passing the checkpoint state so you can
> > > continue from where you left off. You can see that here
> > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L407-L411>.
> > >
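The snapshot and restore flow described above can be illustrated with a small self-contained sketch. It assumes the simplest case mentioned in the thread, where the checkpointed state is just the offset within the split. The class `LineFormat` and its in-memory "split" are hypothetical; only the method names `open`, `reopen`, and `getCurrentState` are chosen to echo the format interface the operator relies on, and nothing here is Flink code.

```java
import java.util.List;

public class ResumableSplitSketch {
    // Hypothetical format over an in-memory split; the checkpoint state is the offset only.
    static class LineFormat {
        private List<String> lines;
        private int offset;

        // Fresh start: begin reading the split from its first record.
        void open(List<String> split) {
            lines = split;
            offset = 0;
        }

        // Restore path: same split, but continue from the checkpointed offset.
        void reopen(List<String> split, int checkpointedOffset) {
            lines = split;
            offset = checkpointedOffset;
        }

        boolean reachedEnd() {
            return offset >= lines.size();
        }

        String nextRecord() {
            return lines.get(offset++);
        }

        // Snapshot path: the progress indicator that would be stored in the checkpoint.
        int getCurrentState() {
            return offset;
        }
    }

    static String demo() {
        List<String> split = List.of("r0", "r1", "r2", "r3");

        LineFormat first = new LineFormat();
        first.open(split);
        first.nextRecord();
        first.nextRecord();
        int state = first.getCurrentState(); // taken "mid-split", as at a checkpoint barrier

        // Simulated restore after a failure: a new format instance resumes from the state.
        LineFormat restored = new LineFormat();
        restored.reopen(split, state);
        StringBuilder remaining = new StringBuilder();
        while (!restored.reachedEnd()) {
            remaining.append(restored.nextRecord()).append(' ');
        }
        return remaining.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A format that tracks more than an offset, such as the additional reader metadata mentioned above, would return a richer serializable object from the snapshot method and accept it again on reopen.
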
> > > Cheers
> > >
> > > Darin
> > >
> > >
> > >
> > >
> > > On Tue, Dec 5, 2023 at 8:08 PM Darin Amos <darin.a...@instacart.com>
> > > wrote:
> > >
> > > > The way I understand this loop is that the ContinuousFileReaderOperator
> > > > only processes records in the background while the operator is idle,
> > > > i.e. while it's not receiving any records.
> > > >
> > > > At the very bottom of that loop here
> > > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L358C5-L358C6>
> > > > it exits if the executor is no longer idle, i.e. there are incoming
> > > > records.
> > > >
> > > > If you look here
> > > > <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L407>,
> > > > the operator supports checkpointable input splits, meaning it'll save
> > > > its place within a file split. This would only be possible if the
> > > > reader can be interrupted in the middle of a split. I have written
> > > > custom splits that do exactly this.
> > > >
> > > > Darin
> > > >
> > > > On Tue, Dec 5, 2023 at 11:31 AM Prabhu Joseph <prabhujose.ga...@gmail.com>
> > > > wrote:
> > > >
> > > >> This is the loop - code reference
> > > >> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java#L346>,
> > > >> where it fetches all records from the split, and only then does the
> > > >> MailboxProcessor get control to check other mail. This loop was
> > > >> introduced here
> > > >> <https://github.com/apache/flink/commit/1a69cb9fce629b0c458f5ea514d9ac8de008687f>.
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> On Tue, Dec 5, 2023 at 9:00 PM Darin Amos <darin.a...@instacart.com.invalid>
> > > >> wrote:
> > > >>
> > > >> > I thought for sure this was already the existing behavior with this
> > > >> > operator. Does it not check the mailbox executor after every record
> > > >> > read?
> > > >> >
> > > >> > On Tue, Dec 5, 2023 at 6:48 AM Prabhu Joseph (Jira) <j...@apache.org>
> > > >> > wrote:
> > > >> >
> > > >> > > Prabhu Joseph created FLINK-33753:
> > > >> > > -------------------------------------
> > > >> > >
> > > >> > >              Summary: ContinuousFileReaderOperator consume records as mini batch
> > > >> > >                  Key: FLINK-33753
> > > >> > >                  URL: https://issues.apache.org/jira/browse/FLINK-33753
> > > >> > >              Project: Flink
> > > >> > >           Issue Type: Improvement
> > > >> > >     Affects Versions: 1.18.0
> > > >> > >             Reporter: Prabhu Joseph
> > > >> > >
> > > >> > >
> > > >> > > The ContinuousFileReaderOperator reads and collects the records from
> > > >> > > a split in a loop. If the split is large, the loop takes longer, and
> > > >> > > the mailbox executor does not get a chance to process the checkpoint
> > > >> > > barrier in the meantime. This leads to the checkpoint timing out.
> > > >> > > ContinuousFileReaderOperator could be improved to consume the records
> > > >> > > in mini batches, similar to Hudi's StreamReadOperator
> > > >> > > (https://issues.apache.org/jira/browse/HUDI-2485).
> > > >> > >
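The mini-batch idea proposed in this issue can be sketched in isolation. This is an illustrative model only, not the Hudi or Flink implementation: the mailbox is a plain queue, and `BATCH_SIZE`, `readBatch`, and the event strings are hypothetical names chosen for the example. The reader processes a bounded number of records per mail and then re-enqueues itself unconditionally, so any mail that arrived in the meantime runs before the next batch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

public class MiniBatchReaderSketch {
    // Hypothetical batch size; a real operator would use a much larger value.
    static final int BATCH_SIZE = 2;

    // Reads at most BATCH_SIZE records, then hands control back to the mailbox loop.
    static void readBatch(Iterator<Integer> split, Queue<Runnable> mailbox, List<String> events) {
        for (int i = 0; i < BATCH_SIZE && split.hasNext(); i++) {
            events.add("record:" + split.next());
        }
        if (split.hasNext()) {
            // More records remain: enqueue the next batch behind any pending mail.
            mailbox.add(() -> readBatch(split, mailbox, events));
        }
    }

    static List<String> run() {
        Queue<Runnable> mailbox = new ArrayDeque<>(); // stand-in for the task mailbox
        List<String> events = new ArrayList<>();
        Iterator<Integer> split = List.of(1, 2, 3, 4, 5).iterator();

        mailbox.add(() -> readBatch(split, mailbox, events));
        // Simulated checkpoint barrier that arrives while the split is being read.
        mailbox.add(() -> events.add("checkpoint-barrier"));

        // Simplified single-threaded mailbox loop.
        while (!mailbox.isEmpty()) {
            mailbox.poll().run();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model the barrier waits for at most one batch regardless of the split size, which is the bound the proposal is aiming for.
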
> > > >> > >
> > > >> > >
> > > >> > > --
> > > >> > > This message was sent by Atlassian Jira
> > > >> > > (v8.20.10#820010)
> > > >> > >
> > > >> >
> > > >>
> > > >
> > >
> >
>
