Hi, Feng.

Thank you for your feedback. I completely agree that these tests are
crucial. I have updated the Roadmap section of the FLIP to include plans
for harness tests and IT tests for data correctness, state compatibility
tests, and performance regression tests.

Regarding the APIs `thenCombine` and `combineAll`, I have also added a
concluding note: there is no significant performance difference between
the two, so either one can be used.

--
Best!
Xuyang

At 2024-08-13 17:10:32, "Feng Jin" <jinfeng1...@gmail.com> wrote:
>Hi, Xuyang
>
>Thank you for initiating this FLIP. I believe this is a significant feature
>for the future of Flink SQL, but I also share concerns about the
>maintenance costs related to the correctness, state compatibility, and
>performance of the two implementations.
>I fully support ensuring state compatibility, functional correctness, and
>performance regression detection through HarnessTests and IT Tests.
>
>1. I think relevant testing is a critical part. Is there a more detailed
>design plan for the HarnessTests and performance regression tests?
>
>2. Regarding the final performance comparison between thenCombine and
>combineAll, is there a more specific conclusion? Which one should we use,
>or are both options viable?
>
>Best,
>Feng.
>
>On Mon, Aug 12, 2024 at 11:23 AM Xuyang <xyzhong...@163.com> wrote:
>
>> Hi, David. Thank you for your review. Let me address your questions:
>>
>> >1. Is there a way to enforce this as an invariant during build time,
>> >perhaps through a generic test framework that switches between the sync
>> >and async versions of all operators and verifies checkpoint compatibility?
>>
>> Yes, we need to incorporate a corresponding testing framework to verify
>> state compatibility during transitions between sync and async state
>> operators. In my proposal, it resembles the existing RestoreTestBase, with
>> the testing logic structured as follows:
>>
>>   a. Start with the sync state operator to consume data;
>>   b. Execute a checkpoint;
>>   c. Restart with the async state operator and recover from the
>>      checkpoint to resume consumption;
>>   d. Validate the correctness of the results.
>>
>> Additionally, we could consider the reverse scenario, where the async
>> state operator consumes data first and the job is then restarted with the
>> sync state operator. I have added this to the `TEST PLAN` section of the
>> FLIP.
>>
>> >2. If the only difference between them is state handling, could they
>> >potentially be implemented as the same operator with two different
>> >interfaces? My main concern is code reuse: it's crucial to avoid
>> >duplicating code to ensure both implementations stay aligned.
>> >Additionally, could feature parity be verified at the test suite level
>> >(similar to the first question)? Perhaps we could create a single
>> >parameterized test suite that runs against both versions?
>>
>> IIUC, your focus aligns with the roadmap's mention of "Refactoring the
>> sync and async state operators, leveraging shared logical calculations
>> while abstracting the state access details." Because of the intricate
>> details of the code implementation, this was not elaborated in the FLIP.
>> I share your vision of designing reusable business logic classes (such as
>> JoinHelper) alongside different operator interfaces (SyncStateJoinOperator
>> and AsyncStateJoinOperator), consolidating the reusable logic within the
>> class JoinHelper.
>>
>> For synchronous operators, we already have harness tests to validate data
>> correctness, and there will also be dedicated harness tests for
>> asynchronous operators. Using a parameterized test suite for both
>> harnesses is indeed feasible.
>>
>> --
>> Best!
>> Xuyang
>>
>> On 2024-08-09 20:49:49, "David Morávek" <d...@apache.org> wrote:
>> >Hi Xuyang,
>> >
>> >Thank you for looking into this; great work! The overall direction seems
>> >solid. I have two minor questions:
>> >
>> >> In theory, the implementation of AsyncStateOperator and SyncStateOperator
>> >> differs only in their state handling. Their state schemas, business
>> >> logic, and other aspects remain the same. Therefore, within the same
>> >> Flink version, when the SQL and other Flink configurations remain
>> >> unchanged, or when using the same compiled plan, users can freely switch
>> >> between AsyncStateOperator and SyncStateOperator by toggling the
>> >> configuration table.exec.async-state.enabled, as they are fully
>> >> compatible.
>> >
>> >1. Is there a way to enforce this as an invariant during build time,
>> >perhaps through a generic test framework that switches between the sync
>> >and async versions of all operators and verifies checkpoint compatibility?
>> >
>> >2. If the only difference between them is state handling, could they
>> >potentially be implemented as the same operator with two different
>> >interfaces? My main concern is code reuse: it's crucial to avoid
>> >duplicating code to ensure both implementations stay aligned.
>> >Additionally, could feature parity be verified at the test suite level
>> >(similar to the first question)? Perhaps we could create a single
>> >parameterized test suite that runs against both versions?
>> >
>> >Best,
>> >D.
>> >
>> >On Thu, Aug 8, 2024 at 2:23 PM Xuyang <xyzhong...@163.com> wrote:
>> >
>> >> Hi, everyone.
>> >>
>> >> I have updated the FLIP. Here are the newly added sections:
>> >>
>> >> In theory, the implementation of AsyncStateOperator and SyncStateOperator
>> >> differs only in their state handling. Their state schemas, business
>> >> logic, and other aspects are the same. Therefore, within the same Flink
>> >> version, when the SQL and other Flink configurations remain unchanged,
>> >> or when using the same compiled plan, users can freely switch between
>> >> AsyncStateOperator and SyncStateOperator by toggling the configuration
>> >> table.exec.async-state.enabled, as they are fully compatible. One known
>> >> exception: after the SQL AsyncStateOperator supports in-flight
>> >> checkpoints in FLIP-455[2], switching back and forth between aligned and
>> >> unaligned checkpoints is not compatible, which is expected behavior.
>> >>
>> >> --
>> >> Best!
>> >> Xuyang
>> >>
>> >> On 2024-08-07 19:05:57, "Xuyang" <xyzhong...@163.com> wrote:
>> >> >Hi, Zakelly.
>> >> >
>> >> >You're right. This is another advantage of choosing the approach where
>> >> >the two sets of operators share the same execution node. In this
>> >> >approach, users can use the compiled plan and adjust the config
>> >> >'table.exec.async-state.enabled', which gives them the flexibility to
>> >> >switch between asynchronous and synchronous state operators.
>> >> >
>> >> >--
>> >> >Best!
>> >> >Xuyang
>> >> >
>> >> >At 2024-08-06 18:54:24, "Zakelly Lan" <zakelly....@gmail.com> wrote:
>> >> >>Hi Xuyang,
>> >> >>
>> >> >>Thanks for driving this! I'm happy to see operator implementations on
>> >> >>top of the async state. Overall it looks good. I have one minor
>> >> >>question:
>> >> >>
>> >> >>It seems the only difference between the new operator implementation
>> >> >>and the original one is the way state is accessed. Does this mean the
>> >> >>state is totally compatible when toggling
>> >> >>`table.exec.async-state.enabled`, from the operator's point of view?
>> >> >>In this way, users can freely migrate their job to a newer minor
>> >> >>version and leverage the newly implemented async operator without
>> >> >>state loss, right?
>> >> >>
>> >> >>Best,
>> >> >>Zakelly
>> >> >>
>> >> >>On Mon, Aug 5, 2024 at 10:40 AM Xuyang <xyzhong...@163.com> wrote:
>> >> >>
>> >> >>> Hi devs,
>> >> >>>
>> >> >>> I'd like to initiate a discussion about FLIP-473: Introduce New SQL
>> >> >>> Operators Based on Asynchronous State APIs[1].
>> >> >>>
>> >> >>> FLIP-424[2] introduced an asynchronous state architecture to
>> >> >>> mitigate I/O bottlenecks in Flink by offloading state access from
>> >> >>> the main thread of each task to separate threads, thereby optimizing
>> >> >>> I/O and computational resource utilization.
>> >> >>>
>> >> >>> Building upon this infrastructure, FLIP-473[1] proposes introducing
>> >> >>> new SQL operators based on these asynchronous state APIs. Our goal
>> >> >>> is to enhance throughput and reduce latency in stateful operations
>> >> >>> by leveraging the new asynchronous state architecture in Flink SQL
>> >> >>> jobs.
>> >> >>>
>> >> >>> For more details, please refer to FLIP-473[1]. Any feedback and
>> >> >>> suggestions for improvement are welcome.
>> >> >>>
>> >> >>> There is already POC code[3] for the Join operator.
>> >> >>>
>> >> >>> Best!
>> >> >>> Xuyang
>> >> >>>
>> >> >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-473+Introduce+New+SQL+Operators+Based+on+Asynchronous+State+APIs
>> >> >>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-424%3A+Asynchronous+State+APIs
>> >> >>> [3] https://github.com/apache/flink/commit/bccace0f4b233e10279c7d95e009ae6aadad5ae8