Hi Yunfeng,

Thanks for the suggestion!

I will reorganize the FLIP-425 accordingly.


Best,
Zakelly

On Tue, Mar 19, 2024 at 3:20 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com>
wrote:

> Hi Xintong and Zakelly,
>
> > 2. Regarding Strictly-ordered and Out-of-order of Watermarks
> I agree with it that watermarks can use only out-of-order mode for
> now, because there is still not a concrete example showing the
> correctness risk about it. However, the strictly-ordered mode should
> still be supported as the default option for non-record event types
> other than watermark, at least for checkpoint barriers.
>
> I noticed that this information has already been documented in "For
> other non-record events, such as RecordAttributes ...", but it's at
> the bottom of the "Watermark" section, which might not be very
> obvious. Thus it might be better to reorganize the FLIP to better
> claim that the two order modes are designed for all non-record events,
> and which mode this FLIP would choose for each type of event.
>
> Best,
> Yunfeng
>
> On Tue, Mar 19, 2024 at 1:09 PM Xintong Song <tonysong...@gmail.com>
> wrote:
> >
> > Thanks for the quick response. Sounds good to me.
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Tue, Mar 19, 2024 at 1:03 PM Zakelly Lan <zakelly....@gmail.com>
> wrote:
> >
> > > Hi Xintong,
> > >
> > > Thanks for sharing your thoughts!
> > >
> > > 1. Regarding Record-ordered and State-ordered of processElement.
> > > >
> > > > I understand that while State-ordered likely provides better
> performance,
> > > > Record-ordered is sometimes required for correctness. The question
> is how
> > > > should a user choose between these two modes? My concern is that
> such a
> > > > decision may require users to have in-depth knowledge about the Flink
> > > > internals, and may lead to correctness issues if State-ordered is
> chosen
> > > > improperly.
> > > >
> > > > I'd suggest not to expose such a knob, at least in the first version.
> > > That
> > > > means always use Record-ordered for custom operators / UDFs, and keep
> > > > State-ordered for internal usages (built-in operators) only.
> > > >
> > >
> > > Indeed, users may not be able to choose the mode properly. I agree to
> keep
> > > such options for internal use.
> > >
> > >
> > > 2. Regarding Strictly-ordered and Out-of-order of Watermarks.
> > > >
> > > > I'm not entirely sure about Strictly-ordered being the default, or
> even
> > > > being supported. From my understanding, a Watermark(T) only suggests
> that
> > > > all records with event time before T has arrived, and it has nothing
> to
> > > do
> > > > with whether records with event time after T has arrived or not. From
> > > that
> > > > perspective, preventing certain records from arriving before a
> Watermark
> > > is
> > > > never supported. I also cannot come up with any use case where
> > > > Strictly-ordered is necessary. This implies the same issue as 1): how
> > > does
> > > > the user choose between the two modes?
> > > >
> > > > I'd suggest not expose the knob to users and only support
> Out-of-order,
> > > > until we see a concrete use case that Strictly-ordered is needed.
> > > >
> > >
> > > The semantics of watermarks do not define the sequence between a
> watermark
> > > and subsequent records. For the most part, this is inconsequential,
> except
> > > it may affect some current users who have previously relied on the
> implicit
> > > assumption of an ordered execution. I'd be fine with initially
> supporting
> > > only out-of-order processing. We may consider exposing the
> > > 'Strictly-ordered' mode once we encounter a concrete use case that
> > > necessitates it.
> > >
> > >
> > > My philosophies behind not exposing the two config options are:
> > > > - There are already too many options in Flink that barely know how
> to use
> > > > them. I think Flink should try as much as possible to decide its own
> > > > behavior, rather than throwing all the decisions to the users.
> > > > - It's much harder to take back knobs than to introduce them.
> Therefore,
> > > > options should be introduced only if concrete use cases are
> identified.
> > > >
> > >
> > > I agree to keep minimal configurable items especially for the MVP.
> Given
> > > that we have the opportunity to refine the functionality before the
> > > framework transitions from @Experimental to @PublicEvolving, it makes
> sense
> > > to refrain from presenting user-facing options until we have ensured
> > > their necessity.
> > >
> > >
> > > Best,
> > > Zakelly
> > >
> > > On Tue, Mar 19, 2024 at 12:06 PM Xintong Song <tonysong...@gmail.com>
> > > wrote:
> > >
> > > > Sorry for joining the discussion late.
> > > >
> > > > I have two questions about FLIP-425.
> > > >
> > > > 1. Regarding Record-ordered and State-ordered of processElement.
> > > >
> > > > I understand that while State-ordered likely provides better
> performance,
> > > > Record-ordered is sometimes required for correctness. The question
> is how
> > > > should a user choose between these two modes? My concern is that
> such a
> > > > decision may require users to have in-depth knowledge about the Flink
> > > > internals, and may lead to correctness issues if State-ordered is
> chosen
> > > > improperly.
> > > >
> > > > I'd suggest not to expose such a knob, at least in the first version.
> > > That
> > > > means always use Record-ordered for custom operators / UDFs, and keep
> > > > State-ordered for internal usages (built-in operators) only.
> > > >
> > > > 2. Regarding Strictly-ordered and Out-of-order of Watermarks.
> > > >
> > > > I'm not entirely sure about Strictly-ordered being the default, or
> even
> > > > being supported. From my understanding, a Watermark(T) only suggests
> that
> > > > all records with event time before T has arrived, and it has nothing
> to
> > > do
> > > > with whether records with event time after T has arrived or not. From
> > > that
> > > > perspective, preventing certain records from arriving before a
> Watermark
> > > is
> > > > never supported. I also cannot come up with any use case where
> > > > Strictly-ordered is necessary. This implies the same issue as 1): how
> > > does
> > > > the user choose between the two modes?
> > > >
> > > > I'd suggest not expose the knob to users and only support
> Out-of-order,
> > > > until we see a concrete use case that Strictly-ordered is needed.
> > > >
> > > >
> > > > My philosophies behind not exposing the two config options are:
> > > > - There are already too many options in Flink that barely know how
> to use
> > > > them. I think Flink should try as much as possible to decide its own
> > > > behavior, rather than throwing all the decisions to the users.
> > > > - It's much harder to take back knobs than to introduce them.
> Therefore,
> > > > options should be introduced only if concrete use cases are
> identified.
> > > >
> > > > WDYT?
> > > >
> > > > Best,
> > > >
> > > > Xintong
> > > >
> > > >
> > > >
> > > > On Fri, Mar 8, 2024 at 2:45 AM Jing Ge <j...@ververica.com.invalid>
> > > wrote:
> > > >
> > > > > +1 for Gyula's suggestion. I just finished FLIP-423 which
> introduced
> > > the
> > > > > intention of the big change and high level architecture. Great
> content
> > > > btw!
> > > > > The only public interface change for this FLIP is one new config
> to use
> > > > > ForSt. It makes sense to have one dedicated discussion thread for
> each
> > > > > concrete system design.
> > > > >
> > > > > @Zakelly The links in your mail do not work except the last one,
> > > because
> > > > > the FLIP-xxx has been included into the url like
> > > > >
> > >
> https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425
> > > > .
> > > > >
> > > > > NIT fix:
> > > > >
> > > > > FLIP-424:
> > > > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
> > > > >
> > > > > FLIP-425:
> > > > https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
> > > > >
> > > > > FLIP-426:
> > > > https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
> > > > >
> > > > > FLIP-427:
> > > > https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
> > > > >
> > > > > FLIP-428:
> > > > https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
> > > > >
> > > > > Best regards,
> > > > > Jing
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Mar 7, 2024 at 10:14 AM Zakelly Lan <zakelly....@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > Thank you all for a lively discussion here, and it is a good
> time to
> > > > move
> > > > > > forward to more detailed discussions. Thus we open several
> threads
> > > for
> > > > > > sub-FLIPs:
> > > > > >
> > > > > > FLIP-424:
> > > > > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
> > > > > > FLIP-425
> > > > > > <
> > > > >
> > >
> https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425
> > > > >:
> > > > > > https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
> > > > > > FLIP-426
> > > > > > <
> > > > >
> > >
> https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0hFLIP-426
> > > > >:
> > > > > > https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
> > > > > > FLIP-427
> > > > > > <
> > > > >
> > >
> https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrfFLIP-427
> > > > >:
> > > > > > https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
> > > > > > FLIP-428
> > > > > > <
> > > > >
> > >
> https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ftFLIP-428
> > > > >:
> > > > > > https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
> > > > > >
> > > > > > If you want to talk about the overall architecture, roadmap,
> > > milestones
> > > > > or
> > > > > > something related with multiple FLIPs, please post it here.
> Otherwise
> > > > you
> > > > > > can discuss some details in separate mails. Let's try to avoid
> > > repeated
> > > > > > discussion in different threads. I will sync important messages
> here
> > > if
> > > > > > there are any in the above threads.
> > > > > >
> > > > > > And reply to @Jeyhun: We will ensure the content between those
> FLIPs
> > > is
> > > > > > consistent.
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Zakelly
> > > > > >
> > > > > > On Thu, Mar 7, 2024 at 2:16 PM Yuan Mei <yuanmei.w...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > I have been a bit busy these few weeks and sorry for responding
> > > late.
> > > > > > >
> > > > > > > The original thinking of keeping discussion within one thread
> is
> > > for
> > > > > > easier
> > > > > > > tracking and avoid for repeated discussion in different
> threads.
> > > > > > >
> > > > > > > For details, It might be good to start in different threads if
> > > > needed.
> > > > > > >
> > > > > > > We will think of a way to better organize the discussion.
> > > > > > >
> > > > > > > Best
> > > > > > > Yuan
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Mar 7, 2024 at 4:38 AM Jeyhun Karimov <
> > > je.kari...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > + 1 for the suggestion.
> > > > > > > > Maybe we can the discussion with the FLIPs with minimum
> > > > dependencies
> > > > > > > (from
> > > > > > > > the other new/proposed FLIPs).
> > > > > > > > Based on our discussion on a particular FLIP, the subsequent
> (or
> > > > its
> > > > > > > > dependent) FLIP(s) can be updated accordingly?
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Jeyhun
> > > > > > > >
> > > > > > > > On Wed, Mar 6, 2024 at 5:34 PM Gyula Fóra <
> gyula.f...@gmail.com>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hey all!
> > > > > > > > >
> > > > > > > > > This is a massive improvement / work. I just started going
> > > > through
> > > > > > the
> > > > > > > > > Flips and have a more or less meta comment.
> > > > > > > > >
> > > > > > > > > While it's good to keep the overall architecture discussion
> > > > here, I
> > > > > > > think
> > > > > > > > > we should still have separate discussions for each FLIP
> where
> > > we
> > > > > can
> > > > > > > > > discuss interface details etc. With so much content if we
> start
> > > > > > adding
> > > > > > > > > minor comments here that will lead to nowhere but those
> > > > discussions
> > > > > > are
> > > > > > > > > still important and we should have them in separate threads
> > > (one
> > > > > for
> > > > > > > each
> > > > > > > > > FLIP)
> > > > > > > > >
> > > > > > > > > What do you think?
> > > > > > > > > Gyula
> > > > > > > > >
> > > > > > > > > On Wed, Mar 6, 2024 at 8:50 AM Yanfei Lei <
> fredia...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi team,
> > > > > > > > > >
> > > > > > > > > > Thanks for your discussion. Regarding FLIP-425, we have
> > > > > > supplemented
> > > > > > > > > > several updates to answer high-frequency questions:
> > > > > > > > > >
> > > > > > > > > > 1. We captured a flame graph of the Hashmap state
> backend in
> > > > > > > > > > "Synchronous execution with asynchronous APIs"[1], which
> > > > reveals
> > > > > > that
> > > > > > > > > > the framework overhead (including reference counting,
> > > > > > future-related
> > > > > > > > > > code and so on) consumes about 9% of the keyed operator
> CPU
> > > > time.
> > > > > > > > > > 2. We added a set of comparative experiments for
> watermark
> > > > > > > processing,
> > > > > > > > > > the performance of Out-Of-Order mode is 70% better than
> > > > > > > > > > strictly-ordered mode under ~140MB state size.
> Instructions
> > > on
> > > > > how
> > > > > > to
> > > > > > > > > > run this test have also been added[2].
> > > > > > > > > > 3. Regarding the order of StreamRecord, whether it has
> state
> > > > > access
> > > > > > > or
> > > > > > > > > > not. We supplemented a new *Strict order of
> > > > 'processElement'*[3].
> > > > > > > > > >
> > > > > > > > > > [1]
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs
> > > > > > > > > > [2]
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing
> > > > > > > > > > [3]
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Best regards,
> > > > > > > > > > Yanfei
> > > > > > > > > >
> > > > > > > > > > Yunfeng Zhou <flink.zhouyunf...@gmail.com> 于2024年3月5日周二
> > > > 09:25写道:
> > > > > > > > > > >
> > > > > > > > > > > Hi Zakelly,
> > > > > > > > > > >
> > > > > > > > > > > > 5. I'm not very sure ... revisiting this later since
> it
> > > is
> > > > > not
> > > > > > > > > > important.
> > > > > > > > > > >
> > > > > > > > > > > It seems that we still have some details to confirm
> about
> > > > this
> > > > > > > > > > > question. Let's postpone this to after the critical
> parts
> > > of
> > > > > the
> > > > > > > > > > > design are settled.
> > > > > > > > > > >
> > > > > > > > > > > > 8. Yes, we had considered ... metrics should be like
> > > > > > afterwards.
> > > > > > > > > > >
> > > > > > > > > > > Oh sorry I missed FLIP-431. I'm fine with discussing
> this
> > > > topic
> > > > > > in
> > > > > > > > > > milestone 2.
> > > > > > > > > > >
> > > > > > > > > > > Looking forward to the detailed design about the strict
> > > mode
> > > > > > > between
> > > > > > > > > > > same-key records and the benchmark results about the
> epoch
> > > > > > > mechanism.
> > > > > > > > > > >
> > > > > > > > > > > Best regards,
> > > > > > > > > > > Yunfeng
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Mar 4, 2024 at 7:59 PM Zakelly Lan <
> > > > > > zakelly....@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > Hi Yunfeng,
> > > > > > > > > > > >
> > > > > > > > > > > > For 1:
> > > > > > > > > > > > I had a discussion with Lincoln Lee, and I realize
> it is
> > > a
> > > > > > common
> > > > > > > > > case
> > > > > > > > > > the same-key record should be blocked before the
> > > > > `processElement`.
> > > > > > It
> > > > > > > > is
> > > > > > > > > > easier for users to understand. Thus I will introduce a
> > > strict
> > > > > mode
> > > > > > > for
> > > > > > > > > > this and make it default. My rough idea is just like
> yours,
> > > by
> > > > > > > invoking
> > > > > > > > > > some method of AEC instance before `processElement`. The
> > > > detailed
> > > > > > > > design
> > > > > > > > > > will be described in FLIP later.
> > > > > > > > > > > >
> > > > > > > > > > > > For 2:
> > > > > > > > > > > > I agree with you. We could throw exceptions for now
> and
> > > > > > optimize
> > > > > > > > this
> > > > > > > > > > later.
> > > > > > > > > > > >
> > > > > > > > > > > > For 5:
> > > > > > > > > > > >>
> > > > > > > > > > > >> It might be better to move the default values to the
> > > > > Proposed
> > > > > > > > > Changes
> > > > > > > > > > > >> section instead of making them public for now, as
> there
> > > > will
> > > > > > be
> > > > > > > > > > > >> compatibility issues once we want to dynamically
> adjust
> > > > the
> > > > > > > > > thresholds
> > > > > > > > > > > >> and timeouts in future.
> > > > > > > > > > > >
> > > > > > > > > > > > Agreed. The whole framework is under experiment
> until we
> > > > > think
> > > > > > it
> > > > > > > > is
> > > > > > > > > > complete in 2.0 or later. The default value should be
> better
> > > > > > > determined
> > > > > > > > > > with more testing results and production experience.
> > > > > > > > > > > >
> > > > > > > > > > > >> The configuration execution.async-state.enabled
> seems
> > > > > > > unnecessary,
> > > > > > > > > as
> > > > > > > > > > > >> the infrastructure may automatically get this
> > > information
> > > > > from
> > > > > > > the
> > > > > > > > > > > >> detailed state backend configurations. We may
> revisit
> > > this
> > > > > > part
> > > > > > > > > after
> > > > > > > > > > > >> the core designs have reached an agreement. WDYT?
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > I'm not very sure if there is any use case where
> users
> > > > write
> > > > > > > their
> > > > > > > > > > code using async APIs but run their job in a synchronous
> way.
> > > > The
> > > > > > > first
> > > > > > > > > two
> > > > > > > > > > scenarios that come to me are for benchmarking or for a
> small
> > > > > > state,
> > > > > > > > > while
> > > > > > > > > > they don't want to rewrite their code. Actually it is
> easy to
> > > > > > > support,
> > > > > > > > so
> > > > > > > > > > I'd suggest providing it. But I'm fine with revisiting
> this
> > > > later
> > > > > > > since
> > > > > > > > > it
> > > > > > > > > > is not important. WDYT?
> > > > > > > > > > > >
> > > > > > > > > > > > For 8:
> > > > > > > > > > > > Yes, we had considered the I/O metrics group
> especially
> > > the
> > > > > > > > > > back-pressure, idle and task busy per second. In the
> current
> > > > plan
> > > > > > we
> > > > > > > > can
> > > > > > > > > do
> > > > > > > > > > state access during back-pressure, meaning that those
> metrics
> > > > for
> > > > > > > input
> > > > > > > > > > would better be redefined. I suggest we discuss these
> > > existing
> > > > > > > metrics
> > > > > > > > as
> > > > > > > > > > well as some new metrics that should be introduced in
> > > FLIP-431
> > > > > > later
> > > > > > > in
> > > > > > > > > > milestone 2, since we have basically finished the
> framework
> > > > thus
> > > > > we
> > > > > > > > will
> > > > > > > > > > have a better view of what metrics should be like
> afterwards.
> > > > > WDYT?
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Zakelly
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Mar 4, 2024 at 6:49 PM Yunfeng Zhou <
> > > > > > > > > > flink.zhouyunf...@gmail.com> wrote:
> > > > > > > > > > > >>
> > > > > > > > > > > >> Hi Zakelly,
> > > > > > > > > > > >>
> > > > > > > > > > > >> Thanks for the responses!
> > > > > > > > > > > >>
> > > > > > > > > > > >> > 1. I will discuss this with some expert SQL
> > > developers.
> > > > > ...
> > > > > > > mode
> > > > > > > > > > for StreamRecord processing.
> > > > > > > > > > > >>
> > > > > > > > > > > >> In DataStream API there should also be use cases
> when
> > > the
> > > > > > order
> > > > > > > of
> > > > > > > > > > > >> output is strictly required. I agree with it that
> SQL
> > > > > experts
> > > > > > > may
> > > > > > > > > help
> > > > > > > > > > > >> provide more concrete use cases that can accelerate
> our
> > > > > > > > discussion,
> > > > > > > > > > > >> but please allow me to search for DataStream use
> cases
> > > > that
> > > > > > can
> > > > > > > > > prove
> > > > > > > > > > > >> the necessity of this strict order preservation
> mode, if
> > > > > > answers
> > > > > > > > > from
> > > > > > > > > > > >> SQL experts are shown to be negative.
> > > > > > > > > > > >>
> > > > > > > > > > > >> For your convenience, my current rough idea is that
> we
> > > can
> > > > > > add a
> > > > > > > > > > > >> module between the Input(s) and processElement()
> module
> > > in
> > > > > > Fig 2
> > > > > > > > of
> > > > > > > > > > > >> FLIP-425. The module will be responsible for caching
> > > > records
> > > > > > > whose
> > > > > > > > > > > >> keys collide with in-flight records, and AEC will
> only
> > > be
> > > > > > > > > responsible
> > > > > > > > > > > >> for handling async state calls, without knowing the
> > > record
> > > > > > each
> > > > > > > > call
> > > > > > > > > > > >> belongs to. We may revisit this topic once the
> necessity
> > > > of
> > > > > > the
> > > > > > > > > strict
> > > > > > > > > > > >> order mode is clarified.
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >> > 2. The amount of parallel StateRequests ...
> instead of
> > > > > > > invoking
> > > > > > > > > > yield
> > > > > > > > > > > >>
> > > > > > > > > > > >> Your suggestions generally appeal to me. I think we
> may
> > > > let
> > > > > > > > > > > >> corresponding Flink jobs fail with OOM for now,
> since
> > > the
> > > > > > > majority
> > > > > > > > > of
> > > > > > > > > > > >> a StateRequest should just be references to existing
> > > Java
> > > > > > > objects,
> > > > > > > > > > > >> which only occupies very small memory space and can
> > > hardly
> > > > > > cause
> > > > > > > > OOM
> > > > > > > > > > > >> in common cases. We can monitor the pending
> > > StateRequests
> > > > > and
> > > > > > if
> > > > > > > > > there
> > > > > > > > > > > >> is really a risk of OOM in extreme cases, we can
> throw
> > > > > > > Exceptions
> > > > > > > > > with
> > > > > > > > > > > >> proper messages notifying users what to do, like
> > > > increasing
> > > > > > > memory
> > > > > > > > > > > >> through configurations.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Your suggestions to adjust threshold adaptively or
> to
> > > use
> > > > > the
> > > > > > > > > blocking
> > > > > > > > > > > >> buffer sounds good, and in my opinion we can
> postpone
> > > them
> > > > > to
> > > > > > > > future
> > > > > > > > > > > >> FLIPs since they seem to only benefit users in rare
> > > cases.
> > > > > > Given
> > > > > > > > > that
> > > > > > > > > > > >> FLIP-423~428 has already been a big enough design,
> it
> > > > might
> > > > > be
> > > > > > > > > better
> > > > > > > > > > > >> to focus on the most critical design for now and
> > > postpone
> > > > > > > > > > > >> optimizations like this. WDYT?
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >> > 5. Sure, we will introduce new configs as well as
> > > their
> > > > > > > default
> > > > > > > > > > value.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Thanks for adding the default values and the values
> > > > > themselves
> > > > > > > > LGTM.
> > > > > > > > > > > >> It might be better to move the default values to the
> > > > > Proposed
> > > > > > > > > Changes
> > > > > > > > > > > >> section instead of making them public for now, as
> there
> > > > will
> > > > > > be
> > > > > > > > > > > >> compatibility issues once we want to dynamically
> adjust
> > > > the
> > > > > > > > > thresholds
> > > > > > > > > > > >> and timeouts in future.
> > > > > > > > > > > >>
> > > > > > > > > > > >> The configuration execution.async-state.enabled
> seems
> > > > > > > unnecessary,
> > > > > > > > > as
> > > > > > > > > > > >> the infrastructure may automatically get this
> > > information
> > > > > from
> > > > > > > the
> > > > > > > > > > > >> detailed state backend configurations. We may
> revisit
> > > this
> > > > > > part
> > > > > > > > > after
> > > > > > > > > > > >> the core designs have reached an agreement. WDYT?
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >> Besides, inspired by Jeyhun's comments, it comes to
> me
> > > > that
> > > > > > > > > > > >>
> > > > > > > > > > > >> 8. Should this FLIP introduce metrics that measure
> the
> > > > time
> > > > > a
> > > > > > > > Flink
> > > > > > > > > > > >> job is back-pressured by State IOs? Under the
> current
> > > > > design,
> > > > > > > this
> > > > > > > > > > > >> metric could measure the time when the blocking
> buffer
> > > is
> > > > > full
> > > > > > > and
> > > > > > > > > > > >> yield() cannot get callbacks to process, which
> means the
> > > > > > > operator
> > > > > > > > is
> > > > > > > > > > > >> fully waiting for state responses.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Best regards,
> > > > > > > > > > > >> Yunfeng
> > > > > > > > > > > >>
> > > > > > > > > > > >> On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan <
> > > > > > > > zakelly....@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Hi Yunfeng,
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Thanks for your detailed comments!
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> 1. Why do we need a close() method on
> StateIterator?
> > > > This
> > > > > > > > method
> > > > > > > > > > seems
> > > > > > > > > > > >> >> unused in the usage example codes.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > The `close()` is introduced to release internal
> > > > resources,
> > > > > > but
> > > > > > > > it
> > > > > > > > > > does not seem to require the user to call it. I removed
> this.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> 2. In FutureUtils.combineAll()'s JavaDoc, it is
> > > stated
> > > > > that
> > > > > > > "No
> > > > > > > > > > null
> > > > > > > > > > > >> >> entries are allowed". It might be better to
> further
> > > > > explain
> > > > > > > > what
> > > > > > > > > > will
> > > > > > > > > > > >> >> happen if a null value is passed, ignoring the
> value
> > > in
> > > > > the
> > > > > > > > > > returned
> > > > > > > > > > > >> >> Collection or throwing exceptions. Given that
> > > > > > > > > > > >> >> FutureUtils.emptyFuture() can be returned in the
> > > > example
> > > > > > > code,
> > > > > > > > I
> > > > > > > > > > > >> >> suppose the former one might be correct.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > The statement "No null entries are allowed"
> refers to
> > > > the
> > > > > > > > > > parameters, it means some arrayList like [null,
> StateFuture1,
> > > > > > > > > StateFuture2]
> > > > > > > > > > passed in are not allowed, and an Exception will be
> thrown.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> 1. According to Fig 2 of this FLIP, ... . This
> > > > situation
> > > > > > > should
> > > > > > > > > be
> > > > > > > > > > > >> >> avoided and the order of same-key records should
> be
> > > > > > strictly
> > > > > > > > > > > >> >> preserved.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > I will discuss this with some expert SQL
> developers.
> > > And
> > > > > if
> > > > > > it
> > > > > > > > is
> > > > > > > > > > valid and common, I suggest a strict order preservation
> mode
> > > > for
> > > > > > > > > > StreamRecord processing. WDYT?
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> 2. The FLIP says that StateRequests submitted by
> > > > > Callbacks
> > > > > > > will
> > > > > > > > > not
> > > > > > > > > > > >> >> invoke further yield() methods. Given that
> yield() is
> > > > > used
> > > > > > > when
> > > > > > > > > > there
> > > > > > > > > > > >> >> is "too much" in-flight data, does it mean
> > > > StateRequests
> > > > > > > > > submitted
> > > > > > > > > > by
> > > > > > > > > > > >> >> Callbacks will never be "too much"? What if the
> total
> > > > > > number
> > > > > > > of
> > > > > > > > > > > >> >> StateRequests exceed the capacity of Flink
> operator's
> > > > > > memory
> > > > > > > > > space?
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > The amount of parallel StateRequests for one
> > > > StreamRecord
> > > > > > > cannot
> > > > > > > > > be
> > > > > > > > > > determined since the code is written by users. So the
> > > in-flight
> > > > > > > > requests
> > > > > > > > > > may be "too much", and may cause OOM. Users should
> > > re-configure
> > > > > > their
> > > > > > > > > job,
> > > > > > > > > > controlling the amount of on-going StreamRecord. And I
> > > suggest
> > > > > two
> > > > > > > ways
> > > > > > > > > to
> > > > > > > > > > avoid this:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Adaptively adjust the count of on-going
> StreamRecord
> > > > > > according
> > > > > > > > to
> > > > > > > > > > historical StateRequests amount.
> > > > > > > > > > > >> > Also control the max StateRequests that can be
> > > executed
> > > > in
> > > > > > > > > parallel
> > > > > > > > > > for each StreamRecord, and if it exceeds, put the new
> > > > > StateRequest
> > > > > > in
> > > > > > > > the
> > > > > > > > > > blocking buffer waiting for execution (instead of
> invoking
> > > > > > yield()).
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > WDYT?
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> 3.1 I'm concerned that the out-of-order execution
> > > mode,
> > > > > > along
> > > > > > > > > with
> > > > > > > > > > the
> > > > > > > > > > > >> >> epoch mechanism, would bring more complexity to
> the
> > > > > > execution
> > > > > > > > > model
> > > > > > > > > > > >> >> than the performance improvement it promises.
> Could
> > > we
> > > > > add
> > > > > > > some
> > > > > > > > > > > >> >> benchmark results proving the benefit of this
> mode?
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Agreed, will do.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> 3.2 The FLIP might need to add a public API
> section
> > > > > > > describing
> > > > > > > > > how
> > > > > > > > > > > >> >> users or developers can switch between these two
> > > > > execution
> > > > > > > > modes.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Good point. We will add a Public API section.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> 3.3 Apart from the watermark and checkpoint
> mentioned
> > > > in
> > > > > > this
> > > > > > > > > FLIP,
> > > > > > > > > > > >> >> there are also more other events that might
> appear in
> > > > the
> > > > > > > > stream
> > > > > > > > > of
> > > > > > > > > > > >> >> data records. It might be better to generalize
> the
> > > > > > execution
> > > > > > > > mode
> > > > > > > > > > > >> >> mechanism to handle all possible events.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Yes, I missed this point. Thanks for the reminder.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> 4. It might be better to treat callback-handling
> as a
> > > > > > > > > > > >> >> MailboxDefaultAction, instead of Mails, to avoid
> the
> > > > > > overhead
> > > > > > > > of
> > > > > > > > > > > >> >> repeatedly creating Mail objects.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >  I thought the intermediated wrapper for callback
> can
> > > > not
> > > > > be
> > > > > > > > > > omitted, since there will be some context switch before
> each
> > > > > > > execution.
> > > > > > > > > The
> > > > > > > > > > MailboxDefaultAction in most cases is processInput right?
> > > While
> > > > > the
> > > > > > > > > > callback should be executed with higher priority. I'd
> suggest
> > > > not
> > > > > > > > > changing
> > > > > > > > > > the basic logic of Mailbox and the default action since
> it is
> > > > > very
> > > > > > > > > critical
> > > > > > > > > > for performance. But yes, we will try our best to avoid
> > > > creating
> > > > > > > > > > intermediated objects.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> 5. Could this FLIP provide the current default
> values
> > > > for
> > > > > > > > things
> > > > > > > > > > like
> > > > > > > > > > > >> >> active buffer size thresholds and timeouts? These
> > > could
> > > > > > help
> > > > > > > > with
> > > > > > > > > > > >> >> memory consumption and latency analysis.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Sure, we will introduce new configs as well as
> their
> > > > > default
> > > > > > > > > value.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> 6. Why do we need to record the hashcode of a
> record
> > > in
> > > > > its
> > > > > > > > > > > >> >> RecordContext? It seems not used.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > The context switch before each callback execution
> > > > involves
> > > > > > > > > > setCurrentKey, where the hashCode is re-calculated. We
> cache
> > > it
> > > > > for
> > > > > > > > > > accelerating.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> 7. In "timers can be stored on the JVM heap or
> > > > RocksDB",
> > > > > > the
> > > > > > > > link
> > > > > > > > > > > >> >> points to a document in flink-1.15. It might be
> > > better
> > > > to
> > > > > > > > verify
> > > > > > > > > > the
> > > > > > > > > > > >> >> referenced content is still valid in the latest
> Flink
> > > > and
> > > > > > > > update
> > > > > > > > > > the
> > > > > > > > > > > >> >> link accordingly. Same for other references if
> any.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Thanks for the reminder! Will check.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Thanks a lot & Best,
> > > > > > > > > > > >> > Zakelly
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > On Sat, Mar 2, 2024 at 6:18 AM Jeyhun Karimov <
> > > > > > > > > je.kari...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> Hi,
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> Thanks for the great proposals. I have a few
> comments
> > > > > > > comments:
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> - Backpressure Handling. Flink's original
> > > backpressure
> > > > > > > handling
> > > > > > > > > is
> > > > > > > > > > quite
> > > > > > > > > > > >> >> robust and the semantics is quite "simple"
> (simple is
> > > > > > > > beautiful).
> > > > > > > > > > > >> >> This mechanism has proven to perform
> better/robust
> > > than
> > > > > the
> > > > > > > > other
> > > > > > > > > > open
> > > > > > > > > > > >> >> source streaming systems, where they were
> relying on
> > > > some
> > > > > > > > > loopback
> > > > > > > > > > > >> >> information.
> > > > > > > > > > > >> >> Now that the proposal also relies on loopback
> (yield
> > > in
> > > > > > this
> > > > > > > > > > case), it is
> > > > > > > > > > > >> >> not clear how well the new backpressure handling
> > > > proposed
> > > > > > in
> > > > > > > > > > FLIP-425 is
> > > > > > > > > > > >> >> robust and handle fluctuating workloads.
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> - Watermark/Timer Handling: Similar arguments
> apply
> > > for
> > > > > > > > watermark
> > > > > > > > > > and timer
> > > > > > > > > > > >> >> handling. IMHO, we need more benchmarks showing
> the
> > > > > > overhead
> > > > > > > > > > > >> >> of epoch management with different parameters
> (e.g.,
> > > > > window
> > > > > > > > size,
> > > > > > > > > > watermark
> > > > > > > > > > > >> >> strategy, etc)
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> - DFS consistency guarantees. The proposal in
> > > FLIP-427
> > > > is
> > > > > > > > > > DFS-agnostic.
> > > > > > > > > > > >> >> However, different cloud providers have different
> > > > storage
> > > > > > > > > > consistency
> > > > > > > > > > > >> >> models.
> > > > > > > > > > > >> >> How do we want to deal with them?
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >>  Regards,
> > > > > > > > > > > >> >> Jeyhun
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> On Fri, Mar 1, 2024 at 6:08 PM Zakelly Lan <
> > > > > > > > > zakelly....@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> > Thanks Piotr for sharing your thoughts!
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> > I guess it depends how we would like to treat
> the
> > > > local
> > > > > > > > disks.
> > > > > > > > > > I've always
> > > > > > > > > > > >> >> > > thought about them that almost always
> eventually
> > > > all
> > > > > > > state
> > > > > > > > > > from the DFS
> > > > > > > > > > > >> >> > > should end up cached in the local disks.
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> > OK I got it. In our proposal we treat local
> disk as
> > > > an
> > > > > > > > optional
> > > > > > > > > > cache, so
> > > > > > > > > > > >> >> > the basic design will handle the case with
> state
> > > > > residing
> > > > > > > in
> > > > > > > > > DFS
> > > > > > > > > > only. It
> > > > > > > > > > > >> >> > is a more 'cloud-native' approach that does not
> > > rely
> > > > on
> > > > > > any
> > > > > > > > > > local storage
> > > > > > > > > > > >> >> > assumptions, which allow users to dynamically
> > > adjust
> > > > > the
> > > > > > > > > > capacity or I/O
> > > > > > > > > > > >> >> > bound of remote storage to gain performance or
> save
> > > > the
> > > > > > > cost,
> > > > > > > > > > even without
> > > > > > > > > > > >> >> > a job restart.
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> > In
> > > > > > > > > > > >> >> > > the currently proposed more fine grained
> > > solution,
> > > > > you
> > > > > > > > make a
> > > > > > > > > > single
> > > > > > > > > > > >> >> > > request to DFS per each state access.
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> > Ah that's not accurate. Actually we buffer the
> > > state
> > > > > > > requests
> > > > > > > > > > and process
> > > > > > > > > > > >> >> > them in batch, multiple requests will
> correspond to
> > > > one
> > > > > > DFS
> > > > > > > > > > access (One
> > > > > > > > > > > >> >> > block access for multiple keys performed by
> > > RocksDB).
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> > In that benchmark you mentioned, are you
> requesting
> > > > the
> > > > > > > state
> > > > > > > > > > > >> >> > > asynchronously from local disks into memory?
> If
> > > the
> > > > > > > benefit
> > > > > > > > > > comes from
> > > > > > > > > > > >> >> > > parallel I/O, then I would expect the
> benefit to
> > > > > > > > > > disappear/shrink when
> > > > > > > > > > > >> >> > > running multiple subtasks on the same
> machine, as
> > > > > they
> > > > > > > > would
> > > > > > > > > > be making
> > > > > > > > > > > >> >> > > their own parallel requests, right? Also
> enabling
> > > > > > > > > > checkpointing would
> > > > > > > > > > > >> >> > > further cut into the available I/O budget.
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> > That's an interesting topic. Our proposal is
> > > > > specifically
> > > > > > > > aimed
> > > > > > > > > > at the
> > > > > > > > > > > >> >> > scenario where the machine I/O is not fully
> loaded
> > > > but
> > > > > > the
> > > > > > > > I/O
> > > > > > > > > > latency has
> > > > > > > > > > > >> >> > indeed become a bottleneck for each subtask.
> While
> > > > the
> > > > > > > > > > distributed file
> > > > > > > > > > > >> >> > system is a prime example of a scenario
> > > characterized
> > > > > by
> > > > > > > > > > abundant and
> > > > > > > > > > > >> >> > easily scalable I/O bandwidth coupled with
> higher
> > > I/O
> > > > > > > > latency.
> > > > > > > > > > You may
> > > > > > > > > > > >> >> > expect to increase the parallelism of a job to
> > > > enhance
> > > > > > the
> > > > > > > > > > performance as
> > > > > > > > > > > >> >> > well, but that also brings in more waste of
> CPU's
> > > and
> > > > > > > memory
> > > > > > > > > for
> > > > > > > > > > building
> > > > > > > > > > > >> >> > up more subtasks. This is one drawback for the
> > > > > > > > > > computation-storage tightly
> > > > > > > > > > > >> >> > coupled nodes. While in our proposal, the
> parallel
> > > > I/O
> > > > > > with
> > > > > > > > all
> > > > > > > > > > the
> > > > > > > > > > > >> >> > callbacks still running in one task,
> pre-allocated
> > > > > > > > > computational
> > > > > > > > > > resources
> > > > > > > > > > > >> >> > are better utilized. It is a much more
> lightweight
> > > > way
> > > > > to
> > > > > > > > > > perform parallel
> > > > > > > > > > > >> >> > I/O.
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> > Just with what granularity those async requests
> > > > should
> > > > > be
> > > > > > > > made.
> > > > > > > > > > > >> >> > > Making state access asynchronous is
> definitely
> > > the
> > > > > > right
> > > > > > > > way
> > > > > > > > > > to go!
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> > I think the current proposal is based on such
> core
> > > > > ideas:
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> >    - A pure cloud-native disaggregated state.
> > > > > > > > > > > >> >> >    - Fully utilize the given resources and try
> not
> > > to
> > > > > > waste
> > > > > > > > > them
> > > > > > > > > > (including
> > > > > > > > > > > >> >> >    I/O).
> > > > > > > > > > > >> >> >    - The ability to scale isolated resources
> (I/O
> > > or
> > > > > CPU
> > > > > > or
> > > > > > > > > > memory)
> > > > > > > > > > > >> >> >    independently.
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> > We think a fine-grained granularity is more
> inline
> > > > with
> > > > > > > those
> > > > > > > > > > ideas,
> > > > > > > > > > > >> >> > especially without local disk assumptions and
> > > without
> > > > > any
> > > > > > > > waste
> > > > > > > > > > of I/O by
> > > > > > > > > > > >> >> > prefetching. Please note that it is not a
> > > replacement
> > > > > of
> > > > > > > the
> > > > > > > > > > original local
> > > > > > > > > > > >> >> > state with synchronous execution. Instead this
> is a
> > > > > > > solution
> > > > > > > > > > embracing the
> > > > > > > > > > > >> >> > cloud-native era, providing much more
> scalability
> > > and
> > > > > > > > resource
> > > > > > > > > > efficiency
> > > > > > > > > > > >> >> > when handling a *huge state*.
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> > What also worries me a lot in this fine grained
> > > model
> > > > > is
> > > > > > > the
> > > > > > > > > > effect on the
> > > > > > > > > > > >> >> > > checkpointing times.
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> > Your concerns are very reasonable. Faster
> > > > checkpointing
> > > > > > is
> > > > > > > > > > always a core
> > > > > > > > > > > >> >> > advantage of disaggregated state, but only for
> the
> > > > > async
> > > > > > > > phase.
> > > > > > > > > > There will
> > > > > > > > > > > >> >> > be some complexity introduced by in-flight
> > > requests,
> > > > > but
> > > > > > > I'd
> > > > > > > > > > suggest a
> > > > > > > > > > > >> >> > checkpoint containing those in-flight state
> > > requests
> > > > as
> > > > > > > part
> > > > > > > > of
> > > > > > > > > > the state,
> > > > > > > > > > > >> >> > to accelerate the sync phase by skipping the
> buffer
> > > > > > > draining.
> > > > > > > > > > This makes
> > > > > > > > > > > >> >> > the buffer size have little impact on
> checkpoint
> > > > time.
> > > > > > And
> > > > > > > > all
> > > > > > > > > > the changes
> > > > > > > > > > > >> >> > keep within the execution model we proposed
> while
> > > the
> > > > > > > > > checkpoint
> > > > > > > > > > barrier
> > > > > > > > > > > >> >> > alignment or handling will not be touched in
> our
> > > > > > proposal,
> > > > > > > > so I
> > > > > > > > > > guess
> > > > > > > > > > > >> >> > the complexity is relatively controllable. I
> have
> > > > faith
> > > > > > in
> > > > > > > > that
> > > > > > > > > > :)
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> > Also regarding the overheads, it would be
> great if
> > > > you
> > > > > > > could
> > > > > > > > > > provide
> > > > > > > > > > > >> >> > > profiling results of the benchmarks that you
> > > > > conducted
> > > > > > to
> > > > > > > > > > verify the
> > > > > > > > > > > >> >> > > results. Or maybe if you could describe the
> steps
> > > > to
> > > > > > > > > reproduce
> > > > > > > > > > the
> > > > > > > > > > > >> >> > results?
> > > > > > > > > > > >> >> > > Especially "Hashmap (sync)" vs "Hashmap with
> > > async
> > > > > > API".
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> > Yes we could profile the benchmarks. And for
> the
> > > > > > comparison
> > > > > > > > of
> > > > > > > > > > "Hashmap
> > > > > > > > > > > >> >> > (sync)" vs "Hashmap with async API", we
> conduct a
> > > > > > Wordcount
> > > > > > > > job
> > > > > > > > > > written
> > > > > > > > > > > >> >> > with async APIs but disabling the async
> execution
> > > by
> > > > > > > directly
> > > > > > > > > > completing
> > > > > > > > > > > >> >> > the future using sync state access. This
> evaluates
> > > > the
> > > > > > > > overhead
> > > > > > > > > > of newly
> > > > > > > > > > > >> >> > introduced modules like 'AEC' in sync execution
> > > (even
> > > > > > > though
> > > > > > > > > > they are not
> > > > > > > > > > > >> >> > designed for it). The code will be provided
> later.
> > > > For
> > > > > > > other
> > > > > > > > > > results of our
> > > > > > > > > > > >> >> > PoC[1], you can follow the instructions
> here[2] to
> > > > > > > reproduce.
> > > > > > > > > > Since the
> > > > > > > > > > > >> >> > compilation may take some effort, we will
> directly
> > > > > > provide
> > > > > > > > the
> > > > > > > > > > jar for
> > > > > > > > > > > >> >> > testing next week.
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> > And @Yunfeng Zhou, I have noticed your mail
> but it
> > > > is a
> > > > > > bit
> > > > > > > > > late
> > > > > > > > > > in my
> > > > > > > > > > > >> >> > local time and the next few days are weekends.
> So I
> > > > > will
> > > > > > > > reply
> > > > > > > > > > to you
> > > > > > > > > > > >> >> > later. Thanks for your response!
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> > [1]
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-PoCResults
> > > > > > > > > > > >> >> > [2]
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-Appendix:HowtorunthePoC
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> > Best,
> > > > > > > > > > > >> >> > Zakelly
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> > On Fri, Mar 1, 2024 at 6:38 PM Yunfeng Zhou <
> > > > > > > > > > flink.zhouyunf...@gmail.com>
> > > > > > > > > > > >> >> > wrote:
> > > > > > > > > > > >> >> >
> > > > > > > > > > > >> >> > > Hi,
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> > > Thanks for proposing this design! I just read
> > > > > FLIP-424
> > > > > > > and
> > > > > > > > > > FLIP-425
> > > > > > > > > > > >> >> > > and have some questions about the proposed
> > > changes.
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> > > For Async API (FLIP-424)
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> > > 1. Why do we need a close() method on
> > > > StateIterator?
> > > > > > This
> > > > > > > > > > method seems
> > > > > > > > > > > >> >> > > unused in the usage example codes.
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> > > 2. In FutureUtils.combineAll()'s JavaDoc, it
> is
> > > > > stated
> > > > > > > that
> > > > > > > > > > "No null
> > > > > > > > > > > >> >> > > entries are allowed". It might be better to
> > > further
> > > > > > > explain
> > > > > > > > > > what will
> > > > > > > > > > > >> >> > > happen if a null value is passed, ignoring
> the
> > > > value
> > > > > in
> > > > > > > the
> > > > > > > > > > returned
> > > > > > > > > > > >> >> > > Collection or throwing exceptions. Given that
> > > > > > > > > > > >> >> > > FutureUtils.emptyFuture() can be returned in
> the
> > > > > > example
> > > > > > > > > code,
> > > > > > > > > > I
> > > > > > > > > > > >> >> > > suppose the former one might be correct.
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> > > For Async Execution (FLIP-425)
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> > > 1. According to Fig 2 of this FLIP, if a
> recordB
> > > > has
> > > > > > its
> > > > > > > > key
> > > > > > > > > > collide
> > > > > > > > > > > >> >> > > with an ongoing recordA, its processElement()
> > > > method
> > > > > > can
> > > > > > > > > still
> > > > > > > > > > be
> > > > > > > > > > > >> >> > > triggered immediately, and then it might be
> moved
> > > > to
> > > > > > the
> > > > > > > > > > blocking
> > > > > > > > > > > >> >> > > buffer in AEC if it involves state
> operations.
> > > This
> > > > > > means
> > > > > > > > > that
> > > > > > > > > > > >> >> > > recordB's output will precede recordA's
> output in
> > > > > > > > downstream
> > > > > > > > > > > >> >> > > operators, if recordA involves state
> operations
> > > > while
> > > > > > > > recordB
> > > > > > > > > > does
> > > > > > > > > > > >> >> > > not. This will harm the correctness of Flink
> jobs
> > > > in
> > > > > > some
> > > > > > > > use
> > > > > > > > > > cases.
> > > > > > > > > > > >> >> > > For example, in dim table join cases, recordA
> > > could
> > > > > be
> > > > > > a
> > > > > > > > > delete
> > > > > > > > > > > >> >> > > operation that involves state access, while
> > > recordB
> > > > > > could
> > > > > > > > be
> > > > > > > > > > an insert
> > > > > > > > > > > >> >> > > operation that needs to visit external
> storage
> > > > > without
> > > > > > > > state
> > > > > > > > > > access.
> > > > > > > > > > > >> >> > > If recordB's output precedes recordA's, then
> an
> > > > entry
> > > > > > > that
> > > > > > > > is
> > > > > > > > > > supposed
> > > > > > > > > > > >> >> > > to finally exist with recordB's value in the
> sink
> > > > > table
> > > > > > > > might
> > > > > > > > > > actually
> > > > > > > > > > > >> >> > > be deleted according to recordA's command.
> This
> > > > > > situation
> > > > > > > > > > should be
> > > > > > > > > > > >> >> > > avoided and the order of same-key records
> should
> > > be
> > > > > > > > strictly
> > > > > > > > > > > >> >> > > preserved.
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> > > 2. The FLIP says that StateRequests
> submitted by
> > > > > > > Callbacks
> > > > > > > > > > will not
> > > > > > > > > > > >> >> > > invoke further yield() methods. Given that
> > > yield()
> > > > is
> > > > > > > used
> > > > > > > > > > when there
> > > > > > > > > > > >> >> > > is "too much" in-flight data, does it mean
> > > > > > StateRequests
> > > > > > > > > > submitted by
> > > > > > > > > > > >> >> > > Callbacks will never be "too much"? What if
> the
> > > > total
> > > > > > > > number
> > > > > > > > > of
> > > > > > > > > > > >> >> > > StateRequests exceed the capacity of Flink
> > > > operator's
> > > > > > > > memory
> > > > > > > > > > space?
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> > > 3. In the "Watermark" section, this FLIP
> provided
> > > > an
> > > > > > > > > > out-of-order
> > > > > > > > > > > >> >> > > execution mode apart from the default
> > > > > strictly-ordered
> > > > > > > > mode,
> > > > > > > > > > which can
> > > > > > > > > > > >> >> > > optimize performance by allowing more
> concurrent
> > > > > > > > executions.
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> > > 3.1 I'm concerned that the out-of-order
> execution
> > > > > mode,
> > > > > > > > along
> > > > > > > > > > with the
> > > > > > > > > > > >> >> > > epoch mechanism, would bring more complexity
> to
> > > the
> > > > > > > > execution
> > > > > > > > > > model
> > > > > > > > > > > >> >> > > than the performance improvement it promises.
> > > Could
> > > > > we
> > > > > > > add
> > > > > > > > > some
> > > > > > > > > > > >> >> > > benchmark results proving the benefit of this
> > > mode?
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> > > 3.2 The FLIP might need to add a public API
> > > section
> > > > > > > > > describing
> > > > > > > > > > how
> > > > > > > > > > > >> >> > > users or developers can switch between these
> two
> > > > > > > execution
> > > > > > > > > > modes.
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> > > 3.3 Apart from the watermark and checkpoint
> > > > mentioned
> > > > > > in
> > > > > > > > this
> > > > > > > > > > FLIP,
> > > > > > > > > > > >> >> > > there are also more other events that might
> > > appear
> > > > in
> > > > > > the
> > > > > > > > > > stream of
> > > > > > > > > > > >> >> > > data records. It might be better to
> generalize
> > > the
> > > > > > > > execution
> > > > > > > > > > mode
> > > > > > > > > > > >> >> > > mechanism to handle all possible events.
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> > > 4. It might be better to treat
> callback-handling
> > > > as a
> > > > > > > > > > > >> >> > > MailboxDefaultAction, instead of Mails, to
> avoid
> > > > the
> > > > > > > > overhead
> > > > > > > > > > of
> > > > > > > > > > > >> >> > > repeatedly creating Mail objects.
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> > > 5. Could this FLIP provide the current
> default
> > > > values
> > > > > > for
> > > > > > > > > > things like
> > > > > > > > > > > >> >> > > active buffer size thresholds and timeouts?
> These
> > > > > could
> > > > > > > > help
> > > > > > > > > > with
> > > > > > > > > > > >> >> > > memory consumption and latency analysis.
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> > > 6. Why do we need to record the hashcode of a
> > > > record
> > > > > in
> > > > > > > its
> > > > > > > > > > > >> >> > > RecordContext? It seems not used.
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> > > 7. In "timers can be stored on the JVM heap
> or
> > > > > > RocksDB",
> > > > > > > > the
> > > > > > > > > > link
> > > > > > > > > > > >> >> > > points to a document in flink-1.15. It might
> be
> > > > > better
> > > > > > to
> > > > > > > > > > verify the
> > > > > > > > > > > >> >> > > referenced content is still valid in the
> latest
> > > > Flink
> > > > > > and
> > > > > > > > > > update the
> > > > > > > > > > > >> >> > > link accordingly. Same for other references
> if
> > > any.
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> > > Best,
> > > > > > > > > > > >> >> > > Yunfeng Zhou
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> > > On Thu, Feb 29, 2024 at 2:17 PM Yuan Mei <
> > > > > > > > > > yuanmei.w...@gmail.com> wrote:
> > > > > > > > > > > >> >> > > >
> > > > > > > > > > > >> >> > > > Hi Devs,
> > > > > > > > > > > >> >> > > >
> > > > > > > > > > > >> >> > > > This is a joint work of Yuan Mei, Zakelly
> Lan,
> > > > > > Jinzhong
> > > > > > > > Li,
> > > > > > > > > > Hangxiang
> > > > > > > > > > > >> >> > Yu,
> > > > > > > > > > > >> >> > > > Yanfei Lei and Feng Wang. We'd like to
> start a
> > > > > > > discussion
> > > > > > > > > > about
> > > > > > > > > > > >> >> > > introducing
> > > > > > > > > > > >> >> > > > Disaggregated State Storage and Management
> in
> > > > Flink
> > > > > > > 2.0.
> > > > > > > > > > > >> >> > > >
> > > > > > > > > > > >> >> > > > The past decade has witnessed a dramatic
> shift
> > > in
> > > > > > > Flink's
> > > > > > > > > > deployment
> > > > > > > > > > > >> >> > > mode,
> > > > > > > > > > > >> >> > > > workload patterns, and hardware
> improvements.
> > > > We've
> > > > > > > moved
> > > > > > > > > > from the
> > > > > > > > > > > >> >> > > > map-reduce era where workers are
> > > > > computation-storage
> > > > > > > > > tightly
> > > > > > > > > > coupled
> > > > > > > > > > > >> >> > > nodes
> > > > > > > > > > > >> >> > > > to a cloud-native world where containerized
> > > > > > deployments
> > > > > > > > on
> > > > > > > > > > Kubernetes
> > > > > > > > > > > >> >> > > > become standard. To enable Flink's
> Cloud-Native
> > > > > > future,
> > > > > > > > we
> > > > > > > > > > introduce
> > > > > > > > > > > >> >> > > > Disaggregated State Storage and Management
> that
> > > > > uses
> > > > > > > DFS
> > > > > > > > as
> > > > > > > > > > primary
> > > > > > > > > > > >> >> > > storage
> > > > > > > > > > > >> >> > > > in Flink 2.0, as promised in the Flink 2.0
> > > > Roadmap.
> > > > > > > > > > > >> >> > > >
> > > > > > > > > > > >> >> > > > Design Details can be found in FLIP-423[1].
> > > > > > > > > > > >> >> > > >
> > > > > > > > > > > >> >> > > > This new architecture is aimed to solve the
> > > > > following
> > > > > > > > > > challenges
> > > > > > > > > > > >> >> > brought
> > > > > > > > > > > >> >> > > in
> > > > > > > > > > > >> >> > > > the cloud-native era for Flink.
> > > > > > > > > > > >> >> > > > 1. Local Disk Constraints in
> containerization
> > > > > > > > > > > >> >> > > > 2. Spiky Resource Usage caused by
> compaction in
> > > > the
> > > > > > > > current
> > > > > > > > > > state model
> > > > > > > > > > > >> >> > > > 3. Fast Rescaling for jobs with large
> states
> > > > > > (hundreds
> > > > > > > of
> > > > > > > > > > Terabytes)
> > > > > > > > > > > >> >> > > > 4. Light and Fast Checkpoint in a native
> way
> > > > > > > > > > > >> >> > > >
> > > > > > > > > > > >> >> > > > More specifically, we want to reach a
> consensus
> > > > on
> > > > > > the
> > > > > > > > > > following issues
> > > > > > > > > > > >> >> > > in
> > > > > > > > > > > >> >> > > > this discussion:
> > > > > > > > > > > >> >> > > >
> > > > > > > > > > > >> >> > > > 1. Overall design
> > > > > > > > > > > >> >> > > > 2. Proposed Changes
> > > > > > > > > > > >> >> > > > 3. Design details to achieve Milestone1
> > > > > > > > > > > >> >> > > >
> > > > > > > > > > > >> >> > > > In M1, we aim to achieve an end-to-end
> baseline
> > > > > > version
> > > > > > > > > > using DFS as
> > > > > > > > > > > >> >> > > > primary storage and complete core
> > > > functionalities,
> > > > > > > > > including:
> > > > > > > > > > > >> >> > > >
> > > > > > > > > > > >> >> > > > - Asynchronous State APIs (FLIP-424)[2]:
> > > > Introduce
> > > > > > new
> > > > > > > > APIs
> > > > > > > > > > for
> > > > > > > > > > > >> >> > > > asynchronous state access.
> > > > > > > > > > > >> >> > > > - Asynchronous Execution Model
> (FLIP-425)[3]:
> > > > > > > Implement a
> > > > > > > > > > non-blocking
> > > > > > > > > > > >> >> > > > execution model leveraging the asynchronous
> > > APIs
> > > > > > > > introduced
> > > > > > > > > > in
> > > > > > > > > > > >> >> > FLIP-424.
> > > > > > > > > > > >> >> > > > - Grouping Remote State Access
> (FLIP-426)[4]:
> > > > > Enable
> > > > > > > > > > retrieval of
> > > > > > > > > > > >> >> > remote
> > > > > > > > > > > >> >> > > > state data in batches to avoid unnecessary
> > > > > round-trip
> > > > > > > > costs
> > > > > > > > > > for remote
> > > > > > > > > > > >> >> > > > access
> > > > > > > > > > > >> >> > > > - Disaggregated State Store (FLIP-427)[5]:
> > > > > Introduce
> > > > > > > the
> > > > > > > > > > initial
> > > > > > > > > > > >> >> > version
> > > > > > > > > > > >> >> > > of
> > > > > > > > > > > >> >> > > > the ForSt disaggregated state store.
> > > > > > > > > > > >> >> > > > - Fault Tolerance/Rescale Integration
> > > > > (FLIP-428)[6]:
> > > > > > > > > > Integrate
> > > > > > > > > > > >> >> > > > checkpointing mechanisms with the
> disaggregated
> > > > > state
> > > > > > > > store
> > > > > > > > > > for fault
> > > > > > > > > > > >> >> > > > tolerance and fast rescaling.
> > > > > > > > > > > >> >> > > >
> > > > > > > > > > > >> >> > > > We will vote on each FLIP in separate
> threads
> > > to
> > > > > make
> > > > > > > > sure
> > > > > > > > > > each FLIP
> > > > > > > > > > > >> >> > > > reaches a consensus. But we want to keep
> the
> > > > > > discussion
> > > > > > > > > > within a
> > > > > > > > > > > >> >> > focused
> > > > > > > > > > > >> >> > > > thread (this thread) for easier tracking of
> > > > > contexts
> > > > > > to
> > > > > > > > > avoid
> > > > > > > > > > > >> >> > duplicated
> > > > > > > > > > > >> >> > > > questions/discussions and also to think of
> the
> > > > > > > > > > problem/solution in a
> > > > > > > > > > > >> >> > full
> > > > > > > > > > > >> >> > > > picture.
> > > > > > > > > > > >> >> > > >
> > > > > > > > > > > >> >> > > > Looking forward to your feedback
> > > > > > > > > > > >> >> > > >
> > > > > > > > > > > >> >> > > > Best,
> > > > > > > > > > > >> >> > > > Yuan, Zakelly, Jinzhong, Hangxiang, Yanfei
> and
> > > > Feng
> > > > > > > > > > > >> >> > > >
> > > > > > > > > > > >> >> > > > [1]
> > > https://cwiki.apache.org/confluence/x/R4p3EQ
> > > > > > > > > > > >> >> > > > [2]
> > > https://cwiki.apache.org/confluence/x/SYp3EQ
> > > > > > > > > > > >> >> > > > [3]
> > > https://cwiki.apache.org/confluence/x/S4p3EQ
> > > > > > > > > > > >> >> > > > [4]
> > > https://cwiki.apache.org/confluence/x/TYp3EQ
> > > > > > > > > > > >> >> > > > [5]
> > > https://cwiki.apache.org/confluence/x/T4p3EQ
> > > > > > > > > > > >> >> > > > [6]
> > > https://cwiki.apache.org/confluence/x/UYp3EQ
> > > > > > > > > > > >> >> > >
> > > > > > > > > > > >> >> >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
>

Reply via email to