Hi team,

Thanks for your discussion. Regarding FLIP-425, we have supplemented
several updates to answer high-frequency questions:

1. We captured a flame graph of the Hashmap state backend in
"Synchronous execution with asynchronous APIs"[1], which reveals that
the framework overhead (including reference counting, future-related
code and so on) consumes about 9% of the keyed operator CPU time.
2. We added a set of comparative experiments for watermark processing,
the performance of Out-Of-Order mode is 70% better than
strictly-ordered mode under ~140MB state size. Instructions on how to
run this test have also been added[2].
3. Regarding the order of StreamRecord, whether it has state access or
not. We supplemented a new *Strict order of 'processElement'*[3].

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing
[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder


Best regards,
Yanfei

Yunfeng Zhou <flink.zhouyunf...@gmail.com> 于2024年3月5日周二 09:25写道:
>
> Hi Zakelly,
>
> > 5. I'm not very sure ... revisiting this later since it is not important.
>
> It seems that we still have some details to confirm about this
> question. Let's postpone this to after the critical parts of the
> design are settled.
>
> > 8. Yes, we had considered ... metrics should be like afterwards.
>
> Oh sorry I missed FLIP-431. I'm fine with discussing this topic in milestone 
> 2.
>
> Looking forward to the detailed design about the strict mode between
> same-key records and the benchmark results about the epoch mechanism.
>
> Best regards,
> Yunfeng
>
> On Mon, Mar 4, 2024 at 7:59 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> >
> > Hi Yunfeng,
> >
> > For 1:
> > I had a discussion with Lincoln Lee, and I realize it is a common case the 
> > same-key record should be blocked before the `processElement`. It is easier 
> > for users to understand. Thus I will introduce a strict mode for this and 
> > make it default. My rough idea is just like yours, by invoking some method 
> > of AEC instance before `processElement`. The detailed design will be 
> > described in FLIP later.
> >
> > For 2:
> > I agree with you. We could throw exceptions for now and optimize this later.
> >
> > For 5:
> >>
> >> It might be better to move the default values to the Proposed Changes
> >> section instead of making them public for now, as there will be
> >> compatibility issues once we want to dynamically adjust the thresholds
> >> and timeouts in future.
> >
> > Agreed. The whole framework is under experiment until we think it is 
> > complete in 2.0 or later. The default value should be better determined 
> > with more testing results and production experience.
> >
> >> The configuration execution.async-state.enabled seems unnecessary, as
> >> the infrastructure may automatically get this information from the
> >> detailed state backend configurations. We may revisit this part after
> >> the core designs have reached an agreement. WDYT?
> >
> >
> > I'm not very sure if there is any use case where users write their code 
> > using async APIs but run their job in a synchronous way. The first two 
> > scenarios that come to me are for benchmarking or for a small state, while 
> > they don't want to rewrite their code. Actually it is easy to support, so 
> > I'd suggest providing it. But I'm fine with revisiting this later since it 
> > is not important. WDYT?
> >
> > For 8:
> > Yes, we had considered the I/O metrics group especially the back-pressure, 
> > idle and task busy per second. In the current plan we can do state access 
> > during back-pressure, meaning that those metrics for input would better be 
> > redefined. I suggest we discuss these existing metrics as well as some new 
> > metrics that should be introduced in FLIP-431 later in milestone 2, since 
> > we have basically finished the framework thus we will have a better view of 
> > what metrics should be like afterwards. WDYT?
> >
> >
> > Best,
> > Zakelly
> >
> > On Mon, Mar 4, 2024 at 6:49 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> 
> > wrote:
> >>
> >> Hi Zakelly,
> >>
> >> Thanks for the responses!
> >>
> >> > 1. I will discuss this with some expert SQL developers. ... mode for 
> >> > StreamRecord processing.
> >>
> >> In DataStream API there should also be use cases when the order of
> >> output is strictly required. I agree with it that SQL experts may help
> >> provide more concrete use cases that can accelerate our discussion,
> >> but please allow me to search for DataStream use cases that can prove
> >> the necessity of this strict order preservation mode, if answers from
> >> SQL experts are shown to be negative.
> >>
> >> For your convenience, my current rough idea is that we can add a
> >> module between the Input(s) and processElement() module in Fig 2 of
> >> FLIP-425. The module will be responsible for caching records whose
> >> keys collide with in-flight records, and AEC will only be responsible
> >> for handling async state calls, without knowing the record each call
> >> belongs to. We may revisit this topic once the necessity of the strict
> >> order mode is clarified.
> >>
> >>
> >> > 2. The amount of parallel StateRequests ... instead of invoking yield
> >>
> >> Your suggestions generally appeal to me. I think we may let
> >> corresponding Flink jobs fail with OOM for now, since the majority of
> >> a StateRequest should just be references to existing Java objects,
> >> which only occupies very small memory space and can hardly cause OOM
> >> in common cases. We can monitor the pending StateRequests and if there
> >> is really a risk of OOM in extreme cases, we can throw Exceptions with
> >> proper messages notifying users what to do, like increasing memory
> >> through configurations.
> >>
> >> Your suggestions to adjust threshold adaptively or to use the blocking
> >> buffer sounds good, and in my opinion we can postpone them to future
> >> FLIPs since they seem to only benefit users in rare cases. Given that
> >> FLIP-423~428 has already been a big enough design, it might be better
> >> to focus on the most critical design for now and postpone
> >> optimizations like this. WDYT?
> >>
> >>
> >> > 5. Sure, we will introduce new configs as well as their default value.
> >>
> >> Thanks for adding the default values and the values themselves LGTM.
> >> It might be better to move the default values to the Proposed Changes
> >> section instead of making them public for now, as there will be
> >> compatibility issues once we want to dynamically adjust the thresholds
> >> and timeouts in future.
> >>
> >> The configuration execution.async-state.enabled seems unnecessary, as
> >> the infrastructure may automatically get this information from the
> >> detailed state backend configurations. We may revisit this part after
> >> the core designs have reached an agreement. WDYT?
> >>
> >>
> >> Besides, inspired by Jeyhun's comments, it comes to me that
> >>
> >> 8. Should this FLIP introduce metrics that measure the time a Flink
> >> job is back-pressured by State IOs? Under the current design, this
> >> metric could measure the time when the blocking buffer is full and
> >> yield() cannot get callbacks to process, which means the operator is
> >> fully waiting for state responses.
> >>
> >> Best regards,
> >> Yunfeng
> >>
> >> On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> >> >
> >> > Hi Yunfeng,
> >> >
> >> > Thanks for your detailed comments!
> >> >
> >> >> 1. Why do we need a close() method on StateIterator? This method seems
> >> >> unused in the usage example codes.
> >> >
> >> >
> >> > The `close()` is introduced to release internal resources, but it does 
> >> > not seem to require the user to call it. I removed this.
> >> >
> >> >> 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null
> >> >> entries are allowed". It might be better to further explain what will
> >> >> happen if a null value is passed, ignoring the value in the returned
> >> >> Collection or throwing exceptions. Given that
> >> >> FutureUtils.emptyFuture() can be returned in the example code, I
> >> >> suppose the former one might be correct.
> >> >
> >> >
> >> > The statement "No null entries are allowed" refers to the parameters, it 
> >> > means some arrayList like [null, StateFuture1, StateFuture2] passed in 
> >> > are not allowed, and an Exception will be thrown.
> >> >
> >> >> 1. According to Fig 2 of this FLIP, ... . This situation should be
> >> >> avoided and the order of same-key records should be strictly
> >> >> preserved.
> >> >
> >> >
> >> > I will discuss this with some expert SQL developers. And if it is valid 
> >> > and common, I suggest a strict order preservation mode for StreamRecord 
> >> > processing. WDYT?
> >> >
> >> >> 2. The FLIP says that StateRequests submitted by Callbacks will not
> >> >> invoke further yield() methods. Given that yield() is used when there
> >> >> is "too much" in-flight data, does it mean StateRequests submitted by
> >> >> Callbacks will never be "too much"? What if the total number of
> >> >> StateRequests exceed the capacity of Flink operator's memory space?
> >> >
> >> >
> >> > The amount of parallel StateRequests for one StreamRecord cannot be 
> >> > determined since the code is written by users. So the in-flight requests 
> >> > may be "too much", and may cause OOM. Users should re-configure their 
> >> > job, controlling the amount of on-going StreamRecord. And I suggest two 
> >> > ways to avoid this:
> >> >
> >> > Adaptively adjust the count of on-going StreamRecord according to 
> >> > historical StateRequests amount.
> >> > Also control the max StateRequests that can be executed in parallel for 
> >> > each StreamRecord, and if it exceeds, put the new StateRequest in the 
> >> > blocking buffer waiting for execution (instead of invoking yield()).
> >> >
> >> > WDYT?
> >> >
> >> >
> >> >> 3.1 I'm concerned that the out-of-order execution mode, along with the
> >> >> epoch mechanism, would bring more complexity to the execution model
> >> >> than the performance improvement it promises. Could we add some
> >> >> benchmark results proving the benefit of this mode?
> >> >
> >> >
> >> > Agreed, will do.
> >> >
> >> >> 3.2 The FLIP might need to add a public API section describing how
> >> >> users or developers can switch between these two execution modes.
> >> >
> >> >
> >> > Good point. We will add a Public API section.
> >> >
> >> >> 3.3 Apart from the watermark and checkpoint mentioned in this FLIP,
> >> >> there are also more other events that might appear in the stream of
> >> >> data records. It might be better to generalize the execution mode
> >> >> mechanism to handle all possible events.
> >> >
> >> >
> >> > Yes, I missed this point. Thanks for the reminder.
> >> >
> >> >> 4. It might be better to treat callback-handling as a
> >> >> MailboxDefaultAction, instead of Mails, to avoid the overhead of
> >> >> repeatedly creating Mail objects.
> >> >
> >> >
> >> >  I thought the intermediated wrapper for callback can not be omitted, 
> >> > since there will be some context switch before each execution. The 
> >> > MailboxDefaultAction in most cases is processInput right? While the 
> >> > callback should be executed with higher priority. I'd suggest not 
> >> > changing the basic logic of Mailbox and the default action since it is 
> >> > very critical for performance. But yes, we will try our best to avoid 
> >> > creating intermediated objects.
> >> >
> >> >> 5. Could this FLIP provide the current default values for things like
> >> >> active buffer size thresholds and timeouts? These could help with
> >> >> memory consumption and latency analysis.
> >> >
> >> >
> >> > Sure, we will introduce new configs as well as their default value.
> >> >
> >> >> 6. Why do we need to record the hashcode of a record in its
> >> >> RecordContext? It seems not used.
> >> >
> >> >
> >> > The context switch before each callback execution involves 
> >> > setCurrentKey, where the hashCode is re-calculated. We cache it for 
> >> > accelerating.
> >> >
> >> >> 7. In "timers can be stored on the JVM heap or RocksDB", the link
> >> >> points to a document in flink-1.15. It might be better to verify the
> >> >> referenced content is still valid in the latest Flink and update the
> >> >> link accordingly. Same for other references if any.
> >> >
> >> >
> >> > Thanks for the reminder! Will check.
> >> >
> >> >
> >> > Thanks a lot & Best,
> >> > Zakelly
> >> >
> >> > On Sat, Mar 2, 2024 at 6:18 AM Jeyhun Karimov <je.kari...@gmail.com> 
> >> > wrote:
> >> >>
> >> >> Hi,
> >> >>
> >> >> Thanks for the great proposals. I have a few comments comments:
> >> >>
> >> >> - Backpressure Handling. Flink's original backpressure handling is quite
> >> >> robust and the semantics is quite "simple" (simple is beautiful).
> >> >> This mechanism has proven to perform better/robust than the other open
> >> >> source streaming systems, where they were relying on some loopback
> >> >> information.
> >> >> Now that the proposal also relies on loopback (yield in this case), it 
> >> >> is
> >> >> not clear how well the new backpressure handling proposed in FLIP-425 is
> >> >> robust and handle fluctuating workloads.
> >> >>
> >> >> - Watermark/Timer Handling: Similar arguments apply for watermark and 
> >> >> timer
> >> >> handling. IMHO, we need more benchmarks showing the overhead
> >> >> of epoch management with different parameters (e.g., window size, 
> >> >> watermark
> >> >> strategy, etc)
> >> >>
> >> >> - DFS consistency guarantees. The proposal in FLIP-427 is DFS-agnostic.
> >> >> However, different cloud providers have different storage consistency
> >> >> models.
> >> >> How do we want to deal with them?
> >> >>
> >> >>  Regards,
> >> >> Jeyhun
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> On Fri, Mar 1, 2024 at 6:08 PM Zakelly Lan <zakelly....@gmail.com> 
> >> >> wrote:
> >> >>
> >> >> > Thanks Piotr for sharing your thoughts!
> >> >> >
> >> >> > I guess it depends how we would like to treat the local disks. I've 
> >> >> > always
> >> >> > > thought about them that almost always eventually all state from the 
> >> >> > > DFS
> >> >> > > should end up cached in the local disks.
> >> >> >
> >> >> >
> >> >> > OK I got it. In our proposal we treat local disk as an optional 
> >> >> > cache, so
> >> >> > the basic design will handle the case with state residing in DFS 
> >> >> > only. It
> >> >> > is a more 'cloud-native' approach that does not rely on any local 
> >> >> > storage
> >> >> > assumptions, which allow users to dynamically adjust the capacity or 
> >> >> > I/O
> >> >> > bound of remote storage to gain performance or save the cost, even 
> >> >> > without
> >> >> > a job restart.
> >> >> >
> >> >> > In
> >> >> > > the currently proposed more fine grained solution, you make a single
> >> >> > > request to DFS per each state access.
> >> >> > >
> >> >> >
> >> >> > Ah that's not accurate. Actually we buffer the state requests and 
> >> >> > process
> >> >> > them in batch, multiple requests will correspond to one DFS access 
> >> >> > (One
> >> >> > block access for multiple keys performed by RocksDB).
> >> >> >
> >> >> > In that benchmark you mentioned, are you requesting the state
> >> >> > > asynchronously from local disks into memory? If the benefit comes 
> >> >> > > from
> >> >> > > parallel I/O, then I would expect the benefit to disappear/shrink 
> >> >> > > when
> >> >> > > running multiple subtasks on the same machine, as they would be 
> >> >> > > making
> >> >> > > their own parallel requests, right? Also enabling checkpointing 
> >> >> > > would
> >> >> > > further cut into the available I/O budget.
> >> >> >
> >> >> >
> >> >> > That's an interesting topic. Our proposal is specifically aimed at the
> >> >> > scenario where the machine I/O is not fully loaded but the I/O 
> >> >> > latency has
> >> >> > indeed become a bottleneck for each subtask. While the distributed 
> >> >> > file
> >> >> > system is a prime example of a scenario characterized by abundant and
> >> >> > easily scalable I/O bandwidth coupled with higher I/O latency. You may
> >> >> > expect to increase the parallelism of a job to enhance the 
> >> >> > performance as
> >> >> > well, but that also brings in more waste of CPU's and memory for 
> >> >> > building
> >> >> > up more subtasks. This is one drawback for the computation-storage 
> >> >> > tightly
> >> >> > coupled nodes. While in our proposal, the parallel I/O with all the
> >> >> > callbacks still running in one task, pre-allocated computational 
> >> >> > resources
> >> >> > are better utilized. It is a much more lightweight way to perform 
> >> >> > parallel
> >> >> > I/O.
> >> >> >
> >> >> > Just with what granularity those async requests should be made.
> >> >> > > Making state access asynchronous is definitely the right way to go!
> >> >> >
> >> >> >
> >> >> > I think the current proposal is based on such core ideas:
> >> >> >
> >> >> >    - A pure cloud-native disaggregated state.
> >> >> >    - Fully utilize the given resources and try not to waste them 
> >> >> > (including
> >> >> >    I/O).
> >> >> >    - The ability to scale isolated resources (I/O or CPU or memory)
> >> >> >    independently.
> >> >> >
> >> >> > We think a fine-grained granularity is more inline with those ideas,
> >> >> > especially without local disk assumptions and without any waste of 
> >> >> > I/O by
> >> >> > prefetching. Please note that it is not a replacement of the original 
> >> >> > local
> >> >> > state with synchronous execution. Instead this is a solution 
> >> >> > embracing the
> >> >> > cloud-native era, providing much more scalability and resource 
> >> >> > efficiency
> >> >> > when handling a *huge state*.
> >> >> >
> >> >> > What also worries me a lot in this fine grained model is the effect 
> >> >> > on the
> >> >> > > checkpointing times.
> >> >> >
> >> >> >
> >> >> > Your concerns are very reasonable. Faster checkpointing is always a 
> >> >> > core
> >> >> > advantage of disaggregated state, but only for the async phase. There 
> >> >> > will
> >> >> > be some complexity introduced by in-flight requests, but I'd suggest a
> >> >> > checkpoint containing those in-flight state requests as part of the 
> >> >> > state,
> >> >> > to accelerate the sync phase by skipping the buffer draining. This 
> >> >> > makes
> >> >> > the buffer size have little impact on checkpoint time. And all the 
> >> >> > changes
> >> >> > keep within the execution model we proposed while the checkpoint 
> >> >> > barrier
> >> >> > alignment or handling will not be touched in our proposal, so I guess
> >> >> > the complexity is relatively controllable. I have faith in that :)
> >> >> >
> >> >> > Also regarding the overheads, it would be great if you could provide
> >> >> > > profiling results of the benchmarks that you conducted to verify the
> >> >> > > results. Or maybe if you could describe the steps to reproduce the
> >> >> > results?
> >> >> > > Especially "Hashmap (sync)" vs "Hashmap with async API".
> >> >> > >
> >> >> >
> >> >> > Yes we could profile the benchmarks. And for the comparison of 
> >> >> > "Hashmap
> >> >> > (sync)" vs "Hashmap with async API", we conduct a Wordcount job 
> >> >> > written
> >> >> > with async APIs but disabling the async execution by directly 
> >> >> > completing
> >> >> > the future using sync state access. This evaluates the overhead of 
> >> >> > newly
> >> >> > introduced modules like 'AEC' in sync execution (even though they are 
> >> >> > not
> >> >> > designed for it). The code will be provided later. For other results 
> >> >> > of our
> >> >> > PoC[1], you can follow the instructions here[2] to reproduce. Since 
> >> >> > the
> >> >> > compilation may take some effort, we will directly provide the jar for
> >> >> > testing next week.
> >> >> >
> >> >> >
> >> >> > And @Yunfeng Zhou, I have noticed your mail but it is a bit late in my
> >> >> > local time and the next few days are weekends. So I will reply to you
> >> >> > later. Thanks for your response!
> >> >> >
> >> >> >
> >> >> > [1]
> >> >> >
> >> >> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-PoCResults
> >> >> > [2]
> >> >> >
> >> >> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-Appendix:HowtorunthePoC
> >> >> >
> >> >> >
> >> >> > Best,
> >> >> > Zakelly
> >> >> >
> >> >> >
> >> >> > On Fri, Mar 1, 2024 at 6:38 PM Yunfeng Zhou 
> >> >> > <flink.zhouyunf...@gmail.com>
> >> >> > wrote:
> >> >> >
> >> >> > > Hi,
> >> >> > >
> >> >> > > Thanks for proposing this design! I just read FLIP-424 and FLIP-425
> >> >> > > and have some questions about the proposed changes.
> >> >> > >
> >> >> > > For Async API (FLIP-424)
> >> >> > >
> >> >> > > 1. Why do we need a close() method on StateIterator? This method 
> >> >> > > seems
> >> >> > > unused in the usage example codes.
> >> >> > >
> >> >> > > 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null
> >> >> > > entries are allowed". It might be better to further explain what 
> >> >> > > will
> >> >> > > happen if a null value is passed, ignoring the value in the returned
> >> >> > > Collection or throwing exceptions. Given that
> >> >> > > FutureUtils.emptyFuture() can be returned in the example code, I
> >> >> > > suppose the former one might be correct.
> >> >> > >
> >> >> > >
> >> >> > > For Async Execution (FLIP-425)
> >> >> > >
> >> >> > > 1. According to Fig 2 of this FLIP, if a recordB has its key collide
> >> >> > > with an ongoing recordA, its processElement() method can still be
> >> >> > > triggered immediately, and then it might be moved to the blocking
> >> >> > > buffer in AEC if it involves state operations. This means that
> >> >> > > recordB's output will precede recordA's output in downstream
> >> >> > > operators, if recordA involves state operations while recordB does
> >> >> > > not. This will harm the correctness of Flink jobs in some use cases.
> >> >> > > For example, in dim table join cases, recordA could be a delete
> >> >> > > operation that involves state access, while recordB could be an 
> >> >> > > insert
> >> >> > > operation that needs to visit external storage without state access.
> >> >> > > If recordB's output precedes recordA's, then an entry that is 
> >> >> > > supposed
> >> >> > > to finally exist with recordB's value in the sink table might 
> >> >> > > actually
> >> >> > > be deleted according to recordA's command. This situation should be
> >> >> > > avoided and the order of same-key records should be strictly
> >> >> > > preserved.
> >> >> > >
> >> >> > > 2. The FLIP says that StateRequests submitted by Callbacks will not
> >> >> > > invoke further yield() methods. Given that yield() is used when 
> >> >> > > there
> >> >> > > is "too much" in-flight data, does it mean StateRequests submitted 
> >> >> > > by
> >> >> > > Callbacks will never be "too much"? What if the total number of
> >> >> > > StateRequests exceed the capacity of Flink operator's memory space?
> >> >> > >
> >> >> > > 3. In the "Watermark" section, this FLIP provided an out-of-order
> >> >> > > execution mode apart from the default strictly-ordered mode, which 
> >> >> > > can
> >> >> > > optimize performance by allowing more concurrent executions.
> >> >> > >
> >> >> > > 3.1 I'm concerned that the out-of-order execution mode, along with 
> >> >> > > the
> >> >> > > epoch mechanism, would bring more complexity to the execution model
> >> >> > > than the performance improvement it promises. Could we add some
> >> >> > > benchmark results proving the benefit of this mode?
> >> >> > >
> >> >> > > 3.2 The FLIP might need to add a public API section describing how
> >> >> > > users or developers can switch between these two execution modes.
> >> >> > >
> >> >> > > 3.3 Apart from the watermark and checkpoint mentioned in this FLIP,
> >> >> > > there are also more other events that might appear in the stream of
> >> >> > > data records. It might be better to generalize the execution mode
> >> >> > > mechanism to handle all possible events.
> >> >> > >
> >> >> > > 4. It might be better to treat callback-handling as a
> >> >> > > MailboxDefaultAction, instead of Mails, to avoid the overhead of
> >> >> > > repeatedly creating Mail objects.
> >> >> > >
> >> >> > > 5. Could this FLIP provide the current default values for things 
> >> >> > > like
> >> >> > > active buffer size thresholds and timeouts? These could help with
> >> >> > > memory consumption and latency analysis.
> >> >> > >
> >> >> > > 6. Why do we need to record the hashcode of a record in its
> >> >> > > RecordContext? It seems not used.
> >> >> > >
> >> >> > > 7. In "timers can be stored on the JVM heap or RocksDB", the link
> >> >> > > points to a document in flink-1.15. It might be better to verify the
> >> >> > > referenced content is still valid in the latest Flink and update the
> >> >> > > link accordingly. Same for other references if any.
> >> >> > >
> >> >> > > Best,
> >> >> > > Yunfeng Zhou
> >> >> > >
> >> >> > > On Thu, Feb 29, 2024 at 2:17 PM Yuan Mei <yuanmei.w...@gmail.com> 
> >> >> > > wrote:
> >> >> > > >
> >> >> > > > Hi Devs,
> >> >> > > >
> >> >> > > > This is a joint work of Yuan Mei, Zakelly Lan, Jinzhong Li, 
> >> >> > > > Hangxiang
> >> >> > Yu,
> >> >> > > > Yanfei Lei and Feng Wang. We'd like to start a discussion about
> >> >> > > introducing
> >> >> > > > Disaggregated State Storage and Management in Flink 2.0.
> >> >> > > >
> >> >> > > > The past decade has witnessed a dramatic shift in Flink's 
> >> >> > > > deployment
> >> >> > > mode,
> >> >> > > > workload patterns, and hardware improvements. We've moved from the
> >> >> > > > map-reduce era where workers are computation-storage tightly 
> >> >> > > > coupled
> >> >> > > nodes
> >> >> > > > to a cloud-native world where containerized deployments on 
> >> >> > > > Kubernetes
> >> >> > > > become standard. To enable Flink's Cloud-Native future, we 
> >> >> > > > introduce
> >> >> > > > Disaggregated State Storage and Management that uses DFS as 
> >> >> > > > primary
> >> >> > > storage
> >> >> > > > in Flink 2.0, as promised in the Flink 2.0 Roadmap.
> >> >> > > >
> >> >> > > > Design Details can be found in FLIP-423[1].
> >> >> > > >
> >> >> > > > This new architecture is aimed to solve the following challenges
> >> >> > brought
> >> >> > > in
> >> >> > > > the cloud-native era for Flink.
> >> >> > > > 1. Local Disk Constraints in containerization
> >> >> > > > 2. Spiky Resource Usage caused by compaction in the current state 
> >> >> > > > model
> >> >> > > > 3. Fast Rescaling for jobs with large states (hundreds of 
> >> >> > > > Terabytes)
> >> >> > > > 4. Light and Fast Checkpoint in a native way
> >> >> > > >
> >> >> > > > More specifically, we want to reach a consensus on the following 
> >> >> > > > issues
> >> >> > > in
> >> >> > > > this discussion:
> >> >> > > >
> >> >> > > > 1. Overall design
> >> >> > > > 2. Proposed Changes
> >> >> > > > 3. Design details to achieve Milestone1
> >> >> > > >
> >> >> > > > In M1, we aim to achieve an end-to-end baseline version using DFS 
> >> >> > > > as
> >> >> > > > primary storage and complete core functionalities, including:
> >> >> > > >
> >> >> > > > - Asynchronous State APIs (FLIP-424)[2]: Introduce new APIs for
> >> >> > > > asynchronous state access.
> >> >> > > > - Asynchronous Execution Model (FLIP-425)[3]: Implement a 
> >> >> > > > non-blocking
> >> >> > > > execution model leveraging the asynchronous APIs introduced in
> >> >> > FLIP-424.
> >> >> > > > - Grouping Remote State Access (FLIP-426)[4]: Enable retrieval of
> >> >> > remote
> >> >> > > > state data in batches to avoid unnecessary round-trip costs for 
> >> >> > > > remote
> >> >> > > > access
> >> >> > > > - Disaggregated State Store (FLIP-427)[5]: Introduce the initial
> >> >> > version
> >> >> > > of
> >> >> > > > the ForSt disaggregated state store.
> >> >> > > > - Fault Tolerance/Rescale Integration (FLIP-428)[6]: Integrate
> >> >> > > > checkpointing mechanisms with the disaggregated state store for 
> >> >> > > > fault
> >> >> > > > tolerance and fast rescaling.
> >> >> > > >
> >> >> > > > We will vote on each FLIP in separate threads to make sure each 
> >> >> > > > FLIP
> >> >> > > > reaches a consensus. But we want to keep the discussion within a
> >> >> > focused
> >> >> > > > thread (this thread) for easier tracking of contexts to avoid
> >> >> > duplicated
> >> >> > > > questions/discussions and also to think of the problem/solution 
> >> >> > > > in a
> >> >> > full
> >> >> > > > picture.
> >> >> > > >
> >> >> > > > Looking forward to your feedback
> >> >> > > >
> >> >> > > > Best,
> >> >> > > > Yuan, Zakelly, Jinzhong, Hangxiang, Yanfei and Feng
> >> >> > > >
> >> >> > > > [1] https://cwiki.apache.org/confluence/x/R4p3EQ
> >> >> > > > [2] https://cwiki.apache.org/confluence/x/SYp3EQ
> >> >> > > > [3] https://cwiki.apache.org/confluence/x/S4p3EQ
> >> >> > > > [4] https://cwiki.apache.org/confluence/x/TYp3EQ
> >> >> > > > [5] https://cwiki.apache.org/confluence/x/T4p3EQ
> >> >> > > > [6] https://cwiki.apache.org/confluence/x/UYp3EQ
> >> >> > >
> >> >> >

Reply via email to