+1 to generic interface for sorted state and Binary***State.

Very happy to see us go one step further, and thank you for the discussion.

Best,
Jingsong

On Fri, May 6, 2022 at 8:37 PM Nico Kruber <n...@apache.org> wrote:
>
> While working a bit more on this, David A and I noticed a couple of things
> that were not matching each other in the proposed APIs:
>
> a) the proposed BinarySortedMultiMapState class didn't actually have getters
> that return multiple items per key, and
> b) while keeping a single multi-map-like implementation in the backend, we'd
> like to put up for discussion again whether, with the adapted API, we also
> want a user-facing BinarySortedMapState API, which can be simpler for users
> but doesn't add any additional constraints to the state backends.
>
> Let me go into details a bit more:
> In a multi-map, a single key can be backed by a set of items; as such, the
> atomic unit that should be retrievable is not a single item but rather
> something like a Collection, an Iterable, or a List. Since we are already
> using Iterable in the main API, how about the following?
> ```
> import java.util.Map;
> import org.apache.flink.api.common.state.State;
>
> public interface BinarySortedMultiMapState<UK, UV> extends State {
>   void put(UK key, Iterable<UV> values) throws Exception;
>   void add(UK key, UV value) throws Exception;
>
>   Iterable<UV> valueAt(UK key) throws Exception;
>
>   Map.Entry<UK, Iterable<UV>> firstEntry() throws Exception;
>   Map.Entry<UK, Iterable<UV>> lastEntry() throws Exception;
>
>   // All range bounds are inclusive.
>   Iterable<Map.Entry<UK, Iterable<UV>>> readRange(UK fromKey, UK toKey)
>       throws Exception;
>   Iterable<Map.Entry<UK, Iterable<UV>>> readRangeUntil(UK endUserKey)
>       throws Exception;
>   Iterable<Map.Entry<UK, Iterable<UV>>> readRangeFrom(UK startUserKey)
>       throws Exception;
>
>   void clearRange(UK fromKey, UK toKey) throws Exception;
>   void clearRangeUntil(UK endUserKey) throws Exception;
>   void clearRangeFrom(UK startUserKey) throws Exception;
> }
> ```
>
> We also considered using Iterable<Map.Entry<UK, UV>> instead of Map.Entry<UK,
> Iterable<UV>>, but that wouldn't match well with firstEntry() and lastEntry()
> because a single key has no single first/last value. We also looked at
> common MultiMap interfaces, and their getters always retrieve the whole
> list/collection for a key. Since we don't want to promise too many details
> to the user, we believe Iterable is our best choice for now; it can also be
> "upgraded" to, e.g., List in the future without breaking client code.
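
To make the "whole group per key" getters concrete, here is a minimal heap-only sketch. It assumes a plain java.util.TreeMap with a Comparator standing in for the binary key order; the class name HeapSortedMultiMap is hypothetical, there are no Flink types involved, and it is not meant as a proposal for the heap-based backend implementation:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical heap-only illustration of the multi-map getters; not Flink code.
// The Comparator stands in for the binary key order (cf. findComparator()).
class HeapSortedMultiMap<UK, UV> {
    private final TreeMap<UK, List<UV>> entries;

    HeapSortedMultiMap(Comparator<? super UK> keyOrder) {
        this.entries = new TreeMap<>(keyOrder);
    }

    // Appends a single value without reading the existing group back.
    void add(UK key, UV value) {
        entries.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Replaces the whole group stored for the key.
    void put(UK key, Iterable<UV> values) {
        List<UV> copy = new ArrayList<>();
        values.forEach(copy::add);
        entries.put(key, copy);
    }

    // Returns all values for the key (empty if the key is absent).
    Iterable<UV> valueAt(UK key) {
        List<UV> values = entries.get(key);
        return values == null ? Collections.<UV>emptyList() : Collections.unmodifiableList(values);
    }

    Map.Entry<UK, Iterable<UV>> firstEntry() {
        return toEntry(entries.firstEntry());
    }

    Map.Entry<UK, Iterable<UV>> lastEntry() {
        return toEntry(entries.lastEntry());
    }

    // Both bounds inclusive; copies the matching entries eagerly for simplicity.
    Iterable<Map.Entry<UK, Iterable<UV>>> readRange(UK fromKey, UK toKey) {
        List<Map.Entry<UK, Iterable<UV>>> result = new ArrayList<>();
        for (Map.Entry<UK, List<UV>> e : entries.subMap(fromKey, true, toKey, true).entrySet()) {
            result.add(toEntry(e));
        }
        return result;
    }

    // Both bounds inclusive.
    void clearRange(UK fromKey, UK toKey) {
        entries.subMap(fromKey, true, toKey, true).clear();
    }

    // Wraps a stored group as an entry whose value is an unmodifiable Iterable (null-safe).
    private Map.Entry<UK, Iterable<UV>> toEntry(Map.Entry<UK, List<UV>> e) {
        if (e == null) {
            return null;
        }
        Iterable<UV> group = Collections.unmodifiableList(e.getValue());
        return new AbstractMap.SimpleImmutableEntry<>(e.getKey(), group);
    }
}
```

firstEntry() naturally returns the whole group of the smallest key, which is why Map.Entry<UK, Iterable<UV>> fits better than a flattened Iterable<Map.Entry<UK, UV>>.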
>
> An appropriate map-like version of that would be the following:
> ```
> import java.util.Map;
> import org.apache.flink.api.common.state.State;
>
> public interface BinarySortedMapState<UK, UV> extends State {
>   void put(UK key, UV value) throws Exception;
>
>   UV valueAt(UK key) throws Exception;
>
>   Map.Entry<UK, UV> firstEntry() throws Exception;
>   Map.Entry<UK, UV> lastEntry() throws Exception;
>
>   Iterable<Map.Entry<UK, UV>> readRange(UK fromKey, UK toKey) throws Exception;
>   Iterable<Map.Entry<UK, UV>> readRangeUntil(UK endUserKey) throws Exception;
>   Iterable<Map.Entry<UK, UV>> readRangeFrom(UK startUserKey) throws Exception;
>
>   void clearRange(UK fromKey, UK toKey) throws Exception;
>   void clearRangeUntil(UK endUserKey) throws Exception;
>   void clearRangeFrom(UK startUserKey) throws Exception;
> }
> ```
>
>
> We believe we were also missing details regarding the state descriptor, and
> I'm still a bit fuzzy on what to provide as type T in StateDescriptor<S
> extends State, T>.
> For the constructors, however, since we'd require a
> LexicographicalTypeSerializer implementation, we would propose the following
> three overloads similar to the MapStateDescriptor:
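> ```
> import java.util.List;
> import java.util.Map;
> import org.apache.flink.api.common.state.StateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeutils.TypeSerializer;
>
> public class BinarySortedMultiMapStateDescriptor<UK, UV> extends
>         StateDescriptor<BinarySortedMultiMapState<UK, UV>, Map<UK, List<UV>>/*?*/> {
>
>     // Constructor bodies (including the required super(...) call) are omitted.
>     public BinarySortedMultiMapStateDescriptor(
>             String name, LexicographicalTypeSerializer<UK> keySerializer,
>             TypeSerializer<UV> valueSerializer) {}
>
>     public BinarySortedMultiMapStateDescriptor(
>             String name, LexicographicalTypeSerializer<UK> keySerializer,
>             TypeInformation<UV> valueTypeInfo) {}
>
>     public BinarySortedMultiMapStateDescriptor(
>             String name, LexicographicalTypeSerializer<UK> keySerializer,
>             Class<UV> valueClass) {}
> }
> ```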
> Technically, we could have a LexicographicalTypeInformation as well (for the
> 2nd overload) but don't currently see the need for that wrapper since these
> serializers are only needed for State, but maybe someone with more insight
> into this topic can advise.
>
>
> A few further points to note with respect to the implementation:
> - we'll have to find a suitable heap-based state backend implementation that
> integrates well with all current efforts (needs to be discussed)
> - the StateProcessor API will have to receive appropriate extensions to read
> and write this new form of state
>
>
>
> Nico
>
>
> On Friday, 29 April 2022 14:25:59 CEST Nico Kruber wrote:
> > Hi all,
> > Yun, David M, David A, and I had an offline discussion and talked through a
> > couple of details that emerged from the discussion here. We believe we have
> > found a consensus on these points and would like to share them for further
> > feedback:
> >
> > Let me go through the open points in no particular order:
> >
> >
> > 1. We want to offer a generic interface for sorted state, not just temporal
> > state as proposed initially. We would like to...
> > a) ...offer a single new state type similar to what TemporalListState was
> > offering (so not offering something like TemporalValueState to keep the API
> > slim).
> > b) ...name it BinarySortedMultiMap<UK, UV> with Java-Object keys and values
> > (I'll go into the API further below); the name emphasizes "Binary"
> > because we want to make clear that this is what the sort will be based on
> > (see below).
> > c) ...have our own state descriptor
> > (BinarySortedMultiMapStateDescriptor<UK, UV>) similar to
> > MapStateDescriptor<UK, UV>.
> > d) ...require TypeSerializer implementations for the key to extend from
> > LexicographicalTypeSerializer (details below).
> >
> >
> > 2. LexicographicalTypeSerializer basically defines the sort order when
> > retrieving values: it is based on the serialized binaries, comparing them
> > byte by byte as unsigned values. For heap-based state backends, these
> > serializers can also optionally define a Comparator that doesn't require
> > serialization but needs to retain the same sort order. If no Comparator is
> > provided, we would implement the range-based operations by iterating over
> > binary keys (simply converting all relevant keys to their binary form and
> > using an appropriate comparator).
> >
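> > ```
> > import java.util.Comparator;
> > import java.util.Optional;
> > import org.apache.flink.api.common.typeutils.TypeSerializer;
> >
> > public abstract class LexicographicalTypeSerializer<T> extends TypeSerializer<T> {
> >   // Optional shortcut for heap-based backends; must match the binary sort order.
> >   public Optional<Comparator<T>> findComparator() {
> >     return Optional.empty();
> >   }
> > }
> > ```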
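
A sketch of what the unsigned byte-by-byte order means for a concrete key type. It assumes a hypothetical long encoding (sign bit flipped, then big-endian) and Java 9+ for Arrays.compareUnsigned; LexicographicLongKeys is an illustrative name, and a real LexicographicalTypeSerializer would put this logic into its serialize()/deserialize() methods:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper: an order-preserving binary encoding for long keys.
final class LexicographicLongKeys {
    private LexicographicLongKeys() {}

    // Flipping the sign bit maps Long.MIN_VALUE..Long.MAX_VALUE onto
    // 0x00..00..0xFF..FF; ByteBuffer writes big-endian by default, so the
    // most significant byte is compared first.
    static byte[] serialize(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    static long deserialize(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    // The binary order from point 2: unsigned, one byte at a time.
    static int compareBinary(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }
}
```

Plain two's-complement bytes would not work here: -1 serializes to 0xFF...FF and would sort after 1. With the flipped encoding, a findComparator() returning Long's natural order would be consistent with the binary order.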
> >
> >
> > 3. BinarySortedMultiMap<UK, UV> should offer the following API:
> >
> > ```
> > import java.util.Collection;
> > import java.util.Map;
> > import org.apache.flink.api.common.state.State;
> >
> > public interface BinarySortedMultiMap<UK, UV> extends State {
> >   void put(UK key, Collection<UV> values) throws Exception;
> >   void add(UK key, UV value) throws Exception;
> >
> >   Map.Entry<UK, UV> entryAt(UK key) throws Exception;
> >   Map.Entry<UK, UV> firstEntry() throws Exception;
> >   Map.Entry<UK, UV> lastEntry() throws Exception;
> >
> >   Iterable<Map.Entry<UK, UV>> readRange(UK fromKey, UK toKey)
> >       throws Exception;
> >   Iterable<Map.Entry<UK, UV>> readRangeUntil(UK endUserKey)
> >       throws Exception;
> >   Iterable<Map.Entry<UK, UV>> readRangeFrom(UK startUserKey)
> >       throws Exception;
> >
> >   Iterable<Map.Entry<UK, UV>> clearRange(UK fromKey, UK toKey)
> >       throws Exception;
> >   Iterable<Map.Entry<UK, UV>> clearRangeUntil(UK endUserKey)
> >       throws Exception;
> >   Iterable<Map.Entry<UK, UV>> clearRangeFrom(UK startUserKey)
> >       throws Exception;
> > }
> > ```
> >
> > That's for the core of the points - following a few more things that came up
> > and some arguments about the "why":
> >
> > A1) Do we need value[AtOrBefore|AtOrAfter|Before|After]?
> > -> We looked at various use cases and didn't find a strong need because you
> > could always fall back to readRange*. In the interest of a slim API, we
> > thought it would be best to start without these (we can always add them
> > later).
> >
> > A2) Should we support iterating backwards?
> > -> We haven't found a compelling use case that needs this. If you need it,
> > at least for some (?) use cases, you could negate the sort order through
> > the serializer and achieve the same thing (unless you need to walk in two
> > directions). Let's rather start with a slim API.
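
For fixed-length encodings, negating the sort order through the serializer can be as simple as complementing every byte, because an unsigned byte x becomes 255 - x. A minimal sketch, assuming an order-preserving long encoding (sign bit flipped, big-endian); DescendingLongKeys is a hypothetical name:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical serializer logic that yields a descending key order.
final class DescendingLongKeys {
    private DescendingLongKeys() {}

    static byte[] serialize(long value) {
        // Ascending order-preserving encoding first: flip sign bit, big-endian.
        byte[] bytes = ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
        // Complementing each byte reverses the unsigned lexicographic order
        // (valid only because every key has the same length).
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = (byte) ~bytes[i];
        }
        return bytes;
    }

    static long deserialize(byte[] bytes) {
        byte[] copy = bytes.clone();
        for (int i = 0; i < copy.length; i++) {
            copy[i] = (byte) ~copy[i];
        }
        return ByteBuffer.wrap(copy).getLong() ^ Long.MIN_VALUE;
    }
}
```

This does not carry over to variable-length keys as-is: a proper prefix still sorts before its extensions after complementing, so such encodings would need an explicit terminator or length handling.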
> >
> > A3) Lazy vs. eager iteration
> > -> Let's implement our iterators similarly to RocksDBMapIterator by eagerly
> > retrieving a couple of values (defaults to 128 here) into a cache. This
> > seems to be the best of both worlds without bloating the API.
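
A minimal sketch of the batching idea, assuming a NavigableMap as a stand-in for the backend; this is not RocksDBMapIterator's actual code, and BatchingRangeIterator and batchSize are hypothetical names. Entries are copied into a local cache in batches (the proposal mentions 128 as default), and each following batch resumes exclusively after the last key loaded:

```java
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;

// Hypothetical sketch: a NavigableMap stands in for the state backend.
class BatchingRangeIterator<K, V> implements Iterator<Map.Entry<K, V>> {
    private final NavigableMap<K, V> store;
    private final K fromKey; // inclusive lower bound
    private final K toKey;   // inclusive upper bound
    private final int batchSize;
    private final ArrayDeque<Map.Entry<K, V>> cache = new ArrayDeque<>();
    private K lastLoadedKey;
    private boolean started;   // has the first batch been loaded?
    private boolean exhausted; // did the last batch come back short?
    int batchesLoaded;         // for illustration: number of backend fetches

    BatchingRangeIterator(NavigableMap<K, V> store, K fromKey, K toKey, int batchSize) {
        this.store = store;
        this.fromKey = fromKey;
        this.toKey = toKey;
        this.batchSize = batchSize;
    }

    private void loadNextBatch() {
        // First batch starts at fromKey (inclusive); later ones resume after the last key seen.
        NavigableMap<K, V> remaining = started
                ? store.subMap(lastLoadedKey, false, toKey, true)
                : store.subMap(fromKey, true, toKey, true);
        started = true;
        batchesLoaded++;
        for (Map.Entry<K, V> entry : remaining.entrySet()) {
            if (cache.size() == batchSize) {
                break;
            }
            // Copy the entry so the cache does not depend on the live view.
            cache.add(new AbstractMap.SimpleImmutableEntry<K, V>(entry));
            lastLoadedKey = entry.getKey();
        }
        // Only called with an empty cache, so its size is the number of entries loaded.
        exhausted = cache.size() < batchSize;
    }

    @Override
    public boolean hasNext() {
        if (cache.isEmpty() && !exhausted) {
            loadNextBatch();
        }
        return !cache.isEmpty();
    }

    @Override
    public Map.Entry<K, V> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return cache.poll();
    }
}
```

The API stays a plain Iterator, so the batch size remains an implementation detail rather than a user-facing switch between lazy and eager iteration.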
> >
> > A4) ChangelogStateBackend
> > -> Since we require TypeSerializer implementations for the key, and those
> > already know the length to serialize (due to other requirements, e.g., in
> > the network stack), the ChangelogStateBackend isn't affected by our change
> > except for delegating the new operations to the underlying state backend.
> >
> > A5) Defining the binary sort order as one-by-one with unsigned bytes is fine
> > because it is a very common thing among storage systems. Should a different
> > binary-based state backend require something else, there could be a mapping
> > function translating between different definitions.
> >
> > A6) How to handle Duplicates
> > -> We let the user handle this by storing a multi-map, i.e. multiple values
> > for the (primary) sort key. If needed, users can sort these values manually.
> > As long as we don't have a strong use case where this is not feasible, we
> > don't need any implicit duplicate handling by the framework (Flink).
> >
> > A7) readRangeUntil vs. headIterator and readRangeFrom vs. tailIterator
> > -> We propose to use readRange*** because that seems more explicit/intuitive
> > in what this is doing.
> >
> > A8) readRange*** with inclusive/exclusive switch
> > -> In the interest of a slim API, let's not provide that. The API above will
> > interpret all keys as _inclusive_ and should a user need exclusive
> > behaviour, they would, in the worst case, read one more entry; in most
> > cases, however, this would be served from the cache anyway, so it's not
> > much of a problem.
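
A sketch of deriving exclusive bounds from an inclusive readRange() result, assuming the supplied Comparator is consistent with the binary key order; ExclusiveRanges is a hypothetical helper. Since keys are unique, at most one entry per bound is dropped:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical helper: exclusive range on top of an inclusive range read.
final class ExclusiveRanges {
    private ExclusiveRanges() {}

    static <K, V> List<Map.Entry<K, V>> exclusive(
            Iterable<Map.Entry<K, V>> inclusiveRange,
            K fromKey,
            K toKey,
            Comparator<? super K> keyOrder) {
        List<Map.Entry<K, V>> result = new ArrayList<>();
        for (Map.Entry<K, V> entry : inclusiveRange) {
            K key = entry.getKey();
            if (keyOrder.compare(key, fromKey) == 0) {
                continue; // drop the lower bound itself
            }
            if (keyOrder.compare(key, toKey) == 0) {
                break; // the upper bound is the last entry of the range anyway
            }
            result.add(entry);
        }
        return result;
    }
}
```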
> >
> > A9) Why don't we want to provide a BinarySortedMap with value-like semantics
> > similar to TemporalValueState?
> > -> We'd like to keep the code overhead in Flink small and not provide two
> > more state primitives but instead only a single one. For use cases where
> > you don't want to handle lists, you can use the BinarySortedMultiMap with
> > its put() method and a list with a single entry that would overwrite the
> > old one. While retrieving the value(s), you can then assume the list is
> > either empty or has a single entry, similar to what you are currently doing
> > in a ProcessWindowFunction. You can also always add a thin wrapper to
> > provide that under a more convenient API if you need to.
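
A minimal sketch of such a thin wrapper. It assumes a reduced, hypothetical SortedMultiMap interface with only put()/valueAt(), plus an in-memory stand-in implementation so the example runs without a state backend; all three type names are illustrative:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;

// Hypothetical minimal subset of the multi-map API, for illustration only.
interface SortedMultiMap<K, V> {
    void put(K key, Iterable<V> values) throws Exception;
    Iterable<V> valueAt(K key) throws Exception;
}

// In-memory stand-in so the sketch runs without a state backend.
class InMemorySortedMultiMap<K extends Comparable<K>, V> implements SortedMultiMap<K, V> {
    private final TreeMap<K, List<V>> data = new TreeMap<>();

    @Override
    public void put(K key, Iterable<V> values) {
        List<V> copy = new ArrayList<>();
        values.forEach(copy::add);
        data.put(key, copy);
    }

    @Override
    public Iterable<V> valueAt(K key) {
        return data.getOrDefault(key, Collections.emptyList());
    }
}

// Thin wrapper: value (map-like) semantics on top of the multi-map.
class SortedValueView<K, V> {
    private final SortedMultiMap<K, V> multiMap;

    SortedValueView(SortedMultiMap<K, V> multiMap) {
        this.multiMap = multiMap;
    }

    // Overwrites any previous value(s) with a single-element list.
    void put(K key, V value) throws Exception {
        multiMap.put(key, Collections.singletonList(value));
    }

    // Returns the single value, or null if the stored list is empty or absent.
    V valueAt(K key) throws Exception {
        Iterator<V> it = multiMap.valueAt(key).iterator();
        return it.hasNext() ? it.next() : null;
    }
}
```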
> >
> > A10) effects on the CEPOperator?
> > -> We don't have an overview yet. The buffering of events inside its
> > `MapState<Long, List<IN>> elementQueueState`, however, is a pattern that
> > would benefit from our MultiMap since a single add() operation wouldn't
> > require you to read the whole list again.
> >
> >
> > Sorry for the long email - we'd be happy to get more feedback and will
> > incorporate this into the FLIP description soon.
> >
> >
> >
> > Nico
>
> Dr. Nico Kruber | Solutions Architect
>
> --
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
> Wehner
>
>
