+1 to generic interface for sorted state and Binary***State. Very happy to be able to go one step further and thank you for your discussion.
Best,
Jingsong

On Fri, May 6, 2022 at 8:37 PM Nico Kruber <n...@apache.org> wrote:
>
> While working a bit more on this, David A and I noticed a couple of things
> that did not match each other in the proposed APIs:
>
> a) the proposed BinarySortedMultiMapState class didn't actually have getters
>    that return multiple items per key, and
> b) while having a single multi-map-like implementation in the backend, with
>    the adapted API, we'd like to put it up for discussion again whether we
>    maybe want to have a user-facing BinarySortedMapState API as well, which
>    can be simpler but doesn't add any additional constraints to the state
>    backends.
>
> Let me go into details a bit more:
> in a multi-map, a single key can be backed by a set of items and as such, the
> atomic unit that should be retrievable is not a single item but rather
> something like a Collection, an Iterable, a List, or so. Since we are already
> using Iterable in the main API, how about the following?
> ```
> public interface BinarySortedMultiMapState<UK, UV> extends State {
>     void put(UK key, Iterable<UV> values) throws Exception;
>     void add(UK key, UV value) throws Exception;
>
>     Iterable<UV> valueAt(UK key) throws Exception;
>
>     Map.Entry<UK, Iterable<UV>> firstEntry() throws Exception;
>     Map.Entry<UK, Iterable<UV>> lastEntry() throws Exception;
>
>     Iterable<Map.Entry<UK, Iterable<UV>>> readRange(UK fromKey, UK toKey)
>             throws Exception;
>     Iterable<Map.Entry<UK, Iterable<UV>>> readRangeUntil(UK endUserKey)
>             throws Exception;
>     Iterable<Map.Entry<UK, Iterable<UV>>> readRangeFrom(UK startUserKey)
>             throws Exception;
>
>     void clearRange(UK fromKey, UK toKey) throws Exception;
>     void clearRangeUntil(UK endUserKey) throws Exception;
>     void clearRangeFrom(UK startUserKey) throws Exception;
> }
> ```
>
> We also considered using Iterable<Map.Entry<UK, UV>> instead of
> Map.Entry<UK, Iterable<UV>>, but that wouldn't match well with firstEntry()
> and lastEntry() because for a single key, there is not a
> single first/last value. We also looked at common MultiMap interfaces, and
> their getters were also always retrieving the whole list/collection for a
> key. Since we don't want to promise too many details to the user, we believe
> Iterable is our best choice for now, but that can also be "upgraded" to,
> e.g., List in the future without breaking client code.
>
> An appropriate map-like version of that would be the following:
> ```
> public interface BinarySortedMapState<UK, UV> extends State {
>     void put(UK key, UV value) throws Exception;
>
>     UV valueAt(UK key) throws Exception;
>
>     Map.Entry<UK, UV> firstEntry() throws Exception;
>     Map.Entry<UK, UV> lastEntry() throws Exception;
>
>     Iterable<Map.Entry<UK, UV>> readRange(UK fromKey, UK toKey)
>             throws Exception;
>     Iterable<Map.Entry<UK, UV>> readRangeUntil(UK endUserKey) throws Exception;
>     Iterable<Map.Entry<UK, UV>> readRangeFrom(UK startUserKey) throws Exception;
>
>     void clearRange(UK fromKey, UK toKey) throws Exception;
>     void clearRangeUntil(UK endUserKey) throws Exception;
>     void clearRangeFrom(UK startUserKey) throws Exception;
> }
> ```
>
>
> We believe we were also missing details regarding the state descriptor, and
> I'm still a bit fuzzy on what to provide as type T in
> StateDescriptor<S extends State, T>.
> For the constructors, however, since we'd require a
> LexicographicalTypeSerializer implementation, we would propose the following
> three overloads similar to the MapStateDescriptor:
> ```
> public class BinarySortedMultiMapStateDescriptor<UK, UV> extends
>         StateDescriptor<BinarySortedMultiMapState<UK, UV>, Map<UK, List<UV>>/*?*/> {
>
>     public BinarySortedMultiMapStateDescriptor(
>             String name, LexicographicalTypeSerializer<UK> keySerializer,
>             TypeSerializer<UV> valueSerializer) {}
>
>     public BinarySortedMultiMapStateDescriptor(
>             String name, LexicographicalTypeSerializer<UK> keySerializer,
>             TypeInformation<UV> valueTypeInfo) {}
>
>     public BinarySortedMultiMapStateDescriptor(
>             String name, LexicographicalTypeSerializer<UK> keySerializer,
>             Class<UV> valueClass) {}
> }
> ```
> Technically, we could have a LexicographicalTypeInformation as well (for the
> 2nd overload) but don't currently see the need for that wrapper since these
> serializers are just needed for State - but maybe someone with more insights
> into this topic can advise.
>
>
> A few further points with respect to the implementation:
> - we'll have to find a suitable heap-based state backend implementation that
>   integrates well with all current efforts (needs to be discussed)
> - the StateProcessor API will have to receive appropriate extensions to read
>   and write this new form of state
>
>
>
> Nico
>
>
> On Friday, 29 April 2022 14:25:59 CEST Nico Kruber wrote:
> > Hi all,
> > Yun, David M, David A, and I had an offline discussion and talked through a
> > couple of details that emerged from the discussion here. We believe we have
> > found a consensus on these points and would like to share our points for
> > further feedback:
> >
> > Let me try to get through the points that were opened in arbitrary order:
> >
> >
> > 1. We want to offer a generic interface for sorted state, not just temporal
> > state as proposed initially. We would like to...
> > a) ...offer a single new state type similar to what TemporalListState was
> > offering (so not offering something like TemporalValueState to keep the API
> > slim).
> > b) ...name it BinarySortedMultiMap<UK, UV> with Java-Object keys and values
> > (I'll go into the API further below) - the naming stresses "Binary"
> > because we want to make clear that this is what the sort will be based on
> > (see below)
> > c) ...have our own state descriptor
> > (BinarySortedMultiMapStateDescriptor<UK, UV>) similar to
> > MapStateDescriptor<UK, UV>
> > d) ...require TypeSerializer implementations for the key to extend from
> > LexicographicalTypeSerializer (details below)
> >
> >
> > 2. LexicographicalTypeSerializer basically defines the sort order when
> > retrieving values: it is based on the serialized binaries, comparing them
> > one-by-one in an unsigned fashion. For heap-based state backends, these
> > serializers can also optionally define a Comparator that doesn't require
> > serialization but needs to retain the same sort order. We would provide
> > implementations of the range-based operations that will iterate based on
> > binary keys if this is not provided (by simply converting all relevant keys
> > to their binary form and using an appropriate comparator).
> >
> > ```
> > public interface LexicographicalTypeSerializer<T> extends TypeSerializer<T> {
> >     default Optional<Comparator<T>> findComparator() {
> >         return Optional.empty();
> >     }
> > }
> > ```
> >
> >
> > 3. BinarySortedMultiMap<UK, UV> should offer the following API
> >
> > ```
> > public interface BinarySortedMultiMap<UK, UV> extends State {
> >     void put(UK key, Collection<UV> values) throws Exception;
> >     void add(UK key, UV value) throws Exception;
> >
> >     Map.Entry<UK, UV> entryAt(UK key) throws Exception;
> >     Map.Entry<UK, UV> firstEntry() throws Exception;
> >     Map.Entry<UK, UV> lastEntry() throws Exception;
> >
> >     Iterable<Map.Entry<UK, UV>> readRange(UK fromKey, UK toKey)
> >             throws Exception;
> >     Iterable<Map.Entry<UK, UV>> readRangeUntil(UK endUserKey)
> >             throws Exception;
> >     Iterable<Map.Entry<UK, UV>> readRangeFrom(UK startUserKey)
> >             throws Exception;
> >
> >     Iterable<Map.Entry<UK, UV>> clearRange(UK fromKey, UK toKey)
> >             throws Exception;
> >     Iterable<Map.Entry<UK, UV>> clearRangeUntil(UK endUserKey)
> >             throws Exception;
> >     Iterable<Map.Entry<UK, UV>> clearRangeFrom(UK startUserKey)
> >             throws Exception;
> > }
> > ```
> >
> > That's it for the core points - below are a few more things that came up
> > and some arguments about the "why":
> >
> > A1) Do we need value[AtOrBefore|AtOrAfter|Before|After]?
> > -> We looked at various use cases and didn't find a strong need because you
> > could always fall back to readRange*. In the interest of a slim API, we
> > thought it would be best to start without these (we can always add them
> > later)
> >
> > A2) Should we support iterating backwards?
> > -> We haven't found a compelling use case that needs this. If you need it,
> > at least for some (?) use cases, you could negate the sort order through
> > the serializer and achieve the same thing (unless you need to walk in two
> > directions). Let's rather start with a slim API.
> >
> > A3) Lazy vs. eager iteration
> > -> Let's implement our iterators similarly to RocksDBMapIterator by eagerly
> > retrieving a couple of values (defaults to 128 here) into a cache.
> > This seems to be the best of both worlds without bloating the API.
> >
> > A4) ChangelogStateBackend
> > -> Since we require TypeSerializer implementations for the key and those
> > know the length to serialize (from other requirements, e.g. in the network
> > stack), it isn't affected by our change except for delegating the new
> > operations to the underlying state backend.
> >
> > A5) Defining the binary sort order as one-by-one with unsigned bytes is fine
> > because it is a very common thing among storage systems. Should a different
> > binary-based state backend require something else, there could be a mapping
> > function translating between different definitions.
> >
> > A6) How to handle duplicates
> > -> We let the user handle this by storing a multi-map, i.e. multiple values
> > for the (primary) sort key. If needed, users can sort these values manually.
> > As long as we don't have a strong use case where this is not feasible, we
> > don't need any implicit duplicate handling by the framework (Flink).
> >
> > A7) readRangeUntil vs. headIterator and readRangeFrom vs. tailIterator
> > -> We propose to use readRange*** because that seems more explicit/intuitive
> > in what this is doing.
> >
> > A8) readRange*** with inclusive/exclusive switch
> > -> In the interest of a slim API, let's not provide that. The API above will
> > interpret all keys as _inclusive_ and should a user need exclusive
> > behaviour, they would in the worst case read one more entry - in most of
> > the cases, however, this would be served from the cache anyway, so it's not
> > much of a problem
> >
> > A9) Why don't we want to provide a BinarySortedMap with value-like semantics
> > similar to TemporalValueState?
> > -> We'd like to keep the code overhead in Flink small and not provide two
> > more state primitives but instead only a single one.
> > For use cases where you don't want to handle lists, you can use the
> > BinarySortedMultiMap with its put() method and a list with a single entry
> > that would overwrite the old one. While retrieving the value(s), you can
> > then assume the list is either empty or has a single entry similar to what
> > you are currently doing in a WindowProcessFunction. You can also always add
> > a thin wrapper to provide that under a more convenient API if you need to.
> >
> > A10) effects on the CEPOperator?
> > -> We don't have an overview yet. The buffering of events inside its
> > `MapState<Long, List<IN>> elementQueueState`, however, is a pattern that
> > would benefit from our MultiMap since a single add() operation wouldn't
> > require you to read the whole list again.
> >
> >
> > Sorry for the long email - we'd be happy to get more feedback and will
> > incorporate this into the FLIP description soon.
> >
> >
> >
> > Nico
>
> Dr. Nico Kruber | Solutions Architect
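
To illustrate the ordering described in point 2 and A5 above (serialized keys compared one-by-one as unsigned bytes), here is a minimal, self-contained Java sketch. It is not part of the proposal and uses no Flink classes: the class name `UnsignedLexicographicOrder`, the `UNSIGNED_BYTES` comparator, and the `encode`/`decode`/`sortByBinaryForm` helpers are all hypothetical, and the big-endian, sign-flipped `long` encoding is only one assumed way a key serializer could make the binary order agree with the numeric order.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class UnsignedLexicographicOrder {

    /** Compares byte arrays one byte at a time, treating each byte as unsigned. */
    public static final Comparator<byte[]> UNSIGNED_BYTES = (a, b) -> {
        int common = Math.min(a.length, b.length);
        for (int i = 0; i < common; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        // All shared bytes are equal: the shorter array sorts first.
        return Integer.compare(a.length, b.length);
    };

    /** Hypothetical key encoding: big-endian long with the sign bit flipped. */
    public static byte[] encode(long key) {
        return ByteBuffer.allocate(Long.BYTES).putLong(key ^ Long.MIN_VALUE).array();
    }

    /** Inverse of encode(): reads the big-endian long and flips the sign bit back. */
    public static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    /** Sorts keys purely by their binary form and returns the decoded result. */
    public static List<Long> sortByBinaryForm(long... keys) {
        List<byte[]> encoded = new ArrayList<>();
        for (long key : keys) {
            encoded.add(encode(key));
        }
        encoded.sort(UNSIGNED_BYTES);
        List<Long> sorted = new ArrayList<>();
        for (byte[] bytes : encoded) {
            sorted.add(decode(bytes));
        }
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(sortByBinaryForm(42L, -7L, 0L, Long.MAX_VALUE, Long.MIN_VALUE));
    }
}
```

Without the sign flip, negative keys would sort after positive ones, because a leading 0xFF byte compares greater than a leading 0x00 byte once bytes are treated as unsigned. This is the kind of detail a LexicographicalTypeSerializer implementation, and any optional Comparator it exposes, would presumably have to keep consistent.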