Hi, Xuyang

Thanks for your proposal. Looks good to me overall, +1.

Best,
Ron


> -----Original Message-----
> From: Xuyang <xyzhong...@163.com>
> Sent: 2024-08-14 15:20:59 (Wednesday)
> To: dev@flink.apache.org
> Subject: Re:Re: Re: Re:Re: [DISCUSS] FLIP-473: Introduce New SQL Operators Based
> on Asynchronous State APIs
> 
> Hi, Feng. Thank you for your feedback. 
> 
> I completely agree that these tests are crucial. I have updated the Roadmap
> section of the FLIP to include plans for harness tests and IT tests for data
> correctness, state compatibility tests, and performance regression tests.
> 
> Regarding the usage of the APIs `thenCombine` and `combineAll`, I have also
> added a concluding note that there is no significant performance difference
> between the two, so they can be used interchangeably.
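To make the `thenCombine` vs. `combineAll` distinction concrete, here is a minimal sketch that uses the JDK's `CompletableFuture` as a stand-in for Flink's `StateFuture` (an assumption for illustration; it does not call the Flink API). `CombineSketch` and its method names are hypothetical. One version chains one `thenCombine` per input; the other waits for all inputs once via `allOf`, in the spirit of `combineAll`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Stand-in sketch: JDK CompletableFuture is used instead of Flink's StateFuture.
class CombineSketch {

    // Pairwise style: one thenCombine per future, folded into a running sum.
    static CompletableFuture<Long> sumWithThenCombine(List<CompletableFuture<Long>> futures) {
        CompletableFuture<Long> acc = CompletableFuture.completedFuture(0L);
        for (CompletableFuture<Long> f : futures) {
            acc = acc.thenCombine(f, Long::sum);
        }
        return acc;
    }

    // combineAll style: wait for every future once, then fold all results together.
    static CompletableFuture<Long> sumWithCombineAll(List<CompletableFuture<Long>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .mapToLong(CompletableFuture::join)
                        .sum());
    }
}
```

Both forms produce the same result; the chained form creates one intermediate future per input, while the all-at-once form creates a single combined future. The sketch says nothing about relative performance inside Flink, which is what the note added to the FLIP addresses.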
> 
> 
> 
> 
> --
> 
>     Best!
>     Xuyang
> 
> 
> 
> 
> 
> At 2024-08-13 17:10:32, "Feng Jin" <jinfeng1...@gmail.com> wrote:
> >Hi, Xuyang
> >
> >Thank you for initiating this FLIP. I believe this is a significant feature
> >for the future of Flink SQL, but I also share concerns about the
> >maintenance cost of keeping the two implementations aligned in terms of
> >correctness, state compatibility, and performance.
> >I fully support ensuring state compatibility, functional correctness, and
> >performance regression detection through harness tests and IT tests.
> >
> >1. I think the relevant testing is a critical part. Is there a more detailed
> >design plan for the harness tests and performance regression tests?
> >2. Regarding the final performance comparison between thenCombine and
> >combineAll, is there a more specific conclusion? Which one should we use,
> >or are both options viable?
> >
> >Best,
> >Feng.
> >
> >On Mon, Aug 12, 2024 at 11:23 AM Xuyang <xyzhong...@163.com> wrote:
> >
> >> Hi, David. Thank you for your review. Let me address your questions:
> >>
> >> >1. Is there a way to enforce this as an invariant during build time,
> >> >perhaps through a generic test framework that switches between the sync and
> >> >async versions of all operators and verifies checkpoint compatibility?
> >>
> >> Yes, we need to incorporate a corresponding testing framework to verify
> >> state compatibility during transitions between sync and async state
> >> operators. In my proposal, it resembles the existing RestoreTestBase, with
> >> the testing logic structured as follows:
> >> a. Start with the sync state operator to consume data;
> >> b. Execute a checkpoint;
> >> c. Restart with the async state operator and recover data from the
> >> checkpoint for re-consumption;
> >> d. Validate the correctness of the results.
> >> Additionally, we could also consider the reverse scenario, where the async
> >> state operator consumes data first, followed by a restart with the sync
> >> state operator. I have added this to the `TEST PLAN` section of the FLIP.
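The a-d test sequence above can be sketched in plain Java, with a toy per-key counter standing in for a real SQL operator and a map copy standing in for a checkpoint. Every name here (`CountingOperator`, `SyncCountingOperator`, `AsyncCountingOperator`, `RestoreCompatibilitySketch`) is hypothetical and unrelated to `RestoreTestBase`; the point is only that both variants share one state schema, so a snapshot taken from either can restore the other.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical operator contract: a per-key counter whose state can be snapshotted and restored.
interface CountingOperator {
    long process(String key);               // returns the updated count for key
    Map<String, Long> snapshot();           // stands in for taking a checkpoint
    void restore(Map<String, Long> state);  // stands in for recovering from a checkpoint
}

// Same state schema (key -> count) in both variants; only the access style differs.
class SyncCountingOperator implements CountingOperator {
    private final Map<String, Long> state = new HashMap<>();
    public long process(String key) {
        long count = state.getOrDefault(key, 0L) + 1;
        state.put(key, count);
        return count;
    }
    public Map<String, Long> snapshot() { return new HashMap<>(state); }
    public void restore(Map<String, Long> s) { state.clear(); state.putAll(s); }
}

class AsyncCountingOperator implements CountingOperator {
    private final Map<String, Long> state = new HashMap<>();
    public long process(String key) {
        // Simulated asynchronous read; join() keeps the sketch one-record-at-a-time.
        long count = CompletableFuture.supplyAsync(() -> state.getOrDefault(key, 0L)).join() + 1;
        state.put(key, count);
        return count;
    }
    public Map<String, Long> snapshot() { return new HashMap<>(state); }
    public void restore(Map<String, Long> s) { state.clear(); state.putAll(s); }
}

class RestoreCompatibilitySketch {
    // Steps a-c of the test plan; the caller performs step d by comparing outputs.
    static List<Long> runWithSwitch(CountingOperator before, CountingOperator after,
                                    List<String> beforeCheckpoint, List<String> afterRestore) {
        List<Long> out = new ArrayList<>();
        for (String key : beforeCheckpoint) {
            out.add(before.process(key));                 // a. consume data with the first operator
        }
        Map<String, Long> checkpoint = before.snapshot(); // b. take a checkpoint
        after.restore(checkpoint);                        // c. restart with the other operator
        for (String key : afterRestore) {
            out.add(after.process(key));
        }
        return out;
    }
}
```

Running it with the operators in both orders covers the reverse scenario as well: for inputs `a, b, a` before the checkpoint and `b, a` after it, both orders should yield `1, 1, 2, 2, 3`.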
> >>
> >> >2. If the only difference between them is state handling, could they
> >> >potentially be implemented as the same operator with two different
> >> >interfaces? My main concern is code reuse: it’s crucial to avoid duplicating
> >> >code to ensure both implementations stay aligned. Additionally, could
> >> >feature parity be verified at the test suite level (similar to the first
> >> >question)? Perhaps we could create a single parameterized test suite that
> >> >runs against both versions?
> >>
> >> IIUC, your focus aligns with the roadmap item “Refactoring the sync and
> >> async state operators, leveraging shared logical calculations while
> >> abstracting the state access details.” Due to the intricate details of the
> >> code implementation, this was not elaborated in the FLIP. I share your
> >> vision of designing reusable business logic classes (such as JoinHelper)
> >> alongside different operator interfaces (SyncStateJoinOperator and
> >> AsyncStateJoinOperator), consolidating the reusable logic within the class
> >> JoinHelper.
> >>
> >> For synchronous operators, we already have harness tests to validate data
> >> correctness, and there will also be dedicated harness tests for
> >> asynchronous operators. Using a parameterized test suite for both harnesses
> >> is indeed feasible.
> >>
> >>
> >>
> >>
> >> --
> >>
> >>     Best!
> >>     Xuyang
> >>
> >>
> >>
> >>
> >>
> >> On 2024-08-09 20:49:49, "David Morávek" <d...@apache.org> wrote:
> >> >Hi Xuyang,
> >> >
> >> >Thank you for looking into this—great work! The overall direction seems
> >> >solid. I have two minor questions:
> >> >
> >> >> In theory, the implementation of AsyncStateOperator and SyncStateOperator
> >> >> differs only in their state handling. Their state schemas, business logic,
> >> >> and other aspects remain the same. Therefore, within the same Flink
> >> >> version, when the SQL and other Flink configurations remain unchanged, or
> >> >> when using the same compiled plan, users can freely switch between
> >> >> AsyncStateOperator and SyncStateOperator by toggling the configuration
> >> >> table.exec.async-state.enabled, as they are fully compatible.
> >> >
> >> >
> >> >1. Is there a way to enforce this as an invariant during build time,
> >> >perhaps through a generic test framework that switches between the sync and
> >> >async versions of all operators and verifies checkpoint compatibility?
> >> >
> >> >2. If the only difference between them is state handling, could they
> >> >potentially be implemented as the same operator with two different
> >> >interfaces? My main concern is code reuse: it’s crucial to avoid duplicating
> >> >code to ensure both implementations stay aligned. Additionally, could
> >> >feature parity be verified at the test suite level (similar to the first
> >> >question)? Perhaps we could create a single parameterized test suite that
> >> >runs against both versions?
> >> >
> >> >Best,
> >> >D.
> >> >
> >> >On Thu, Aug 8, 2024 at 2:23 PM Xuyang <xyzhong...@163.com> wrote:
> >> >
> >> >> Hi, everyone.
> >> >>
> >> >> I have updated the FLIP. Here are the newly added sections:
> >> >>
> >> >>
> >> >> In theory, the implementation of AsyncStateOperator and SyncStateOperator
> >> >> differs only in their state handling. Their state schemas, business logic,
> >> >> and other aspects are the same. Therefore, within the same Flink version,
> >> >> when the SQL and other Flink configurations remain unchanged, or when using
> >> >> the same compiled plan, users can freely switch between AsyncStateOperator
> >> >> and SyncStateOperator by toggling the configuration
> >> >> table.exec.async-state.enabled, as they are fully compatible. One known
> >> >> exception is that once the SQL AsyncStateOperator supports in-flight
> >> >> checkpoints in FLIP-455[2], switching back and forth between aligned and
> >> >> unaligned checkpoints will not be compatible, which is expected behavior.
> >> >>
> >> >>
> >> >>
> >> >> --
> >> >>     Best!
> >> >>     Xuyang
> >> >>
> >> >>
> >> >>
> >> >> On 2024-08-07 19:05:57, "Xuyang" <xyzhong...@163.com> wrote:
> >> >> >Hi, Zakelly.
> >> >> >
> >> >> >You’re right. This is another advantage of choosing the approach where the
> >> >> >two sets of operators share the same execution node. In this approach,
> >> >> >users can utilize the compiled plan and adjust the config
> >> >> >'table.exec.async-state.enabled', which gives them the flexibility to
> >> >> >switch between asynchronous and synchronous state operators.
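A minimal sketch of the shared-execution-node idea, assuming the config is available as a plain string map when the node is translated into a runtime operator. `StreamExecJoinSketch` and `translateToOperator` are hypothetical, and treating a missing `table.exec.async-state.enabled` as `false` is an assumption made for the sketch, not a statement about Flink's actual default.

```java
import java.util.Map;

// Hypothetical sketch: one exec node in the compiled plan, two physical operators.
class StreamExecJoinSketch {
    static final String ASYNC_STATE_ENABLED = "table.exec.async-state.enabled";

    // The node itself stores no sync/async choice, so the same compiled plan serves both.
    // The flag is read only when the node is translated into a runtime operator.
    static String translateToOperator(Map<String, String> config) {
        // Assumption: a missing flag is treated as "false" (synchronous state).
        boolean async = Boolean.parseBoolean(config.getOrDefault(ASYNC_STATE_ENABLED, "false"));
        return async ? "AsyncStateJoinOperator" : "SyncStateJoinOperator";
    }
}
```

Because the choice is deferred to translation time, switching operators requires changing only the config, not recompiling the plan.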
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >--
> >> >> >
> >> >> >    Best!
> >> >> >    Xuyang
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >At 2024-08-06 18:54:24, "Zakelly Lan" <zakelly....@gmail.com> wrote:
> >> >> >>Hi Xuyang,
> >> >> >>
> >> >> >>Thanks for driving this! I'm happy to see operator implementations on top
> >> >> >>of the async state. Overall it looks good; I have one minor question:
> >> >> >>
> >> >> >>It seems the only difference between the new operator implementation and
> >> >> >>the original one is the way state is accessed. Does this mean the state is
> >> >> >>totally compatible, from the operator's point of view, when toggling
> >> >> >>`table.exec.async-state.enabled`? And in this way users can freely migrate
> >> >> >>their jobs to a newer minor version and leverage the newly implemented
> >> >> >>async operator without state loss, right?
> >> >> >>
> >> >> >>
> >> >> >>Best,
> >> >> >>Zakelly
> >> >> >>
> >> >> >>On Mon, Aug 5, 2024 at 10:40 AM Xuyang <xyzhong...@163.com> wrote:
> >> >> >>
> >> >> >>> Hi devs,
> >> >> >>>
> >> >> >>> I'd like to initiate a discussion about FLIP-473: Introduce New SQL
> >> >> >>> Operators Based on Asynchronous State APIs[1].
> >> >> >>>
> >> >> >>>
> >> >> >>>
> >> >> >>>
> >> >> >>> FLIP-424[2] has introduced an asynchronous state architecture to mitigate
> >> >> >>> I/O bottlenecks in Flink by offloading state access from the main thread
> >> >> >>> of each task to separate threads, thereby optimizing I/O and
> >> >> >>> computational resource utilization.
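As a rough illustration of offloading state access from the task thread, the sketch below submits state reads to a separate I/O pool and hands each result back to a single "task" thread for processing. It is a simplified model built on `java.util.concurrent`, not how FLIP-424 is implemented; in particular it does not preserve per-key ordering, which the real asynchronous execution model has to take care of. `AsyncStateAccessSketch` and `lookupAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified model, not FLIP-424's implementation: state reads run on an I/O pool,
// and result handling is handed back to a single "task" thread.
class AsyncStateAccessSketch {
    static List<String> lookupAll(Map<String, String> store, List<String> keys) {
        ExecutorService ioPool = Executors.newFixedThreadPool(4);         // stands in for state I/O threads
        ExecutorService taskThread = Executors.newSingleThreadExecutor(); // stands in for the task's main thread
        List<String> results = Collections.synchronizedList(new ArrayList<>());
        try {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (String key : keys) {
                // Issue the read and move on to the next key without waiting for it.
                pending.add(CompletableFuture
                        .supplyAsync(() -> store.getOrDefault(key, "<none>"), ioPool)
                        .thenAcceptAsync(value -> results.add(key + "=" + value), taskThread));
            }
            // Wait for all callbacks (a real operator would keep processing input instead).
            CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
        } finally {
            ioPool.shutdown();
            taskThread.shutdown();
        }
        return new ArrayList<>(results);
    }
}
```

The loop issues every read before any result is consumed, which is the overlap of I/O and computation the asynchronous architecture aims for; the completion order of the results is not deterministic.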
> >> >> >>>
> >> >> >>> Building upon this infrastructure, FLIP-473[1] proposes introducing new
> >> >> >>> SQL operators based on these asynchronous state APIs.
> >> >> >>>
> >> >> >>> Our goal is to enhance throughput and reduce latency in stateful
> >> >> >>> operations by leveraging the new asynchronous state architecture in
> >> >> >>> Flink SQL jobs.
> >> >> >>>
> >> >> >>>
> >> >> >>>
> >> >> >>>
> >> >> >>> For more details, please refer to FLIP-473[1]. Any feedback and
> >> >> >>> suggestions for improvement are welcome.
> >> >> >>>
> >> >> >>>
> >> >> >>>
> >> >> >>>
> >> >> >>> There is already POC code[3] for the Join operator.
> >> >> >>>
> >> >> >>>
> >> >> >>>
> >> >> >>>
> >> >> >>> Best!
> >> >> >>> Xuyang
> >> >> >>>
> >> >> >>>
> >> >> >>>
> >> >> >>>
> >> >> >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-473+Introduce+New+SQL+Operators+Based+on+Asynchronous+State+APIs
> >> >> >>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-424%3A+Asynchronous+State+APIs
> >> >> >>> [3] https://github.com/apache/flink/commit/bccace0f4b233e10279c7d95e009ae6aadad5ae8

