Hi Xuyang,

Thanks for your proposal. It looks good to me overall. +1

Best,
Ron

> -----Original Message-----
> From: Xuyang <xyzhong...@163.com>
> Sent: 2024-08-14 15:20:59 (Wednesday)
> To: dev@flink.apache.org
> Subject: Re:Re: Re: Re:Re: [DISCUSS] FLIP-473: Introduce New SQL Operators
> Based on Asynchronous State APIs
>
> Hi, Feng. Thank you for your feedback.
>
> I completely agree that these tests are crucial. I have updated the Roadmap
> section of the FLIP to include plans for harness tests and IT tests for data
> correctness, state compatibility tests, and performance regression tests.
>
> Regarding the usage of the APIs `thenCombine` and `combineAll`, I have also
> added a concluding note that there is no significant performance difference
> between these two, so either one can be used interchangeably.
>
> --
> Best!
> Xuyang
>
> At 2024-08-13 17:10:32, "Feng Jin" <jinfeng1...@gmail.com> wrote:
> >Hi, Xuyang
> >
> >Thank you for initiating this FLIP. I believe this is a significant feature
> >for the future of Flink SQL, but I also share concerns about the
> >maintenance costs related to the correctness, state compatibility, and
> >performance of the two implementations.
> >I fully support ensuring state compatibility, functional correctness, and
> >performance regression detection through HarnessTests and IT Tests.
> >
> >1. I think relevant testing is a critical part. Is there a more detailed
> >design plan for the HarnessTests and the performance regression tests?
> >
> >2. Regarding the final performance comparison between thenCombine and
> >combineAll, is there a more specific conclusion? Which one should we use,
> >or are both options viable?
> >
> >Best,
> >Feng.
> >
> >On Mon, Aug 12, 2024 at 11:23 AM Xuyang <xyzhong...@163.com> wrote:
> >
> >> Hi, David. Thank you for your review. Let me address your questions:
> >>
> >> >1. Is there a way to enforce this as an invariant during build time,
> >> >perhaps through a generic test framework that switches between the sync
> >> >and async versions of all operators and verifies checkpoint
> >> >compatibility?
> >>
> >> Yes, we need to incorporate a corresponding testing framework to verify
> >> state compatibility during transitions between sync and async state
> >> operators. In my proposal, it resembles the existing RestoreTestBase,
> >> with the testing logic structured as follows:
> >>
> >> a. Start with the sync state operator to consume data;
> >> b. Execute a checkpoint;
> >> c. Restart with the async state operator and recover data from the
> >>    checkpoint for re-consumption;
> >> d. Validate the correctness of the results.
> >>
> >> Additionally, we could also consider the reverse scenario, where the
> >> async state operator consumes data first and the job is then restarted
> >> with the sync state operator. I have added this to the `TEST PLAN`
> >> section of the FLIP.
> >>
> >> >2. If the only difference between them is state handling, could they
> >> >potentially be implemented as the same operator with two different
> >> >interfaces? My main concern is code reuse: it's crucial to avoid
> >> >duplicating code to ensure both implementations stay aligned.
> >> >Additionally, could feature parity be verified at the test suite level
> >> >(similar to the first question)? Perhaps we could create a single
> >> >parameterized test suite that runs against both versions?
> >>
> >> IIUC, your focus aligns with the roadmap's mention of "Refactoring the
> >> sync and async state operators, leveraging shared logical calculations
> >> while abstracting the state access details." Due to the intricate
> >> details of the code implementation, this was not elaborated in the FLIP.
> >> I share your vision of designing reusable business logic classes (such
> >> as JoinHelper) alongside different operator interfaces
> >> (SyncStateJoinOperator and AsyncStateJoinOperator), consolidating the
> >> reusable logic within the class JoinHelper.
> >>
> >> For synchronous operators, we already have harness tests to validate
> >> data correctness, and there will also be dedicated harness tests for
> >> asynchronous operators. Using a parameterized test suite for both
> >> harnesses is indeed feasible.
> >>
> >> --
> >> Best!
> >> Xuyang
> >>
> >> At 2024-08-09 20:49:49, "David Morávek" <d...@apache.org> wrote:
> >> >Hi Xuyang,
> >> >
> >> >Thank you for looking into this, great work! The overall direction
> >> >seems solid. I have two minor questions:
> >> >
> >> >In theory, the implementation of AsyncStateOperator and
> >> >> SyncStateOperator differs only in their state handling. Their state
> >> >> schemas, business logic, and other aspects remain the same.
> >> >> Therefore, within the same Flink version, when the SQL and other
> >> >> Flink configurations remain unchanged, or when using the same
> >> >> compiled plan, users can freely switch between AsyncStateOperator and
> >> >> SyncStateOperator by toggling the configuration
> >> >> table.exec.async-state.enabled, as they are fully compatible.
> >> >
> >> >1. Is there a way to enforce this as an invariant during build time,
> >> >perhaps through a generic test framework that switches between the sync
> >> >and async versions of all operators and verifies checkpoint
> >> >compatibility?
> >> >
> >> >2. If the only difference between them is state handling, could they
> >> >potentially be implemented as the same operator with two different
> >> >interfaces? My main concern is code reuse: it's crucial to avoid
> >> >duplicating code to ensure both implementations stay aligned.
> >> >Additionally, could feature parity be verified at the test suite level
> >> >(similar to the first question)? Perhaps we could create a single
> >> >parameterized test suite that runs against both versions?
> >> >
> >> >Best,
> >> >D.
> >> >
> >> >On Thu, Aug 8, 2024 at 2:23 PM Xuyang <xyzhong...@163.com> wrote:
> >> >
> >> >> Hi, everyone.
> >> >>
> >> >> I have updated the FLIP. Here is the newly added section:
> >> >>
> >> >> In theory, the implementation of AsyncStateOperator and
> >> >> SyncStateOperator differs only in their state handling. Their state
> >> >> schemas, business logic, and other aspects are the same. Therefore,
> >> >> within the same Flink version, when the SQL and other Flink
> >> >> configurations remain unchanged, or when using the same compiled
> >> >> plan, users can freely switch between AsyncStateOperator and
> >> >> SyncStateOperator by toggling the configuration
> >> >> table.exec.async-state.enabled, as they are fully compatible. One
> >> >> known exception is that after the SQL AsyncStateOperator supports
> >> >> in-flight checkpoints in FLIP-455[2], switching back and forth
> >> >> between aligned and unaligned checkpoints is not compatible, which is
> >> >> expected behavior.
> >> >>
> >> >> --
> >> >> Best!
> >> >> Xuyang
> >> >>
> >> >> At 2024-08-07 19:05:57, "Xuyang" <xyzhong...@163.com> wrote:
> >> >> >Hi, Zakelly.
> >> >> >
> >> >> >You're right. This is another advantage of choosing the approach
> >> >> >where the two sets of operators share the same execution node. In
> >> >> >this approach, users can utilize the compiled plan and adjust the
> >> >> >config 'table.exec.async-state.enabled', providing them with the
> >> >> >flexibility to switch between asynchronous and synchronous state
> >> >> >operators.
> >> >> >
> >> >> >--
> >> >> >Best!
> >> >> >Xuyang
> >> >> >
> >> >> >At 2024-08-06 18:54:24, "Zakelly Lan" <zakelly....@gmail.com> wrote:
> >> >> >>Hi Xuyang,
> >> >> >>
> >> >> >>Thanks for driving this! I'm happy to see operator implementation
> >> >> >>on top of the async state. Overall it looks good. I have one minor
> >> >> >>question:
> >> >> >>
> >> >> >>It seems the only difference between the new operator
> >> >> >>implementation and the original one is the way state is accessed.
> >> >> >>Does this mean the state is totally compatible when toggling
> >> >> >>`table.exec.async-state.enabled`, from the operator's point of
> >> >> >>view? And in this way users can freely migrate their job to a newer
> >> >> >>minor version and leverage the newly implemented async operator,
> >> >> >>without state loss, right?
> >> >> >>
> >> >> >>Best,
> >> >> >>Zakelly
> >> >> >>
> >> >> >>On Mon, Aug 5, 2024 at 10:40 AM Xuyang <xyzhong...@163.com> wrote:
> >> >> >>
> >> >> >>> Hi devs,
> >> >> >>>
> >> >> >>> I'd like to initiate a discussion about FLIP-473: Introduce New
> >> >> >>> SQL Operators Based on Asynchronous State APIs[1].
> >> >> >>>
> >> >> >>> FLIP-424[2] has introduced an asynchronous state architecture to
> >> >> >>> mitigate I/O bottlenecks in Flink by offloading state access to
> >> >> >>> separate threads from the main thread in each task, thereby
> >> >> >>> optimizing I/O and computational resource utilization.
> >> >> >>>
> >> >> >>> Building upon this infrastructure, FLIP-473[1] proposes
> >> >> >>> introducing new SQL operators based on these asynchronous state
> >> >> >>> APIs.
> >> >> >>>
> >> >> >>> Our goal is to enhance throughput and reduce latency in stateful
> >> >> >>> operations by leveraging the new asynchronous state architecture
> >> >> >>> in Flink SQL jobs.
> >> >> >>>
> >> >> >>> For more details, please refer to FLIP-473[1]. Any feedback and
> >> >> >>> suggestions for improvement are welcome.
> >> >> >>>
> >> >> >>> There is POC code[3] for the Join operator.
> >> >> >>>
> >> >> >>> Best!
> >> >> >>> Xuyang
> >> >> >>>
> >> >> >>> [1]
> >> >> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-473+Introduce+New+SQL+Operators+Based+on+Asynchronous+State+APIs
> >> >> >>>
> >> >> >>> [2]
> >> >> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-424%3A+Asynchronous+State+APIs
> >> >> >>>
> >> >> >>> [3]
> >> >> >>> https://github.com/apache/flink/commit/bccace0f4b233e10279c7d95e009ae6aadad5ae8
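
Editor's sketch: the test plan discussed in this thread (consume with the sync state operator, take a checkpoint, restart with the async state operator, validate the results, and also the reverse direction) can be illustrated with a toy model in plain Java. Nothing below uses Flink. `CountingOperator`, `SyncCountOperator`, `AsyncCountOperator`, `nextCount` and `runWithSwitch` are hypothetical names invented for this illustration, `CompletableFuture` only stands in for asynchronous state access, and copying a map stands in for a checkpoint. The shared `nextCount` method plays the role that the thread assigns to a helper class such as JoinHelper: the business logic lives in one place and the two operators differ only in how they reach state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class SyncAsyncRestoreSketch {

    /** Shared business logic (no state access), reused by both operators. */
    static long nextCount(Long current) {
        return current == null ? 1L : current + 1L;
    }

    /** Hypothetical operator contract used only by this sketch. */
    interface CountingOperator {
        void process(String key);

        Map<String, Long> snapshot();
    }

    /** Reads and writes state directly on the calling thread. */
    static final class SyncCountOperator implements CountingOperator {
        private final Map<String, Long> state;

        SyncCountOperator(Map<String, Long> restored) {
            this.state = new HashMap<>(restored);
        }

        public void process(String key) {
            state.put(key, nextCount(state.get(key)));
        }

        public Map<String, Long> snapshot() {
            return new HashMap<>(state);
        }
    }

    /** Same state schema, but state is read through a future (a stand-in for async state). */
    static final class AsyncCountOperator implements CountingOperator {
        private final Map<String, Long> state;

        AsyncCountOperator(Map<String, Long> restored) {
            this.state = new HashMap<>(restored);
        }

        public void process(String key) {
            CompletableFuture.supplyAsync(() -> state.get(key))
                    .thenApply(SyncAsyncRestoreSketch::nextCount)
                    .thenAccept(updated -> state.put(key, updated))
                    .join(); // wait per record so the toy model stays deterministic
        }

        public Map<String, Long> snapshot() {
            return new HashMap<>(state);
        }
    }

    /** Steps a-d of the plan: consume, checkpoint, restart with the other operator, return results. */
    static Map<String, Long> runWithSwitch(String[] before, String[] after, boolean startSync) {
        CountingOperator first = startSync
                ? new SyncCountOperator(new HashMap<>())
                : new AsyncCountOperator(new HashMap<>());
        for (String key : before) {
            first.process(key);
        }
        Map<String, Long> checkpoint = first.snapshot();
        CountingOperator second = startSync
                ? new AsyncCountOperator(checkpoint)
                : new SyncCountOperator(checkpoint);
        for (String key : after) {
            second.process(key);
        }
        return second.snapshot();
    }

    public static void main(String[] args) {
        String[] before = {"a", "b", "a"};
        String[] after = {"a", "c"};
        Map<String, Long> syncThenAsync = runWithSwitch(before, after, true);
        Map<String, Long> asyncThenSync = runWithSwitch(before, after, false);
        // Both switch directions should agree if the state is compatible.
        System.out.println("compatible: " + syncThenAsync.equals(asyncThenSync));
    }
}
```

Running the same inputs through both switch directions and comparing the final snapshots is the toy equivalent of the parameterized test suite suggested in the thread. A real implementation would build on Flink's restore and harness test infrastructure rather than on in-memory maps.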