Hi Roman,

Sorry for the late reply; I was travelling. Please find my comments inline.
On Fri, Jun 19, 2026 at 8:36 AM Roman Khachatryan <[email protected]> wrote:

> I might be missing something, but I don't see why the existing mechanisms
> can't solve the problem; especially simple MapState (2) or ListState (6).
> At the same time, I have concerns about the proposal itself (1), (4), (5):
>

Existing solutions can solve the problem, but not with the same runtime characteristics. Two things:

1. I have not tried MapState or ListState, but they will most likely increase the size of the state.
2. I can construct an example where the read time with those alternatives is higher than with the proposed solution, since they aggregate on read while the proposal aggregates asynchronously.

> > > 1. Is it possible that reads will become more costly in some scenarios?
> > Your calculation of the cost of the proposed approach is correct. If there
> > is frequent read-after-write, then the merge operator does not add value.
>
> My point is that it not only doesn't improve; it degrades the performance
> in these read-after-write cases.
> So the proposal adds one more (expert-level) way to tune the runtime.
> Which I believe we should avoid.
>

The proposal adds support for associative operators in general; the event-reordering example is just one such operation. Currently, AFAIK, there are two solutions in Flink:

1. Use ValueState and implement read-modify-write.
2. Use ListState and perform the aggregation during read.

Users can pick one of them based on their needs. The proposal exposes RocksDB's already supported associative merge operators to Java in the FRocksDB layer and leverages them to add new types of state variables in Flink.

I agree that this pattern does not help in read-after-write scenarios; in fact, read-modify-write is the best fit there. The proposed solution will help in write-heavy applications. There are several pointers on the web describing where RocksDB merge operators are helpful.
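To make the read-after-write versus write-heavy trade-off concrete, here is a toy model. It is hypothetical plain Java, not Flink or RocksDB code. The names (StateStrategies, Store, simulate) are made up for illustration. It only counts reduce() calls on the task thread versus in a simulated background compaction, and it ignores I/O, serialization, and RocksDB's real compaction scheduling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/**
 * Toy model of one key's state under three strategies. Hypothetical
 * illustration only, not Flink or RocksDB API. It counts reduce() calls on the
 * task thread ("foreground") versus in simulated compaction ("background").
 */
final class StateStrategies {

    enum Strategy { VALUE_RMW, LIST_FOLD_ON_READ, MERGE_OPERATOR }

    static final class Store {
        final Strategy strategy;
        final BinaryOperator<Long> reduce;             // assumed associative
        Long base;                                     // merged value, null if none
        final List<Long> operands = new ArrayList<>(); // appended, not yet merged
        int foreground = 0;                            // reduce() calls on the task thread
        int background = 0;                            // reduce() calls in compaction

        Store(Strategy strategy, BinaryOperator<Long> reduce) {
            this.strategy = strategy;
            this.reduce = reduce;
        }

        void write(long v) {
            if (strategy == Strategy.VALUE_RMW) {
                base = combine(base, v, true); // get + reduce + put on every write
            } else {
                operands.add(v);               // blind append, like a merge() call
            }
        }

        /** Simulated compaction: only the merge-operator strategy folds operands here. */
        void compact() {
            if (strategy != Strategy.MERGE_OPERATOR) {
                return;
            }
            for (long op : operands) {
                base = combine(base, op, false);
            }
            operands.clear();
        }

        /** A read folds any pending operands on the task thread, without writing back. */
        long read() {
            Long acc = base;
            for (long op : operands) {
                acc = combine(acc, op, true);
            }
            return acc == null ? 0L : acc;
        }

        private Long combine(Long acc, long v, boolean onTaskThread) {
            if (acc == null) {
                return v; // first operand: nothing to reduce with yet
            }
            if (onTaskThread) foreground++; else background++;
            return reduce.apply(acc, v);
        }
    }

    /** Writes 1..n; compacts every compactEvery writes (0 = never); optionally reads after each write. */
    static Store simulate(Strategy s, int n, boolean readAfterEachWrite, int compactEvery) {
        Store store = new Store(s, Long::sum);
        for (int i = 1; i <= n; i++) {
            store.write(i);
            if (compactEvery > 0 && i % compactEvery == 0) {
                store.compact();
            }
            if (readAfterEachWrite) {
                store.read();
            }
        }
        return store;
    }

    public static void main(String[] args) {
        for (Strategy s : Strategy.values()) {
            Store writeHeavy = simulate(s, 1000, false, 100);
            long sum = writeHeavy.read(); // a single read at the end
            Store readAfterWrite = simulate(s, 1000, true, 0);
            System.out.printf("%s: sum=%d, write-heavy fg=%d bg=%d, read-after-write fg=%d%n",
                    s, sum, writeHeavy.foreground, writeHeavy.background, readAfterWrite.foreground);
        }
    }
}
```

In this model, the values 1..1000 are written with a simulated compaction every 100 writes. All three strategies return 500500. The merge-operator model does its 999 reduce() calls in the background, while ValueState and ListState do theirs on the task thread. With a read after every write and no compaction in between, ListState and the merge-operator model both make 499500 task-thread calls versus 999 for ValueState, which is the read-after-write degradation you pointed out.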
As https://artem.krylysov.com/blog/2023/04/19/how-rocksdb-works/ puts it: "Merge is a good fit for write-heavy streaming applications ..."

The proposal adds one more way to implement associative operations, and I agree that getting the best performance requires RocksDB tuning. IMHO, the asynchronous merging done by RocksDB distributes and parallelizes nicely and fits the processing style supported by Flink, so we should add support for it.

> > > 2. Speaking more generally, could you list the motivating use cases
> > > Isn't it possible to use MapState keyed by event time?
> > > The sorting will come for free on RocksDB and PUT will only add the new
> > > element without touching the existing ones.
> > > (there is an API to check whether it's sorted or not)
> > don't know how MapState can help with generic ordering, but my knowledge
> > on that is limited.
>
> A common pattern is to use MapState<Long, ...>, where keys are timestamps.
> Such a MapState, when backed by RocksDB, is automatically sorted by
> timestamps.
>

As I said, I have not used MapState. That said, the proposal adds support for generic associative merge operations by exposing existing RocksDB functionality.

> > > 4. Function dispatch during restore
> > Great point! The Reduce/Aggregate functions in Flink are already
> > serializable and in the proposal they are serialized in the savepoint and
> > are restored when rocksdb is loaded. This allows rocksdb to call these
> > functions during compaction before state descriptors are called. This way
> > we don't need to disable compaction during restore.
>
> That means that in case of schema change, compactions will be using old
> schema, right? That way, I'm afraid it can bypass state migration.
>

The user needs to handle schema changes in the Reduce/Aggregate functions. As long as that is done properly, I don't see any additional issue introduced by this proposal.

> > > 5. Remote compactions
> > TBH, I don't understand ForSt in detail to comment on this item.
> > Since the proposal is exposing associative merge operators, it should not be an
> > issue to support in ForSt. In fact, if ForSt does not support associative merge
> > operators, then I will volunteer to add it, but let's get this proposal
> > first.
>
> My concern is about an external compaction component:
> this proposal forces it to have job-specific java code
> instead of having only C++ code only (or whatever is used in state
> backend).
>

I have not used the ForSt backend and don't know the details. I will invite comments from experts, but my 2 cents is that we don't need to use the merge operators in the ForSt backend, so this enhancement will not have any side effects on the ForSt backend.

> > > 6. Alternative: ListState
> > I can implement the sorted list using this construct, but the read will be more
> > expensive than the proposal, since the sorting will happen during the read.
>
> In the current proposal, read will trigger sorting as well - if the data is not
> compacted/sorted yet. And it will add more latency than with ListState
> because of the extra read/write pass.
>

Yes, if the flush/compaction has not happened before the read, then the read will invoke the associative operator callback.

> ListState solution provides flexibility to choose when this work happens:
> 1. Periodically (using processing time timers) - similar to compactions
> 2. On reads
> 3. On writes incrementally
> 4. Some combination
>

1. In RocksDB, compaction runs on background threads, separate from the writes. In a KeyedProcessFunction, however, processElement() and onTimer() are single-threaded: both methods are executed sequentially by the same task thread. The proposal will therefore perform better thanks to the asynchronous aggregation.
2. If the flush/compaction has happened before the read, the read will be much faster than with ListState; otherwise it will not.
3. Writes will be similar, since both use RocksDB merge.

Thanks for your comments!

Best,
-Soumitra.
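P.S. On the sorted-list case in (6), here is a small sketch of why an associative merge lets the work be split between compaction and read. It is hypothetical plain Java (Java 16+ for the record type), not the FLIP's API. SortedRunMerge, Event, operand(), merge() and sortOnRead() are illustrative names. In the sketch, each write is a one-element run sorted by timestamp. merge() is a stable two-way merge and is associative, so any grouping of the operands gives the same result as a ListState-style sort on read.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/**
 * Hypothetical sketch (not the FLIP's API): merging timestamp-sorted runs is
 * associative, so operands can be combined in any grouping.
 */
final class SortedRunMerge {

    record Event(long ts, String payload) {}

    /** One write = one single-element sorted run (the merge operand). */
    static List<Event> operand(long ts, String payload) {
        return List.of(new Event(ts, payload));
    }

    /** Stable two-way merge: on equal timestamps, events from left come first. */
    static List<Event> merge(List<Event> left, List<Event> right) {
        List<Event> out = new ArrayList<>(left.size() + right.size());
        int i = 0;
        int j = 0;
        while (i < left.size() && j < right.size()) {
            if (right.get(j).ts() < left.get(i).ts()) {
                out.add(right.get(j++));
            } else {
                out.add(left.get(i++));
            }
        }
        while (i < left.size()) out.add(left.get(i++));
        while (j < right.size()) out.add(right.get(j++));
        return out;
    }

    /** ListState-style alternative: concatenate all operands, then stable-sort on read. */
    static List<Event> sortOnRead(List<List<Event>> operands) {
        List<Event> all = new ArrayList<>();
        for (List<Event> op : operands) all.addAll(op);
        all.sort(Comparator.comparingLong(Event::ts)); // List.sort is stable
        return all;
    }

    public static void main(String[] args) {
        List<Event> a = operand(5, "a");
        List<Event> b = operand(2, "b");
        List<Event> c = operand(5, "c");
        List<Event> d = operand(1, "d");

        List<Event> leftFold = merge(merge(merge(a, b), c), d);
        List<Event> rightFold = merge(a, merge(b, merge(c, d)));
        List<Event> pairwise = merge(merge(a, b), merge(c, d));
        List<Event> onRead = sortOnRead(List.of(a, b, c, d));

        System.out.println(leftFold);
        System.out.println(leftFold.equals(rightFold) && leftFold.equals(pairwise) && leftFold.equals(onRead));
    }
}
```

Running main prints the events in the order d, b, a, c, and then prints true. The two ts=5 events keep their write order. The consistent tie-break, where the earlier run wins on equal timestamps, keeps the result independent of grouping and equal to a stable sort on read.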
> On Mon, Jun 15, 2026 at 9:13 PM Soumitra Kumar <[email protected]> wrote:
>
> > ---------- Forwarded message ---------
> > From: Soumitra Kumar <[email protected]>
> > Date: Mon, Jun 15, 2026, 12:12 PM
> > Subject: Re: [DISCUSS] FLIP-XXX Support ReducingMergeState and
> > AggregatingMergeState backed by Java based associative merge operators
> > To: <[email protected]>
> >
> > Hi Roman,
> >
> > I replied to your questions a while back. Let me forward the thread to
> > [email protected].
> >
> > Best,
> > -Soumitra.
> >
> > On Mon, Jun 15, 2026, 12:48 AM Roman Khachatryan <[email protected]> wrote:
> >
> > > Hello Soumitra Kumar,
> > >
> > > It would be great to get the answers to the questions above I posted -
> > > unless the problem is solved and the FLIP isn't necessary.
> > >
> > > Regards,
> > > Roman
> > >
> > > On Sun, Jun 14, 2026 at 9:41 PM Soumitra Kumar <[email protected]> wrote:
> > >
> > > > Hello Members,
> > > >
> > > > Thank you for your review so far. I don't have any open issues at this
> > > > moment. Please let me know if there is any issue for me to
> > > > clarify/address.
> > > >
> > > > Best,
> > > > -Soumitra.
> > > >
> > > > On Mon, Jun 8, 2026 at 10:09 PM Soumitra Kumar <[email protected]> wrote:
> > > >
> > > > > Hi Han,
> > > > >
> > > > > I have added a section on TTL of ReducingMergeState and
> > > > > AggregatingMergeState - HERE
> > > > > <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?tab=t.0#heading=h.mqp1qeixcg45>,
> > > > > please review.
> > > > >
> > > > > Best,
> > > > > -Soumitra.
> > > > >
> > > > > On Mon, Jun 1, 2026 at 11:02 PM Soumitra Kumar <[email protected]> wrote:
> > > > >
> > > > > > Hi Han,
> > > > > >
> > > > > > Thanks for your review and encouragement!
> > > > > >
> > > > > > #1 - Users can migrate from ReducingState to ReducingMergeState, but it
> > > > > > has to be a conscious decision knowing the rocksdb implication. We should
> > > > > > plan to create a few howto docs monitoring and tuning rocksdb to get the
> > > > > > best out of the merge operators. Theoretically, it is possible to build an
> > > > > > automatic migration path, but I would not favor that because of the
> > > > > > different runtime characteristics of ReducingState and ReducingMergeState.
> > > > > > The checkpoints/savepoints for ReducingMergeState/AggregatingMergeState
> > > > > > state variables will serialize the Reduce/Aggregate function as well.
> > > > > >
> > > > > > #2 - "Will this introduce different semantics when State TTL is enabled"
> > > > > > - Can you elaborate on this? TBH, I have not planned the details of the TTL
> > > > > > of ReducingMergeState/AggregatingMergeState variables yet, but the TTL
> > > > > > should be applied on the variable, not on individual operands. I will add a
> > > > > > section on TTL of these variables in the FLIP.
> > > > > >
> > > > > > Best,
> > > > > > -Soumitra.
> > > > > >
> > > > > > On Mon, Jun 1, 2026 at 3:03 AM Han Yin <[email protected]> wrote:
> > > > > >
> > > > > > > Hi Sumatra,
> > > > > > > Thanks for the FLIP. The ability to leverage RocksDB merge operators in
> > > > > > > Reducing/Aggregating state is a really meaningful improvement.
> > > > > > >
> > > > > > > I share similar concerns about the user interface with the previous
> > > > > > > comments:
> > > > > > > • If new state types are introduced, can users migrate their
> > > > > > > existing jobs from ReducingState to ReducingMergeState? Since the core
> > > > > > > logic of the ReduceFunction remains the same, one would expect a
> > > > > > > straightforward migration path. If yes, will checkpoints/savepoints remain
> > > > > > > compatible across this switch (and back)?
> > > > > > > • Will this introduce different semantics when State TTL is enabled?
