Sorry, I just sent an incomplete email draft by mistake.

Thanks for reviving this, Xuannan. The updated FLIP LGTM. +1 for it.

Best,

Xintong



On Tue, Jan 23, 2024 at 5:51 PM Xintong Song <tonysong...@gmail.com> wrote:

> Thanks
>
> Best,
>
> Xintong
>
>
>
> On Wed, Jan 10, 2024 at 5:56 PM Xuannan Su <suxuanna...@gmail.com> wrote:
>
>> Hi all,
>>
>> After several rounds of offline discussions with Xintong and Jinhao,
>> we have decided to narrow the scope of the FLIP. It will now focus on
>> introducing OperatorAttributes that indicate whether an operator emits
>> records only after inputs have ended. We will also use the attribute
>> to optimize task scheduling for better resource utilization. Setting
>> the backlog status and optimizing the operator implementation during
>> the backlog will be deferred to future work.
>>
>> In addition to the change above, we also make the following changes to
>> the FLIP to address the problems mentioned by Dong:
>> - Public interfaces are updated to reuse GlobalWindows.
>> - Instead of making all outputs of the upstream operators of the
>> "isOutputOnlyAfterEndOfStream=true" operator blocking, we only make
>> the output of the operator with "isOutputOnlyAfterEndOfStream=true"
>> blocking. This prevents the second problem Dong mentioned. In the
>> future, we may introduce an additional OperatorAttribute to indicate
>> whether an operator has any side output.
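A minimal Java sketch of the narrowed rule, for illustration only. The types below (`OutputOnlyAfterEosSketch` and its nested `OperatorAttributes`, `Operator`, `EdgeType`) are hypothetical stand-ins, not Flink's actual classes. Only the output edge of an operator flagged with isOutputOnlyAfterEndOfStream=true becomes blocking; every other edge, including the edges feeding that operator, stays pipelined.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the narrowed scheduling rule; not Flink API. */
final class OutputOnlyAfterEosSketch {

    /** Simplified stand-in for an operator-attributes object. */
    static final class OperatorAttributes {
        final boolean outputOnlyAfterEndOfStream;

        OperatorAttributes(boolean outputOnlyAfterEndOfStream) {
            this.outputOnlyAfterEndOfStream = outputOnlyAfterEndOfStream;
        }
    }

    enum EdgeType { PIPELINED, BLOCKING }

    static final class Operator {
        final String name;
        final OperatorAttributes attributes;

        Operator(String name, OperatorAttributes attributes) {
            this.name = name;
            this.attributes = attributes;
        }
    }

    /** Chooses the type of the edge leaving {@code producer}. */
    static EdgeType outputEdgeType(Operator producer) {
        // Only the flagged operator's own output is affected; edges of its
        // upstream operators are left untouched.
        return producer.attributes.outputOnlyAfterEndOfStream
                ? EdgeType.BLOCKING
                : EdgeType.PIPELINED;
    }

    /** Returns "name=EDGE_TYPE" for each operator of a linear pipeline. */
    static List<String> plan(List<Operator> pipeline) {
        List<String> result = new ArrayList<>();
        for (Operator op : pipeline) {
            result.add(op.name + "=" + outputEdgeType(op));
        }
        return result;
    }
}
```

Under these assumptions, a task reading a blocking edge could be deployed only after the flagged operator has finished, which is one way such an attribute might improve resource utilization.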
>>
>> I would greatly appreciate any comment or feedback you may have on the
>> updated FLIP.
>>
>> Best regards,
>> Xuannan
>>
>> On Tue, Sep 26, 2023 at 11:24 AM Dong Lin <lindon...@gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > Thanks for the review!
>> >
>> > Becket and I discussed this FLIP offline and agreed on several things
>> > that need to be improved in this FLIP. I will summarize our discussion
>> > with the problems and TODOs. We will update the FLIP and let you know once
>> > the FLIP is ready for review again.
>> >
>> > 1) Investigate whether it is possible to update the existing GlobalWindows
>> > in a backward-compatible way and reuse it for the same purpose as
>> > EndOfStreamWindows, without introducing EndOfStreamWindows as a new class.
>> >
>> > Note that GlobalWindows#getDefaultTrigger returns a NeverTrigger instance,
>> > which does not trigger the window's computation even at the end of input.
>> > We will need to investigate its existing usage and see if we can reuse it
>> > in a backward-compatible way.
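To illustrate the difference, here is a small self-contained Java model (not Flink's `Trigger` API) of a trigger that never fires versus one that fires only at the end of input. It assumes, as in Flink, that a bounded input finishes by emitting a final watermark of `Long.MAX_VALUE`; `EndOfInputTriggerSketch` and `SimpleTrigger` are hypothetical names.

```java
/** Hypothetical trigger model; not Flink's Trigger/TriggerResult API. */
final class EndOfInputTriggerSketch {

    interface SimpleTrigger {
        /** Returns true if the window should fire for this watermark. */
        boolean onWatermark(long watermark);
    }

    /** Behaves like the default trigger described above: never fires. */
    static final SimpleTrigger NEVER = watermark -> false;

    /** Fires only when the end-of-input watermark (Long.MAX_VALUE) arrives. */
    static final SimpleTrigger END_OF_INPUT = watermark -> watermark == Long.MAX_VALUE;

    /** Counts how often a trigger fires for a sequence of watermarks. */
    static int countFirings(SimpleTrigger trigger, long... watermarks) {
        int firings = 0;
        for (long wm : watermarks) {
            if (trigger.onWatermark(wm)) {
                firings++;
            }
        }
        return firings;
    }
}
```

For the watermark sequence 10, 1000, Long.MAX_VALUE, the never-firing trigger produces no result at all, while the end-of-input trigger fires exactly once.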
>> >
>> > 2) Let the JM know whether any operator upstream of the operator with
>> > "isOutputOnEOF=true" will emit output via any side channel. The FLIP should
>> > update the execution mode of those operators *only if* all outputs from
>> > those operators are emitted only at the end of input.
>> >
>> > More specifically, the upstream operator might be a user-defined
>> > operator that emits output directly to an external service, where the
>> > emission is not explicitly expressed as an operator's output edge and is
>> > thus not visible to the JM. Similarly, a user-defined operator can register
>> > a timer via InternalTimerService#registerEventTimeTimer and emit output to
>> > an external service inside Triggerable#onEventTime. Users may still rely on
>> > such logic to output data in real time, even if the downstream operators
>> > have isOutputOnEOF=true.
>> >
>> > One possible solution is to add an extra OperatorAttribute that specifies
>> > whether the operator might output records in a way that does not go
>> > through the operator's output (e.g. side output). The JM can then safely
>> > enable the runtime optimization currently described in the FLIP when no
>> > such operator exists.
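A minimal sketch of the proposed check, assuming a hypothetical per-operator flag `mayEmitViaSideChannel` (the thread does not name the attribute) and a job graph given as upstream adjacency lists. It walks everything upstream of the target operator and refuses the optimization if any of those operators may emit output outside its output edges.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical job-graph model; not Flink's StreamGraph/JobGraph API. */
final class SideChannelCheckSketch {
    private final Map<String, List<String>> upstreams = new HashMap<>();
    private final Set<String> sideChannelOperators = new HashSet<>();

    /** Registers an operator with its direct inputs and the hypothetical flag. */
    void addOperator(String name, boolean mayEmitViaSideChannel, List<String> inputs) {
        upstreams.put(name, inputs);
        if (mayEmitViaSideChannel) {
            sideChannelOperators.add(name);
        }
    }

    /**
     * Returns true only if no operator upstream of {@code target} may emit
     * output outside its output edges (external writes, timers, ...).
     */
    boolean canOptimizeUpstreamOf(String target) {
        Deque<String> toVisit =
                new ArrayDeque<>(upstreams.getOrDefault(target, Collections.<String>emptyList()));
        Set<String> visited = new HashSet<>();
        while (!toVisit.isEmpty()) {
            String op = toVisit.pop();
            if (!visited.add(op)) {
                continue; // already checked via another path
            }
            if (sideChannelOperators.contains(op)) {
                return false;
            }
            toVisit.addAll(upstreams.getOrDefault(op, Collections.<String>emptyList()));
        }
        return true;
    }
}
```

The traversal is transitive on purpose: a side-channel writer two hops upstream is just as visible to users as a direct input.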
>> >
>> > 3) Create a follow-up FLIP that allows users to specify whether a source
>> > with Boundedness=bounded should have isProcessingBacklog=true.
>> >
>> > This capability would effectively introduce a third strategy for setting
>> > the backlog status (in addition to FLIP-309 and FLIP-328). Note that, even
>> > though the data in bounded sources is backlog data in most practical use
>> > cases, this is not necessarily true. For example, users might want to
>> > start a Flink job to consume real-time data from a Kafka topic and specify
>> > that the job stops after 24 hours, which means the source is technically
>> > bounded while the data is fresh/real-time.
>> >
>> > This capability is more generic and can cover more use cases than
>> > EndOfStreamWindows. On the other hand, EndOfStreamWindows will still be
>> > useful in cases where users already need to specify this window assigner
>> > in a DataStream program, without requiring users to decide whether it is
>> > safe to treat data in a bounded source as backlog data.
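A sketch of how such a user setting might combine with boundedness, using a local `Boundedness` enum that only mirrors Flink's values and a hypothetical `isProcessingBacklog` helper. The fallback (bounded means backlog, unbounded means not) is an assumption for illustration; in a real job the backlog status may also come from the FLIP-309/FLIP-328 strategies.

```java
import java.util.Optional;

/** Hypothetical helper; the enum only mirrors Flink's Boundedness values. */
final class BacklogStatusSketch {
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    static boolean isProcessingBacklog(Boundedness boundedness, Optional<Boolean> userSetting) {
        // An explicit user setting wins, e.g. a Kafka read that stops after
        // 24 hours: bounded, but the data is fresh rather than backlog.
        if (userSetting.isPresent()) {
            return userSetting.get();
        }
        // Assumed fallback: treat bounded data as backlog data.
        return boundedness == Boundedness.BOUNDED;
    }
}
```

The 24-hour Kafka example above corresponds to calling the helper with `BOUNDED` and `Optional.of(false)`.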
>> >
>> >
>> > Regards,
>> > Dong
>> >
>> >
>> >
>> >
>> >
>> >
>> > On Mon, Sep 18, 2023 at 2:56 PM Yuxin Tan <tanyuxinw...@gmail.com> wrote:
>> >
>> > > Hi, Dong,
>> > > Thanks for your efforts.
>> > >
>> > > +1 to this proposal.
>> > > I believe this will improve performance in some scenarios that mix
>> > > bounded and unbounded workloads.
>> > >
>> > > Best,
>> > > Yuxin
>> > >
>> > >
>> > > On Mon, Sep 18, 2023 at 10:56 Xintong Song <tonysong...@gmail.com> wrote:
>> > >
>> > > > Thanks for addressing my comments, Dong.
>> > > >
>> > > > LGTM.
>> > > >
>> > > > Best,
>> > > >
>> > > > Xintong
>> > > >
>> > > >
>> > > >
>> > > > On Sat, Sep 16, 2023 at 3:34 PM Wencong Liu <liuwencle...@163.com> wrote:
>> > > >
>> > > > > Hi Dong & Jinhao,
>> > > > >
>> > > > > Thanks for your clarification! +1
>> > > > >
>> > > > > Best regards,
>> > > > > Wencong
>> > > > >
>> > > > > At 2023-09-15 11:26:16, "Dong Lin" <lindon...@gmail.com> wrote:
>> > > > > >Hi Wencong,
>> > > > > >
>> > > > > >Thanks for your comments! Please see my reply inline.
>> > > > > >
>> > > > > >On Thu, Sep 14, 2023 at 12:30 PM Wencong Liu <liuwencle...@163.com> wrote:
>> > > > > >
>> > > > > >> Dear Dong,
>> > > > > >>
>> > > > > >> I have thoroughly reviewed the proposal for FLIP-331 and believe it
>> > > > > >> would be a valuable addition to Flink. However, I do have a few
>> > > > > >> questions that I would like to discuss:
>> > > > > >>
>> > > > > >>
>> > > > > >> 1. FLIP-331 proposes EndOfStreamWindows, which is implemented using a
>> > > > > >> TimeWindow with maxTimestamp = (Long.MAX_VALUE - 1). This naturally
>> > > > > >> allows WindowedStream and AllWindowedStream to process all records
>> > > > > >> belonging to a key in a 'global' window under both the STREAMING and
>> > > > > >> BATCH runtime execution modes.
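The arithmetic behind maxTimestamp = (Long.MAX_VALUE - 1) can be checked with a simplified model. It assumes Flink's TimeWindow convention that a window covering [start, end) has maxTimestamp = end - 1, and an event-time rule that fires once the watermark reaches maxTimestamp. The window start of Long.MIN_VALUE and all names here are assumptions for this sketch.

```java
/** Simplified model of an end-of-stream window; names are hypothetical. */
final class EndOfStreamWindowSketch {

    /** Mirrors TimeWindow's convention: covers [start, end), maxTimestamp = end - 1. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        long maxTimestamp() {
            return end - 1;
        }
    }

    /** The single window every record is assigned to (the start is assumed). */
    static final Window END_OF_STREAM_WINDOW = new Window(Long.MIN_VALUE, Long.MAX_VALUE);

    /** Every record lands in the same window, whatever its timestamp. */
    static Window assignWindow(long recordTimestamp) {
        return END_OF_STREAM_WINDOW;
    }

    /** Event-time rule: fire once the watermark has reached maxTimestamp. */
    static boolean shouldFire(Window window, long watermark) {
        return watermark >= window.maxTimestamp();
    }
}
```

In this model, ordinary watermarks stay below Long.MAX_VALUE - 1 and leave the window open; the end-of-input watermark (Long.MAX_VALUE) is what fires it, after every record has been assigned.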
>> > > > > >>
>> > > > > >>
>> > > > > >> However, besides coGroup and keyBy().aggregate(), other operators on
>> > > > > >> WindowedStream and AllWindowedStream, such as join and reduce, are
>> > > > > >> currently still implemented based on WindowOperator.
>> > > > > >>
>> > > > > >>
>> > > > > >> In fact, these operators could also be implemented without
>> > > > > >> WindowOperator, avoiding the cost of additional
>> > > > > >> WindowAssigner#assignWindows and triggerContext#onElement
>> > > > > >> invocations. Are there plans to support these operators in the
>> > > > > >> future?
>> > > > > >>
>> > > > > >
>> > > > > >You are right. The EndOfStreamWindows proposed in this FLIP can
>> > > > > >potentially benefit any DataStream API that takes a WindowAssigner as
>> > > > > >a parameter. This can involve more operations than aggregate and
>> > > > > >co-group.
>> > > > > >
>> > > > > >And yes, we plan to take advantage of this API to optimize these
>> > > > > >operators in the future. This FLIP focuses on introducing the public
>> > > > > >APIs and uses aggregate/co-group as the first two examples to
>> > > > > >showcase the performance benefits.
>> > > > > >
>> > > > > >I have added an "Analysis of APIs affected by this FLIP" section to
>> > > > > >list the DataStream APIs that can benefit from this FLIP. Does this
>> > > > > >answer your question?
>> > > > > >
>> > > > > >
>> > > > > >>
>> > > > > >> 2. When using EndOfStreamWindows, upstream operators no longer
>> > > > > >> support checkpointing. This limitation may be too strict, especially
>> > > > > >> when dealing with bounded data in the streaming runtime execution
>> > > > > >> mode, where checkpointing can still be useful.
>> > > > > >>
>> > > > > >
>> > > > > >I am not sure we have a good way to support checkpoints while still
>> > > > > >achieving the performance improvements targeted by this FLIP.
>> > > > > >
>> > > > > >The issue here is that if we support checkpoints, then we cannot take
>> > > > > >advantage of algorithms (e.g. sorting inputs using ExternalSorter)
>> > > > > >that are not compatible with checkpoints. These algorithms are the
>> > > > > >main reason why batch mode currently significantly outperforms stream
>> > > > > >mode for aggregation, co-group, and similar operations.
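To make the sorting argument concrete, here is a minimal sort-based aggregation in Java. An in-memory `List.sort` stands in for ExternalSorter (an assumption; a real external sorter spills to disk), and `SortBasedAggregationSketch` is a hypothetical name. Once the input is sorted by key, only one key's running sum has to be kept at a time, but the sort itself needs the complete input first, which is why this style of execution does not combine with checkpoints.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sort-then-aggregate sketch; not Flink's ExternalSorter. */
final class SortBasedAggregationSketch {

    static final class Record {
        final String key; // assumed non-null
        final long value;

        Record(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Sums values per key after sorting the whole (bounded) input by key. */
    static Map<String, Long> sumPerKey(List<Record> input) {
        // Stand-in for an external sorter: requires the complete input first.
        List<Record> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparing((Record r) -> r.key));

        Map<String, Long> result = new LinkedHashMap<>();
        String currentKey = null;
        long currentSum = 0L;
        for (Record r : sorted) {
            if (currentKey != null && !currentKey.equals(r.key)) {
                // All records of currentKey have been seen: emit and reset.
                result.put(currentKey, currentSum);
                currentSum = 0L;
            }
            currentKey = r.key;
            currentSum += r.value;
        }
        if (currentKey != null) {
            result.put(currentKey, currentSum); // emit the last key
        }
        return result;
    }
}
```

A state-backend approach would instead keep a running sum for every key at once and snapshot it at each checkpoint; the sorted pass trades that per-key state for an up-front sort.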
>> > > > > >
>> > > > > >In most cases where the user does not care about processing latency,
>> > > > > >it is generally preferable to use batch mode to perform aggregation
>> > > > > >operations (which should be 10X faster than the existing stream mode
>> > > > > >performance) instead of doing checkpoints.
>> > > > > >
>> > > > > >Also note that we can still let operators perform failover in the
>> > > > > >same way as the existing batch mode execution, where the intermediate
>> > > > > >results (produced by one operator) can be persisted in the shuffle
>> > > > > >service and downstream operators can re-read that data from the
>> > > > > >shuffle service after failover.
>> > > > > >
>> > > > > >
>> > > > > >>
>> > > > > >> 3. The proposal mentions that if a transformation has
>> > > > > >> isOutputOnEOF == true, the operator as well as its upstream
>> > > > > >> operators will be executed in 'batch mode' with checkpointing
>> > > > > >> disabled. I would like to understand the specific implications of
>> > > > > >> this 'batch mode' and whether there are any other changes
>> > > > > >> associated with it.
>> > > > > >
>> > > > > >
>> > > > > >Good point. We should explicitly mention the changes. I have updated
>> > > > > >the FLIP to clarify this.
>> > > > > >
>> > > > > >More specifically, checkpointing is disabled while these operators
>> > > > > >are running, so that they can perform operations that are not
>> > > > > >compatible with checkpoints (e.g. sorting inputs). After a failover,
>> > > > > >these operators should re-read the data from the upstream blocking
>> > > > > >edge or from the sources.
>> > > > > >
>> > > > > >Would this answer your question?
>> > > > > >
>> > > > > >
>> > > > > >>
>> > > > > >> Additionally, I am curious to know whether this 'batch mode'
>> > > > > >> conflicts with the 'mixed mode' described in FLIP-327. While the
>> > > > > >> coGroup and keyBy().aggregate() operators on EndOfStreamWindows
>> > > > > >> have the attribute 'isInternalSorterSupported' set to true,
>> > > > > >> indicating support for the 'mixed mode', they also have
>> > > > > >> isOutputOnEOF set to true, which suggests that the upstream
>> > > > > >> operators should be executed in 'batch mode'. Will the 'mixed mode'
>> > > > > >> be ignored in 'batch mode'? I would appreciate any clarification on
>> > > > > >> this matter.
>> > > > > >>
>> > > > > >
>> > > > > >Good question. I think `isInternalSorterSupported` and
>> > > > > >`isOutputOnEOF` do not conflict with each other.
>> > > > > >
>> > > > > >It might be useful to recap the semantics of these attributes:
>> > > > > >- `isOutputOnEOF` describes whether an operator outputs data only
>> > > > > >  after all its input has been ingested by the operator.
>> > > > > >- `isInternalSorterSupported` describes whether an operator will use
>> > > > > >  an internal sorter when it does not need to do checkpoints.
>> > > > > >
>> > > > > >We can further derive from these semantics that the two attributes do
>> > > > > >not conflict with each other, and that we can have valid operators
>> > > > > >with any of the four combinations of true/false values for these two
>> > > > > >attributes.
>> > > > > >
>> > > > > >In the specific example you described above, let's say
>> > > > > >isOutputOnEOF = true and isInternalSorterSupported = true. According
>> > > > > >to FLIP-331, checkpointing is disabled while this operator is running.
>> > > > > >And according to FLIP-327, this operator will sort data internally,
>> > > > > >which means that the Flink runtime should not additionally sort its
>> > > > > >inputs. So overall the Flink job can comply with the semantics of
>> > > > > >these two attributes consistently.
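The reasoning above can be tabulated with a small Java sketch covering all four combinations. It encodes two rules stated in this thread (isOutputOnEOF=true disables checkpointing; an operator that sorts internally is not additionally sorted by the runtime) plus one assumption: when checkpointing is disabled and the operator has no internal sorter, the runtime sorts its inputs, as BATCH execution does for keyed inputs. Other reasons to skip checkpoints, such as FLIP-327 backlog processing, are not modelled, and all names are hypothetical.

```java
/** Hypothetical decision table for the two attributes; not Flink code. */
final class AttributeCombinationsSketch {

    /** What this simplified model derives for one operator. */
    static final class Plan {
        final boolean checkpointingDisabled;
        final boolean operatorSortsInternally;
        final boolean runtimeSortsInputs;

        Plan(boolean checkpointingDisabled, boolean operatorSortsInternally, boolean runtimeSortsInputs) {
            this.checkpointingDisabled = checkpointingDisabled;
            this.operatorSortsInternally = operatorSortsInternally;
            this.runtimeSortsInputs = runtimeSortsInputs;
        }
    }

    static Plan plan(boolean isOutputOnEOF, boolean isInternalSorterSupported) {
        // FLIP-331: output only at end of input, so checkpointing is disabled.
        boolean checkpointingDisabled = isOutputOnEOF;
        // FLIP-327: the internal sorter is used only when no checkpoints are needed.
        boolean operatorSortsInternally = checkpointingDisabled && isInternalSorterSupported;
        // Assumption: without an internal sorter, the runtime sorts the inputs
        // whenever checkpointing is off; never both sorts at once.
        boolean runtimeSortsInputs = checkpointingDisabled && !isInternalSorterSupported;
        return new Plan(checkpointingDisabled, operatorSortsInternally, runtimeSortsInputs);
    }
}
```

For the example discussed above, `plan(true, true)` yields checkpointing disabled, internal sorting on, and no additional runtime sort.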
>> > > > > >
>> > > > > >
>> > > > > >Thanks again for taking the time to review this FLIP. Please let me
>> > > > > >know what you think.
>> > > > > >
>> > > > > >Best regards,
>> > > > > >Dong
>> > > > > >
>> > > > > >
>> > > > > >> Thank you for taking the time to consider my feedback. I eagerly
>> > > > > >> await your response.
>> > > > > >>
>> > > > > >> Best regards,
>> > > > > >>
>> > > > > >> Wencong Liu
>> > > > > >>
>> > > > > >> At 2023-09-01 11:21:47, "Dong Lin" <lindon...@gmail.com> wrote:
>> > > > > >> >Hi all,
>> > > > > >> >
>> > > > > >> >Jinhao (cc'ed) and I are opening this thread to discuss FLIP-331:
>> > > > > >> >Support EndOfStreamWindows and isOutputOnEOF operator attribute to
>> > > > > >> >optimize task deployment. The design doc can be found at
>> > > > > >> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamWindows++and+isOutputOnEOF+operator+attribute+to+optimize+task+deployment
>> > > > > >> >
>> > > > > >> >This FLIP introduces the isOutputOnEOF operator attribute, which the
>> > > > > >> >JobManager can use to optimize task deployment and resource
>> > > > > >> >utilization. In addition, it adds EndOfStreamWindows, which can be
>> > > > > >> >used with DataStream APIs (e.g. cogroup, aggregate) to significantly
>> > > > > >> >increase throughput and reduce resource utilization.
>> > > > > >> >
>> > > > > >> >We would greatly appreciate any comments or feedback you may have on
>> > > > > >> >this proposal.
>> > > > > >> >
>> > > > > >> >Cheers,
>> > > > > >> >Dong
>> > > > > >>
>> > > > >
>> > > >
>> > >
>>
>
