Hi, Xuyang

Thank you for initiating this FLIP. I believe this is a significant feature for the future of Flink SQL, but I also share concerns about the maintenance costs related to the correctness, state compatibility, and performance of the two implementations. I fully support ensuring state compatibility, functional correctness, and performance regression detection through HarnessTests and IT Tests.

1. I think relevant testing is a critical part. Is there a more detailed design plan for the HarnessTests and performance regression test?

2. Regarding the final performance comparison between thenCombine and combineAll, is there a more specific conclusion? Which one should we use, or are both options viable?

Best,
Feng.

On Mon, Aug 12, 2024 at 11:23 AM Xuyang <xyzhong...@163.com> wrote:

> Hi, David.
>
> Thank you for your review. Let me address your questions:
>
> >1. Is there a way to enforce this as an invariant during build time, perhaps through a generic test framework that switches between the sync and async versions of all operators and verifies checkpoint compatibility?
>
> Yes, we need to incorporate a corresponding testing framework to verify the state compatibility during transitions between sync and async state operators. In my proposal, it resembles the existing RestoreTestBase, with the testing logic structured as follows:
> a. Start with the sync state operator to consume data;
> b. Execute a checkpoint;
> c. Restart with the async state operator and recover data from the checkpoint for re-consumption;
> d. Validate the correctness of the results.
> Additionally, we could also consider the reverse scenario, where the async state operator consumes data first, followed by a restart with the sync state operator. I have added this part to the `TEST PLAN` section of the FLIP.
>
> >2. If the only difference between them is state handling, could they potentially be implemented as the same operator with two different interfaces? My main concern is code reuse: it’s crucial to avoid duplicating code to ensure both implementations stay aligned. Additionally, could feature parity be verified at the test suite level (similar to the first question)? Perhaps we could create a single parameterized test suite that runs against both versions?
>
> IIUC, your focus aligns with the roadmap’s mention of “Refactoring the sync and async state operators, leveraging shared logical calculations while abstracting the state access details.” Due to the intricate details of the code implementation, this was not elaborated in the FLIP.
> I share your vision of designing reusable business logic classes (such as JoinHelper) alongside different operator interfaces (SyncStateJoinOperator and AsyncStateJoinOperator), consolidating the reusable logic within the class JoinHelper.
>
> For synchronous operators, we already have harness tests to validate data correctness, and there will also be dedicated harness tests for asynchronous operators. Using a parameterized test suite for both harnesses is indeed feasible.
>
> --
> Best!
> Xuyang
>
> At 2024-08-09 20:49:49, "David Morávek" <d...@apache.org> wrote:
> >Hi Xuyang,
> >
> >Thank you for looking into this. Great work! The overall direction seems solid. I have two minor questions:
> >
> >> In theory, the implementation of AsyncStateOperator and SyncStateOperator differs only in their state handling. Their state schemas, business logic, and other aspects remain the same. Therefore, within the same Flink version, when the SQL and other Flink configurations remain unchanged, or when using the same compiled plan, users can freely switch between AsyncStateOperator and SyncStateOperator by toggling the configuration table.exec.async-state.enabled, as they are fully compatible.
> >
> >1. Is there a way to enforce this as an invariant during build time, perhaps through a generic test framework that switches between the sync and async versions of all operators and verifies checkpoint compatibility?
> >
> >2. If the only difference between them is state handling, could they potentially be implemented as the same operator with two different interfaces? My main concern is code reuse: it’s crucial to avoid duplicating code to ensure both implementations stay aligned. Additionally, could feature parity be verified at the test suite level (similar to the first question)? Perhaps we could create a single parameterized test suite that runs against both versions?
> >
> >Best,
> >D.
> >
> >On Thu, Aug 8, 2024 at 2:23 PM Xuyang <xyzhong...@163.com> wrote:
> >
> >> Hi, everyone.
> >>
> >> I have updated the FLIP. Here are the newly added sections:
> >>
> >> In theory, the implementation of AsyncStateOperator and SyncStateOperator differs only in their state handling. Their state schemas, business logic, and other aspects are the same. Therefore, within the same Flink version, when the SQL and other Flink configurations remain unchanged, or when using the same compiled plan, users can freely switch between AsyncStateOperator and SyncStateOperator by toggling the configuration table.exec.async-state.enabled, as they are fully compatible. One known exception is that after the SQL AsyncStateOperator supports in-flight checkpoints in FLIP-455[2], switching back and forth between aligned and unaligned checkpoints is not compatible, which is expected behavior.
> >>
> >> --
> >> Best!
> >> Xuyang
> >>
> >> At 2024-08-07 19:05:57, "Xuyang" <xyzhong...@163.com> wrote:
> >> >Hi, Zakelly.
> >> >
> >> >You’re right. This is another advantage of choosing the approach where the two sets of operators share the same execution node. In this approach, users can utilize the compiled plan and adjust the config 'table.exec.async-state.enabled', providing them with the flexibility to switch between asynchronous and synchronous state operators.
> >> >
> >> >--
> >> >Best!
> >> >Xuyang
> >> >
> >> >At 2024-08-06 18:54:24, "Zakelly Lan" <zakelly....@gmail.com> wrote:
> >> >>Hi Xuyang,
> >> >>
> >> >>Thanks for driving this! I'm happy to see operator implementation on top of the async state.
> >> >>Overall it looks good. I have one minor question:
> >> >>
> >> >>It seems the only difference between the new operator implementation and the original one is the way state is accessed. Does this mean the state is totally compatible when toggling `table.exec.async-state.enabled`, from the operator's point of view? And this way users can freely migrate their job to a newer minor version and leverage the newly implemented async operator, without state loss, right?
> >> >>
> >> >>Best,
> >> >>Zakelly
> >> >>
> >> >>On Mon, Aug 5, 2024 at 10:40 AM Xuyang <xyzhong...@163.com> wrote:
> >> >>
> >> >>> Hi devs,
> >> >>>
> >> >>> I'd like to initiate a discussion about FLIP-473: Introduce New SQL Operators Based on Asynchronous State APIs[1].
> >> >>>
> >> >>> FLIP-424[2] has introduced an asynchronous state architecture to mitigate I/O bottlenecks in Flink by offloading state access to separate threads from the main thread in each task, thereby optimizing I/O and computational resource utilization.
> >> >>>
> >> >>> Building upon this infrastructure, FLIP-473[1] proposes introducing new SQL operators based on these asynchronous state APIs.
> >> >>>
> >> >>> Our goal is to enhance throughput and reduce latency in stateful operations by leveraging the new asynchronous state architecture in Flink SQL jobs.
> >> >>>
> >> >>> For more details, please refer to FLIP-473[1]. Any feedback and suggestions for improvement are welcome.
> >> >>>
> >> >>> There is POC code[3] related to the Join operator.
> >> >>>
> >> >>> Best!
> >> >>> Xuyang
> >> >>>
> >> >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-473+Introduce+New+SQL+Operators+Based+on+Asynchronous+State+APIs
> >> >>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-424%3A+Asynchronous+State+APIs
> >> >>> [3] https://github.com/apache/flink/commit/bccace0f4b233e10279c7d95e009ae6aadad5ae8