Hi Roman,

Thanks for the answers! Please see my comments below:
> The switch is performed under each stream key individually, when
> its specific list size reaches a threshold.

So I assume there is a conditional branch to determine whether the current
stream key is in VALUE or MAP mode, and this also involves some state
access, right?

> Yes, those pointers are necessary, I couldn't find a way to get rid of
> them:
> - prevSeqNo is used to emit the penultimate element when retracting the
> last one;
> - nextSeqNo is used to keep prevSeqNo correct when retracting an item
> from the middle

Oh I see. Have you evaluated an implementation relying on the orderliness of
RocksDB's kv storage? RocksDB's `MapState.iterator().next()` retrieves the
first entry of the map in binary order. We could reverse the order of seq by
generating it from Long.MAX_VALUE down to 0, so that the first entry
retrieved would be the last one added. I see you mention this in 'Rejected
Alternatives' but I'm still curious whether it could achieve an improvement.
To my knowledge, the iterator might be less efficient since it cannot
leverage bloom filters the way a point lookup does, and the cache is
unnecessary if only the first entry is needed (of course we could remove
that cache). It's not immediately clear which approach is better, as each
has its trade-offs. I'd also suggest testing scenarios with retraction rates
below 100%, as that may better reflect real-world workloads IIUC.

I think it's a good chance to push forward the discussion/design of binary
sorted map state (FLIP-220) [1], since this seems to be a good application
scenario. But I also think it's acceptable to do some hack that relies on
RocksDB's state implicitly rather than waiting for the new state primitives,
if it is truly beneficial.

[1] https://cwiki.apache.org/confluence/x/Xo_FD

> I agree, I think that event-time based TTL is more useful in general
> (I specified processing time as a default to make it less surprising for
> the users).
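To make the descending-seq idea above concrete, here is a minimal sketch. It is only an illustration: the class name is made up, `encode` stands in for Flink's key serialization, and a `TreeMap` with `Arrays.compareUnsigned` (Java 9+) stands in for RocksDB's byte-wise key ordering.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.TreeMap;

public class DescendingSeqSketch {

    private long nextSeq = Long.MAX_VALUE; // count down so the newest entry sorts first

    /** Allocates the next sequence number, descending from Long.MAX_VALUE towards 0. */
    long allocateSeq() {
        return nextSeq--;
    }

    /** Big-endian encoding: for non-negative longs, unsigned byte order equals numeric order. */
    static byte[] encode(long seq) {
        return ByteBuffer.allocate(Long.BYTES).putLong(seq).array();
    }

    public static void main(String[] args) {
        DescendingSeqSketch gen = new DescendingSeqSketch();
        // A TreeMap with an unsigned lexicographic comparator mimics RocksDB's key order.
        TreeMap<byte[], String> store = new TreeMap<>(Arrays::compareUnsigned);
        store.put(encode(gen.allocateSeq()), "first insert");
        store.put(encode(gen.allocateSeq()), "second insert");
        store.put(encode(gen.allocateSeq()), "third insert");
        // The smallest key, i.e. the first entry an iterator would see, is the newest insert.
        System.out.println(store.firstEntry().getValue()); // prints "third insert"
    }
}
```

Note that counting down from Long.MAX_VALUE (rather than up from Long.MIN_VALUE) keeps all values non-negative, so two's-complement sign bits never break the unsigned byte ordering.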
> I don't immediately see the potential usages of a manually controllable
> TtlTimeProvider - do you have any use cases in mind?

By saying event-time based TTL, I meant to make it easier for users to
understand. The event time could be defined and controlled/advanced by the
user (a SQL implementor or a Flink user). E.g. event time could be watermark
progression or just a record sequence (state would live through a specific
number of records), or it could even be advanced by special control records
for DataStream users. This kind of user-controlled time advancement is what
I meant by "manually controllable". Such flexibility could be broadly
beneficial. We've encountered cases where state expiration is inconsistent
between upstream and downstream SQL operators. With event-time based TTL,
operators could share a synchronized notion of time, allowing them to expire
state in a more coordinated way.

Looking forward to your reply!

P.S. I'm on vacation for the next few days, so I'll follow up later :)

Best,
Zakelly

On Fri, Aug 29, 2025 at 2:52 AM Roman Khachatryan <ro...@apache.org> wrote:

> Hi Zakelly,
>
> Thanks for the feedback!
>
> > 1. Could you elaborate more about the ADAPTIVE mode? Is the switch
> > between VALUE and MAP performed under each stream key considering each
> > list size, or is it performed for all keys if the average list size
> > reaches the given thresholds?
>
> The switch is performed under each stream key individually, when
> its specific list size reaches a threshold.
>
> > 2. Is it necessary to maintain pointers 'prevSeqNo' and 'nextSeqNo' to
> > link all the nodes? I assume there should be a traversal need but I
> > don't see that in pseudo-code.
>
> Yes, those pointers are necessary, I couldn't find a way to get rid of
> them:
> - prevSeqNo is used to emit the penultimate element when retracting the
> last one;
> - nextSeqNo is used to keep prevSeqNo correct when retracting an item
> from the middle
>
> > And is `MapState.iterator` also feasible?
> Yes, in fact, the ADAPTIVE strategy uses an iterator to move the entries
> between MAP and VALUE.
>
> > 3. I see there are two `RowData` stored for one record, one is in
> > `rowToSqn` and another is in `sqnToNode`'s node. I guess the first is
> > for upsert-keys. Would it be optimized to a single copy for a
> > non-upsert-key scenario?
>
> That's an interesting idea! I'll try to dig into it deeper when
> open-sourcing or as a follow-up.
>
> > 4. For the TTL mechanism part, I would suggest an 'event-time based
> > ttl', which allows the user to specify insertion time for each
> > insert/update operation and a manually controllable `TtlTimeProvider`
> > (instead of just system time). This would be beneficial for many
> > cases, WDYT?
>
> I agree, I think that event-time based TTL is more useful in general
> (I specified processing time as a default to make it less surprising for
> the users).
>
> I don't immediately see the potential usages of a manually controllable
> TtlTimeProvider - do you have any use cases in mind?
>
> > 5. Does the current RocksDB benchmark involve significant state size
> > and I/O pressure?
>
> No, in the micro-benchmark the state wasn't too big (on the order of
> megabytes); it was bottlenecked by RocksDB put/get operations, however.
> I also performed a benchmark on a cluster with a larger state size
> (on the order of gigabytes) and got similar results.
>
> Regards,
> Roman
>
> On Thu, Aug 28, 2025 at 11:38 AM Zakelly Lan <zakelly....@gmail.com>
> wrote:
>
> > Hi Roman,
> >
> > Thanks for the proposal! The SinkUpsertMaterializer sometimes becomes
> > a bottleneck in our production, so I'd +1 to optimize it. I have
> > several questions regarding your design:
> >
> > 1. Could you elaborate more about the ADAPTIVE mode? Is the switch
> > between VALUE and MAP performed under each stream key considering
> > each list size, or is it performed for all keys if the average list
> > size reaches the given thresholds?
> > 2. Is it necessary to maintain pointers 'prevSeqNo' and 'nextSeqNo'
> > to link all the nodes? I assume there should be a traversal need but
> > I don't see that in pseudo-code. And is `MapState.iterator` also
> > feasible?
> > 3. I see there are two `RowData` stored for one record, one is in
> > `rowToSqn` and another is in `sqnToNode`'s node. I guess the first is
> > for upsert-keys. Would it be optimized to a single copy for a
> > non-upsert-key scenario?
> > 4. For the TTL mechanism part, I would suggest an 'event-time based
> > ttl', which allows the user to specify insertion time for each
> > insert/update operation and a manually controllable `TtlTimeProvider`
> > (instead of just system time). This would be beneficial for many
> > cases, WDYT?
> > 5. Does the current RocksDB benchmark involve significant state size
> > and I/O pressure?
> >
> > Best,
> > Zakelly
> >
> > On Thu, Aug 28, 2025 at 7:11 AM Roman Khachatryan <ro...@apache.org>
> > wrote:
> >
> > > Hi everyone,
> > >
> > > I would like to start a discussion about FLIP-544
> > > SinkUpsertMaterializer V2 [1].
> > >
> > > SinkUpsertMaterializer is an operator in Flink that reconciles
> > > out-of-order changelog events before sending them to an upsert
> > > sink. In some cases (that we see in our production), performance of
> > > this operator degrades exponentially, depending on the input data.
> > > This FLIP proposes a new implementation that is optimized for such
> > > cases and serves as a synchronization point for other efforts in
> > > that area.
> > >
> > > Looking forward to feedback.
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-544%3A+SinkUpsertMaterializer+V2
> > >
> > > Regards,
> > > Roman
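To illustrate the "manually controllable TtlTimeProvider" idea raised in the discussion above, here is a rough sketch. The names (`ManualTtlTimeProvider`, `advanceTo`) are hypothetical, not existing Flink API; the point is only that expiration checks consult a user-advanced clock instead of the system clock.

```java
/**
 * Hypothetical sketch of a user-advanceable TTL clock. Names are illustrative,
 * not existing Flink API. Time is advanced explicitly, e.g. from watermark
 * progress or special control records, instead of reading the system clock.
 */
public class ManualTtlTimeProvider {

    private volatile long currentTime;

    public ManualTtlTimeProvider(long initialTime) {
        this.currentTime = initialTime;
    }

    /** Advances the clock monotonically, mirroring watermark semantics. */
    public void advanceTo(long timestamp) {
        if (timestamp > currentTime) {
            currentTime = timestamp;
        }
    }

    /** What TTL expiration checks would consult instead of System.currentTimeMillis(). */
    public long currentTimestamp() {
        return currentTime;
    }

    public static void main(String[] args) {
        ManualTtlTimeProvider clock = new ManualTtlTimeProvider(0L);
        long ttlMillis = 1_000L;
        long insertionTime = clock.currentTimestamp(); // state entry recorded at t=0

        clock.advanceTo(500L);   // e.g. watermark reached 500
        System.out.println(clock.currentTimestamp() - insertionTime >= ttlMillis); // false: not expired

        clock.advanceTo(1_500L); // watermark passed the TTL boundary
        clock.advanceTo(200L);   // late advancement is ignored; time never moves backwards
        System.out.println(clock.currentTimestamp() - insertionTime >= ttlMillis); // true: expired
    }
}
```

If upstream and downstream SQL operators advanced such a clock from the same watermark, they would expire state at the same logical time, which is the coordination property mentioned above.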