Hi everyone.

Thank you to everyone who took part in the discussion for your insights and suggestions.
If there are no further comments, I will start the vote on August 21, 2024.




--

    Best!
    Xuyang





On 2024-08-19 10:26:01, "ron" <ld...@zju.edu.cn> wrote:
>Hi, Xuyang
>
>Thanks for your proposal. It looks good to me overall, +1
>
>Best,
>Ron
>
>
>> -----Original Message-----
>> From: Xuyang <xyzhong...@163.com>
>> Sent: 2024-08-14 15:20:59 (Wednesday)
>> To: dev@flink.apache.org
>> Subject: Re:Re: Re: Re:Re: [DISCUSS] FLIP-473: Introduce New SQL Operators Based on Asynchronous State APIs
>> 
>> Hi, Feng. Thank you for your feedback. 
>> 
>> I completely agree that these tests are crucial. I have updated the Roadmap
>> section of the FLIP to include plans for harness tests and IT tests for data
>> correctness, state compatibility tests, and performance regression tests.
>>
>> Regarding the usage of the APIs `thenCombine` and `combineAll`, I have also
>> added a concluding note that there is no significant performance difference
>> between these two, so either one can be used interchangeably.
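To make the comparison concrete, here is a minimal sketch of the two combination styles. It uses the JDK's `CompletableFuture.thenCombine` and `CompletableFuture.allOf` as stand-ins for the Flink state-future APIs discussed above (it does not call Flink's own `thenCombine` or `combineAll`), and `readAsync` is a hypothetical helper that simulates an asynchronous state read.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustration only: JDK CompletableFuture stands in for Flink's state futures.
class CombineSketch {

    // Hypothetical asynchronous state read; here it completes immediately.
    static CompletableFuture<Long> readAsync(long value) {
        return CompletableFuture.completedFuture(value);
    }

    // Pairwise style: fold the futures one by one with thenCombine.
    static CompletableFuture<Long> sumWithThenCombine(List<CompletableFuture<Long>> reads) {
        CompletableFuture<Long> acc = CompletableFuture.completedFuture(0L);
        for (CompletableFuture<Long> read : reads) {
            acc = acc.thenCombine(read, Long::sum);
        }
        return acc;
    }

    // "Combine all" style: wait for every future, then fold the completed values.
    static CompletableFuture<Long> sumWithCombineAll(List<CompletableFuture<Long>> reads) {
        return CompletableFuture.allOf(reads.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> reads.stream()
                        .map(CompletableFuture::join)
                        .reduce(0L, Long::sum));
    }
}
```

Both helpers produce the same sum; they differ only in whether the futures are folded pairwise or awaited as a group, which is consistent with the note that either style can be used.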
>> 
>> 
>> 
>> 
>> --
>> 
>>     Best!
>>     Xuyang
>> 
>> 
>> 
>> 
>> 
>> At 2024-08-13 17:10:32, "Feng Jin" <jinfeng1...@gmail.com> wrote:
>> >Hi, Xuyang
>> >
>> >Thank you for initiating this FLIP. I believe this is a significant feature
>> >for the future of Flink SQL, but I also share concerns about the
>> >maintenance costs related to the correctness, state compatibility, and
>> >performance of the two implementations.
>> >I fully support ensuring state compatibility, functional correctness, and
>> >performance regression detection through HarnessTests and IT Tests.
>> >
>> >1. I think relevant testing is a critical part. Is there a more detailed
>> >design plan for the HarnessTests and performance regression test?
>> >2. Regarding the final performance comparison between thenCombine and
>> >combineAll, is there a more specific conclusion? Which one should we use,
>> >or are both options viable?
>> >
>> >Best,
>> >Feng.
>> >
>> >On Mon, Aug 12, 2024 at 11:23 AM Xuyang <xyzhong...@163.com> wrote:
>> >
>> >> Hi, David. Thank you for your review. Let me address your questions:
>> >>
>> >> >1. Is there a way to enforce this as an invariant during build time,
>> >> >perhaps through a generic test framework that switches between the sync
>> >> >and async versions of all operators and verifies checkpoint compatibility?
>> >>
>> >> Yes, we need to incorporate a corresponding testing framework to verify
>> >> state compatibility during transitions between sync and async state
>> >> operators. In my proposal, it resembles the existing RestoreTestBase,
>> >> with the testing logic structured as follows:
>> >> a. Start with the sync state operator to consume data;
>> >> b. Execute a checkpoint;
>> >> c. Restart with the async state operator and recover data from the
>> >>    checkpoint for re-consumption;
>> >> d. Validate the correctness of the results.
>> >> Additionally, we could also consider scenarios where the async state
>> >> operator consumes data first, followed by a restart with the sync state
>> >> operator. I have added this to the `TEST PLAN` section of the FLIP.
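Steps a to d can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the RestoreTestBase-based framework itself: `CountOperator`, its two implementations, and `runWithSwitch` are hypothetical, the "checkpoint" is simply a copy of an in-memory map, and the async variant only simulates state reads with `CompletableFuture`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Illustration only: a hypothetical keyed counter standing in for a SQL operator.
class SyncAsyncRestoreSketch {

    // Both variants share one state schema: key -> running count.
    interface CountOperator {
        void process(String key);
        Map<String, Long> snapshot();          // step b: checkpoint
        void restore(Map<String, Long> state); // step c: recover from checkpoint
    }

    static final class SyncCountOperator implements CountOperator {
        private final Map<String, Long> state = new HashMap<>();

        public void process(String key) {
            state.merge(key, 1L, Long::sum); // read and write state synchronously
        }

        public Map<String, Long> snapshot() { return new HashMap<>(state); }

        public void restore(Map<String, Long> s) { state.clear(); state.putAll(s); }
    }

    static final class AsyncCountOperator implements CountOperator {
        private final Map<String, Long> state = new HashMap<>();

        // Simulated asynchronous read that returns a future instead of the value.
        private CompletableFuture<Long> readAsync(String key) {
            return CompletableFuture.completedFuture(state.getOrDefault(key, 0L));
        }

        public void process(String key) {
            // join() keeps this sketch deterministic; a real async operator would not block per record.
            readAsync(key).thenAccept(count -> state.put(key, count + 1)).join();
        }

        public Map<String, Long> snapshot() { return new HashMap<>(state); }

        public void restore(Map<String, Long> s) { state.clear(); state.putAll(s); }
    }

    // Consume with `first`, checkpoint, restore into `second`, keep consuming.
    static Map<String, Long> runWithSwitch(CountOperator first, CountOperator second,
                                           List<String> before, List<String> after) {
        before.forEach(first::process);                  // a. consume with the first variant
        Map<String, Long> checkpoint = first.snapshot(); // b. checkpoint
        second.restore(checkpoint);                      // c. restart with the other variant
        after.forEach(second::process);
        return second.snapshot();                        // d. caller validates this result
    }
}
```

Calling `runWithSwitch` once with the sync operator first and once with the async operator first covers both directions mentioned above; in each case the result should equal what a single operator produces when it consumes all of the input.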
>> >>
>> >> >2. If the only difference between them is state handling, could they
>> >> >potentially be implemented as the same operator with two different
>> >> >interfaces? My main concern is code reuse: it’s crucial to avoid
>> >> >duplicating code to ensure both implementations stay aligned. Additionally,
>> >> >could feature parity be verified at the test suite level (similar to the
>> >> >first question)? Perhaps we could create a single parameterized test suite
>> >> >that runs against both versions?
>> >>
>> >> IIUC, your focus aligns with the roadmap’s mention of “Refactoring the
>> >> sync and async state operators, leveraging shared logical calculations
>> >> while abstracting the state access details.” Due to the intricate details
>> >> of the code implementation, this was not elaborated in the FLIP. I share
>> >> your vision of designing reusable business logic classes (such as
>> >> JoinHelper) alongside different operator interfaces (SyncStateJoinOperator
>> >> and AsyncStateJoinOperator), consolidating the reusable logic within the
>> >> JoinHelper class.
>> >>
>> >> For synchronous operators, we already have harness tests to validate data
>> >> correctness, and there will also be dedicated harness tests for
>> >> asynchronous operators. Using a parameterized test suite for both
>> >> harnesses is indeed feasible.
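A minimal sketch of the shared-helper design together with a parameterized-style suite. The class names mirror those in the reply above (`JoinHelper`, `SyncStateJoinOperator`, `AsyncStateJoinOperator`), but these are hypothetical simplifications, not the classes in the POC: state is an in-memory map, the async variant only simulates asynchronous access, and `runSuite` stands in for a JUnit parameterized test.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Illustration only: hypothetical classes named after the thread, not Flink code.
class JoinHelperSketch {

    // Shared business logic: join one left row with all matching right rows.
    static final class JoinHelper {
        static List<String> join(String leftRow, List<String> rightRows) {
            List<String> out = new ArrayList<>();
            for (String right : rightRows) {
                out.add(leftRow + "|" + right);
            }
            return out;
        }
    }

    // Common interface so one test suite can target both variants.
    interface JoinOperator {
        void addRight(String key, String row);
        List<String> processLeft(String key, String row);
    }

    static final class SyncStateJoinOperator implements JoinOperator {
        private final Map<String, List<String>> rightState = new HashMap<>();

        public void addRight(String key, String row) {
            rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }

        public List<String> processLeft(String key, String row) {
            // Synchronous state access, then the shared logic.
            return JoinHelper.join(row, rightState.getOrDefault(key, List.of()));
        }
    }

    static final class AsyncStateJoinOperator implements JoinOperator {
        private final Map<String, List<String>> rightState = new HashMap<>();

        public void addRight(String key, String row) {
            rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }

        public List<String> processLeft(String key, String row) {
            // Simulated asynchronous state access; the same shared logic runs in the callback.
            return CompletableFuture
                    .completedFuture(rightState.getOrDefault(key, List.of()))
                    .thenApply(matches -> JoinHelper.join(row, matches))
                    .join();
        }
    }

    // Parameterized-style suite: identical checks run against every operator factory.
    static void runSuite(List<Supplier<JoinOperator>> factories) {
        for (Supplier<JoinOperator> factory : factories) {
            JoinOperator op = factory.get();
            op.addRight("k1", "r1");
            op.addRight("k1", "r2");
            List<String> joined = op.processLeft("k1", "l1");
            if (!joined.equals(List.of("l1|r1", "l1|r2"))) {
                throw new AssertionError("unexpected join result: " + joined);
            }
            if (!op.processLeft("k2", "l2").isEmpty()) {
                throw new AssertionError("expected no match for k2");
            }
        }
    }
}
```

The design choice illustrated here is that only the state access differs between the two operators; the join result is always computed by `JoinHelper`, so a single suite can check both variants.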
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >>
>> >>     Best!
>> >>     Xuyang
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> On 2024-08-09 20:49:49, "David Morávek" <d...@apache.org> wrote:
>> >> >Hi Xuyang,
>> >> >
>> >> >Thank you for looking into this—great work! The overall direction seems
>> >> >solid. I have two minor questions:
>> >> >
>> >> >> In theory, the implementation of AsyncStateOperator and SyncStateOperator
>> >> >> differs only in their state handling. Their state schemas, business
>> >> >> logic, and other aspects remain the same. Therefore, within the same
>> >> >> Flink version, when the SQL and other Flink configurations remain
>> >> >> unchanged, or when using the same compiled plan, users can freely switch
>> >> >> between AsyncStateOperator and SyncStateOperator by toggling the
>> >> >> configuration table.exec.async-state.enabled, as they are fully
>> >> >> compatible.
>> >> >
>> >> >
>> >> >1. Is there a way to enforce this as an invariant during build time,
>> >> >perhaps through a generic test framework that switches between the sync
>> >> >and async versions of all operators and verifies checkpoint compatibility?
>> >> >
>> >> >2. If the only difference between them is state handling, could they
>> >> >potentially be implemented as the same operator with two different
>> >> >interfaces? My main concern is code reuse: it’s crucial to avoid
>> >> >duplicating code to ensure both implementations stay aligned. Additionally,
>> >> >could feature parity be verified at the test suite level (similar to the
>> >> >first question)? Perhaps we could create a single parameterized test suite
>> >> >that runs against both versions?
>> >> >
>> >> >Best,
>> >> >D.
>> >> >
>> >> >On Thu, Aug 8, 2024 at 2:23 PM Xuyang <xyzhong...@163.com> wrote:
>> >> >
>> >> >> Hi, everyone.
>> >> >>
>> >> >> I have updated the FLIP. Here are the newly added sections:
>> >> >>
>> >> >>
>> >> >> In theory, the implementation of AsyncStateOperator and SyncStateOperator
>> >> >> differs only in their state handling. Their state schemas, business
>> >> >> logic, and other aspects are the same. Therefore, within the same Flink
>> >> >> version, when the SQL and other Flink configurations remain unchanged,
>> >> >> or when using the same compiled plan, users can freely switch between
>> >> >> AsyncStateOperator and SyncStateOperator by toggling the configuration
>> >> >> table.exec.async-state.enabled, as they are fully compatible. One known
>> >> >> exception: once the SQL AsyncStateOperator supports in-flight
>> >> >> checkpoints in FLIP-455[2], switching back and forth between aligned and
>> >> >> unaligned checkpoints will not be compatible, which is expected
>> >> >> behavior.
>> >> >>
>> >> >>
>> >> >>
>> >> >> --
>> >> >>     Best!
>> >> >>     Xuyang
>> >> >>
>> >> >>
>> >> >>
>> >> >> On 2024-08-07 19:05:57, "Xuyang" <xyzhong...@163.com> wrote:
>> >> >> >Hi, Zakelly.
>> >> >> >
>> >> >> >You’re right. This is another advantage of choosing the approach where
>> >> >> >the two sets of operators share the same execution node. In this
>> >> >> >approach, users can utilize the compiled plan and adjust the config
>> >> >> >'table.exec.async-state.enabled', providing them with the flexibility to
>> >> >> >switch between asynchronous and synchronous state operators.
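Assuming, as the reply above implies, that a compiled plan stores the shared execution node rather than a concrete operator, the sync/async choice can be made when the node is translated, based on `table.exec.async-state.enabled`. The sketch below is hypothetical: `ExecNodeSketch`, `translate`, and treating a missing key as `false` are assumptions for illustration, and a plain `Map` stands in for the table configuration.

```java
import java.util.Map;

// Illustration only: a hypothetical exec node; only the config key name comes from the thread.
class ExecNodeSketch {

    static final String ASYNC_STATE_ENABLED = "table.exec.async-state.enabled";

    enum OperatorKind { SYNC_STATE, ASYNC_STATE }

    // The same exec node picks an operator variant from the job configuration at translation time.
    // Treating a missing key as "false" is an assumption of this sketch, not a documented Flink default.
    static OperatorKind translate(Map<String, String> config) {
        boolean asyncEnabled = Boolean.parseBoolean(config.getOrDefault(ASYNC_STATE_ENABLED, "false"));
        return asyncEnabled ? OperatorKind.ASYNC_STATE : OperatorKind.SYNC_STATE;
    }
}
```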
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >--
>> >> >> >
>> >> >> >    Best!
>> >> >> >    Xuyang
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >At 2024-08-06 18:54:24, "Zakelly Lan" <zakelly....@gmail.com> wrote:
>> >> >> >>Hi Xuyang,
>> >> >> >>
>> >> >> >>Thanks for driving this! I'm happy to see operator implementations on
>> >> >> >>top of the async state. Overall it looks good; I have one minor
>> >> >> >>question:
>> >> >> >>
>> >> >> >>It seems the only difference between the new operator implementation
>> >> >> >>and the original one is the way state is accessed. Does this mean the
>> >> >> >>state is fully compatible, from the operator's point of view, when
>> >> >> >>toggling `table.exec.async-state.enabled`? In that case, users can
>> >> >> >>freely migrate their job to a newer minor version and leverage the newly
>> >> >> >>implemented async operator without state loss, right?
>> >> >> >>
>> >> >> >>
>> >> >> >>Best,
>> >> >> >>Zakelly
>> >> >> >>
>> >> >> >>On Mon, Aug 5, 2024 at 10:40 AM Xuyang <xyzhong...@163.com> wrote:
>> >> >> >>
>> >> >> >>> Hi devs,
>> >> >> >>>
>> >> >> >>> I'd like to initiate a discussion about FLIP-473: Introduce New SQL
>> >> >> >>> Operators Based on Asynchronous State APIs[1].
>> >> >> >>>
>> >> >> >>>
>> >> >> >>>
>> >> >> >>>
>> >> >> >>> FLIP-424[2] introduced an asynchronous state architecture to mitigate
>> >> >> >>> I/O bottlenecks in Flink by offloading state access from the main
>> >> >> >>> thread of each task to separate threads, thereby optimizing the
>> >> >> >>> utilization of I/O and computational resources.
>> >> >> >>>
>> >> >> >>> Building upon this infrastructure, FLIP-473[1] proposes introducing new
>> >> >> >>> SQL operators based on these asynchronous state APIs.
>> >> >> >>>
>> >> >> >>> Our goal is to enhance throughput and reduce latency in stateful
>> >> >> >>> operations by leveraging the new asynchronous state architecture in
>> >> >> >>> Flink SQL jobs.
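The offloading idea described above can be illustrated with generic JDK executors. This is only an analogy, not Flink's asynchronous state implementation: a single-threaded executor stands in for the task's main thread, a small pool stands in for the state-access threads, and `slowStateRead` is a hypothetical blocking lookup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustration only: generic JDK executors, not Flink's asynchronous state machinery.
class OffloadSketch {

    // Hypothetical state read that would normally block on I/O.
    static long slowStateRead(String key) {
        return key.length();
    }

    static long processAll(List<String> keys) {
        ExecutorService taskThread = Executors.newSingleThreadExecutor(); // stands in for the task's main thread
        ExecutorService stateIoPool = Executors.newFixedThreadPool(4);    // separate threads for state access
        try {
            List<CompletableFuture<Long>> pending = new ArrayList<>();
            for (String key : keys) {
                pending.add(CompletableFuture
                        .supplyAsync(() -> slowStateRead(key), stateIoPool) // I/O runs off the task thread
                        .thenApplyAsync(value -> value * 2, taskThread));   // computation runs on the task thread
            }
            long total = 0;
            for (CompletableFuture<Long> f : pending) {
                total += f.join();
            }
            return total;
        } finally {
            taskThread.shutdown();
            stateIoPool.shutdown();
        }
    }
}
```

In this arrangement the task thread never blocks on a state read; it only runs the callbacks of reads that have completed, so I/O and computation can overlap.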
>> >> >> >>>
>> >> >> >>>
>> >> >> >>>
>> >> >> >>>
>> >> >> >>> For more details, please refer to FLIP-473[1]. Any feedback and
>> >> >> >>> suggestions for improvement are welcome.
>> >> >> >>>
>> >> >> >>>
>> >> >> >>>
>> >> >> >>>
>> >> >> >>> There is already POC code[3] for the Join operator.
>> >> >> >>>
>> >> >> >>>
>> >> >> >>>
>> >> >> >>>
>> >> >> >>> Best!
>> >> >> >>> Xuyang
>> >> >> >>>
>> >> >> >>>
>> >> >> >>>
>> >> >> >>>
>> >> >> >>> [1]
>> >> >> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-473+Introduce+New+SQL+Operators+Based+on+Asynchronous+State+APIs
>> >> >> >>>
>> >> >> >>> [2]
>> >> >> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-424%3A+Asynchronous+State+APIs
>> >> >> >>>
>> >> >> >>> [3]
>> >> >> >>> https://github.com/apache/flink/commit/bccace0f4b233e10279c7d95e009ae6aadad5ae8
>> >> >> >>>
>> >> >>
>> >> >>
>> >>
>
>
