Sorry, I just sent an incomplete email draft by mistake. Thanks for reviving this, Xuannan. The updated FLIP LGTM. +1 for it.
Best,
Xintong

On Tue, Jan 23, 2024 at 5:51 PM Xintong Song <tonysong...@gmail.com> wrote:

Thanks

Best,
Xintong

On Wed, Jan 10, 2024 at 5:56 PM Xuannan Su <suxuanna...@gmail.com> wrote:

Hi all,

After several rounds of offline discussions with Xintong and Jinhao, we have decided to narrow the scope of the FLIP. It will now focus on introducing OperatorAttributes that indicate whether an operator emits records only after its inputs have ended. We will also use the attribute to optimize task scheduling for better resource utilization. Setting the backlog status and optimizing the operator implementation during backlog processing will be deferred to future work.

In addition to the change above, we have also made the following changes to the FLIP to address the problems mentioned by Dong:
- The public interfaces are updated to reuse GlobalWindows.
- Instead of making all outputs of the operators upstream of the "isOutputOnlyAfterEndOfStream=true" operator blocking, we only make the output of the operator with "isOutputOnlyAfterEndOfStream=true" itself blocking. This prevents the second problem Dong mentioned. In the future, we may introduce an extra attribute in OperatorAttributes to indicate whether an operator has any side output.

I would greatly appreciate any comments or feedback you may have on the updated FLIP.

Best regards,
Xuannan

On Tue, Sep 26, 2023 at 11:24 AM Dong Lin <lindon...@gmail.com> wrote:

Hi all,

Thanks for the review!

Becket and I discussed this FLIP offline and agreed on several things that need to be improved. I will summarize our discussion as the problems and TODOs below. We will update the FLIP and let you know once it is ready for review again.
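The narrowed rule in Xuannan's message above (only the output of the operator with isOutputOnlyAfterEndOfStream=true becomes blocking, while the operators upstream of it keep pipelined outputs) can be illustrated with a small model. This is a minimal sketch under stated assumptions, not the FLIP's implementation: `BlockingEdgeSketch`, `Operator`, `Edge`, `ExchangeMode` and `assignExchangeModes` are hypothetical names, and a real job graph carries far more than one boolean per operator.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not Flink's API: decides the exchange mode of each edge
// under the narrowed rule, i.e. only the outputs of an operator whose
// isOutputOnlyAfterEndOfStream attribute is true become blocking.
final class BlockingEdgeSketch {

    enum ExchangeMode { PIPELINED, BLOCKING }

    // Simplified stand-in for an operator carrying the attribute.
    record Operator(String name, boolean outputOnlyAfterEndOfStream) {}

    // A directed edge from an upstream operator to a downstream operator.
    record Edge(Operator from, Operator to) {
        String label() {
            return from.name() + "->" + to.name();
        }
    }

    static Map<String, ExchangeMode> assignExchangeModes(List<Edge> edges) {
        Map<String, ExchangeMode> modes = new LinkedHashMap<>();
        for (Edge edge : edges) {
            // The producer emits nothing before end of input, so downstream tasks
            // gain nothing from being deployed early; a blocking edge lets them be
            // scheduled after the producer finishes. All other edges, including
            // those of the producer's upstream operators, stay pipelined.
            ExchangeMode mode = edge.from().outputOnlyAfterEndOfStream()
                    ? ExchangeMode.BLOCKING
                    : ExchangeMode.PIPELINED;
            modes.put(edge.label(), mode);
        }
        return modes;
    }

    public static void main(String[] args) {
        Operator source = new Operator("source", false);
        Operator map = new Operator("map", false);
        Operator aggregate = new Operator("aggregate", true);
        Operator sink = new Operator("sink", false);

        List<Edge> edges = List.of(
                new Edge(source, map),
                new Edge(map, aggregate),
                new Edge(aggregate, sink));

        // Prints {source->map=PIPELINED, map->aggregate=PIPELINED, aggregate->sink=BLOCKING}
        System.out.println(assignExchangeModes(edges));
    }
}
```

In this model the operators upstream of `aggregate` keep pipelined outputs, so their execution is left unchanged, which is the property the narrowed FLIP relies on to avoid the side-channel problem.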
1) Investigate whether it is possible to update the existing GlobalWindows in a backward-compatible way and reuse it for the same purpose as EndOfStreamWindows, without introducing EndOfStreamWindows as a new class.

Note that GlobalWindows#getDefaultTrigger returns a NeverTrigger instance, which does not trigger the window's computation even at the end of inputs. We will need to investigate its existing usage and see whether we can reuse it in a backward-compatible way.

2) Let the JM know whether any operator upstream of the operator with "isOutputOnEOF=true" will emit output via any side channel. The FLIP should update the execution mode of those operators *only if* all outputs from those operators are emitted only at the end of input.

More specifically, the upstream operators might include a user-defined operator that emits output directly to an external service, where the emission is not explicitly expressed as an output edge of the operator and is thus not visible to the JM. Similarly, a user-defined operator can register a timer via InternalTimerService#registerEventTimeTimer and emit output to an external service inside Triggerable#onEventTime. Users may still need such logic to output data in real time, even if the downstream operators have isOutputOnEOF=true.

One possible solution to this problem is to add an extra OperatorAttribute that specifies whether the operator might output records in a way that does not go through the operator's output (e.g. side output). The JM can then safely enable the runtime optimization currently described in the FLIP when no such operator exists.

3) Create a follow-up FLIP that allows users to specify whether a source with Boundedness=bounded should have isProcessingBacklog=true.
This capability would effectively introduce a third strategy for setting the backlog status (in addition to FLIP-309 and FLIP-328). Note that, even though the data in bounded sources is backlog data in most practical use cases, this is not necessarily true. For example, users might start a Flink job that consumes real-time data from a Kafka topic and specify that the job stops after 24 hours; the source is then technically bounded while the data is fresh and real-time.

This capability is more generic and covers more use cases than EndOfStreamWindows. On the other hand, EndOfStreamWindows will still be useful when users already need to specify this window assigner in a DataStream program, without requiring them to decide whether it is safe to treat the data in a bounded source as backlog data.

Regards,
Dong

On Mon, Sep 18, 2023 at 2:56 PM Yuxin Tan <tanyuxinw...@gmail.com> wrote:

Hi, Dong,
Thanks for your efforts.

+1 to this proposal. I believe it will improve performance in some scenarios that mix bounded and unbounded workloads.

Best,
Yuxin

On Mon, Sep 18, 2023 at 10:56 AM Xintong Song <tonysong...@gmail.com> wrote:

Thanks for addressing my comments, Dong.

LGTM.

Best,
Xintong

On Sat, Sep 16, 2023 at 3:34 PM Wencong Liu <liuwencle...@163.com> wrote:

Hi Dong & Jinhao,

Thanks for your clarification!
+1

Best regards,
Wencong

At 2023-09-15 11:26:16, "Dong Lin" <lindon...@gmail.com> wrote:

Hi Wencong,

Thanks for your comments! Please see my reply inline.

On Thu, Sep 14, 2023 at 12:30 PM Wencong Liu <liuwencle...@163.com> wrote:

> Dear Dong,
>
> I have thoroughly reviewed the proposal for FLIP-331 and believe it would be a valuable addition to Flink. However, I do have a few questions that I would like to discuss:
>
> 1. FLIP-331 proposes EndOfStreamWindows, which is implemented by a TimeWindow with maxTimestamp = (Long.MAX_VALUE - 1). It naturally supports WindowedStream and AllWindowedStream processing all records belonging to a key in a 'global' window under both the STREAMING and BATCH runtime execution modes.
>
> However, besides coGroup and keyBy().aggregate(), other operators on WindowedStream and AllWindowedStream, such as join and reduce, are currently still implemented based on WindowOperator.
>
> In fact, these operators could also be implemented without WindowOperator, avoiding the additional cost of WindowAssigner#assignWindows and triggerContext#onElement invocations. Are there plans to support these operators in the future?

You are right.
The EndOfStreamWindows proposed in this FLIP can potentially benefit any DataStream API that takes a WindowAssigner as a parameter. This can involve more operations than aggregate and co-group.

And yes, we plan to take advantage of this API to optimize these operators in the future. This FLIP focuses on introducing the public APIs and uses aggregate/co-group as the first two examples to showcase the performance benefits.

I have added an "Analysis of APIs affected by this FLIP" section to list the DataStream APIs that can benefit from this FLIP. Would this answer your question?

> 2. When using EndOfStreamWindows, upstream operators no longer support checkpointing. This limitation may be too strict, especially when dealing with bounded data in the streaming runtime execution mode, where checkpointing can still be useful.

I am not sure we have a good way to support checkpoints while still achieving the performance improvements targeted by this FLIP.

The issue is that if we support checkpoints, we cannot take advantage of algorithms that are not compatible with checkpoints (e.g. sorting inputs using ExternalSorter). These algorithms are the main reason why batch mode currently significantly outperforms stream mode for aggregation, co-group, etc.
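To make the sort-based technique concrete, the following is a minimal sketch of sort-based per-key aggregation over a bounded input. It is not Flink code: an in-memory `List.sort` stands in for a spilling external sorter such as the ExternalSorter mentioned above, and `SortBasedAggregationSketch`, `KeyedSum` and `sumByKey` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink code: sort-based per-key sum over a bounded
// input. The in-memory List.sort stands in for a spilling external sorter.
final class SortBasedAggregationSketch {

    record KeyedSum(String key, long sum) {}

    static List<KeyedSum> sumByKey(List<Map.Entry<String, Long>> input) {
        // Step 1: buffer and sort the whole bounded input by key. Nothing can be
        // emitted before the input ends, and in this sketch the buffer is not
        // snapshotted anywhere, mirroring why such algorithms run without checkpoints.
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(input);
        sorted.sort(Map.Entry.comparingByKey());

        // Step 2: scan runs of equal keys, keeping a single accumulator alive.
        List<KeyedSum> result = new ArrayList<>();
        String currentKey = null;
        long currentSum = 0;
        for (Map.Entry<String, Long> entry : sorted) {
            if (!entry.getKey().equals(currentKey)) {
                if (currentKey != null) {
                    result.add(new KeyedSum(currentKey, currentSum)); // close previous run
                }
                currentKey = entry.getKey();
                currentSum = 0;
            }
            currentSum += entry.getValue();
        }
        if (currentKey != null) {
            result.add(new KeyedSum(currentKey, currentSum)); // close last run
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> input = List.of(
                Map.entry("b", 2L), Map.entry("a", 1L), Map.entry("b", 6L), Map.entry("a", 4L));
        // Prints [KeyedSum[key=a, sum=5], KeyedSum[key=b, sum=8]]
        System.out.println(sumByKey(input));
    }
}
```

Because records arrive grouped by key after sorting, only one accumulator is live at a time, whereas a streaming aggregation keeps per-key state for every key and must keep that state checkpointable.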
In most cases where the user does not care about processing latency, it is generally preferable to use batch mode for aggregation operations (which should be 10X faster than the existing stream-mode performance) instead of doing checkpoints.

Also note that operators can still fail over in the same way as in the existing batch-mode execution: the intermediate results produced by one operator can be persisted in the shuffle service, and downstream operators can re-read that data from the shuffle service after failover.

> 3. The proposal mentions that if a transformation has isOutputOnEOF == true, the operator as well as its upstream operators will be executed in 'batch mode' with checkpointing disabled. I would like to understand the specific implications of this 'batch mode' and whether there are any other changes associated with it.

Good point. We should explicitly mention the changes. I have updated the FLIP to clarify this.

More specifically, checkpointing is disabled while these operators are running, so that they can perform operations that are not compatible with checkpoints (e.g. sorting inputs). After failover, the operators re-read the data from the upstream blocking edge or the sources.

Would this answer your question?

> Additionally, I am curious to know whether this 'batch mode' conflicts with the 'mixed mode' described in FLIP-327.
> While the coGroup and keyBy().aggregate() operators on EndOfStreamWindows have the attribute 'isInternalSorterSupported' set to true, indicating support for the 'mixed mode', they also have isOutputOnEOF set to true, which suggests that the upstream operators should be executed in 'batch mode'. Will the 'mixed mode' be ignored in 'batch mode'? I would appreciate any clarification on this matter.

Good question. I think `isInternalSorterSupported` and `isOutputOnEOF` do not conflict with each other.

It might be useful to recap the semantics of these attributes:
- `isOutputOnEOF` describes whether an operator outputs data only after all its input has been ingested by the operator.
- `isInternalSorterSupported` describes whether an operator will use an internal sorter when it does not need to do checkpoints.

From these semantics, we can further derive that the two attributes do not conflict with each other, and valid operators can have any of the four combinations of true/false values for them.

In the specific example you described above, let's say isOutputOnEOF = true and isInternalSorterSupported = true. According to FLIP-331, checkpointing is disabled while this operator is running. And according to FLIP-327, this operator will sort data internally, which means that the Flink runtime should not additionally sort its inputs.
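The recap of the two attributes can be expressed as a tiny decision function that enumerates all four combinations. This is a hypothetical model, not Flink's OperatorAttributes API: `OperatorAttributeCombinations`, `Attributes`, `Behavior` and `derive` are illustrative names, and the model deliberately ignores FLIP-327's backlog status, so "does not need to do checkpoints" is approximated by isOutputOnEOF = true.

```java
// Hypothetical model, not Flink's OperatorAttributes API. It encodes only the
// semantics recapped in the thread and ignores FLIP-327's backlog status:
//  - outputOnEOF = true: checkpointing is disabled while the operator runs;
//  - internalSorterSupported = true: the operator sorts internally when it does
//    not need checkpoints, so the runtime should not sort its inputs again.
final class OperatorAttributeCombinations {

    record Attributes(boolean outputOnEOF, boolean internalSorterSupported) {}

    record Behavior(boolean checkpointingDisabled, boolean operatorSortsInternally) {}

    static Behavior derive(Attributes attributes) {
        // FLIP-331: checkpointing is disabled while an output-on-EOF operator runs.
        boolean checkpointingDisabled = attributes.outputOnEOF();
        // FLIP-327 (simplified here): the internal sorter is used only when the
        // operator does not need to checkpoint.
        boolean operatorSortsInternally =
                attributes.internalSorterSupported() && checkpointingDisabled;
        return new Behavior(checkpointingDisabled, operatorSortsInternally);
    }

    public static void main(String[] args) {
        // Each of the four combinations maps to one well-defined behavior.
        for (boolean outputOnEOF : new boolean[] {false, true}) {
            for (boolean sorterSupported : new boolean[] {false, true}) {
                Attributes attributes = new Attributes(outputOnEOF, sorterSupported);
                System.out.println(attributes + " -> " + derive(attributes));
            }
        }
    }
}
```

For the example in the reply (both attributes true), `derive` yields checkpointingDisabled = true and operatorSortsInternally = true, so sorting is left to the operator rather than the runtime.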
So overall, the Flink job can comply with the semantics of these two attributes consistently.

Thanks again for taking the time to review this FLIP. Please let me know what you think.

Best regards,
Dong

> Thank you for taking the time to consider my feedback. I eagerly await your response.
>
> Best regards,
>
> Wencong Liu

At 2023-09-01 11:21:47, "Dong Lin" <lindon...@gmail.com> wrote:

Hi all,

Jinhao (cc'ed) and I are opening this thread to discuss FLIP-331: Support EndOfStreamWindows and isOutputOnEOF operator attribute to optimize task deployment. The design doc can be found at
https://cwiki.apache.org/confluence/display/FLINK/FLIP-331%3A+Support+EndOfStreamWindows++and+isOutputOnEOF+operator+attribute+to+optimize+task+deployment

This FLIP introduces the isOutputOnEOF operator attribute, which the JobManager can use to optimize task deployment and resource utilization. In addition, it adds EndOfStreamWindows, which can be used with the DataStream APIs (e.g. cogroup, aggregate) to significantly increase throughput and reduce resource utilization.

We would greatly appreciate any comments or feedback you may have on this proposal.
Cheers,
Dong
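The EndOfStreamWindows idea discussed in this thread (a single window with maxTimestamp = Long.MAX_VALUE - 1 that fires only at the end of input), and its contrast with the never-firing default trigger of GlobalWindows, can be modeled in a few lines. This sketch assumes, as in Flink, that a time window [start, end) has maxTimestamp end - 1 and that a bounded input ends with a final watermark of Long.MAX_VALUE; `EndOfStreamWindowSketch` and its methods are hypothetical and not part of Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Flink classes: a sum over a single window that covers
// every timestamp, fired by an event-time check against the window's maxTimestamp.
final class EndOfStreamWindowSketch {

    // Same convention as a [start, end) time window: the last included timestamp is end - 1.
    record Window(long start, long end) {
        long maxTimestamp() {
            return end - 1;
        }
    }

    // One window for all elements; its maxTimestamp() is Long.MAX_VALUE - 1.
    static final Window END_OF_STREAM_WINDOW = new Window(Long.MIN_VALUE, Long.MAX_VALUE);

    // Assumed final watermark emitted when a bounded input ends.
    static final long END_OF_INPUT_WATERMARK = Long.MAX_VALUE;

    private final boolean neverFire; // true models a trigger that never fires
    private final List<Long> output = new ArrayList<>();
    private long sum;
    private boolean fired;

    EndOfStreamWindowSketch(boolean neverFire) {
        this.neverFire = neverFire;
    }

    void processElement(long value) {
        sum += value; // every element is assigned to END_OF_STREAM_WINDOW
    }

    void processWatermark(long watermark) {
        // Event-time firing condition: the watermark has reached the window's maxTimestamp.
        if (!neverFire && !fired && watermark >= END_OF_STREAM_WINDOW.maxTimestamp()) {
            output.add(sum);
            fired = true;
        }
    }

    List<Long> output() {
        return output;
    }

    public static void main(String[] args) {
        EndOfStreamWindowSketch endOfStream = new EndOfStreamWindowSketch(false);
        EndOfStreamWindowSketch never = new EndOfStreamWindowSketch(true);
        for (EndOfStreamWindowSketch window : List.of(endOfStream, never)) {
            window.processElement(3);
            window.processElement(4);
            window.processWatermark(1_000L); // ordinary watermark: nothing fires
            window.processWatermark(END_OF_INPUT_WATERMARK); // input has ended
        }
        System.out.println(endOfStream.output()); // [7]
        System.out.println(never.output()); // []
    }
}
```

An ordinary watermark emits nothing; only the end-of-input watermark fires the window, while the never-firing variant produces no output at all, which is the gap Dong notes for reusing GlobalWindows as-is.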