Hi everyone. Thank you for all the insights and suggestions from those who participated in the discussion.
If there are no further comments, I will initiate the voting on August 21, 2024.

--
Best!
Xuyang

At 2024-08-19 10:26:01, "ron" <ld...@zju.edu.cn> wrote:
>Hi, Xuyang
>
>Thanks for your proposal. Looks good to me overall, +1
>
>Best,
>Ron
>
>> -----Original Message-----
>> From: Xuyang <xyzhong...@163.com>
>> Sent: 2024-08-14 15:20:59 (Wednesday)
>> To: dev@flink.apache.org
>> Subject: Re:Re: Re: Re:Re: [DISCUSS] FLIP-473: Introduce New SQL Operators Based on Asynchronous State APIs
>>
>> Hi, Feng. Thank you for your feedback.
>>
>> I completely agree that these tests are crucial. I have updated the Roadmap
>> section of the FLIP to include plans for harness tests and IT tests for data
>> correctness, state compatibility tests, and performance regression tests.
>>
>> Regarding the usage of the APIs `thenCombine` and `combineAll`, I have also
>> added a concluding note that there is no significant performance difference
>> between the two, so either one can be used.
>>
>> --
>> Best!
>> Xuyang
>>
>> At 2024-08-13 17:10:32, "Feng Jin" <jinfeng1...@gmail.com> wrote:
>> >Hi, Xuyang
>> >
>> >Thank you for initiating this FLIP. I believe this is a significant feature
>> >for the future of Flink SQL, but I also share concerns about the
>> >maintenance costs related to the correctness, state compatibility, and
>> >performance of the two implementations.
>> >I fully support ensuring state compatibility, functional correctness, and
>> >performance regression detection through HarnessTests and IT Tests.
>> >
>> >1. I think relevant testing is a critical part. Is there a more detailed
>> >design plan for the HarnessTests and the performance regression tests?
>> >
>> >2. Regarding the final performance comparison between thenCombine and
>> >combineAll, is there a more specific conclusion? Which one should we use,
>> >or are both options viable?
>> >
>> >Best,
>> >Feng.
>> >
>> >On Mon, Aug 12, 2024 at 11:23 AM Xuyang <xyzhong...@163.com> wrote:
>> >
>> >> Hi, David. Thank you for your review. Let me address your questions:
>> >>
>> >> >1. Is there a way to enforce this as an invariant during build time,
>> >> >perhaps through a generic test framework that switches between the sync
>> >> >and async versions of all operators and verifies checkpoint compatibility?
>> >>
>> >> Yes, we need to incorporate a corresponding testing framework to verify
>> >> state compatibility during transitions between sync and async state
>> >> operators. In my proposal, it resembles the existing RestoreTestBase,
>> >> with the testing logic structured as follows:
>> >>
>> >> a. Start with the sync state operator to consume data;
>> >> b. Execute a checkpoint;
>> >> c. Restart with the async state operator and recover data from the
>> >> checkpoint for re-consumption;
>> >> d. Validate the correctness of the results.
>> >>
>> >> Additionally, we could also cover the reverse scenario, where the async
>> >> state operator consumes data first, followed by a restart with the sync
>> >> state operator. I have added this to the `TEST PLAN` section of the FLIP.
>> >>
>> >> >2. If the only difference between them is state handling, could they
>> >> >potentially be implemented as the same operator with two different
>> >> >interfaces? My main concern is code reuse: it’s crucial to avoid
>> >> >duplicating code to ensure both implementations stay aligned.
>> >> >Additionally, could feature parity be verified at the test suite level
>> >> >(similar to the first question)? Perhaps we could create a single
>> >> >parameterized test suite that runs against both versions?
>> >>
>> >> IIUC, your focus aligns with the roadmap’s mention of “Refactoring the
>> >> sync and async state operators, leveraging shared logical calculations
>> >> while abstracting the state access details.” Due to the intricate details
>> >> of the code implementation, this was not elaborated in the FLIP. I share
>> >> your vision of designing reusable business logic classes (such as
>> >> JoinHelper) alongside different operator interfaces (SyncStateJoinOperator
>> >> and AsyncStateJoinOperator), consolidating the reusable logic within the
>> >> class JoinHelper.
>> >>
>> >> For synchronous operators, we already have harness tests to validate data
>> >> correctness, and there will also be dedicated harness tests for
>> >> asynchronous operators. Using a parameterized test suite for both
>> >> harnesses is indeed feasible.
>> >>
>> >> --
>> >> Best!
>> >> Xuyang
>> >>
>> >> At 2024-08-09 20:49:49, "David Morávek" <d...@apache.org> wrote:
>> >> >Hi Xuyang,
>> >> >
>> >> >Thank you for looking into this, great work! The overall direction seems
>> >> >solid. I have two minor questions:
>> >> >
>> >> >> In theory, the implementation of AsyncStateOperator and SyncStateOperator
>> >> >> differs only in their state handling. Their state schemas, business
>> >> >> logic, and other aspects remain the same. Therefore, within the same Flink
>> >> >> version, when the SQL and other Flink configurations remain unchanged, or
>> >> >> when using the same compiled plan, users can freely switch between
>> >> >> AsyncStateOperator and SyncStateOperator by toggling the configuration
>> >> >> table.exec.async-state.enabled, as they are fully compatible.
>> >> >
>> >> >1. Is there a way to enforce this as an invariant during build time,
>> >> >perhaps through a generic test framework that switches between the sync
>> >> >and async versions of all operators and verifies checkpoint compatibility?
>> >> >
>> >> >2. If the only difference between them is state handling, could they
>> >> >potentially be implemented as the same operator with two different
>> >> >interfaces? My main concern is code reuse: it’s crucial to avoid
>> >> >duplicating code to ensure both implementations stay aligned.
>> >> >Additionally, could feature parity be verified at the test suite level
>> >> >(similar to the first question)? Perhaps we could create a single
>> >> >parameterized test suite that runs against both versions?
>> >> >
>> >> >Best,
>> >> >D.
>> >> >
>> >> >On Thu, Aug 8, 2024 at 2:23 PM Xuyang <xyzhong...@163.com> wrote:
>> >> >
>> >> >> Hi, everyone.
>> >> >>
>> >> >> I have updated the FLIP. Here are the newly added sections:
>> >> >>
>> >> >> In theory, the implementation of AsyncStateOperator and SyncStateOperator
>> >> >> differs only in their state handling. Their state schemas, business
>> >> >> logic, and other aspects are the same. Therefore, within the same Flink
>> >> >> version, when the SQL and other Flink configurations remain unchanged, or
>> >> >> when using the same compiled plan, users can freely switch between
>> >> >> AsyncStateOperator and SyncStateOperator by toggling the configuration
>> >> >> table.exec.async-state.enabled, as they are fully compatible. One known
>> >> >> exception is that after the SQL AsyncStateOperator supports in-flight
>> >> >> checkpoints in FLIP-455[2], switching back and forth between aligned and
>> >> >> unaligned checkpoints is not compatible, which is expected behavior.
>> >> >>
>> >> >> --
>> >> >> Best!
>> >> >> Xuyang
>> >> >>
>> >> >> At 2024-08-07 19:05:57, "Xuyang" <xyzhong...@163.com> wrote:
>> >> >> >Hi, Zakelly.
>> >> >> >
>> >> >> >You’re right. This is another advantage of choosing the approach where
>> >> >> >the two sets of operators share the same execution node. In this
>> >> >> >approach, users can utilize the compiled plan and adjust the config
>> >> >> >'table.exec.async-state.enabled', providing them with the flexibility to
>> >> >> >switch between asynchronous and synchronous state operators.
>> >> >> >
>> >> >> >--
>> >> >> >Best!
>> >> >> >Xuyang
>> >> >> >
>> >> >> >At 2024-08-06 18:54:24, "Zakelly Lan" <zakelly....@gmail.com> wrote:
>> >> >> >>Hi Xuyang,
>> >> >> >>
>> >> >> >>Thanks for driving this! I'm happy to see operator implementation on
>> >> >> >>top of the async state. Overall it looks good. I have one minor question:
>> >> >> >>
>> >> >> >>It seems the only difference between the new operator implementation
>> >> >> >>and the original one is how state is accessed. Does this mean the
>> >> >> >>state is totally compatible when toggling
>> >> >> >>`table.exec.async-state.enabled`, from the operator's point of view?
>> >> >> >>And in this way users can freely migrate their job to a newer minor
>> >> >> >>version and leverage the newly implemented async operator, without
>> >> >> >>state loss, right?
>> >> >> >>
>> >> >> >>Best,
>> >> >> >>Zakelly
>> >> >> >>
>> >> >> >>On Mon, Aug 5, 2024 at 10:40 AM Xuyang <xyzhong...@163.com> wrote:
>> >> >> >>
>> >> >> >>> Hi devs,
>> >> >> >>>
>> >> >> >>> I'd like to initiate a discussion about FLIP-473: Introduce New SQL
>> >> >> >>> Operators Based on Asynchronous State APIs[1].
>> >> >> >>>
>> >> >> >>> FLIP-424[2] has introduced an asynchronous state architecture to
>> >> >> >>> mitigate I/O bottlenecks in Flink by offloading state access from the
>> >> >> >>> main thread of each task to separate threads, thereby optimizing I/O
>> >> >> >>> and computational resource utilization.
>> >> >> >>>
>> >> >> >>> Building upon this infrastructure, FLIP-473[1] proposes introducing
>> >> >> >>> new SQL operators based on these asynchronous state APIs.
>> >> >> >>>
>> >> >> >>> Our goal is to enhance throughput and reduce latency in stateful
>> >> >> >>> operations by leveraging the new asynchronous state architecture in
>> >> >> >>> Flink SQL jobs.
>> >> >> >>>
>> >> >> >>> For more details, please refer to FLIP-473[1]. Any feedback and
>> >> >> >>> suggestions for improvement are welcome.
>> >> >> >>>
>> >> >> >>> There is POC code[3] for the Join operator.
>> >> >> >>>
>> >> >> >>> Best!
>> >> >> >>> Xuyang
>> >> >> >>>
>> >> >> >>> [1]
>> >> >> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-473+Introduce+New+SQL+Operators+Based+on+Asynchronous+State+APIs
>> >> >> >>>
>> >> >> >>> [2]
>> >> >> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-424%3A+Asynchronous+State+APIs
>> >> >> >>>
>> >> >> >>> [3]
>> >> >> >>> https://github.com/apache/flink/commit/bccace0f4b233e10279c7d95e009ae6aadad5ae8
>
>------------------------------
>Best,
>Ron
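
For readers skimming the thread, the state-compatibility test flow discussed above (consume with one operator variant, checkpoint, restart with the other variant, validate, and also the reverse direction) could look roughly like the sketch below. This is only an illustration under loud assumptions: it does not use Flink at all, `CountHelper`, `CountOperator`, `SyncCountOperator`, `AsyncCountOperator`, and `runSwitch` are hypothetical names invented here, a plain map stands in for keyed state and for a checkpoint, and `java.util.concurrent.CompletableFuture` merely plays the role of an asynchronous state future. It is not the FLIP's actual test framework.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class SyncAsyncSwitchSketch {

    /** Shared business logic (hypothetical; plays the role a class like JoinHelper would). */
    static final class CountHelper {
        private CountHelper() {}
        static long next(Long current) {
            return current == null ? 1L : current + 1L;
        }
    }

    /** Common contract so that a single test can drive both variants. */
    interface CountOperator {
        void process(String key);
        Map<String, Long> snapshot(); // stands in for taking a checkpoint
    }

    /** Reads and writes state directly on the calling thread. */
    static final class SyncCountOperator implements CountOperator {
        private final Map<String, Long> state;
        SyncCountOperator(Map<String, Long> restored) {
            this.state = new HashMap<>(restored);
        }
        public void process(String key) {
            state.put(key, CountHelper.next(state.get(key)));
        }
        public Map<String, Long> snapshot() {
            return new HashMap<>(state);
        }
    }

    /** Same state schema and logic, but each state read goes through a future. */
    static final class AsyncCountOperator implements CountOperator {
        private final Map<String, Long> state;
        private CompletableFuture<Void> pending = CompletableFuture.completedFuture(null);
        AsyncCountOperator(Map<String, Long> restored) {
            this.state = new ConcurrentHashMap<>(restored);
        }
        public void process(String key) {
            // Chain every record behind the previous one so updates stay ordered.
            pending = pending
                .thenCompose(v -> CompletableFuture.supplyAsync(() -> state.get(key)))
                .thenAccept(current -> state.put(key, CountHelper.next(current)));
        }
        public Map<String, Long> snapshot() {
            pending.join(); // drain in-flight state requests before snapshotting
            return new HashMap<>(state);
        }
    }

    static CountOperator create(boolean async, Map<String, Long> restored) {
        return async ? new AsyncCountOperator(restored) : new SyncCountOperator(restored);
    }

    /** Consume, checkpoint, restart in the other mode, consume again, return final state. */
    static Map<String, Long> runSwitch(boolean startAsync, List<String> first, List<String> second) {
        CountOperator before = create(startAsync, new HashMap<>());
        first.forEach(before::process);                        // a. consume data
        Map<String, Long> checkpoint = before.snapshot();      // b. checkpoint
        CountOperator after = create(!startAsync, checkpoint); // c. restart in the other mode
        second.forEach(after::process);
        return after.snapshot();                               // d. state to validate
    }

    public static void main(String[] args) {
        List<String> first = List.of("a", "b", "a");
        List<String> second = List.of("a", "c");
        Map<String, Long> expected = Map.of("a", 3L, "b", 1L, "c", 1L);
        // "Parameterized" over the starting mode: sync -> async and async -> sync.
        for (boolean startAsync : new boolean[] {false, true}) {
            Map<String, Long> actual = runSwitch(startAsync, first, second);
            if (!expected.equals(actual)) {
                throw new AssertionError("startAsync=" + startAsync + " produced " + actual);
            }
        }
        System.out.println("state compatible in both directions");
    }
}
```

The point of the sketch is the shape rather than the details: the business logic lives in one helper, the two operator variants differ only in how they reach state, and one loop over the starting mode exercises both switch directions against the same expected result.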