Hi Dong & Jinhao,

Thanks for your clarification! +1

Best regards,
Wencong

At 2023-09-15 11:26:16, "Dong Lin" <lindon...@gmail.com> wrote:
>Hi Wencong,
>
>Thanks for your comments! Please see my reply inline.
>
>On Thu, Sep 14, 2023 at 12:30 PM Wencong Liu <liuwencle...@163.com> wrote:
>
>> Dear Dong,
>>
>> I have thoroughly reviewed the proposal for FLIP-331 and believe it would be
>> a valuable addition to Flink. However, I do have a few questions that I would
>> like to discuss:
>>
>>
>> 1. FLIP-331 proposes EndOfStreamWindows, which is implemented by a
>> TimeWindow with maxTimestamp = (Long.MAX_VALUE - 1). This naturally lets
>> WindowedStream and AllWindowedStream process all records belonging to a
>> key in one 'global' window under both the STREAMING and BATCH runtime
>> execution modes.
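
A minimal sketch of why a window with maxTimestamp = (Long.MAX_VALUE - 1) fires only at the end of the input, written in plain Java with no Flink dependency. The Window class and the fires() helper are hypothetical stand-ins, not Flink's TimeWindow or trigger classes, and the Long.MIN_VALUE start bound is an assumption. The sketch relies on the usual convention that a time window's maxTimestamp is (end - 1) and that the final watermark emitted at end of input is Long.MAX_VALUE.

```java
public final class EndOfStreamWindowSketch {

    /** Hypothetical stand-in for a time window covering [start, end). */
    public static final class Window {
        public final long start;
        public final long end;

        public Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Largest timestamp that still belongs to this window. */
        public long maxTimestamp() {
            return end - 1;
        }
    }

    /** Assumed value of the final watermark emitted when the input ends. */
    public static final long MAX_WATERMARK = Long.MAX_VALUE;

    /** One window spanning the whole time domain (start bound is an assumption). */
    public static final Window END_OF_STREAM = new Window(Long.MIN_VALUE, Long.MAX_VALUE);

    /** Event-time style firing rule: fire once the watermark reaches maxTimestamp. */
    public static boolean fires(Window window, long watermark) {
        return watermark >= window.maxTimestamp();
    }

    public static void main(String[] args) {
        // Any ordinary watermark is far below Long.MAX_VALUE - 1.
        System.out.println(fires(END_OF_STREAM, 1_000L));        // false
        // Only the final watermark makes the window fire.
        System.out.println(fires(END_OF_STREAM, MAX_WATERMARK)); // true
    }
}
```

Under these assumptions every record lands in the same window, and that window produces output exactly once, when the input is exhausted.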
>>
>>
>> However, besides coGroup and keyBy().aggregate(), the other operations on
>> WindowedStream and AllWindowedStream, such as join and reduce, are
>> currently still implemented on top of WindowOperator.
>>
>>
>> In fact, these operations can also be implemented without WindowOperator,
>> which avoids the cost of the additional WindowAssigner#assignWindows and
>> triggerContext#onElement invocations. Are there plans to support these
>> operations in the future?
>>
>
>You are right. The EndOfStreamWindows proposed in this FLIP can potentially
>benefit any DataStream API that takes WindowAssigner as parameters. This
>can involve more operations than aggregate and co-group.
>
>And yes, we have plans to take advantage of this API to optimize these
>operators in the future. This FLIP focuses on the introduction of the
>public APIs and uses aggregate/co-group as the first two examples to
>show-case the performance benefits.
>
>I have added an "Analysis of APIs affected by this FLIP" section to list the
>DataStream APIs that can benefit from this FLIP. Would this answer your
>question?
>
>
>>
>> 2. When using EndOfStreamWindows, upstream operators no longer support
>> checkpointing. This limit may be too strict, especially when dealing with
>> bounded data in streaming runtime execution mode, where checkpointing
>> can still be useful.
>>
>
>I am not sure we have a good way to support checkpointing while still
>achieving the performance improvements targeted by this FLIP.
>
>The issue here is that if we support checkpointing, then we cannot take
>advantage of algorithms (e.g. sorting inputs using ExternalSorter) that are
>not compatible with checkpoints. These algorithms (which do not support
>checkpointing) are the main reason why batch mode currently significantly
>outperforms stream mode in doing aggregation/cogroup etc.
>
>In most cases where the user does not care about processing latency, it is
>generally preferred to use batch mode to perform aggregation operations
>(which should be 10X faster than the existing stream mode performance)
>instead of doing checkpoint.
>
>Also note that we can still let operators perform failover in the same way
>as the existing batch mode execution, where the intermediate results
>(produced by one operator) can be persisted in the shuffle service and
>downstream operators can re-read that data from the shuffle service after
>failover.
>
>
>>
>> 3. The proposal mentions that if a transformation has isOutputOnEOF ==
>> true, the operator as well as its upstream operators will be executed in
>> 'batch mode' with checkpointing disabled. I would like to understand the
>> specific implications of this 'batch mode' and if there are any other
>> changes associated with it?
>
>
>Good point. We should explicitly mention the changes. I have updated the
>FLIP to clarify this.
>
>More specifically, the checkpoint is disabled when these operators are
>running, such that these operators can do operations not compatible with
>checkpoints (e.g. sorting inputs). And operators should re-read the data
>from the upstream blocking edge or sources after failover.
>
>Would this answer your question?
>
>
>>
>> Additionally, I am curious to know if this 'batch mode' conflicts with the
>> 'mixed mode' described in FLIP-327. While the coGroup and
>> keyBy().aggregate() operators on EndOfStreamWindows have the attribute
>> 'isInternalSorterSupported' set to true, indicating support for the
>> 'mixed mode', they also have isOutputOnEOF set to true, which suggests
>> that the upstream operators should be executed in 'batch mode'. Will the
>> 'mixed mode' be ignored when in 'batch mode'? I would appreciate any
>> clarification on this matter.
>>
>
>Good question. I think `isInternalSorterSupported` and `isOutputOnEOF` do
>not conflict with each other.
>
>It might be useful to recap the semantics of these attributes:
>- `isOutputOnEOF` describes whether an operator outputs data only after all
>its input has been ingested by the operator.
>- `isInternalSorterSupported` describes whether an operator will use an
>internal sorter when it does not need to do checkpoints.
>
>From this we can derive that the semantics of these two attributes do not
>conflict with each other, and that an operator is valid with any of the
>four combinations of true/false values for these two attributes.
>
>In the specific example you described above, let's say isOutputOnEOF = true
>and isInternalSorterSupported = true. According to FLIP-331, the checkpoint
>is disabled when this operator is running. And according to FLIP-327, this
>operator will sort data internally, which means that Flink runtime should
>not additionally sort its inputs. So overall the Flink job can comply with
>the semantics of these two attributes consistently.
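
To make the four combinations concrete, here is a small sketch in plain Java. It is only an illustration of the reasoning above, not the FLIP's actual API: the class OperatorAttributesSketch and both derived methods are hypothetical names, and the two rules simply restate the paragraph above (isOutputOnEOF disables checkpointing while the operator runs; isInternalSorterSupported means the runtime should not additionally sort the operator's inputs).

```java
public final class OperatorAttributesSketch {

    public final boolean outputOnEOF;
    public final boolean internalSorterSupported;

    public OperatorAttributesSketch(boolean outputOnEOF, boolean internalSorterSupported) {
        this.outputOnEOF = outputOnEOF;
        this.internalSorterSupported = internalSorterSupported;
    }

    /** Rule taken from the FLIP-331 discussion: no checkpoints while this operator runs. */
    public boolean checkpointDisabledWhileRunning() {
        return outputOnEOF;
    }

    /** Rule taken from the FLIP-327 discussion: the operator sorts internally instead. */
    public boolean runtimeSkipsInputSort() {
        return internalSorterSupported;
    }

    public static void main(String[] args) {
        boolean[] values = {false, true};
        // Enumerate all four combinations; each decision depends on one attribute only,
        // so no combination yields contradictory instructions for the runtime.
        for (boolean eof : values) {
            for (boolean sorter : values) {
                OperatorAttributesSketch attrs = new OperatorAttributesSketch(eof, sorter);
                System.out.printf(
                        "isOutputOnEOF=%b isInternalSorterSupported=%b"
                                + " -> checkpointDisabled=%b runtimeSkipsInputSort=%b%n",
                        eof, sorter,
                        attrs.checkpointDisabledWhileRunning(),
                        attrs.runtimeSkipsInputSort());
            }
        }
    }
}
```

Because each derived decision reads exactly one attribute, the two attributes act on independent aspects of the job (checkpointing versus input sorting), which is why they can be set in any combination.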
>
>
>Thanks again for taking time to review this FLIP. Please let me know what
>you think.
>
>Best regards,
>Dong
>
>
>> Thank you for taking the time to consider my feedback. I eagerly await
>> your response.
>>
>> Best regards,
>>
>> Wencong Liu
>>
>>
>> At 2023-09-01 11:21:47, "Dong Lin" <lindon...@gmail.com> wrote:
>> >Hi all,
>> >
>> >Jinhao (cc'ed) and I are opening this thread to discuss FLIP-331: Support
>> >EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task
>> >deployment. The design doc can be found at
>> >
>> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamWindows++and+isOutputOnEOF+operator+attribute+to+optimize+task+deployment
>> >.
>> >
>> >This FLIP introduces the isOutputOnEOF operator attribute, which the
>> >JobManager can use to optimize task deployment and resource utilization.
>> >In addition, it adds EndOfStreamWindows, which can be used with the
>> >DataStream APIs (e.g. cogroup, aggregate) to significantly increase
>> >throughput and reduce resource utilization.
>> >
>> >We would greatly appreciate any comment or feedback you may have on this
>> >proposal.
>> >
>> >Cheers,
>> >Dong
>>
