Hi all,

Yun, David M, David A, and I had an offline discussion and talked through a couple of details that emerged from the discussion here. We believe we have found a consensus on these points and would like to share them for further feedback:
Let me go through the open points in no particular order:

1. We want to offer a generic interface for sorted state, not just temporal state as proposed initially. We would like to...
   a) ...offer a single new state type similar to what TemporalListState was offering (i.e. no additional TemporalValueState-like type, to keep the API slim).
   b) ...name it BinarySortedMultiMap<UK, UV> with Java-Object keys and values (I'll go into the API further below). The name stresses "Binary" because we want to make clear that this is what the sort order is based on (see below).
   c) ...have our own state descriptor (BinarySortedMultiMapStateDescriptor<UK, UV>) similar to MapStateDescriptor<UK, UV>.
   d) ...require TypeSerializer implementations for the key to extend LexicographicalTypeSerializer (details below).

2. LexicographicalTypeSerializer defines the sort order when retrieving values: it is based on the serialized binaries, compared byte by byte as unsigned values. For heap-based state backends, these serializers can also optionally define a Comparator that doesn't require serialization but must retain the same sort order. If no Comparator is provided, we would implement the range-based operations on binary keys instead (by simply converting all relevant keys to their binary form and using an appropriate comparator).
```
import java.util.Comparator;
import java.util.Optional;

import org.apache.flink.api.common.typeutils.TypeSerializer;

// TypeSerializer is an abstract class, hence an abstract subclass
public abstract class LexicographicalTypeSerializer<T> extends TypeSerializer<T> {

    // Optional Comparator for heap-based backends: it must produce the same
    // order as comparing the serialized bytes one by one as unsigned values.
    // An empty Optional means the serialized binaries are compared instead.
    public Optional<Comparator<T>> findComparator() {
        return Optional.empty();
    }
}
```
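The byte-by-byte unsigned order from point 2 can be illustrated with a small, self-contained Java sketch. It shows a Comparator<byte[]> implementing that order and, purely as an illustrative assumption, a hypothetical key encoding (encodeLong, not a Flink API) that a LexicographicalTypeSerializer for long keys might use. Writing the value big-endian with the sign bit flipped makes the unsigned binary order agree with the numeric order; plain two's-complement bytes would not, because negative values start with a byte >= 0x80 and would sort after all positive ones.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;

public final class UnsignedLexicographicOrder {

    // Compares byte arrays one by one as unsigned values (0x00 < 0x7F < 0x80 < 0xFF);
    // if one array is a prefix of the other, the shorter one sorts first.
    public static final Comparator<byte[]> UNSIGNED_LEXICOGRAPHIC = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    // Hypothetical key encoding (not a Flink API): big-endian with the sign bit
    // flipped, so that unsigned byte order matches the signed numeric order of longs.
    public static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    public static void main(String[] args) {
        long[] keys = {42L, -1L, Long.MIN_VALUE, 0L, Long.MAX_VALUE};
        byte[][] encoded = new byte[keys.length][];
        for (int i = 0; i < keys.length; i++) {
            encoded[i] = encodeLong(keys[i]);
        }
        // sort purely on the binary form, as a binary-based backend would
        Arrays.sort(encoded, UNSIGNED_LEXICOGRAPHIC);
        for (byte[] e : encoded) {
            // undo the sign-bit flip to print the original key
            System.out.println(ByteBuffer.wrap(e).getLong() ^ Long.MIN_VALUE);
        }
    }
}
```

Running main prints the keys in numeric order (Long.MIN_VALUE, -1, 0, 42, Long.MAX_VALUE) even though only the bytes were compared. On Java 9+, java.util.Arrays.compareUnsigned(byte[], byte[]) provides the same ordering as the hand-written comparator.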
3. BinarySortedMultiMap<UK, UV> should offer the following API:
```
import java.util.Collection;
import java.util.Map;

import org.apache.flink.api.common.state.State;

// All range bounds are interpreted as inclusive (see A8 below).
public interface BinarySortedMultiMap<UK, UV> extends State {

    // replaces all values stored under the key
    void put(UK key, Collection<UV> values) throws Exception;

    // adds a single value under the key without reading the existing values
    void add(UK key, UV value) throws Exception;

    Map.Entry<UK, UV> entryAt(UK key) throws Exception;
    Map.Entry<UK, UV> firstEntry() throws Exception;
    Map.Entry<UK, UV> lastEntry() throws Exception;

    Iterable<Map.Entry<UK, UV>> readRange(UK fromKey, UK toKey) throws Exception;
    Iterable<Map.Entry<UK, UV>> readRangeUntil(UK endUserKey) throws Exception;
    Iterable<Map.Entry<UK, UV>> readRangeFrom(UK startUserKey) throws Exception;

    Iterable<Map.Entry<UK, UV>> clearRange(UK fromKey, UK toKey) throws Exception;
    Iterable<Map.Entry<UK, UV>> clearRangeUntil(UK endUserKey) throws Exception;
    Iterable<Map.Entry<UK, UV>> clearRangeFrom(UK startUserKey) throws Exception;
}
```

That's it for the core points. Here are a few more things that came up, along with some arguments about the "why":

A1) Do we need value[AtOrBefore|AtOrAfter|Before|After]?
-> We looked at various use cases and didn't find a strong need, because you can always fall back to readRange*. In the interest of a slim API, we think it is best to start without these (we can always add them later).

A2) Should we support iterating backwards?
-> We haven't found a compelling use case that needs this. If you need it, at least for some use cases you could invert the sort order in the key serializer and achieve the same thing (unless you need to walk in both directions). Let's rather start with a slim API.

A3) Lazy vs. eager iteration
-> Let's implement our iterators similarly to RocksDBMapIterator, i.e. by eagerly retrieving a batch of entries into a cache (RocksDBMapIterator uses 128). This seems to be the best of both worlds without bloating the API.

A4) ChangelogStateBackend
-> Since we require TypeSerializer implementations for the key, and those know the length of their serialized form (from other requirements, e.g.
in the network stack), it isn't affected by our change except for delegating the new operations to the underlying state backend.

A5) Defining the binary sort order as a byte-by-byte comparison of unsigned bytes is fine because this is very common among storage systems. Should a different binary-based state backend require something else, there could be a mapping function translating between the different definitions.

A6) How to handle duplicates?
-> We let the user handle this by storing a multi-map, i.e. multiple values for the (primary) sort key. If needed, users can sort these values manually. As long as we don't have a strong use case where this is not feasible, we don't need any implicit duplicate handling by the framework (Flink).

A7) readRangeUntil vs. headIterator and readRangeFrom vs. tailIterator
-> We propose readRange* because it is more explicit/intuitive about what the operation does.

A8) readRange* with an inclusive/exclusive switch
-> In the interest of a slim API, let's not provide that. The API above interprets all keys as _inclusive_; should a user need exclusive behaviour, they would, in the worst case, read and skip one extra entry. In most cases, however, that entry would be served from the cache anyway, so this is not much of a problem.

A9) Why don't we want to provide a BinarySortedMap with value-like semantics similar to TemporalValueState?
-> We'd like to keep the code overhead in Flink small and provide only a single new state primitive instead of two. For use cases where you don't want to handle lists, you can use BinarySortedMultiMap's put() method with a single-element list, which overwrites the old value(s). When retrieving the value(s), you can then assume the list is either empty or has a single entry, similar to what you currently do in a ProcessWindowFunction. If you need a more convenient API, you can always add a thin wrapper on top.

A10) Effects on the CEPOperator?
-> We don't have an overview yet. The buffering of events in its `MapState<Long, List<IN>> elementQueueState`, however, is a pattern that would benefit from our multi-map, since a single add() would no longer require reading (and rewriting) the whole list.

Sorry for the long email. We'd be happy to get more feedback and will incorporate this into the FLIP description soon.

Nico

--
Dr. Nico Kruber | Solutions Architect

Follow us @VervericaData Ververica
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton Wehner
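To make the range semantics of the BinarySortedMultiMap API proposed in point 3 concrete, here is a minimal in-memory sketch that can be tried before any state backend supports the new state type. The class InMemorySortedMultiMap is hypothetical. It is neither Flink code nor an implementation of Flink's State interface. Keys are ordered by a supplied Comparator (as a heap backend could do with findComparator()), all range bounds are inclusive (A8), and duplicates are stored as multiple values per key (A6). As an assumption, range reads return one entry per (key, value) pair, which fits the Iterable<Map.Entry<UK, UV>> return types. The example also shows the single-value pattern via put() (A9) and how a caller could emulate an exclusive bound by skipping the boundary key.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model (not Flink code): a multi-map sorted by key,
// with all range bounds interpreted as inclusive.
public final class InMemorySortedMultiMap<K, V> {
    private final NavigableMap<K, List<V>> map;

    public InMemorySortedMultiMap(Comparator<K> keyOrder) {
        this.map = new TreeMap<>(keyOrder);
    }

    // replaces all values stored under the key
    public void put(K key, Collection<V> values) {
        map.put(key, new ArrayList<>(values));
    }

    // appends one value under the key
    public void add(K key, V value) {
        map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    public List<Map.Entry<K, V>> readRange(K fromKey, K toKey) {
        return flatten(map.subMap(fromKey, true, toKey, true));
    }

    public List<Map.Entry<K, V>> readRangeUntil(K endKey) {
        return flatten(map.headMap(endKey, true));
    }

    public List<Map.Entry<K, V>> readRangeFrom(K startKey) {
        return flatten(map.tailMap(startKey, true));
    }

    // removes [fromKey, toKey] and returns the removed entries
    public List<Map.Entry<K, V>> clearRange(K fromKey, K toKey) {
        NavigableMap<K, List<V>> range = map.subMap(fromKey, true, toKey, true);
        List<Map.Entry<K, V>> removed = flatten(range);
        range.clear(); // clearing the view removes the entries from the backing map
        return removed;
    }

    // one entry per (key, value) pair, in key order
    private static <K, V> List<Map.Entry<K, V>> flatten(Map<K, List<V>> m) {
        List<Map.Entry<K, V>> out = new ArrayList<>();
        for (Map.Entry<K, List<V>> e : m.entrySet()) {
            for (V v : e.getValue()) {
                out.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), v));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        InMemorySortedMultiMap<Long, String> state =
                new InMemorySortedMultiMap<>(Comparator.<Long>naturalOrder());
        state.add(10L, "a");
        state.add(10L, "b"); // a second value under the same key (A6)
        state.add(20L, "c");
        state.put(30L, Collections.singletonList("d")); // single-value usage (A9)
        state.put(30L, Collections.singletonList("e")); // overwrites "d"

        System.out.println(state.readRange(10L, 20L)); // [10=a, 10=b, 20=c]

        // exclusive lower bound emulated by skipping the boundary key (A8)
        state.readRangeFrom(20L).stream()
                .filter(e -> !e.getKey().equals(20L))
                .forEach(System.out::println); // 30=e

        System.out.println(state.clearRange(0L, 15L)); // [10=a, 10=b]
        System.out.println(state.readRangeFrom(0L)); // [20=c, 30=e]
    }
}
```

Unlike this sketch, which materializes each range into a list, a real backend would iterate lazily and fetch entries in batches (A3).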