Hello Yuan,

I have added a "Motivation Use Case" section to the FLIP: HERE <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?tab=t.0#heading=h.jhkgiqdhf9x1>
Please review.

Best,
-Soumitra.

On Mon, Jun 1, 2026 at 10:25 PM Soumitra Kumar wrote:

> Hello Yuan,
>
> Thanks for your review!
>
> #1 - I will add the use case to the FLIP as you suggested. At the moment I have DataStream use cases.
>
> #2 - The existing ReducingState and the proposed ReducingMergeState are compatible in behavior. However, ReducingState is implemented as read-modify-write, whereas ReducingMergeState will be implemented with the RocksDB merge operation using an associative merge operator, so the runtime characteristics of the two implementations differ. I would leave it to users to decide which one to use. I started with the read-modify-write based state variables, hit scaling challenges due to high-velocity writes, and that led me to the proposed approach. RocksDB's flush/compaction behavior needs tuning to get the best out of ReducingMergeState. Picking the right state variable should be a conscious choice by users, so I would not recommend introducing this change transparently.
>
> #3 - There will be two PRs, one for FrocksDB and one for Flink. The FrocksDB PR will add support for Java based associative merge operators via JNI, and the Flink PR will leverage it to support the new state variables. Since the changes add new state variables, existing code needs no user-visible changes to migrate. However, the savepoint version will increase to accommodate the serialization of the Reduce/Aggregate function, and this change must be backward compatible.
>
> Best,
> -Soumitra.
>
> On Sun, May 31, 2026 at 11:06 PM Yuan Mei wrote:
>
>> Hey Soumitra, thanks for the proposal, the direction is a great exploration!
>>
>> I have the following questions:
>>
>> 1. *Motivation Use Case:* We need a more specific SQL/DataStream use case from a user's perspective to motivate this change. This would be a great amendment to the FLIP.
>>
>> 2. This is essentially a DB operation and optimization. Can it be implemented without changing Flink's Public Interfaces?
>>
>> 3. Most importantly, how will this land end-to-end? This involves rewriting SQL and existing DS (DataStream) operators. How will it be used from an end-to-end perspective?
>>
>> Best
>> Yuan
>>
>> On Mon, Jun 1, 2026 at 12:44 PM Soumitra Kumar wrote:
>>
>>> Hi Roman,
>>>
>>> Thanks for your thorough review, please find my comments below.
>>>
>>> On Sun, May 31, 2026 at 2:42 PM Roman Khachatryan wrote:
>>>
>>>> 1. Is it possible that reads will become more costly in some scenarios?
>>>>
>>>> Per my understanding, on compaction RocksDB would need to read and write back the same state (compared to the current implementation). So the benefit is deferring and "batching" those read/write operations.
>>>>
>>>> Current approach
>>>> add: read + deserialize + aggregate + serialize + write (IO#1)
>>>> get: read + deserialize (IO#2)
>>>>
>>>> Proposed approach
>>>> add: serialize + write (IO#1)
>>>> compact: read + deserialize + aggregate + serialize + write (IO#2)
>>>> get: read + deserialize (IO#3)
>>>>
>>>> But if adds are mostly followed by gets, then there's more IO because RocksDB needs to make an additional call via JNI before serving reads, right? If that's the case, do we need two implementations and select one depending on the case?
>>>
>>> Your calculation of the cost of the proposed approach is correct. If there is frequent read-after-write, the merge operator does not add value, and users should use the existing read-modify-write based state variables instead. The proposal does not change the behavior of any existing state variable; users can decide when to use the new state variables that support custom Reduce/Aggregate functions.
>>>
>>>> 2. Speaking more generally, could you list the motivating use cases, e.g. existing Flink DataStream operators, custom operators, SQL operators? If the impact depends on the operator, it would be helpful to understand how the existing operators would be affected.
>>>
>>> The idea for this enhancement came from a real use case at my work. I have captured the scenarios in THIS <https://github.com/soumitrak/flink_streaming_event_reordering> repo. Essentially, the scenario is to reorder events by event time and process the ordered events every minute. In this case, I am building a sorted list using an associative operator that merges two sorted lists.
>>>
>>> In general, the proposal should benefit write-heavy (read-light) scenarios where the computation can be expressed as a Reduce/Aggregate function. The existing state variables do read-modify-write and become an ingestion bottleneck for write-heavy use cases. See the performance of the prototype implementation HERE <https://github.com/soumitrak/flink_streaming_event_reordering#performance-comparison>.
>>>
>>>> 3. Could you clarify how the Reduce/Aggregate function calls will be dispatched? In theory, if an operator has two such states, their states will end up in the same RocksDB instance and we need to distinguish them, right?
>>>
>>> RocksDB allows one merge operator per ColumnFamily, and AFAIU, Flink creates one ColumnFamily per state variable. Currently, Flink sets the "stringappendtest" <https://github.com/apache/flink/blob/6c756a084d40f4716cdfdb4fdd71b0c919c448e6/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java#L133> merge operator for all ColumnFamilies. In this proposal, Flink can set a user-provided Java based Reduce/Aggregate function as the merge operator.
>>>
>>> In your example, if the operator has two state variables, there will be two ColumnFamilies, and Flink can set a custom ReduceFunction for one variable and a custom AggregateFunction for the other. We don't need to change anything at this layer in Flink/RocksDB.
>>>
>>>> 4. Function dispatch during restore
>>>> IIRC, aggregate functions might not be available during the restore operation, because the state descriptors containing these functions are set later. Is this a concern? Do we need to disable compactions during restore?
>>>
>>> Great point! The Reduce/Aggregate functions in Flink are already serializable, and in the proposal they are serialized in the savepoint and restored when RocksDB is loaded. This allows RocksDB to call these functions during compaction before the state descriptors are set, so we don't need to disable compaction during restore.
>>>
>>>> 5. Remote compactions
>>>> In light of the Disaggregated State Backend, Flink might benefit from remote compactions. This may help with predictability (no compaction impact on processing) and with achieving better cost/utilization. AFAIK ForSt doesn't support it ATM, but it would be great to keep this possibility open. And I think the current proposal will make remote compactions problematic.
>>>
>>> TBH, I don't know ForSt in enough detail to comment on this item. Since the proposal exposes associative merge operators, supporting it in ForSt should not be an issue. In fact, if ForSt does not support associative merge operators, I will volunteer to add that support, but let's get this proposal in first.
>>>
>>>> 6. Alternative: ListState
>>>> Have you considered using ListState to accumulate the records, and then reducing/aggregating on read (or incrementally on write)? Basically, doing the same thing as you proposed but purely in Java.
>>>>
>>>> I think it should address all the concerns I mentioned above; plus it:
>>>> - removes the associativity requirement (because we control the order)
>>>> - removes the RocksDB dependency
>>>> - gives some flexibility in when to aggregate (incrementally on write and/or on read)
>>>>
>>>> (I couldn't find this in the Rejected Alternatives, sorry if I missed it.)
>>>
>>> ListState uses the "stringappendtest" merge operator, defined in the C++ layer of FrocksDB, which concatenates values using a comma as the delimiter. I could implement the sorted list with this construct, but reads would be more expensive than with the proposal, since the sorting would happen during the read. The proposal adds support for pure Java based associative merge operators that RocksDB invokes via JNI. There is JNI overhead, but it is not on the write thread, and the write path is as performant as the ListState implementation. I have a prototype implementation that reduces the JNI overhead. I think this can be a great addition to Flink; like other advanced features, it requires understanding and tuning RocksDB.
>>>
>>> Best,
>>> -Soumitra.
>>>
>>>>
>>>> On Tue, May 26, 2026 at 3:50 PM Zakelly Lan wrote:
>>>>
>>>> > Hi Soumitra,
>>>> >
>>>> > I'm somewhat confused. I was expecting no public API change on the Flink side, with users using the original state descriptors to create ReducingState/AggregatingState. On the implementation side, taking RocksDBReducingState as an example, a wrapper of the user-defined ReduceFunction that extends AbstractAssociativeMergeOperator is provided to the frocksdb side. This may be a simple way of optimizing, offering no additional new use cases. I may have missed some information, so please correct me if I'm wrong.
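The sorted-list reduction described in the reply to Roman (for event reordering, each incoming event becomes a one-element list, and two already-sorted lists are merged into one) can be sketched as a plain associative function. This is a minimal sketch under stated assumptions: events are reduced to `long` event-time timestamps, and `SortedListMerge` is a hypothetical name, not part of the FLIP, Flink, or FRocksDB. The `main` method checks that both groupings of three merges give the same list, which is the associativity property a merge operator relies on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: associative "merge two sorted lists" reduction (not a Flink or FRocksDB API). */
final class SortedListMerge {
    private SortedListMerge() {}

    /** Merges two ascending lists of event timestamps into one ascending list. */
    static List<Long> merge(List<Long> left, List<Long> right) {
        List<Long> out = new ArrayList<>(left.size() + right.size());
        int i = 0;
        int j = 0;
        while (i < left.size() && j < right.size()) {
            // On ties take from the left first, so equal timestamps keep their relative order.
            if (left.get(i) <= right.get(j)) {
                out.add(left.get(i++));
            } else {
                out.add(right.get(j++));
            }
        }
        // Copy whatever remains of the list that was not exhausted.
        while (i < left.size()) out.add(left.get(i++));
        while (j < right.size()) out.add(right.get(j++));
        return out;
    }

    public static void main(String[] args) {
        List<Long> a = Arrays.asList(1L, 5L, 9L);
        List<Long> b = Arrays.asList(2L, 3L);
        List<Long> c = Arrays.asList(4L, 10L);
        // (a merge b) merge c and a merge (b merge c) produce the same sorted list.
        System.out.println(merge(merge(a, b), c)); // [1, 2, 3, 4, 5, 9, 10]
        System.out.println(merge(a, merge(b, c))); // same list
    }
}
```

Because the grouping does not matter, a storage engine may combine pending operands in any grouping during flush or compaction, while a ListState-based variant, as discussed above, pays the sorting cost at read time.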
>>>> >
>>>> > And BTW, CC to @roman and @Yuan Mei as you may be interested in this topic.
>>>> >
>>>> > Best,
>>>> > Zakelly
>>>> >
>>>> > On Mon, May 25, 2026 at 12:05 PM Soumitra Kumar wrote:
>>>> >
>>>> >> Hi Zakelly,
>>>> >>
>>>> >> I have removed the ReducingMergeState.set method to minimize the changes to public APIs. The modified FLIP is at the same link:
>>>> >> https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing
>>>> >>
>>>> >> Community members,
>>>> >> This proposal is a great addition to FrocksDB/Flink that enables applications to harness the power of RocksDB. It will help write-heavy workloads and let users build associative data structures in a streaming fashion.
>>>> >>
>>>> >> Please review, and I can answer any questions.
>>>> >>
>>>> >> Best,
>>>> >> -Soumitra.
>>>> >>
>>>> >> On Tue, May 19, 2026 at 8:56 PM Soumitra Kumar wrote:
>>>> >>
>>>> >> > Hi Zakelly,
>>>> >> >
>>>> >> > Thanks for your review and +1!
>>>> >> >
>>>> >> > On Tue, May 19, 2026 at 8:19 AM Zakelly Lan wrote:
>>>> >> >
>>>> >> >> Thanks for the FLIP! I'm +1 on the direction of enhancing the reducing and aggregating state. It's important that we can leverage RocksDB's merge operators to eliminate unnecessary `get()` calls. However, I have a few questions:
>>>> >> >>
>>>> >> >> 1. I see you will introduce `AbstractAssociativeMergeOperator` on the frocksdb side, so how would users pass this instance to the RocksDB state backend, and what is its relationship with Flink's `ReduceFunction` or `AggregateFunction`? I would suggest we 'translate' the user's `ReduceFunction` into a frocksdb merge operator, so that Flink users keep the original experience. WDYT?
>>>> >> >>
>>>> >> >
>>>> >> > Yes, that is the whole idea. Since ReduceFunction and AggregateFunction are existing primitives, and they are associative and serializable, I am building on them.
>>>> >> >
>>>> >> > This is how they are wired. There are two new classes in Flink, RocksDBReducingMergeOperator and RocksDBAggregatingMergeOperator, implementing AbstractAssociativeMergeOperator. The ColumnFamily is configured with one of them: RocksDBReducingMergeOperator for ReducingMergeState and RocksDBAggregatingMergeOperator for AggregatingMergeState. These classes receive callbacks from frocksdb, handle the serde, and call the user-defined ReduceFunction or AggregateFunction.
>>>> >> >
>>>> >> >> 2. I see you introduce a new `ReducingMergeState` with a new `set()` method compared with `ReducingState`; is this necessary? If we intend to optimize performance solely through the use of a merge operator, it is not necessary, right? I do not recommend introducing too many public APIs, as this would force us to consider their semantics. For example, in changelog stream processing, if the `merge` operation were to permit state-setting operations, it would complicate potential future retraction (or reverse merge/aggregating) operations. WDYT?
>>>> >> >>
>>>> >> >
>>>> >> > ReducingMergeState.set is just a shortcut for clear + add. I agree with your point; I will remove it.
>>>> >> >
>>>> >> > Best,
>>>> >> > -Soumitra.
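The wiring described in the reply above (a merge-operator class that receives serialized operands from the storage engine, deserializes them, calls the user's reduce function, and serializes the result) can be sketched as follows. This is a minimal, self-contained sketch, not the FLIP's code: `Reducer` is a hypothetical stand-in for Flink's `ReduceFunction`, `Codec` for a Flink `TypeSerializer`, and `ReducingMergeOperatorSketch` for the proposed `RocksDBReducingMergeOperator`. None of these names or signatures come from the FLIP, and the actual FRocksDB callback signature is not shown in this thread.

```java
import java.io.Serializable;
import java.nio.ByteBuffer;

/** Hypothetical stand-in for Flink's ReduceFunction: serializable and assumed associative. */
interface Reducer<T> extends Serializable {
    T reduce(T a, T b) throws Exception;
}

/** Hypothetical stand-in for a Flink TypeSerializer. */
interface Codec<T> {
    byte[] encode(T value);

    T decode(byte[] bytes);
}

/** Hypothetical byte-level merge callback that wraps a user reducer (sketch of the wrapper idea only). */
final class ReducingMergeOperatorSketch<T> {
    private final Reducer<T> reducer;
    private final Codec<T> codec;

    ReducingMergeOperatorSketch(Reducer<T> reducer, Codec<T> codec) {
        this.reducer = reducer;
        this.codec = codec;
    }

    /** Combines the existing value (null if absent) with one new operand and returns the merged bytes. */
    byte[] merge(byte[] existing, byte[] operand) throws Exception {
        if (existing == null) {
            return operand; // nothing stored yet, so the operand becomes the value
        }
        // Deserialize both sides, apply the user function, serialize the result.
        T merged = reducer.reduce(codec.decode(existing), codec.decode(operand));
        return codec.encode(merged);
    }
}

/** Example codec for long counters (8-byte big-endian encoding). */
final class LongCodec implements Codec<Long> {
    public byte[] encode(Long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    public Long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }
}

final class ReducingMergeDemo {
    public static void main(String[] args) throws Exception {
        LongCodec codec = new LongCodec();
        ReducingMergeOperatorSketch<Long> op = new ReducingMergeOperatorSketch<>(Long::sum, codec);
        byte[] acc = null;
        for (long v : new long[] {3, 4, 5}) {
            acc = op.merge(acc, codec.encode(v));
        }
        System.out.println(codec.decode(acc)); // 12
    }
}
```

Keeping `Reducer` serializable mirrors the point made earlier in the thread that the functions are serializable, which is what would let them be stored with the savepoint and restored before the state descriptors are set.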
>>>> >> >
>>>> >> >> On Tue, May 5, 2026 at 11:50 AM Soumitra Kumar wrote:
>>>> >> >>
>>>> >> >> > Dear Community Members,
>>>> >> >> >
>>>> >> >> > I want to start a discussion on the two tickets I filed recently:
>>>> >> >> > Add support for Java based AssociativeMergeOperator via JNI <https://issues.apache.org/jira/browse/FLINK-39455>
>>>> >> >> > Support ReducingMergeState and AggregatingMergeState backed by Java based associative merge operators <https://issues.apache.org/jira/browse/FLINK-39456>
>>>> >> >> >
>>>> >> >> > Copying the motivation from the FLIP doc <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>:
>>>> >> >> >
>>>> >> >> > Flink supports RocksDBReducingState and RocksDBAggregatingState state variables that do a synchronous read-modify-write on every add call. While this works well in many scenarios, for write-heavy workloads it can be expensive and may become a bottleneck.
>>>> >> >> > RocksDB's AssociativeMergeOperator is a storage-level primitive designed for commutative and associative operations: integer counters, set union, list append, approximate sketches, top-K structures, Bloom filters, and similar patterns. However, frocksdb (the RocksDB fork used in Flink) does not support Java based associative merge operators.
>>>> >> >> >
>>>> >> >> > This FLIP has two parts:
>>>> >> >> > 1. Support for Java based AssociativeMergeOperator in frocksdb via JNI
>>>> >> >> > 2. Support for ReducingMergeState and AggregatingMergeState backed by Java based associative merge operators
>>>> >> >> >
>>>> >> >> > The first part proposes exposing the associative merge operator as a Java class in frocksdb with minimal JNI overhead. RocksDB can call these operators during flushing and compaction.
>>>> >> >> > The second part leverages the frocksdb support developed in the first part to support ReducingMergeState and AggregatingMergeState state variables with user-defined ReduceFunction and AggregateFunction on the RocksDB backend.
>>>> >> >> >
>>>> >> >> > This enhancement exposes a powerful RocksDB feature to Java. Flink users can use it to build interesting associative data structures on streaming data. I have added benchmark details from a prototype implementation to the FLIP doc.
>>>> >> >> >
>>>> >> >> > Looking forward to feedback.
>>>> >> >> >
>>>> >> >> > FLIP in Google doc <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>
>>>> >> >> >
>>>> >> >> > Best,
>>>> >> >> > -Soumitra.
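To make the trade-off in the motivation and in Roman's IO breakdown concrete, here is a toy in-memory model, not RocksDB. `ReadModifyWriteCell` reads the stored value on every add, like the current state variables, while `MergeCell` only appends operands and folds them on `get()` or `compact()`, similar in spirit to how RocksDB applies merge operands lazily. Both class names are hypothetical, and the `reads` counters model logical reads only, not disk IO or JNI calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/** Hypothetical toy model of the current approach: every add() reads, combines, and writes back. */
final class ReadModifyWriteCell<T> {
    private final BinaryOperator<T> reduce;
    private T stored;
    int reads;

    ReadModifyWriteCell(BinaryOperator<T> reduce) {
        this.reduce = reduce;
    }

    void add(T value) {
        reads++; // the current value is read before every write
        stored = (stored == null) ? value : reduce.apply(stored, value);
    }

    T get() {
        reads++;
        return stored;
    }
}

/** Hypothetical toy model of the proposed approach: add() is a blind write; folding is deferred. */
final class MergeCell<T> {
    private final BinaryOperator<T> reduce;
    private final List<T> pending = new ArrayList<>();
    private T base;
    int reads;

    MergeCell(BinaryOperator<T> reduce) {
        this.reduce = reduce;
    }

    void add(T value) {
        pending.add(value); // no read on the write path
    }

    /** Folds pending operands into the base value, as a flush/compaction would. */
    void compact() {
        reads++;
        for (T operand : pending) {
            base = (base == null) ? operand : reduce.apply(base, operand);
        }
        pending.clear();
    }

    /** A read must also apply any operands that have not been compacted yet. */
    T get() {
        reads++;
        T result = base;
        for (T operand : pending) {
            result = (result == null) ? operand : reduce.apply(result, operand);
        }
        return result;
    }
}

final class MergeVsRmwDemo {
    public static void main(String[] args) {
        ReadModifyWriteCell<Long> rmw = new ReadModifyWriteCell<>(Long::sum);
        MergeCell<Long> merge = new MergeCell<>(Long::sum);
        for (long i = 1; i <= 1000; i++) {
            rmw.add(i);
            merge.add(i);
        }
        merge.compact();
        System.out.println(rmw.get() + " after " + rmw.reads + " reads");     // 500500 after 1001 reads
        System.out.println(merge.get() + " after " + merge.reads + " reads"); // 500500 after 2 reads
    }
}
```

If every add were immediately followed by a get, `MergeCell.get()` would pay the folding cost on the read path instead, which is the read-after-write case Roman raised and the reason the thread suggests keeping the existing state variables for read-heavy patterns.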
