Hello Yuan,

I have added a "Motivation Use Case" section to the FLIP - HERE
<https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?tab=t.0#heading=h.jhkgiqdhf9x1>

Please review.

Best,
-Soumitra.

On Mon, Jun 1, 2026 at 10:25 PM Soumitra Kumar <[email protected]>
wrote:

> Hello Yuan,
>
> Thanks for your review!
>
> #1 - I will add the use case to the FLIP as you suggested. I have
> DataStream use cases at this moment.
>
> #2 - The existing ReducingState and the proposed ReducingMergeState are
> compatible in behavior. However, ReducingState is implemented as
> read-modify-write, whereas ReducingMergeState will be implemented with the
> RocksDB merge operation and an associative merge operator. The runtime
> characteristics of the two implementations differ, so I would leave it to
> users to decide which one to use. I started with the read-modify-write
> based state variables, hit scaling challenges due to high-velocity writes,
> and then arrived at the proposed approach. RocksDB's flush/compaction
> behavior needs to be tuned to get the best out of ReducingMergeState.
> Picking the right state variable should be a conscious choice by users; I
> would not recommend introducing this change transparently.
>
> #3 - There will be two PRs, one for FrocksDB and one for Flink. The
> FrocksDB PR will add support for Java based associative merge operators
> using JNI, and the Flink PR will build on that to support the new state
> variables. Since the changes add new state variables, no user-visible
> change is required to migrate existing code. However, the savepoint version
> will increase to accommodate the serialization of the Reduce/Aggregate
> function, and this change must be backward compatible.
>
> Best,
> -Soumitra.
>
> On Sun, May 31, 2026 at 11:06 PM Yuan Mei <[email protected]> wrote:
>
>> Hey Soumitra, thanks for the proposal, the direction is a great
>> exploration!
>>
>> I have the following questions:
>>
>>    1. *Motivation Use Case:* We need a more specific SQL/DataStream use
>>    case from a user's perspective to motivate this change. This would be a
>>    great amendment to the FLIP.
>>
>>    2. This is essentially a DB operation and optimization. Can it be
>>    implemented without changing Flink's Public Interfaces?
>>
>>    3. Most importantly, how will this land end-to-end? This involves
>>    rewriting SQL and existing DS (DataStream) operators. How will it be used
>>    from an end-to-end perspective?
>>
>>
>> Best
>> Yuan
>>
>>
>> On Mon, Jun 1, 2026 at 12:44 PM Soumitra Kumar <[email protected]>
>> wrote:
>>
>>> Hi Roman,
>>>
>>> Thanks for your thorough review. Please find my comments below.
>>>
>>> On Sun, May 31, 2026 at 2:42 PM Roman Khachatryan <[email protected]>
>>> wrote:
>>>
>>>> 1. Is it possible that reads will become more costly in some scenarios?
>>>>
>>>> Per my understanding, on compaction RocksDB would need to read and write
>>>> back the same state (compared to the current implementation)
>>>> So the benefit is deferring and "batching" those read/write operations.
>>>>
>>>> Current approach
>>>> add: read + deserialize + aggregate + serialize + write (IO#1)
>>>> get: read + deserialize (IO#2)
>>>>
>>>> Proposed approach
>>>> add: serialize + write (IO#1)
>>>> compact: read + deserialize + aggregate + serialize + write (IO#2)
>>>> get: read + deserialize (IO#3)
>>>>
>>>> But if adds are mostly followed by gets then there's more IO because
>>>> RocksDB needs to make an additional call via JNI before serving reads,
>>>> right?
>>>> If that's the case, we need to have two implementations and select one
>>>> depending on the case?
>>>>
>>>
>>> Your calculation of the cost of the proposed approach is correct. If
>>> there is frequent read-after-write, then the merge operator does not add
>>> value. Users should use the existing read-modify-write based state
>>> variables instead. The proposal does not change the behavior of any of the
>>> existing state variables. Users can decide when to use the new state
>>> variables that support custom Reduce/Aggregate functions.
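
A toy cost model of the two write paths compared above, in plain Java with no Flink or RocksDB dependency. The class names ReadModifyWrite and DeferredMerge and the read/write counters are illustrative assumptions, not part of the proposal. compact() merely stands in for a background compaction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/** Toy cost model only; this is not Flink or RocksDB code. */
public class MergeCostSketch {

    /** Read-modify-write: every add reads the current value and writes the reduced one. */
    public static final class ReadModifyWrite<T> {
        private final BinaryOperator<T> reduce;
        private T value;
        public int reads;
        public int writes;

        public ReadModifyWrite(BinaryOperator<T> reduce) {
            this.reduce = reduce;
        }

        public void add(T v) {
            reads++; // read the existing value
            value = (value == null) ? v : reduce.apply(value, v);
            writes++; // write the reduced value back
        }

        public T get() {
            reads++;
            return value;
        }
    }

    /** Merge style: add only records an operand; folding is deferred to compact() or get(). */
    public static final class DeferredMerge<T> {
        private final BinaryOperator<T> reduce;
        private final List<T> operands = new ArrayList<>();
        public int reads;
        public int writes;

        public DeferredMerge(BinaryOperator<T> reduce) {
            this.reduce = reduce;
        }

        public void add(T v) {
            operands.add(v); // no read on the write path
            writes++;
        }

        /** Stand-in for a background compaction: read the operands, fold them, write back. */
        public void compact() {
            if (operands.size() < 2) {
                return;
            }
            reads++;
            T folded = fold();
            operands.clear();
            operands.add(folded);
            writes++;
        }

        /** A get must fold any operands that have not been compacted yet. */
        public T get() {
            reads++;
            return fold();
        }

        private T fold() {
            T acc = null;
            for (T op : operands) {
                acc = (acc == null) ? op : reduce.apply(acc, op);
            }
            return acc;
        }
    }

    public static void main(String[] args) {
        ReadModifyWrite<Long> rmw = new ReadModifyWrite<>(Long::sum);
        DeferredMerge<Long> merge = new DeferredMerge<>(Long::sum);
        for (long i = 1; i <= 1000; i++) {
            rmw.add(i);
            merge.add(i);
        }
        merge.compact();
        System.out.println("rmw:   reads=" + rmw.reads + " writes=" + rmw.writes);
        System.out.println("merge: reads=" + merge.reads + " writes=" + merge.writes);
    }
}
```

In this model the deferred path pays one fold per compaction instead of one read per add. A workload that calls get() after nearly every add loses that advantage, which is the read-after-write case discussed above.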
>>>
>>>
>>>> 2. Speaking more generally, could you list the motivating use cases,
>>>> e.g.
>>>> existing Flink DataStream Operators, custom operators, SQL operators?
>>>> If the impact depends on the operator, it would be helpful to understand
>>>> how the existing operators would be affected.
>>>>
>>>
>>> The idea for this enhancement came from a real use case at my work.
>>> I have captured the scenarios in THIS
>>> <https://github.com/soumitrak/flink_streaming_event_reordering> repo.
>>> Essentially, the scenario is to reorder events based on event time, and
>>> process the ordered events every minute. In this case, I am building a
>>> sorted list using an associative operator that implements merging two
>>> sorted lists.
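
A minimal sketch of an associative operator that merges two sorted lists, assuming events are represented only by their event-time timestamps as Long values. The class SortedListMerge is hypothetical and is not taken from the linked repo; it is plain Java with no Flink dependency.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: merges ascending lists of event timestamps. */
public class SortedListMerge {

    /** Standard two-pointer merge; the result is ascending if both inputs are. */
    public static List<Long> merge(List<Long> left, List<Long> right) {
        List<Long> out = new ArrayList<>(left.size() + right.size());
        int i = 0;
        int j = 0;
        while (i < left.size() && j < right.size()) {
            if (left.get(i) <= right.get(j)) {
                out.add(left.get(i++));
            } else {
                out.add(right.get(j++));
            }
        }
        while (i < left.size()) {
            out.add(left.get(i++));
        }
        while (j < right.size()) {
            out.add(right.get(j++));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> a = List.of(10L, 40L);
        List<Long> b = List.of(20L, 30L);
        List<Long> c = List.of(5L, 50L);
        // Associativity: the grouping of the merges does not change the result.
        System.out.println(merge(merge(a, b), c));
        System.out.println(merge(a, merge(b, c)));
    }
}
```

Because the grouping does not matter, partial lists can be combined in whatever batches flush and compaction happen to produce.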
>>>
>>> In general, the proposal should benefit write-heavy (read-light)
>>> scenarios where the computation can be expressed as a Reduce/Aggregate
>>> function. The existing state variables do read-modify-write and become an
>>> ingestion bottleneck for write-heavy use cases. See the performance
>>> comparison from the prototype implementation HERE
>>> <https://github.com/soumitrak/flink_streaming_event_reordering#performance-comparison>.
>>>
>>>
>>>> 3. Could you clarify how the Reduce/Aggregate function calls will be
>>>> dispatched?
>>>> In theory, if an operator has two such states, their states will end up
>>>> in
>>>> the same RocksDB instance and we need to distinguish them, right?
>>>
>>>
>>> RocksDB allows one merge operator per ColumnFamily, and AFAIU, Flink
>>> creates one ColumnFamily per state variable. Currently, Flink sets the
>>> "stringappendtest"
>>> <https://github.com/apache/flink/blob/6c756a084d40f4716cdfdb4fdd71b0c919c448e6/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java#L133>
>>> merge operator for all ColumnFamilies. In this proposal, Flink can set a
>>> user provided Java based Reduce/Aggregate function as the merge operator.
>>> In your example, if the operator has two state variables, then there will
>>> be two ColumnFamilies, and Flink can set a custom ReduceFunction for one
>>> variable and a custom AggregateFunction for the other variable. We don't
>>> need to change anything at this layer in Flink/RocksDB.
>>>
>>>
>>>> 4. Function dispatch during restore
>>>> IIRC, aggregate functions might not be available during restore
>>>> operation -
>>>> because state descriptors containing these functions are set later.
>>>> Is this a concern? Do we need to disable compactions during restore?
>>>>
>>>
>>> Great point! The Reduce/Aggregate functions in Flink are already
>>> serializable, and in the proposal they are serialized in the savepoint
>>> and restored when RocksDB is loaded. This allows RocksDB to call these
>>> functions during compaction before the state descriptors are set, so we
>>> don't need to disable compaction during restore.
>>>
>>>
>>>> 5. Remote compactions
>>>> In the light of Disaggregated State Backend, Flink might benefit from
>>>> remote compactions.
>>>> This may help with predictability (no compaction impact on processing)
>>>> and
>>>> achieving better cost/utilization.
>>>> AFAIK ForSt doesn't support it ATM, but it would be great to keep this
>>>> possibility open.
>>>> And I think the current proposal will make remote compactions
>>>> problematic.
>>>>
>>>
>>> TBH, I don't understand ForSt in enough detail to comment on this item.
>>> Since the proposal exposes associative merge operators, supporting it in
>>> ForSt should not be an issue. In fact, if ForSt does not support
>>> associative merge operators, I will volunteer to add them, but let's get
>>> this proposal in first.
>>>
>>>
>>>> 6. Alternative: ListState
>>>> Have you considered using ListState to accumulate the records; and then
>>>> reduce/aggregate on read (or incrementally on write)?
>>>> Basically, doing the same thing as you proposed but purely in Java.
>>>>
>>>> I think it should address all the concerns I mentioned above; plus
>>>> - removes the associative requirement (because we control the order)
>>>> - removes RocksDB dependency
>>>> - gives some flexibility when to aggregate (incrementally on write
>>>> and/or
>>>> on read)
>>>>
>>>> (I couldn't find this in the Rejected Alternatives, sorry if I missed
>>>> that).
>>>>
>>>
>>> ListState uses the "stringappendtest" merge operator, defined in the C++
>>> layer in FrocksDB, which concatenates strings using a comma character as
>>> the delimiter. I can implement the sorted list using this construct, but
>>> reads will be more expensive than in the proposal, since the sorting will
>>> happen during the read. The proposal adds support for pure Java based
>>> associative merge operators that RocksDB invokes via JNI. There will be
>>> JNI overhead, but it is not on the write thread, so the write thread is as
>>> performant as the ListState implementation. I have a prototype
>>> implementation that reduces the JNI overhead. I think this can be a great
>>> addition to Flink and, just like other advanced features, it requires
>>> understanding and tuning of RocksDB.
>>>
>>> Best,
>>> -Soumitra.
>>>
>>>
>>>>
>>>> On Tue, May 26, 2026 at 3:50 PM Zakelly Lan <[email protected]>
>>>> wrote:
>>>>
>>>> > Hi Soumitra,
>>>> >
>>>> > I'm getting somewhat confused. I was expecting no public API change on
>>>> > the Flink side, so that users could use the original state descriptors
>>>> > to create ReducingState/AggregatingState. On the implementation side,
>>>> > taking RocksDBReducingState as an example, a wrapper of the user-defined
>>>> > ReducingFunction that extends AbstractAssociativeMergeOperator would be
>>>> > provided to the frocksdb side. This may be a simple way of optimizing,
>>>> > offering no additional new use cases. I may have missed some
>>>> > information, so please correct me if I'm wrong.
>>>> >
>>>> >
>>>> > And BTW, CC to @roman <[email protected]> and @Yuan Mei
>>>> > <[email protected]> as you may be interested in this topic.
>>>> >
>>>> >
>>>> > Best,
>>>> > Zakelly
>>>> >
>>>> > On Mon, May 25, 2026 at 12:05 PM Soumitra Kumar <[email protected]>
>>>> > wrote:
>>>> >
>>>> >> Hi Zakelly,
>>>> >>
>>>> >> I have removed the ReducingMergeState.set method to minimize the
>>>> >> changes to public APIs. The modified FLIP is at the same link:
>>>> >>
>>>> >> https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing
>>>> >>
>>>> >> Community members,
>>>> >> This proposal is a great addition to FrocksDB/Flink that enables
>>>> >> applications to harness the power of RocksDB. It will help write-heavy
>>>> >> workloads and let users build associative data structures in a
>>>> >> streaming fashion.
>>>> >>
>>>> >> Please review; I am happy to answer any questions.
>>>> >>
>>>> >> Best,
>>>> >> -Soumitra.
>>>> >>
>>>> >> On Tue, May 19, 2026 at 8:56 PM Soumitra Kumar <[email protected]>
>>>> >> wrote:
>>>> >>
>>>> >> > Hi Zakelly,
>>>> >> >
>>>> >> > Thanks for your review and +1!
>>>> >> >
>>>> >> > On Tue, May 19, 2026 at 8:19 AM Zakelly Lan <[email protected]>
>>>> >> > wrote:
>>>> >> >
>>>> >> >> Thanks for the FLIP! I'd +1 the direction of enhancing the reducing
>>>> >> >> and aggregating state. It's important that we can leverage RocksDB's
>>>> >> >> merge operators to eliminate unnecessary `get()` calls. However, I
>>>> >> >> have a few questions:
>>>> >> >>
>>>> >> >> 1. I see you will introduce `AbstractAssociativeMergeOperator` on the
>>>> >> >> frocksdb side, so how can a user pass this instance to the RocksDB
>>>> >> >> state backend, and what is its relationship with Flink's
>>>> >> >> `ReduceFunction` or `AggregateFunction`? I would suggest we
>>>> >> >> 'translate' the user's `ReduceFunction` into a frocksdb merge
>>>> >> >> operator, so that Flink keeps the original experience. WDYT?
>>>> >> >>
>>>> >> >
>>>> >> > Yes, that is the whole idea. Since ReduceFunction and
>>>> >> > AggregateFunction are existing primitives and they are associative
>>>> >> > and serializable, I am building on them.
>>>> >> >
>>>> >> > This is how they are wired. There are two new classes,
>>>> >> > RocksDBReducingMergeOperator and RocksDBAggregatingMergeOperator,
>>>> >> > implementing AbstractAssociativeMergeOperator in Flink. The
>>>> >> > ColumnFamily is configured with one of them:
>>>> >> > RocksDBReducingMergeOperator for ReducingMergeState, and
>>>> >> > RocksDBAggregatingMergeOperator for AggregatingMergeState. These
>>>> >> > classes get the callback from frocksdb, handle the serde, and call
>>>> >> > the user-defined ReduceFunction and AggregateFunction.
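
A rough sketch of the wiring described above: a byte-level callback that deserializes both sides, calls the user's reduce function, and serializes the result. BytesMergeCallback is a hypothetical stand-in for whatever AbstractAssociativeMergeOperator exposes (its actual signature is not given in this thread), and the fixed-width Long serde is an assumption made only to keep the example self-contained.

```java
import java.nio.ByteBuffer;
import java.util.function.BinaryOperator;

/** Illustrative only; none of these names are the proposal's actual classes. */
public class ReducingMergeSketch {

    /** Hypothetical byte-level callback that the storage layer would invoke. */
    public interface BytesMergeCallback {
        /** existing is null when the key has no value yet. */
        byte[] merge(byte[] existing, byte[] operand);
    }

    /** Wraps a user reduce function over Long with fixed-width serialization. */
    public static BytesMergeCallback forLongReduce(BinaryOperator<Long> userReduce) {
        return (existing, operand) -> {
            long right = decode(operand);
            if (existing == null) {
                return encode(right); // first operand for this key
            }
            long left = decode(existing);
            return encode(userReduce.apply(left, right));
        };
    }

    public static byte[] encode(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    public static long decode(byte[] b) {
        return ByteBuffer.wrap(b).getLong();
    }

    public static void main(String[] args) {
        // Two state variables, each with its own callback, as with one
        // merge operator per ColumnFamily.
        BytesMergeCallback sum = forLongReduce(Long::sum);
        BytesMergeCallback max = forLongReduce(Long::max);

        byte[] s = sum.merge(null, encode(3));
        s = sum.merge(s, encode(4));

        byte[] m = max.merge(null, encode(3));
        m = max.merge(m, encode(4));

        System.out.println("sum=" + decode(s) + " max=" + decode(m));
    }
}
```

An aggregating variant would differ mainly in the serde, since an AggregateFunction's accumulator type can differ from its input type.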
>>>> >> >
>>>> >> >
>>>> >> >>
>>>> >> >> 2. I read that you introduce a new `ReducingMergeState` with a new
>>>> >> >> `set()` method compared with `ReducingState`. Is this necessary? If
>>>> >> >> we intend to optimize performance solely through the use of a merge
>>>> >> >> operator, it is not necessary, right? I do not recommend introducing
>>>> >> >> too many public APIs, as this would force us to consider their
>>>> >> >> semantics. For example, in changelog stream processing, if the
>>>> >> >> `merge` operation were to permit state-setting operations, it would
>>>> >> >> complicate potential future retraction (or reverse
>>>> >> >> merge/aggregating) operations. WDYT?
>>>> >> >>
>>>> >> >
>>>> >> > ReducingMergeState.set is just a shortcut for clear-add. I agree
>>>> >> > with your point, and I will remove it.
>>>> >> >
>>>> >> > Best,
>>>> >> > -Soumitra.
>>>> >> >
>>>> >> >
>>>> >> >> On Tue, May 5, 2026 at 11:50 AM Soumitra Kumar
>>>> >> >> <[email protected]> wrote:
>>>> >> >>
>>>> >> >> > Dear Community Members,
>>>> >> >> >
>>>> >> >> > I want to start a discussion on the two tickets I filed recently:
>>>> >> >> > Add support for Java based AssociativeMergeOperator via JNI
>>>> >> >> > <https://issues.apache.org/jira/browse/FLINK-39455>
>>>> >> >> > Support ReducingMergeState and AggregatingMergeState backed by
>>>> >> >> > Java based associative merge operators
>>>> >> >> > <https://issues.apache.org/jira/browse/FLINK-39456>
>>>> >> >> >
>>>> >> >> > Copying the motivation from the FLIP doc
>>>> >> >> > <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>:
>>>> >> >> >
>>>> >> >> > Flink supports RocksDBReducingState and RocksDBAggregatingState
>>>> >> >> > state variables that do a synchronous read-modify-write on every
>>>> >> >> > add call. While this works great in many scenarios, for
>>>> >> >> > write-heavy workloads this can be expensive and may become a
>>>> >> >> > bottleneck.
>>>> >> >> > RocksDB's AssociativeMergeOperator is a storage-level primitive
>>>> >> >> > designed for associative operations: integer counters, set union,
>>>> >> >> > list append, approximate sketches, top-K structures, Bloom
>>>> >> >> > filters, and similar patterns. However, frocksdb (the RocksDB
>>>> >> >> > fork used in Flink) does not support Java based associative merge
>>>> >> >> > operators.
>>>> >> >> >
>>>> >> >> > This FLIP has two parts:
>>>> >> >> > 1. Support for Java based AssociativeMergeOperator in frocksdb via
>>>> >> >> > JNI
>>>> >> >> > 2. Support ReducingMergeState and AggregatingMergeState backed by
>>>> >> >> > Java based associative merge operators
>>>> >> >> >
>>>> >> >> > The first part proposes exposing the associative merge operator
>>>> >> >> > as a Java class in frocksdb with minimal JNI overhead. RocksDB can
>>>> >> >> > call these operators during flushing and compaction.
>>>> >> >> > The second part leverages the frocksdb support developed in the
>>>> >> >> > first part to support ReducingMergeState and AggregatingMergeState
>>>> >> >> > state variables with user defined ReduceFunction and
>>>> >> >> > AggregateFunction using the rocksdb backend.
>>>> >> >> >
>>>> >> >> > This enhancement opens up a powerful feature of rocksdb to Java.
>>>> >> >> > Flink users can use it to build interesting associative data
>>>> >> >> > structures on streaming data. I have added benchmark details from
>>>> >> >> > a prototype implementation in the FLIP doc.
>>>> >> >> >
>>>> >> >> > Looking forward to feedback.
>>>> >> >> >
>>>> >> >> > FLIP in Google doc
>>>> >> >> > <https://docs.google.com/document/d/1HwEDRGoSZIUU1SYxTih4qp8FM6LjTdIrDs7CJHm4iB0/edit?usp=sharing>
>>>> >> >> >
>>>> >> >> > Best,
>>>> >> >> > -Soumitra.
>>>> >> >> >
>>>> >> >>
>>>> >> >
>>>> >>
>>>> >
>>>>
>>>
