Hi team,

Thanks for your discussion. Regarding FLIP-425, we have added several updates to the FLIP to answer frequently asked questions:
1. We captured a flame graph of the HashMap state backend under "Synchronous execution with asynchronous APIs"[1], which reveals that the framework overhead (including reference counting, future-related code and so on) consumes about 9% of the keyed operator's CPU time.

2. We added a set of comparative experiments for watermark processing; out-of-order mode performs 70% better than strictly-ordered mode under ~140MB state size. Instructions on how to run this test have also been added[2].

3. Regarding the order of StreamRecords, whether they access state or not: we added a new section, *Strict order of 'processElement'*[3].

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder

Best regards,
Yanfei

Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote on Tue, Mar 5, 2024 at 09:25:
>
> Hi Zakelly,
>
> > 5. I'm not very sure ... revisiting this later since it is not important.
>
> It seems that we still have some details to confirm about this question. Let's postpone this to after the critical parts of the design are settled.
>
> > 8. Yes, we had considered ... metrics should be like afterwards.
>
> Oh sorry, I missed FLIP-431. I'm fine with discussing this topic in milestone 2.
>
> Looking forward to the detailed design of the strict mode between same-key records and the benchmark results for the epoch mechanism.
>
> Best regards,
> Yunfeng
>
> On Mon, Mar 4, 2024 at 7:59 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> >
> > Hi Yunfeng,
> >
> > For 1:
> > I had a discussion with Lincoln Lee, and I realized it is a common case that the same-key record should be blocked before `processElement`. It is easier for users to understand. Thus I will introduce a strict mode for this and make it the default. My rough idea is just like yours: invoke some method of the AEC instance before `processElement`. The detailed design will be described in the FLIP later.
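> > To make that concrete, here is a minimal sketch of the blocking logic (every name below is hypothetical until the FLIP is updated):
> >
> >     // Rough sketch only -- AsyncExecutionController (AEC) is a stand-in
> >     // for the real component; the real design will be in the FLIP.
> >     interface AsyncExecutionController<K, R> {
> >         boolean hasInFlightRecord(K key);       // same-key record still running?
> >         void registerInFlightRecord(K key, R record);
> >         void bufferBlockedRecord(K key, R record);
> >         R pollBlockedRecord(K key);             // next blocked record, or null
> >     }
> >
> >     abstract class StrictOrderProcessor<K, R> {
> >         private final AsyncExecutionController<K, R> aec;
> >
> >         StrictOrderProcessor(AsyncExecutionController<K, R> aec) {
> >             this.aec = aec;
> >         }
> >
> >         // Strict mode: a record whose key collides with an in-flight record
> >         // is blocked *before* processElement(), keeping same-key order.
> >         final void processRecord(K key, R record) {
> >             if (aec.hasInFlightRecord(key)) {
> >                 aec.bufferBlockedRecord(key, record);
> >                 return;
> >             }
> >             aec.registerInFlightRecord(key, record);
> >             processElement(key, record);  // user code; may issue async state requests
> >         }
> >
> >         // Invoked by the AEC once all state requests of a record complete.
> >         final void onRecordFinished(K key) {
> >             R next = aec.pollBlockedRecord(key);
> >             if (next != null) {
> >                 processRecord(key, next);
> >             }
> >         }
> >
> >         abstract void processElement(K key, R record);
> >     }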
> > For 2:
> > I agree with you. We could throw exceptions for now and optimize this later.
> >
> > For 5:
> >> It might be better to move the default values to the Proposed Changes section instead of making them public for now, as there will be compatibility issues once we want to dynamically adjust the thresholds and timeouts in future.
> >
> > Agreed. The whole framework is experimental until we consider it complete, in 2.0 or later. The default values are better determined with more testing results and production experience.
> >
> >> The configuration execution.async-state.enabled seems unnecessary, as the infrastructure may automatically get this information from the detailed state backend configurations. We may revisit this part after the core designs have reached an agreement. WDYT?
> >
> > I'm not sure whether there is any use case where users write their code with the async APIs but run their job in a synchronous way. The first two scenarios that come to mind are benchmarking and small state, where users don't want to rewrite their code. Actually it is easy to support, so I'd suggest providing it. But I'm fine with revisiting this later since it is not important. WDYT?
> >
> > For 8:
> > Yes, we had considered the I/O metrics group, especially the back-pressured, idle and busy time per second. In the current plan we can do state access during back-pressure, which means those input metrics had better be redefined. I suggest we discuss these existing metrics, as well as some new metrics that should be introduced in FLIP-431, later in milestone 2; by then we will have basically finished the framework, so we will have a better view of what the metrics should look like. WDYT?
> >
> > Best,
> > Zakelly
> >
> > On Mon, Mar 4, 2024 at 6:49 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> >>
> >> Hi Zakelly,
> >>
> >> Thanks for the responses!
> >>
> >> > 1. I will discuss this with some expert SQL developers. ... mode for StreamRecord processing.
> >>
> >> In the DataStream API there should also be use cases where the order of output is strictly required. I agree that SQL experts may provide more concrete use cases to accelerate our discussion, but please allow me to search for DataStream use cases that prove the necessity of this strict order preservation mode, should the answers from the SQL experts turn out to be negative.
> >>
> >> For your convenience, my current rough idea is that we can add a module between the Input(s) and the processElement() module in Fig 2 of FLIP-425. That module would be responsible for caching records whose keys collide with in-flight records, so the AEC would only be responsible for handling async state calls, without knowing which record each call belongs to. We may revisit this topic once the necessity of the strict order mode is clarified.
> >>
> >> > 2. The amount of parallel StateRequests ... instead of invoking yield
> >>
> >> Your suggestions generally appeal to me. I think we may let the corresponding Flink jobs fail with OOM for now, since the majority of a StateRequest should just be references to existing Java objects, which occupy very little memory and can hardly cause OOM in common cases. We can monitor the pending StateRequests, and if there is really a risk of OOM in extreme cases, we can throw exceptions with proper messages notifying users what to do, like increasing memory through configurations. A sketch of such a guard is shown below.
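> >> A minimal sketch of that guard, assuming we track pending requests with a simple counter (the names and the limit handling are made up for illustration):
> >>
> >>     import java.util.concurrent.atomic.AtomicLong;
> >>
> >>     // Hypothetical guard around StateRequest submission.
> >>     final class StateRequestGuard {
> >>         private final long maxPendingRequests;
> >>         private final AtomicLong pending = new AtomicLong();
> >>
> >>         StateRequestGuard(long maxPendingRequests) {
> >>             this.maxPendingRequests = maxPendingRequests;
> >>         }
> >>
> >>         void onRequestSubmitted() {
> >>             if (pending.incrementAndGet() > maxPendingRequests) {
> >>                 pending.decrementAndGet();
> >>                 throw new IllegalStateException(
> >>                         "Too many in-flight StateRequests (limit "
> >>                                 + maxPendingRequests + "). Consider lowering the "
> >>                                 + "number of in-flight records or increasing the "
> >>                                 + "memory of the operator via configuration.");
> >>             }
> >>         }
> >>
> >>         void onRequestCompleted() {
> >>             pending.decrementAndGet();
> >>         }
> >>     }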
> >> Your suggestions to adjust the threshold adaptively or to use the blocking buffer sound good, and in my opinion we can postpone them to future FLIPs, since they seem to benefit users only in rare cases. Given that FLIP-423~428 is already a big enough design, it might be better to focus on the most critical parts for now and postpone optimizations like this. WDYT?
> >>
> >> > 5. Sure, we will introduce new configs as well as their default values.
> >>
> >> Thanks for adding the default values; the values themselves LGTM. It might be better to move the default values to the Proposed Changes section instead of making them public for now, as there will be compatibility issues once we want to dynamically adjust the thresholds and timeouts in future.
> >>
> >> The configuration execution.async-state.enabled seems unnecessary, as the infrastructure may automatically get this information from the detailed state backend configurations. We may revisit this part after the core designs have reached an agreement. WDYT?
> >>
> >> Besides, inspired by Jeyhun's comments, it occurs to me that:
> >>
> >> 8. Should this FLIP introduce metrics that measure the time a Flink job is back-pressured by state I/O? Under the current design, such a metric could measure the time during which the blocking buffer is full and yield() cannot get callbacks to process, which means the operator is purely waiting for state responses.
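> >> Something along these lines is what I imagine (a sketch only; the metric name and where it is wired in are made up):
> >>
> >>     // Hypothetical accumulator behind a gauge such as
> >>     // "stateBackPressuredTimeMsPerSecond".
> >>     final class StateBackPressureTimer {
> >>         private long blockedSinceNanos = -1L;  // -1 means not blocked
> >>         private long blockedTotalNanos;
> >>
> >>         // Called when the blocking buffer is full and yield() has no
> >>         // callback left to run, i.e. we purely wait for state responses.
> >>         void markBlocked() {
> >>             if (blockedSinceNanos < 0) {
> >>                 blockedSinceNanos = System.nanoTime();
> >>             }
> >>         }
> >>
> >>         // Called as soon as a state response arrives and processing resumes.
> >>         void markResumed() {
> >>             if (blockedSinceNanos >= 0) {
> >>                 blockedTotalNanos += System.nanoTime() - blockedSinceNanos;
> >>                 blockedSinceNanos = -1L;
> >>             }
> >>         }
> >>
> >>         long blockedMillis() {
> >>             return blockedTotalNanos / 1_000_000;
> >>         }
> >>     }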
> >> Best regards,
> >> Yunfeng
> >>
> >> On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> >> >
> >> > Hi Yunfeng,
> >> >
> >> > Thanks for your detailed comments!
> >> >
> >> >> 1. Why do we need a close() method on StateIterator? This method seems unused in the usage example codes.
> >> >
> >> > The `close()` was introduced to release internal resources, but it does not seem necessary for the user to call it. I have removed it.
> >> >
> >> >> 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null entries are allowed". It might be better to further explain what will happen if a null value is passed: ignoring the value in the returned Collection, or throwing an exception. Given that FutureUtils.emptyFuture() can be returned in the example code, I suppose the former might be correct.
> >> >
> >> > The statement "No null entries are allowed" refers to the parameters; it means that passing in a list like [null, StateFuture1, StateFuture2] is not allowed, and an exception will be thrown.
> >> >
> >> >> 1. According to Fig 2 of this FLIP, ... This situation should be avoided and the order of same-key records should be strictly preserved.
> >> >
> >> > I will discuss this with some expert SQL developers, and if the case is valid and common, I suggest a strict order preservation mode for StreamRecord processing. WDYT?
> >> >
> >> >> 2. The FLIP says that StateRequests submitted by Callbacks will not invoke further yield() methods. Given that yield() is used when there is "too much" in-flight data, does it mean StateRequests submitted by Callbacks will never be "too much"? What if the total number of StateRequests exceeds the capacity of the Flink operator's memory space?
> >> >
> >> > The amount of parallel StateRequests for one StreamRecord cannot be determined in advance, since the code is written by users. So the in-flight requests may be "too much" and may cause OOM. Users should re-configure their job to control the number of ongoing StreamRecords. And I suggest two ways to mitigate this:
> >> >
> >> > 1. Adaptively adjust the number of ongoing StreamRecords according to the historical amount of StateRequests.
> >> > 2. Also cap the StateRequests that can be executed in parallel for each StreamRecord; when the cap is exceeded, put the new StateRequest into the blocking buffer to wait for execution (instead of invoking yield()).
> >> >
> >> > WDYT?
> >> >
> >> >> 3.1 I'm concerned that the out-of-order execution mode, along with the epoch mechanism, would bring more complexity to the execution model than the performance improvement it promises. Could we add some benchmark results proving the benefit of this mode?
> >> >
> >> > Agreed, will do.
> >> >
> >> >> 3.2 The FLIP might need to add a public API section describing how users or developers can switch between these two execution modes.
> >> >
> >> > Good point. We will add a Public API section.
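> >> > As a strawman, the switch could be as simple as a config option like the following (the key, enum values and default are placeholders, not the final proposal):
> >> >
> >> >     import org.apache.flink.configuration.ConfigOption;
> >> >     import org.apache.flink.configuration.ConfigOptions;
> >> >
> >> >     public class AsyncExecutionOptions {
> >> >         public enum WatermarkProcessingMode {
> >> >             STRICTLY_ORDERED,  // in-flight records are drained before a watermark passes
> >> >             OUT_OF_ORDER       // a watermark may overtake in-flight records
> >> >         }
> >> >
> >> >         public static final ConfigOption<WatermarkProcessingMode> WATERMARK_PROCESSING_MODE =
> >> >                 ConfigOptions.key("execution.async-state.watermark-processing-mode")
> >> >                         .enumType(WatermarkProcessingMode.class)
> >> >                         .defaultValue(WatermarkProcessingMode.STRICTLY_ORDERED)
> >> >                         .withDescription(
> >> >                                 "Whether records are processed strictly in order "
> >> >                                         + "around watermarks, or may run out of order.");
> >> >     }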
> >> >> 3.3 Apart from the watermark and checkpoint mentioned in this FLIP, there are also other events that might appear in the stream of data records. It might be better to generalize the execution mode mechanism to handle all possible events.
> >> >
> >> > Yes, I missed this point. Thanks for the reminder.
> >> >
> >> >> 4. It might be better to treat callback-handling as a MailboxDefaultAction, instead of Mails, to avoid the overhead of repeatedly creating Mail objects.
> >> >
> >> > I think the intermediate wrapper for a callback cannot be omitted, since there is some context switching before each execution. The MailboxDefaultAction is, in most cases, processInput, right? The callbacks, however, should be executed with higher priority. I'd suggest not changing the basic logic of the Mailbox and the default action, since it is very critical for performance. But yes, we will try our best to avoid creating intermediate objects.
> >> >
> >> >> 5. Could this FLIP provide the current default values for things like active buffer size thresholds and timeouts? These could help with memory consumption and latency analysis.
> >> >
> >> > Sure, we will introduce the new configs as well as their default values.
> >> >
> >> >> 6. Why do we need to record the hashcode of a record in its RecordContext? It seems not used.
> >> >
> >> > The context switch before each callback execution involves setCurrentKey, where the hashCode would be re-calculated. We cache it to speed this up.
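> >> > In sketch form, the idea is simply this (the real RecordContext carries more than shown here):
> >> >
> >> >     // Hypothetical, trimmed-down RecordContext.
> >> >     final class RecordContext<K> {
> >> >         private final K key;
> >> >         private final int keyHash;  // cached once per record
> >> >
> >> >         RecordContext(K key) {
> >> >             this.key = key;
> >> >             this.keyHash = key.hashCode();  // computed a single time
> >> >         }
> >> >
> >> >         // setCurrentKey() before each callback can reuse the cached hash
> >> >         // (e.g. for key-group assignment) instead of recomputing it.
> >> >         K key() { return key; }
> >> >         int keyHash() { return keyHash; }
> >> >     }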
> >> >> 7. In "timers can be stored on the JVM heap or RocksDB", the link points to a document for flink-1.15. It might be better to verify that the referenced content is still valid in the latest Flink and update the link accordingly. Same for other references, if any.
> >> >
> >> > Thanks for the reminder! Will check.
> >> >
> >> > Thanks a lot & Best,
> >> > Zakelly
> >> >
> >> > On Sat, Mar 2, 2024 at 6:18 AM Jeyhun Karimov <je.kari...@gmail.com> wrote:
> >> >>
> >> >> Hi,
> >> >>
> >> >> Thanks for the great proposals. I have a few comments:
> >> >>
> >> >> - Backpressure handling. Flink's original backpressure handling is quite robust and its semantics are quite "simple" (simple is beautiful). This mechanism has proven to perform better and more robustly than other open source streaming systems, which relied on some loopback information. Now that the proposal also relies on a loopback (yield in this case), it is not clear how robust the new backpressure handling proposed in FLIP-425 is and how it handles fluctuating workloads.
> >> >>
> >> >> - Watermark/timer handling: Similar arguments apply to watermark and timer handling. IMHO, we need more benchmarks showing the overhead of epoch management with different parameters (e.g., window size, watermark strategy, etc.).
> >> >>
> >> >> - DFS consistency guarantees. The proposal in FLIP-427 is DFS-agnostic. However, different cloud providers have different storage consistency models. How do we want to deal with them?
> >> >>
> >> >> Regards,
> >> >> Jeyhun
> >> >>
> >> >> On Fri, Mar 1, 2024 at 6:08 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> >> >>
> >> >> > Thanks Piotr for sharing your thoughts!
> >> >> >
> >> >> > > I guess it depends how we would like to treat the local disks. I've always thought about them that almost always eventually all state from the DFS should end up cached in the local disks.
> >> >> >
> >> >> > OK, I got it. In our proposal we treat the local disk as an optional cache, so the basic design handles the case where state resides in DFS only. It is a more 'cloud-native' approach that does not rely on any local storage assumptions, which allows users to dynamically adjust the capacity or I/O bound of remote storage to gain performance or save cost, even without a job restart.
> >> >> >
> >> >> > > In the currently proposed more fine grained solution, you make a single request to DFS per each state access.
> >> >> >
> >> >> > Ah, that's not accurate. Actually we buffer the state requests and process them in batches, so multiple requests correspond to one DFS access (one block access for multiple keys, performed by RocksDB).
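> >> >> > To illustrate the batching with a simplified sketch (the real grouping in FLIP-426 is more involved; RocksDB's multiGetAsList is used here just to show the many-requests-one-access shape):
> >> >> >
> >> >> >     import java.util.ArrayList;
> >> >> >     import java.util.List;
> >> >> >     import java.util.concurrent.CompletableFuture;
> >> >> >     import org.rocksdb.RocksDB;
> >> >> >     import org.rocksdb.RocksDBException;
> >> >> >
> >> >> >     final class BatchingStateReader {
> >> >> >         private final RocksDB db;
> >> >> >         private final List<byte[]> keys = new ArrayList<>();
> >> >> >         private final List<CompletableFuture<byte[]>> futures = new ArrayList<>();
> >> >> >
> >> >> >         BatchingStateReader(RocksDB db) { this.db = db; }
> >> >> >
> >> >> >         // Each async state request only enqueues; no I/O happens here.
> >> >> >         CompletableFuture<byte[]> get(byte[] key) {
> >> >> >             CompletableFuture<byte[]> f = new CompletableFuture<>();
> >> >> >             keys.add(key);
> >> >> >             futures.add(f);
> >> >> >             return f;
> >> >> >         }
> >> >> >
> >> >> >         // One multiGet serves the whole batch: many requests, one access.
> >> >> >         void drain() throws RocksDBException {
> >> >> >             List<byte[]> values = db.multiGetAsList(keys);
> >> >> >             for (int i = 0; i < values.size(); i++) {
> >> >> >                 futures.get(i).complete(values.get(i));
> >> >> >             }
> >> >> >             keys.clear();
> >> >> >             futures.clear();
> >> >> >         }
> >> >> >     }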
> >> >> > > In that benchmark you mentioned, are you requesting the state asynchronously from local disks into memory? If the benefit comes from parallel I/O, then I would expect the benefit to disappear/shrink when running multiple subtasks on the same machine, as they would be making their own parallel requests, right? Also enabling checkpointing would further cut into the available I/O budget.
> >> >> >
> >> >> > That's an interesting topic. Our proposal is specifically aimed at the scenario where the machine's I/O is not fully loaded but I/O latency has become a bottleneck for each subtask. The distributed file system is a prime example of a setting with abundant, easily scalable I/O bandwidth coupled with higher I/O latency. You could also increase the parallelism of a job to improve performance, but that wastes more CPU and memory on building up more subtasks, which is one drawback of computation-storage tightly coupled nodes. In our proposal, parallel I/O with all the callbacks still running in one task makes better use of the pre-allocated computational resources. It is a much more lightweight way to perform parallel I/O.
> >> >> >
> >> >> > > Just with what granularity those async requests should be made. Making state access asynchronous is definitely the right way to go!
> >> >> >
> >> >> > I think the current proposal is based on these core ideas:
> >> >> >
> >> >> > - A purely cloud-native disaggregated state.
> >> >> > - Fully utilize the given resources and try not to waste them (including I/O).
> >> >> > - The ability to scale isolated resources (I/O or CPU or memory) independently.
> >> >> >
> >> >> > We think a fine granularity is more in line with those ideas, especially without local disk assumptions and without any I/O wasted on prefetching. Please note that it is not a replacement for the original local state with synchronous execution. Instead, it is a solution embracing the cloud-native era, providing much more scalability and resource efficiency when handling *huge state*.
> >> >> >
> >> >> > > What also worries me a lot in this fine grained model is the effect on the checkpointing times.
> >> >> >
> >> >> > Your concerns are very reasonable. Faster checkpointing is always a core advantage of disaggregated state, but only for the async phase. There will be some complexity introduced by the in-flight requests, but I'd suggest that a checkpoint contain those in-flight state requests as part of the state, to accelerate the sync phase by skipping the buffer draining. This makes the buffer size have little impact on checkpoint time. All the changes stay within the execution model we proposed, and checkpoint barrier alignment or handling will not be touched in our proposal, so I guess the complexity is relatively controllable. I have faith in that :)
> >> >> >
> >> >> > > Also regarding the overheads, it would be great if you could provide profiling results of the benchmarks that you conducted to verify the results. Or maybe if you could describe the steps to reproduce the results? Especially "Hashmap (sync)" vs "Hashmap with async API".
> >> >> >
> >> >> > Yes, we could profile the benchmarks. For the comparison of "Hashmap (sync)" vs "Hashmap with async API", we ran a WordCount job written with the async APIs but with async execution disabled, directly completing the futures using synchronous state access. This evaluates the overhead of the newly introduced modules, like the AEC, in sync execution (even though they are not designed for it). The code will be provided later. For the other results of our PoC[1], you can follow the instructions here[2] to reproduce them. Since the compilation may take some effort, we will directly provide the jar for testing next week.
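> >> >> > For clarity, the "Hashmap with async API" variant works roughly like this (a sketch only; the real code uses the FLIP-424 StateFuture rather than CompletableFuture, and the names here are made up):
> >> >> >
> >> >> >     import java.util.concurrent.CompletableFuture;
> >> >> >     import java.util.function.Supplier;
> >> >> >
> >> >> >     // Async-shaped API, but the future is completed immediately with a
> >> >> >     // synchronous HashMap state read, so only framework overhead remains.
> >> >> >     final class SyncCompletingStateValue<T> {
> >> >> >         private final Supplier<T> syncRead;  // direct in-memory state access
> >> >> >
> >> >> >         SyncCompletingStateValue(Supplier<T> syncRead) {
> >> >> >             this.syncRead = syncRead;
> >> >> >         }
> >> >> >
> >> >> >         CompletableFuture<T> asyncValue() {
> >> >> >             // No request is queued or deferred.
> >> >> >             return CompletableFuture.completedFuture(syncRead.get());
> >> >> >         }
> >> >> >     }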
> >> >> > And @Yunfeng Zhou, I have noticed your mail, but it is a bit late in my local time and the next few days are the weekend, so I will reply to you later. Thanks for your response!
> >> >> >
> >> >> > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-PoCResults
> >> >> > [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-Appendix:HowtorunthePoC
> >> >> >
> >> >> > Best,
> >> >> > Zakelly
> >> >> >
> >> >> > On Fri, Mar 1, 2024 at 6:38 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> >> >> >
> >> >> > > Hi,
> >> >> > >
> >> >> > > Thanks for proposing this design! I just read FLIP-424 and FLIP-425 and have some questions about the proposed changes.
> >> >> > >
> >> >> > > For Async API (FLIP-424)
> >> >> > >
> >> >> > > 1. Why do we need a close() method on StateIterator? This method seems unused in the usage example codes.
> >> >> > >
> >> >> > > 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null entries are allowed". It might be better to further explain what will happen if a null value is passed: ignoring the value in the returned Collection, or throwing an exception. Given that FutureUtils.emptyFuture() can be returned in the example code, I suppose the former might be correct.
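> >> >> > > (For context, the example I am referring to looks roughly like the following; I am paraphrasing from memory, and the state accessor is hypothetical:)
> >> >> > >
> >> >> > >     List<StateFuture<Long>> futures = new ArrayList<>();
> >> >> > >     for (String k : keys) {
> >> >> > >         futures.add(k == null
> >> >> > >                 ? FutureUtils.emptyFuture()   // placeholder, not a null entry
> >> >> > >                 : countState.asyncGet(k));    // hypothetical async accessor
> >> >> > >     }
> >> >> > >     // JavaDoc says "No null entries are allowed" -- but what if one slips in?
> >> >> > >     FutureUtils.combineAll(futures)
> >> >> > >             .thenAccept(values -> values.forEach(System.out::println));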
> >> >> > > For Async Execution (FLIP-425)
> >> >> > >
> >> >> > > 1. According to Fig 2 of this FLIP, if a recordB has its key collide with an ongoing recordA, its processElement() method can still be triggered immediately, and then it might be moved to the blocking buffer in the AEC if it involves state operations. This means that recordB's output will precede recordA's output in downstream operators if recordA involves state operations while recordB does not. This will harm the correctness of Flink jobs in some use cases. For example, in dim table join cases, recordA could be a delete operation that involves state access, while recordB could be an insert operation that needs to visit external storage without state access. If recordB's output precedes recordA's, then an entry that is supposed to finally exist with recordB's value in the sink table might actually be deleted according to recordA's command. This situation should be avoided, and the order of same-key records should be strictly preserved.
> >> >> > >
> >> >> > > 2. The FLIP says that StateRequests submitted by Callbacks will not invoke further yield() methods. Given that yield() is used when there is "too much" in-flight data, does it mean StateRequests submitted by Callbacks will never be "too much"? What if the total number of StateRequests exceeds the capacity of the Flink operator's memory space?
> >> >> > >
> >> >> > > 3. In the "Watermark" section, this FLIP provides an out-of-order execution mode apart from the default strictly-ordered mode, which can optimize performance by allowing more concurrent executions.
> >> >> > >
> >> >> > > 3.1 I'm concerned that the out-of-order execution mode, along with the epoch mechanism, would bring more complexity to the execution model than the performance improvement it promises. Could we add some benchmark results proving the benefit of this mode?
> >> >> > >
> >> >> > > 3.2 The FLIP might need to add a public API section describing how users or developers can switch between these two execution modes.
> >> >> > >
> >> >> > > 3.3 Apart from the watermark and checkpoint mentioned in this FLIP, there are also other events that might appear in the stream of data records. It might be better to generalize the execution mode mechanism to handle all possible events.
> >> >> > >
> >> >> > > 4. It might be better to treat callback-handling as a MailboxDefaultAction, instead of Mails, to avoid the overhead of repeatedly creating Mail objects.
> >> >> > >
> >> >> > > 5. Could this FLIP provide the current default values for things like active buffer size thresholds and timeouts? These could help with memory consumption and latency analysis.
> >> >> > >
> >> >> > > 6. Why do we need to record the hashcode of a record in its RecordContext? It seems not used.
> >> >> > >
> >> >> > > 7. In "timers can be stored on the JVM heap or RocksDB", the link points to a document for flink-1.15. It might be better to verify that the referenced content is still valid in the latest Flink and update the link accordingly. Same for other references, if any.
> >> >> > >
> >> >> > > Best,
> >> >> > > Yunfeng Zhou
> >> >> > >
> >> >> > > On Thu, Feb 29, 2024 at 2:17 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:
> >> >> > > >
> >> >> > > > Hi Devs,
> >> >> > > >
> >> >> > > > This is a joint work of Yuan Mei, Zakelly Lan, Jinzhong Li, Hangxiang Yu, Yanfei Lei and Feng Wang. We'd like to start a discussion about introducing Disaggregated State Storage and Management in Flink 2.0.
> >> >> > > >
> >> >> > > > The past decade has witnessed a dramatic shift in Flink's deployment mode, workload patterns, and hardware improvements. We've moved from the map-reduce era, where workers were computation-storage tightly coupled nodes, to a cloud-native world where containerized deployments on Kubernetes have become standard. To enable Flink's cloud-native future, we introduce Disaggregated State Storage and Management, which uses DFS as primary storage in Flink 2.0, as promised in the Flink 2.0 Roadmap.
> >> >> > > >
> >> >> > > > Design details can be found in FLIP-423[1].
> >> >> > > >
> >> >> > > > This new architecture aims to solve the following challenges brought by the cloud-native era for Flink:
> >> >> > > > 1. Local Disk Constraints in containerization
> >> >> > > > 2. Spiky Resource Usage caused by compaction in the current state model
> >> >> > > > 3. Fast Rescaling for jobs with large states (hundreds of Terabytes)
> >> >> > > > 4. Light and Fast Checkpoint in a native way
> >> >> > > >
> >> >> > > > More specifically, we want to reach a consensus on the following issues in this discussion:
> >> >> > > >
> >> >> > > > 1. Overall design
> >> >> > > > 2. Proposed Changes
> >> >> > > > 3. Design details to achieve Milestone 1
> >> >> > > >
> >> >> > > > In M1, we aim to achieve an end-to-end baseline version using DFS as primary storage and complete the core functionalities, including:
> >> >> > > >
> >> >> > > > - Asynchronous State APIs (FLIP-424)[2]: Introduce new APIs for asynchronous state access.
> >> >> > > > - Asynchronous Execution Model (FLIP-425)[3]: Implement a non-blocking execution model leveraging the asynchronous APIs introduced in FLIP-424.
> >> >> > > > - Grouping Remote State Access (FLIP-426)[4]: Enable retrieval of remote state data in batches to avoid unnecessary round-trip costs for remote access.
> >> >> > > > - Disaggregated State Store (FLIP-427)[5]: Introduce the initial version of the ForSt disaggregated state store.
> >> >> > > > - Fault Tolerance/Rescale Integration (FLIP-428)[6]: Integrate checkpointing mechanisms with the disaggregated state store for fault tolerance and fast rescaling.
> >> >> > > >
> >> >> > > > We will vote on each FLIP in a separate thread to make sure each FLIP reaches a consensus. But we want to keep the discussion within a focused thread (this thread) for easier tracking of context, to avoid duplicated questions/discussions, and to think of the problem/solution in the full picture.
> >> >> > > >
> >> >> > > > Looking forward to your feedback
> >> >> > > >
> >> >> > > > Best,
> >> >> > > > Yuan, Zakelly, Jinzhong, Hangxiang, Yanfei and Feng
> >> >> > > >
> >> >> > > > [1] https://cwiki.apache.org/confluence/x/R4p3EQ
> >> >> > > > [2] https://cwiki.apache.org/confluence/x/SYp3EQ
> >> >> > > > [3] https://cwiki.apache.org/confluence/x/S4p3EQ
> >> >> > > > [4] https://cwiki.apache.org/confluence/x/TYp3EQ
> >> >> > > > [5] https://cwiki.apache.org/confluence/x/T4p3EQ
> >> >> > > > [6] https://cwiki.apache.org/confluence/x/UYp3EQ