Hi all, I have read the discussion points from the last emails and would like to add my two cents on what I believe are the remaining points to solve:
1. Do we need a TemporalValueState?

I guess this all boils down to dealing with duplicates, i.e. multiple values for the same timestamp. Either you always have to account for them and thus always have to store a list anyway, or you only need to join with "the latest" (or the only) value for a given timestamp and get a nicer API and lower overhead for that use case.

In the simplest case, you can assume by contract that there is only a single value for each timestamp, e.g. by increasing the timestamp precision and interpreting timestamps as nanoseconds, or maybe milliseconds are already good enough. If that contract breaks, however, you will get into undefined behaviour.

The TemporalRowTimeJoinOperator, for example, currently just assumes that there is only a single value on the right side of the join (rightState), and I believe many use cases can make that assumption. Otherwise, you'd have to define the expected behaviour for multiple values at the same timestamp, e.g. "join with the most recent value at the time of the left side and if there are multiple values, choose X".

I lean towards having a ValueState implementation as well (in addition to lists).

2. User-facing API (Iterators vs. valueAtOr[Before|After])

I like the iterable-based APIs that David M was proposing, i.e.
- Iterable<TimestampedValue<T>> readRange(long minTimestamp, long limitTimestamp);
- void clearRange(long minTimestamp, long limitTimestamp);

However, I find Iterables rather cumbersome to work with if you actually only need a single value, e.g. the most recent one. For iterating over a range of values, however, they feel more natural to me than our proposal.

Actually, if we generalise the key type (see below), we may also need to offer additional value[Before|After] functions to cover "+1" iterations where we cannot simply add 1 as we do now.

(a) How about offering both Iterables and value[AtOrBefore|AtOrAfter|Before|After]?
This would be similar to what NavigableMap [2] is offering but with a more explicit API than "ceiling", "floor", ...

(b) Our API proposal currently also allows iterating backwards, which is not covered by the readRange proposal. We could, however, just do that if minTimestamp > limitTimestamp. What do you think?

(c) When implementing the iterators, I actually also see two different modes which may differ in performance: I call them iteration with eager vs. lazy value retrieval. Eager retrieval may retrieve all values in a range at once and make them available in memory, e.g. for smaller data sets, similar to what TemporalRowTimeJoinOperator is doing for the right side of the join. This can spare a lot of Java<->JNI calls and let RocksDB iterate only once (as long as things fit into memory). Lazy retrieval would fetch results one by one.
-> We could set one as default and allow the user to override that behaviour.

3. Should we generalise the Temporal***State to offer arbitrary key types and not just Long timestamps?

@Yun Tang: can you describe in more detail where you think this would be needed for SQL users? I don't quite get how this would be beneficial. The example you linked doesn't quite show the same behaviour.

Other than this, I could see that you can leverage such a generalisation for arbitrary joins between, for example, IDs and ID ranges which don't have a time component attached to them. This shouldn't be too difficult to expose (the functionality has to exist anyway, but would otherwise be buried in Flink's internals). We'd just have to find suitable names.

(a) I don't think TemporalListState<T> is actually SortedMapState<Long, List<T>> because we need efficient "add to list" primitives which cannot easily be made available with a single generic SortedMapState...
(b) So the most expressive (yet kind-of ugly) names could be
- SortedMapState<Long, ValueType>
- SortedMapOfListsState<Long, List<ValueType>>

(c) For both of these, we could then re-use the existing MapStateDescriptor to define key and value/list-element types and require that the key type / serializer implements a certain RetainingSortOrderSerializer interface (and think about a better name for this) which defines the contract that the binary sort order is the same as the Java Object one.
-> That can also be verified at runtime to fail early.

4. ChangelogStateBackend: we don't think this needs special attention - it is just delegating to the other backends anyway and these methods are already adapted in our POC code.

@David M, Yun Tang: let me/us know what you think about these proposals.

Nico

[2] https://docs.oracle.com/javase/8/docs/api/java/util/NavigableMap.html

On Thursday, 14 April 2022 14:15:53 CEST Yun Tang wrote:
> Hi David Anderson,
>
> I feel doubted that no motivating use case for this generalization to
> SortedMapState. From our internal stats, SQL user would use much more cases
> of min/max with retract functions [1] compared with interval join.
> From my understanding, the TemporalListState<T> is actually
> SortedMapState<Long, List<T>>, while TemporalValueState<T> is
> SortedMapState<Long, T>. As you can see, if we just restrict SortedMapState
> with the key type as Long, all current two new interfaces could be
> replaced.
> Moreover, once we introduce temporal state, the extension would be limited.
> Apart from Long, many other types could be comparable, e.g. TimeStamp, Int,
> Float and so on. How could we handle these feature request after
> TemporalState merged? I don't think introducing too many state types is a
> good idea. We can only support Long type for the 1st version when
> introducing SortedMapState, and then extends it to many other more types in
> the future. This could balance the feature requests with clean interfaces
> design.
> And thus, we can also use sorted map state in the popular min/max
> functions.
>
> By the way, current FLIP lacks of consideration of the work on changelog
> state-backend once two new state types are introduced.
>
> [1] https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/MinWithRetractAggFunction.java
>
> Best,
> Yun Tang
> ________________________________
> From: David Morávek <david.mora...@gmail.com>
> Sent: Wednesday, April 13, 2022 19:50
> To: dev <dev@flink.apache.org>
> Subject: Re: [DISCUSS] FLIP-220: Temporal State
>
> Here is a very naive implementation [1] from a prototype I did few months
> back that uses list and insertion sort. Since the list is sorted we can use
> binary search to create sub-list, that could leverage the same thing I've
> described above.
>
> I think back then I didn't go for the SortedMap as it would be hard to
> implement with the current heap state backend internals and would have
> bigger memory overhead.
>
> The ideal solution would probably use skip list [2] to lower the overhead
> of the binary search, while maintaining a reasonable memory footprint.
> Other than that it could be pretty much the same as the prototype
> implementation [1].
>
> [1] https://github.com/dmvk/flink/blob/ecdbc774b13b515e8c0943b2c143fb1e34eca6f0/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapTemporalListState.java
> [2] https://en.wikipedia.org/wiki/Skip_list
>
> Best,
> D.
>
> On Wed, Apr 13, 2022 at 1:27 PM David Morávek <david.mora...@gmail.com> wrote:
>
> > Hi David,
> >
> >> It seems to me that at least with the heap-based state backend, readRange
> >> is going to have to do a lot of unnecessary work to implement this
> >> isEmpty() operation, since it have will to consider the entire range from
> >> MIN_VALUE to MAX_VALUE. (Maybe we should add an explicit isEmpty method?
> >> I'm not convinced we need it, but it would be cheaper to implement. Or
> >> perhaps this join can be rewritten to not need this operation; I haven't
> >> thought enough about that alternative.)
> >
> > I think this really boils down to how the returned iterable is going to be
> > implemented. Basically for checking whether state is empty, you need to do
> > something along the lines of:
> >
> > Iterables.isEmpty(state.readRange(Long.MIN_VALUE, Long.MAX_VALUE));
> > // basically checking `hasNext() == false` or `isEmpty()` in case of `Collection`
> >
> > Few notes:
> > 1) It could be lazy (the underlying collection doesn't have to be
> > materialized - eg. in case of RocksDB);
> > 2) For HeapStateBackend it depends on the underlying implementation. I'd
> > probably do something along the lines of sorted tree (eg. SortedMap /
> > NavigableMap), that allows effective range scans / range deletes. Then you
> > could simply do something like (from top of the head):
> >
> > @Value
> > class TimestampedKey<K> {
> >     K key;
> >     long timestamp;
> > }
> >
> > SortedMap<TimestampedKey<K>, V> internalState;
> >
> > Iterable<TimestampedValue<V>> readRange(long min, long max) {
> >     return toIterable(internalState.subMap(
> >         new TimestampedKey<>(currentKey(), min),
> >         new TimestampedKey<>(currentKey(), max)));
> > }
> >
> > This should be fairly cheap. The important bit is that the returned
> > iterator is always non-null, but could be empty.
> >
> > Does that answer your question?
> >
> > D.
> >
> > On Wed, Apr 13, 2022 at 12:21 PM David Anderson <da...@alpinegizmo.com> wrote:
> >
> >> Yun Tang and Jingsong,
> >>
> >> Some flavor of OrderedMapState is certainly feasible, and I do see some
> >> appeal in supporting Binary**State.
> >>
> >> However, I haven't seen a motivating use case for this generalization, and
> >> would rather keep this as simple as possible. By handling Longs we can
> >> already optimize a wide range of use cases.
> >>
> >> David
> >>
> >> On Tue, Apr 12, 2022 at 9:21 AM Yun Tang <myas...@live.com> wrote:
> >>
> >> > Hi David,
> >> >
> >> > Could you share some explanations why SortedMapState cannot work in
> >> > details? I just cannot catch up what the statement below means:
> >> >
> >> > This was rejected as being overly difficult to implement in a way that
> >> > would cleanly leverage RocksDB’s iterators.
> >> >
> >> > Best
> >> > Yun Tang
> >> > ________________________________
> >> > From: Aitozi <gjying1...@gmail.com>
> >> > Sent: Tuesday, April 12, 2022 15:00
> >> > To: dev@flink.apache.org <dev@flink.apache.org>
> >> > Subject: Re: [DISCUSS] FLIP-220: Temporal State
> >> >
> >> > Hi David
> >> >
> >> > I have look through the doc, I think it will be a good improvement to
> >> > this pattern usage, I'm interested in it. Do you have some POC work to
> >> > share for a closer look.
> >> > Besides, I have one question that can we support expose the namespace in
> >> > the different state type not limited to `TemporalState`. By this, user can
> >> > specify the namespace
> >> > and the TemporalState is one of the special case that it use timestamp as
> >> > the namespace. I think it will be more extendable.
> >> >
> >> > What do you think about this ?
> >> >
> >> > Best,
> >> > Aitozi.
> >> >
> >> > On Mon, Apr 11, 2022 at 20:54, David Anderson <dander...@apache.org> wrote:
> >> >
> >> > > Greetings, Flink developers.
> >> > >
> >> > > I would like to open up a discussion of a proposal [1] to add a new kind of
> >> > > state to Flink.
> >> > >
> >> > > The goal here is to optimize a fairly common pattern, which is using
> >> > >
> >> > > MapState<Long, List<Event>>
> >> > >
> >> > > to store lists of events associated with timestamps. This pattern is used
> >> > > internally in quite a few operators that implement sorting and joins, and
> >> > > it also shows up in user code, for example, when implementing custom
> >> > > windowing in a KeyedProcessFunction.
> >> > >
> >> > > Nico Kruber, Seth Wiesman, and I have implemented a POC that achieves a
> >> > > more than 2x improvement in throughput when performing these operations on
> >> > > RocksDB by better leveraging the capabilities of the RocksDB state backend.
> >> > >
> >> > > See FLIP-220 [1] for details.
> >> > >
> >> > > Best,
> >> > > David
> >> > >
> >> > > [1] https://cwiki.apache.org/confluence/x/Xo_FD

-- 
Dr. Nico Kruber | Solutions Architect
Follow us @VervericaData
Ververica
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
Ververica GmbH, registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton Wehner
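
P.S. To make point 2 of this mail a bit more concrete, here is a minimal in-memory sketch of how readRange / clearRange and the value[AtOrBefore|AtOrAfter|Before|After] lookups could sit side by side on top of a NavigableMap. This is not the FLIP-220 API or the POC code. The class name TemporalValueSketch is hypothetical, and Map.Entry stands in for TimestampedValue. The range semantics are my assumptions: minTimestamp inclusive, limitTimestamp exclusive, and backwards iteration when minTimestamp > limitTimestamp. The range is copied eagerly into a list; a lazy variant would return the sub-map view directly.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory model of a temporal value state; not part of Flink. */
final class TemporalValueSketch<T> {

    // One value per timestamp, by contract (point 1 of the mail).
    private final NavigableMap<Long, T> values = new TreeMap<>();

    void put(long timestamp, T value) {
        values.put(timestamp, value); // a second write for the same timestamp overwrites
    }

    // Single-value lookups; each returns null if no matching entry exists.
    Map.Entry<Long, T> valueAtOrBefore(long ts) { return values.floorEntry(ts); }
    Map.Entry<Long, T> valueAtOrAfter(long ts)  { return values.ceilingEntry(ts); }
    Map.Entry<Long, T> valueBefore(long ts)     { return values.lowerEntry(ts); }
    Map.Entry<Long, T> valueAfter(long ts)      { return values.higherEntry(ts); }

    /**
     * Assumed semantics: minTimestamp inclusive, limitTimestamp exclusive.
     * If minTimestamp > limitTimestamp, iterate backwards from minTimestamp
     * (inclusive) down to limitTimestamp (exclusive).
     */
    Iterable<Map.Entry<Long, T>> readRange(long minTimestamp, long limitTimestamp) {
        NavigableMap<Long, T> view = minTimestamp <= limitTimestamp
                ? values.subMap(minTimestamp, true, limitTimestamp, false)
                : values.subMap(limitTimestamp, false, minTimestamp, true).descendingMap();
        // Eager retrieval: copy the range once so the caller iterates over a snapshot.
        List<Map.Entry<Long, T>> snapshot = new ArrayList<>();
        for (Map.Entry<Long, T> e : view.entrySet()) {
            snapshot.add(new SimpleImmutableEntry<>(e));
        }
        return snapshot;
    }

    /** Removes [minTimestamp, limitTimestamp); expects minTimestamp <= limitTimestamp. */
    void clearRange(long minTimestamp, long limitTimestamp) {
        values.subMap(minTimestamp, true, limitTimestamp, false).clear();
    }
}
```

With values at timestamps 10, 20 and 30, valueAtOrBefore(25) would return the entry at 20 without any iterator handling on the caller's side, while readRange(30, 10) would yield the entries at 30 and 20 in that order. A real state backend would of course have to do this per key and, for RocksDB, on serialized keys, which is where the sort-order contract from point 3 (c) comes in.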