Thanks for preparing this FLIP, Dong & Jinhao.

Overall, I'm +1 on this proposal. It would help with several cases that we are
currently dealing with.
- Wencong and I are preparing guidelines for migrating from the DataSet API to
the DataStream API. We noticed that, to process all data within one window,
users currently have to either define a custom trigger, or use a very large
time window, assign a mock timestamp to each record, and rely on the
Long.MAX_VALUE watermark to trigger the window. EndOfStreamWindows would help
reduce that effort.
- With Hybrid Shuffle mode, we also encountered the problem that downstream
tasks are deployed but cannot do any processing, because the upstream tasks
do not emit any data until all of their inputs are consumed. The
isOutputOnEOF operator attribute should also help address this issue, as
mentioned in the future work section of this FLIP.
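
To illustrate the watermark-based workaround described above, here is a toy model in plain Java. It is not Flink code, and the class and method names are hypothetical. It assumes a window whose max timestamp is Long.MAX_VALUE - 1 (as EndOfStreamWindows is described to use) and an event-time style rule that fires the window once the watermark reaches that max timestamp. Under these assumptions, only the final Long.MAX_VALUE watermark, which marks the end of a bounded input, can fire the window.

```java
import java.util.Arrays;
import java.util.List;

// Toy model only, not Flink code: all names here are hypothetical.
public class EndOfStreamWindowSketch {

    // Assumption: like a TimeWindow ending at Long.MAX_VALUE, whose
    // max timestamp is (end - 1) = Long.MAX_VALUE - 1.
    public static long maxTimestamp() {
        return Long.MAX_VALUE - 1;
    }

    // Event-time style check: the window fires once the watermark
    // has reached the window's max timestamp.
    public static boolean shouldFire(long watermark) {
        return watermark >= maxTimestamp();
    }

    // Returns the index of the first watermark that fires the window,
    // or -1 if no watermark in the sequence is high enough.
    public static int firstFiringIndex(List<Long> watermarks) {
        for (int i = 0; i < watermarks.size(); i++) {
            if (shouldFire(watermarks.get(i))) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Ordinary watermarks while records are flowing, then the final
        // Long.MAX_VALUE watermark that marks the end of a bounded input.
        List<Long> watermarks = Arrays.asList(1_000L, 5_000_000L, Long.MAX_VALUE);
        // Prints "window fires at watermark index 2".
        System.out.println("window fires at watermark index " + firstFiringIndex(watermarks));
    }
}
```

The sketch only captures the comparison that makes the window wait for end of input. It deliberately leaves out Flink's trigger, timer, and state machinery.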

I have a few minor comments.

> A blocking input edge with pending records is same as a source with
> isBacklog=true when an operator determines its RecordAttributes for
> downstream nodes.
>
- It is not very clear to me what this sentence means. In particular, how
does it relate to the proposed changes in this FLIP?

- IIUC, EndOfStreamWindows is a new type of window that can be used anywhere
a WindowAssigner is accepted, so it is not limited to coGroup and aggregate.
It might be better to make that explicit in the FLIP.

WDYT?

Best,

Xintong



On Thu, Sep 14, 2023 at 12:30 PM Wencong Liu <liuwencle...@163.com> wrote:

> Dear Dong,
>
> I have thoroughly reviewed the proposal for FLIP-331 and believe it would
> be a valuable addition to Flink. However, I do have a few questions that I
> would like to discuss:
>
>
> 1. FLIP-331 proposes EndOfStreamWindows, which is implemented as a
> TimeWindow with maxTimestamp = (Long.MAX_VALUE - 1). This naturally allows
> WindowedStream and AllWindowedStream to process all records belonging to a
> key in a 'global' window under both the STREAMING and BATCH runtime
> execution modes.
>
>
> However, apart from coGroup and keyBy().aggregate(), other operators on
> WindowedStream and AllWindowedStream, such as join and reduce, are still
> implemented on top of WindowOperator.
>
>
> In fact, these operators could also be implemented without WindowOperator,
> avoiding the cost of the additional WindowAssigner#assignWindows and
> triggerContext#onElement invocations. Are there plans to support these
> operators in the future?
>
>
> 2. When using EndOfStreamWindows, upstream operators no longer support
> checkpointing. This restriction may be too strict, especially when
> processing bounded data in the STREAMING runtime execution mode, where
> checkpointing can still be useful.
>
> 3. The proposal mentions that if a transformation has isOutputOnEOF ==
> true, the operator as well as its upstream operators will be executed in
> 'batch mode' with checkpointing disabled. I would like to understand the
> specific implications of this 'batch mode', and whether there are any
> other changes associated with it.
>
> Additionally, I am curious whether this 'batch mode' conflicts with the
> 'mixed mode' described in FLIP-327. The coGroup and keyBy().aggregate()
> operators on EndOfStreamWindows have the attribute
> 'isInternalSorterSupported' set to true, indicating support for the
> 'mixed mode', but they also have isOutputOnEOF set to true, which suggests
> that the upstream operators should be executed in 'batch mode'. Will the
> 'mixed mode' be ignored in 'batch mode'? I would appreciate any
> clarification on this matter.
>
> Thank you for taking the time to consider my feedback. I eagerly await
> your response.
>
> Best regards,
>
> Wencong Liu
>
> At 2023-09-01 11:21:47, "Dong Lin" <lindon...@gmail.com> wrote:
> >Hi all,
> >
> >Jinhao (cc'ed) and I are opening this thread to discuss FLIP-331: Support
> >EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task
> >deployment. The design doc can be found at
> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamWindows++and+isOutputOnEOF+operator+attribute+to+optimize+task+deployment
> >
> >This FLIP introduces the isOutputOnEOF operator attribute, which the
> >JobManager can use to optimize task deployment and resource utilization.
> >It also adds EndOfStreamWindows, which can be used with the DataStream APIs
> >(e.g. coGroup, aggregate) to significantly increase throughput and reduce
> >resource utilization.
> >
> >We would greatly appreciate any comment or feedback you may have on this
> >proposal.
> >
> >Cheers,
> >Dong
>
