Hi, Xuyang

Thank you for initiating this FLIP. I believe this is a significant feature
for the future of Flink SQL, but I also share concerns about the
maintenance costs related to the correctness, state compatibility, and
performance of the two implementations.
I fully support ensuring state compatibility, functional correctness, and
performance regression detection through HarnessTests and IT Tests. I have
two questions:

1. I think testing is a critical part of this work. Is there a more detailed
design plan for the HarnessTests and the performance regression tests?

2. Regarding the final performance comparison between thenCombine and
combineAll, is there a more specific conclusion? Which one should we use, or
are both options viable?
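
To make the second question concrete, here is a minimal sketch of the two
combination styles. It is only an analogy: it uses the JDK's
CompletableFuture (thenCombine and allOf) as a stand-in and does not call
Flink's async state API. The names CombineSketch, chained and bulk are
hypothetical and chosen for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration class; it is not part of Flink or the FLIP.
public class CombineSketch {

    // Pairwise style: each thenCombine joins exactly two futures,
    // so combining N reads takes a chain of N calls starting from a seed.
    public static CompletableFuture<Integer> chained(List<CompletableFuture<Integer>> reads) {
        CompletableFuture<Integer> acc = CompletableFuture.completedFuture(0);
        for (CompletableFuture<Integer> read : reads) {
            acc = acc.thenCombine(read, Integer::sum);
        }
        return acc;
    }

    // Bulk style: wait for every read to finish, then run a single callback.
    // JDK allOf is used here as a stand-in for a combineAll-like helper.
    public static CompletableFuture<Integer> bulk(List<CompletableFuture<Integer>> reads) {
        return CompletableFuture.allOf(reads.toArray(new CompletableFuture[0]))
                // join() does not block here because all reads are already complete.
                .thenApply(ignored -> reads.stream().mapToInt(CompletableFuture::join).sum());
    }

    public static void main(String[] args) {
        // Simulated asynchronous state reads.
        List<CompletableFuture<Integer>> reads = List.of(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 3));
        System.out.println(chained(reads).join());
        System.out.println(bulk(reads).join());
    }
}
```

Both methods compute the same sum. They differ only in how the callbacks are
wired together, which is the kind of overhead a performance comparison would
have to measure.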

Best,
Feng.

On Mon, Aug 12, 2024 at 11:23 AM Xuyang <xyzhong...@163.com> wrote:

> Hi, David. Thank you for your review. Let me address your questions:
>
> >1. Is there a way to enforce this as an invariant during build time,
> >perhaps through a generic test framework that switches between the sync
> >and async versions of all operators and verifies checkpoint compatibility?
>
> Yes, we need to incorporate a corresponding testing framework to verify
> state compatibility during transitions between sync and async state
> operators. In my proposal, it resembles the existing RestoreTestBase, with
> the testing logic structured as follows:
>
> a. Start with the sync state operator to consume data;
> b. Execute a checkpoint;
> c. Restart with the async state operator and recover data from the
>    checkpoint for re-consumption;
> d. Validate the correctness of the results.
>
> Additionally, we could also consider the reverse scenario, where the async
> state operator consumes data first, followed by a restart with the sync
> state operator. I have added this part to the `TEST PLAN` section of the
> FLIP.
>
> >2. If the only difference between them is state handling, could they
> >potentially be implemented as the same operator with two different
> >interfaces? My main concern is code reuse: it’s crucial to avoid
> >duplicating code to ensure both implementations stay aligned.
> >Additionally, could feature parity be verified at the test suite level
> >(similar to the first question)? Perhaps we could create a single
> >parameterized test suite that runs against both versions?
>
> IIUC, your focus aligns with the roadmap’s mention of “Refactoring the
> sync and async state operators, leveraging shared logical calculations
> while abstracting the state access details.” Due to the intricate details
> of the code implementation, this was not elaborated in the FLIP. I share
> your vision of designing reusable business logic classes (such as
> JoinHelper) alongside different operator interfaces (SyncStateJoinOperator
> and AsyncStateJoinOperator), consolidating the reusable logic within the
> class JoinHelper.
>
> For synchronous operators, we already have harness tests to validate data
> correctness, and there will also be dedicated harness tests for
> asynchronous operators. Using a parameterized test suite for both
> harnesses is indeed feasible.
>
>
>
>
> --
>
>     Best!
>     Xuyang
>
>
>
>
>
> At 2024-08-09 20:49:49, "David Morávek" <d...@apache.org> wrote:
> >Hi Xuyang,
> >
> >Thank you for looking into this—great work! The overall direction seems
> >solid. I have two minor questions:
> >
> >> In theory, the implementations of AsyncStateOperator and
> >> SyncStateOperator differ only in their state handling. Their state
> >> schemas, business logic, and other aspects remain the same. Therefore,
> >> within the same Flink version, when the SQL and other Flink
> >> configurations remain unchanged, or when using the same compiled plan,
> >> users can freely switch between AsyncStateOperator and SyncStateOperator
> >> by toggling the configuration table.exec.async-state.enabled, as they
> >> are fully compatible.
> >
> >
> >1. Is there a way to enforce this as an invariant during build time,
> >perhaps through a generic test framework that switches between the sync
> >and async versions of all operators and verifies checkpoint compatibility?
> >
> >2. If the only difference between them is state handling, could they
> >potentially be implemented as the same operator with two different
> >interfaces? My main concern is code reuse: it’s crucial to avoid
> >duplicating code to ensure both implementations stay aligned.
> >Additionally, could feature parity be verified at the test suite level
> >(similar to the first question)? Perhaps we could create a single
> >parameterized test suite that runs against both versions?
> >
> >Best,
> >D.
> >
> >On Thu, Aug 8, 2024 at 2:23 PM Xuyang <xyzhong...@163.com> wrote:
> >
> >> Hi, everyone.
> >>
> >> I have updated the FLIP. Here are the newly added sections:
> >>
> >>
> >> In theory, the implementations of AsyncStateOperator and
> >> SyncStateOperator differ only in their state handling. Their state
> >> schemas, business logic, and other aspects are the same. Therefore,
> >> within the same Flink version, when the SQL and other Flink
> >> configurations remain unchanged, or when using the same compiled plan,
> >> users can freely switch between AsyncStateOperator and SyncStateOperator
> >> by toggling the configuration table.exec.async-state.enabled, as they
> >> are fully compatible. One known exception is that after the SQL
> >> AsyncStateOperator supports in-flight checkpoints in FLIP-455[2],
> >> switching back and forth between aligned and unaligned checkpoints is
> >> not compatible, which is expected behavior.
> >>
> >>
> >>
> >> --
> >>     Best!
> >>     Xuyang
> >>
> >>
> >>
> >> At 2024-08-07 19:05:57, "Xuyang" <xyzhong...@163.com> wrote:
> >> >Hi, Zakelly.
> >> >
> >> >You’re right. This is another advantage of choosing the approach where
> >> >the two sets of operators share the same execution node. With this
> >> >approach, users can keep using the compiled plan and adjust the config
> >> >'table.exec.async-state.enabled', which gives them the flexibility to
> >> >switch between asynchronous and synchronous state operators.
> >> >
> >> >
> >> >
> >> >
> >> >--
> >> >
> >> >    Best!
> >> >    Xuyang
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >At 2024-08-06 18:54:24, "Zakelly Lan" <zakelly....@gmail.com> wrote:
> >> >>Hi Xuyang,
> >> >>
> >> >>Thanks for driving this! I'm happy to see operator implementations on
> >> >>top of the async state. Overall it looks good. I have one minor question:
> >> >>
> >> >>It seems the only difference between the new operator implementation and
> >> >>the original one is the way state is accessed. Does this mean the state
> >> >>is totally compatible when toggling `table.exec.async-state.enabled`,
> >> >>from the operator's point of view? And this way users can freely migrate
> >> >>their job to a newer minor version and leverage the newly implemented
> >> >>async operator, without state loss, right?
> >> >>
> >> >>
> >> >>Best,
> >> >>Zakelly
> >> >>
> >> >>On Mon, Aug 5, 2024 at 10:40 AM Xuyang <xyzhong...@163.com> wrote:
> >> >>
> >> >>> Hi devs,
> >> >>>
> >> >>> I'd like to initiate a discussion about FLIP-473: Introduce New SQL
> >> >>> Operators Based on Asynchronous State APIs[1].
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> FLIP-424[2] has introduced an asynchronous state architecture to
> >> >>> mitigate I/O bottlenecks in Flink by offloading state access from the
> >> >>> main thread of each task to separate threads, thereby optimizing I/O
> >> >>> and computational resource utilization. Building upon this
> >> >>> infrastructure, FLIP-473[1] proposes introducing new SQL operators
> >> >>> based on these asynchronous state APIs. Our goal is to enhance
> >> >>> throughput and reduce latency in stateful operations by leveraging
> >> >>> the new asynchronous state architecture in Flink SQL jobs.
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> For more details, please refer to FLIP-473[1]. Any feedback and
> >> >>> suggestions for improvement are welcome.
> >> >>>
> >> >>> POC code[3] for the Join operator is also available.
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> Best!
> >> >>> Xuyang
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-473+Introduce+New+SQL+Operators+Based+on+Asynchronous+State+APIs
> >> >>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-424%3A+Asynchronous+State+APIs
> >> >>> [3] https://github.com/apache/flink/commit/bccace0f4b233e10279c7d95e009ae6aadad5ae8
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >>
> >>
>
