Hi Martijn,

Thanks for your feedback! Please see my reply inline.

On Thu, Jun 29, 2023 at 4:35 PM Martijn Visser <martijnvis...@apache.org> wrote:

> Hi Dong and Yunfeng,
>
> Thanks for the FLIP. What's not clear for me is what's the expected
> behaviour when the allowed latency can't be met, for whatever reason.
> Given that we're talking about an "allowed latency", it implies that
> something has gone wrong and should fail? Isn't this more a minimum
> latency that you're proposing?
>

execution.allowed-latency is not really a minimum latency because we do not require operators or jobs to buffer records based on this configuration. The actual latency can be much lower for the following reasons:

- The job is composed of operators (e.g. a simple map operation) that cannot increase throughput by delaying processing.
- The operators in the job have not been upgraded to take advantage of the allowed latency.
- The operator needs to flush data at a higher frequency due to limited buffer space.

execution.allowed-latency effectively gives operators a signal that they can choose to increase throughput at the cost of higher e2e latency.

If Flink cannot meet the allowed latency, such as when Flink is overloaded by a traffic spike, nothing in particular happens (no error is logged). This is similar to what happens when we fail to trigger a checkpoint at the user-specified checkpointing interval due to high checkpoint time.

The above behavior (i.e. this config is best effort) is specified in the config's description. Can you see if the configuration's description addresses your concern?

I understand the config name might look confusing. We have considered a few other options but could not find a better one. For example, execution.target-latency was rejected because we want latency to be lower if the operator cannot benefit from buffering records. And execution.flush.interval was rejected because we might flush at a higher frequency to deal with two-phase-commit sinks (see the Proposed Changes section).

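To make the best-effort semantics described above concrete, here is a minimal sketch in plain Python. It is not Flink code and does not reflect the FLIP's actual implementation; the class `BufferingOperator` and the parameters `allowed_latency_s`, `max_buffered`, `emit` and `clock` are all hypothetical names chosen for illustration. It shows an operator that may buffer up to the allowed latency, flushes earlier when buffer space runs out, and simply flushes late (without raising an error) if its timer fires after the allowed latency has passed.

```python
import time


class BufferingOperator:
    """Hypothetical operator that treats the allowed latency as a hint only."""

    def __init__(self, allowed_latency_s, max_buffered, emit, clock=time.monotonic):
        self._allowed_latency_s = allowed_latency_s  # upper-bound hint, not a minimum
        self._max_buffered = max_buffered            # limited buffer space
        self._emit = emit                            # downstream callback taking a list
        self._clock = clock                          # injectable clock for testing
        self._buffer = []
        self._oldest_ts = None                       # arrival time of oldest buffered record

    def process(self, record):
        now = self._clock()
        if not self._buffer:
            self._oldest_ts = now
        self._buffer.append(record)
        # Flush early if the buffer is full, or on time if the hint has elapsed.
        # With allowed_latency_s == 0 every record is emitted immediately.
        if (len(self._buffer) >= self._max_buffered
                or now - self._oldest_ts >= self._allowed_latency_s):
            self.flush()

    def on_timer(self):
        # Called periodically by the runtime. If this call arrives late
        # (e.g. under overload), we just flush; nothing fails.
        if self._buffer and self._clock() - self._oldest_ts >= self._allowed_latency_s:
            self.flush()

    def flush(self):
        if self._buffer:
            self._emit(list(self._buffer))
            self._buffer.clear()
            self._oldest_ts = None
```

An operator that cannot benefit from buffering would correspond to `allowed_latency_s=0` in this sketch, which is why the observed latency can be well below the configured value.
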
I am happy to change the config name based on your suggestions.

>
> There's also the part about the Hudi Sink processing records
> immediately upon arrival. Given that the SinkV2 API provides the
> ability for custom post and pre-commit topologies [1], specifically
> targeted to avoid generating multiple small files, why isn't that
> sufficient for the Hudi Sink? It would be great to see that added
> under Rejected Alternatives if this is indeed not sufficient.
>

After reading through FLIP-191, I think these two FLIPs complement each other in addressing small-file compaction.

FLIP-191 provides the SinkV2 API so that sink developers can plug in custom logic to merge small files before making the files visible. However, without FLIP-325, sink operators have to trigger compaction and make the compacted files visible to downstream jobs every time a checkpoint is triggered. When the frequency of making files visible to Hudi increases, the number of files in Hudi will also likely increase (for the same amount of input data).

FLIP-325 can help reduce the number of files (and also increase sink throughput) by allowing the Hudi Sink to optionally reduce the frequency of making data visible to Hudi when there is backlog.

Does this answer your question?

Best,
Dong

> Best regards,
>
> Martijn
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction
>
> On Sun, Jun 25, 2023 at 4:25 AM Yunfeng Zhou
> <flink.zhouyunf...@gmail.com> wrote:
> >
> > Hi all,
> >
> > Dong(cc'ed) and I are opening this thread to discuss our proposal to
> > support configuring end-to-end allowed latency for Flink jobs, which
> > has been documented in FLIP-325
> > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-325%3A+Support+configuring+end-to-end+allowed+latency>.
> >
> > By configuring the latency requirement for a Flink job, users would be
> > able to optimize the throughput and overhead of the job while still
> > acceptably increasing latency. This approach is particularly useful
> > when dealing with records that do not require immediate processing and
> > emission upon arrival.
> >
> > Please refer to the FLIP document for more details about the proposed
> > design and implementation. We welcome any feedback and opinions on
> > this proposal.
> >
> > Best regards.
> >
> > Dong and Yunfeng
>