Hi Xintong,

Thanks for the comments! Please see my reply inline.

On Thu, Sep 14, 2023 at 4:17 PM Xintong Song <tonysong...@gmail.com> wrote:

> Thanks for preparing this FLIP, Dong & Jinhao.
>
> I'm overall +1 to this proposal. This is helpful for some cases that we are
> dealing with.
> - Wencong and I are preparing guidelines for migrating from the DataSet API
> to the DataStream API. We noticed that users have to either define a custom
> trigger in order to process all data within one window, or use a very large
> time window and assign a mock timestamp to each record, relying on the
> Long.MAX_VALUE watermark to trigger the window. EndOfStreamWindows would
> help reduce that effort.
> - With Hybrid Shuffle mode, we also encountered the problem that the
> downstream tasks are started but cannot perform any processing, because
> the upstream tasks do not emit any data until all their inputs are consumed.
> The output-on-EOF operator attribute semantics should also be helpful in
> addressing this issue, as mentioned in the future work section of this FLIP.
>

Great! Thanks for providing the information.
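To illustrate the output-on-EOF behavior described above, here is a minimal,
self-contained Java sketch. It is not Flink code: `OutputOnEofOperator`,
`processElement`, `endInput` and `emitted` are hypothetical names chosen for
illustration. The operator consumes all of its input before emitting
anything, so a downstream task deployed early has nothing to process until
the upstream input ends.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator with output-on-EOF semantics: it aggregates all
// input (here, a simple sum) and emits the result only at end of input.
class OutputOnEofOperator {
    private long sum = 0;
    private final List<Long> output = new ArrayList<>();

    // Called once per input record; never emits anything.
    void processElement(long value) {
        sum += value;
    }

    // Called once all inputs are consumed; the only place output is produced.
    void endInput() {
        output.add(sum);
    }

    // Records made visible to the downstream task so far.
    List<Long> emitted() {
        return output;
    }
}

public class OutputOnEofDemo {
    public static void main(String[] args) {
        OutputOnEofOperator op = new OutputOnEofOperator();
        for (long v = 1; v <= 4; v++) {
            op.processElement(v);
        }
        // A downstream task deployed at this point would sit idle.
        System.out.println("before EOF: " + op.emitted()); // before EOF: []
        op.endInput();
        System.out.println("after EOF: " + op.emitted()); // after EOF: [10]
    }
}
```

This is the property a scheduler could exploit: if an operator is known to
emit only at end of input, there is little benefit in deploying its
downstream tasks before it finishes.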


>
> I have a few minor comments.
>
> > A blocking input edge with pending records is the same as a source with
> > isBacklog=true when an operator determines its RecordAttributes for
> > downstream nodes.
> >
> - It is not very clear to me what this sentence means. In particular, how
> does it relate to the proposed changes in this FLIP?
>

This is needed for FLIP-331 to work with FLIP-327. More specifically, once
both FLIP-327 and FLIP-331 are accepted, we need a way to determine the
backlog status of inputs with the blocking edge type.

Thanks for catching this. I have added this explanation in the FLIP to
hopefully reduce the confusion.


> - IIUC, EndOfStreamWindows is a new type of window that can be used in all
> cases where a WindowAssigner is accepted. It is not limited to coGroup and
> aggregate. It might be better to make that more explicit in the FLIP.
>

Good point. I agree it is useful to make that explicit.

I have added a section "Analysis of APIs affected by this FLIP" in the FLIP
to clarify this.
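As a rough illustration of why a window with maxTimestamp = (Long.MAX_VALUE - 1)
only fires at the end of the input, here is a self-contained Java sketch. It
assumes Flink's TimeWindow convention that a window covers [start, end) with
maxTimestamp = end - 1, and the event-time rule that a window fires once the
watermark reaches its maxTimestamp. The class and method names are
hypothetical and this is not the actual FLIP-331 implementation.

```java
// Hypothetical model of a single window spanning all timestamps, in the
// spirit of EndOfStreamWindows.
public class EndOfStreamWindowDemo {
    // Window covers [Long.MIN_VALUE, Long.MAX_VALUE), so, following the
    // TimeWindow convention, maxTimestamp = end - 1 = Long.MAX_VALUE - 1.
    static final long WINDOW_END = Long.MAX_VALUE;
    static final long MAX_TIMESTAMP = WINDOW_END - 1;

    // Event-time firing rule: fire once the watermark reaches maxTimestamp.
    static boolean shouldFire(long watermark) {
        return watermark >= MAX_TIMESTAMP;
    }

    public static void main(String[] args) {
        // Ordinary watermarks never reach maxTimestamp; the Long.MAX_VALUE
        // watermark emitted when a bounded input ends does.
        long[] watermarks = {0L, 1_700_000_000_000L, Long.MAX_VALUE - 2, Long.MAX_VALUE};
        for (long wm : watermarks) {
            System.out.println(wm + " -> fire=" + shouldFire(wm));
        }
    }
}
```

Because the window does not depend on record timestamps for its boundaries,
the same assigner can in principle be plugged in wherever a WindowAssigner is
accepted, without users mocking timestamps or writing a custom trigger.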


Best,
Dong


>
> WDYT?
>
> Best,
>
> Xintong
>
>
>
> On Thu, Sep 14, 2023 at 12:30 PM Wencong Liu <liuwencle...@163.com> wrote:
>
> > Dear Dong,
> >
> > I have thoroughly reviewed the proposal for FLIP-331 and believe it would
> > be a valuable addition to Flink. However, I do have a few questions that
> > I would like to discuss:
> >
> >
> > 1. FLIP-331 proposes EndOfStreamWindows, which is implemented by a
> > TimeWindow with maxTimestamp = (Long.MAX_VALUE - 1). This naturally
> > allows WindowedStream and AllWindowedStream to process all records
> > belonging to a key in a 'global' window under both the STREAMING and
> > BATCH runtime execution modes.
> >
> >
> > However, besides coGroup and keyBy().aggregate(), other operators on
> > WindowedStream and AllWindowedStream, such as join and reduce, are
> > currently still implemented based on WindowOperator.
> >
> >
> > In fact, these operators can also be implemented without WindowOperator,
> > avoiding the additional cost of WindowAssigner#assignWindows and
> > triggerContext#onElement invocations. Are there plans to support these
> > operators in the future?
> >
> >
> > 2. When using EndOfStreamWindows, upstream operators no longer support
> > checkpointing. This restriction may be too strict, especially when
> > dealing with bounded data in the streaming runtime execution mode, where
> > checkpointing can still be useful.
> >
> > 3. The proposal mentions that if a transformation has isOutputOnEOF ==
> > true, the operator as well as its upstream operators will be executed in
> > 'batch mode' with checkpointing disabled. I would like to understand the
> > specific implications of this 'batch mode'. Are there any other changes
> > associated with it?
> >
> > Additionally, I am curious to know whether this 'batch mode' conflicts
> > with the 'mixed mode' described in FLIP-327. The coGroup and
> > keyBy().aggregate() operators on EndOfStreamWindows have the attribute
> > 'isInternalSorterSupported' set to true, indicating support for the
> > 'mixed mode', but they also have isOutputOnEOF set to true, which
> > suggests that their upstream operators should be executed in 'batch
> > mode'. Will the 'mixed mode' be ignored in 'batch mode'? I would
> > appreciate any clarification on this matter.
> >
> > Thank you for taking the time to consider my feedback. I eagerly await
> > your response.
> >
> > Best regards,
> >
> > Wencong Liu
> >
> > At 2023-09-01 11:21:47, "Dong Lin" <lindon...@gmail.com> wrote:
> > >Hi all,
> > >
> > >Jinhao (cc'ed) and I are opening this thread to discuss FLIP-331: Support
> > >EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task
> > >deployment. The design doc can be found at
> > >https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamWindows++and+isOutputOnEOF+operator+attribute+to+optimize+task+deployment
> > >
> > >This FLIP introduces the isOutputOnEOF operator attribute, which the
> > >JobManager can use to optimize task deployment and resource utilization.
> > >In addition, it adds EndOfStreamWindows, which can be used with the
> > >DataStream APIs (e.g. coGroup, aggregate) to significantly increase
> > >throughput and reduce resource utilization.
> > >
> > >We would greatly appreciate any comments or feedback you may have on this
> > >proposal.
> > >
> > >Cheers,
> > >Dong
> >
>
