Hi Dong,

Thanks for your efforts. +1 to this proposal. I believe it will improve performance in scenarios that mix bounded and unbounded workloads.
Best,
Yuxin

On Mon, Sep 18, 2023 at 10:56, Xintong Song <tonysong...@gmail.com> wrote:
> Thanks for addressing my comments, Dong.
>
> LGTM.
>
> Best,
>
> Xintong
>
> On Sat, Sep 16, 2023 at 3:34 PM Wencong Liu <liuwencle...@163.com> wrote:
> > Hi Dong & Jinhao,
> >
> > Thanks for your clarification! +1
> >
> > Best regards,
> > Wencong
> >
> > At 2023-09-15 11:26:16, "Dong Lin" <lindon...@gmail.com> wrote:
> > > Hi Wencong,
> > >
> > > Thanks for your comments! Please see my reply inline.
> > >
> > > On Thu, Sep 14, 2023 at 12:30 PM Wencong Liu <liuwencle...@163.com> wrote:
> > >
> > > > Dear Dong,
> > > >
> > > > I have thoroughly reviewed the proposal for FLIP-331 and believe it would be a valuable addition to Flink. However, I do have a few questions that I would like to discuss:
> > > >
> > > > 1. FLIP-331 proposes EndOfStreamWindows, which is implemented by a TimeWindow with maxTimestamp = (Long.MAX_VALUE - 1). It naturally allows WindowedStream and AllWindowedStream to process all records belonging to a key in a 'global' window under both the STREAMING and BATCH runtime execution modes.
> > > >
> > > > However, besides coGroup and keyBy().aggregate(), other operators on WindowedStream and AllWindowedStream, such as join and reduce, are currently still implemented based on WindowOperator.
> > > >
> > > > In fact, these operators could also be implemented without WindowOperator, avoiding the additional cost of invoking WindowAssigner#assignWindows or triggerContext#onElement. Are there plans to support these operators in the future?
> > > >
> > >
> > > You are right. The EndOfStreamWindows proposed in this FLIP can potentially benefit any DataStream API that takes a WindowAssigner as a parameter. This can involve more operations than aggregate and co-group.
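To make point 1 concrete, here is a minimal, language-agnostic model (written in Python, not Flink's Java API) of why a single window with maxTimestamp = Long.MAX_VALUE - 1 behaves as a "fire only at end of input" window. It rests on three assumptions. First, a time window's max timestamp is end - 1. Second, an event-time trigger fires once the watermark reaches that max timestamp. Third, a bounded input ends with a final watermark of Long.MAX_VALUE. The names TimeWindow, END_OF_STREAM_WINDOW, assign_windows and event_time_trigger_fires are hypothetical stand-ins, not Flink classes.

```python
from dataclasses import dataclass

# Java's Long range, modeled with Python ints.
LONG_MIN = -(2**63)      # Long.MIN_VALUE
LONG_MAX = 2**63 - 1     # Long.MAX_VALUE (assumed final watermark of a bounded input)


@dataclass(frozen=True)
class TimeWindow:
    """Toy stand-in for a time window covering [start, end)."""
    start: int
    end: int  # exclusive

    def max_timestamp(self) -> int:
        # The last timestamp that still belongs to the window.
        return self.end - 1


# Hypothetical model of EndOfStreamWindows: one window spanning all time.
END_OF_STREAM_WINDOW = TimeWindow(LONG_MIN, LONG_MAX)


def assign_windows(timestamp: int) -> list[TimeWindow]:
    # The record's timestamp is ignored: every record lands in the same window,
    # which is why a specialized operator could skip this per-record call.
    return [END_OF_STREAM_WINDOW]


def event_time_trigger_fires(window: TimeWindow, watermark: int) -> bool:
    # An event-time trigger fires once the watermark reaches max_timestamp().
    return window.max_timestamp() <= watermark


if __name__ == "__main__":
    w = END_OF_STREAM_WINDOW
    print(w.max_timestamp() == LONG_MAX - 1)               # True
    print(event_time_trigger_fires(w, 1_700_000_000_000))  # False: input still running
    print(event_time_trigger_fires(w, LONG_MAX))           # True: end of input
```

Because assign_windows always returns the same window and the trigger can only fire at the end-of-input watermark, the per-record assign/trigger calls that Wencong mentions carry no information. That is what makes a dedicated, non-WindowOperator implementation possible.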
> > >
> > > And yes, we have plans to take advantage of this API to optimize these operators in the future. This FLIP focuses on introducing the public APIs and uses aggregate/co-group as the first two examples to showcase the performance benefits.
> > >
> > > I have added an "Analysis of APIs affected by this FLIP" section to list the DataStream APIs that can benefit from this FLIP. Would this answer your question?
> > >
> > > > 2. When using EndOfStreamWindows, upstream operators no longer support checkpointing. This limitation may be too strict, especially when dealing with bounded data in the STREAMING runtime execution mode, where checkpointing can still be useful.
> > > >
> > >
> > > I am not sure we have a good way to support checkpointing while still achieving the performance improvements targeted by this FLIP.
> > >
> > > The issue here is that if we support checkpointing, then we cannot take advantage of algorithms (e.g. sorting inputs using ExternalSorter) that are not compatible with checkpoints. These algorithms (which do not support checkpointing) are the main reason why batch mode currently significantly outperforms stream mode for aggregation, co-group, etc.
> > >
> > > In most cases where the user does not care about processing latency, it is generally preferable to use batch mode to perform aggregation operations (which should be 10X faster than the existing stream mode performance) instead of doing checkpoints.
> > >
> > > Also note that we can still let operators perform failover in the same way as the existing batch mode execution, where the intermediate results (produced by one operator) can be persisted in the shuffle service and downstream operators can re-read that data from the shuffle service after failover.
> > >
> > > > 3. The proposal mentions that if a transformation has isOutputOnEOF == true, the operator as well as its upstream operators will be executed in 'batch mode' with checkpointing disabled. I would like to understand the specific implications of this 'batch mode', and whether there are any other changes associated with it.
> > >
> > > Good point. We should explicitly mention the changes. I have updated the FLIP to clarify this.
> > >
> > > More specifically, checkpointing is disabled while these operators are running, so that these operators can perform operations that are not compatible with checkpoints (e.g. sorting inputs). After failover, operators should re-read the data from the upstream blocking edge or from the sources.
> > >
> > > Would this answer your question?
> > >
> > > > Additionally, I am curious whether this 'batch mode' conflicts with the 'mixed mode' described in FLIP-327. The coGroup and keyBy().aggregate() operators on EndOfStreamWindows have the attribute 'isInternalSorterSupported' set to true, indicating support for the 'mixed mode'. However, they also have isOutputOnEOF set to true, which suggests that the upstream operators should be executed in 'batch mode'. Will the 'mixed mode' be ignored when in 'batch mode'? I would appreciate any clarification on this matter.
> > > >
> > >
> > > Good question. I think `isInternalSorterSupported` and `isOutputOnEOF` do not conflict with each other.
> > >
> > > It might be useful to recap the semantics of these attributes:
> > > - `isOutputOnEOF` describes whether an operator outputs data only after all its input has been ingested by the operator.
> > > - `isInternalSorterSupported` describes whether an operator will use an internal sorter when it does not need to do checkpoints.
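The rule in point 3 says that an operator with isOutputOnEOF == true, together with all of its upstream operators, runs in 'batch mode' with checkpointing disabled. Under that reading, finding the affected operators is a reverse reachability walk over the job graph. The sketch below is a minimal illustration. The adjacency-dict graph, the operator names, and the function batch_mode_operators are hypothetical and are not part of Flink's StreamGraph/JobGraph API.

```python
from collections import deque


def batch_mode_operators(upstream: dict[str, list[str]],
                         output_on_eof: set[str]) -> set[str]:
    """Operators that execute in 'batch mode' under the rule in point 3.

    upstream maps each operator to the operators that feed it directly.
    Every operator flagged isOutputOnEOF, plus everything transitively
    upstream of it, is returned.
    """
    result: set[str] = set()
    pending = deque(output_on_eof)
    while pending:
        op = pending.popleft()
        if op in result:
            continue  # already reached via another path
        result.add(op)
        pending.extend(upstream.get(op, []))
    return result


if __name__ == "__main__":
    # Hypothetical job: source -> map -> aggregate(EndOfStreamWindows) -> sink,
    # with a second source feeding the sink directly.
    job = {
        "source": [],
        "map": ["source"],
        "aggregate": ["map"],
        "other_source": [],
        "sink": ["aggregate", "other_source"],
    }
    print(sorted(batch_mode_operators(job, {"aggregate"})))
    # ['aggregate', 'map', 'source']
```

In this example, sink and other_source fall outside the set returned for the rule as quoted. How the runtime treats such operators while the batch-mode region is running is not modeled here.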
> > >
> > > From these semantics, we can further derive that the two attributes do not conflict with each other, and we can have valid operators with any of the four combinations of true/false values for these two attributes.
> > >
> > > In the specific example you described above, let's say isOutputOnEOF = true and isInternalSorterSupported = true. According to FLIP-331, checkpointing is disabled while this operator is running. And according to FLIP-327, this operator will sort data internally, which means that the Flink runtime should not additionally sort its inputs. So overall, the Flink job can comply with the semantics of both attributes consistently.
> > >
> > > Thanks again for taking the time to review this FLIP. Please let me know what you think.
> > >
> > > Best regards,
> > > Dong
> > >
> > > > Thank you for taking the time to consider my feedback. I eagerly await your response.
> > > >
> > > > Best regards,
> > > >
> > > > Wencong Liu
> > > >
> > > > At 2023-09-01 11:21:47, "Dong Lin" <lindon...@gmail.com> wrote:
> > > > > Hi all,
> > > > >
> > > > > Jinhao (cc'ed) and I are opening this thread to discuss FLIP-331: Support EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task deployment. The design doc can be found at
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamWindows++and+isOutputOnEOF+operator+attribute+to+optimize+task+deployment
> > > > >
> > > > > This FLIP introduces the isOutputOnEOF operator attribute, which the JobManager can use to optimize task deployment and resource utilization. In addition, it adds EndOfStreamWindows, which can be used with the DataStream APIs (e.g. cogroup, aggregate) to significantly increase throughput and reduce resource utilization.
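The throughput argument in this thread rests on one idea. An operator that outputs only at EOF, and runs with checkpointing disabled, can sort its input by key (as with ExternalSorter) instead of updating per-key state record by record. The toy in-memory comparison below illustrates the general sort-then-aggregate technique. It is not Flink's implementation: ExternalSorter can spill to disk, whereas sort_based_sum (a hypothetical name, like streaming_sum) simply sorts a Python list.

```python
from itertools import groupby
from operator import itemgetter


def sort_based_sum(records):
    """Per-key sums, emitted only after the whole input has been seen.

    Since nothing is output before end of input, the operator may first sort
    everything by key. Each key's records are then contiguous, so only one
    running sum is live at a time.
    """
    ordered = sorted(records, key=itemgetter(0))  # in-memory stand-in for a disk-spilling sorter
    return [(key, sum(value for _, value in group))
            for key, group in groupby(ordered, key=itemgetter(0))]


def streaming_sum(records):
    """Streaming-style counterpart: one state entry per key, updated per record."""
    state = {}
    for key, value in records:
        state[key] = state.get(key, 0) + value
    return sorted(state.items())


if __name__ == "__main__":
    data = [("b", 2), ("a", 1), ("b", 3), ("a", 4)]
    print(sort_based_sum(data))                         # [('a', 5), ('b', 5)]
    print(streaming_sum(data) == sort_based_sum(data))  # True
```

Both functions produce the same result. They differ in how intermediate data is held. A sorter's partially sorted buffers are the kind of in-flight data that, per Dong's reply, does not fit the current checkpointing model. That is why the sort-based path is paired with checkpointing being disabled while such operators run.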
> > > > >
> > > > > We would greatly appreciate any comments or feedback you may have on this proposal.
> > > > >
> > > > > Cheers,
> > > > > Dong