Hi all,
Yun, David M, David A, and I had an offline discussion and talked through a 
couple of details that emerged from the discussion here. We believe we have 
found a consensus on these points and would like to share them for further 
feedback:

Let me go through the points that were raised, in no particular order:


1. We want to offer a generic interface for sorted state, not just temporal 
state as proposed initially. We would like to...
a) ...offer a single new state type similar to what TemporalListState was 
offering (so not offering something like TemporalValueState to keep the API 
slim).
b) ...name it BinarySortedMultiMap<UK, UV> with Java-Object keys and values 
(I'll go into the API further below); the name stresses "Binary" because we 
want to make clear that the sort is based on the binary representation of the 
keys (see below)
c) ...have our own state descriptor 
(BinarySortedMultiMapStateDescriptor<UK, UV>) similar to MapStateDescriptor<UK, UV>
d) ...require TypeSerializer implementations for the key to extend from 
LexicographicalTypeSerializer (details below)


2. LexicographicalTypeSerializer basically defines the sort order when 
retrieving values: it is based on the serialized binaries, comparing them byte 
by byte as unsigned values. For heap-based state backends, these serializers 
can optionally also provide a Comparator that works without serialization but 
must yield the same sort order. If no such Comparator is provided, we would 
implement the range-based operations by iterating over binary keys instead 
(simply converting all relevant keys to their binary form and comparing them 
with an appropriate comparator).

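```
import java.util.Comparator;
import java.util.Optional;
import org.apache.flink.api.common.typeutils.TypeSerializer;

public abstract class LexicographicalTypeSerializer<T> extends TypeSerializer<T> {
  // Optional shortcut for heap-based state backends: if present, it must yield
  // the same order as comparing the serialized keys as unsigned bytes.
  public Optional<Comparator<T>> findComparator() {
    return Optional.empty();
  }
}
```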
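To make the sort contract above more concrete, here is a minimal sketch (plain Java 9+, no Flink dependency) of two ingredients it relies on: an unsigned byte-by-byte comparison and a key encoding whose binary order matches the natural order of the keys. `OrderPreservingKeys`, `encodeLong` and `binaryComparator` are hypothetical names; `binaryComparator` mimics the fallback used when no Comparator is found, with a plain `Function<T, byte[]>` standing in for the serializer.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.function.Function;

// Hypothetical helpers illustrating the binary sort contract; not Flink code.
final class OrderPreservingKeys {

    // Byte-by-byte comparison treating each byte as unsigned (0x00..0xFF);
    // a shorter array that is a prefix of a longer one sorts first.
    static final Comparator<byte[]> UNSIGNED_LEXICOGRAPHIC = Arrays::compareUnsigned;

    // Big-endian encoding with the sign bit flipped, so that the unsigned
    // byte order of the encoding matches the numeric order of the longs.
    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    // Fallback when a serializer offers no Comparator: serialize both keys
    // and compare their binary forms.
    static <T> Comparator<T> binaryComparator(Function<T, byte[]> serialize) {
        return (a, b) -> UNSIGNED_LEXICOGRAPHIC.compare(serialize.apply(a), serialize.apply(b));
    }
}
```

Flipping the sign bit is what makes negative longs sort before positive ones; a plain two's-complement big-endian encoding would place them after all non-negative values under unsigned comparison.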


3. BinarySortedMultiMap<UK, UV> should offer the following API:

```
import java.util.Collection;
import java.util.Map;
import org.apache.flink.api.common.state.State;

public interface BinarySortedMultiMap<UK, UV> extends State {
  void put(UK key, Collection<UV> values) throws Exception;
  void add(UK key, UV value) throws Exception;

  Map.Entry<UK, UV> entryAt(UK key) throws Exception;
  Map.Entry<UK, UV> firstEntry() throws Exception;
  Map.Entry<UK, UV> lastEntry() throws Exception;

  // all range bounds are inclusive (see A8)
  Iterable<Map.Entry<UK, UV>> readRange(UK fromKey, UK toKey) throws Exception;
  Iterable<Map.Entry<UK, UV>> readRangeUntil(UK endUserKey) throws Exception;
  Iterable<Map.Entry<UK, UV>> readRangeFrom(UK startUserKey) throws Exception;

  Iterable<Map.Entry<UK, UV>> clearRange(UK fromKey, UK toKey) throws Exception;
  Iterable<Map.Entry<UK, UV>> clearRangeUntil(UK endUserKey) throws Exception;
  Iterable<Map.Entry<UK, UV>> clearRangeFrom(UK startUserKey) throws Exception;
}
```
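For illustration only, a heap-based implementation could back the range operations with a `TreeMap` whose comparator agrees with the binary key order. The sketch below assumes this; `HeapSortedMultiMap` is a hypothetical class that mirrors a subset of the proposed methods (without `throws Exception` and without Flink's `State`), treats all bounds as inclusive, and returns one entry per stored value. It also materializes results eagerly, unlike the batched iterators discussed in A3.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical heap-only stand-in illustrating the proposed semantics.
final class HeapSortedMultiMap<UK, UV> {
    private final NavigableMap<UK, List<UV>> data;

    // Assumption: keyOrder agrees with the unsigned binary order of the
    // serialized keys (e.g. a serializer's findComparator() result).
    HeapSortedMultiMap(Comparator<UK> keyOrder) {
        this.data = new TreeMap<>(keyOrder);
    }

    // Replaces all values stored under the key.
    void put(UK key, Collection<UV> values) {
        data.put(key, new ArrayList<>(values));
    }

    // Appends a value without touching the existing ones.
    void add(UK key, UV value) {
        data.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    Iterable<Map.Entry<UK, UV>> readRange(UK fromKey, UK toKey) {
        return flatten(data.subMap(fromKey, true, toKey, true));
    }

    Iterable<Map.Entry<UK, UV>> readRangeUntil(UK endUserKey) {
        return flatten(data.headMap(endUserKey, true));
    }

    Iterable<Map.Entry<UK, UV>> readRangeFrom(UK startUserKey) {
        return flatten(data.tailMap(startUserKey, true));
    }

    // Emits one (key, value) entry per stored value, in key order.
    private static <K, V> List<Map.Entry<K, V>> flatten(Map<K, List<V>> range) {
        List<Map.Entry<K, V>> out = new ArrayList<>();
        for (Map.Entry<K, List<V>> e : range.entrySet()) {
            for (V v : e.getValue()) {
                out.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), v));
            }
        }
        return out;
    }
}
```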

That covers the core points. What follows are a few more things that came up, 
along with some arguments about the "why":

A1) Do we need value[AtOrBefore|AtOrAfter|Before|After]?
-> We looked at various use cases and didn't find a strong need, because you 
can always fall back to readRange*. In the interest of a slim API, we thought 
it best to start without these (we can always add them later).
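A sketch of how the omitted lookups could be expressed on top of readRange*, assuming the returned Iterables are consumed front to back; `RangeLookups` is a hypothetical helper, not part of the proposal.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;

// Hypothetical helpers emulating value[AtOrBefore|AtOrAfter] via readRange*.
final class RangeLookups {

    // valueAtOrAfter(key) ~ first entry of readRangeFrom(key):
    // only the first element needs to be consumed.
    static <K, V> Optional<Map.Entry<K, V>> first(Iterable<Map.Entry<K, V>> range) {
        Iterator<Map.Entry<K, V>> it = range.iterator();
        return it.hasNext() ? Optional.of(it.next()) : Optional.empty();
    }

    // valueAtOrBefore(key) ~ last entry of readRangeUntil(key): with forward
    // iteration only, this walks the whole prefix up to the key.
    static <K, V> Optional<Map.Entry<K, V>> last(Iterable<Map.Entry<K, V>> range) {
        Map.Entry<K, V> last = null;
        for (Map.Entry<K, V> e : range) {
            last = e;
        }
        return Optional.ofNullable(last);
    }
}
```

valueAtOrAfter(k) would then be `first(readRangeFrom(k))` and valueAtOrBefore(k) would be `last(readRangeUntil(k))`; the latter is the more expensive of the two.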

A2) Should we support iterating backwards?
-> We haven't found a compelling use case that needs this. If you do need it, 
you could, at least for some use cases, reverse the sort order through the 
serializer and achieve the same thing (unless you need to walk in both 
directions). Let's rather start with a slim API.
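One way to reverse the order through the serializer, sketched for `long` keys under the assumption of a fixed-length big-endian encoding: flipping the sign bit gives an ascending, order-preserving encoding, and complementing all of its bits reverses the unsigned byte order. `DescendingLongKeys` is a hypothetical name.

```java
import java.nio.ByteBuffer;

// Hypothetical descending key encoding for longs. Works as-is only because
// the encoding has a fixed length; variable-length encodings would also need
// to handle keys that are prefixes of other keys.
final class DescendingLongKeys {

    static byte[] encode(long value) {
        // ~(value ^ Long.MIN_VALUE) == value ^ Long.MAX_VALUE
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MAX_VALUE).array();
    }

    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MAX_VALUE;
    }
}
```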

A3) Lazy vs. eager iteration
-> Let's implement our iterators similarly to RocksDBMapIterator by eagerly 
retrieving a batch of entries (128 by default, as in RocksDBMapIterator) into a 
cache. This seems to offer the best of both worlds without bloating the API.
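A minimal sketch of such a batching iterator. The backend access is abstracted as a hypothetical `fetchAfter(lastReturned, batchSize)` function standing in for a seek plus scan; the class name and the null-as-start convention are assumptions, not RocksDBMapIterator's actual code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BiFunction;

// Hypothetical sketch: eagerly fetches a batch of elements into a local cache
// and only goes back to the backend once the cache has been drained.
final class BatchingIterator<T> implements Iterator<T> {
    static final int DEFAULT_BATCH_SIZE = 128;

    // Assumed contract: returns up to batchSize elements that follow
    // lastReturned in sort order (null means "start of the range").
    // Elements must be non-null because ArrayDeque rejects nulls.
    private final BiFunction<T, Integer, List<T>> fetchAfter;
    private final int batchSize;
    private final Deque<T> cache = new ArrayDeque<>();
    private T lastReturned;
    private boolean exhausted;

    BatchingIterator(BiFunction<T, Integer, List<T>> fetchAfter, int batchSize) {
        this.fetchAfter = fetchAfter;
        this.batchSize = batchSize;
    }

    @Override
    public boolean hasNext() {
        if (cache.isEmpty() && !exhausted) {
            List<T> batch = fetchAfter.apply(lastReturned, batchSize);
            cache.addAll(batch);
            // A short batch means the range has no further elements.
            exhausted = batch.size() < batchSize;
        }
        return !cache.isEmpty();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        lastReturned = cache.poll();
        return lastReturned;
    }
}
```

With 300 elements and the default batch size, the backend is queried three times (128 + 128 + 44 elements) instead of once per element.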

A4) ChangelogStateBackend
-> Since we require TypeSerializer implementations for the key, and those 
already know how to serialize keys (a requirement that exists anyway, e.g. in 
the network stack), the ChangelogStateBackend isn't affected by our change 
except for delegating the new operations to the underlying state backend.

A5) Defining the binary sort order as a byte-by-byte comparison of unsigned 
bytes is fine because this is very common among storage systems. Should a 
different binary-based state backend require something else, a mapping 
function could translate between the different definitions.
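As an example of such a mapping function, assume a hypothetical backend that compares keys lexicographically as *signed* bytes. XOR-ing every byte with `0x80` maps an unsigned byte value u to the signed value u - 128, which preserves the order, so keys ordered under one definition stay ordered under the other. `SignedByteKeyMapping` is an illustrative name.

```java
import java.util.Arrays;

// Hypothetical mapping from the unsigned byte order used in this proposal to
// a signed byte order expected by some other backend.
final class SignedByteKeyMapping {

    static byte[] toSignedOrder(byte[] unsignedOrderedKey) {
        byte[] mapped = new byte[unsignedOrderedKey.length];
        for (int i = 0; i < mapped.length; i++) {
            // Flipping the top bit turns unsigned u (0..255) into signed u - 128.
            mapped[i] = (byte) (unsignedOrderedKey[i] ^ 0x80);
        }
        return mapped;
    }

    // Arrays.compareUnsigned and Arrays.compare (Java 9+) are both
    // lexicographic; the former treats bytes as unsigned, the latter as signed.
    static boolean sameOrder(byte[] a, byte[] b) {
        return Integer.signum(Arrays.compareUnsigned(a, b))
                == Integer.signum(Arrays.compare(toSignedOrder(a), toSignedOrder(b)));
    }
}
```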

A6) How to handle duplicates?
-> We let the user handle this by storing a multi-map, i.e. multiple values 
for the (primary) sort key. If needed, users can sort these values manually. 
As long as we don't have a strong use case where this is not feasible, we 
don't need any implicit duplicate handling by the framework (Flink).
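A sketch of such a manual secondary sort, assuming the values of one primary key are obtained via readRange(key, key); `DuplicateKeys.valuesSortedBy` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical helper: duplicates of the primary sort key are just multiple
// values under that key; user code applies its own secondary order.
final class DuplicateKeys {

    static <K, V> List<V> valuesSortedBy(Iterable<Map.Entry<K, V>> sameKeyEntries,
                                         Comparator<? super V> secondaryOrder) {
        List<V> values = new ArrayList<>();
        for (Map.Entry<K, V> entry : sameKeyEntries) {
            values.add(entry.getValue());
        }
        values.sort(secondaryOrder);
        return values;
    }
}
```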

A7) readRangeUntil vs. headIterator and readRangeFrom vs. tailIterator
-> We propose to use readRange* because that seems more explicit/intuitive 
about what it does.

A8) readRange* with an inclusive/exclusive switch
-> In the interest of a slim API, let's not provide that. The API above 
interprets all keys as _inclusive_; should a user need exclusive behaviour, 
they would, in the worst case, read one more entry. In most cases, however, 
this entry would be served from the cache anyway, so it's not much of a 
problem.
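If exclusive behaviour is needed, it could be layered on top of the inclusive ranges, for example as in the following sketch (`ExclusiveRanges` is a hypothetical helper). Because entries arrive in key order, iteration stops at the first entry whose key reaches `toKey`, which is the extra entry mentioned above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical helper emulating an exclusive upper bound on top of an
// inclusive readRange(fromKey, toKey). keyOrder must agree with the binary
// key order; all entries sharing toKey (multi-map) are dropped.
final class ExclusiveRanges {

    static <K, V> List<Map.Entry<K, V>> dropUpperBound(Iterable<Map.Entry<K, V>> inclusiveRange,
                                                      K toKey, Comparator<? super K> keyOrder) {
        List<Map.Entry<K, V>> result = new ArrayList<>();
        for (Map.Entry<K, V> entry : inclusiveRange) {
            if (keyOrder.compare(entry.getKey(), toKey) >= 0) {
                break; // sorted input: nothing after this point is in range
            }
            result.add(entry);
        }
        return result;
    }
}
```

An exclusive lower bound works analogously by skipping the leading entries whose key equals `fromKey`.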

A9) Why don't we want to provide a BinarySortedMap with value-like semantics 
similar to TemporalValueState?
-> We'd like to keep the code overhead in Flink small and provide only a single 
new state primitive instead of two. For use cases where you don't want to 
handle lists, you can use the BinarySortedMultiMap's put() method with a 
single-entry list, which overwrites the previous values. When retrieving the 
value(s), you can then assume the list is either empty or has a single entry, 
similar to what you are currently doing in a ProcessWindowFunction. You can 
also always add a thin wrapper that provides this under a more convenient API 
if you need to.
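A sketch of such a thin wrapper. To stay self-contained it redeclares a hypothetical subset of the proposed API (`SortedMultiMapSubset`, without `throws Exception` and Flink's `State`) and assumes `entryAt()` returns `null` for a missing key, which the proposal does not specify. `SortedValueView` and `InMemorySortedMultiMap` are illustrative names.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical subset of the proposed BinarySortedMultiMap API.
interface SortedMultiMapSubset<UK, UV> {
    void put(UK key, Collection<UV> values);
    Map.Entry<UK, UV> entryAt(UK key); // assumed: null if the key is absent
}

// Value-like view: put() with a single-element list overwrites whatever was
// stored before, so each key holds at most one value.
final class SortedValueView<K, V> {
    private final SortedMultiMapSubset<K, V> multiMap;

    SortedValueView(SortedMultiMapSubset<K, V> multiMap) {
        this.multiMap = multiMap;
    }

    void update(K key, V value) {
        multiMap.put(key, Collections.singletonList(value));
    }

    V value(K key) {
        Map.Entry<K, V> entry = multiMap.entryAt(key);
        return entry == null ? null : entry.getValue();
    }
}

// In-memory implementation for trying the wrapper out; not a state backend.
final class InMemorySortedMultiMap<UK extends Comparable<UK>, UV>
        implements SortedMultiMapSubset<UK, UV> {
    private final TreeMap<UK, List<UV>> data = new TreeMap<>();

    @Override
    public void put(UK key, Collection<UV> values) {
        data.put(key, new ArrayList<>(values));
    }

    @Override
    public Map.Entry<UK, UV> entryAt(UK key) {
        List<UV> values = data.get(key);
        if (values == null || values.isEmpty()) {
            return null;
        }
        return new AbstractMap.SimpleImmutableEntry<>(key, values.get(0));
    }
}
```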

A10) Effects on the CEPOperator?
-> We don't have an overview yet. The buffering of events inside its 
`MapState<Long, List<IN>> elementQueueState`, however, is a pattern that would 
benefit from our multi-map, since a single add() operation wouldn't require 
reading the whole list again.
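To illustrate the difference, the following sketch contrasts the two buffering patterns, using plain `java.util` maps as hypothetical stand-ins for Flink state (no serialization, no checked exceptions). Apart from `elementQueueState`, the names are illustrative and not taken from the CEPOperator's code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical contrast of the two buffering patterns.
final class BufferingPatterns<IN> {

    // MapState<Long, List<IN>> style: each insert reads the full list for the
    // timestamp, appends, and writes the full list back. With a serializing
    // backend such as RocksDB, the whole list is (de)serialized every time.
    final Map<Long, List<IN>> elementQueueState = new HashMap<>();

    void bufferWithListValue(long timestamp, IN element) {
        List<IN> elements = elementQueueState.get(timestamp);
        if (elements == null) {
            elements = new ArrayList<>();
        }
        elements.add(element);
        elementQueueState.put(timestamp, elements);
    }

    // Multi-map style: a single add(timestamp, element) call. Modeled here
    // with a TreeMap of lists, which also keeps the timestamps sorted.
    final TreeMap<Long, List<IN>> sortedMultiMap = new TreeMap<>();

    void bufferWithMultiMapAdd(long timestamp, IN element) {
        sortedMultiMap.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(element);
    }
}
```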


Sorry for the long email. We'd be happy to get more feedback and will 
incorporate this into the FLIP description soon.



Nico


-- 
Dr. Nico Kruber | Solutions Architect

Follow us @VervericaData Ververica
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton 
Wehner


