Thanks for the quick response. Sounds good to me.

Best,
Xintong

On Tue, Mar 19, 2024 at 1:03 PM Zakelly Lan <zakelly....@gmail.com> wrote:

Hi Xintong,

Thanks for sharing your thoughts!

> 1. Regarding Record-ordered and State-ordered of processElement.
>
> I understand that while State-ordered likely provides better performance,
> Record-ordered is sometimes required for correctness. The question is how
> should a user choose between these two modes? My concern is that such a
> decision may require users to have in-depth knowledge about the Flink
> internals, and may lead to correctness issues if State-ordered is chosen
> improperly.
>
> I'd suggest not exposing such a knob, at least in the first version. That
> means always using Record-ordered for custom operators / UDFs, and keeping
> State-ordered for internal usages (built-in operators) only.

Indeed, users may not be able to choose the mode properly. I agree to keep
such options for internal use.

> 2. Regarding Strictly-ordered and Out-of-order of Watermarks.
>
> I'm not entirely sure about Strictly-ordered being the default, or even
> being supported. From my understanding, a Watermark(T) only suggests that
> all records with event time before T have arrived, and it has nothing to
> do with whether records with event time after T have arrived or not. From
> that perspective, preventing certain records from arriving before a
> Watermark is never supported. I also cannot come up with any use case
> where Strictly-ordered is necessary. This implies the same issue as 1):
> how does the user choose between the two modes?
>
> I'd suggest not exposing the knob to users and only supporting
> Out-of-order, until we see a concrete use case where Strictly-ordered is
> needed.

The semantics of watermarks do not define the sequence between a watermark
and subsequent records.
For the most part, this is inconsequential, except that it may affect some
current users who have previously relied on the implicit assumption of an
ordered execution. I'd be fine with initially supporting only out-of-order
processing. We may consider exposing the 'Strictly-ordered' mode once we
encounter a concrete use case that necessitates it.

> My philosophies behind not exposing the two config options are:
> - There are already too many options in Flink that users barely know how
> to use. I think Flink should try as much as possible to decide its own
> behavior, rather than throwing all the decisions to the users.
> - It's much harder to take back knobs than to introduce them. Therefore,
> options should be introduced only if concrete use cases are identified.

I agree to keep minimal configurable items, especially for the MVP. Given
that we have the opportunity to refine the functionality before the
framework transitions from @Experimental to @PublicEvolving, it makes sense
to refrain from presenting user-facing options until we have ensured their
necessity.

Best,
Zakelly

On Tue, Mar 19, 2024 at 12:06 PM Xintong Song <tonysong...@gmail.com> wrote:

Sorry for joining the discussion late.

I have two questions about FLIP-425.

1. Regarding Record-ordered and State-ordered of processElement.

I understand that while State-ordered likely provides better performance,
Record-ordered is sometimes required for correctness. The question is how
should a user choose between these two modes? My concern is that such a
decision may require users to have in-depth knowledge about the Flink
internals, and may lead to correctness issues if State-ordered is chosen
improperly.

I'd suggest not exposing such a knob, at least in the first version.
That means always using Record-ordered for custom operators / UDFs, and
keeping State-ordered for internal usages (built-in operators) only.

2. Regarding Strictly-ordered and Out-of-order of Watermarks.

I'm not entirely sure about Strictly-ordered being the default, or even
being supported. From my understanding, a Watermark(T) only suggests that
all records with event time before T have arrived, and it has nothing to do
with whether records with event time after T have arrived or not. From that
perspective, preventing certain records from arriving before a Watermark is
never supported. I also cannot come up with any use case where
Strictly-ordered is necessary. This implies the same issue as 1): how does
the user choose between the two modes?

I'd suggest not exposing the knob to users and only supporting
Out-of-order, until we see a concrete use case where Strictly-ordered is
needed.

My philosophies behind not exposing the two config options are:
- There are already too many options in Flink that users barely know how to
use. I think Flink should try as much as possible to decide its own
behavior, rather than throwing all the decisions to the users.
- It's much harder to take back knobs than to introduce them. Therefore,
options should be introduced only if concrete use cases are identified.

WDYT?

Best,

Xintong

On Fri, Mar 8, 2024 at 2:45 AM Jing Ge <j...@ververica.com.invalid> wrote:

+1 for Gyula's suggestion. I just finished FLIP-423, which introduces the
intention of the big change and the high-level architecture. Great content,
btw! The only public interface change for this FLIP is one new config to
use ForSt. It makes sense to have one dedicated discussion thread for each
concrete system design.
@Zakelly The links in your mail do not work, except the last one, because
the FLIP-xxx has been included into the url, like
https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425 .

NIT fix:

FLIP-424: https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
FLIP-425: https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
FLIP-426: https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
FLIP-427: https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
FLIP-428: https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b

Best regards,
Jing

On Thu, Mar 7, 2024 at 10:14 AM Zakelly Lan <zakelly....@gmail.com> wrote:

Hi everyone,

Thank you all for a lively discussion here, and it is a good time to move
forward to more detailed discussions.
Thus we open several threads for sub-FLIPs:

FLIP-424: https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
FLIP-425
<https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425>:
https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
FLIP-426
<https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0hFLIP-426>:
https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
FLIP-427
<https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrfFLIP-427>:
https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
FLIP-428
<https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ftFLIP-428>:
https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b

If you want to talk about the overall architecture, roadmap, milestones or
something related to multiple FLIPs, please post it here. Otherwise you can
discuss details in separate mails. Let's try to avoid repeated discussion
in different threads. I will sync important messages here if there are any
in the above threads.

And a reply to @Jeyhun: We will ensure the content across those FLIPs is
consistent.

Best,
Zakelly

On Thu, Mar 7, 2024 at 2:16 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:

I have been a bit busy these few weeks, sorry for responding late.

The original thinking behind keeping the discussion within one thread was
easier tracking and avoiding repeated discussion in different threads.

For details, it might be good to start different threads if needed.

We will think of a way to better organize the discussion.
Best,
Yuan

On Thu, Mar 7, 2024 at 4:38 AM Jeyhun Karimov <je.kari...@gmail.com> wrote:

Hi,

+1 for the suggestion.
Maybe we can start the discussion with the FLIPs with minimum dependencies
(on the other new/proposed FLIPs). Based on our discussion of a particular
FLIP, the subsequent (or dependent) FLIP(s) can be updated accordingly?

Regards,
Jeyhun

On Wed, Mar 6, 2024 at 5:34 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

Hey all!

This is a massive improvement / work. I just started going through the
FLIPs and have a more or less meta comment.

While it's good to keep the overall architecture discussion here, I think
we should still have separate discussions for each FLIP where we can
discuss interface details etc. With so much content, if we start adding
minor comments here, that will lead nowhere; but those discussions are
still important and we should have them in separate threads (one for each
FLIP).

What do you think?
Gyula

On Wed, Mar 6, 2024 at 8:50 AM Yanfei Lei <fredia...@gmail.com> wrote:

Hi team,

Thanks for your discussion. Regarding FLIP-425, we have supplemented
several updates to answer high-frequency questions:

1. We captured a flame graph of the Hashmap state backend in "Synchronous
execution with asynchronous APIs"[1], which reveals that the framework
overhead (including reference counting, future-related code and so on)
consumes about 9% of the keyed operator CPU time.
2. We added a set of comparative experiments for watermark processing; the
performance of Out-of-order mode is 70% better than Strictly-ordered mode
under ~140MB state size. Instructions on how to run this test have also
been added[2].
3. Regarding the order of StreamRecords, whether they have state access or
not: we supplemented a new *Strict order of 'processElement'*[3].

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder

Best regards,
Yanfei

Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote on Tue, Mar 5, 2024 at
09:25:

Hi Zakelly,

> 5. I'm not very sure ...
> revisiting this later since it is not important.

It seems that we still have some details to confirm about this question.
Let's postpone this to after the critical parts of the design are settled.

> 8. Yes, we had considered ... metrics should be like afterwards.

Oh sorry, I missed FLIP-431. I'm fine with discussing this topic in
milestone 2.

Looking forward to the detailed design of the strict mode between same-key
records and the benchmark results for the epoch mechanism.

Best regards,
Yunfeng

On Mon, Mar 4, 2024 at 7:59 PM Zakelly Lan <zakelly....@gmail.com> wrote:

Hi Yunfeng,

For 1:
I had a discussion with Lincoln Lee, and I realized it is a common case
that the same-key record should be blocked before `processElement`. It is
easier for users to understand. Thus I will introduce a strict mode for
this and make it the default. My rough idea is just like yours: invoking
some method of the AEC instance before `processElement`. The detailed
design will be described in the FLIP later.

For 2:
I agree with you. We could throw exceptions for now and optimize this
later.
For 5:

> It might be better to move the default values to the Proposed Changes
> section instead of making them public for now, as there will be
> compatibility issues once we want to dynamically adjust the thresholds
> and timeouts in future.

Agreed. The whole framework is experimental until we consider it complete
in 2.0 or later. The default values are better determined with more testing
results and production experience.

> The configuration execution.async-state.enabled seems unnecessary, as
> the infrastructure may automatically get this information from the
> detailed state backend configurations. We may revisit this part after
> the core designs have reached an agreement. WDYT?

I'm not very sure if there is any use case where users write their code
using async APIs but run their job in a synchronous way. The first two
scenarios that come to me are benchmarking and small state, where users
don't want to rewrite their code. Actually it is easy to support, so I'd
suggest providing it. But I'm fine with revisiting this later since it is
not important. WDYT?

For 8:
Yes, we had considered the I/O metrics group, especially back-pressure,
idle and task busy per second.
In the current plan we can do state access during back-pressure, meaning
that those metrics for input had better be redefined. I suggest we discuss
these existing metrics, as well as some new metrics that should be
introduced, in FLIP-431 later in milestone 2: since we will have basically
finished the framework by then, we will have a better view of what the
metrics should look like. WDYT?

Best,
Zakelly

On Mon, Mar 4, 2024 at 6:49 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com>
wrote:

Hi Zakelly,

Thanks for the responses!

> 1. I will discuss this with some expert SQL developers. ... mode for
> StreamRecord processing.

In the DataStream API there should also be use cases where the order of
output is strictly required. I agree that SQL experts may help provide more
concrete use cases that can accelerate our discussion, but please allow me
to search for DataStream use cases that can prove the necessity of this
strict order preservation mode, if the answers from SQL experts turn out to
be negative.
For your convenience, my current rough idea is that we can add a module
between the Input(s) and the processElement() module in Fig 2 of FLIP-425.
The module will be responsible for caching records whose keys collide with
in-flight records, and the AEC will only be responsible for handling async
state calls, without knowing which record each call belongs to. We may
revisit this topic once the necessity of the strict order mode is
clarified.

> 2. The amount of parallel StateRequests ... instead of invoking yield

Your suggestions generally appeal to me. I think we may let the
corresponding Flink jobs fail with OOM for now, since the majority of a
StateRequest should just be references to existing Java objects, which
occupy very little memory and can hardly cause OOM in common cases. We can
monitor the pending StateRequests, and if there is really a risk of OOM in
extreme cases, we can throw exceptions with proper messages notifying users
what to do, like increasing memory through configurations.
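[Editor's note] For intuition, the record-gating idea described in this thread (buffer records whose keys collide with in-flight records; release them once the in-flight record's async state calls complete) could be sketched as a toy model like the following. This is purely illustrative: the class and method names are invented, and it is not the actual AEC design from FLIP-425.

```java
import java.util.*;

// Toy model of a gate sitting between the input and processElement():
// a record whose key collides with an in-flight record is buffered until
// the in-flight record's async state calls complete.
public class RecordGate {
    private final Set<String> inFlightKeys = new HashSet<>();
    private final Map<String, Deque<String>> blocked = new HashMap<>();
    private final List<String> processed = new ArrayList<>();

    // Called for each incoming record, before processElement().
    public void onRecord(String key, String record) {
        if (inFlightKeys.contains(key)) {
            // A same-key record is still in flight: buffer this one.
            blocked.computeIfAbsent(key, k -> new ArrayDeque<>()).add(record);
        } else {
            inFlightKeys.add(key);
            processed.add(record); // stands in for calling processElement(record)
        }
    }

    // Called when all async state requests of the key's in-flight record finish.
    public void onComplete(String key) {
        Deque<String> queue = blocked.get(key);
        if (queue != null && !queue.isEmpty()) {
            processed.add(queue.poll()); // the next same-key record may now run
        } else {
            inFlightKeys.remove(key);
        }
    }

    public List<String> processed() {
        return processed;
    }

    public static void main(String[] args) {
        RecordGate gate = new RecordGate();
        gate.onRecord("k1", "r1");
        gate.onRecord("k1", "r2"); // buffered: k1 is in flight
        gate.onRecord("k2", "r3");
        gate.onComplete("k1");     // releases r2
        System.out.println(gate.processed()); // [r1, r3, r2]
    }
}
```

Note how same-key order is preserved while records for other keys still flow through, which is the correctness/throughput trade-off discussed above.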
Your suggestions to adjust the threshold adaptively or to use the blocking
buffer sound good, and in my opinion we can postpone them to future FLIPs
since they seem to only benefit users in rare cases. Given that FLIP-423~428
is already a big enough design, it might be better to focus on the most
critical design for now and postpone optimizations like this. WDYT?

> 5. Sure, we will introduce new configs as well as their default value.

Thanks for adding the default values; the values themselves LGTM. It might
be better to move the default values to the Proposed Changes section
instead of making them public for now, as there will be compatibility
issues once we want to dynamically adjust the thresholds and timeouts in
future.

The configuration execution.async-state.enabled seems unnecessary, as the
infrastructure may automatically get this information from the detailed
state backend configurations. We may revisit this part after the core
designs have reached an agreement. WDYT?

Besides, inspired by Jeyhun's comments, it comes to me that:

8. Should this FLIP introduce metrics that measure the time a Flink job is
back-pressured by state I/O? Under the current design, this metric could
measure the time when the blocking buffer is full and yield() cannot get
callbacks to process, which means the operator is fully waiting for state
responses.

Best regards,
Yunfeng

On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan <zakelly....@gmail.com> wrote:

Hi Yunfeng,

Thanks for your detailed comments!

> 1. Why do we need a close() method on StateIterator? This method seems
> unused in the usage example codes.

The `close()` is introduced to release internal resources, but it does not
seem necessary to require the user to call it. I removed it.

> 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null
> entries are allowed". It might be better to further explain what will
> happen if a null value is passed: ignoring the value in the returned
> Collection, or throwing an exception. Given that FutureUtils.emptyFuture()
> can be returned in the example code, I suppose the former one might be
> correct.
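[Editor's note] For illustration, the null-entry contract of such a combineAll utility (which, as the reply in this thread clarifies, rejects collections containing null rather than ignoring them) can be sketched on plain java.util.concurrent.CompletableFuture. This is not Flink's actual FutureUtils implementation; the signature and exception type are assumptions for the example.

```java
import java.util.*;
import java.util.concurrent.CompletableFuture;

public class CombineAllSketch {

    // Completes when all input futures complete; a collection containing a
    // null entry is rejected up front with an exception.
    public static <T> CompletableFuture<List<T>> combineAll(
            Collection<CompletableFuture<T>> futures) {
        for (CompletableFuture<T> f : futures) {
            if (f == null) {
                throw new IllegalArgumentException("No null entries are allowed");
            }
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> {
                    List<T> results = new ArrayList<>();
                    for (CompletableFuture<T> f : futures) {
                        results.add(f.join()); // safe: all futures are complete here
                    }
                    return results;
                });
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.completedFuture(1),
                CompletableFuture.completedFuture(2));
        System.out.println(combineAll(futures).join()); // [1, 2]
    }
}
```

Failing fast on null (instead of silently skipping entries) keeps the result list aligned with the input collection, which is why the JavaDoc wording matters.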
The statement "No null entries are allowed" refers to the parameters: it
means an ArrayList like [null, StateFuture1, StateFuture2] is not allowed
to be passed in, and an exception will be thrown.

> 1. According to Fig 2 of this FLIP, ... . This situation should be
> avoided and the order of same-key records should be strictly preserved.

I will discuss this with some expert SQL developers. And if it is valid and
common, I suggest a strict order preservation mode for StreamRecord
processing. WDYT?

> 2. The FLIP says that StateRequests submitted by Callbacks will not
> invoke further yield() methods. Given that yield() is used when there is
> "too much" in-flight data, does it mean StateRequests submitted by
> Callbacks will never be "too much"? What if the total number of
> StateRequests exceeds the capacity of the Flink operator's memory space?

The amount of parallel StateRequests for one StreamRecord cannot be
determined, since the code is written by users. So the in-flight requests
may be "too much" and may cause OOM. Users should re-configure their job to
control the number of ongoing StreamRecords.
And I suggest two ways to avoid this:

1. Adaptively adjust the count of ongoing StreamRecords according to the
historical StateRequests amount.
2. Also control the max StateRequests that can be executed in parallel for
each StreamRecord, and if that is exceeded, put the new StateRequest in the
blocking buffer waiting for execution (instead of invoking yield()).

WDYT?

> 3.1 I'm concerned that the out-of-order execution mode, along with the
> epoch mechanism, would bring more complexity to the execution model than
> the performance improvement it promises. Could we add some benchmark
> results proving the benefit of this mode?

Agreed, will do.

> 3.2 The FLIP might need to add a public API section describing how users
> or developers can switch between these two execution modes.

Good point. We will add a Public API section.

> 3.3 Apart from the watermark and checkpoint mentioned in this FLIP,
> there are also more events that might appear in the stream of data
> records.
> It might be better to generalize the execution mode mechanism to handle
> all possible events.

Yes, I missed this point. Thanks for the reminder.

> 4. It might be better to treat callback-handling as a
> MailboxDefaultAction, instead of Mails, to avoid the overhead of
> repeatedly creating Mail objects.

I think the intermediate wrapper for the callback cannot be omitted, since
there will be some context switch before each execution. The
MailboxDefaultAction in most cases is processInput, right? But the callback
should be executed with higher priority. I'd suggest not changing the basic
logic of the Mailbox and the default action, since it is very critical for
performance. But yes, we will try our best to avoid creating intermediate
objects.

> 5. Could this FLIP provide the current default values for things like
> active buffer size thresholds and timeouts? These could help with memory
> consumption and latency analysis.

Sure, we will introduce the new configs as well as their default values.

> 6. Why do we need to record the hashcode of a record in its
> RecordContext? It seems unused.
The context switch before each callback execution involves setCurrentKey,
where the hashCode is re-calculated. We cache it for acceleration.

> 7. In "timers can be stored on the JVM heap or RocksDB", the link points
> to a document in flink-1.15. It might be better to verify the referenced
> content is still valid in the latest Flink and update the link
> accordingly. Same for other references, if any.

Thanks for the reminder! Will check.

Thanks a lot & Best,
Zakelly

On Sat, Mar 2, 2024 at 6:18 AM Jeyhun Karimov <je.kari...@gmail.com> wrote:

Hi,

Thanks for the great proposals. I have a few comments:

- Backpressure Handling. Flink's original backpressure handling is quite
robust and its semantics are quite "simple" (simple is beautiful). This
mechanism has proven to perform better and more robustly than other open
source streaming systems, which rely on some loopback information.
> > > > > > > > > >> >> Now that the proposal also relies on loopback (yield > in > > > > this > > > > > > > > case), it is > > > > > > > > > >> >> not clear how well the new backpressure handling > > proposed > > > > in > > > > > > > > FLIP-425 is > > > > > > > > > >> >> robust and handle fluctuating workloads. > > > > > > > > > >> >> > > > > > > > > > >> >> - Watermark/Timer Handling: Similar arguments apply > for > > > > > > watermark > > > > > > > > and timer > > > > > > > > > >> >> handling. IMHO, we need more benchmarks showing the > > > > overhead > > > > > > > > > >> >> of epoch management with different parameters (e.g., > > > window > > > > > > size, > > > > > > > > watermark > > > > > > > > > >> >> strategy, etc) > > > > > > > > > >> >> > > > > > > > > > >> >> - DFS consistency guarantees. The proposal in > FLIP-427 > > is > > > > > > > > DFS-agnostic. > > > > > > > > > >> >> However, different cloud providers have different > > storage > > > > > > > > consistency > > > > > > > > > >> >> models. > > > > > > > > > >> >> How do we want to deal with them? > > > > > > > > > >> >> > > > > > > > > > >> >> Regards, > > > > > > > > > >> >> Jeyhun > > > > > > > > > >> >> > > > > > > > > > >> >> > > > > > > > > > >> >> > > > > > > > > > >> >> > > > > > > > > > >> >> On Fri, Mar 1, 2024 at 6:08 PM Zakelly Lan < > > > > > > > zakelly....@gmail.com> > > > > > > > > wrote: > > > > > > > > > >> >> > > > > > > > > > >> >> > Thanks Piotr for sharing your thoughts! > > > > > > > > > >> >> > > > > > > > > > > >> >> > I guess it depends how we would like to treat the > > local > > > > > > disks. > > > > > > > > I've always > > > > > > > > > >> >> > > thought about them that almost always eventually > > all > > > > > state > > > > > > > > from the DFS > > > > > > > > > >> >> > > should end up cached in the local disks. > > > > > > > > > >> >> > > > > > > > > > > >> >> > > > > > > > > > > >> >> > OK I got it. 
In our proposal we treat the local disk as an optional cache, so the basic design handles the case with state residing in DFS only. It is a more 'cloud-native' approach that does not rely on any local storage assumptions, which allows users to dynamically adjust the capacity or I/O bound of remote storage to gain performance or save cost, even without a job restart.

> In the currently proposed more fine-grained solution, you make a single request to DFS per each state access.

Ah, that's not accurate. Actually we buffer the state requests and process them in batches; multiple requests will correspond to one DFS access (one block access for multiple keys performed by RocksDB).

> In that benchmark you mentioned, are you requesting the state asynchronously from local disks into memory? If the benefit comes from parallel I/O, then I would expect the benefit to disappear/shrink when running multiple subtasks on the same machine, as they would be making their own parallel requests, right? Also enabling checkpointing would further cut into the available I/O budget.
That's an interesting topic. Our proposal is specifically aimed at the scenario where the machine's I/O is not fully loaded but I/O latency has indeed become a bottleneck for each subtask. The distributed file system is a prime example of a scenario characterized by abundant, easily scalable I/O bandwidth coupled with higher I/O latency. You may expect to increase the parallelism of a job to enhance performance as well, but that also wastes more CPU and memory on building up additional subtasks. This is one drawback of computation-storage tightly coupled nodes. In our proposal, by contrast, the parallel I/O and all the callbacks still run in one task, so pre-allocated computational resources are better utilized. It is a much more lightweight way to perform parallel I/O.

> Just with what granularity those async requests should be made. Making state access asynchronous is definitely the right way to go!
I think the current proposal is based on these core ideas:

- A purely cloud-native disaggregated state.
- Fully utilize the given resources and try not to waste them (including I/O).
- The ability to scale isolated resources (I/O, CPU or memory) independently.

We think a fine granularity is more in line with those ideas, especially without local disk assumptions and without any waste of I/O from prefetching. Please note that it is not a replacement for the original local state with synchronous execution. Instead, this is a solution embracing the cloud-native era, providing much more scalability and resource efficiency when handling a *huge state*.

> What also worries me a lot in this fine-grained model is the effect on the checkpointing times.

Your concerns are very reasonable. Faster checkpointing is always a core advantage of disaggregated state, but only for the async phase.
There will be some complexity introduced by in-flight requests, but I'd suggest a checkpoint contain those in-flight state requests as part of the state, to accelerate the sync phase by skipping the buffer draining. This makes the buffer size have little impact on checkpoint time. All the changes stay within the execution model we proposed, and checkpoint barrier alignment or handling is not touched in our proposal, so I guess the complexity is relatively controllable. I have faith in that :)

> Also regarding the overheads, it would be great if you could provide profiling results of the benchmarks that you conducted to verify the results. Or maybe if you could describe the steps to reproduce the results? Especially "Hashmap (sync)" vs "Hashmap with async API".

Yes, we could profile the benchmarks.
As for the comparison of "Hashmap (sync)" vs "Hashmap with async API", we ran a WordCount job written with the async APIs but with async execution disabled, by directly completing the future using synchronous state access. This evaluates the overhead of the newly introduced modules, like the 'AEC', in sync execution (even though they are not designed for it). The code will be provided later. For the other results of our PoC[1], you can follow the instructions here[2] to reproduce them. Since the compilation may take some effort, we will directly provide the jar for testing next week.

And @Yunfeng Zhou, I have noticed your mail, but it is a bit late in my local time and the next few days are weekends, so I will reply to you later. Thanks for your response!
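The measurement trick discussed in this thread — code written against a future-based state API, but with each future completed directly by a synchronous state access so that only the framework overhead is exercised — can be sketched roughly as follows. This is a minimal stand-alone model; the class and method names (asyncGet, processElement on a plain class) are illustrative, not Flink's actual FLIP-424 API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class SyncCompletedFutureDemo {
    final Map<String, Integer> state = new HashMap<>();

    // "Async" state read whose future is already completed from a synchronous
    // lookup, mimicking the "Hashmap with async API" benchmark setup.
    CompletableFuture<Integer> asyncGet(String key) {
        return CompletableFuture.completedFuture(state.getOrDefault(key, 0));
    }

    // Operator logic is written against the future-based API; because the
    // future is pre-completed, thenAccept runs immediately on this thread.
    void processElement(String word) {
        asyncGet(word).thenAccept(cnt -> state.put(word, cnt + 1));
    }

    public static void main(String[] args) {
        SyncCompletedFutureDemo op = new SyncCompletedFutureDemo();
        for (String w : new String[] {"to", "be", "or", "not", "to", "be"}) {
            op.processElement(w);
        }
        // Results match the plain synchronous version; any timing difference
        // in a benchmark comes purely from the async-API machinery.
        System.out.println(op.state);
    }
}
```

Since callbacks on an already-completed future execute synchronously on the calling thread, such a job produces identical results to the sync version, isolating the cost of the extra modules.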
[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-PoCResults
[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-Appendix:HowtorunthePoC

Best,
Zakelly

On Fri, Mar 1, 2024 at 6:38 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:

Hi,

Thanks for proposing this design! I just read FLIP-424 and FLIP-425 and have some questions about the proposed changes.

For Async API (FLIP-424)

1. Why do we need a close() method on StateIterator? This method seems unused in the usage example code.

2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null entries are allowed".
It might be better to further explain what will happen if a null value is passed: is the value ignored in the returned Collection, or is an exception thrown? Given that FutureUtils.emptyFuture() can be returned in the example code, I suppose the former might be correct.

For Async Execution (FLIP-425)

1. According to Fig 2 of this FLIP, if a recordB has its key collide with an ongoing recordA, its processElement() method can still be triggered immediately, and then it might be moved to the blocking buffer in the AEC if it involves state operations. This means that recordB's output will precede recordA's output in downstream operators if recordA involves state operations while recordB does not. This will harm the correctness of Flink jobs in some use cases. For example, in dim table join cases, recordA could be a delete operation that involves state access, while recordB could be an insert operation that needs to visit external storage without state access.
If recordB's output precedes recordA's, then an entry that is supposed to finally exist with recordB's value in the sink table might actually be deleted according to recordA's command. This situation should be avoided, and the order of same-key records should be strictly preserved.

2. The FLIP says that StateRequests submitted by Callbacks will not invoke further yield() methods. Given that yield() is used when there is "too much" in-flight data, does it mean StateRequests submitted by Callbacks can never be "too much"? What if the total number of StateRequests exceeds the capacity of the Flink operator's memory space?

3. In the "Watermark" section, this FLIP provides an out-of-order execution mode apart from the default strictly-ordered mode, which can optimize performance by allowing more concurrent executions.

3.1 I'm concerned that the out-of-order execution mode, along with the epoch mechanism, would bring more complexity to the execution model than the performance improvement it promises.
Could we add some benchmark results proving the benefit of this mode?

3.2 The FLIP might need to add a public API section describing how users or developers can switch between these two execution modes.

3.3 Apart from the watermarks and checkpoints mentioned in this FLIP, there are more events that might appear in the stream of data records. It might be better to generalize the execution mode mechanism to handle all possible events.

4. It might be better to treat callback-handling as a MailboxDefaultAction, instead of Mails, to avoid the overhead of repeatedly creating Mail objects.

5. Could this FLIP provide the current default values for things like active buffer size thresholds and timeouts? These could help with memory consumption and latency analysis.

6. Why do we need to record the hashcode of a record in its RecordContext? It seems unused.

7. In "timers can be stored on the JVM heap or RocksDB", the link points to a document in flink-1.15.
It might be better to verify the referenced content is still valid in the latest Flink and update the link accordingly. Same for other references if any.

Best,
Yunfeng Zhou

On Thu, Feb 29, 2024 at 2:17 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:

Hi Devs,

This is a joint work of Yuan Mei, Zakelly Lan, Jinzhong Li, Hangxiang Yu, Yanfei Lei and Feng Wang. We'd like to start a discussion about introducing Disaggregated State Storage and Management in Flink 2.0.

The past decade has witnessed a dramatic shift in Flink's deployment mode, workload patterns, and hardware improvements. We've moved from the map-reduce era, where workers are computation-storage tightly coupled nodes, to a cloud-native world where containerized deployments on Kubernetes become standard.
To enable Flink's Cloud-Native future, we introduce Disaggregated State Storage and Management, which uses DFS as primary storage, in Flink 2.0, as promised in the Flink 2.0 Roadmap.

Design details can be found in FLIP-423[1].

This new architecture is aimed at solving the following challenges brought in the cloud-native era for Flink:
1. Local Disk Constraints in containerization
2. Spiky Resource Usage caused by compaction in the current state model
3. Fast Rescaling for jobs with large states (hundreds of Terabytes)
4. Light and Fast Checkpoint in a native way

More specifically, we want to reach a consensus on the following issues in this discussion:

1. Overall design
2. Proposed Changes
3. Design details to achieve Milestone 1

In M1, we aim to achieve an end-to-end baseline version using DFS as primary storage and complete core functionalities, including:

- Asynchronous State APIs (FLIP-424)[2]: Introduce new APIs for asynchronous state access.
- Asynchronous Execution Model (FLIP-425)[3]: Implement a non-blocking execution model leveraging the asynchronous APIs introduced in FLIP-424.
- Grouping Remote State Access (FLIP-426)[4]: Enable retrieval of remote state data in batches to avoid unnecessary round-trip costs for remote access.
- Disaggregated State Store (FLIP-427)[5]: Introduce the initial version of the ForSt disaggregated state store.
- Fault Tolerance/Rescale Integration (FLIP-428)[6]: Integrate checkpointing mechanisms with the disaggregated state store for fault tolerance and fast rescaling.

We will vote on each FLIP in separate threads to make sure each FLIP reaches a consensus.
But we want to keep the discussion within a focused thread (this thread) for easier tracking of contexts, to avoid duplicated questions/discussions, and also to think of the problem/solution in a full picture.

Looking forward to your feedback.

Best,
Yuan, Zakelly, Jinzhong, Hangxiang, Yanfei and Feng

[1] https://cwiki.apache.org/confluence/x/R4p3EQ
[2] https://cwiki.apache.org/confluence/x/SYp3EQ
[3] https://cwiki.apache.org/confluence/x/S4p3EQ
[4] https://cwiki.apache.org/confluence/x/TYp3EQ
[5] https://cwiki.apache.org/confluence/x/T4p3EQ
[6] https://cwiki.apache.org/confluence/x/UYp3EQ
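The "Grouping Remote State Access" idea discussed in this thread — buffering individual state requests and serving them with a single batched remote read, so N requests cost one round trip instead of N — might look roughly like this self-contained sketch. All class and method names here are hypothetical, not FLIP-426's actual API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class BatchingStateReader {
    private final Map<String, String> remoteStore; // stands in for the DFS-backed store
    private final List<String> pendingKeys = new ArrayList<>();
    private final List<CompletableFuture<String>> pendingFutures = new ArrayList<>();

    public BatchingStateReader(Map<String, String> remoteStore) {
        this.remoteStore = remoteStore;
    }

    // Enqueue a request; the returned future completes only when the
    // buffered batch is flushed with one "remote access".
    public CompletableFuture<String> get(String key) {
        CompletableFuture<String> f = new CompletableFuture<>();
        pendingKeys.add(key);
        pendingFutures.add(f);
        return f;
    }

    // One batched read serves every buffered request.
    public void flush() {
        for (int i = 0; i < pendingKeys.size(); i++) {
            pendingFutures.get(i).complete(remoteStore.get(pendingKeys.get(i)));
        }
        pendingKeys.clear();
        pendingFutures.clear();
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        store.put("k1", "v1");
        store.put("k2", "v2");
        BatchingStateReader reader = new BatchingStateReader(store);
        CompletableFuture<String> a = reader.get("k1");
        CompletableFuture<String> b = reader.get("k2");
        reader.flush(); // a single batched read completes both futures
        System.out.println(a.join() + " " + b.join()); // v1 v2
    }
}
```

In a real implementation the flush would be triggered by buffer-size thresholds or timeouts (the knobs Yunfeng asks about above), and the batch would translate to one block read against the remote store rather than a map lookup.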