Hi everyone,

Thanks for your valuable feedback!
Our discussions have been going on for a while and are nearing a consensus. So I would like to start a vote after 72 hours. Please let me know if you have any concerns, thanks! Best, Zakelly On Tue, Mar 19, 2024 at 3:37 PM Zakelly Lan <zakelly....@gmail.com> wrote: > Hi Yunfeng, > > Thanks for the suggestion! > > I will reorganize the FLIP-425 accordingly. > > > Best, > Zakelly > > On Tue, Mar 19, 2024 at 3:20 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> > wrote: > >> Hi Xintong and Zakelly, >> >> > 2. Regarding Strictly-ordered and Out-of-order of Watermarks >> I agree with it that watermarks can use only out-of-order mode for >> now, because there is still not a concrete example showing the >> correctness risk about it. However, the strictly-ordered mode should >> still be supported as the default option for non-record event types >> other than watermark, at least for checkpoint barriers. >> >> I noticed that this information has already been documented in "For >> other non-record events, such as RecordAttributes ...", but it's at >> the bottom of the "Watermark" section, which might not be very >> obvious. Thus it might be better to reorganize the FLIP to better >> claim that the two order modes are designed for all non-record events, >> and which mode this FLIP would choose for each type of event. >> >> Best, >> Yunfeng >> >> On Tue, Mar 19, 2024 at 1:09 PM Xintong Song <tonysong...@gmail.com> >> wrote: >> > >> > Thanks for the quick response. Sounds good to me. >> > >> > Best, >> > >> > Xintong >> > >> > >> > >> > On Tue, Mar 19, 2024 at 1:03 PM Zakelly Lan <zakelly....@gmail.com> >> wrote: >> > >> > > Hi Xintong, >> > > >> > > Thanks for sharing your thoughts! >> > > >> > > 1. Regarding Record-ordered and State-ordered of processElement. >> > > > >> > > > I understand that while State-ordered likely provides better >> performance, >> > > > Record-ordered is sometimes required for correctness. The question >> is how >> > > > should a user choose between these two modes? My concern is that >> such a >> > > > decision may require users to have in-depth knowledge about the >> Flink >> > > > internals, and may lead to correctness issues if State-ordered is >> chosen >> > > > improperly. >> > > > >> > > > I'd suggest not to expose such a knob, at least in the first >> version. >> > > That >> > > > means always use Record-ordered for custom operators / UDFs, and >> keep >> > > > State-ordered for internal usages (built-in operators) only. >> > > > >> > > >> > > Indeed, users may not be able to choose the mode properly. I agree to >> keep >> > > such options for internal use. >> > > >> > > >> > > 2. Regarding Strictly-ordered and Out-of-order of Watermarks. >> > > > >> > > > I'm not entirely sure about Strictly-ordered being the default, or >> even >> > > > being supported. From my understanding, a Watermark(T) only >> suggests that >> > > > all records with event time before T has arrived, and it has >> nothing to >> > > do >> > > > with whether records with event time after T has arrived or not. >> From >> > > that >> > > > perspective, preventing certain records from arriving before a >> Watermark >> > > is >> > > > never supported. I also cannot come up with any use case where >> > > > Strictly-ordered is necessary. This implies the same issue as 1): >> how >> > > does >> > > > the user choose between the two modes? >> > > > >> > > > I'd suggest not expose the knob to users and only support >> Out-of-order, >> > > > until we see a concrete use case that Strictly-ordered is needed. 
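To make the two modes concrete, here is a minimal plain-Java sketch of the epoch idea behind them. All names are made up for illustration and this is not the FLIP-425 implementation; a single task (mailbox) thread is assumed. Under strictly-ordered, the watermark action waits until every record received before it has finished its asynchronous state callbacks; under out-of-order, the watermark is forwarded right away and the drained epoch would only be kept for bookkeeping.

    // Illustrative sketch only; not FLIP-425 code. Single-threaded usage assumed.
    final class EpochTracker {

        static final class Epoch {
            int inFlight;          // records of this epoch whose async callbacks are unfinished
            Runnable onDrained;    // deferred action, e.g. "emit the watermark downstream"

            void recordFinished() {
                if (--inFlight == 0 && onDrained != null) {
                    onDrained.run();
                }
            }
        }

        private Epoch current = new Epoch();

        /** A record enters processing; it remembers the epoch it belongs to. */
        Epoch recordStarted() {
            current.inFlight++;
            return current;
        }

        /** Strictly-ordered: run the watermark action only once the epoch has drained. */
        void watermarkStrictlyOrdered(Runnable emitWatermark) {
            Epoch closing = current;
            current = new Epoch();
            if (closing.inFlight == 0) {
                emitWatermark.run();
            } else {
                closing.onDrained = emitWatermark;
            }
        }

        /** Out-of-order: emit immediately; the closed epoch is discarded here for brevity. */
        void watermarkOutOfOrder(Runnable emitWatermark) {
            current = new Epoch();
            emitWatermark.run();
        }
    }

A record obtained from recordStarted() would call recordFinished() on its epoch once all of its state callbacks complete, so the only difference between the two modes is whether the watermark waits for that drain.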
>> > > > >> > > >> > > The semantics of watermarks do not define the sequence between a >> watermark >> > > and subsequent records. For the most part, this is inconsequential, >> except >> > > it may affect some current users who have previously relied on the >> implicit >> > > assumption of an ordered execution. I'd be fine with initially >> supporting >> > > only out-of-order processing. We may consider exposing the >> > > 'Strictly-ordered' mode once we encounter a concrete use case that >> > > necessitates it. >> > > >> > > >> > > My philosophies behind not exposing the two config options are: >> > > > - There are already too many options in Flink that barely know how >> to use >> > > > them. I think Flink should try as much as possible to decide its own >> > > > behavior, rather than throwing all the decisions to the users. >> > > > - It's much harder to take back knobs than to introduce them. >> Therefore, >> > > > options should be introduced only if concrete use cases are >> identified. >> > > > >> > > >> > > I agree to keep minimal configurable items especially for the MVP. >> Given >> > > that we have the opportunity to refine the functionality before the >> > > framework transitions from @Experimental to @PublicEvolving, it makes >> sense >> > > to refrain from presenting user-facing options until we have ensured >> > > their necessity. >> > > >> > > >> > > Best, >> > > Zakelly >> > > >> > > On Tue, Mar 19, 2024 at 12:06 PM Xintong Song <tonysong...@gmail.com> >> > > wrote: >> > > >> > > > Sorry for joining the discussion late. >> > > > >> > > > I have two questions about FLIP-425. >> > > > >> > > > 1. Regarding Record-ordered and State-ordered of processElement. >> > > > >> > > > I understand that while State-ordered likely provides better >> performance, >> > > > Record-ordered is sometimes required for correctness. The question >> is how >> > > > should a user choose between these two modes? My concern is that >> such a >> > > > decision may require users to have in-depth knowledge about the >> Flink >> > > > internals, and may lead to correctness issues if State-ordered is >> chosen >> > > > improperly. >> > > > >> > > > I'd suggest not to expose such a knob, at least in the first >> version. >> > > That >> > > > means always use Record-ordered for custom operators / UDFs, and >> keep >> > > > State-ordered for internal usages (built-in operators) only. >> > > > >> > > > 2. Regarding Strictly-ordered and Out-of-order of Watermarks. >> > > > >> > > > I'm not entirely sure about Strictly-ordered being the default, or >> even >> > > > being supported. From my understanding, a Watermark(T) only >> suggests that >> > > > all records with event time before T has arrived, and it has >> nothing to >> > > do >> > > > with whether records with event time after T has arrived or not. >> From >> > > that >> > > > perspective, preventing certain records from arriving before a >> Watermark >> > > is >> > > > never supported. I also cannot come up with any use case where >> > > > Strictly-ordered is necessary. This implies the same issue as 1): >> how >> > > does >> > > > the user choose between the two modes? >> > > > >> > > > I'd suggest not expose the knob to users and only support >> Out-of-order, >> > > > until we see a concrete use case that Strictly-ordered is needed. >> > > > >> > > > >> > > > My philosophies behind not exposing the two config options are: >> > > > - There are already too many options in Flink that barely know how >> to use >> > > > them. 
I think Flink should try as much as possible to decide its own >> > > > behavior, rather than throwing all the decisions to the users. >> > > > - It's much harder to take back knobs than to introduce them. >> Therefore, >> > > > options should be introduced only if concrete use cases are >> identified. >> > > > >> > > > WDYT? >> > > > >> > > > Best, >> > > > >> > > > Xintong >> > > > >> > > > >> > > > >> > > > On Fri, Mar 8, 2024 at 2:45 AM Jing Ge <j...@ververica.com.invalid> >> > > wrote: >> > > > >> > > > > +1 for Gyula's suggestion. I just finished FLIP-423 which >> introduced >> > > the >> > > > > intention of the big change and high level architecture. Great >> content >> > > > btw! >> > > > > The only public interface change for this FLIP is one new config >> to use >> > > > > ForSt. It makes sense to have one dedicated discussion thread for >> each >> > > > > concrete system design. >> > > > > >> > > > > @Zakelly The links in your mail do not work except the last one, >> > > because >> > > > > the FLIP-xxx has been included into the url like >> > > > > >> > > >> https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425 >> > > > . >> > > > > >> > > > > NIT fix: >> > > > > >> > > > > FLIP-424: >> > > > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864 >> > > > > >> > > > > FLIP-425: >> > > > https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h >> > > > > >> > > > > FLIP-426: >> > > > https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf >> > > > > >> > > > > FLIP-427: >> > > > https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft >> > > > > >> > > > > FLIP-428: >> > > > https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b >> > > > > >> > > > > Best regards, >> > > > > Jing >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > On Thu, Mar 7, 2024 at 10:14 AM Zakelly Lan < >> zakelly....@gmail.com> >> > > > wrote: >> > > > > >> > > > > > Hi everyone, >> > > > > > >> > > > > > Thank you all for a lively discussion here, and it is a good >> time to >> > > > move >> > > > > > forward to more detailed discussions. Thus we open several >> threads >> > > for >> > > > > > sub-FLIPs: >> > > > > > >> > > > > > FLIP-424: >> > > > > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864 >> > > > > > FLIP-425 >> > > > > > < >> > > > > >> > > >> https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425 >> > > > >: >> > > > > > >> https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h >> > > > > > FLIP-426 >> > > > > > < >> > > > > >> > > >> https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0hFLIP-426 >> > > > >: >> > > > > > >> https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf >> > > > > > FLIP-427 >> > > > > > < >> > > > > >> > > >> https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrfFLIP-427 >> > > > >: >> > > > > > >> https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft >> > > > > > FLIP-428 >> > > > > > < >> > > > > >> > > >> https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ftFLIP-428 >> > > > >: >> > > > > > >> https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b >> > > > > > >> > > > > > If you want to talk about the overall architecture, roadmap, >> > > milestones >> > > > > or >> > > > > > something related with multiple FLIPs, please post it here. >> Otherwise >> > > > you >> > > > > > can discuss some details in separate mails. 
Let's try to avoid >> > > repeated >> > > > > > discussion in different threads. I will sync important messages >> here >> > > if >> > > > > > there are any in the above threads. >> > > > > > >> > > > > > And reply to @Jeyhun: We will ensure the content between those >> FLIPs >> > > is >> > > > > > consistent. >> > > > > > >> > > > > > >> > > > > > Best, >> > > > > > Zakelly >> > > > > > >> > > > > > On Thu, Mar 7, 2024 at 2:16 PM Yuan Mei <yuanmei.w...@gmail.com >> > >> > > > wrote: >> > > > > > >> > > > > > > I have been a bit busy these few weeks and sorry for >> responding >> > > late. >> > > > > > > >> > > > > > > The original thinking of keeping discussion within one thread >> is >> > > for >> > > > > > easier >> > > > > > > tracking and avoid for repeated discussion in different >> threads. >> > > > > > > >> > > > > > > For details, It might be good to start in different threads if >> > > > needed. >> > > > > > > >> > > > > > > We will think of a way to better organize the discussion. >> > > > > > > >> > > > > > > Best >> > > > > > > Yuan >> > > > > > > >> > > > > > > >> > > > > > > On Thu, Mar 7, 2024 at 4:38 AM Jeyhun Karimov < >> > > je.kari...@gmail.com> >> > > > > > > wrote: >> > > > > > > >> > > > > > > > Hi, >> > > > > > > > >> > > > > > > > + 1 for the suggestion. >> > > > > > > > Maybe we can the discussion with the FLIPs with minimum >> > > > dependencies >> > > > > > > (from >> > > > > > > > the other new/proposed FLIPs). >> > > > > > > > Based on our discussion on a particular FLIP, the >> subsequent (or >> > > > its >> > > > > > > > dependent) FLIP(s) can be updated accordingly? >> > > > > > > > >> > > > > > > > Regards, >> > > > > > > > Jeyhun >> > > > > > > > >> > > > > > > > On Wed, Mar 6, 2024 at 5:34 PM Gyula Fóra < >> gyula.f...@gmail.com> >> > > > > > wrote: >> > > > > > > > >> > > > > > > > > Hey all! >> > > > > > > > > >> > > > > > > > > This is a massive improvement / work. I just started going >> > > > through >> > > > > > the >> > > > > > > > > Flips and have a more or less meta comment. >> > > > > > > > > >> > > > > > > > > While it's good to keep the overall architecture >> discussion >> > > > here, I >> > > > > > > think >> > > > > > > > > we should still have separate discussions for each FLIP >> where >> > > we >> > > > > can >> > > > > > > > > discuss interface details etc. With so much content if we >> start >> > > > > > adding >> > > > > > > > > minor comments here that will lead to nowhere but those >> > > > discussions >> > > > > > are >> > > > > > > > > still important and we should have them in separate >> threads >> > > (one >> > > > > for >> > > > > > > each >> > > > > > > > > FLIP) >> > > > > > > > > >> > > > > > > > > What do you think? >> > > > > > > > > Gyula >> > > > > > > > > >> > > > > > > > > On Wed, Mar 6, 2024 at 8:50 AM Yanfei Lei < >> fredia...@gmail.com >> > > > >> > > > > > wrote: >> > > > > > > > > >> > > > > > > > > > Hi team, >> > > > > > > > > > >> > > > > > > > > > Thanks for your discussion. Regarding FLIP-425, we have >> > > > > > supplemented >> > > > > > > > > > several updates to answer high-frequency questions: >> > > > > > > > > > >> > > > > > > > > > 1. 
We captured a flame graph of the Hashmap state >> backend in >> > > > > > > > > > "Synchronous execution with asynchronous APIs"[1], which >> > > > reveals >> > > > > > that >> > > > > > > > > > the framework overhead (including reference counting, >> > > > > > future-related >> > > > > > > > > > code and so on) consumes about 9% of the keyed operator >> CPU >> > > > time. >> > > > > > > > > > 2. We added a set of comparative experiments for >> watermark >> > > > > > > processing, >> > > > > > > > > > the performance of Out-Of-Order mode is 70% better than >> > > > > > > > > > strictly-ordered mode under ~140MB state size. >> Instructions >> > > on >> > > > > how >> > > > > > to >> > > > > > > > > > run this test have also been added[2]. >> > > > > > > > > > 3. Regarding the order of StreamRecord, whether it has >> state >> > > > > access >> > > > > > > or >> > > > > > > > > > not. We supplemented a new *Strict order of >> > > > 'processElement'*[3]. >> > > > > > > > > > >> > > > > > > > > > [1] >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs >> > > > > > > > > > [2] >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing >> > > > > > > > > > [3] >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > Best regards, >> > > > > > > > > > Yanfei >> > > > > > > > > > >> > > > > > > > > > Yunfeng Zhou <flink.zhouyunf...@gmail.com> 于2024年3月5日周二 >> > > > 09:25写道: >> > > > > > > > > > > >> > > > > > > > > > > Hi Zakelly, >> > > > > > > > > > > >> > > > > > > > > > > > 5. I'm not very sure ... revisiting this later >> since it >> > > is >> > > > > not >> > > > > > > > > > important. >> > > > > > > > > > > >> > > > > > > > > > > It seems that we still have some details to confirm >> about >> > > > this >> > > > > > > > > > > question. Let's postpone this to after the critical >> parts >> > > of >> > > > > the >> > > > > > > > > > > design are settled. >> > > > > > > > > > > >> > > > > > > > > > > > 8. Yes, we had considered ... metrics should be like >> > > > > > afterwards. >> > > > > > > > > > > >> > > > > > > > > > > Oh sorry I missed FLIP-431. I'm fine with discussing >> this >> > > > topic >> > > > > > in >> > > > > > > > > > milestone 2. >> > > > > > > > > > > >> > > > > > > > > > > Looking forward to the detailed design about the >> strict >> > > mode >> > > > > > > between >> > > > > > > > > > > same-key records and the benchmark results about the >> epoch >> > > > > > > mechanism. 
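For reference, the "strict mode between same-key records" essentially needs a small gate in front of processElement: a record whose key collides with an in-flight record is parked until the earlier one finishes. Below is a simplified plain-Java sketch of that idea, with made-up names and single-threaded (mailbox) access assumed; it is not the FLIP-425 design.

    import java.util.ArrayDeque;
    import java.util.HashMap;
    import java.util.Map;

    final class KeyOrderGate<K, R> {

        // key -> records parked because an earlier record with the same key is in flight
        private final Map<K, ArrayDeque<R>> inFlight = new HashMap<>();

        /** @return true if the record may be processed now, false if it was parked. */
        boolean tryOccupy(K key, R record) {
            ArrayDeque<R> parked = inFlight.get(key);
            if (parked == null) {
                inFlight.put(key, new ArrayDeque<>());   // key becomes in flight
                return true;
            }
            parked.add(record);                          // same key already in flight: park it
            return false;
        }

        /** Called once the in-flight record of this key has finished all its callbacks. */
        R release(K key) {
            ArrayDeque<R> parked = inFlight.get(key);
            if (parked == null) {
                return null;
            }
            R next = parked.poll();                      // the next parked record takes over the key
            if (next == null) {
                inFlight.remove(key);                    // nothing waiting: key is free again
            }
            return next;
        }
    }

processElement would only run when tryOccupy returns true, or later when release hands a parked record back, which preserves the per-key ordering that the dim-table-join example elsewhere in this thread relies on.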
>> > > > > > > > > > > >> > > > > > > > > > > Best regards, >> > > > > > > > > > > Yunfeng >> > > > > > > > > > > >> > > > > > > > > > > On Mon, Mar 4, 2024 at 7:59 PM Zakelly Lan < >> > > > > > zakelly....@gmail.com> >> > > > > > > > > > wrote: >> > > > > > > > > > > > >> > > > > > > > > > > > Hi Yunfeng, >> > > > > > > > > > > > >> > > > > > > > > > > > For 1: >> > > > > > > > > > > > I had a discussion with Lincoln Lee, and I realize >> it is >> > > a >> > > > > > common >> > > > > > > > > case >> > > > > > > > > > the same-key record should be blocked before the >> > > > > `processElement`. >> > > > > > It >> > > > > > > > is >> > > > > > > > > > easier for users to understand. Thus I will introduce a >> > > strict >> > > > > mode >> > > > > > > for >> > > > > > > > > > this and make it default. My rough idea is just like >> yours, >> > > by >> > > > > > > invoking >> > > > > > > > > > some method of AEC instance before `processElement`. The >> > > > detailed >> > > > > > > > design >> > > > > > > > > > will be described in FLIP later. >> > > > > > > > > > > > >> > > > > > > > > > > > For 2: >> > > > > > > > > > > > I agree with you. We could throw exceptions for now >> and >> > > > > > optimize >> > > > > > > > this >> > > > > > > > > > later. >> > > > > > > > > > > > >> > > > > > > > > > > > For 5: >> > > > > > > > > > > >> >> > > > > > > > > > > >> It might be better to move the default values to >> the >> > > > > Proposed >> > > > > > > > > Changes >> > > > > > > > > > > >> section instead of making them public for now, as >> there >> > > > will >> > > > > > be >> > > > > > > > > > > >> compatibility issues once we want to dynamically >> adjust >> > > > the >> > > > > > > > > thresholds >> > > > > > > > > > > >> and timeouts in future. >> > > > > > > > > > > > >> > > > > > > > > > > > Agreed. The whole framework is under experiment >> until we >> > > > > think >> > > > > > it >> > > > > > > > is >> > > > > > > > > > complete in 2.0 or later. The default value should be >> better >> > > > > > > determined >> > > > > > > > > > with more testing results and production experience. >> > > > > > > > > > > > >> > > > > > > > > > > >> The configuration execution.async-state.enabled >> seems >> > > > > > > unnecessary, >> > > > > > > > > as >> > > > > > > > > > > >> the infrastructure may automatically get this >> > > information >> > > > > from >> > > > > > > the >> > > > > > > > > > > >> detailed state backend configurations. We may >> revisit >> > > this >> > > > > > part >> > > > > > > > > after >> > > > > > > > > > > >> the core designs have reached an agreement. WDYT? >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > I'm not very sure if there is any use case where >> users >> > > > write >> > > > > > > their >> > > > > > > > > > code using async APIs but run their job in a >> synchronous way. >> > > > The >> > > > > > > first >> > > > > > > > > two >> > > > > > > > > > scenarios that come to me are for benchmarking or for a >> small >> > > > > > state, >> > > > > > > > > while >> > > > > > > > > > they don't want to rewrite their code. Actually it is >> easy to >> > > > > > > support, >> > > > > > > > so >> > > > > > > > > > I'd suggest providing it. But I'm fine with revisiting >> this >> > > > later >> > > > > > > since >> > > > > > > > > it >> > > > > > > > > > is not important. WDYT? 
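The "easy to support" part can be pictured as the runtime handing back an already-completed future when asynchronous execution is disabled, so code written against the asynchronous API degenerates to a plain synchronous read. Here is a rough sketch using CompletableFuture as a stand-in for the proposed StateFuture; the names are illustrative and this is not the FLIP-424 API.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;

    // Toy value state exposing an asynchronous read; not a real Flink state object.
    final class ToyAsyncValueState {
        private final Map<String, Long> backend = new HashMap<>();
        private final boolean asyncEnabled;

        ToyAsyncValueState(boolean asyncEnabled) {
            this.asyncEnabled = asyncEnabled;
        }

        CompletableFuture<Long> asyncGet(String key) {
            if (!asyncEnabled) {
                // "Synchronous" mode: read in place and return an already-completed future,
                // so the same user code runs with no out-of-order effects at all.
                return CompletableFuture.completedFuture(backend.getOrDefault(key, 0L));
            }
            // Asynchronous mode: completed later by another thread
            // (thread-safety is ignored in this toy).
            return CompletableFuture.supplyAsync(() -> backend.getOrDefault(key, 0L));
        }
    }

User code such as state.asyncGet(word).thenAccept(count -> ...) stays the same in both cases; only where the future gets completed differs, which is what would make benchmarking or small-state jobs runnable without a code rewrite.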
>> > > > > > > > > > > > >> > > > > > > > > > > > For 8: >> > > > > > > > > > > > Yes, we had considered the I/O metrics group >> especially >> > > the >> > > > > > > > > > back-pressure, idle and task busy per second. In the >> current >> > > > plan >> > > > > > we >> > > > > > > > can >> > > > > > > > > do >> > > > > > > > > > state access during back-pressure, meaning that those >> metrics >> > > > for >> > > > > > > input >> > > > > > > > > > would better be redefined. I suggest we discuss these >> > > existing >> > > > > > > metrics >> > > > > > > > as >> > > > > > > > > > well as some new metrics that should be introduced in >> > > FLIP-431 >> > > > > > later >> > > > > > > in >> > > > > > > > > > milestone 2, since we have basically finished the >> framework >> > > > thus >> > > > > we >> > > > > > > > will >> > > > > > > > > > have a better view of what metrics should be like >> afterwards. >> > > > > WDYT? >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > Best, >> > > > > > > > > > > > Zakelly >> > > > > > > > > > > > >> > > > > > > > > > > > On Mon, Mar 4, 2024 at 6:49 PM Yunfeng Zhou < >> > > > > > > > > > flink.zhouyunf...@gmail.com> wrote: >> > > > > > > > > > > >> >> > > > > > > > > > > >> Hi Zakelly, >> > > > > > > > > > > >> >> > > > > > > > > > > >> Thanks for the responses! >> > > > > > > > > > > >> >> > > > > > > > > > > >> > 1. I will discuss this with some expert SQL >> > > developers. >> > > > > ... >> > > > > > > mode >> > > > > > > > > > for StreamRecord processing. >> > > > > > > > > > > >> >> > > > > > > > > > > >> In DataStream API there should also be use cases >> when >> > > the >> > > > > > order >> > > > > > > of >> > > > > > > > > > > >> output is strictly required. I agree with it that >> SQL >> > > > > experts >> > > > > > > may >> > > > > > > > > help >> > > > > > > > > > > >> provide more concrete use cases that can >> accelerate our >> > > > > > > > discussion, >> > > > > > > > > > > >> but please allow me to search for DataStream use >> cases >> > > > that >> > > > > > can >> > > > > > > > > prove >> > > > > > > > > > > >> the necessity of this strict order preservation >> mode, if >> > > > > > answers >> > > > > > > > > from >> > > > > > > > > > > >> SQL experts are shown to be negative. >> > > > > > > > > > > >> >> > > > > > > > > > > >> For your convenience, my current rough idea is >> that we >> > > can >> > > > > > add a >> > > > > > > > > > > >> module between the Input(s) and processElement() >> module >> > > in >> > > > > > Fig 2 >> > > > > > > > of >> > > > > > > > > > > >> FLIP-425. The module will be responsible for >> caching >> > > > records >> > > > > > > whose >> > > > > > > > > > > >> keys collide with in-flight records, and AEC will >> only >> > > be >> > > > > > > > > responsible >> > > > > > > > > > > >> for handling async state calls, without knowing the >> > > record >> > > > > > each >> > > > > > > > call >> > > > > > > > > > > >> belongs to. We may revisit this topic once the >> necessity >> > > > of >> > > > > > the >> > > > > > > > > strict >> > > > > > > > > > > >> order mode is clarified. >> > > > > > > > > > > >> >> > > > > > > > > > > >> >> > > > > > > > > > > >> > 2. The amount of parallel StateRequests ... >> instead of >> > > > > > > invoking >> > > > > > > > > > yield >> > > > > > > > > > > >> >> > > > > > > > > > > >> Your suggestions generally appeal to me. 
I think >> we may >> > > > let >> > > > > > > > > > > >> corresponding Flink jobs fail with OOM for now, >> since >> > > the >> > > > > > > majority >> > > > > > > > > of >> > > > > > > > > > > >> a StateRequest should just be references to >> existing >> > > Java >> > > > > > > objects, >> > > > > > > > > > > >> which only occupies very small memory space and can >> > > hardly >> > > > > > cause >> > > > > > > > OOM >> > > > > > > > > > > >> in common cases. We can monitor the pending >> > > StateRequests >> > > > > and >> > > > > > if >> > > > > > > > > there >> > > > > > > > > > > >> is really a risk of OOM in extreme cases, we can >> throw >> > > > > > > Exceptions >> > > > > > > > > with >> > > > > > > > > > > >> proper messages notifying users what to do, like >> > > > increasing >> > > > > > > memory >> > > > > > > > > > > >> through configurations. >> > > > > > > > > > > >> >> > > > > > > > > > > >> Your suggestions to adjust threshold adaptively or >> to >> > > use >> > > > > the >> > > > > > > > > blocking >> > > > > > > > > > > >> buffer sounds good, and in my opinion we can >> postpone >> > > them >> > > > > to >> > > > > > > > future >> > > > > > > > > > > >> FLIPs since they seem to only benefit users in rare >> > > cases. >> > > > > > Given >> > > > > > > > > that >> > > > > > > > > > > >> FLIP-423~428 has already been a big enough design, >> it >> > > > might >> > > > > be >> > > > > > > > > better >> > > > > > > > > > > >> to focus on the most critical design for now and >> > > postpone >> > > > > > > > > > > >> optimizations like this. WDYT? >> > > > > > > > > > > >> >> > > > > > > > > > > >> >> > > > > > > > > > > >> > 5. Sure, we will introduce new configs as well as >> > > their >> > > > > > > default >> > > > > > > > > > value. >> > > > > > > > > > > >> >> > > > > > > > > > > >> Thanks for adding the default values and the values >> > > > > themselves >> > > > > > > > LGTM. >> > > > > > > > > > > >> It might be better to move the default values to >> the >> > > > > Proposed >> > > > > > > > > Changes >> > > > > > > > > > > >> section instead of making them public for now, as >> there >> > > > will >> > > > > > be >> > > > > > > > > > > >> compatibility issues once we want to dynamically >> adjust >> > > > the >> > > > > > > > > thresholds >> > > > > > > > > > > >> and timeouts in future. >> > > > > > > > > > > >> >> > > > > > > > > > > >> The configuration execution.async-state.enabled >> seems >> > > > > > > unnecessary, >> > > > > > > > > as >> > > > > > > > > > > >> the infrastructure may automatically get this >> > > information >> > > > > from >> > > > > > > the >> > > > > > > > > > > >> detailed state backend configurations. We may >> revisit >> > > this >> > > > > > part >> > > > > > > > > after >> > > > > > > > > > > >> the core designs have reached an agreement. WDYT? >> > > > > > > > > > > >> >> > > > > > > > > > > >> >> > > > > > > > > > > >> Besides, inspired by Jeyhun's comments, it comes >> to me >> > > > that >> > > > > > > > > > > >> >> > > > > > > > > > > >> 8. Should this FLIP introduce metrics that measure >> the >> > > > time >> > > > > a >> > > > > > > > Flink >> > > > > > > > > > > >> job is back-pressured by State IOs? 
Under the >> current >> > > > > design, >> > > > > > > this >> > > > > > > > > > > >> metric could measure the time when the blocking >> buffer >> > > is >> > > > > full >> > > > > > > and >> > > > > > > > > > > >> yield() cannot get callbacks to process, which >> means the >> > > > > > > operator >> > > > > > > > is >> > > > > > > > > > > >> fully waiting for state responses. >> > > > > > > > > > > >> >> > > > > > > > > > > >> Best regards, >> > > > > > > > > > > >> Yunfeng >> > > > > > > > > > > >> >> > > > > > > > > > > >> On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan < >> > > > > > > > zakelly....@gmail.com> >> > > > > > > > > > wrote: >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > Hi Yunfeng, >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > Thanks for your detailed comments! >> > > > > > > > > > > >> > >> > > > > > > > > > > >> >> 1. Why do we need a close() method on >> StateIterator? >> > > > This >> > > > > > > > method >> > > > > > > > > > seems >> > > > > > > > > > > >> >> unused in the usage example codes. >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > The `close()` is introduced to release internal >> > > > resources, >> > > > > > but >> > > > > > > > it >> > > > > > > > > > does not seem to require the user to call it. I removed >> this. >> > > > > > > > > > > >> > >> > > > > > > > > > > >> >> 2. In FutureUtils.combineAll()'s JavaDoc, it is >> > > stated >> > > > > that >> > > > > > > "No >> > > > > > > > > > null >> > > > > > > > > > > >> >> entries are allowed". It might be better to >> further >> > > > > explain >> > > > > > > > what >> > > > > > > > > > will >> > > > > > > > > > > >> >> happen if a null value is passed, ignoring the >> value >> > > in >> > > > > the >> > > > > > > > > > returned >> > > > > > > > > > > >> >> Collection or throwing exceptions. Given that >> > > > > > > > > > > >> >> FutureUtils.emptyFuture() can be returned in the >> > > > example >> > > > > > > code, >> > > > > > > > I >> > > > > > > > > > > >> >> suppose the former one might be correct. >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > The statement "No null entries are allowed" >> refers to >> > > > the >> > > > > > > > > > parameters, it means some arrayList like [null, >> StateFuture1, >> > > > > > > > > StateFuture2] >> > > > > > > > > > passed in are not allowed, and an Exception will be >> thrown. >> > > > > > > > > > > >> > >> > > > > > > > > > > >> >> 1. According to Fig 2 of this FLIP, ... . This >> > > > situation >> > > > > > > should >> > > > > > > > > be >> > > > > > > > > > > >> >> avoided and the order of same-key records >> should be >> > > > > > strictly >> > > > > > > > > > > >> >> preserved. >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > I will discuss this with some expert SQL >> developers. >> > > And >> > > > > if >> > > > > > it >> > > > > > > > is >> > > > > > > > > > valid and common, I suggest a strict order preservation >> mode >> > > > for >> > > > > > > > > > StreamRecord processing. WDYT? >> > > > > > > > > > > >> > >> > > > > > > > > > > >> >> 2. The FLIP says that StateRequests submitted by >> > > > > Callbacks >> > > > > > > will >> > > > > > > > > not >> > > > > > > > > > > >> >> invoke further yield() methods. 
Given that >> yield() is >> > > > > used >> > > > > > > when >> > > > > > > > > > there >> > > > > > > > > > > >> >> is "too much" in-flight data, does it mean >> > > > StateRequests >> > > > > > > > > submitted >> > > > > > > > > > by >> > > > > > > > > > > >> >> Callbacks will never be "too much"? What if the >> total >> > > > > > number >> > > > > > > of >> > > > > > > > > > > >> >> StateRequests exceed the capacity of Flink >> operator's >> > > > > > memory >> > > > > > > > > space? >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > The amount of parallel StateRequests for one >> > > > StreamRecord >> > > > > > > cannot >> > > > > > > > > be >> > > > > > > > > > determined since the code is written by users. So the >> > > in-flight >> > > > > > > > requests >> > > > > > > > > > may be "too much", and may cause OOM. Users should >> > > re-configure >> > > > > > their >> > > > > > > > > job, >> > > > > > > > > > controlling the amount of on-going StreamRecord. And I >> > > suggest >> > > > > two >> > > > > > > ways >> > > > > > > > > to >> > > > > > > > > > avoid this: >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > Adaptively adjust the count of on-going >> StreamRecord >> > > > > > according >> > > > > > > > to >> > > > > > > > > > historical StateRequests amount. >> > > > > > > > > > > >> > Also control the max StateRequests that can be >> > > executed >> > > > in >> > > > > > > > > parallel >> > > > > > > > > > for each StreamRecord, and if it exceeds, put the new >> > > > > StateRequest >> > > > > > in >> > > > > > > > the >> > > > > > > > > > blocking buffer waiting for execution (instead of >> invoking >> > > > > > yield()). >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > WDYT? >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > >> > > > > > > > > > > >> >> 3.1 I'm concerned that the out-of-order >> execution >> > > mode, >> > > > > > along >> > > > > > > > > with >> > > > > > > > > > the >> > > > > > > > > > > >> >> epoch mechanism, would bring more complexity to >> the >> > > > > > execution >> > > > > > > > > model >> > > > > > > > > > > >> >> than the performance improvement it promises. >> Could >> > > we >> > > > > add >> > > > > > > some >> > > > > > > > > > > >> >> benchmark results proving the benefit of this >> mode? >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > Agreed, will do. >> > > > > > > > > > > >> > >> > > > > > > > > > > >> >> 3.2 The FLIP might need to add a public API >> section >> > > > > > > describing >> > > > > > > > > how >> > > > > > > > > > > >> >> users or developers can switch between these two >> > > > > execution >> > > > > > > > modes. >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > Good point. We will add a Public API section. >> > > > > > > > > > > >> > >> > > > > > > > > > > >> >> 3.3 Apart from the watermark and checkpoint >> mentioned >> > > > in >> > > > > > this >> > > > > > > > > FLIP, >> > > > > > > > > > > >> >> there are also more other events that might >> appear in >> > > > the >> > > > > > > > stream >> > > > > > > > > of >> > > > > > > > > > > >> >> data records. It might be better to generalize >> the >> > > > > > execution >> > > > > > > > mode >> > > > > > > > > > > >> >> mechanism to handle all possible events. >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > Yes, I missed this point. Thanks for the >> reminder. >> > > > > > > > > > > >> > >> > > > > > > > > > > >> >> 4. 
It might be better to treat >> callback-handling as a >> > > > > > > > > > > >> >> MailboxDefaultAction, instead of Mails, to >> avoid the >> > > > > > overhead >> > > > > > > > of >> > > > > > > > > > > >> >> repeatedly creating Mail objects. >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > I thought the intermediated wrapper for >> callback can >> > > > not >> > > > > be >> > > > > > > > > > omitted, since there will be some context switch before >> each >> > > > > > > execution. >> > > > > > > > > The >> > > > > > > > > > MailboxDefaultAction in most cases is processInput >> right? >> > > While >> > > > > the >> > > > > > > > > > callback should be executed with higher priority. I'd >> suggest >> > > > not >> > > > > > > > > changing >> > > > > > > > > > the basic logic of Mailbox and the default action since >> it is >> > > > > very >> > > > > > > > > critical >> > > > > > > > > > for performance. But yes, we will try our best to avoid >> > > > creating >> > > > > > > > > > intermediated objects. >> > > > > > > > > > > >> > >> > > > > > > > > > > >> >> 5. Could this FLIP provide the current default >> values >> > > > for >> > > > > > > > things >> > > > > > > > > > like >> > > > > > > > > > > >> >> active buffer size thresholds and timeouts? >> These >> > > could >> > > > > > help >> > > > > > > > with >> > > > > > > > > > > >> >> memory consumption and latency analysis. >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > Sure, we will introduce new configs as well as >> their >> > > > > default >> > > > > > > > > value. >> > > > > > > > > > > >> > >> > > > > > > > > > > >> >> 6. Why do we need to record the hashcode of a >> record >> > > in >> > > > > its >> > > > > > > > > > > >> >> RecordContext? It seems not used. >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > The context switch before each callback execution >> > > > involves >> > > > > > > > > > setCurrentKey, where the hashCode is re-calculated. We >> cache >> > > it >> > > > > for >> > > > > > > > > > accelerating. >> > > > > > > > > > > >> > >> > > > > > > > > > > >> >> 7. In "timers can be stored on the JVM heap or >> > > > RocksDB", >> > > > > > the >> > > > > > > > link >> > > > > > > > > > > >> >> points to a document in flink-1.15. It might be >> > > better >> > > > to >> > > > > > > > verify >> > > > > > > > > > the >> > > > > > > > > > > >> >> referenced content is still valid in the latest >> Flink >> > > > and >> > > > > > > > update >> > > > > > > > > > the >> > > > > > > > > > > >> >> link accordingly. Same for other references if >> any. >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > Thanks for the reminder! Will check. >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > Thanks a lot & Best, >> > > > > > > > > > > >> > Zakelly >> > > > > > > > > > > >> > >> > > > > > > > > > > >> > On Sat, Mar 2, 2024 at 6:18 AM Jeyhun Karimov < >> > > > > > > > > je.kari...@gmail.com> >> > > > > > > > > > wrote: >> > > > > > > > > > > >> >> >> > > > > > > > > > > >> >> Hi, >> > > > > > > > > > > >> >> >> > > > > > > > > > > >> >> Thanks for the great proposals. I have a few >> comments >> > > > > > > comments: >> > > > > > > > > > > >> >> >> > > > > > > > > > > >> >> - Backpressure Handling. 
Flink's original >> > > backpressure >> > > > > > > handling >> > > > > > > > > is >> > > > > > > > > > quite >> > > > > > > > > > > >> >> robust and the semantics is quite "simple" >> (simple is >> > > > > > > > beautiful). >> > > > > > > > > > > >> >> This mechanism has proven to perform >> better/robust >> > > than >> > > > > the >> > > > > > > > other >> > > > > > > > > > open >> > > > > > > > > > > >> >> source streaming systems, where they were >> relying on >> > > > some >> > > > > > > > > loopback >> > > > > > > > > > > >> >> information. >> > > > > > > > > > > >> >> Now that the proposal also relies on loopback >> (yield >> > > in >> > > > > > this >> > > > > > > > > > case), it is >> > > > > > > > > > > >> >> not clear how well the new backpressure handling >> > > > proposed >> > > > > > in >> > > > > > > > > > FLIP-425 is >> > > > > > > > > > > >> >> robust and handle fluctuating workloads. >> > > > > > > > > > > >> >> >> > > > > > > > > > > >> >> - Watermark/Timer Handling: Similar arguments >> apply >> > > for >> > > > > > > > watermark >> > > > > > > > > > and timer >> > > > > > > > > > > >> >> handling. IMHO, we need more benchmarks showing >> the >> > > > > > overhead >> > > > > > > > > > > >> >> of epoch management with different parameters >> (e.g., >> > > > > window >> > > > > > > > size, >> > > > > > > > > > watermark >> > > > > > > > > > > >> >> strategy, etc) >> > > > > > > > > > > >> >> >> > > > > > > > > > > >> >> - DFS consistency guarantees. The proposal in >> > > FLIP-427 >> > > > is >> > > > > > > > > > DFS-agnostic. >> > > > > > > > > > > >> >> However, different cloud providers have >> different >> > > > storage >> > > > > > > > > > consistency >> > > > > > > > > > > >> >> models. >> > > > > > > > > > > >> >> How do we want to deal with them? >> > > > > > > > > > > >> >> >> > > > > > > > > > > >> >> Regards, >> > > > > > > > > > > >> >> Jeyhun >> > > > > > > > > > > >> >> >> > > > > > > > > > > >> >> >> > > > > > > > > > > >> >> >> > > > > > > > > > > >> >> >> > > > > > > > > > > >> >> On Fri, Mar 1, 2024 at 6:08 PM Zakelly Lan < >> > > > > > > > > zakelly....@gmail.com> >> > > > > > > > > > wrote: >> > > > > > > > > > > >> >> >> > > > > > > > > > > >> >> > Thanks Piotr for sharing your thoughts! >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > I guess it depends how we would like to treat >> the >> > > > local >> > > > > > > > disks. >> > > > > > > > > > I've always >> > > > > > > > > > > >> >> > > thought about them that almost always >> eventually >> > > > all >> > > > > > > state >> > > > > > > > > > from the DFS >> > > > > > > > > > > >> >> > > should end up cached in the local disks. >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > OK I got it. In our proposal we treat local >> disk as >> > > > an >> > > > > > > > optional >> > > > > > > > > > cache, so >> > > > > > > > > > > >> >> > the basic design will handle the case with >> state >> > > > > residing >> > > > > > > in >> > > > > > > > > DFS >> > > > > > > > > > only. 
It >> > > > > > > > > > > >> >> > is a more 'cloud-native' approach that does >> not >> > > rely >> > > > on >> > > > > > any >> > > > > > > > > > local storage >> > > > > > > > > > > >> >> > assumptions, which allow users to dynamically >> > > adjust >> > > > > the >> > > > > > > > > > capacity or I/O >> > > > > > > > > > > >> >> > bound of remote storage to gain performance >> or save >> > > > the >> > > > > > > cost, >> > > > > > > > > > even without >> > > > > > > > > > > >> >> > a job restart. >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > In >> > > > > > > > > > > >> >> > > the currently proposed more fine grained >> > > solution, >> > > > > you >> > > > > > > > make a >> > > > > > > > > > single >> > > > > > > > > > > >> >> > > request to DFS per each state access. >> > > > > > > > > > > >> >> > > >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > Ah that's not accurate. Actually we buffer the >> > > state >> > > > > > > requests >> > > > > > > > > > and process >> > > > > > > > > > > >> >> > them in batch, multiple requests will >> correspond to >> > > > one >> > > > > > DFS >> > > > > > > > > > access (One >> > > > > > > > > > > >> >> > block access for multiple keys performed by >> > > RocksDB). >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > In that benchmark you mentioned, are you >> requesting >> > > > the >> > > > > > > state >> > > > > > > > > > > >> >> > > asynchronously from local disks into >> memory? If >> > > the >> > > > > > > benefit >> > > > > > > > > > comes from >> > > > > > > > > > > >> >> > > parallel I/O, then I would expect the >> benefit to >> > > > > > > > > > disappear/shrink when >> > > > > > > > > > > >> >> > > running multiple subtasks on the same >> machine, as >> > > > > they >> > > > > > > > would >> > > > > > > > > > be making >> > > > > > > > > > > >> >> > > their own parallel requests, right? Also >> enabling >> > > > > > > > > > checkpointing would >> > > > > > > > > > > >> >> > > further cut into the available I/O budget. >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > That's an interesting topic. Our proposal is >> > > > > specifically >> > > > > > > > aimed >> > > > > > > > > > at the >> > > > > > > > > > > >> >> > scenario where the machine I/O is not fully >> loaded >> > > > but >> > > > > > the >> > > > > > > > I/O >> > > > > > > > > > latency has >> > > > > > > > > > > >> >> > indeed become a bottleneck for each subtask. >> While >> > > > the >> > > > > > > > > > distributed file >> > > > > > > > > > > >> >> > system is a prime example of a scenario >> > > characterized >> > > > > by >> > > > > > > > > > abundant and >> > > > > > > > > > > >> >> > easily scalable I/O bandwidth coupled with >> higher >> > > I/O >> > > > > > > > latency. >> > > > > > > > > > You may >> > > > > > > > > > > >> >> > expect to increase the parallelism of a job to >> > > > enhance >> > > > > > the >> > > > > > > > > > performance as >> > > > > > > > > > > >> >> > well, but that also brings in more waste of >> CPU's >> > > and >> > > > > > > memory >> > > > > > > > > for >> > > > > > > > > > building >> > > > > > > > > > > >> >> > up more subtasks. This is one drawback for the >> > > > > > > > > > computation-storage tightly >> > > > > > > > > > > >> >> > coupled nodes. 
While in our proposal, the >> parallel >> > > > I/O >> > > > > > with >> > > > > > > > all >> > > > > > > > > > the >> > > > > > > > > > > >> >> > callbacks still running in one task, >> pre-allocated >> > > > > > > > > computational >> > > > > > > > > > resources >> > > > > > > > > > > >> >> > are better utilized. It is a much more >> lightweight >> > > > way >> > > > > to >> > > > > > > > > > perform parallel >> > > > > > > > > > > >> >> > I/O. >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > Just with what granularity those async >> requests >> > > > should >> > > > > be >> > > > > > > > made. >> > > > > > > > > > > >> >> > > Making state access asynchronous is >> definitely >> > > the >> > > > > > right >> > > > > > > > way >> > > > > > > > > > to go! >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > I think the current proposal is based on such >> core >> > > > > ideas: >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > - A pure cloud-native disaggregated state. >> > > > > > > > > > > >> >> > - Fully utilize the given resources and >> try not >> > > to >> > > > > > waste >> > > > > > > > > them >> > > > > > > > > > (including >> > > > > > > > > > > >> >> > I/O). >> > > > > > > > > > > >> >> > - The ability to scale isolated resources >> (I/O >> > > or >> > > > > CPU >> > > > > > or >> > > > > > > > > > memory) >> > > > > > > > > > > >> >> > independently. >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > We think a fine-grained granularity is more >> inline >> > > > with >> > > > > > > those >> > > > > > > > > > ideas, >> > > > > > > > > > > >> >> > especially without local disk assumptions and >> > > without >> > > > > any >> > > > > > > > waste >> > > > > > > > > > of I/O by >> > > > > > > > > > > >> >> > prefetching. Please note that it is not a >> > > replacement >> > > > > of >> > > > > > > the >> > > > > > > > > > original local >> > > > > > > > > > > >> >> > state with synchronous execution. Instead >> this is a >> > > > > > > solution >> > > > > > > > > > embracing the >> > > > > > > > > > > >> >> > cloud-native era, providing much more >> scalability >> > > and >> > > > > > > > resource >> > > > > > > > > > efficiency >> > > > > > > > > > > >> >> > when handling a *huge state*. >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > What also worries me a lot in this fine >> grained >> > > model >> > > > > is >> > > > > > > the >> > > > > > > > > > effect on the >> > > > > > > > > > > >> >> > > checkpointing times. >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > Your concerns are very reasonable. Faster >> > > > checkpointing >> > > > > > is >> > > > > > > > > > always a core >> > > > > > > > > > > >> >> > advantage of disaggregated state, but only >> for the >> > > > > async >> > > > > > > > phase. >> > > > > > > > > > There will >> > > > > > > > > > > >> >> > be some complexity introduced by in-flight >> > > requests, >> > > > > but >> > > > > > > I'd >> > > > > > > > > > suggest a >> > > > > > > > > > > >> >> > checkpoint containing those in-flight state >> > > requests >> > > > as >> > > > > > > part >> > > > > > > > of >> > > > > > > > > > the state, >> > > > > > > > > > > >> >> > to accelerate the sync phase by skipping the >> buffer >> > > > > > > draining. >> > > > > > > > > > This makes >> > > > > > > > > > > >> >> > the buffer size have little impact on >> checkpoint >> > > > time. 
>> > > > > > And >> > > > > > > > all >> > > > > > > > > > the changes >> > > > > > > > > > > >> >> > keep within the execution model we proposed >> while >> > > the >> > > > > > > > > checkpoint >> > > > > > > > > > barrier >> > > > > > > > > > > >> >> > alignment or handling will not be touched in >> our >> > > > > > proposal, >> > > > > > > > so I >> > > > > > > > > > guess >> > > > > > > > > > > >> >> > the complexity is relatively controllable. I >> have >> > > > faith >> > > > > > in >> > > > > > > > that >> > > > > > > > > > :) >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > Also regarding the overheads, it would be >> great if >> > > > you >> > > > > > > could >> > > > > > > > > > provide >> > > > > > > > > > > >> >> > > profiling results of the benchmarks that you >> > > > > conducted >> > > > > > to >> > > > > > > > > > verify the >> > > > > > > > > > > >> >> > > results. Or maybe if you could describe the >> steps >> > > > to >> > > > > > > > > reproduce >> > > > > > > > > > the >> > > > > > > > > > > >> >> > results? >> > > > > > > > > > > >> >> > > Especially "Hashmap (sync)" vs "Hashmap with >> > > async >> > > > > > API". >> > > > > > > > > > > >> >> > > >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > Yes we could profile the benchmarks. And for >> the >> > > > > > comparison >> > > > > > > > of >> > > > > > > > > > "Hashmap >> > > > > > > > > > > >> >> > (sync)" vs "Hashmap with async API", we >> conduct a >> > > > > > Wordcount >> > > > > > > > job >> > > > > > > > > > written >> > > > > > > > > > > >> >> > with async APIs but disabling the async >> execution >> > > by >> > > > > > > directly >> > > > > > > > > > completing >> > > > > > > > > > > >> >> > the future using sync state access. This >> evaluates >> > > > the >> > > > > > > > overhead >> > > > > > > > > > of newly >> > > > > > > > > > > >> >> > introduced modules like 'AEC' in sync >> execution >> > > (even >> > > > > > > though >> > > > > > > > > > they are not >> > > > > > > > > > > >> >> > designed for it). The code will be provided >> later. >> > > > For >> > > > > > > other >> > > > > > > > > > results of our >> > > > > > > > > > > >> >> > PoC[1], you can follow the instructions >> here[2] to >> > > > > > > reproduce. >> > > > > > > > > > Since the >> > > > > > > > > > > >> >> > compilation may take some effort, we will >> directly >> > > > > > provide >> > > > > > > > the >> > > > > > > > > > jar for >> > > > > > > > > > > >> >> > testing next week. >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > And @Yunfeng Zhou, I have noticed your mail >> but it >> > > > is a >> > > > > > bit >> > > > > > > > > late >> > > > > > > > > > in my >> > > > > > > > > > > >> >> > local time and the next few days are >> weekends. So I >> > > > > will >> > > > > > > > reply >> > > > > > > > > > to you >> > > > > > > > > > > >> >> > later. Thanks for your response! 
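On the batching point earlier in this mail ("multiple requests will correspond to one DFS access"), the grouping can be pictured as a small buffer that collects pending reads and answers them from one multi-key lookup. Below is a simplified plain-Java sketch with made-up names and single-threaded access assumed; the real ForSt/RocksDB integration differs.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Function;

    // Collects asynchronous reads and completes them together from one batched lookup.
    final class BatchingStateReader<K, V> {

        private final List<K> pendingKeys = new ArrayList<>();
        private final List<CompletableFuture<V>> pendingFutures = new ArrayList<>();
        private final Function<List<K>, Map<K, V>> multiGet;   // one "block access" for many keys
        private final int batchSize;

        BatchingStateReader(Function<List<K>, Map<K, V>> multiGet, int batchSize) {
            this.multiGet = multiGet;
            this.batchSize = batchSize;
        }

        CompletableFuture<V> asyncGet(K key) {
            CompletableFuture<V> future = new CompletableFuture<>();
            pendingKeys.add(key);
            pendingFutures.add(future);
            if (pendingKeys.size() >= batchSize) {
                flush();                                       // threshold reached: one grouped access
            }
            return future;
        }

        void flush() {                                         // a timeout would also trigger this
            if (pendingKeys.isEmpty()) {
                return;
            }
            Map<K, V> results = multiGet.apply(new ArrayList<>(pendingKeys));
            for (int i = 0; i < pendingKeys.size(); i++) {
                pendingFutures.get(i).complete(results.get(pendingKeys.get(i)));
            }
            pendingKeys.clear();
            pendingFutures.clear();
        }
    }

A size threshold plus a timeout (the active buffer thresholds and timeouts discussed elsewhere in this thread) decides when to flush, which is why many asyncGet calls can be served by a single block read on the remote store.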
>> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > [1] >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-PoCResults >> > > > > > > > > > > >> >> > [2] >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-Appendix:HowtorunthePoC >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > Best, >> > > > > > > > > > > >> >> > Zakelly >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > On Fri, Mar 1, 2024 at 6:38 PM Yunfeng Zhou < >> > > > > > > > > > flink.zhouyunf...@gmail.com> >> > > > > > > > > > > >> >> > wrote: >> > > > > > > > > > > >> >> > >> > > > > > > > > > > >> >> > > Hi, >> > > > > > > > > > > >> >> > > >> > > > > > > > > > > >> >> > > Thanks for proposing this design! I just >> read >> > > > > FLIP-424 >> > > > > > > and >> > > > > > > > > > FLIP-425 >> > > > > > > > > > > >> >> > > and have some questions about the proposed >> > > changes. >> > > > > > > > > > > >> >> > > >> > > > > > > > > > > >> >> > > For Async API (FLIP-424) >> > > > > > > > > > > >> >> > > >> > > > > > > > > > > >> >> > > 1. Why do we need a close() method on >> > > > StateIterator? >> > > > > > This >> > > > > > > > > > method seems >> > > > > > > > > > > >> >> > > unused in the usage example codes. >> > > > > > > > > > > >> >> > > >> > > > > > > > > > > >> >> > > 2. In FutureUtils.combineAll()'s JavaDoc, >> it is >> > > > > stated >> > > > > > > that >> > > > > > > > > > "No null >> > > > > > > > > > > >> >> > > entries are allowed". It might be better to >> > > further >> > > > > > > explain >> > > > > > > > > > what will >> > > > > > > > > > > >> >> > > happen if a null value is passed, ignoring >> the >> > > > value >> > > > > in >> > > > > > > the >> > > > > > > > > > returned >> > > > > > > > > > > >> >> > > Collection or throwing exceptions. Given >> that >> > > > > > > > > > > >> >> > > FutureUtils.emptyFuture() can be returned >> in the >> > > > > > example >> > > > > > > > > code, >> > > > > > > > > > I >> > > > > > > > > > > >> >> > > suppose the former one might be correct. >> > > > > > > > > > > >> >> > > >> > > > > > > > > > > >> >> > > >> > > > > > > > > > > >> >> > > For Async Execution (FLIP-425) >> > > > > > > > > > > >> >> > > >> > > > > > > > > > > >> >> > > 1. According to Fig 2 of this FLIP, if a >> recordB >> > > > has >> > > > > > its >> > > > > > > > key >> > > > > > > > > > collide >> > > > > > > > > > > >> >> > > with an ongoing recordA, its >> processElement() >> > > > method >> > > > > > can >> > > > > > > > > still >> > > > > > > > > > be >> > > > > > > > > > > >> >> > > triggered immediately, and then it might be >> moved >> > > > to >> > > > > > the >> > > > > > > > > > blocking >> > > > > > > > > > > >> >> > > buffer in AEC if it involves state >> operations. 
>> > > This >> > > > > > means >> > > > > > > > > that >> > > > > > > > > > > >> >> > > recordB's output will precede recordA's >> output in >> > > > > > > > downstream >> > > > > > > > > > > >> >> > > operators, if recordA involves state >> operations >> > > > while >> > > > > > > > recordB >> > > > > > > > > > does >> > > > > > > > > > > >> >> > > not. This will harm the correctness of >> Flink jobs >> > > > in >> > > > > > some >> > > > > > > > use >> > > > > > > > > > cases. >> > > > > > > > > > > >> >> > > For example, in dim table join cases, >> recordA >> > > could >> > > > > be >> > > > > > a >> > > > > > > > > delete >> > > > > > > > > > > >> >> > > operation that involves state access, while >> > > recordB >> > > > > > could >> > > > > > > > be >> > > > > > > > > > an insert >> > > > > > > > > > > >> >> > > operation that needs to visit external >> storage >> > > > > without >> > > > > > > > state >> > > > > > > > > > access. >> > > > > > > > > > > >> >> > > If recordB's output precedes recordA's, >> then an >> > > > entry >> > > > > > > that >> > > > > > > > is >> > > > > > > > > > supposed >> > > > > > > > > > > >> >> > > to finally exist with recordB's value in >> the sink >> > > > > table >> > > > > > > > might >> > > > > > > > > > actually >> > > > > > > > > > > >> >> > > be deleted according to recordA's command. >> This >> > > > > > situation >> > > > > > > > > > should be >> > > > > > > > > > > >> >> > > avoided and the order of same-key records >> should >> > > be >> > > > > > > > strictly >> > > > > > > > > > > >> >> > > preserved. >> > > > > > > > > > > >> >> > > >> > > > > > > > > > > >> >> > > 2. The FLIP says that StateRequests >> submitted by >> > > > > > > Callbacks >> > > > > > > > > > will not >> > > > > > > > > > > >> >> > > invoke further yield() methods. Given that >> > > yield() >> > > > is >> > > > > > > used >> > > > > > > > > > when there >> > > > > > > > > > > >> >> > > is "too much" in-flight data, does it mean >> > > > > > StateRequests >> > > > > > > > > > submitted by >> > > > > > > > > > > >> >> > > Callbacks will never be "too much"? What if >> the >> > > > total >> > > > > > > > number >> > > > > > > > > of >> > > > > > > > > > > >> >> > > StateRequests exceed the capacity of Flink >> > > > operator's >> > > > > > > > memory >> > > > > > > > > > space? >> > > > > > > > > > > >> >> > > >> > > > > > > > > > > >> >> > > 3. In the "Watermark" section, this FLIP >> provided >> > > > an >> > > > > > > > > > out-of-order >> > > > > > > > > > > >> >> > > execution mode apart from the default >> > > > > strictly-ordered >> > > > > > > > mode, >> > > > > > > > > > which can >> > > > > > > > > > > >> >> > > optimize performance by allowing more >> concurrent >> > > > > > > > executions. >> > > > > > > > > > > >> >> > > >> > > > > > > > > > > >> >> > > 3.1 I'm concerned that the out-of-order >> execution >> > > > > mode, >> > > > > > > > along >> > > > > > > > > > with the >> > > > > > > > > > > >> >> > > epoch mechanism, would bring more >> complexity to >> > > the >> > > > > > > > execution >> > > > > > > > > > model >> > > > > > > > > > > >> >> > > than the performance improvement it >> promises. >> > > Could >> > > > > we >> > > > > > > add >> > > > > > > > > some >> > > > > > > > > > > >> >> > > benchmark results proving the benefit of >> this >> > > mode? 
>> > > > > > > > > > > >> >> > > >> > > > > > > > > > > >> >> > > 3.2 The FLIP might need to add a public API >> > > section >> > > > > > > > > describing >> > > > > > > > > > how >> > > > > > > > > > > >> >> > > users or developers can switch between >> these two >> > > > > > > execution >> > > > > > > > > > modes. >> > > > > > > > > > > >> >> > > >> > > > > > > > > > > >> >> > > 3.3 Apart from the watermark and checkpoint >> > > > mentioned >> > > > > > in >> > > > > > > > this >> > > > > > > > > > FLIP, >> > > > > > > > > > > >> >> > > there are also more other events that might >> > > appear >> > > > in >> > > > > > the >> > > > > > > > > > stream of >> > > > > > > > > > > >> >> > > data records. It might be better to >> generalize >> > > the >> > > > > > > > execution >> > > > > > > > > > mode >> > > > > > > > > > > >> >> > > mechanism to handle all possible events. >> > > > > > > > > > > >> >> > > >> > > > > > > > > > > >> >> > > 4. It might be better to treat >> callback-handling >> > > > as a >> > > > > > > > > > > >> >> > > MailboxDefaultAction, instead of Mails, to >> avoid >> > > > the >> > > > > > > > overhead >> > > > > > > > > > of >> > > > > > > > > > > >> >> > > repeatedly creating Mail objects. >> > > > > > > > > > > >> >> > > >> > > > > > > > > > > >> >> > > 5. Could this FLIP provide the current >> default >> > > > values >> > > > > > for >> > > > > > > > > > things like >> > > > > > > > > > > >> >> > > active buffer size thresholds and timeouts? >> These >> > > > > could >> > > > > > > > help >> > > > > > > > > > with >> > > > > > > > > > > >> >> > > memory consumption and latency analysis. >> > > > > > > > > > > >> >> > > >> > > > > > > > > > > >> >> > > 6. Why do we need to record the hashcode of >> a >> > > > record >> > > > > in >> > > > > > > its >> > > > > > > > > > > >> >> > > RecordContext? It seems not used. >> > > > > > > > > > > >> >> > > >> > > > > > > > > > > >> >> > > 7. In "timers can be stored on the JVM heap >> or >> > > > > > RocksDB", >> > > > > > > > the >> > > > > > > > > > link >> > > > > > > > > > > >> >> > > points to a document in flink-1.15. It >> might be >> > > > > better >> > > > > > to >> > > > > > > > > > verify the >> > > > > > > > > > > >> >> > > referenced content is still valid in the >> latest >> > > > Flink >> > > > > > and >> > > > > > > > > > update the >> > > > > > > > > > > >> >> > > link accordingly. Same for other references >> if >> > > any. >> > > > > > > > > > > >> >> > > >> > > > > > > > > > > >> >> > > Best, >> > > > > > > > > > > >> >> > > Yunfeng Zhou >> > > > > > > > > > > >> >> > > >> > > > > > > > > > > >> >> > > On Thu, Feb 29, 2024 at 2:17 PM Yuan Mei < >> > > > > > > > > > yuanmei.w...@gmail.com> wrote: >> > > > > > > > > > > >> >> > > > >> > > > > > > > > > > >> >> > > > Hi Devs, >> > > > > > > > > > > >> >> > > > >> > > > > > > > > > > >> >> > > > This is a joint work of Yuan Mei, Zakelly >> Lan, >> > > > > > Jinzhong >> > > > > > > > Li, >> > > > > > > > > > Hangxiang >> > > > > > > > > > > >> >> > Yu, >> > > > > > > > > > > >> >> > > > Yanfei Lei and Feng Wang. We'd like to >> start a >> > > > > > > discussion >> > > > > > > > > > about >> > > > > > > > > > > >> >> > > introducing >> > > > > > > > > > > >> >> > > > Disaggregated State Storage and >> Management in >> > > > Flink >> > > > > > > 2.0. 
>> > > The past decade has witnessed a dramatic shift in Flink's deployment
>> > > mode, workload patterns, and hardware improvements. We've moved from
>> > > the map-reduce era, where workers were tightly coupled
>> > > computation-storage nodes, to a cloud-native world where
>> > > containerized deployments on Kubernetes have become standard. To
>> > > enable Flink's Cloud-Native future, we introduce Disaggregated State
>> > > Storage and Management that uses DFS as primary storage in Flink 2.0,
>> > > as promised in the Flink 2.0 Roadmap.
>> > >
>> > > Design details can be found in FLIP-423[1].
>> > >
>> > > This new architecture aims to solve the following challenges that the
>> > > cloud-native era brings for Flink:
>> > > 1. Local Disk Constraints in containerization
>> > > 2. Spiky Resource Usage caused by compaction in the current state model
>> > > 3. Fast Rescaling for jobs with large states (hundreds of Terabytes)
>> > > 4. Light and Fast Checkpoint in a native way
>> > >
>> > > More specifically, we want to reach a consensus on the following
>> > > issues in this discussion:
>> > >
>> > > 1. Overall design
>> > > 2. Proposed Changes
>> > > 3. Design details to achieve Milestone 1
>> > >
>> > > In M1, we aim to achieve an end-to-end baseline version using DFS as
>> > > primary storage and complete core functionalities, including:
>> > >
>> > > - Asynchronous State APIs (FLIP-424)[2]: Introduce new APIs for
>> > > asynchronous state access.
>> > > - Asynchronous Execution Model (FLIP-425)[3]: Implement a
>> > > non-blocking execution model leveraging the asynchronous APIs
>> > > introduced in FLIP-424.
>> > > - Grouping Remote State Access (FLIP-426)[4]: Enable retrieval of
>> > > remote state data in batches to avoid unnecessary round-trip costs
>> > > for remote access.
>> > > - Disaggregated State Store (FLIP-427)[5]: Introduce the initial
>> > > version of the ForSt disaggregated state store.
>> > > - Fault Tolerance/Rescale Integration (FLIP-428)[6]: Integrate
>> > > checkpointing mechanisms with the disaggregated state store for
>> > > fault tolerance and fast rescaling.
>> > >
>> > > We will vote on each FLIP in separate threads to make sure each FLIP
>> > > reaches a consensus. But we want to keep the discussion within a
>> > > focused thread (this thread) for easier tracking of context, to
>> > > avoid duplicated questions/discussions, and to consider the problem
>> > > and solution as a whole.
>> > >
>> > > Looking forward to your feedback.
>> > >
>> > > Best,
>> > > Yuan, Zakelly, Jinzhong, Hangxiang, Yanfei and Feng
>> > >
>> > > [1] https://cwiki.apache.org/confluence/x/R4p3EQ
>> > > [2] https://cwiki.apache.org/confluence/x/SYp3EQ
>> > > [3] https://cwiki.apache.org/confluence/x/S4p3EQ
>> > > [4] https://cwiki.apache.org/confluence/x/TYp3EQ
>> > > [5] https://cwiki.apache.org/confluence/x/T4p3EQ
>> > > [6] https://cwiki.apache.org/confluence/x/UYp3EQ
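As a rough illustration of the asynchronous state access direction described in the FLIP-424 and FLIP-425 bullets above, here is a minimal, self-contained Java sketch of a future-style state handle with callbacks chained on it instead of blocking reads. The AsyncValueState interface, the ToyAsyncValueState backend, and all method names are invented for this example and are not the APIs defined in the FLIPs.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical future-style view of keyed value state; not the FLIP-424 API. */
interface AsyncValueState<V> {
    CompletionStage<V> asyncValue();          // read without blocking the task thread
    CompletionStage<Void> asyncUpdate(V v);   // write without blocking the task thread
}

/** Toy in-memory "store" that answers requests asynchronously with a small delay. */
final class ToyAsyncValueState implements AsyncValueState<Long> {
    private final ConcurrentHashMap<String, Long> store = new ConcurrentHashMap<>();
    private final ScheduledExecutorService io = Executors.newSingleThreadScheduledExecutor();
    private final String key;

    ToyAsyncValueState(String key) { this.key = key; }

    @Override
    public CompletionStage<Long> asyncValue() {
        CompletableFuture<Long> f = new CompletableFuture<>();
        io.schedule(() -> f.complete(store.getOrDefault(key, 0L)), 1, TimeUnit.MILLISECONDS);
        return f;
    }

    @Override
    public CompletionStage<Void> asyncUpdate(Long v) {
        CompletableFuture<Void> f = new CompletableFuture<>();
        io.schedule(() -> { store.put(key, v); f.complete(null); }, 1, TimeUnit.MILLISECONDS);
        return f;
    }

    void shutdown() { io.shutdown(); }
}

public class AsyncStateSketch {
    public static void main(String[] args) throws Exception {
        ToyAsyncValueState count = new ToyAsyncValueState("user-42");

        // Chain a read-modify-write as callbacks instead of blocking on the state backend.
        count.asyncValue()
             .thenCompose(old -> count.asyncUpdate(old + 1))
             .thenCompose(ignored -> count.asyncValue())
             .thenAccept(updated -> System.out.println("count = " + updated))
             .toCompletableFuture()
             .get(); // only the demo's main thread waits; an operator would register a callback

        count.shutdown();
    }
}

The point of the sketch is the programming model: the state request is handed off, and the continuation runs when the result arrives, which is what lets the execution model in FLIP-425 keep processing other records while remote state reads are in flight.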