Hi Shammon,

Thanks for your comments. Please see my reply inline.

On Thu, Jun 29, 2023 at 6:01 PM Shammon FY <zjur...@gmail.com> wrote:

> Hi Dong and Yunfeng,
>
> Thanks for bringing up this discussion.
>
> As described in the FLIP, the differences between `end-to-end latency` and
> `table.exec.mini-batch.allow-latency` are: "It allows users to specify the
> end-to-end latency, whereas table.exec.mini-batch.allow-latency applies to
> each operator. If there are N operators on the path from source to sink,
> the end-to-end latency could be up to table.exec.mini-batch.allow-latency *
> N".
>
> If I understand correctly, `table.exec.mini-batch.allow-latency` is also
> applied to the end-to-end latency for a job, maybe @Jack Wu can give more
> information.
>

Based on what I can tell from the doc/code and offline discussion, I believe table.exec.mini-batch.allow-latency is not applied to the end-to-end latency for a job.

It is mentioned here <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/> that table.exec.mini-batch.allow-latency is "the maximum latency can be used for MiniBatch to buffer input records". If the config were indeed applied to the end-to-end latency, I would expect this doc to say so.

> So, from my perspective, and please correct me if I'm misunderstand, the
> targets of this FLIP may include the following:
>
> 1. Support a mechanism like `mini-batch` in SQL for `DataStream`, which
> will collect data in the operator and flush data when it receives a `flush`
> event, in the FLIP it is `FlushEvent`.
>

I think the goal is to allow users to specify an end-to-end latency budget for the job. IMO this is quite different from `mini-batch` in SQL.

>
> 2. Support dynamic `latency` according to the progress of job, such as
> snapshot stage and after that.
>
> To do that, I have some questions:
>
> 1. I didn't understand the purpose of public interface `RecordAttributes`.
> I think `FlushEvent` in the FLIP is enough, and different
> `DynamicFlushStrategy` can be added to generate flush events to address
> different needs, such as a static interval similar to mini-batch in SQL or
> collect the information `isProcessingBacklog` and metrics to generate
> `FlushEvent` which is described in your FLIP? If hudi sink needs the
> `isBacklog` flag, the hudi `SplitEnumerator` can create an operator event
> and send it to hudi source reader.
>

Suppose we only have FlushEvent. Then operators (e.g. Hudi Sink) will not know that they can buffer data in the following scenario:

- execution.allowed-latency is not configured and uses the default value null.
- The job is reading from HybridSource and HybridSource says isBacklog=true.

Also note that Hudi Sink might not be the only operator that can benefit from knowing isBacklog=true. Other sinks and aggregation operators (e.g. CoGroup) can also increase throughput by buffering/sorting records when there is backlog. So it seems simpler to pass RecordAttributes to these operators than to ask every operator developer to create an operator event, etc.

>
> 2. How is this new mechanism unified with SQL's mini-batch mechanism? As
> far as I am concerned, SQL implements mini-batch mechanism based on
> watermark, I think it is very unreasonable to have two different
> implementation in SQL and DataStream.
>

I think we can deprecate table.exec.mini-batch.allow-latency later, once execution.allowed-latency is ready for production usage. This is mentioned in the "Compatibility, Deprecation, and Migration Plan" section.

If there is a config that lets users specify the e2e latency, it is probably reasonable for that config to work for both DataStream and SQL.

> 3. I notice that the `CheckpointCoordinator` will generate `FlushEvent`,
> which information about `FlushEvent` will be stored in
>

CheckpointCoordinator might need to send a FlushEvent before triggering a checkpoint in order to deal with two-phase commit sinks. The algorithm is specified in the "Proposed Changes" section.

> `Checkpoint`? What is the alignment strategy for FlushEvent in the
> operator? The operator will flush the data when it receives all
> `FlushEvent` from upstream with the same ID or do flush for each
> `FlushEvent`? Can you give more detailed proposal about that? We also have
> a demand for this piece, thanks
>

After an operator has received a FlushEvent:

- If the ID of the received FlushEvent is larger than the largest ID this operator has received so far, then flush() is triggered for this operator and the operator should broadcast the FlushEvent to downstream operators.
- Otherwise, this FlushEvent is ignored.

This behavior is specified in the Java doc of FlushEvent.

Can you see if this answers your questions?

Best,
Dong

>
>
> Best,
> Shammon FY
>
>
>
> On Thu, Jun 29, 2023 at 4:35 PM Martijn Visser <martijnvis...@apache.org>
> wrote:
>
>> Hi Dong and Yunfeng,
>>
>> Thanks for the FLIP. What's not clear for me is what's the expected
>> behaviour when the allowed latency can't be met, for whatever reason.
>> Given that we're talking about an "allowed latency", it implies that
>> something has gone wrong and should fail? Isn't this more a minimum
>> latency that you're proposing?
>>
>> There's also the part about the Hudi Sink processing records
>> immediately upon arrival. Given that the SinkV2 API provides the
>> ability for custom post and pre-commit topologies [1], specifically
>> targeted to avoid generating multiple small files, why isn't that
>> sufficient for the Hudi Sink? It would be great to see that added
>> under Rejected Alternatives if this is indeed not sufficient.
>>
>> Best regards,
>>
>> Martijn
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction
>>
>> On Sun, Jun 25, 2023 at 4:25 AM Yunfeng Zhou
>> <flink.zhouyunf...@gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > Dong(cc'ed) and I are opening this thread to discuss our proposal to
>> > support configuring end-to-end allowed latency for Flink jobs, which
>> > has been documented in FLIP-325
>> > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-325%3A+Support+configuring+end-to-end+allowed+latency>.
>> >
>> > By configuring the latency requirement for a Flink job, users would be
>> > able to optimize the throughput and overhead of the job while still
>> > acceptably increasing latency. This approach is particularly useful
>> > when dealing with records that do not require immediate processing and
>> > emission upon arrival.
>> >
>> > Please refer to the FLIP document for more details about the proposed
>> > design and implementation. We welcome any feedback and opinions on
>> > this proposal.
>> >
>> > Best regards.
>> >
>> > Dong and Yunfeng
>> >
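
For illustration only, the FlushEvent handling rule from the reply above (flush and forward only when the received ID is strictly larger than any ID seen so far, otherwise ignore) could be sketched as follows. This is a minimal standalone sketch, not the FLIP's actual interface: the class names `FlushEventDedupSketch`, `FlushEvent`, and `BufferingOperator`, the `String` record type, and the `forwardedIds` list (which stands in for broadcasting to downstream operators) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class FlushEventDedupSketch {

    /** Hypothetical stand-in for the FLIP's FlushEvent; only the ID matters here. */
    static final class FlushEvent {
        final long id;

        FlushEvent(long id) {
            this.id = id;
        }
    }

    /** Hypothetical operator that buffers records until a new FlushEvent arrives. */
    static final class BufferingOperator {
        // Largest FlushEvent ID handled so far; starts below any realistic ID.
        private long largestSeenId = Long.MIN_VALUE;
        private final List<String> buffer = new ArrayList<>();

        // Records released by flush(), in arrival order.
        final List<String> emitted = new ArrayList<>();
        // IDs forwarded downstream; stands in for a real broadcast.
        final List<Long> forwardedIds = new ArrayList<>();

        void processElement(String record) {
            buffer.add(record);
        }

        void processFlushEvent(FlushEvent event) {
            if (event.id <= largestSeenId) {
                // Same or older ID, e.g. the same event arriving from another input: ignore.
                return;
            }
            largestSeenId = event.id;
            flush();
            forwardedIds.add(event.id);
        }

        private void flush() {
            emitted.addAll(buffer);
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        BufferingOperator op = new BufferingOperator();
        op.processElement("a");
        op.processFlushEvent(new FlushEvent(1)); // new ID: flushes "a"
        op.processElement("b");
        op.processFlushEvent(new FlushEvent(1)); // duplicate ID: ignored, "b" stays buffered
        op.processFlushEvent(new FlushEvent(2)); // new ID: flushes "b"
        System.out.println(op.emitted + " " + op.forwardedIds);
    }
}
```

Under this rule an operator with several inputs flushes once per FlushEvent ID, on the first copy it receives, rather than waiting to align copies from all upstream channels.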