Hi Yunfeng and Dong,

Thanks for this FLIP. I have reviewed it briefly and have a few questions:
1. Is this FLIP proposing to buffer the output in the state backend? If
so, what is the data format of this buffer (what type of state does it
use, and what is the value)? Additionally, how does the operator
retrieve all the buffered data from the state backend during the
`flush()` operation, given that keyed state can only be accessed under
a keyed context?

2. Are the buffered intermediate results required to be included in the
next checkpoint? Or are they deleted and subsumed into the original
states during the `flush()` operation before the checkpoint is
triggered? I'm asking because, if they are not included in the
checkpoint, it may be more efficient to avoid using keyed state for
buffering. In that scenario, a simple heap-based or even file-based map
could be more efficient: frequent writes and clears can lead to
increased space usage and read amplification in RocksDB, and they also
require more CPU resources for checkpointing and compaction.

Looking forward to your thoughts.

Best,
Zakelly

On Mon, Sep 11, 2023 at 1:39 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
>
> Hi all,
>
> Dong (cc'ed) and I are opening this thread to discuss our proposal to
> support buffering & flushing the output of operators with idempotent
> semantics, which has been documented in
> FLIP-365 <https://cwiki.apache.org/confluence/display/FLINK/FLIP-365%3A+Introduce+flush+interval+to+adjust+the+interval+of+emitting+results+with+idempotent+semantics>.
>
> In the pursuit of unifying batch and stream processing, it has been
> observed that the batch execution mode provides a significant
> advantage: only the final result of "rolling" operations such as
> reduce() or sum() needs to be emitted, reducing the time and
> resources required by downstream applications. Inspired by this
> advantage, the proposed solution supports buffering the output of
> operators in streaming mode and periodically emitting the final
> results, much like in batch processing.
> This approach is designed to help improve the throughput of jobs by
> reducing the need for downstream applications to process intermediate
> results in a stream, at the cost of increased latency and state size.
>
> Please refer to the FLIP document for more details about the proposed
> design and implementation. We welcome any feedback and opinions on
> this proposal.
>
> Best regards,
> Dong and Yunfeng
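P.S. To make question 2 concrete, here is a minimal sketch of the
heap-based alternative I have in mind, written without any Flink APIs.
The class and method names (`OutputBuffer`, `collect`, `flush`) are
illustrative assumptions on my part, not part of FLIP-365's proposed
design: intermediate results are upserted into a plain in-memory map,
and only the latest value per key is emitted on flush.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Illustrative sketch (names are assumptions, not FLIP-365's API):
// buffer intermediate results of a "rolling" operation in a plain heap
// map, emitting only the final value per key when flush() is called.
class OutputBuffer<K, V> {
    private final Map<K, V> buffer = new HashMap<>();

    // A later result for the same key overwrites the earlier one,
    // matching the idempotent semantics of reduce()/sum().
    void collect(K key, V value) {
        buffer.put(key, value);
    }

    // Emit the final value per key downstream, then clear the buffer.
    // Because the map lives on the heap rather than in keyed state,
    // iterating over all entries needs no keyed context, and nothing
    // is written to (or compacted by) RocksDB.
    void flush(BiConsumer<K, V> downstream) {
        buffer.forEach(downstream);
        buffer.clear();
    }

    public static void main(String[] args) {
        OutputBuffer<String, Integer> buf = new OutputBuffer<>();
        buf.collect("a", 1);
        buf.collect("a", 3); // overwrites the intermediate result for "a"
        buf.collect("b", 2);
        Map<String, Integer> emitted = new HashMap<>();
        buf.flush(emitted::put);
        System.out.println(emitted); // only the final value per key survives
    }
}
```

If such buffered results do not need to survive a checkpoint, this
avoids the write amplification of buffering through keyed state.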