Hi Xintong, Thanks for the comments! Please see my reply inline.
On Thu, Sep 14, 2023 at 4:17 PM Xintong Song <tonysong...@gmail.com> wrote:
> Thanks for preparing this FLIP, Dong & Jinhao.
>
> I'm overall +1 to this proposal. It is helpful for some cases that we are
> dealing with.
> - Wencong and I are preparing guidelines for migrating from the DataSet API
> to the DataStream API. We noticed that users have to define a custom trigger
> in order to process all data within one window, or use a very large time
> window and mock a timestamp for each record, expecting the Long.MAX_VALUE
> watermark to trigger the window. EndOfStreamWindows would help reduce that
> effort.
> - With Hybrid Shuffle mode, we also encountered the problem that the
> downstream tasks are launched but cannot perform any processing because
> the upstream tasks do not emit any data until all inputs are consumed. The
> output-on-EOF operator attribute semantics should also be helpful in
> addressing this issue, as mentioned in the future work of this FLIP.
>

Great! Thanks for providing the information.

>
> I have a few minor comments.
>
> > A blocking input edge with pending records is the same as a source with
> > isBacklog=true when an operator determines its RecordAttributes for
> > downstream nodes.
>
> - It is not very clear to me what this sentence means. In particular, how
> does it relate to the proposed changes in this FLIP?
>

This is needed for FLIP-331 to work with FLIP-327. More specifically, once both FLIP-327 and FLIP-331 are accepted, we need a way to determine the backlog status of an input with a blocking edge type.

Thanks for catching this. I have added this explanation to the FLIP to hopefully reduce the confusion.

> - IIUC, EndOfStreamWindows is a new type of window that can be used in all
> cases that take a WindowAssigner. It is not limited to coGroup and
> aggregate. It might be better to make that more explicit in the FLIP.
>

Good point. I agree it is useful to make that explicit.
I have added a section "Analysis of APIs affected by this FLIP" to the FLIP to clarify this.

Best,
Dong

>
> WDYT?
>
> Best,
> Xintong
>
>
> On Thu, Sep 14, 2023 at 12:30 PM Wencong Liu <liuwencle...@163.com> wrote:
>
> > Dear Dong,
> >
> > I have thoroughly reviewed the proposal for FLIP-331 and believe it would
> > be a valuable addition to Flink. However, I do have a few questions that I
> > would like to discuss:
> >
> > 1. FLIP-331 proposes EndOfStreamWindows, which is implemented by a
> > TimeWindow with maxTimestamp = (Long.MAX_VALUE - 1). This naturally
> > allows WindowedStream and AllWindowedStream to process all records
> > belonging to a key in a 'global' window under both the STREAMING and BATCH
> > runtime execution modes.
> >
> > However, besides coGroup and keyBy().aggregate(), other operators on
> > WindowedStream and AllWindowedStream, such as join and reduce, are
> > still implemented based on WindowOperator.
> >
> > In fact, these operators could also be implemented without
> > WindowOperator, to avoid the additional WindowAssigner#assignWindows and
> > triggerContext#onElement invocation cost. Are there plans to support
> > these operators in the future?
> >
> > 2. When using EndOfStreamWindows, upstream operators no longer support
> > checkpointing. This limitation may be too strict, especially when dealing
> > with bounded data in the streaming runtime execution mode, where
> > checkpointing can still be useful.
> >
> > 3. The proposal mentions that if a transformation has isOutputOnEOF ==
> > true, the operator as well as its upstream operators will be executed in
> > 'batch mode' with checkpointing disabled. I would like to understand the
> > specific implications of this 'batch mode', and whether there are any
> > other changes associated with it.
> >
> > Additionally, I am curious to know whether this 'batch mode' conflicts
> > with the 'mixed mode' described in FLIP-327.
> > While the coGroup and keyBy().aggregate() operators on
> > EndOfStreamWindows have the attribute 'isInternalSorterSupported' set to
> > true, indicating support for the 'mixed mode', they also have
> > isOutputOnEOF set to true, which suggests that the upstream operators
> > should be executed in 'batch mode'. Will the 'mixed mode' be ignored when
> > in 'batch mode'? I would appreciate any clarification on this matter.
> >
> > Thank you for taking the time to consider my feedback. I eagerly await
> > your response.
> >
> > Best regards,
> > Wencong Liu
> >
> >
> > At 2023-09-01 11:21:47, "Dong Lin" <lindon...@gmail.com> wrote:
> > >Hi all,
> > >
> > >Jinhao (cc'ed) and I are opening this thread to discuss FLIP-331: Support
> > >EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task
> > >deployment. The design doc can be found at
> > >
> > >https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamWindows++and+isOutputOnEOF+operator+attribute+to+optimize+task+deployment
> > >.
> > >
> > >This FLIP introduces the isOutputOnEOF operator attribute, which the
> > >JobManager can use to optimize task deployment and resource utilization.
> > >In addition, it adds EndOfStreamWindows, which can be used with the
> > >DataStream APIs (e.g. coGroup, aggregate) to significantly increase
> > >throughput and reduce resource utilization.
> > >
> > >We would greatly appreciate any comments or feedback you may have on this
> > >proposal.
> > >
> > >Cheers,
> > >Dong
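To make the window semantics discussed in this thread concrete, here is a minimal plain-Java sketch that models, without any Flink dependency, how an EndOfStreamWindows-style assigner is expected to behave. Every record is assigned to one window spanning all timestamps, whose maxTimestamp is Long.MAX_VALUE - 1, so an event-time firing rule (fire once the watermark reaches maxTimestamp) only fires that window when the end-of-input watermark of Long.MAX_VALUE arrives. The class EndOfStreamWindowModel, its methods, and the summing aggregation are hypothetical illustrations, not Flink APIs and not the FLIP's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the EndOfStreamWindows idea; not a Flink API.
final class EndOfStreamWindowModel {

    /** A time window [start, end); maxTimestamp() is end - 1, as described for TimeWindow. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        long maxTimestamp() {
            return end - 1;
        }
    }

    /** One window spanning all timestamps, so maxTimestamp() == Long.MAX_VALUE - 1. */
    static final Window GLOBAL = new Window(Long.MIN_VALUE, Long.MAX_VALUE);

    private final List<Long> buffer = new ArrayList<>();
    private final List<Long> emitted = new ArrayList<>();

    /** Every record is assigned to the same window, whatever its timestamp. */
    Window assignWindow(long timestamp) {
        return GLOBAL;
    }

    void onElement(long value, long timestamp) {
        // The assigner always returns GLOBAL, so one buffer holds the window's state.
        if (assignWindow(timestamp) == GLOBAL) {
            buffer.add(value);
        }
    }

    /** Event-time firing rule: fire once the watermark reaches the window's maxTimestamp(). */
    void onWatermark(long watermark) {
        if (watermark >= GLOBAL.maxTimestamp() && !buffer.isEmpty()) {
            long sum = 0;
            for (long v : buffer) {
                sum += v;
            }
            emitted.add(sum); // example aggregation: sum of all buffered values
            buffer.clear();
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        EndOfStreamWindowModel model = new EndOfStreamWindowModel();
        model.onElement(1, 10L);
        model.onElement(2, 1_000_000L);
        model.onWatermark(2_000_000L);       // regular watermark: window does not fire
        System.out.println(model.emitted()); // prints []
        model.onElement(3, Long.MIN_VALUE);
        model.onWatermark(Long.MAX_VALUE);   // end-of-input watermark: window fires
        System.out.println(model.emitted()); // prints [6]
    }
}
```

In this model, an ordinary watermark leaves the window open, and only the final Long.MAX_VALUE watermark produces a single result covering all records. This is the behavior Xintong describes users currently emulating with custom triggers or mocked timestamps.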