Hi Dong,

Thanks for your reply!

Best regards,
Jing

On Wed, Jul 12, 2023 at 3:25 AM Dong Lin <lindon...@gmail.com> wrote:

> Hi Jing,
>
> Thanks for the comments. Please see my reply inline.
>
> On Wed, Jul 12, 2023 at 5:04 AM Jing Ge <j...@ververica.com.invalid>
> wrote:
>
> > Hi Dong,
> >
> > Thanks for the clarification. Now it is clear to me. I have some
> > additional noob questions wrt the internal sorter.
> >
> > 1. when to call setter to set the internalSorterSupported to be true?
> >
>
> The developer of the operator class (i.e. a class that implements
> `StreamOperator`) should override the `#getOperatorAttributes()` API to set
> internalSorterSupported to true if he/she decides to sort records
> internally in the operator.
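For illustration, here is a minimal self-contained sketch of what such an override might look like. The `StreamOperator` and `OperatorAttributes` types below are simplified stand-ins for the API proposed in the FLIP, not the real Flink classes:

```java
// Simplified stand-ins for the FLIP's proposed API (not the actual Flink types).
interface StreamOperator {
    // By default an operator does not support an internal sorter.
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributes(false);
    }
}

class OperatorAttributes {
    private final boolean internalSorterSupported;

    OperatorAttributes(boolean internalSorterSupported) {
        this.internalSorterSupported = internalSorterSupported;
    }

    boolean isInternalSorterSupported() {
        return internalSorterSupported;
    }
}

// An operator that sorts records internally declares so via the attribute.
class MySortingOperator implements StreamOperator {
    @Override
    public OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributes(true);
    }
}
```

Operators that do not override the method keep the default (false) attribute, so only operator developers who opt in need to change anything.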
>
>
> > 2.
> > *"For those operators whose throughput can be considerably improved with
> > an internal sorter, update it to take advantage of the internal sorter
> > when its input has isBacklog=true.*
> > *Typically, operators that involve aggregation operation (e.g. join,
> > cogroup, aggregate) on keyed inputs can benefit from using an internal
> > sorter."*
> >
> > *"The operator that performs CoGroup operation will instantiate two
> > internal sorters to sort records from its two inputs separately. Then it
> > can pull the sorted records from these two sorters. This can be done
> > without wrapping input records with TaggedUnion<...>. In comparison, the
> > existing DataStream#coGroup needs to wrap input records with
> > TaggedUnion<...> before sorting them using one external sorter, which
> > introduces higher overhead."*
> >
> > According to the performance test, it seems that the internal sorter has
> > better performance than the external sorter. Is it possible to make the
> > operators that can benefit from it use the internal sorter by default?
> >
>
> Yes, it is possible. After this FLIP is done, users can use
> DataStream#coGroup with EndOfStreamWindows as the window assigner to
> co-group two streams effectively in batch mode. An operator that uses
> an internal sorter will be used to perform the co-group operation. There is
> no need for users of the DataStream API to explicitly know about or set the
> internal sorter in any way.
>
> In the future, we plan to incrementally optimize other aggregation
> operations (e.g. aggregate) on the DataStream API when EndOfStreamWindows
> is used as the window assigner.
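For intuition, the two-sorter co-group described in the quoted FLIP text can be sketched in plain Java. This is a hypothetical simplification (no Flink types; in-memory lists stand in for the internal sorters), not the actual implementation:

```java
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch of the two-internal-sorter CoGroup idea: each input is
// sorted by key separately (no TaggedUnion wrapping), then the groups with
// equal keys are pulled from both sorted lists and handed to a co-group
// function, in ascending key order.
class TwoSorterCoGroup {
    static <A, B, K extends Comparable<K>, O> List<O> coGroup(
            List<A> input1, Function<A, K> key1,
            List<B> input2, Function<B, K> key2,
            BiFunction<List<A>, List<B>, O> coGroupFn) {
        List<A> s1 = new ArrayList<>(input1);
        List<B> s2 = new ArrayList<>(input2);
        s1.sort(Comparator.comparing(key1)); // "internal sorter" for input 1
        s2.sort(Comparator.comparing(key2)); // "internal sorter" for input 2
        List<O> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < s1.size() || j < s2.size()) {
            // The next key to emit is the smaller head key of the two inputs.
            K k;
            if (i >= s1.size()) k = key2.apply(s2.get(j));
            else if (j >= s2.size()) k = key1.apply(s1.get(i));
            else {
                K k1 = key1.apply(s1.get(i)), k2 = key2.apply(s2.get(j));
                k = k1.compareTo(k2) <= 0 ? k1 : k2;
            }
            // Collect the group for key k from each side (possibly empty).
            List<A> g1 = new ArrayList<>();
            while (i < s1.size() && key1.apply(s1.get(i)).compareTo(k) == 0) g1.add(s1.get(i++));
            List<B> g2 = new ArrayList<>();
            while (j < s2.size() && key2.apply(s2.get(j)).compareTo(k) == 0) g2.add(s2.get(j++));
            out.add(coGroupFn.apply(g1, g2));
        }
        return out;
    }
}
```

Note how no record ever needs to be wrapped in a union type: each input is sorted with its own key extractor, which is the overhead reduction the FLIP text refers to.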
>
> Best,
> Dong
>
>
> >
> > Best regards,
> > Jing
> >
> >
> > On Tue, Jul 11, 2023 at 2:58 PM Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hi Jing,
> > >
> > > Thank you for the comments! Please see my reply inline.
> > >
> > > On Tue, Jul 11, 2023 at 5:41 AM Jing Ge <j...@ververica.com.invalid>
> > > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > Thanks for the proposal! The FLIP is already in good shape. I got
> > > > some NIT questions.
> > > >
> > > > 1. It is a little bit weird to write the hint right after the
> > > > motivation that some features have been moved to FLIP-331, because
> > > > at that time, readers don't know the context about which features
> > > > are meant. I would suggest moving the note to the beginning of the
> > > > "Public interfaces" section.
> > > >
> > >
> > > Given that the reviewer who commented on this email thread before I
> > > refactored the FLIP (i.e. Piotr) has read FLIP-331, I think it is
> > > simpler to just remove any mention of FLIP-331. I have updated the
> > > FLIP accordingly.
> > >
> > >
> > > > 2. It is also a little bit weird to describe all behaviour changes
> > > > at first but only focus on one single feature, i.e. how to implement
> > > > internalSorterSupported. TBH, I was lost while I was reading the
> > > > Public interfaces. Maybe change the FLIP title? Another option could
> > > > be to write a short summary of all features and point out that this
> > > > FLIP will only focus on the internalSorterSupported feature. Others
> > > > could be found in FLIP-331. WDYT?
> > > >
> > >
> > > Conceptually, the purpose of this FLIP is to allow a stream mode job
> > > to run parts of the topology in batch mode so that it can apply
> > > optimizations/computations that cannot be used together with
> > > checkpointing (and are thus not usable in stream mode). Although the
> > > internal sorter is the only optimization immediately supported in this
> > > FLIP, this FLIP lays the foundation to support other optimizations in
> > > the future, such as using GPUs to process a bounded stream of records.
> > >
> > > Therefore, I find it better to keep the current title rather than
> > > limiting the scope to the internal sorter. What do you think?
> > >
> > >
> > >
> > > > 3. There should be a typo at 4) Checkpoint and failover strategy ->
> > > > Mixed mode ->
> > > >
> > > >    - If any task fails when isBacklog=false true, this task is
> > > >    restarted to re-process its input from the beginning.
> > > >
> > > >
> > > Thank you for catching this issue. It is fixed now.
> > >
> > > Best,
> > > Dong
> > >
> > >
> > > >
> > > >
> > > > Best regards
> > > > Jing
> > > >
> > > >
> > > > On Thu, Jul 6, 2023 at 1:24 PM Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > > Hi Piotr,
> > > > >
> > > > > Thanks for your comments! Please see my reply inline.
> > > > >
> > > > > On Wed, Jul 5, 2023 at 11:44 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Dong,
> > > > > >
> > > > > > I have a couple of questions.
> > > > > >
> > > > > > Could you explain why those properties
> > > > > >
> > > > > >     @Nullable private Boolean isOutputOnEOF = null;
> > > > > >     @Nullable private Boolean isOutputOnCheckpoint = null;
> > > > > >     @Nullable private Boolean isInternalSorterSupported = null;
> > > > > >
> > > > > > must be `@Nullable`, instead of having the default value set to
> > > > > > `false`?
> > > > > >
> > > > >
> > > > > By initializing these private variables in
> > > > > OperatorAttributesBuilder as null, we can implement
> > > > > `OperatorAttributesBuilder#build()` in such a way that it can print
> > > > > DEBUG-level logging to say "isOutputOnCheckpoint is not explicitly
> > > > > set". This can help users/SREs debug performance issues (or the
> > > > > lack of an expected optimization) due to operators not explicitly
> > > > > setting the right operator attribute.
> > > > >
> > > > > For example, we might want a job to always use the longer
> > > > > checkpointing interval (i.e.
> > > > > execution.checkpointing.interval-during-backlog) if all running
> > > > > operators have isOutputOnCheckpoint==false, and use the short
> > > > > checkpointing interval otherwise. If a user has explicitly
> > > > > configured execution.checkpointing.interval-during-backlog but the
> > > > > two-phase commit sink library has not been upgraded to set
> > > > > isOutputOnCheckpoint=true, then the job will end up using the long
> > > > > checkpointing interval, and it will be useful to figure out what is
> > > > > going wrong in this case by checking the log.
> > > > >
> > > > > Note that the default value of these fields of the
> > > > > OperatorAttributes instance built by OperatorAttributesBuilder will
> > > > > still be false. The following is mentioned in the Java doc of
> > > > > `OperatorAttributesBuilder#build()`:
> > > > >
> > > > >  /**
> > > > >   * If any operator attribute is null, we will log it at DEBUG
> > > > >   * level and use the following default values.
> > > > >   * - isOutputOnEOF defaults to false
> > > > >   * - isOutputOnCheckpoint defaults to false
> > > > >   * - isInternalSorterSupported defaults to false
> > > > >   */
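The nullable-field-then-default pattern can be sketched as follows. This is a simplified stand-in for the proposed builder (only one setter shown, and a `System.err` print stands in for DEBUG-level logging):

```java
// Simplified stand-in for the proposed OperatorAttributes/Builder pair.
class OperatorAttributes {
    final boolean isOutputOnEOF;
    final boolean isOutputOnCheckpoint;
    final boolean isInternalSorterSupported;

    OperatorAttributes(boolean eof, boolean checkpoint, boolean sorter) {
        this.isOutputOnEOF = eof;
        this.isOutputOnCheckpoint = checkpoint;
        this.isInternalSorterSupported = sorter;
    }
}

class OperatorAttributesBuilder {
    // Null means "not explicitly set by the operator developer", which is
    // distinguishable from an explicit false.
    private Boolean isOutputOnEOF = null;
    private Boolean isOutputOnCheckpoint = null;
    private Boolean isInternalSorterSupported = null;

    OperatorAttributesBuilder setInternalSorterSupported(boolean value) {
        this.isInternalSorterSupported = value;
        return this;
    }

    OperatorAttributes build() {
        // A real implementation would use a DEBUG-level logger here.
        if (isInternalSorterSupported == null) {
            System.err.println("DEBUG: isInternalSorterSupported is not explicitly set");
        }
        // Unset attributes fall back to false, per the Javadoc.
        return new OperatorAttributes(
                isOutputOnEOF != null && isOutputOnEOF,
                isOutputOnCheckpoint != null && isOutputOnCheckpoint,
                isInternalSorterSupported != null && isInternalSorterSupported);
    }
}
```

The built attributes behave identically either way; only the log line differs, which is exactly the debugging signal described above.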
> > > > >
> > > > >
> > > > > >
> > > > > > Second question, have you thought about cases where someone is
> > > > > > either bootstrapping from a streaming source like Kafka, or
> > > > > > simply trying to catch up after a long period of downtime in a
> > > > > > purely streaming job? Generally speaking, cases where the user
> > > > > > doesn't care about latency in the catch-up phase, regardless of
> > > > > > whether the source is bounded or unbounded, but wants to process
> > > > > > the data as fast as possible, and then switch dynamically to
> > > > > > real-time processing?
> > > > > >
> > > > >
> > > > > Yes, I have thought about this. We should allow this job to
> > > > > effectively run in batch mode when the job is in the catch-up
> > > > > phase. FLIP-327 is actually an important step toward addressing
> > > > > this use-case.
> > > > >
> > > > > In order to address the above use-case, all we need is a way for
> > > > > the source operator (e.g. Kafka) to tell the Flink runtime (via
> > > > > IsProcessingBacklog) whether it is in the catch-up phase.
> > > > >
> > > > > Since every Kafka message has an event timestamp, we can allow
> > > > > users to specify a job-level config such as
> > > > > backlog-watermark-lag-threshold, and consider a Kafka source to
> > > > > have IsProcessingBacklog=true if system_time - watermark >
> > > > > backlog-watermark-lag-threshold. This effectively allows us to
> > > > > determine whether Kafka is in the catch-up phase.
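The threshold check itself is simple; here is a hypothetical sketch (the config name and formula are taken from the paragraph above, while the class itself is invented for illustration):

```java
import java.time.Duration;

// Hypothetical sketch of the backlog check described above: a source is
// considered to be processing backlog when its watermark lags the current
// system time by more than a configured threshold.
class BacklogDetector {
    // Corresponds to the proposed backlog-watermark-lag-threshold config.
    private final long lagThresholdMs;

    BacklogDetector(Duration lagThreshold) {
        this.lagThresholdMs = lagThreshold.toMillis();
    }

    boolean isProcessingBacklog(long systemTimeMs, long watermarkMs) {
        // system_time - watermark > backlog-watermark-lag-threshold
        return systemTimeMs - watermarkMs > lagThresholdMs;
    }
}
```

As the watermark catches up to the system time, the lag drops below the threshold and the source would flip back to IsProcessingBacklog=false.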
> > > > >
> > > > > Once we have this capability (I plan to work on this in FLIP-328),
> > > > > we can directly use the features proposed in FLIP-325 and FLIP-327
> > > > > to optimize the above use-case.
> > > > >
> > > > > What do you think?
> > > > >
> > > > > Best,
> > > > > Dong
> > > > >
> > > > >
> > > > > >
> > > > > > Best,
> > > > > > Piotrek
> > > > > >
> > > > > > On Sun, Jul 2, 2023 at 4:15 PM Dong Lin <lindon...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I am opening this thread to discuss FLIP-327: Support
> > > > > > > stream-batch unified operator to improve job throughput when
> > > > > > > processing backlog data. The design doc can be found at
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-327%3A+Support+stream-batch+unified+operator+to+improve+job+throughput+when+processing+backlog+data
> > > > > > > .
> > > > > > >
> > > > > > > This FLIP enables a Flink job to initially operate in batch
> > > > > > > mode, achieving high throughput while processing records that
> > > > > > > do not require low processing latency. Subsequently, the job
> > > > > > > can seamlessly transition to stream mode for processing
> > > > > > > real-time records with low latency. Importantly, the same state
> > > > > > > can be utilized before and after this mode switch, making it
> > > > > > > particularly valuable when users wish to bootstrap the job's
> > > > > > > state using historical data.
> > > > > > >
> > > > > > > We would greatly appreciate any comments or feedback you may
> > > > > > > have on this proposal.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Dong
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
