Hi, Dong,
Thanks for your efforts.

+1 to this proposal,
I believe this will improve the performance in some mixture circumstances
of bounded and unbounded workloads.

Best,
Yuxin


Xintong Song <tonysong...@gmail.com> 于2023年9月18日周一 10:56写道:

> Thanks for addressing my comments, Dong.
>
> LGTM.
>
> Best,
>
> Xintong
>
>
>
> On Sat, Sep 16, 2023 at 3:34 PM Wencong Liu <liuwencle...@163.com> wrote:
>
> > Hi Dong & Jinhao,
> >
> > Thanks for your clarification! +1
> >
> > Best regards,
> > Wencong
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > At 2023-09-15 11:26:16, "Dong Lin" <lindon...@gmail.com> wrote:
> > >Hi Wencong,
> > >
> > >Thanks for your comments! Please see my reply inline.
> > >
> > >On Thu, Sep 14, 2023 at 12:30 PM Wencong Liu <liuwencle...@163.com>
> > wrote:
> > >
> > >> Dear Dong,
> > >>
> > >> I have thoroughly reviewed the proposal for FLIP-331 and believe it
> > would
> > >> be
> > >> a valuable addition to Flink. However, I do have a few questions that
> I
> > >> would
> > >> like to discuss:
> > >>
> > >>
> > >> 1. The FLIP-331 proposed the EndOfStreamWindows that is implemented by
> > >> TimeWindow with maxTimestamp = (Long.MAX_VALUE - 1), which naturally
> > >> supports WindowedStream and AllWindowedStream to process all records
> > >> belonging to a key in a 'global' window under both STREAMING and BATCH
> > >> runtime execution mode.
> > >>
> > >>
> > >> However, besides coGroup and keyBy().aggregate(), other operators on
> > >> WindowedStream and AllWindowedStream, such as join/reduce, etc,
> > currently
> > >> are still implemented based on WindowOperator.
> > >>
> > >>
> > >> In fact, these operators can also be implemented without using
> > >> WindowOperator
> > >> to prevent additional WindowAssigner#assignWindows or
> > >> triggerContext#onElement
> > >> invocation cost. Will there be plans to support these operators in the
> > >> future?
> > >>
> > >
> > >You are right. The EndOfStreamWindows proposed in this FLIP can
> > potentially
> > >benefit any DataStream API that takes WindowAssigner as parameters. This
> > >can involve more operations than aggregate and co-group.
> > >
> > >And yes, we have plans to take advantage of this API to optimize these
> > >operators in the future. This FLIP focuses on the introduction of the
> > >public APIs and uses aggregate/co-group as the first two examples to
> > >show-case the performance benefits.
> > >
> > >I have added a "Analysis of APIs affected by this FLIP" to list the
> > >DataStream APIs that can benefit from this FLIP. Would this answer your
> > >question?
> > >
> > >
> > >>
> > >> 2. When using EndOfStreamWindows, upstream operators no longer support
> > >> checkpointing. This limit may be too strict, especially when dealing
> > with
> > >> bounded data in streaming runtime execution mode, where checkpointing
> > >> can still be useful.
> > >>
> > >
> > >I am not sure we have a good way to support checkpoint while still
> > >achieving the performance improves targeted by this FLIP.
> > >
> > >The issue here is that if we support checkpoint, then we can not take
> > >advantage of algorithms (e.g. sorting inputs using ExternalSorter) that
> > are
> > >not compatible with checkpoints. These algorithms (which do not support
> > >checkpoint) are the main reasons why batch mode currently significantly
> > >outperforms stream mode in doing aggregation/cogroup etc.
> > >
> > >In most cases where the user does not care about processing latency, it
> is
> > >generally preferred to use batch mode to perform aggregation operations
> > >(which should be 10X faster than the existing stream mode performance)
> > >instead of doing checkpoint.
> > >
> > >Also note that we can still let operators perform failover in the same
> as
> > >the existing batch mode execution, where the intermediate results
> > (produced
> > >by one operator) can be persisted in shuffle service and downstream
> > >operators can re-read those data from shuffle service after failover.
> > >
> > >
> > >>
> > >> 3. The proposal mentions that if a transformation has isOutputOnEOF ==
> > >> true, the
> > >> operator as well as its upstream operators will be executed in 'batch
> > >> mode' with
> > >> checkpointing disabled. I would like to understand the specific
> > >> implications of this
> > >> 'batch mode' and if there are any other changes associated with it?
> > >
> > >
> > >Good point. We should explicitly mention the changes. I have updated the
> > >FLIP to clarify this.
> > >
> > >More specifically, the checkpoint is disabled when these operators are
> > >running, such that these operators can do operations not compatible with
> > >checkpoints (e.g. sorting inputs). And operators should re-read the data
> > >from the upstream blocking edge or sources after failover.
> > >
> > >Would this answer your question?
> > >
> > >
> > >>
> > >> Additionally, I am curious to know if this 'batch mode' conflicts with
> > the
> > >> 'mix mode'
> > >>
> > >> described in FLIP-327. While the coGroup and keyBy().aggregate()
> > operators
> > >> on
> > >> EndOfStreamWindows have the attribute 'isInternalSorterSupported' set
> to
> > >> true,
> > >> indicating support for the 'mixed mode', they also have isOutputOnEOF
> > set
> > >> to true,
> > >> which suggests that the upstream operators should be executed in
> 'batch
> > >> mode'.
> > >> Will the 'mixed mode' be ignored when in 'batch mode'? I would
> > appreciate
> > >> any
> > >> clarification on this matter.
> > >>
> > >
> > >Good question. I think `isInternalSorterSupported` and `isOutputOnEOF`
> do
> > >not conflict with each other.
> > >
> > >It might be useful to recap the semantics of these attributes:
> > >- `isOutputOnEOF` describes whether an operator outputs data only after
> > all
> > >its input has been ingested by the operator.
> > >-  `isInternalSorterSupported` describes whether an operator will use an
> > >internal sorter when it does not need to do checkpoints.
> > >
> > >And we can further derive that these semantics of two attributes do not
> > >conflict with each other. And we can have valid operators with any of
> the
> > >four combinations of true/false values for these two attributes.
> > >
> > >In the specific example you described above, let's say isOutputOnEOF =
> > true
> > >and isInternalSorterSupported = true. According to FLIP-331, the
> > checkpoint
> > >is disabled when this operator is running. And according to FLIP-327,
> this
> > >operator will sort data internally, which means that Flink runtime
> should
> > >not additionally sort its inputs. So overall the Flink job can comply
> with
> > >the semantics of these two attributes consistently.
> > >
> > >
> > >Thanks again for taking time to review this FLIP. Please let me know
> what
> > >you think.
> > >
> > >Best regards,
> > >Dong
> > >
> > >
> > >> Thank you for taking the time to consider my feedback. I eagerly await
> > >> your response.
> > >>
> > >> Best regards,
> > >>
> > >> Wencong Liu
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> At 2023-09-01 11:21:47, "Dong Lin" <lindon...@gmail.com> wrote:
> > >> >Hi all,
> > >> >
> > >> >Jinhao (cc'ed) and I are opening this thread to discuss FLIP-331:
> > Support
> > >> >EndOfStreamWindows and isOutputOnEOF operator attribute to optimize
> > task
> > >> >deployment. The design doc can be found at
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamWindows++and+isOutputOnEOF+operator+attribute+to+optimize+task+deployment
> > >> >.
> > >> >
> > >> >This FLIP introduces isOutputOnEOF operator attribute that JobManager
> > can
> > >> >use to optimize task deployment and resource utilization. In
> addition,
> > it
> > >> >also adds EndOfStreamWindows that can be used with the DataStream
> APIs
> > >> >(e.g. cogroup, aggregate) to significantly increase throughput and
> > reduce
> > >> >resource utilization.
> > >> >
> > >> >We would greatly appreciate any comment or feedback you may have on
> > this
> > >> >proposal.
> > >> >
> > >> >Cheers,
> > >> >Dong
> > >>
> >
>

Reply via email to