Hi Dong & Jinhao,

Thanks for your clarification! +1

Best regards,
Wencong

At 2023-09-15 11:26:16, "Dong Lin" <lindon...@gmail.com> wrote:
>Hi Wencong,
>
>Thanks for your comments! Please see my reply inline.
>
>On Thu, Sep 14, 2023 at 12:30 PM Wencong Liu <liuwencle...@163.com> wrote:
>
>> Dear Dong,
>>
>> I have thoroughly reviewed the proposal for FLIP-331 and believe it would
>> be a valuable addition to Flink. However, I do have a few questions that I
>> would like to discuss:
>>
>> 1. The FLIP-331 proposed the EndOfStreamWindows that is implemented by
>> TimeWindow with maxTimestamp = (Long.MAX_VALUE - 1), which naturally
>> supports WindowedStream and AllWindowedStream to process all records
>> belonging to a key in a 'global' window under both STREAMING and BATCH
>> runtime execution mode.
>>
>> However, besides coGroup and keyBy().aggregate(), other operators on
>> WindowedStream and AllWindowedStream, such as join/reduce, etc, currently
>> are still implemented based on WindowOperator.
>>
>> In fact, these operators can also be implemented without using
>> WindowOperator to prevent additional WindowAssigner#assignWindows or
>> triggerContext#onElement invocation cost. Will there be plans to support
>> these operators in the future?
>>
>
>You are right. The EndOfStreamWindows proposed in this FLIP can potentially
>benefit any DataStream API that takes WindowAssigner as parameters. This
>can involve more operations than aggregate and co-group.
>
>And yes, we have plans to take advantage of this API to optimize these
>operators in the future. This FLIP focuses on the introduction of the
>public APIs and uses aggregate/co-group as the first two examples to
>show-case the performance benefits.
>
>I have added an "Analysis of APIs affected by this FLIP" section to list the
>DataStream APIs that can benefit from this FLIP. Would this answer your
>question?
>
>
>>
>> 2. When using EndOfStreamWindows, upstream operators no longer support
>> checkpointing.
>> This limit may be too strict, especially when dealing with
>> bounded data in streaming runtime execution mode, where checkpointing
>> can still be useful.
>>
>
>I am not sure we have a good way to support checkpoint while still
>achieving the performance improvements targeted by this FLIP.
>
>The issue here is that if we support checkpoint, then we can not take
>advantage of algorithms (e.g. sorting inputs using ExternalSorter) that are
>not compatible with checkpoints. These algorithms (which do not support
>checkpoint) are the main reasons why batch mode currently significantly
>outperforms stream mode in doing aggregation/cogroup etc.
>
>In most cases where the user does not care about processing latency, it is
>generally preferred to use batch mode to perform aggregation operations
>(which should be 10X faster than the existing stream mode performance)
>instead of doing checkpoint.
>
>Also note that we can still let operators perform failover in the same way as
>the existing batch mode execution, where the intermediate results (produced
>by one operator) can be persisted in shuffle service and downstream
>operators can re-read those data from shuffle service after failover.
>
>
>>
>> 3. The proposal mentions that if a transformation has isOutputOnEOF ==
>> true, the operator as well as its upstream operators will be executed in
>> 'batch mode' with checkpointing disabled. I would like to understand the
>> specific implications of this 'batch mode' and if there are any other
>> changes associated with it?
>
>
>Good point. We should explicitly mention the changes. I have updated the
>FLIP to clarify this.
>
>More specifically, the checkpoint is disabled when these operators are
>running, such that these operators can do operations not compatible with
>checkpoints (e.g. sorting inputs). And operators should re-read the data
>from the upstream blocking edge or sources after failover.
>
>Would this answer your question?
>
>
>>
>> Additionally, I am curious to know if this 'batch mode' conflicts with the
>> 'mixed mode' described in FLIP-327. While the coGroup and
>> keyBy().aggregate() operators on EndOfStreamWindows have the attribute
>> 'isInternalSorterSupported' set to true, indicating support for the
>> 'mixed mode', they also have isOutputOnEOF set to true, which suggests
>> that the upstream operators should be executed in 'batch mode'.
>> Will the 'mixed mode' be ignored when in 'batch mode'? I would appreciate
>> any clarification on this matter.
>>
>
>Good question. I think `isInternalSorterSupported` and `isOutputOnEOF` do
>not conflict with each other.
>
>It might be useful to recap the semantics of these attributes:
>- `isOutputOnEOF` describes whether an operator outputs data only after all
>its input has been ingested by the operator.
>- `isInternalSorterSupported` describes whether an operator will use an
>internal sorter when it does not need to do checkpoints.
>
>And we can further derive that the semantics of these two attributes do not
>conflict with each other. And we can have valid operators with any of the
>four combinations of true/false values for these two attributes.
>
>In the specific example you described above, let's say isOutputOnEOF = true
>and isInternalSorterSupported = true. According to FLIP-331, the checkpoint
>is disabled when this operator is running. And according to FLIP-327, this
>operator will sort data internally, which means that Flink runtime should
>not additionally sort its inputs. So overall the Flink job can comply with
>the semantics of these two attributes consistently.
>
>
>Thanks again for taking the time to review this FLIP. Please let me know what
>you think.
>
>Best regards,
>Dong
>
>
>> Thank you for taking the time to consider my feedback. I eagerly await
>> your response.
>>
>> Best regards,
>>
>> Wencong Liu
>>
>> At 2023-09-01 11:21:47, "Dong Lin" <lindon...@gmail.com> wrote:
>> >Hi all,
>> >
>> >Jinhao (cc'ed) and I are opening this thread to discuss FLIP-331: Support
>> >EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task
>> >deployment. The design doc can be found at
>> >
>> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamWindows++and+isOutputOnEOF+operator+attribute+to+optimize+task+deployment
>> >.
>> >
>> >This FLIP introduces isOutputOnEOF operator attribute that JobManager can
>> >use to optimize task deployment and resource utilization. In addition, it
>> >also adds EndOfStreamWindows that can be used with the DataStream APIs
>> >(e.g. cogroup, aggregate) to significantly increase throughput and reduce
>> >resource utilization.
>> >
>> >We would greatly appreciate any comment or feedback you may have on this
>> >proposal.
>> >
>> >Cheers,
>> >Dong
>>
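
For anyone skimming the thread, here is a rough sketch of how the two attributes discussed above (isOutputOnEOF and isInternalSorterSupported) could combine. It is plain Java, not Flink code: the class OperatorAttributesSketch and its methods are hypothetical names made up for illustration. It also assumes, as a simplification, that isOutputOnEOF is the only reason an operator would not need to checkpoint; the actual rules live in FLIP-331 and FLIP-327.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the two operator attributes discussed in this thread.
 * Not Flink code: every name here is invented for illustration only.
 */
final class OperatorAttributesSketch {

    /**
     * FLIP-331 as described in the thread: checkpointing is disabled while an
     * operator with isOutputOnEOF == true is running.
     * Assumption: no other reason for disabling checkpoints is modeled.
     */
    static boolean checkpointDisabled(boolean isOutputOnEOF) {
        return isOutputOnEOF;
    }

    /**
     * FLIP-327 as described in the thread: an operator that supports an internal
     * sorter uses it when it does not need to do checkpoints.
     */
    static boolean sortsInternally(boolean isOutputOnEOF, boolean isInternalSorterSupported) {
        return isInternalSorterSupported && checkpointDisabled(isOutputOnEOF);
    }

    /** Renders one combination of the two attributes and the derived behavior. */
    static String describe(boolean isOutputOnEOF, boolean isInternalSorterSupported) {
        return "isOutputOnEOF=" + isOutputOnEOF
                + ", isInternalSorterSupported=" + isInternalSorterSupported
                + " -> checkpointDisabled=" + checkpointDisabled(isOutputOnEOF)
                + ", sortsInternally=" + sortsInternally(isOutputOnEOF, isInternalSorterSupported);
    }

    /** Enumerates all four true/false combinations; each one is a valid operator. */
    static List<String> allCombinations() {
        List<String> rows = new ArrayList<>();
        boolean[] values = {false, true};
        for (boolean outputOnEOF : values) {
            for (boolean sorterSupported : values) {
                rows.add(describe(outputOnEOF, sorterSupported));
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        allCombinations().forEach(System.out::println);
    }
}
```

Running main prints one row per combination. The case raised in the question (both attributes true) comes out as checkpoint disabled with the operator sorting internally, which matches the explanation that the two attributes describe independent properties and do not conflict.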