Hi Yunfeng,

Thanks for the suggestion! I will reorganize FLIP-425 accordingly.

Best,
Zakelly

On Tue, Mar 19, 2024 at 3:20 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:

> Hi Xintong and Zakelly,
>
> > 2. Regarding Strictly-ordered and Out-of-order of Watermarks
>
> I agree that watermarks can use only out-of-order mode for now, because there is still no concrete example showing a correctness risk with it. However, the strictly-ordered mode should still be supported as the default option for non-record event types other than watermarks, at least for checkpoint barriers.
>
> I noticed that this information has already been documented in "For other non-record events, such as RecordAttributes ...", but it's at the bottom of the "Watermark" section, which might not be very obvious. Thus it might be better to reorganize the FLIP to make clear that the two order modes are designed for all non-record events, and which mode this FLIP would choose for each type of event.
>
> Best,
> Yunfeng
>
> On Tue, Mar 19, 2024 at 1:09 PM Xintong Song <tonysong...@gmail.com> wrote:
> >
> > Thanks for the quick response. Sounds good to me.
> >
> > Best,
> >
> > Xintong
> >
> > On Tue, Mar 19, 2024 at 1:03 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> >
> > > Hi Xintong,
> > >
> > > Thanks for sharing your thoughts!
> > >
> > > > 1. Regarding Record-ordered and State-ordered of processElement.
> > > >
> > > > I understand that while State-ordered likely provides better performance, Record-ordered is sometimes required for correctness. The question is how should a user choose between these two modes? My concern is that such a decision may require users to have in-depth knowledge about the Flink internals, and may lead to correctness issues if State-ordered is chosen improperly.
> > > >
> > > > I'd suggest not to expose such a knob, at least in the first version.
> > > > That means always use Record-ordered for custom operators / UDFs, and keep State-ordered for internal usages (built-in operators) only.
> > >
> > > Indeed, users may not be able to choose the mode properly. I agree to keep such options for internal use.
> > >
> > > > 2. Regarding Strictly-ordered and Out-of-order of Watermarks.
> > > >
> > > > I'm not entirely sure about Strictly-ordered being the default, or even being supported. From my understanding, a Watermark(T) only suggests that all records with event time before T have arrived, and it has nothing to do with whether records with event time after T have arrived or not. From that perspective, preventing certain records from arriving before a Watermark is never supported. I also cannot come up with any use case where Strictly-ordered is necessary. This implies the same issue as 1): how does the user choose between the two modes?
> > > >
> > > > I'd suggest not exposing the knob to users and only supporting Out-of-order, until we see a concrete use case where Strictly-ordered is needed.
> > >
> > > The semantics of watermarks do not define the sequence between a watermark and subsequent records. For the most part, this is inconsequential, except it may affect some current users who have previously relied on the implicit assumption of an ordered execution. I'd be fine with initially supporting only out-of-order processing. We may consider exposing the 'Strictly-ordered' mode once we encounter a concrete use case that necessitates it.
> > >
> > > > My philosophies behind not exposing the two config options are:
> > > > - There are already too many options in Flink that users barely know how to use.
> > > >   I think Flink should try as much as possible to decide its own behavior, rather than throwing all the decisions to the users.
> > > > - It's much harder to take back knobs than to introduce them. Therefore, options should be introduced only if concrete use cases are identified.
> > >
> > > I agree to keep minimal configurable items especially for the MVP. Given that we have the opportunity to refine the functionality before the framework transitions from @Experimental to @PublicEvolving, it makes sense to refrain from presenting user-facing options until we have ensured their necessity.
> > >
> > > Best,
> > > Zakelly
> > >
> > > On Tue, Mar 19, 2024 at 12:06 PM Xintong Song <tonysong...@gmail.com> wrote:
> > >
> > > > Sorry for joining the discussion late.
> > > >
> > > > I have two questions about FLIP-425.
> > > >
> > > > 1. Regarding Record-ordered and State-ordered of processElement.
> > > >
> > > > I understand that while State-ordered likely provides better performance, Record-ordered is sometimes required for correctness. The question is how should a user choose between these two modes? My concern is that such a decision may require users to have in-depth knowledge about the Flink internals, and may lead to correctness issues if State-ordered is chosen improperly.
> > > >
> > > > I'd suggest not to expose such a knob, at least in the first version. That means always use Record-ordered for custom operators / UDFs, and keep State-ordered for internal usages (built-in operators) only.
> > > >
> > > > 2. Regarding Strictly-ordered and Out-of-order of Watermarks.
> > > >
> > > > I'm not entirely sure about Strictly-ordered being the default, or even being supported.
> > > > From my understanding, a Watermark(T) only suggests that all records with event time before T have arrived, and it has nothing to do with whether records with event time after T have arrived or not. From that perspective, preventing certain records from arriving before a Watermark is never supported. I also cannot come up with any use case where Strictly-ordered is necessary. This implies the same issue as 1): how does the user choose between the two modes?
> > > >
> > > > I'd suggest not exposing the knob to users and only supporting Out-of-order, until we see a concrete use case where Strictly-ordered is needed.
> > > >
> > > > My philosophies behind not exposing the two config options are:
> > > > - There are already too many options in Flink that users barely know how to use. I think Flink should try as much as possible to decide its own behavior, rather than throwing all the decisions to the users.
> > > > - It's much harder to take back knobs than to introduce them. Therefore, options should be introduced only if concrete use cases are identified.
> > > >
> > > > WDYT?
> > > >
> > > > Best,
> > > >
> > > > Xintong
> > > >
> > > > On Fri, Mar 8, 2024 at 2:45 AM Jing Ge <j...@ververica.com.invalid> wrote:
> > > >
> > > > > +1 for Gyula's suggestion. I just finished FLIP-423 which introduced the intention of the big change and high level architecture. Great content btw! The only public interface change for this FLIP is one new config to use ForSt. It makes sense to have one dedicated discussion thread for each concrete system design.
> > > > >
> > > > > @Zakelly The links in your mail do not work except the last one, because the FLIP-xxx has been included into the url like https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425 .
> > > > >
> > > > > NIT fix:
> > > > >
> > > > > FLIP-424: https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
> > > > >
> > > > > FLIP-425: https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
> > > > >
> > > > > FLIP-426: https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
> > > > >
> > > > > FLIP-427: https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
> > > > >
> > > > > FLIP-428: https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
> > > > >
> > > > > Best regards,
> > > > > Jing
> > > > >
> > > > > On Thu, Mar 7, 2024 at 10:14 AM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > Thank you all for a lively discussion here, and it is a good time to move forward to more detailed discussions.
> > > > > > Thus we open several threads for sub-FLIPs:
> > > > > >
> > > > > > FLIP-424: https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
> > > > > > FLIP-425 <https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425>: https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
> > > > > > FLIP-426 <https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0hFLIP-426>: https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
> > > > > > FLIP-427 <https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrfFLIP-427>: https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
> > > > > > FLIP-428 <https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ftFLIP-428>: https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
> > > > > >
> > > > > > If you want to talk about the overall architecture, roadmap, milestones or something related to multiple FLIPs, please post it here. Otherwise you can discuss some details in separate mails. Let's try to avoid repeated discussion in different threads. I will sync important messages here if there are any in the above threads.
> > > > > >
> > > > > > And in reply to @Jeyhun: We will ensure the content between those FLIPs is consistent.
> > > > > >
> > > > > > Best,
> > > > > > Zakelly
> > > > > >
> > > > > > On Thu, Mar 7, 2024 at 2:16 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:
> > > > > >
> > > > > > > I have been a bit busy these few weeks and sorry for responding late.
> > > > > > >
> > > > > > > The original thinking of keeping discussion within one thread is for easier tracking and avoiding repeated discussion in different threads.
> > > > > > >
> > > > > > > For details, it might be good to start in different threads if needed.
> > > > > > >
> > > > > > > We will think of a way to better organize the discussion.
> > > > > > >
> > > > > > > Best
> > > > > > > Yuan
> > > > > > >
> > > > > > > On Thu, Mar 7, 2024 at 4:38 AM Jeyhun Karimov <je.kari...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > +1 for the suggestion.
> > > > > > > > Maybe we can start the discussion with the FLIPs with minimum dependencies (from the other new/proposed FLIPs).
> > > > > > > > Based on our discussion on a particular FLIP, the subsequent (or its dependent) FLIP(s) can be updated accordingly?
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Jeyhun
> > > > > > > >
> > > > > > > > On Wed, Mar 6, 2024 at 5:34 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hey all!
> > > > > > > > >
> > > > > > > > > This is a massive improvement / work. I just started going through the FLIPs and have a more or less meta comment.
> > > > > > > > >
> > > > > > > > > While it's good to keep the overall architecture discussion here, I think we should still have separate discussions for each FLIP where we can discuss interface details etc.
> > > > > > > > > With so much content, adding minor comments here will lead nowhere, but those discussions are still important and we should have them in separate threads (one for each FLIP).
> > > > > > > > >
> > > > > > > > > What do you think?
> > > > > > > > > Gyula
> > > > > > > > >
> > > > > > > > > On Wed, Mar 6, 2024 at 8:50 AM Yanfei Lei <fredia...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi team,
> > > > > > > > > >
> > > > > > > > > > Thanks for your discussion. Regarding FLIP-425, we have supplemented several updates to answer high-frequency questions:
> > > > > > > > > >
> > > > > > > > > > 1. We captured a flame graph of the Hashmap state backend in "Synchronous execution with asynchronous APIs"[1], which reveals that the framework overhead (including reference counting, future-related code and so on) consumes about 9% of the keyed operator CPU time.
> > > > > > > > > > 2. We added a set of comparative experiments for watermark processing; the performance of Out-Of-Order mode is 70% better than strictly-ordered mode under ~140MB state size. Instructions on how to run this test have also been added[2].
> > > > > > > > > > 3. Regarding the order of StreamRecord, whether it has state access or not, we supplemented a new *Strict order of 'processElement'*[3].
> > > > > > > > > >
> > > > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs
> > > > > > > > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing
> > > > > > > > > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder
> > > > > > > > > >
> > > > > > > > > > Best regards,
> > > > > > > > > > Yanfei
> > > > > > > > > >
> > > > > > > > > > Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote on Tue, Mar 5, 2024 at 09:25:
> > > > > > > > > > >
> > > > > > > > > > > Hi Zakelly,
> > > > > > > > > > >
> > > > > > > > > > > > 5. I'm not very sure ... revisiting this later since it is not important.
> > > > > > > > > > >
> > > > > > > > > > > It seems that we still have some details to confirm about this question. Let's postpone this to after the critical parts of the design are settled.
> > > > > > > > > > >
> > > > > > > > > > > > 8. Yes, we had considered ... metrics should be like afterwards.
> > > > > > > > > > >
> > > > > > > > > > > Oh sorry I missed FLIP-431. I'm fine with discussing this topic in milestone 2.
> > > > > > > > > > >
> > > > > > > > > > > Looking forward to the detailed design about the strict mode between same-key records and the benchmark results about the epoch mechanism.
> > > > > > > > > > >
> > > > > > > > > > > Best regards,
> > > > > > > > > > > Yunfeng
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Mar 4, 2024 at 7:59 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > Hi Yunfeng,
> > > > > > > > > > > >
> > > > > > > > > > > > For 1:
> > > > > > > > > > > > I had a discussion with Lincoln Lee, and I realize it is a common case that the same-key record should be blocked before the `processElement`. It is easier for users to understand. Thus I will introduce a strict mode for this and make it the default. My rough idea is just like yours, by invoking some method of the AEC instance before `processElement`. The detailed design will be described in the FLIP later.
> > > > > > > > > > > >
> > > > > > > > > > > > For 2:
> > > > > > > > > > > > I agree with you. We could throw exceptions for now and optimize this later.
> > > > > > > > > > > >
> > > > > > > > > > > > For 5:
> > > > > > > > > > > >>
> > > > > > > > > > > >> It might be better to move the default values to the Proposed Changes section instead of making them public for now, as there will be compatibility issues once we want to dynamically adjust the thresholds and timeouts in future.
> > > > > > > > > > > >
> > > > > > > > > > > > Agreed. The whole framework is under experiment until we think it is complete in 2.0 or later. The default value should be better determined with more testing results and production experience.
> > > > > > > > > > > >
> > > > > > > > > > > >> The configuration execution.async-state.enabled seems unnecessary, as the infrastructure may automatically get this information from the detailed state backend configurations. We may revisit this part after the core designs have reached an agreement. WDYT?
> > > > > > > > > > > >
> > > > > > > > > > > > I'm not very sure if there is any use case where users write their code using async APIs but run their job in a synchronous way. The first two scenarios that come to mind are for benchmarking or for a small state, while they don't want to rewrite their code. Actually it is easy to support, so I'd suggest providing it. But I'm fine with revisiting this later since it is not important. WDYT?
> > > > > > > > > > > >
> > > > > > > > > > > > For 8:
> > > > > > > > > > > > Yes, we had considered the I/O metrics group, especially the back-pressure, idle and task busy per second.
> > > > > > > > > > > > In the current plan we can do state access during back-pressure, meaning that those metrics for input would better be redefined. I suggest we discuss these existing metrics as well as some new metrics that should be introduced in FLIP-431 later in milestone 2, since by then we will have basically finished the framework and will have a better view of what the metrics should look like. WDYT?
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Zakelly
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Mar 4, 2024 at 6:49 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> > > > > > > > > > > >>
> > > > > > > > > > > >> Hi Zakelly,
> > > > > > > > > > > >>
> > > > > > > > > > > >> Thanks for the responses!
> > > > > > > > > > > >>
> > > > > > > > > > > >> > 1. I will discuss this with some expert SQL developers. ... mode for StreamRecord processing.
> > > > > > > > > > > >>
> > > > > > > > > > > >> In DataStream API there should also be use cases when the order of output is strictly required.
> > > > > > > > > > > >> I agree that SQL experts may help provide more concrete use cases that can accelerate our discussion, but please allow me to search for DataStream use cases that can prove the necessity of this strict order preservation mode, if answers from SQL experts are shown to be negative.
> > > > > > > > > > > >>
> > > > > > > > > > > >> For your convenience, my current rough idea is that we can add a module between the Input(s) and processElement() module in Fig 2 of FLIP-425. The module will be responsible for caching records whose keys collide with in-flight records, and AEC will only be responsible for handling async state calls, without knowing the record each call belongs to. We may revisit this topic once the necessity of the strict order mode is clarified.
> > > > > > > > > > > >>
> > > > > > > > > > > >> > 2. The amount of parallel StateRequests ... instead of invoking yield
> > > > > > > > > > > >>
> > > > > > > > > > > >> Your suggestions generally appeal to me.
> > > > > > > > > > > >> I think we may let corresponding Flink jobs fail with OOM for now, since the majority of a StateRequest should just be references to existing Java objects, which only occupy very small memory space and can hardly cause OOM in common cases. We can monitor the pending StateRequests and if there is really a risk of OOM in extreme cases, we can throw Exceptions with proper messages notifying users what to do, like increasing memory through configurations.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Your suggestions to adjust the threshold adaptively or to use the blocking buffer sound good, and in my opinion we can postpone them to future FLIPs since they seem to only benefit users in rare cases. Given that FLIP-423~428 has already been a big enough design, it might be better to focus on the most critical design for now and postpone optimizations like this. WDYT?
> > > > > > > > > > > >>
> > > > > > > > > > > >> > 5. Sure, we will introduce new configs as well as their default value.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Thanks for adding the default values and the values themselves LGTM.
> > > > > > > > > > > >> It might be better to move the default values to the Proposed Changes section instead of making them public for now, as there will be compatibility issues once we want to dynamically adjust the thresholds and timeouts in future.
> > > > > > > > > > > >>
> > > > > > > > > > > >> The configuration execution.async-state.enabled seems unnecessary, as the infrastructure may automatically get this information from the detailed state backend configurations. We may revisit this part after the core designs have reached an agreement. WDYT?
> > > > > > > > > > > >>
> > > > > > > > > > > >> Besides, inspired by Jeyhun's comments, it occurs to me that:
> > > > > > > > > > > >>
> > > > > > > > > > > >> 8. Should this FLIP introduce metrics that measure the time a Flink job is back-pressured by State IOs? Under the current design, this metric could measure the time when the blocking buffer is full and yield() cannot get callbacks to process, which means the operator is fully waiting for state responses.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Best regards,
> > > > > > > > > > > >> Yunfeng
> > > > > > > > > > > >>
> > > > > > > > > > > >> On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Hi Yunfeng,
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Thanks for your detailed comments!
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> 1. Why do we need a close() method on StateIterator? This method seems unused in the usage example codes.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > The `close()` is introduced to release internal resources, but it does not seem to require the user to call it. I removed this.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null entries are allowed". It might be better to further explain what will happen if a null value is passed, ignoring the value in the returned Collection or throwing exceptions. Given that FutureUtils.emptyFuture() can be returned in the example code, I suppose the former one might be correct.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > The statement "No null entries are allowed" refers to the parameters: it means that passing in a list like [null, StateFuture1, StateFuture2] is not allowed, and an Exception will be thrown.
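
For readers following the combineAll() point, here is a minimal sketch of the "null entries are rejected with an exception" behavior. It uses the JDK's CompletableFuture as a stand-in, since the final signatures of the proposed StateFuture and FutureUtils are not given in this thread; the `combineAll` method and the class name below are hypothetical, and the assumption that an already-completed empty future is a valid entry is mine.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

class CombineAllSketch {

    // Hypothetical stand-in for the proposed FutureUtils.combineAll(); not the FLIP's code.
    static <T> CompletableFuture<List<T>> combineAll(Collection<CompletableFuture<T>> futures) {
        // Reject null entries such as [null, f1, f2] up front instead of skipping them.
        for (CompletableFuture<T> f : futures) {
            Objects.requireNonNull(f, "No null entries are allowed");
        }
        List<CompletableFuture<T>> copy = new ArrayList<>(futures);
        return CompletableFuture.allOf(copy.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> {
                    List<T> results = new ArrayList<>(copy.size());
                    for (CompletableFuture<T> f : copy) {
                        results.add(f.join()); // every future is complete here, so join() does not block
                    }
                    return results;
                });
    }

    public static void main(String[] args) {
        // Assumption: a completed future that carries no value is a valid entry.
        CompletableFuture<String> empty = CompletableFuture.completedFuture(null);
        CompletableFuture<String> value = CompletableFuture.completedFuture("v");
        System.out.println(combineAll(Arrays.asList(empty, value)).join());

        // A null reference in the input collection is rejected immediately.
        try {
            combineAll(Arrays.<CompletableFuture<String>>asList(null, value));
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The distinction the sketch tries to show is between a null reference in the input (an error) and a future whose result is empty (an ordinary entry); whether the real API throws NullPointerException or another exception type is not stated in the thread.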
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> 1. According to Fig 2 of this FLIP, ... . This situation should be avoided and the order of same-key records should be strictly preserved.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > I will discuss this with some expert SQL developers. And if it is valid and common, I suggest a strict order preservation mode for StreamRecord processing. WDYT?
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> 2. The FLIP says that StateRequests submitted by Callbacks will not invoke further yield() methods. Given that yield() is used when there is "too much" in-flight data, does it mean StateRequests submitted by Callbacks will never be "too much"? What if the total number of StateRequests exceeds the capacity of Flink operator's memory space?
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > The amount of parallel StateRequests for one StreamRecord cannot be determined since the code is written by users. So the in-flight requests may be "too much", and may cause OOM. Users should re-configure their job, controlling the amount of on-going StreamRecord.
> > > > > > > > > > > >> > And I suggest two ways to avoid this:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > - Adaptively adjust the count of on-going StreamRecord according to historical StateRequests amount.
> > > > > > > > > > > >> > - Also control the max StateRequests that can be executed in parallel for each StreamRecord, and if it exceeds, put the new StateRequest in the blocking buffer waiting for execution (instead of invoking yield()).
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > WDYT?
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> 3.1 I'm concerned that the out-of-order execution mode, along with the epoch mechanism, would bring more complexity to the execution model than the performance improvement it promises. Could we add some benchmark results proving the benefit of this mode?
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Agreed, will do.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> 3.2 The FLIP might need to add a public API section describing how users or developers can switch between these two execution modes.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Good point. We will add a Public API section.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> 3.3 Apart from the watermark and checkpoint mentioned in this FLIP, there are also other events that might appear in the stream of data records. It might be better to generalize the execution mode mechanism to handle all possible events.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Yes, I missed this point. Thanks for the reminder.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> 4. It might be better to treat callback-handling as a MailboxDefaultAction, instead of Mails, to avoid the overhead of repeatedly creating Mail objects.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > I thought the intermediate wrapper for the callback cannot be omitted, since there will be some context switch before each execution. The MailboxDefaultAction in most cases is processInput, right? Meanwhile the callback should be executed with higher priority. I'd suggest not changing the basic logic of Mailbox and the default action since it is very critical for performance. But yes, we will try our best to avoid creating intermediate objects.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> 5. Could this FLIP provide the current default values for things like active buffer size thresholds and timeouts?
> These could help with memory consumption and latency analysis.

Sure, we will introduce new configs as well as their default values.

> 6. Why do we need to record the hashcode of a record in its
> RecordContext? It seems not used.

The context switch before each callback execution involves
setCurrentKey, where the hashCode is re-calculated. We cache it to
avoid that repeated computation.

> 7. In "timers can be stored on the JVM heap or RocksDB", the link
> points to a document in flink-1.15. It might be better to verify the
> referenced content is still valid in the latest Flink and update the
> link accordingly. Same for other references if any.

Thanks for the reminder! Will check.

Thanks a lot & Best,
Zakelly

On Sat, Mar 2, 2024 at 6:18 AM Jeyhun Karimov <je.kari...@gmail.com> wrote:

Hi,

Thanks for the great proposals.
I have a few comments:

- Backpressure Handling. Flink's original backpressure handling is
quite robust and the semantics is quite "simple" (simple is
beautiful). This mechanism has proven to perform better and more
robustly than the other open source streaming systems, where they were
relying on some loopback information. Now that the proposal also
relies on loopback (yield in this case), it is not clear how robust the
new backpressure handling proposed in FLIP-425 is and how well it
handles fluctuating workloads.

- Watermark/Timer Handling: Similar arguments apply for watermark and
timer handling. IMHO, we need more benchmarks showing the overhead of
epoch management with different parameters (e.g., window size,
watermark strategy, etc).

- DFS consistency guarantees. The proposal in FLIP-427 is DFS-agnostic.
However, different cloud providers have different storage consistency
models. How do we want to deal with them?

Regards,
Jeyhun

On Fri, Mar 1, 2024 at 6:08 PM Zakelly Lan <zakelly....@gmail.com> wrote:

Thanks Piotr for sharing your thoughts!

> I guess it depends how we would like to treat the local disks. I've
> always thought about them that almost always eventually all state
> from the DFS should end up cached in the local disks.

OK I got it. In our proposal we treat local disk as an optional cache,
so the basic design will handle the case with state residing in DFS
only. It is a more 'cloud-native' approach that does not rely on any
local storage assumptions, which allows users to dynamically adjust the
capacity or I/O bound of remote storage to gain performance or save
cost, even without a job restart.

> In the currently proposed more fine grained solution, you make a
> single request to DFS per each state access.

Ah that's not accurate. Actually we buffer the state requests and
process them in batch, so multiple requests will correspond to one DFS
access (one block access for multiple keys performed by RocksDB).

> In that benchmark you mentioned, are you requesting the state
> asynchronously from local disks into memory? If the benefit comes
> from parallel I/O, then I would expect the benefit to
> disappear/shrink when running multiple subtasks on the same machine,
> as they would be making their own parallel requests, right? Also
> enabling checkpointing would further cut into the available I/O
> budget.

That's an interesting topic. Our proposal is specifically aimed at the
scenario where the machine I/O is not fully loaded but the I/O latency
has indeed become a bottleneck for each subtask. The distributed file
system is a prime example of such a scenario, characterized by abundant
and easily scalable I/O bandwidth coupled with higher I/O latency.
You may expect to increase the parallelism of a job to enhance the
performance as well, but that also wastes more CPU and memory on
building up more subtasks. This is one drawback of the
computation-storage tightly coupled nodes. In our proposal, by
contrast, parallel I/O is performed with all the callbacks still
running in one task, so pre-allocated computational resources are
better utilized. It is a much more lightweight way to perform parallel
I/O.

> Just with what granularity those async requests should be made.
> Making state access asynchronous is definitely the right way to go!

I think the current proposal is based on such core ideas:

- A pure cloud-native disaggregated state.
- Fully utilize the given resources and try not to waste them
  (including I/O).
- The ability to scale isolated resources (I/O or CPU or memory)
  independently.
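
As a rough illustration of "parallel I/O with all the callbacks still running in one task", here is a minimal Python sketch. It is not Flink code and not the FLIP-425 implementation: `read_state`, `process_record` and `run_task` are hypothetical names, and the sleep only stands in for remote I/O latency. Several state reads are in flight at once, while the code that consumes each result runs on a single thread.

```python
import asyncio
import threading


async def read_state(key, latency=0.01):
    """Hypothetical stand-in for a remote state read; the sleep models I/O latency."""
    await asyncio.sleep(latency)
    return f"value-of-{key}"


async def process_record(key, callback_threads):
    """Issue an async state read; the code after `await` plays the role of the callback."""
    value = await read_state(key)
    # Remember which thread executed the callback part.
    callback_threads.append(threading.get_ident())
    return key, value


async def run_task(keys):
    callback_threads = []
    # All reads are in flight concurrently, yet one thread runs every callback.
    results = await asyncio.gather(
        *(process_record(k, callback_threads) for k in keys)
    )
    return results, callback_threads


results, callback_threads = asyncio.run(run_task(["a", "b", "c"]))
```

The three reads overlap in time, so the total wait is close to one read latency rather than three, without adding subtasks or threads.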

We think a fine-grained granularity is more in line with those ideas,
especially without local disk assumptions and without any waste of I/O
by prefetching. Please note that it is not a replacement of the
original local state with synchronous execution. Instead this is a
solution embracing the cloud-native era, providing much more
scalability and resource efficiency when handling a *huge state*.

> What also worries me a lot in this fine grained model is the effect
> on the checkpointing times.

Your concerns are very reasonable. Faster checkpointing is always a
core advantage of disaggregated state, but only for the async phase.
There will be some complexity introduced by in-flight requests, but I'd
suggest a checkpoint containing those in-flight state requests as part
of the state, to accelerate the sync phase by skipping the buffer
draining. This makes the buffer size have little impact on checkpoint
time.
All the changes stay within the execution model we proposed, while
checkpoint barrier alignment and handling will not be touched in our
proposal, so I guess the complexity is relatively controllable. I have
faith in that :)

> Also regarding the overheads, it would be great if you could provide
> profiling results of the benchmarks that you conducted to verify the
> results. Or maybe if you could describe the steps to reproduce the
> results? Especially "Hashmap (sync)" vs "Hashmap with async API".

Yes we could profile the benchmarks. And for the comparison of "Hashmap
(sync)" vs "Hashmap with async API", we conduct a Wordcount job written
with async APIs but disabling the async execution by directly
completing the future using sync state access. This evaluates the
overhead of newly introduced modules like 'AEC' in sync execution (even
though they are not designed for it).
The code will be provided later. For other results of our PoC[1], you
can follow the instructions here[2] to reproduce. Since the compilation
may take some effort, we will directly provide the jar for testing next
week.

And @Yunfeng Zhou, I have noticed your mail but it is a bit late in my
local time and the next few days are weekends. So I will reply to you
later. Thanks for your response!

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-PoCResults
[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-Appendix:HowtorunthePoC

Best,
Zakelly

On Fri, Mar 1, 2024 at 6:38 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:

Hi,

Thanks for proposing this design! I just read FLIP-424 and FLIP-425
and have some questions about the proposed changes.

For Async API (FLIP-424)

1. Why do we need a close() method on StateIterator? This method seems
unused in the usage example codes.

2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null
entries are allowed". It might be better to further explain what will
happen if a null value is passed, ignoring the value in the returned
Collection or throwing exceptions. Given that
FutureUtils.emptyFuture() can be returned in the example code, I
suppose the former one might be correct.

For Async Execution (FLIP-425)

1.
According to Fig 2 of this FLIP, if a recordB has its key collide
with an ongoing recordA, its processElement() method can still be
triggered immediately, and then it might be moved to the blocking
buffer in AEC if it involves state operations. This means that
recordB's output will precede recordA's output in downstream
operators, if recordA involves state operations while recordB does
not. This will harm the correctness of Flink jobs in some use cases.
For example, in dim table join cases, recordA could be a delete
operation that involves state access, while recordB could be an insert
operation that needs to visit external storage without state access.
If recordB's output precedes recordA's, then an entry that is supposed
to finally exist with recordB's value in the sink table might actually
be deleted according to recordA's command.
This situation should be avoided and the order of same-key records
should be strictly preserved.

2. The FLIP says that StateRequests submitted by Callbacks will not
invoke further yield() methods. Given that yield() is used when there
is "too much" in-flight data, does it mean StateRequests submitted by
Callbacks will never be "too much"? What if the total number of
StateRequests exceeds the capacity of Flink operator's memory space?

3. In the "Watermark" section, this FLIP provided an out-of-order
execution mode apart from the default strictly-ordered mode, which can
optimize performance by allowing more concurrent executions.

3.1 I'm concerned that the out-of-order execution mode, along with the
epoch mechanism, would bring more complexity to the execution model
than the performance improvement it promises. Could we add some
benchmark results proving the benefit of this mode?

3.2 The FLIP might need to add a public API section describing how
users or developers can switch between these two execution modes.

3.3 Apart from the watermark and checkpoint mentioned in this FLIP,
there are also more other events that might appear in the stream of
data records. It might be better to generalize the execution mode
mechanism to handle all possible events.

4. It might be better to treat callback-handling as a
MailboxDefaultAction, instead of Mails, to avoid the overhead of
repeatedly creating Mail objects.

5. Could this FLIP provide the current default values for things like
active buffer size thresholds and timeouts? These could help with
memory consumption and latency analysis.

6. Why do we need to record the hashcode of a record in its
RecordContext? It seems not used.

7.
In "timers can be stored on the JVM heap or RocksDB", the link
points to a document in flink-1.15. It might be better to verify the
referenced content is still valid in the latest Flink and update the
link accordingly. Same for other references if any.

Best,
Yunfeng Zhou

On Thu, Feb 29, 2024 at 2:17 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:

Hi Devs,

This is a joint work of Yuan Mei, Zakelly Lan, Jinzhong Li, Hangxiang
Yu, Yanfei Lei and Feng Wang. We'd like to start a discussion about
introducing Disaggregated State Storage and Management in Flink 2.0.

The past decade has witnessed a dramatic shift in Flink's deployment
mode, workload patterns, and hardware improvements.
We've moved from the map-reduce era where workers are
computation-storage tightly coupled nodes to a cloud-native world where
containerized deployments on Kubernetes become standard. To enable
Flink's Cloud-Native future, we introduce Disaggregated State Storage
and Management that uses DFS as primary storage in Flink 2.0, as
promised in the Flink 2.0 Roadmap.

Design Details can be found in FLIP-423[1].

This new architecture is aimed to solve the following challenges
brought in the cloud-native era for Flink.
1. Local Disk Constraints in containerization
2. Spiky Resource Usage caused by compaction in the current state model
3. Fast Rescaling for jobs with large states (hundreds of Terabytes)
4.
Light and Fast Checkpoint in a native way

More specifically, we want to reach a consensus on the following issues
in this discussion:

1. Overall design
2. Proposed Changes
3. Design details to achieve Milestone1

In M1, we aim to achieve an end-to-end baseline version using DFS as
primary storage and complete core functionalities, including:

- Asynchronous State APIs (FLIP-424)[2]: Introduce new APIs for
asynchronous state access.
- Asynchronous Execution Model (FLIP-425)[3]: Implement a non-blocking
execution model leveraging the asynchronous APIs introduced in
FLIP-424.
- Grouping Remote State Access (FLIP-426)[4]: Enable retrieval of
remote state data in batches to avoid unnecessary round-trip costs for
remote access
- Disaggregated State Store (FLIP-427)[5]: Introduce the initial
version of the ForSt disaggregated state store.
- Fault Tolerance/Rescale Integration (FLIP-428)[6]: Integrate
checkpointing mechanisms with the disaggregated state store for fault
tolerance and fast rescaling.

We will vote on each FLIP in separate threads to make sure each FLIP
reaches a consensus. But we want to keep the discussion within a
focused thread (this thread) for easier tracking of contexts to avoid
duplicated questions/discussions and also to think of the
problem/solution in a full picture.

Looking forward to your feedback

Best,
Yuan, Zakelly, Jinzhong, Hangxiang, Yanfei and Feng

[1] https://cwiki.apache.org/confluence/x/R4p3EQ
[2] https://cwiki.apache.org/confluence/x/SYp3EQ
[3] https://cwiki.apache.org/confluence/x/S4p3EQ
[4] https://cwiki.apache.org/confluence/x/TYp3EQ
[5] https://cwiki.apache.org/confluence/x/T4p3EQ
[6] https://cwiki.apache.org/confluence/x/UYp3EQ
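
The same-key ordering concern raised in the thread (recordA as a delete, recordB as an insert on the same key) can be made concrete with a toy model. The sketch below is only an assumption about one way to preserve per-key order, not the mechanism specified in FLIP-425: `KeyOrderedExecutor`, `submit` and `complete` are hypothetical names. A record whose key is already in flight is held in a per-key buffer and starts only after the earlier record completes, so recordB cannot start before recordA has finished.

```python
from collections import defaultdict, deque


class KeyOrderedExecutor:
    """Toy model: at most one in-flight record per key; later ones wait per key."""

    def __init__(self):
        self.in_flight = set()              # keys with a record being processed
        self.blocked = defaultdict(deque)   # per-key FIFO of waiting records
        self.started = []                   # order in which records start

    def submit(self, key, record):
        if key in self.in_flight:
            # Same key as an ongoing record: hold it back to preserve order.
            self.blocked[key].append(record)
        else:
            self.in_flight.add(key)
            self.started.append(record)

    def complete(self, key):
        """Call when the in-flight record for `key` has fully finished."""
        if self.blocked[key]:
            # Hand the key over to the next waiting record.
            self.started.append(self.blocked[key].popleft())
        else:
            self.in_flight.discard(key)


executor = KeyOrderedExecutor()
executor.submit("k1", "recordA-delete")
executor.submit("k1", "recordB-insert")   # same key: must wait for recordA
executor.submit("k2", "recordC")          # different key: starts immediately
started_before = list(executor.started)
executor.complete("k1")                   # recordA done, recordB may start
started_after = list(executor.started)
```

Records on different keys still overlap freely; only same-key records are serialized, which is the property the dim table join example depends on.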