Hi Yunfeng,

Thank you for providing the information. Here are my thoughts:

1. Could you please include the implementation details in your FLIP? I
believe it would be helpful for further discussion. Additionally, I
have a concern regarding the usage of ValueState/ListState. Would it
introduce more serialization/deserialization overhead?
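
To make the question concrete, here is a minimal sketch of how I
understand the per-key buffering (the class name, state name and value
type below are my own assumptions, not taken from the FLIP):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical sketch: keep only the latest result per key in a
// ValueState. Every buffered element pays one serialization into the
// state backend, and the periodic flush pays one deserialization per
// key on the way out.
public class BufferingSketch extends RichFlatMapFunction<Long, Long> {

    private transient ValueState<Long> latestResult;

    @Override
    public void open(Configuration parameters) {
        latestResult = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest-result", Long.class));
    }

    @Override
    public void flatMap(Long value, Collector<Long> out) throws Exception {
        // Overwrite rather than emit: one serialization per element.
        latestResult.update(value);
        // Emission is deferred to the flush, which reads the value back
        // (one deserialization per key) and clears the state.
    }
}
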
2.1 Based on your description, I think the buffered intermediate
results *themselves* are not included in the next checkpoint: they are
cleared and subsumed into the original states right before the
checkpoint. However, the stale data and tombstones still reside in the
LSM-tree files, which adds unnecessary overhead to the checkpoint.

2.2 In my experience, many jobs are bottlenecked on their state
backends. Mini-batching helps reduce state-access overhead, which
yields performance advantages in certain scenarios. I can see the
benefits you mentioned in the FLIP, but I am concerned whether it is a
good choice to put additional load on the bottleneck by relying on
such a resource-intensive component. For your use case, keyed state
carries several performance-relevant designs that are "useless", such
as time-to-live (wrapped as part of the value), key groups (a hash
computed during key iteration and serialized as a key prefix in
RocksDB), and checkpointing (CPU/IO/network consumption). If we
designed a new buffer holder that borrows only the "useful" parts of
the state-backend design, might it not do better than the current
state backends?
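
As a purely illustrative example of what I mean by a dedicated buffer
holder, ignoring checkpointing, which is exactly the point:

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Illustrative only: a buffer holder that keeps the "useful" part of
// the state-backend design (latest value per key) and drops TTL
// wrapping, key-group prefixes and checkpoint participation. Nothing
// is serialized at all, and flush is a plain iteration over a heap map.
public class SimpleBufferHolder<K, V> {

    private final Map<K, V> latestByKey = new HashMap<>();

    public void put(K key, V value) {
        latestByKey.put(key, value); // overwrite: keep only the latest
    }

    public void flush(BiConsumer<K, V> emitter) {
        latestByKey.forEach(emitter); // emit everything buffered ...
        latestByKey.clear();          // ... and start the interval empty
    }
}

Whether something this simple could be made robust, e.g. spilling to
disk under memory pressure, is of course the open question.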

I'm not very familiar with mini-batching, and since this FLIP proposes
to deprecate it, it would be better to involve more experts to discuss
this topic.


Best,
Zakelly


On Fri, Sep 22, 2023 at 9:10 AM Yunfeng Zhou
<flink.zhouyunf...@gmail.com> wrote:
>
> Hi Zakelly,
>
> Thanks for your comments on this FLIP. Please let me attempt to
> clarify these points.
>
> 1. Yes, this FLIP proposes to buffer the outputs in the state backend.
> As only the latest element of each type of StreamElement will be
> buffered, a ValueState in a keyed context or a ListState in a
> non-keyed context is enough to hold each type of StreamElement. The
> value stored in the ValueState/ListState would be the original
> StreamRecord/Watermark/WatermarkStatus/LatencyMarker. Besides, the
> KeyedStateBackend#applyToAllKeys method makes it possible to access
> the states of all keys in one keyed context.
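>
> For illustration, a rough sketch of how `flush()` could visit the
> buffered value of every key inside an operator (the method name and
> the Long value type are made up for this example):
>
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.runtime.state.VoidNamespace;
> import org.apache.flink.runtime.state.VoidNamespaceSerializer;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>
> // Inside an AbstractStreamOperator subclass: emit and clear the
> // buffered value of every key. The descriptor is the one used for
> // buffering.
> private void flushAllKeys(ValueStateDescriptor<Long> descriptor) throws Exception {
>     getKeyedStateBackend().applyToAllKeys(
>             VoidNamespace.INSTANCE,
>             VoidNamespaceSerializer.INSTANCE,
>             descriptor,
>             (key, state) -> {
>                 Long buffered = state.value();
>                 if (buffered != null) {
>                     output.collect(new StreamRecord<>(buffered));
>                     state.clear();
>                 }
>             });
> }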
>
> 2.1 The buffered intermediate results need to be included in the next
> checkpoint to preserve exactly-once semantics during failover. The
> buffer would be cleared in the `flush()` operation, but `flush()` need
> not be triggered before checkpoints. I agree that saving buffered
> results to state would increase the state-access workload, but given
> that the state buffer would be enabled on aggregation operators which
> already involve states, the additional buffered results would not
> increase the time complexity of state accesses or the memory (state)
> complexity. If one state read/write operation and the space of one
> ValueState can spare the downstream operators all the computation
> needed to process one intermediate result, I believe the throughput
> gain would be worth the tradeoff in state.
>
> 2.2 Checkpoints aside, it is still meaningful to discuss the
> alternative solutions you suggested for storing buffered results at
> runtime. At least for keyed streams, I'm concerned that keeping all
> buffered results in memory could easily cause OOM problems, as there
> is no bound on the number of keyed states to store within a flush
> interval. I'm also wondering whether a file-based map would really
> outperform state backends, and why Flink hasn't introduced a
> FileSystemStateBackend if a file-based map could be better. Could you
> please elaborate on the pros and cons of state backends vs.
> memory/filesystem-based buffers?
>
> Best regards,
> Yunfeng
>
> On Thu, Sep 21, 2023 at 4:10 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> >
> > Hi Yunfeng and Dong,
> >
> > Thanks for this FLIP. I have reviewed it briefly and have a few questions:
> >
> > 1. Is this FLIP proposing to buffer the output in the state backend?
> > If so, what is the data format of this buffer (what type of state does
> > it use and what is the value)? Additionally, how does the operator
> > retrieve all the buffer data from the state backend during the
> > `flush()` operation (while the keyed states can only be accessed under
> > a keyed context)?
> > 2. Are the buffered intermediate results required to be included in
> > the next checkpoint? Or are they deleted and subsumed in the original
> > states during the `flush()` operation before triggering the
> > checkpoint? I'm asking because if they are not included in the
> > checkpoint, it may be more efficient to avoid using keyed states for
> > buffering. In this scenario, a simple heap-based or even file-based
> > map could be more efficient. Frequent writes and clears can lead to
> > increased space usage and read amplification in RocksDB, and they
> > also require more CPU resources for checkpointing and compaction.
> >
> >
> > Looking forward to your thoughts.
> >
> >
> > Best,
> > Zakelly
> >
> >
> > On Mon, Sep 11, 2023 at 1:39 PM Yunfeng Zhou
> > <flink.zhouyunf...@gmail.com> wrote:
> > >
> > > Hi all,
> > >
> > > Dong(cc'ed) and I are opening this thread to discuss our proposal to
> > > support buffering & flushing the output of operators with idempotent
> > > semantics, which has been documented in
> > > FLIP-365<https://cwiki.apache.org/confluence/display/FLINK/FLIP-365%3A+Introduce+flush+interval+to+adjust+the+interval+of+emitting+results+with+idempotent+semantics>.
> > >
> > > In the pursuit of unifying batch and stream processing, it has been
> > > discovered that the batch execution mode provides a significant
> > > advantage by allowing only the final result from "rolling" operations
> > > such as reduce() or sum() to be emitted, thus reducing the amount of
> > > time and resources required by downstream applications. Inspired by
> > > this advantage, the proposed solution supports buffering the output of
> > > operators in the streaming mode and periodically emitting the final
> > > results, much like in batch processing. This approach is designed to
> > > help improve the throughput of jobs by reducing the need for
> > > downstream applications to process intermediate results in a stream,
> > > at the cost of increased latency and state size.
> > >
> > > Please refer to the FLIP document for more details about the proposed
> > > design and implementation. We welcome any feedback and opinions on
> > > this proposal.
> > >
> > > Best regards,
> > > Dong and Yunfeng
