Hi Xuyang,

Thank you for looking into this, great work! The overall direction seems solid. I have two minor questions:

> In theory, the implementation of AsyncStateOperator and SyncStateOperator
> differs only in their state handling. Their state schemas, business logic,
> and other aspects remain the same. Therefore, within the same Flink
> version, when the SQL and other Flink configurations remain unchanged, or
> when using the same compiled plan, users can freely switch between
> AsyncStateOperator and SyncStateOperator by toggling the configuration
> table.exec.async-state.enabled, as they are fully compatible.

1. Is there a way to enforce this as an invariant at build time, perhaps through a generic test framework that switches every operator between its sync and async versions and verifies checkpoint compatibility?

2. If the only difference between them is state handling, could they be implemented as a single operator with two different state interfaces? My main concern is code reuse: avoiding duplicated code is crucial to keep both implementations aligned. Additionally, could feature parity be verified at the test-suite level (similar to the first question)? For example, we could create a single parameterized test suite that runs against both versions.

Best,
D.

On Thu, Aug 8, 2024 at 2:23 PM Xuyang <xyzhong...@163.com> wrote:

> Hi, everyone.
>
> I have updated the FLIP. Here are the newly added sections:
>
> > In theory, the implementation of AsyncStateOperator and SyncStateOperator
> > differs only in their state handling. Their state schemas, business logic,
> > and other aspects are the same. Therefore, within the same Flink version,
> > when the SQL and other Flink configurations remain unchanged, or when using
> > the same compiled plan, users can freely switch between AsyncStateOperator
> > and SyncStateOperator by toggling the configuration
> > table.exec.async-state.enabled, as they are fully compatible.
> > One known exception is that after the SQL AsyncStateOperator supports
> > in-flight checkpoints in FLIP-455[2], switching back and forth between
> > aligned and unaligned checkpoints is not compatible, which is expected
> > behavior.
>
> --
> Best!
> Xuyang
>
> On 2024-08-07 19:05:57, "Xuyang" <xyzhong...@163.com> wrote:
> >Hi, Zakelly.
> >
> >You're right. This is another advantage of choosing the approach where the
> >two sets of operators share the same execution node. With this approach,
> >users can use the compiled plan and adjust the config
> >'table.exec.async-state.enabled', which gives them the flexibility to
> >switch between asynchronous and synchronous state operators.
> >
> >--
> >Best!
> >Xuyang
> >
> >At 2024-08-06 18:54:24, "Zakelly Lan" <zakelly....@gmail.com> wrote:
> >>Hi Xuyang,
> >>
> >>Thanks for driving this! I'm happy to see operator implementations built
> >>on top of the async state. Overall it looks good; I have one minor
> >>question:
> >>
> >>It seems the only difference between the new operator implementation and
> >>the original one is the way state is accessed. Does this mean the state is
> >>fully compatible, from the operator's point of view, when toggling
> >>`table.exec.async-state.enabled`? In that case, users could freely migrate
> >>their jobs to a newer minor version and leverage the newly implemented
> >>async operators without state loss, right?
> >>
> >>Best,
> >>Zakelly
> >>
> >>On Mon, Aug 5, 2024 at 10:40 AM Xuyang <xyzhong...@163.com> wrote:
> >>
> >>> Hi devs,
> >>>
> >>> I'd like to initiate a discussion about FLIP-473: Introduce New SQL
> >>> Operators Based on Asynchronous State APIs[1].
> >>>
> >>> FLIP-424[2] has introduced an asynchronous state architecture to mitigate
> >>> I/O bottlenecks in Flink by offloading state access from the main thread
> >>> of each task to separate threads, thereby optimizing I/O and
> >>> computational resource utilization.
> >>>
> >>> Building upon this infrastructure, FLIP-473[1] proposes introducing new
> >>> SQL operators based on these asynchronous state APIs. Our goal is to
> >>> enhance throughput and reduce latency in stateful operations by
> >>> leveraging the new asynchronous state architecture in Flink SQL jobs.
> >>>
> >>> For more details, please refer to FLIP-473[1]. Any feedback and
> >>> suggestions for improvement are welcome.
> >>>
> >>> There is already POC code[3] for the Join operator.
> >>>
> >>> Best!
> >>> Xuyang
> >>>
> >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-473+Introduce+New+SQL+Operators+Based+on+Asynchronous+State+APIs
> >>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-424%3A+Asynchronous+State+APIs
> >>> [3] https://github.com/apache/flink/commit/bccace0f4b233e10279c7d95e009ae6aadad5ae8
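The single parameterized test suite suggested in the first reply of this thread (questions 1 and 2) could look roughly like the minimal sketch that follows. It is written in Python for brevity rather than Flink's Java. It is an illustration under stated assumptions, not Flink code: SyncValueState, AsyncValueState, CountOperator, run and check_parity_and_compatibility are hypothetical names, a dict copy stands in for a checkpoint, and asyncio.to_thread (Python 3.9+) only mimics offloading state access from the task thread. The business logic is written once against a shared state interface. The check runs every sync/async switch combination (snapshot with one variant, restore into the other) and requires identical outputs and final state.

```python
import asyncio
from typing import Dict, List, Optional, Tuple


class SyncValueState:
    """Hypothetical keyed value state accessed directly on the caller's thread."""

    def __init__(self, snapshot: Optional[Dict[str, int]] = None) -> None:
        self._data: Dict[str, int] = dict(snapshot or {})

    async def get(self, key: str) -> Optional[int]:
        return self._data.get(key)

    async def put(self, key: str, value: int) -> None:
        self._data[key] = value

    def snapshot(self) -> Dict[str, int]:
        # Stand-in for a checkpoint: the same schema is used by both variants.
        return dict(self._data)


class AsyncValueState(SyncValueState):
    """Hypothetical variant with the same schema and snapshot format, but every
    access is offloaded to a worker thread, loosely mimicking async state."""

    async def get(self, key: str) -> Optional[int]:
        return await asyncio.to_thread(self._data.get, key)

    async def put(self, key: str, value: int) -> None:
        await asyncio.to_thread(self._data.__setitem__, key, value)


class CountOperator:
    """Hypothetical business logic written once: emit a running count per key."""

    def __init__(self, state: SyncValueState) -> None:
        self.state = state

    async def process(self, key: str) -> Tuple[str, int]:
        current = await self.state.get(key) or 0
        await self.state.put(key, current + 1)
        return (key, current + 1)


def run(state_cls, records: List[str], snapshot: Optional[Dict[str, int]] = None):
    """Process records in order, optionally restoring from a snapshot first."""
    op = CountOperator(state_cls(snapshot))

    async def drive() -> List[Tuple[str, int]]:
        # Awaiting each record in turn trivially preserves per-key order.
        return [await op.process(key) for key in records]

    outputs = asyncio.run(drive())
    return outputs, op.state.snapshot()


VARIANTS = [SyncValueState, AsyncValueState]


def check_parity_and_compatibility(before: List[str], after: List[str]):
    """For every (first, second) pair: run `before` with the first variant,
    restore its snapshot into the second, run `after`, and require identical
    outputs and final state across all combinations."""
    results = {}
    for first in VARIANTS:
        for second in VARIANTS:
            _, snap = run(first, before)
            results[(first.__name__, second.__name__)] = run(second, after, snap)
    baseline = next(iter(results.values()))
    assert all(result == baseline for result in results.values()), results
    return results
```

For example, check_parity_and_compatibility(["a", "b", "a"], ["a", "c"]) yields four identical results, each with outputs [("a", 3), ("c", 1)]. A real asynchronous runtime must additionally preserve the per-key ordering of in-flight state requests, which this sketch sidesteps by awaiting each access in sequence.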