Hi Zakelly, > So I assume there is a conditional branch to determine whether current > stream key is in VALUE or MAP mode, and this also involves some state > access right?
Yes, that's a good question; it might add one more state access. To mitigate that to some extent, the results of the 1st access to ValueState can be cached and be used later for add/retract (in case it's not empty). (such caching is already included in the benchmark results) > Have you evaluated the implementation relying on the orderliness > of RocksDB's kv? RocksDB's `MapState.iterator().next()` retrieves the first > entry of the map in binary order. We could reverse the order of seq by > generating it from Long.MAX to 0, and thus the first entry retrieved would > be the last one added. I see you mention this in 'Rejected Alternatives' > but I'm still curious whether this could achieve an improvement. To > my knowledge, the iterator might be less efficient since it could not > leverage bloom filters as point-lookup does and the cache is unnecessary if > only the first entry is needed (of course we could remove that cache). It's > not immediately clear which approach is better, as each has its trade-offs. We tried a similar approach of using an iterator. The benefits are diminished there by slow iteration (plus, iterator creation is also slow). Because of that, the performance was similar to the current implementation. We didn't compare the two approaches side by side though. > I’d also suggest testing scenarios with retraction rates below 100%, as > that may better reflect real-world workloads IIUC. I mostly focused on 100% because that's where I saw regression: the fewer retractions, the longer the list, the worse performance of ValueState. I'll re-run the benchmark with lower rates. > I think it's a good chance to push forward the discussion/design of binary > sorted map state (FLIP-220)[1] since this seems to be a good application > scenario. But I also think it's acceptable if we do some hack to only rely > on RocksDB's state implicitly rather that waiting the new > state primitives, if it is truly beneficial. I agree, probably not for this FLIP (because of the above reasons), but for many other cases it would be definitely beneficial to expose the sortedness of RocksDB in some way. Multi-way join (FLIP-516) [1] is one such example. > By saying event-time based TTL, I meant to make it easier for users to > understand. The event-time could be defined and controlled/advanced by user > (SQL implementor or Flink user). e.g. Event-time could be watermark > progression or just record sequence (state will live through specific > number of records), or even be advanced by special control records for > Datastream users. This kind of user-controlled time advancement is what I > said "manually controllable". Such flexibility could be broadly beneficial. Thanks for clarifying, special control records is an interesting idea, and I think it should be easy to implement. > We’ve encountered cases where state expiration is inconsistent between > upstream and downstream SQL operators. With event-time based TTL, operators > could share a synchronized notion of time, allowing them to expire state in > a more coordinated way. Yeah, that's a pity that Flink doesn't have event-time TTL. But to solve cross-operator TTL inconsistency, we'd need to change multiple operators (or state backend). I'm not sure we can efficiently support event time TTL for a general case. P.S.: have a good vacation! :) [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-516%3A+Multi-Way+Join+Operator Regards, Roman On Fri, Aug 29, 2025 at 8:24 AM Zakelly Lan <zakelly....@gmail.com> wrote: > Hi Roman, > > Thanks for the answers! Please see my comments below: > > The switch is performed under each stream key individually, when > > its specific list size > > reaches a threshold. > > > So I assume there is a conditional branch to determine whether current > stream key is in VALUE or MAP mode, and this also involves some state > access right? > > Yes, those pointers are necessary, I couldn't find a way to get rid of > them: > > - prevSeqNo is used to emit penultimate element when retracing the last > > one; > > - nextSeqNo is used to keep prevSeqNo correct when retracting an item > from > > the middle > > > Oh I see. Have you evaluated the implementation relying on the orderliness > of RocksDB's kv? RocksDB's `MapState.iterator().next()` retrieves the first > entry of the map in binary order. We could reverse the order of seq by > generating it from Long.MAX to 0, and thus the first entry retrieved would > be the last one added. I see you mention this in 'Rejected Alternatives' > but I'm still curious whether this could achieve an improvement. To > my knowledge, the iterator might be less efficient since it could not > leverage bloom filters as point-lookup does and the cache is unnecessary if > only the first entry is needed (of course we could remove that cache). It's > not immediately clear which approach is better, as each has its trade-offs. > I’d also suggest testing scenarios with retraction rates below 100%, as > that may better reflect real-world workloads IIUC. > > I think it's a good chance to push forward the discussion/design of binary > sorted map state (FLIP-220)[1] since this seems to be a good application > scenario. But I also think it's acceptable if we do some hack to only rely > on RocksDB's state implicitly rather that waiting the new > state primitives, if it is truly beneficial. > > [1] https://cwiki.apache.org/confluence/x/Xo_FD > > I agree, I think that event-time based TTL is more useful in general > > (I specified processing time as a default to make it less surprising for > > the users). > > I don't immediately see the potential usages of a manually controllable > > TtlTimeProvider - do you have any use cases in mind? > > > By saying event-time based TTL, I meant to make it easier for users to > understand. The event-time could be defined and controlled/advanced by user > (SQL implementor or Flink user). e.g. Event-time could be watermark > progression or just record sequence (state will live through specific > number of records), or even be advanced by special control records for > Datastream users. This kind of user-controlled time advancement is what I > said "manually controllable". Such flexibility could be broadly beneficial. > > We’ve encountered cases where state expiration is inconsistent between > upstream and downstream SQL operators. With event-time based TTL, operators > could share a synchronized notion of time, allowing them to expire state in > a more coordinated way. > > > Looking forward to your reply! P.S. I'm on vacation for the next few days, > so I'll follow up later :) . > > Best, > Zakelly > > On Fri, Aug 29, 2025 at 2:52 AM Roman Khachatryan <ro...@apache.org> > wrote: > > > Hi Zakelly, > > > > Thanks for the feedback! > > > > > 1. Could you elaborate more about the ADAPTIVE mode? Is the switch > > between > > > VALUE and MAP performed under each stream key considering each list > size, > > > or is it performed for all keys if the average list size reaches the > > given > > > thresholds? > > > > The switch is performed under each stream key individually, when > > its specific list size > > reaches a threshold. > > > > > 2. Is it necessary to maintain pointers 'prevSeqNo' and 'nextSeqNo' to > > link > > > all the nodes? I assume there should be a traversal need but I don't > see > > > that in pseudo-code. > > > > Yes, those pointers are necessary, I couldn't find a way to get rid of > > them: > > - prevSeqNo is used to emit penultimate element when retracing the last > > one; > > - nextSeqNo is used to keep prevSeqNo correct when retracting an item > from > > the middle > > > > > And is `MapState.iterator` also feasible? > > Yes, in fact, the ADAPTIVE strategy uses an iterator to move the entries > > between MAP and VALUE. > > > > > 3. I see there are two `RowData` stored for one record, one is in > > > `rowToSqn` and another is in `sqnToNode`'s node. I guess the first is > for > > > upsert-keys. Would it be optimized to single copy for a non-upsert-key > > > scenario? > > > > That's an interesting idea! I'll try to dig into it deeper when > > open-sourcing or as a follow-up. > > > > > 4. For the TTL mechanism part, I would suggest an 'event-time based > ttl', > > > which allows the user to specify insertion time for each insert/update > > > operation and a manually controllable `TtlTimeProvider` (instead of > just > > > system time). This would be beneficial for many cases, WDYT? > > > > I agree, I think that event-time based TTL is more useful in general > > (I specified processing time as a default to make it less surprising for > > the users). > > > > I don't immediately see the potential usages of a manually controllable > > TtlTimeProvider - do you have any use cases in mind? > > > > > 5. Does the current RocksDB benchmark involve significant state size > and > > > I/O pressure? > > > > No, in the micro-benchmark the state wasn't too big (in order of > > megabytes); > > It was bottlenecked by RocksDB put/get operations, however. > > I also performed a benchmark on a cluster with a larger state size > > (in order of gigabytes) and got similar results. > > > > > > Regards, > > Roman > > > > > > On Thu, Aug 28, 2025 at 11:38 AM Zakelly Lan <zakelly....@gmail.com> > > wrote: > > > > > Hi Roman, > > > > > > Thanks for the proposal! The SinkUpsertMaterializer sometimes becomes a > > > bottleneck in our production, so I'd +1 to optimize it. I have several > > > questions regarding your design: > > > > > > 1. Could you elaborate more about the ADAPTIVE mode? Is the switch > > between > > > VALUE and MAP performed under each stream key considering each list > size, > > > or is it performed for all keys if the average list size reaches the > > given > > > thresholds? > > > 2. Is it necessary to maintain pointers 'prevSeqNo' and 'nextSeqNo' to > > link > > > all the nodes? I assume there should be a traversal need but I don't > see > > > that in pseudo-code. And is `MapState.iterator` also feasible? > > > 3. I see there are two `RowData` stored for one record, one is in > > > `rowToSqn` and another is in `sqnToNode`'s node. I guess the first is > for > > > upsert-keys. Would it be optimized to single copy for a non-upsert-key > > > scenario? > > > 4. For the TTL mechanism part, I would suggest an 'event-time based > ttl', > > > which allows the user to specify insertion time for each insert/update > > > operation and a manually controllable `TtlTimeProvider` (instead of > just > > > system time). This would be beneficial for many cases, WDYT? > > > 5. Does the current RocksDB benchmark involve significant state size > and > > > I/O pressure? > > > > > > > > > Best, > > > Zakelly > > > > > > On Thu, Aug 28, 2025 at 7:11 AM Roman Khachatryan <ro...@apache.org> > > > wrote: > > > > > > > Hi everyone, > > > > > > > > I would like to start a discussion about FLIP-544 > > SinkUpsertMaterializer > > > V2 > > > > [1]. > > > > > > > > SinkUpsertMaterializer is an operator in Flink that reconciles out of > > > order > > > > changelog events before sending them to an upsert sink. In some cases > > > (that > > > > we see in our production), performance of this operator degrades > > > > exponentially, depending on the input data. > > > > This FLIP proposes a new implementation that is optimized for such > > cases > > > > and serves as a synchronization point for other efforts in that area. > > > > > > > > Looking forward to feedback. > > > > > > > > [1] > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-544%3A+SinkUpsertMaterializer+V2 > > > > > > > > > > > > Regards, > > > > Roman > > > > > > > > > >