Hi Jeyhun,

Thanks for your reply!

- Backpressure Handling.


I agree that Flink's original backpressure handling is simple and robust,
and in the current proposal we try to keep the changes to the Mailbox for
callback execution minimal. Stepping back from the detailed execution
logic of state requests, there is a single entry point (the AEC) that
controls the number of in-flight StreamRecords and invokes yield(), which
I believe is relatively predictable. Asynchronous execution is never easy
to get right, and I think we can demonstrate its robustness through
UTs/ITs and benchmarks before we release it.
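
To illustrate the "single entry" idea, here is a minimal sketch, not the
actual AEC code. The names InFlightLimiter, admit(), yieldOnce() and
maxInFlight are hypothetical, the queue only stands in for the Mailbox, and
state requests are assumed to complete immediately so that the example
stays self-contained:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for the AEC entry point; not the FLIP's actual class. */
final class InFlightLimiter {
    private final int maxInFlight;
    // Stands in for the Mailbox: callbacks of completed state requests wait here.
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private int inFlight = 0;
    private int yieldCount = 0;

    InFlightLimiter(int maxInFlight) {
        if (maxInFlight <= 0) {
            throw new IllegalArgumentException("maxInFlight must be positive");
        }
        this.maxInFlight = maxInFlight;
    }

    /** Single entry for every incoming record, called from the task thread. */
    void admit(Runnable callback) {
        // The only place where backpressure is applied: run pending callbacks
        // until there is room for one more in-flight record.
        while (inFlight >= maxInFlight) {
            yieldOnce();
        }
        inFlight++;
        // Assumption: the state request completes right away, so its callback
        // is enqueued immediately. Real requests complete asynchronously.
        mailbox.add(() -> {
            callback.run();
            inFlight--;
        });
    }

    /** Runs one pending callback, mimicking a yield() to the mailbox. */
    private void yieldOnce() {
        Runnable mail = mailbox.poll();
        if (mail == null) {
            throw new IllegalStateException("over the limit with nothing to run");
        }
        yieldCount++;
        mail.run();
    }

    /** Runs all remaining callbacks, e.g. at end of input. */
    void drain() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }

    int inFlight() { return inFlight; }

    int yieldCount() { return yieldCount; }
}

final class InFlightLimiterDemo {
    public static void main(String[] args) {
        InFlightLimiter limiter = new InFlightLimiter(2);
        List<Integer> processed = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            final int id = i;
            limiter.admit(() -> processed.add(id));
        }
        System.out.println("yields=" + limiter.yieldCount() + ", done=" + processed);
        limiter.drain();
        System.out.println("in-flight=" + limiter.inFlight() + ", done=" + processed);
    }
}
```

With a limit of 2 and five records, the limit check in admit() is the only
place that ever yields, which is what makes the behavior easy to reason
about in this simplified model.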

- Watermark/Timer Handling

Yes, we need thorough benchmarks for the newly introduced framework before
the release. But since the change is huge and we may not have time to
cover every combination of modes and parameters, I suggest we first decide
whether to proceed based on the necessary PoC benchmarks. @Yunfeng Zhou
also suggested a benchmark for the out-of-order execution mode of the epoch
manager; we'll wait for that.

- DFS consistency guarantees. The proposal in FLIP-427 is DFS-agnostic.
> However, different cloud providers have different storage consistency
> models.
> How do we want to deal with them?


Yes, the current proposals treat DFS as a black box. The main change is to
move the local runtime files to DFS. Those files are read and written only
by the local client in each TM (there are no reads across TMs), and they
can be lost when the job stops. Also, Flink's file system layer conceals
many underlying details, so I guess the file consistency model of a
particular DFS is not a major concern for our implementation. Of course,
there may be optimizations specific to particular DFS implementations,
which we can pursue later.


Thanks again & Best,
Zakelly


On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Yunfeng,
>
> Thanks for your detailed comments!
>
>> 1. Why do we need a close() method on StateIterator? This method seems
>> unused in the usage example code.
>
>
> The `close()` method was introduced to release internal resources, but it
> does not seem necessary for the user to call it, so I removed it.
>
>> 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null
>> entries are allowed". It might be better to further explain what will
>> happen if a null value is passed, ignoring the value in the returned
>> Collection or throwing exceptions. Given that
>> FutureUtils.emptyFuture() can be returned in the example code, I
>> suppose the former one might be correct.
>
>
> The statement "No null entries are allowed" refers to the parameters. It
> means that a list such as [null, StateFuture1, StateFuture2] must not be
> passed in; an Exception will be thrown if it is.
>
>> 1. According to Fig 2 of this FLIP, ... . This situation should be
>> avoided and the order of same-key records should be strictly
>> preserved.
>
>
> I will discuss this with some expert SQL developers. And if it is valid
> and common, I suggest a strict order preservation mode for StreamRecord
> processing. WDYT?
>
>> 2. The FLIP says that StateRequests submitted by Callbacks will not
>> invoke further yield() methods. Given that yield() is used when there
>> is "too much" in-flight data, does it mean StateRequests submitted by
>> Callbacks will never be "too much"? What if the total number of
>> StateRequests exceeds the capacity of Flink operator's memory space?
>
>
> The number of parallel StateRequests for one StreamRecord cannot be
> determined in advance, since the code is written by users. So the
> in-flight requests may be "too much" and may cause an OOM. Users should
> then re-configure their job to limit the number of ongoing StreamRecords.
> I suggest two ways to avoid this:
>
>    1. Adaptively adjust the number of ongoing StreamRecords according to
>    the historical number of StateRequests.
>    2. Also limit the maximum number of StateRequests that can be executed
>    in parallel for each StreamRecord. If the limit is exceeded, put the
>    new StateRequest in the blocking buffer to wait for execution (instead
>    of invoking yield()).
>
> WDYT?
>
>
>> 3.1 I'm concerned that the out-of-order execution mode, along with the
>> epoch mechanism, would bring more complexity to the execution model
>> than the performance improvement it promises. Could we add some
>> benchmark results proving the benefit of this mode?
>
>
> Agreed, will do.
>
>> 3.2 The FLIP might need to add a public API section describing how
>> users or developers can switch between these two execution modes.
>
>
> Good point. We will add a Public API section.
>
>> 3.3 Apart from the watermark and checkpoint mentioned in this FLIP,
>> there are also other events that might appear in the stream of
>> data records. It might be better to generalize the execution mode
>> mechanism to handle all possible events.
>
>
> Yes, I missed this point. Thanks for the reminder.
>
>> 4. It might be better to treat callback-handling as a
>> MailboxDefaultAction, instead of Mails, to avoid the overhead of
>> repeatedly creating Mail objects.
>
>
> I think the intermediate wrapper for a callback cannot be omitted, since
> there is a context switch before each execution. The
> MailboxDefaultAction in most cases is processInput, right? The callback,
> however, should be executed with higher priority. I'd suggest not changing
> the basic logic of the Mailbox and the default action, since it is very
> critical for performance. But yes, we will try our best to avoid creating
> intermediate objects.
>
>> 5. Could this FLIP provide the current default values for things like
>> active buffer size thresholds and timeouts? These could help with
>> memory consumption and latency analysis.
>
>
> Sure, we will introduce new configs as well as their default values.
>
>> 6. Why do we need to record the hashcode of a record in its
>> RecordContext? It seems not used.
>
>
> The context switch before each callback execution involves setCurrentKey,
> where the hashCode is re-calculated. We cache it to speed this up.
>
>> 7. In "timers can be stored on the JVM heap or RocksDB", the link
>> points to a document in flink-1.15. It might be better to verify the
>> referenced content is still valid in the latest Flink and update the
>> link accordingly. Same for other references if any.
>
>
> Thanks for the reminder! Will check.
>
>
> Thanks a lot & Best,
> Zakelly
>
> On Sat, Mar 2, 2024 at 6:18 AM Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Thanks for the great proposals. I have a few comments:
>>
>> - Backpressure Handling. Flink's original backpressure handling is quite
>> robust and its semantics are quite "simple" (simple is beautiful).
>> This mechanism has proven to perform better and more robustly than other
>> open source streaming systems, which relied on some loopback
>> information.
>> Now that the proposal also relies on loopback (yield in this case), it is
>> not clear how robust the new backpressure handling proposed in FLIP-425
>> is and how well it handles fluctuating workloads.
>>
>> - Watermark/Timer Handling: Similar arguments apply for watermark and
>> timer handling. IMHO, we need more benchmarks showing the overhead
>> of epoch management with different parameters (e.g., window size,
>> watermark strategy, etc.).
>>
>> - DFS consistency guarantees. The proposal in FLIP-427 is DFS-agnostic.
>> However, different cloud providers have different storage consistency
>> models.
>> How do we want to deal with them?
>>
>>  Regards,
>> Jeyhun
>>
>>
>>
>>
>> On Fri, Mar 1, 2024 at 6:08 PM Zakelly Lan <zakelly....@gmail.com> wrote:
>>
>> > Thanks Piotr for sharing your thoughts!
>> >
>> > > I guess it depends how we would like to treat the local disks. I've
>> > > always thought about them that almost always eventually all state
>> > > from the DFS should end up cached in the local disks.
>> >
>> >
>> > OK I got it. In our proposal we treat local disk as an optional cache,
>> > so the basic design will handle the case with state residing in DFS
>> > only. It is a more 'cloud-native' approach that does not rely on any
>> > local storage assumptions, which allows users to dynamically adjust the
>> > capacity or I/O bound of remote storage to gain performance or save
>> > cost, even without a job restart.
>> >
>> > > In the currently proposed more fine grained solution, you make a
>> > > single request to DFS per each state access.
>> > >
>> >
>> > Ah, that's not accurate. We actually buffer the state requests and
>> > process them in batches, so multiple requests correspond to one DFS
>> > access (one block access for multiple keys, performed by RocksDB).
>> >
>> > > In that benchmark you mentioned, are you requesting the state
>> > > asynchronously from local disks into memory? If the benefit comes from
>> > > parallel I/O, then I would expect the benefit to disappear/shrink when
>> > > running multiple subtasks on the same machine, as they would be making
>> > > their own parallel requests, right? Also enabling checkpointing would
>> > > further cut into the available I/O budget.
>> >
>> >
>> > That's an interesting topic. Our proposal is specifically aimed at the
>> > scenario where the machine's I/O is not fully loaded but I/O latency
>> > has indeed become a bottleneck for each subtask. A distributed file
>> > system is a prime example of this scenario, characterized by abundant
>> > and easily scalable I/O bandwidth coupled with higher I/O latency. You
>> > may expect to increase the parallelism of a job to enhance performance
>> > as well, but that also wastes more CPU and memory on building up more
>> > subtasks. This is one drawback of nodes with tightly coupled
>> > computation and storage. In our proposal, by contrast, the parallel I/O
>> > and all the callbacks still run in one task, so the pre-allocated
>> > computational resources are better utilized. It is a much more
>> > lightweight way to perform parallel I/O.
>> >
>> > > Just with what granularity those async requests should be made.
>> > > Making state access asynchronous is definitely the right way to go!
>> >
>> >
>> > I think the current proposal is based on the following core ideas:
>> >
>> >    - A pure cloud-native disaggregated state.
>> >    - Fully utilize the given resources and try not to waste them
>> >    (including I/O).
>> >    - The ability to scale isolated resources (I/O or CPU or memory)
>> >    independently.
>> >
>> > We think a fine-grained granularity is more in line with those ideas,
>> > especially without local disk assumptions and without any waste of I/O
>> > by prefetching. Please note that it is not a replacement for the
>> > original local state with synchronous execution. Instead, it is a
>> > solution embracing the cloud-native era, providing much more
>> > scalability and resource efficiency when handling a *huge state*.
>> >
>> > > What also worries me a lot in this fine grained model is the effect
>> > > on the checkpointing times.
>> >
>> >
>> > Your concerns are very reasonable. Faster checkpointing is always a core
>> > advantage of disaggregated state, but only for the async phase. There
>> > will be some complexity introduced by in-flight requests, but I'd
>> > suggest that a checkpoint contain those in-flight state requests as
>> > part of the state, to accelerate the sync phase by skipping the buffer
>> > draining. This makes the buffer size have little impact on checkpoint
>> > time. All the changes stay within the execution model we proposed,
>> > while checkpoint barrier alignment and handling will not be touched in
>> > our proposal, so I guess the complexity is relatively controllable. I
>> > have faith in that :)
>> >
>> > > Also regarding the overheads, it would be great if you could provide
>> > > profiling results of the benchmarks that you conducted to verify the
>> > > results. Or maybe if you could describe the steps to reproduce the
>> > > results?
>> > > Especially "Hashmap (sync)" vs "Hashmap with async API".
>> > >
>> >
>> > Yes, we could profile the benchmarks. For the comparison of "Hashmap
>> > (sync)" vs "Hashmap with async API", we ran a WordCount job written
>> > with the async APIs but with async execution disabled, by directly
>> > completing the future using sync state access. This evaluates the
>> > overhead of newly introduced modules like the 'AEC' in sync execution
>> > (even though they are not designed for it). The code will be provided
>> > later. For the other results of our PoC[1], you can follow the
>> > instructions here[2] to reproduce them. Since the compilation may take
>> > some effort, we will directly provide the jar for testing next week.
>> >
>> >
>> > And @Yunfeng Zhou, I have noticed your mail but it is a bit late in my
>> > local time and the next few days are weekends. So I will reply to you
>> > later. Thanks for your response!
>> >
>> >
>> > [1]
>> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-PoCResults
>> > [2]
>> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-Appendix:HowtorunthePoC
>> >
>> >
>> > Best,
>> > Zakelly
>> >
>> >
>> > On Fri, Mar 1, 2024 at 6:38 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com>
>> > wrote:
>> >
>> > > Hi,
>> > >
>> > > Thanks for proposing this design! I just read FLIP-424 and FLIP-425
>> > > and have some questions about the proposed changes.
>> > >
>> > > For Async API (FLIP-424)
>> > >
>> > > 1. Why do we need a close() method on StateIterator? This method seems
>> > > unused in the usage example code.
>> > >
>> > > 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null
>> > > entries are allowed". It might be better to further explain what will
>> > > happen if a null value is passed, ignoring the value in the returned
>> > > Collection or throwing exceptions. Given that
>> > > FutureUtils.emptyFuture() can be returned in the example code, I
>> > > suppose the former one might be correct.
>> > >
>> > >
>> > > For Async Execution (FLIP-425)
>> > >
>> > > 1. According to Fig 2 of this FLIP, if a recordB has its key collide
>> > > with an ongoing recordA, its processElement() method can still be
>> > > triggered immediately, and then it might be moved to the blocking
>> > > buffer in AEC if it involves state operations. This means that
>> > > recordB's output will precede recordA's output in downstream
>> > > operators, if recordA involves state operations while recordB does
>> > > not. This will harm the correctness of Flink jobs in some use cases.
>> > > For example, in dim table join cases, recordA could be a delete
>> > > operation that involves state access, while recordB could be an insert
>> > > operation that needs to visit external storage without state access.
>> > > If recordB's output precedes recordA's, then an entry that is supposed
>> > > to finally exist with recordB's value in the sink table might actually
>> > > be deleted according to recordA's command. This situation should be
>> > > avoided and the order of same-key records should be strictly
>> > > preserved.
>> > >
>> > > 2. The FLIP says that StateRequests submitted by Callbacks will not
>> > > invoke further yield() methods. Given that yield() is used when there
>> > > is "too much" in-flight data, does it mean StateRequests submitted by
>> > > Callbacks will never be "too much"? What if the total number of
>> > > StateRequests exceeds the capacity of Flink operator's memory space?
>> > >
>> > > 3. In the "Watermark" section, this FLIP provided an out-of-order
>> > > execution mode apart from the default strictly-ordered mode, which can
>> > > optimize performance by allowing more concurrent executions.
>> > >
>> > > 3.1 I'm concerned that the out-of-order execution mode, along with the
>> > > epoch mechanism, would bring more complexity to the execution model
>> > > than the performance improvement it promises. Could we add some
>> > > benchmark results proving the benefit of this mode?
>> > >
>> > > 3.2 The FLIP might need to add a public API section describing how
>> > > users or developers can switch between these two execution modes.
>> > >
>> > > 3.3 Apart from the watermark and checkpoint mentioned in this FLIP,
>> > > there are also other events that might appear in the stream of
>> > > data records. It might be better to generalize the execution mode
>> > > mechanism to handle all possible events.
>> > >
>> > > 4. It might be better to treat callback-handling as a
>> > > MailboxDefaultAction, instead of Mails, to avoid the overhead of
>> > > repeatedly creating Mail objects.
>> > >
>> > > 5. Could this FLIP provide the current default values for things like
>> > > active buffer size thresholds and timeouts? These could help with
>> > > memory consumption and latency analysis.
>> > >
>> > > 6. Why do we need to record the hashcode of a record in its
>> > > RecordContext? It seems not used.
>> > >
>> > > 7. In "timers can be stored on the JVM heap or RocksDB", the link
>> > > points to a document in flink-1.15. It might be better to verify the
>> > > referenced content is still valid in the latest Flink and update the
>> > > link accordingly. Same for other references if any.
>> > >
>> > > Best,
>> > > Yunfeng Zhou
>> > >
>> > > On Thu, Feb 29, 2024 at 2:17 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:
>> > > >
>> > > > Hi Devs,
>> > > >
>> > > > This is a joint work of Yuan Mei, Zakelly Lan, Jinzhong Li,
>> > > > Hangxiang Yu, Yanfei Lei and Feng Wang. We'd like to start a
>> > > > discussion about introducing Disaggregated State Storage and
>> > > > Management in Flink 2.0.
>> > > >
>> > > > The past decade has witnessed a dramatic shift in Flink's deployment
>> > > > mode, workload patterns, and hardware improvements. We've moved from
>> > > > the map-reduce era, where workers are computation-storage tightly
>> > > > coupled nodes, to a cloud-native world where containerized
>> > > > deployments on Kubernetes have become standard. To enable Flink's
>> > > > Cloud-Native future, we introduce Disaggregated State Storage and
>> > > > Management that uses DFS as primary storage in Flink 2.0, as promised
>> > > > in the Flink 2.0 Roadmap.
>> > > >
>> > > > Design Details can be found in FLIP-423[1].
>> > > >
>> > > > This new architecture aims to solve the following challenges
>> > > > brought in the cloud-native era for Flink.
>> > > > 1. Local Disk Constraints in containerization
>> > > > 2. Spiky Resource Usage caused by compaction in the current state
>> > > > model
>> > > > 3. Fast Rescaling for jobs with large states (hundreds of Terabytes)
>> > > > 4. Light and Fast Checkpoint in a native way
>> > > >
>> > > > More specifically, we want to reach a consensus on the following
>> > > > issues in this discussion:
>> > > >
>> > > > 1. Overall design
>> > > > 2. Proposed Changes
>> > > > 3. Design details to achieve Milestone1
>> > > >
>> > > > In M1, we aim to achieve an end-to-end baseline version using DFS as
>> > > > primary storage and complete core functionalities, including:
>> > > >
>> > > > - Asynchronous State APIs (FLIP-424)[2]: Introduce new APIs for
>> > > > asynchronous state access.
>> > > > - Asynchronous Execution Model (FLIP-425)[3]: Implement a
>> > > > non-blocking execution model leveraging the asynchronous APIs
>> > > > introduced in FLIP-424.
>> > > > - Grouping Remote State Access (FLIP-426)[4]: Enable retrieval of
>> > > > remote state data in batches to avoid unnecessary round-trip costs
>> > > > for remote access.
>> > > > - Disaggregated State Store (FLIP-427)[5]: Introduce the initial
>> > > > version of the ForSt disaggregated state store.
>> > > > - Fault Tolerance/Rescale Integration (FLIP-428)[6]: Integrate
>> > > > checkpointing mechanisms with the disaggregated state store for
>> > > > fault tolerance and fast rescaling.
>> > > >
>> > > > We will vote on each FLIP in separate threads to make sure each FLIP
>> > > > reaches a consensus. But we want to keep the discussion within a
>> > > > focused thread (this thread) for easier tracking of contexts, to
>> > > > avoid duplicated questions/discussions, and also to think of the
>> > > > problem/solution in a full picture.
>> > > >
>> > > > Looking forward to your feedback
>> > > >
>> > > > Best,
>> > > > Yuan, Zakelly, Jinzhong, Hangxiang, Yanfei and Feng
>> > > >
>> > > > [1] https://cwiki.apache.org/confluence/x/R4p3EQ
>> > > > [2] https://cwiki.apache.org/confluence/x/SYp3EQ
>> > > > [3] https://cwiki.apache.org/confluence/x/S4p3EQ
>> > > > [4] https://cwiki.apache.org/confluence/x/TYp3EQ
>> > > > [5] https://cwiki.apache.org/confluence/x/T4p3EQ
>> > > > [6] https://cwiki.apache.org/confluence/x/UYp3EQ
>> > >
>> >
>>
>
