Hi, +1 for the suggestion. Maybe we can start the discussion with the FLIPs that have the fewest dependencies (on the other new/proposed FLIPs). Based on our discussion of a particular FLIP, the subsequent (or dependent) FLIP(s) can be updated accordingly?
Regards, Jeyhun On Wed, Mar 6, 2024 at 5:34 PM Gyula Fóra <gyula.f...@gmail.com> wrote: > Hey all! > > This is a massive improvement / work. I just started going through the > Flips and have a more or less meta comment. > > While it's good to keep the overall architecture discussion here, I think > we should still have separate discussions for each FLIP where we can > discuss interface details etc. With so much content if we start adding > minor comments here that will lead to nowhere but those discussions are > still important and we should have them in separate threads (one for each > FLIP) > > What do you think? > Gyula > > On Wed, Mar 6, 2024 at 8:50 AM Yanfei Lei <fredia...@gmail.com> wrote: > > > Hi team, > > > > Thanks for your discussion. Regarding FLIP-425, we have supplemented > > several updates to answer high-frequency questions: > > > > 1. We captured a flame graph of the Hashmap state backend in > > "Synchronous execution with asynchronous APIs"[1], which reveals that > > the framework overhead (including reference counting, future-related > > code and so on) consumes about 9% of the keyed operator CPU time. > > 2. We added a set of comparative experiments for watermark processing, > > the performance of Out-Of-Order mode is 70% better than > > strictly-ordered mode under ~140MB state size. Instructions on how to > > run this test have also been added[2]. > > 3. Regarding the order of StreamRecord, whether it has state access or > > not. We supplemented a new *Strict order of 'processElement'*[3]. 
> > > > [1] > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs > > [2] > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing > > [3] > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder > > > > > > Best regards, > > Yanfei > > > > Yunfeng Zhou <flink.zhouyunf...@gmail.com> 于2024年3月5日周二 09:25写道: > > > > > > Hi Zakelly, > > > > > > > 5. I'm not very sure ... revisiting this later since it is not > > important. > > > > > > It seems that we still have some details to confirm about this > > > question. Let's postpone this to after the critical parts of the > > > design are settled. > > > > > > > 8. Yes, we had considered ... metrics should be like afterwards. > > > > > > Oh sorry I missed FLIP-431. I'm fine with discussing this topic in > > milestone 2. > > > > > > Looking forward to the detailed design about the strict mode between > > > same-key records and the benchmark results about the epoch mechanism. > > > > > > Best regards, > > > Yunfeng > > > > > > On Mon, Mar 4, 2024 at 7:59 PM Zakelly Lan <zakelly....@gmail.com> > > wrote: > > > > > > > > Hi Yunfeng, > > > > > > > > For 1: > > > > I had a discussion with Lincoln Lee, and I realize it is a common > case > > the same-key record should be blocked before the `processElement`. It is > > easier for users to understand. Thus I will introduce a strict mode for > > this and make it default. My rough idea is just like yours, by invoking > > some method of AEC instance before `processElement`. The detailed design > > will be described in FLIP later. > > > > > > > > For 2: > > > > I agree with you. We could throw exceptions for now and optimize this > > later. 
> > > > > > > > For 5: > > > >> > > > >> It might be better to move the default values to the Proposed > Changes > > > >> section instead of making them public for now, as there will be > > > >> compatibility issues once we want to dynamically adjust the > thresholds > > > >> and timeouts in future. > > > > > > > > Agreed. The whole framework is under experiment until we think it is > > complete in 2.0 or later. The default value should be better determined > > with more testing results and production experience. > > > > > > > >> The configuration execution.async-state.enabled seems unnecessary, > as > > > >> the infrastructure may automatically get this information from the > > > >> detailed state backend configurations. We may revisit this part > after > > > >> the core designs have reached an agreement. WDYT? > > > > > > > > > > > > I'm not very sure if there is any use case where users write their > > code using async APIs but run their job in a synchronous way. The first > two > > scenarios that come to me are for benchmarking or for a small state, > while > > they don't want to rewrite their code. Actually it is easy to support, so > > I'd suggest providing it. But I'm fine with revisiting this later since > it > > is not important. WDYT? > > > > > > > > For 8: > > > > Yes, we had considered the I/O metrics group especially the > > back-pressure, idle and task busy per second. In the current plan we can > do > > state access during back-pressure, meaning that those metrics for input > > would better be redefined. I suggest we discuss these existing metrics as > > well as some new metrics that should be introduced in FLIP-431 later in > > milestone 2, since we have basically finished the framework thus we will > > have a better view of what metrics should be like afterwards. WDYT? 
> > > > > > > > > > > > Best, > > > > Zakelly > > > > > > > > On Mon, Mar 4, 2024 at 6:49 PM Yunfeng Zhou < > > flink.zhouyunf...@gmail.com> wrote: > > > >> > > > >> Hi Zakelly, > > > >> > > > >> Thanks for the responses! > > > >> > > > >> > 1. I will discuss this with some expert SQL developers. ... mode > > for StreamRecord processing. > > > >> > > > >> In DataStream API there should also be use cases when the order of > > > >> output is strictly required. I agree with it that SQL experts may > help > > > >> provide more concrete use cases that can accelerate our discussion, > > > >> but please allow me to search for DataStream use cases that can > prove > > > >> the necessity of this strict order preservation mode, if answers > from > > > >> SQL experts are shown to be negative. > > > >> > > > >> For your convenience, my current rough idea is that we can add a > > > >> module between the Input(s) and processElement() module in Fig 2 of > > > >> FLIP-425. The module will be responsible for caching records whose > > > >> keys collide with in-flight records, and AEC will only be > responsible > > > >> for handling async state calls, without knowing the record each call > > > >> belongs to. We may revisit this topic once the necessity of the > strict > > > >> order mode is clarified. > > > >> > > > >> > > > >> > 2. The amount of parallel StateRequests ... instead of invoking > > yield > > > >> > > > >> Your suggestions generally appeal to me. I think we may let > > > >> corresponding Flink jobs fail with OOM for now, since the majority > of > > > >> a StateRequest should just be references to existing Java objects, > > > >> which only occupies very small memory space and can hardly cause OOM > > > >> in common cases. We can monitor the pending StateRequests and if > there > > > >> is really a risk of OOM in extreme cases, we can throw Exceptions > with > > > >> proper messages notifying users what to do, like increasing memory > > > >> through configurations. 
> > > >> > > > >> Your suggestions to adjust threshold adaptively or to use the > blocking > > > >> buffer sounds good, and in my opinion we can postpone them to future > > > >> FLIPs since they seem to only benefit users in rare cases. Given > that > > > >> FLIP-423~428 has already been a big enough design, it might be > better > > > >> to focus on the most critical design for now and postpone > > > >> optimizations like this. WDYT? > > > >> > > > >> > > > >> > 5. Sure, we will introduce new configs as well as their default > > value. > > > >> > > > >> Thanks for adding the default values and the values themselves LGTM. > > > >> It might be better to move the default values to the Proposed > Changes > > > >> section instead of making them public for now, as there will be > > > >> compatibility issues once we want to dynamically adjust the > thresholds > > > >> and timeouts in future. > > > >> > > > >> The configuration execution.async-state.enabled seems unnecessary, > as > > > >> the infrastructure may automatically get this information from the > > > >> detailed state backend configurations. We may revisit this part > after > > > >> the core designs have reached an agreement. WDYT? > > > >> > > > >> > > > >> Besides, inspired by Jeyhun's comments, it comes to me that > > > >> > > > >> 8. Should this FLIP introduce metrics that measure the time a Flink > > > >> job is back-pressured by State IOs? Under the current design, this > > > >> metric could measure the time when the blocking buffer is full and > > > >> yield() cannot get callbacks to process, which means the operator is > > > >> fully waiting for state responses. > > > >> > > > >> Best regards, > > > >> Yunfeng > > > >> > > > >> On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan <zakelly....@gmail.com> > > wrote: > > > >> > > > > >> > Hi Yunfeng, > > > >> > > > > >> > Thanks for your detailed comments! > > > >> > > > > >> >> 1. Why do we need a close() method on StateIterator? 
This method > > seems > > > >> >> unused in the usage example codes. > > > >> > > > > >> > > > > >> > The `close()` is introduced to release internal resources, but it > > does not seem to require the user to call it. I removed this. > > > >> > > > > >> >> 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No > > null > > > >> >> entries are allowed". It might be better to further explain what > > will > > > >> >> happen if a null value is passed, ignoring the value in the > > returned > > > >> >> Collection or throwing exceptions. Given that > > > >> >> FutureUtils.emptyFuture() can be returned in the example code, I > > > >> >> suppose the former one might be correct. > > > >> > > > > >> > > > > >> > The statement "No null entries are allowed" refers to the > > parameters, it means some arrayList like [null, StateFuture1, > StateFuture2] > > passed in are not allowed, and an Exception will be thrown. > > > >> > > > > >> >> 1. According to Fig 2 of this FLIP, ... . This situation should > be > > > >> >> avoided and the order of same-key records should be strictly > > > >> >> preserved. > > > >> > > > > >> > > > > >> > I will discuss this with some expert SQL developers. And if it is > > valid and common, I suggest a strict order preservation mode for > > StreamRecord processing. WDYT? > > > >> > > > > >> >> 2. The FLIP says that StateRequests submitted by Callbacks will > not > > > >> >> invoke further yield() methods. Given that yield() is used when > > there > > > >> >> is "too much" in-flight data, does it mean StateRequests > submitted > > by > > > >> >> Callbacks will never be "too much"? What if the total number of > > > >> >> StateRequests exceed the capacity of Flink operator's memory > space? > > > >> > > > > >> > > > > >> > The amount of parallel StateRequests for one StreamRecord cannot > be > > determined since the code is written by users. So the in-flight requests > > may be "too much", and may cause OOM. 
Users should re-configure their > job, > > controlling the amount of on-going StreamRecord. And I suggest two ways > to > > avoid this: > > > >> > > > > >> > Adaptively adjust the count of on-going StreamRecord according to > > historical StateRequests amount. > > > >> > Also control the max StateRequests that can be executed in > parallel > > for each StreamRecord, and if it exceeds, put the new StateRequest in the > > blocking buffer waiting for execution (instead of invoking yield()). > > > >> > > > > >> > WDYT? > > > >> > > > > >> > > > > >> >> 3.1 I'm concerned that the out-of-order execution mode, along > with > > the > > > >> >> epoch mechanism, would bring more complexity to the execution > model > > > >> >> than the performance improvement it promises. Could we add some > > > >> >> benchmark results proving the benefit of this mode? > > > >> > > > > >> > > > > >> > Agreed, will do. > > > >> > > > > >> >> 3.2 The FLIP might need to add a public API section describing > how > > > >> >> users or developers can switch between these two execution modes. > > > >> > > > > >> > > > > >> > Good point. We will add a Public API section. > > > >> > > > > >> >> 3.3 Apart from the watermark and checkpoint mentioned in this > FLIP, > > > >> >> there are also more other events that might appear in the stream > of > > > >> >> data records. It might be better to generalize the execution mode > > > >> >> mechanism to handle all possible events. > > > >> > > > > >> > > > > >> > Yes, I missed this point. Thanks for the reminder. > > > >> > > > > >> >> 4. It might be better to treat callback-handling as a > > > >> >> MailboxDefaultAction, instead of Mails, to avoid the overhead of > > > >> >> repeatedly creating Mail objects. > > > >> > > > > >> > > > > >> > I thought the intermediated wrapper for callback can not be > > omitted, since there will be some context switch before each execution. > The > > MailboxDefaultAction in most cases is processInput right? 
While the > > callback should be executed with higher priority. I'd suggest not > changing > > the basic logic of Mailbox and the default action since it is very > critical > > for performance. But yes, we will try our best to avoid creating > > intermediated objects. > > > >> > > > > >> >> 5. Could this FLIP provide the current default values for things > > like > > > >> >> active buffer size thresholds and timeouts? These could help with > > > >> >> memory consumption and latency analysis. > > > >> > > > > >> > > > > >> > Sure, we will introduce new configs as well as their default > value. > > > >> > > > > >> >> 6. Why do we need to record the hashcode of a record in its > > > >> >> RecordContext? It seems not used. > > > >> > > > > >> > > > > >> > The context switch before each callback execution involves > > setCurrentKey, where the hashCode is re-calculated. We cache it for > > accelerating. > > > >> > > > > >> >> 7. In "timers can be stored on the JVM heap or RocksDB", the link > > > >> >> points to a document in flink-1.15. It might be better to verify > > the > > > >> >> referenced content is still valid in the latest Flink and update > > the > > > >> >> link accordingly. Same for other references if any. > > > >> > > > > >> > > > > >> > Thanks for the reminder! Will check. > > > >> > > > > >> > > > > >> > Thanks a lot & Best, > > > >> > Zakelly > > > >> > > > > >> > On Sat, Mar 2, 2024 at 6:18 AM Jeyhun Karimov < > je.kari...@gmail.com> > > wrote: > > > >> >> > > > >> >> Hi, > > > >> >> > > > >> >> Thanks for the great proposals. I have a few comments comments: > > > >> >> > > > >> >> - Backpressure Handling. Flink's original backpressure handling > is > > quite > > > >> >> robust and the semantics is quite "simple" (simple is beautiful). > > > >> >> This mechanism has proven to perform better/robust than the other > > open > > > >> >> source streaming systems, where they were relying on some > loopback > > > >> >> information. 
> > > >> >> Now that the proposal also relies on loopback (yield in this > > case), it is > > > >> >> not clear how well the new backpressure handling proposed in > > FLIP-425 is > > > >> >> robust and handle fluctuating workloads. > > > >> >> > > > >> >> - Watermark/Timer Handling: Similar arguments apply for watermark > > and timer > > > >> >> handling. IMHO, we need more benchmarks showing the overhead > > > >> >> of epoch management with different parameters (e.g., window size, > > watermark > > > >> >> strategy, etc) > > > >> >> > > > >> >> - DFS consistency guarantees. The proposal in FLIP-427 is > > DFS-agnostic. > > > >> >> However, different cloud providers have different storage > > consistency > > > >> >> models. > > > >> >> How do we want to deal with them? > > > >> >> > > > >> >> Regards, > > > >> >> Jeyhun > > > >> >> > > > >> >> > > > >> >> > > > >> >> > > > >> >> On Fri, Mar 1, 2024 at 6:08 PM Zakelly Lan < > zakelly....@gmail.com> > > wrote: > > > >> >> > > > >> >> > Thanks Piotr for sharing your thoughts! > > > >> >> > > > > >> >> > I guess it depends how we would like to treat the local disks. > > I've always > > > >> >> > > thought about them that almost always eventually all state > > from the DFS > > > >> >> > > should end up cached in the local disks. > > > >> >> > > > > >> >> > > > > >> >> > OK I got it. In our proposal we treat local disk as an optional > > cache, so > > > >> >> > the basic design will handle the case with state residing in > DFS > > only. It > > > >> >> > is a more 'cloud-native' approach that does not rely on any > > local storage > > > >> >> > assumptions, which allow users to dynamically adjust the > > capacity or I/O > > > >> >> > bound of remote storage to gain performance or save the cost, > > even without > > > >> >> > a job restart. > > > >> >> > > > > >> >> > In > > > >> >> > > the currently proposed more fine grained solution, you make a > > single > > > >> >> > > request to DFS per each state access. 
> > > >> >> > > > > > >> >> > > > > >> >> > Ah that's not accurate. Actually we buffer the state requests > > and process > > > >> >> > them in batch, multiple requests will correspond to one DFS > > access (One > > > >> >> > block access for multiple keys performed by RocksDB). > > > >> >> > > > > >> >> > In that benchmark you mentioned, are you requesting the state > > > >> >> > > asynchronously from local disks into memory? If the benefit > > comes from > > > >> >> > > parallel I/O, then I would expect the benefit to > > disappear/shrink when > > > >> >> > > running multiple subtasks on the same machine, as they would > > be making > > > >> >> > > their own parallel requests, right? Also enabling > > checkpointing would > > > >> >> > > further cut into the available I/O budget. > > > >> >> > > > > >> >> > > > > >> >> > That's an interesting topic. Our proposal is specifically aimed > > at the > > > >> >> > scenario where the machine I/O is not fully loaded but the I/O > > latency has > > > >> >> > indeed become a bottleneck for each subtask. While the > > distributed file > > > >> >> > system is a prime example of a scenario characterized by > > abundant and > > > >> >> > easily scalable I/O bandwidth coupled with higher I/O latency. > > You may > > > >> >> > expect to increase the parallelism of a job to enhance the > > performance as > > > >> >> > well, but that also brings in more waste of CPU's and memory > for > > building > > > >> >> > up more subtasks. This is one drawback for the > > computation-storage tightly > > > >> >> > coupled nodes. While in our proposal, the parallel I/O with all > > the > > > >> >> > callbacks still running in one task, pre-allocated > computational > > resources > > > >> >> > are better utilized. It is a much more lightweight way to > > perform parallel > > > >> >> > I/O. > > > >> >> > > > > >> >> > Just with what granularity those async requests should be made. 
> > > >> >> > > Making state access asynchronous is definitely the right way > > to go! > > > >> >> > > > > >> >> > > > > >> >> > I think the current proposal is based on such core ideas: > > > >> >> > > > > >> >> > - A pure cloud-native disaggregated state. > > > >> >> > - Fully utilize the given resources and try not to waste > them > > (including > > > >> >> > I/O). > > > >> >> > - The ability to scale isolated resources (I/O or CPU or > > memory) > > > >> >> > independently. > > > >> >> > > > > >> >> > We think a fine-grained granularity is more inline with those > > ideas, > > > >> >> > especially without local disk assumptions and without any waste > > of I/O by > > > >> >> > prefetching. Please note that it is not a replacement of the > > original local > > > >> >> > state with synchronous execution. Instead this is a solution > > embracing the > > > >> >> > cloud-native era, providing much more scalability and resource > > efficiency > > > >> >> > when handling a *huge state*. > > > >> >> > > > > >> >> > What also worries me a lot in this fine grained model is the > > effect on the > > > >> >> > > checkpointing times. > > > >> >> > > > > >> >> > > > > >> >> > Your concerns are very reasonable. Faster checkpointing is > > always a core > > > >> >> > advantage of disaggregated state, but only for the async phase. > > There will > > > >> >> > be some complexity introduced by in-flight requests, but I'd > > suggest a > > > >> >> > checkpoint containing those in-flight state requests as part of > > the state, > > > >> >> > to accelerate the sync phase by skipping the buffer draining. > > This makes > > > >> >> > the buffer size have little impact on checkpoint time. And all > > the changes > > > >> >> > keep within the execution model we proposed while the > checkpoint > > barrier > > > >> >> > alignment or handling will not be touched in our proposal, so I > > guess > > > >> >> > the complexity is relatively controllable. 
I have faith in that > > :) > > > >> >> > > > > >> >> > Also regarding the overheads, it would be great if you could > > provide > > > >> >> > > profiling results of the benchmarks that you conducted to > > verify the > > > >> >> > > results. Or maybe if you could describe the steps to > reproduce > > the > > > >> >> > results? > > > >> >> > > Especially "Hashmap (sync)" vs "Hashmap with async API". > > > >> >> > > > > > >> >> > > > > >> >> > Yes we could profile the benchmarks. And for the comparison of > > "Hashmap > > > >> >> > (sync)" vs "Hashmap with async API", we conduct a Wordcount job > > written > > > >> >> > with async APIs but disabling the async execution by directly > > completing > > > >> >> > the future using sync state access. This evaluates the overhead > > of newly > > > >> >> > introduced modules like 'AEC' in sync execution (even though > > they are not > > > >> >> > designed for it). The code will be provided later. For other > > results of our > > > >> >> > PoC[1], you can follow the instructions here[2] to reproduce. > > Since the > > > >> >> > compilation may take some effort, we will directly provide the > > jar for > > > >> >> > testing next week. > > > >> >> > > > > >> >> > > > > >> >> > And @Yunfeng Zhou, I have noticed your mail but it is a bit > late > > in my > > > >> >> > local time and the next few days are weekends. So I will reply > > to you > > > >> >> > later. Thanks for your response! 
> > > >> >> > > > > >> >> > > > > >> >> > [1] > > > >> >> > > > > >> >> > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-PoCResults > > > >> >> > [2] > > > >> >> > > > > >> >> > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-Appendix:HowtorunthePoC > > > >> >> > > > > >> >> > > > > >> >> > Best, > > > >> >> > Zakelly > > > >> >> > > > > >> >> > > > > >> >> > On Fri, Mar 1, 2024 at 6:38 PM Yunfeng Zhou < > > flink.zhouyunf...@gmail.com> > > > >> >> > wrote: > > > >> >> > > > > >> >> > > Hi, > > > >> >> > > > > > >> >> > > Thanks for proposing this design! I just read FLIP-424 and > > FLIP-425 > > > >> >> > > and have some questions about the proposed changes. > > > >> >> > > > > > >> >> > > For Async API (FLIP-424) > > > >> >> > > > > > >> >> > > 1. Why do we need a close() method on StateIterator? This > > method seems > > > >> >> > > unused in the usage example codes. > > > >> >> > > > > > >> >> > > 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that > > "No null > > > >> >> > > entries are allowed". It might be better to further explain > > what will > > > >> >> > > happen if a null value is passed, ignoring the value in the > > returned > > > >> >> > > Collection or throwing exceptions. Given that > > > >> >> > > FutureUtils.emptyFuture() can be returned in the example > code, > > I > > > >> >> > > suppose the former one might be correct. > > > >> >> > > > > > >> >> > > > > > >> >> > > For Async Execution (FLIP-425) > > > >> >> > > > > > >> >> > > 1. According to Fig 2 of this FLIP, if a recordB has its key > > collide > > > >> >> > > with an ongoing recordA, its processElement() method can > still > > be > > > >> >> > > triggered immediately, and then it might be moved to the > > blocking > > > >> >> > > buffer in AEC if it involves state operations. 
This means > that > > > >> >> > > recordB's output will precede recordA's output in downstream > > > >> >> > > operators, if recordA involves state operations while recordB > > does > > > >> >> > > not. This will harm the correctness of Flink jobs in some use > > cases. > > > >> >> > > For example, in dim table join cases, recordA could be a > delete > > > >> >> > > operation that involves state access, while recordB could be > > an insert > > > >> >> > > operation that needs to visit external storage without state > > access. > > > >> >> > > If recordB's output precedes recordA's, then an entry that is > > supposed > > > >> >> > > to finally exist with recordB's value in the sink table might > > actually > > > >> >> > > be deleted according to recordA's command. This situation > > should be > > > >> >> > > avoided and the order of same-key records should be strictly > > > >> >> > > preserved. > > > >> >> > > > > > >> >> > > 2. The FLIP says that StateRequests submitted by Callbacks > > will not > > > >> >> > > invoke further yield() methods. Given that yield() is used > > when there > > > >> >> > > is "too much" in-flight data, does it mean StateRequests > > submitted by > > > >> >> > > Callbacks will never be "too much"? What if the total number > of > > > >> >> > > StateRequests exceed the capacity of Flink operator's memory > > space? > > > >> >> > > > > > >> >> > > 3. In the "Watermark" section, this FLIP provided an > > out-of-order > > > >> >> > > execution mode apart from the default strictly-ordered mode, > > which can > > > >> >> > > optimize performance by allowing more concurrent executions. > > > >> >> > > > > > >> >> > > 3.1 I'm concerned that the out-of-order execution mode, along > > with the > > > >> >> > > epoch mechanism, would bring more complexity to the execution > > model > > > >> >> > > than the performance improvement it promises. Could we add > some > > > >> >> > > benchmark results proving the benefit of this mode? 
> > > >> >> > > > > > >> >> > > 3.2 The FLIP might need to add a public API section > describing > > how > > > >> >> > > users or developers can switch between these two execution > > modes. > > > >> >> > > > > > >> >> > > 3.3 Apart from the watermark and checkpoint mentioned in this > > FLIP, > > > >> >> > > there are also more other events that might appear in the > > stream of > > > >> >> > > data records. It might be better to generalize the execution > > mode > > > >> >> > > mechanism to handle all possible events. > > > >> >> > > > > > >> >> > > 4. It might be better to treat callback-handling as a > > > >> >> > > MailboxDefaultAction, instead of Mails, to avoid the overhead > > of > > > >> >> > > repeatedly creating Mail objects. > > > >> >> > > > > > >> >> > > 5. Could this FLIP provide the current default values for > > things like > > > >> >> > > active buffer size thresholds and timeouts? These could help > > with > > > >> >> > > memory consumption and latency analysis. > > > >> >> > > > > > >> >> > > 6. Why do we need to record the hashcode of a record in its > > > >> >> > > RecordContext? It seems not used. > > > >> >> > > > > > >> >> > > 7. In "timers can be stored on the JVM heap or RocksDB", the > > link > > > >> >> > > points to a document in flink-1.15. It might be better to > > verify the > > > >> >> > > referenced content is still valid in the latest Flink and > > update the > > > >> >> > > link accordingly. Same for other references if any. > > > >> >> > > > > > >> >> > > Best, > > > >> >> > > Yunfeng Zhou > > > >> >> > > > > > >> >> > > On Thu, Feb 29, 2024 at 2:17 PM Yuan Mei < > > yuanmei.w...@gmail.com> wrote: > > > >> >> > > > > > > >> >> > > > Hi Devs, > > > >> >> > > > > > > >> >> > > > This is a joint work of Yuan Mei, Zakelly Lan, Jinzhong Li, > > Hangxiang > > > >> >> > Yu, > > > >> >> > > > Yanfei Lei and Feng Wang. 
We'd like to start a discussion > > about > > > >> >> > > introducing > > > >> >> > > > Disaggregated State Storage and Management in Flink 2.0. > > > >> >> > > > > > > >> >> > > > The past decade has witnessed a dramatic shift in Flink's > > deployment > > > >> >> > > mode, > > > >> >> > > > workload patterns, and hardware improvements. We've moved > > from the > > > >> >> > > > map-reduce era where workers are computation-storage > tightly > > coupled > > > >> >> > > nodes > > > >> >> > > > to a cloud-native world where containerized deployments on > > Kubernetes > > > >> >> > > > become standard. To enable Flink's Cloud-Native future, we > > introduce > > > >> >> > > > Disaggregated State Storage and Management that uses DFS as > > primary > > > >> >> > > storage > > > >> >> > > > in Flink 2.0, as promised in the Flink 2.0 Roadmap. > > > >> >> > > > > > > >> >> > > > Design Details can be found in FLIP-423[1]. > > > >> >> > > > > > > >> >> > > > This new architecture is aimed to solve the following > > challenges > > > >> >> > brought > > > >> >> > > in > > > >> >> > > > the cloud-native era for Flink. > > > >> >> > > > 1. Local Disk Constraints in containerization > > > >> >> > > > 2. Spiky Resource Usage caused by compaction in the current > > state model > > > >> >> > > > 3. Fast Rescaling for jobs with large states (hundreds of > > Terabytes) > > > >> >> > > > 4. Light and Fast Checkpoint in a native way > > > >> >> > > > > > > >> >> > > > More specifically, we want to reach a consensus on the > > following issues > > > >> >> > > in > > > >> >> > > > this discussion: > > > >> >> > > > > > > >> >> > > > 1. Overall design > > > >> >> > > > 2. Proposed Changes > > > >> >> > > > 3. 
Design details to achieve Milestone1 > > > >> >> > > > > > > >> >> > > > In M1, we aim to achieve an end-to-end baseline version > > using DFS as > > > >> >> > > > primary storage and complete core functionalities, > including: > > > >> >> > > > > > > >> >> > > > - Asynchronous State APIs (FLIP-424)[2]: Introduce new APIs > > for > > > >> >> > > > asynchronous state access. > > > >> >> > > > - Asynchronous Execution Model (FLIP-425)[3]: Implement a > > non-blocking > > > >> >> > > > execution model leveraging the asynchronous APIs introduced > > in > > > >> >> > FLIP-424. > > > >> >> > > > - Grouping Remote State Access (FLIP-426)[4]: Enable > > retrieval of > > > >> >> > remote > > > >> >> > > > state data in batches to avoid unnecessary round-trip costs > > for remote > > > >> >> > > > access > > > >> >> > > > - Disaggregated State Store (FLIP-427)[5]: Introduce the > > initial > > > >> >> > version > > > >> >> > > of > > > >> >> > > > the ForSt disaggregated state store. > > > >> >> > > > - Fault Tolerance/Rescale Integration (FLIP-428)[6]: > > Integrate > > > >> >> > > > checkpointing mechanisms with the disaggregated state store > > for fault > > > >> >> > > > tolerance and fast rescaling. > > > >> >> > > > > > > >> >> > > > We will vote on each FLIP in separate threads to make sure > > each FLIP > > > >> >> > > > reaches a consensus. But we want to keep the discussion > > within a > > > >> >> > focused > > > >> >> > > > thread (this thread) for easier tracking of contexts to > avoid > > > >> >> > duplicated > > > >> >> > > > questions/discussions and also to think of the > > problem/solution in a > > > >> >> > full > > > >> >> > > > picture. 
> > > >> >> > > > > > > >> >> > > > Looking forward to your feedback > > > >> >> > > > > > > >> >> > > > Best, > > > >> >> > > > Yuan, Zakelly, Jinzhong, Hangxiang, Yanfei and Feng > > > >> >> > > > > > > >> >> > > > [1] https://cwiki.apache.org/confluence/x/R4p3EQ > > > >> >> > > > [2] https://cwiki.apache.org/confluence/x/SYp3EQ > > > >> >> > > > [3] https://cwiki.apache.org/confluence/x/S4p3EQ > > > >> >> > > > [4] https://cwiki.apache.org/confluence/x/TYp3EQ > > > >> >> > > > [5] https://cwiki.apache.org/confluence/x/T4p3EQ > > > >> >> > > > [6] https://cwiki.apache.org/confluence/x/UYp3EQ > > > >> >> > > > > > >> >> > > > >