Thanks for the discussion. I think this is a very good question for Flink, and we can benefit a lot from it. The default IntervalJoinOperator is really inefficient. We optimized it by using RocksDB's upper and lower bounds; details are in https://issues.apache.org/jira/browse/FLINK-10949. A more generic approach would be useful.
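To illustrate the bounded-scan idea on the heap (this is not the RocksDB change from FLINK-10949), here is a minimal sketch. It assumes one side of an interval join is buffered in a `TreeMap` keyed by timestamp. The class `IntervalJoinBuffer` and its methods are hypothetical. A probe with timestamp `t` only touches the slice `[t + lowerBound, t + upperBound]` instead of every buffered row:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical heap-based buffer for one side of an interval join
// (a stand-in for keyed state; not Flink's IntervalJoinOperator).
class IntervalJoinBuffer<T> {
    // Rows keyed by event timestamp; several rows may share a timestamp.
    private final NavigableMap<Long, List<T>> rowsByTime = new TreeMap<>();

    void add(long timestamp, T row) {
        rowsByTime.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(row);
    }

    // Rows with timestamp in [t + lowerBound, t + upperBound]; only that slice
    // of the sorted map is visited (assumes lowerBound <= upperBound).
    List<T> matches(long t, long lowerBound, long upperBound) {
        List<T> out = new ArrayList<>();
        for (List<T> rows : rowsByTime.subMap(t + lowerBound, true, t + upperBound, true).values()) {
            out.addAll(rows);
        }
        return out;
    }

    // Drops rows older than minTimestamp, e.g. once the watermark makes them unjoinable.
    void evictBefore(long minTimestamp) {
        rowsByTime.headMap(minTimestamp, false).clear();
    }
}
```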
On Fri, Apr 22, 2022 at 20:28, David Morávek <david.mora...@gmail.com> wrote:

> With that in mind, we could only offer a couple of selected temporal/sorted state implementations that are handled internally, but not really a generic one, even if you let the user explicitly handle binary keys...

If we want to have a generic interface that is portable between different state backends and allows for all the use cases described above, lexicographical binary sort sounds reasonable, because you need to be able to push sorting outside the JVM boundary.

The only trade-off I can think of is that as long as you stay within the JVM (heap state backend), you need to pay a slight key serialization cost, which is IMO ok-ish.

Do you have any future state backend ideas in mind that might not work with this assumption?

-

I'm really starting to like the idea of having a BinarySortedMapState + higher-level / composite states.

D.

On Fri, Apr 22, 2022 at 1:58 PM David Morávek <david.mora...@gmail.com> wrote:

> Isn't allowing a TemporalValueState just a special case of b.III? So if a user of the state wants that, they can leverage a simple API, whereas if you want fancier duplicate handling, you'd just go with TemporalListState and implement the logic you want?

Yes, it is. But IMO it doesn't justify adding a new state primitive. My take would be that as long as we can build TVS using other existing state primitives (TLS), we should treat it as a "composite state". We currently don't have a good user-facing API to do that, but it could be added in a separate FLIP.

E.g.
something along the lines of:

    TemporalValueState<String> state = getRuntimeContext().getCompositeState(
        new CompositeStateDescriptor<>(
            "composite", new TemporalValueState(type)));

On Fri, Apr 22, 2022 at 1:44 PM Nico Kruber <n...@apache.org> wrote:

David,

1) Good points on the possibility of making the TemporalListState generic. Actually, if you think about it more, we are currently assuming that all state backends use the same comparison on the binary level, because we add an appropriate serializer at an earlier abstraction level. This may not hold for all (future) state backends and can limit further implementations (if you think this is something to keep in mind!).

So we may have to push this serializer implementation further down the stack, i.e. our current implementation is one that fits RocksDB and RocksDB alone...

With that in mind, we could only offer a couple of selected temporal/sorted state implementations that are handled internally, but not really a generic one, even if you let the user explicitly handle binary keys...

2) Duplicates

Isn't allowing a TemporalValueState just a special case of b.III? So if a user of the state wants that, they can leverage a simple API, whereas if you want fancier duplicate handling, you'd just go with TemporalListState and implement the logic you want?

Nico

On Friday, 22 April 2022 10:43:48 CEST David Morávek wrote:

Hi Yun & Nico,

A few thoughts on the discussion:

1) Making the TemporalListState generic

This is just not possible with the current infrastructure w.r.t. type serializers, as the sorting key *needs to be comparable on the binary level* (serialized form).
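As a rough illustration of what "comparable on the binary level" demands, the following sketch uses a hypothetical class `SortKeys` and needs Java 9+ for `Arrays.compareUnsigned`. It encodes a `long` as big-endian bytes with the sign bit flipped, and a string as escaped, zero-terminated UTF-8 without a length prefix. Under unsigned lexicographic byte comparison (what RocksDB's default bytewise comparator does), both encodings sort in natural order. The length-prefixed variant shows why writing the length first breaks ordering:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical "to binary sorting key" helpers: encodings whose unsigned
// lexicographic byte order matches the natural order of the value.
final class SortKeys {
    private SortKeys() {}

    // Big-endian long with the sign bit flipped, so negative numbers sort
    // before positive ones under unsigned byte comparison.
    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    // UTF-8 without a length prefix: each 0x00 byte is escaped as 0x00 0xFF and
    // the key ends with 0x00, so "ab" < "b" still holds. This gives Unicode
    // code-point order, which equals String.compareTo only for BMP characters.
    static byte[] encodeString(String value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte b : value.getBytes(StandardCharsets.UTF_8)) {
            out.write(b);
            if (b == 0) {
                out.write(0xFF);
            }
        }
        out.write(0x00);
        return out.toByteArray();
    }

    // Length-first layout: shorter strings always sort first, breaking natural order.
    static byte[] encodeStringLengthPrefixed(String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Integer.BYTES + utf8.length).putInt(utf8.length).put(utf8).array();
    }

    // Unsigned comparison; Java's byte is signed, so a signed comparison
    // would order 0x80..0xFF before 0x00..0x7F.
    static int compare(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }
}
```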
What I could imagine is introducing some kind of `Sorted(List|Map)State` with explicit binary keys. The user would either have to work directly with `byte[]` keys or provide a function for transforming keys into a binary representation that can be sorted (this would have to be different from `TypeSerializer`, which could get fancier with the binary representation, e.g. varints to save space).

This kind of interface might be really hard for pipeline authors to grasp. It requires a deeper understanding of how the byte comparison works (e.g. it needs to be different from the Java byte comparison, which compares bytes as `signed`). This could maybe be partially mitigated by providing predefined `to binary sorting key` functions for the common primitives / types.

2) Duplicates

> I guess this all boils down to dealing with duplicates / values for the same timestamp.

We should never have duplicates. Let's try to elaborate on what having duplicates really means in this context.

a) Temporal table point of view

There can only be a single snapshot of the table at any given point in time (~ physics). If we allow for duplicates, we violate this, as it's not certain what the actual state of the table is at that point in time. In the case of the temporal join, what should the elements from the other side join against?

If we happen to have a duplicate, it actually brings us to b) causality (which could actually answer the previous question).

b) Causality

When building any kind of state machine, it's important to think about causality (if we switch the order of events, state transitions no longer result in the same state).
A temporal table is a specific type of state machine.

There are several approaches to mitigate this:

I) nanosecond precision -> the chance that two events affecting the same thing happen at exactly the same nanosecond is negligible (from the physical standpoint)
II) the sorting key is a tuple of (timestamp, sequential id) -> an example could be early firings (you get speculative results from a windowed aggregation with timestamp = EOW, but you can easily assign the order in which these have happened)
III) last write with the same timestamp wins -> this is a special case of II) when we're sure that elements with the duplicate timestamp come in order

c) Secondary sort

Handling actual duplicates requires a secondary sort key, which might complicate the `to binary sorting key` interface discussed in 1), basically some kind of user-provided duplicate handler.

d) Temporal value state

The above points apply to the temporal value state as well; it just pushes the responsibility away from the state interface. I'm still not convinced that this is the right direction.

-

For most use cases I've seen, millisecond precision is more than enough (+ last write wins as a fallback). Supporting the use cases where it's not actually enough (I've seen that as well in the past, especially with early firings) might actually be a good case for the more generic form of the state that we've discussed in 1).

3) Map vs List

I think this also boils down to the discussion of how to handle duplicates. By the commonly accepted contracts, a list implies that there could be duplicates and a map implies otherwise.
One concern about `Map` is that it also implies that you should be able to do a point query.

Best,
D.

On Fri, Apr 22, 2022 at 9:21 AM Yun Tang <myas...@live.com> wrote:

Hi Nico,

I did not mean that we need to support all APIs in NavigableMap; it would indeed be too heavy to implement them all.
Moreover, I also prefer an iterator-like API over the original #tailMap-like API. I just used NavigableMap's API to show examples.

I think we can introduce new APIs like:

    public interface SortedMapState<UK, UV> extends State {
        Map.Entry<UK, UV> firstEntry() throws Exception;
        Map.Entry<UK, UV> lastEntry() throws Exception;
        Iterator<Map.Entry<UK, UV>> headIterator(UK endUserKey) throws Exception;
        Iterator<Map.Entry<UK, UV>> tailIterator(UK startUserKey) throws Exception;
        Iterator<Map.Entry<UK, UV>> subIterator(UK startUserKey, UK endUserKey) throws Exception;
    }

Since SortedMapState has several new APIs, I prefer to introduce a new state descriptor to distinguish it from the original map state.

I am not strongly against the API of SortedMapOfListsState; I just want to know the actual benefits if we really want to introduce that API.

Regarding the changelog state backend, my concern is how to group keys together in the changelog logger.
I can share a problem, but before that I need to spend some time on how to implement a serializer that keeps the order of the serialized bytes the same as that of the original Java objects.
For a fixed-length serializer, such as LongSerializer, we just need to ensure all numbers are positive, or invert the sign bit.
However, a non-fixed-length serializer, such as StringSerializer, writes the length of the bytes first, which breaks the natural order when comparing the bytes. Thus, we might need to avoid writing the length into the serialized bytes.
On the other hand, the changelog logger records operations key by key in the logs. We need to consider how to distinguish each key in the combined serialized byte arrays.

Best
Yun Tang

------------------------------
From: Nico Kruber <n...@apache.org>
Sent: Thursday, April 21, 2022 23:50
To: dev <dev@flink.apache.org>
Cc: David Morávek <david.mora...@gmail.com>; Yun Tang <myas...@live.com>
Subject: Re: [DISCUSS] FLIP-220: Temporal State

Thanks Yun Tang for your clarifications.
Let me keep my original structure and reply to these points...

3. Should we generalise the Temporal***State to offer arbitrary key types and not just Long timestamps?

The use cases you detailed do indeed look similar to the ones we were optimising in our TemporalState PoC...

I don't think I'd like to offer a full implementation of NavigableMap, though, because that seems like quite some overhead to implement, while we can already cover the mentioned examples with the proposed APIs by using iterators as well as single-value retrievals.
So far, when we were iterating from the smallest key, we could just use Long.MIN_VALUE and start from there.
That would be difficult to generalise for arbitrary data types, because you may not always know the smallest possible value for a certain serialized type (unless we put this into the appropriate serializer interface).

I see two options here:

a) a slim API, but using NULL as an indicator for smallest/largest depending on the context, e.g.
- `readRange(null, key)` means from beginning to key
- `readRange(key, null)` means from key to end
- `readRange(null, null)` means from beginning to end
- `value[AtOr]Before(null)` means largest available key
- `value[AtOr]After(null)` means smallest available key

b) a larger API with special methods for each of these use cases, similar to what NavigableMap has, but based on iterators and single-value functions only

> BTW, I prefer to introduce another state descriptor instead of the current map state descriptor.

Can you elaborate on this? We currently don't need extra functionality, so this would be a plain copy of the MapStateDescriptor...

> For the API of SortedMapOfListsState, I think this is a bit tied to the current implementation of the RocksDB state backend.

Actually, I don't think this is specific to RocksDB; it is generic to all state backends that do not hold values in memory and allow fast append-like operations.
Additionally, since this is a very common use case and RocksDB is also widely used, I wouldn't want to continue without this specialization. For a similar reason, we offer ListState and not just ValueState<List>...

4.
ChangelogStateBackend

> For the discussion of ChangelogStateBackend, you can think of the changelog state backend as a write-ahead-log service. We need to record the changes to any state, so this should be included in the design doc, as we need to introduce another kind of state. In particular, you might need to consider how to store key bytes serialized by the new serializer (as we might not be able to write the length at the beginning of the serialized bytes if the byte order is to match the natural order).

Since the ChangelogStateBackend "holds the working state in the underlying delegatedStateBackend, and forwards state changes to State Changelog", I honestly still don't see how this needs special handling. As long as the delegated state backend supports sorted state, ChangelogStateBackend doesn't have to do anything special except for recording changes to state. Our PoC simply uses the namespace for these keys, and that's the same thing the Window API is already using, so there's nothing special here. The order in the log doesn't have to follow the natural order, because that is only required inside the delegatedStateBackend, isn't it?

Nico

On Wednesday, 20 April 2022 17:03:11 CEST Yun Tang wrote:

Hi Nico,

Thanks for your clarification.
Regarding generalizing temporal state to sorted map state, I can give some examples of how to use sorted map state in min/max with retract functions.
As you know, NavigableMap in Java has several APIs like:

    Map.Entry<K,V> firstEntry();
    Map.Entry<K,V> lastEntry();
    NavigableMap<K,V> tailMap(K fromKey, boolean inclusive);

The #firstEntry API could be used in MinWithRetractAggFunction#updateMin, #lastEntry could be used in MaxWithRetractAggFunction#updateMax, and #tailMap could be used in FirstValueWithRetractAggFunction#retract. If we can introduce a SortedMap-like state, these functions could benefit.
BTW, I prefer to introduce another state descriptor instead of the current map state descriptor.
For the API of SortedMapOfListsState, I think this is a bit tied to the current implementation of the RocksDB state backend.

For the discussion of ChangelogStateBackend, you can think of the changelog state backend as a write-ahead-log service. We need to record the changes to any state, so this should be included in the design doc, as we need to introduce another kind of state. In particular, you might need to consider how to store key bytes serialized by the new serializer (as we might not be able to write the length at the beginning of the serialized bytes if the byte order is to match the natural order).

Best
Yun Tang.
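The min/max-with-retract argument above can be sketched with a plain `TreeMap` standing in for the proposed sorted map state. This is a hypothetical heap-only illustration (the class `MinWithRetract` is made up), not the code of `MinWithRetractAggFunction`. Keeping a count per value makes retraction cheap, and `firstEntry()` / `lastEntry()` return the new min/max without scanning all values:

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical MIN/MAX with retraction over a sorted map of value -> count.
class MinWithRetract {
    private final NavigableMap<Long, Integer> counts = new TreeMap<>();

    void accumulate(long value) {
        counts.merge(value, 1, Integer::sum);
    }

    // Decrements the count and removes the key once it reaches zero
    // (computeIfPresent removes the mapping when the function returns null).
    void retract(long value) {
        counts.computeIfPresent(value, (k, c) -> c > 1 ? c - 1 : null);
    }

    // Smallest remaining value, or null if everything was retracted.
    Long currentMin() {
        Map.Entry<Long, Integer> first = counts.firstEntry();
        return first == null ? null : first.getKey();
    }

    // Largest remaining value, or null if everything was retracted.
    Long currentMax() {
        Map.Entry<Long, Integer> last = counts.lastEntry();
        return last == null ? null : last.getKey();
    }
}
```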
________________________________
From: Nico Kruber <n...@apache.org>
Sent: Wednesday, April 20, 2022 0:38
To: dev <dev@flink.apache.org>
Cc: Yun Tang <myas...@live.com>; David Morávek <david.mora...@gmail.com>
Subject: Re: [DISCUSS] FLIP-220: Temporal State

Hi all,
I have read the discussion points from the last emails and would like to add my two cents on what I believe are the remaining points to solve:

1. Do we need a TemporalValueState?

I guess this all boils down to dealing with duplicates / values for the same timestamp. Either you always have to account for them and thus always have to store a list anyway, or you only need to join with "the latest" (or the only) value for a given timestamp and get a nicer API and lower overhead for that use case.
At the easiest, you can assume by contract that there is only a single value for each timestamp, e.g. by increasing the timestamp precision and interpreting them as nanoseconds, or maybe milliseconds are already good enough. If that contract breaks, however, you will get undefined behaviour.
The TemporalRowTimeJoinOperator, for example, currently just assumes that there is only a single value on the right side of the join (rightState), and I believe many use cases can make that assumption; otherwise you'd have to define the expected behaviour for multiple values at the same timestamp, e.g. "join with the most recent value at the time of the left side and, if there are multiple values, choose X".

I lean towards having a ValueState implementation as well (in addition to lists).

2. User-facing API (Iterators vs. valueAtOr[Before|After])

I like the iterable-based APIs that David M was proposing, i.e.
- Iterable<TimestampedValue<T>> readRange(long minTimestamp, long limitTimestamp);
- void clearRange(long minTimestamp, long limitTimestamp);

However, I find Iterables rather cumbersome to work with if you actually only need a single value, e.g. the most recent one.
For iterating over a range of values, however, they feel more natural to me than our proposal.

Actually, if we generalise the key type (see below), we may also need to offer additional value[Before|After] functions to cover "+1" iterations where we cannot simply add 1 as we do now.

(a) How about offering both Iterables and value[AtOrBefore|AtOrAfter|Before|After]? This would be similar to what NavigableMap [2] offers, but with a more explicit API than "ceiling", "floor", ...

(b) Our API proposal currently also allows iterating backwards, which is not covered by the readRange proposal. We could, however, just do that if minTimestamp > limitTimestamp. What do you think?
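A minimal heap-only sketch of point (b), assuming `readRange` treats the first bound as inclusive and the limit as exclusive, and that a first bound greater than the limit means iterating backwards. `TemporalRangeReader` is hypothetical and returns plain values instead of `TimestampedValue`s for brevity:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of readRange over a sorted map.
// Assumptions: first bound inclusive, limit exclusive, and
// fromTimestamp > limitTimestamp means "iterate backwards".
class TemporalRangeReader<T> {
    private final NavigableMap<Long, T> values = new TreeMap<>();

    void put(long timestamp, T value) {
        values.put(timestamp, value);
    }

    List<T> readRange(long fromTimestamp, long limitTimestamp) {
        NavigableMap<Long, T> view = fromTimestamp <= limitTimestamp
                // ascending: fromTimestamp <= ts < limitTimestamp
                ? values.subMap(fromTimestamp, true, limitTimestamp, false)
                // descending: fromTimestamp >= ts > limitTimestamp
                : values.descendingMap().subMap(fromTimestamp, true, limitTimestamp, false);
        // Copies eagerly; a lazy variant would hand out the view's iterator instead.
        return new ArrayList<>(view.values());
    }
}
```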
(c) When implementing the iterators, I also see two different modes which may differ in performance: I call them iteration with eager vs. lazy value retrieval. Eager retrieval may retrieve all values in a range at once and make them available in memory, e.g. for smaller data sets, similar to what TemporalRowTimeJoinOperator is doing for the right side of the join. This can spare a lot of Java<->JNI calls and let RocksDB iterate only once (as long as things fit into memory). Lazy retrieval would fetch results one by one. -> We could set one as the default and allow the user to override that behaviour.

3. Should we generalise the Temporal***State to offer arbitrary key types and not just Long timestamps?

@Yun Tang: can you describe in more detail where you think this would be needed for SQL users? I don't quite get how this would be beneficial. The example you linked doesn't quite show the same behaviour.

Other than this, I could see that you can leverage such a generalisation for arbitrary joins between, for example, IDs and ID ranges which don't have a time component attached to them. This shouldn't be too difficult to expose (the functionality has to exist anyway, but is otherwise buried in Flink's internals). We'd just have to find suitable names.
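For the ID-range join mentioned above, a sorted key turns the lookup into a single floor query. A minimal sketch, assuming non-overlapping ranges keyed by their start ID; `IdRangeIndex` is a hypothetical name, with a `TreeMap` standing in for sorted state:

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: joining IDs against non-overlapping ID ranges that are
// keyed by their start ID, using a floor lookup instead of a timestamp.
class IdRangeIndex<V> {
    private static final class Range<P> {
        final long endInclusive;
        final P payload;

        Range(long endInclusive, P payload) {
            this.endInclusive = endInclusive;
            this.payload = payload;
        }
    }

    private final NavigableMap<Long, Range<V>> byStart = new TreeMap<>();

    void addRange(long start, long endInclusive, V payload) {
        byStart.put(start, new Range<>(endInclusive, payload));
    }

    // The only candidate is the range with the greatest start <= id.
    V lookup(long id) {
        Map.Entry<Long, Range<V>> candidate = byStart.floorEntry(id);
        if (candidate == null || id > candidate.getValue().endInclusive) {
            return null; // id lies before the first range or in a gap
        }
        return candidate.getValue().payload;
    }
}
```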
(a) I don't think TemporalListState<T> is actually SortedMapState<Long, List<T>>, because we need efficient "add to list" primitives which cannot easily be made available with a single generic SortedMapState...

(b) So the most expressive (yet kind-of ugly) names could be
- SortedMapState<Long, ValueType>
- SortedMapOfListsState<Long, List<ValueType>>

(c) For both of these, we could then re-use the existing MapStateDescriptor to define key and value/list-element types and require that the key type / serializer implements a certain RetainingSortOrderSerializer interface (and think about a better name for this), which defines the contract that the binary sort order is the same as the Java Object one.
-> That can also be verified at runtime to fail early.

4. ChangelogStateBackend: we don't think this needs special attention. It is just delegating to the other backends anyway, and these methods are already adapted in our POC code.

@David M, Yun Tang: let me/us know what you think about these proposals.

Nico

[2] https://docs.oracle.com/javase/8/docs/api/java/util/NavigableMap.html

On Thursday, 14 April 2022 14:15:53 CEST Yun Tang wrote:

Hi David Anderson,

I doubt that there is no motivating use case for this generalization to SortedMapState.
From our internal stats, SQL users use min/max with retract functions [1] in many more cases than interval join.

From my understanding, TemporalListState<T> is actually SortedMapState<Long, List<T>>, while TemporalValueState<T> is SortedMapState<Long, T>. As you can see, if we just restrict SortedMapState to Long keys, both new interfaces could be replaced.

Moreover, once we introduce temporal state, extensibility would be limited. Apart from Long, many other types are comparable, e.g. TimeStamp, Int, Float and so on. How could we handle these feature requests after TemporalState is merged? I don't think introducing too many state types is a good idea. We can support only the Long type in the first version of SortedMapState, and then extend it to other types in the future. This balances the feature requests with a clean interface design. And thus, we can also use sorted map state in the popular min/max functions.

By the way, the current FLIP lacks consideration of the work on the changelog state backend once two new state types are introduced.
[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/MinWithRetractAggFunction.java

Best,
Yun Tang
________________________________
From: David Morávek <david.mora...@gmail.com>
Sent: Wednesday, April 13, 2022 19:50
To: dev <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-220: Temporal State

Here is a very naive implementation [1] from a prototype I did a few months back that uses a list and insertion sort. Since the list is sorted, we can use binary search to create a sub-list, which could leverage the same thing I've described above.

I think back then I didn't go for the SortedMap, as it would be hard to implement with the current heap state backend internals and would have a bigger memory overhead.

The ideal solution would probably use a skip list [2] to lower the overhead of the binary search, while maintaining a reasonable memory footprint. Other than that, it could be pretty much the same as the prototype implementation [1].
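The list-based approach described above can be approximated as follows. This is a hypothetical sketch (`SortedTimestampList` is made up and is not the linked prototype code): binary search finds the insertion slot and both ends of a range, and `readRange` returns a `subList` view. Inserts still shift elements in O(n), which is the shifting cost a skip list avoids:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical heap-side temporal list kept sorted by timestamp.
class SortedTimestampList<T> {
    private final List<Long> timestamps = new ArrayList<>();
    private final List<T> values = new ArrayList<>();

    // First index whose timestamp is >= ts (strict == false) or > ts (strict == true).
    private int search(long ts, boolean strict) {
        int lo = 0;
        int hi = timestamps.size();
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            long t = timestamps.get(mid);
            if (t < ts || (strict && t == ts)) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }

    // Inserts after existing entries with the same timestamp (keeps arrival order).
    void add(long ts, T value) {
        int idx = search(ts, true);
        timestamps.add(idx, ts);
        values.add(idx, value);
    }

    // Values with minTs <= timestamp < limitTs, as a view that is only
    // valid until the next structural modification.
    List<T> readRange(long minTs, long limitTs) {
        int from = search(minTs, false);
        int to = Math.max(from, search(limitTs, false));
        return values.subList(from, to);
    }

    // Removes values with minTs <= timestamp < limitTs.
    void clearRange(long minTs, long limitTs) {
        int from = search(minTs, false);
        int to = Math.max(from, search(limitTs, false));
        timestamps.subList(from, to).clear();
        values.subList(from, to).clear();
    }
}
```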
[1] https://github.com/dmvk/flink/blob/ecdbc774b13b515e8c0943b2c143fb1e34eca6f0/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapTemporalListState.java
[2] https://en.wikipedia.org/wiki/Skip_list

Best,
D.

On Wed, Apr 13, 2022 at 1:27 PM David Morávek <david.mora...@gmail.com> wrote:

Hi David,

> It seems to me that at least with the heap-based state backend, readRange is going to have to do a lot of unnecessary work to implement this isEmpty() operation, since it will have to consider the entire range from MIN_VALUE to MAX_VALUE. (Maybe we should add an explicit isEmpty method? I'm not convinced we need it, but it would be cheaper to implement. Or perhaps this join can be rewritten to not need this operation; I haven't thought enough about that alternative.)

I think this really boils down to how the returned iterable is going to be implemented.
Basically, for checking whether the state is empty, you need to do something along the lines of:

    Iterables.isEmpty(state.readRange(Long.MIN_VALUE, Long.MAX_VALUE));
    // basically checking `hasNext() == false`, or `isEmpty()` in case of a `Collection`

A few notes:
1) It could be lazy (the underlying collection doesn't have to be materialized, e.g. in the case of RocksDB);
2) For the HeapStateBackend, it depends on the underlying implementation. I'd probably do something along the lines of a sorted tree (e.g. SortedMap / NavigableMap) that allows efficient range scans / range deletes. Then you could simply do something like (off the top of my head):

    // Lombok @Value; the map must order entries by key first, then by timestamp
    @Value
    class TimestampedKey<K> {
        K key;
        long timestamp;
    }

    SortedMap<TimestampedKey<K>, V> internalState;

    Iterable<TimestampedValue<V>> readRange(long min, long max) {
        // subMap is [from, to): min <= timestamp < max for the current key
        return toIterable(internalState.subMap(
                new TimestampedKey<>(currentKey(), min),
                new TimestampedKey<>(currentKey(), max)));
    }

This should be fairly cheap.
The important bit is that the returned iterator is always non-null, but could be empty.

Does that answer your question?

D.

On Wed, Apr 13, 2022 at 12:21 PM David Anderson <da...@alpinegizmo.com> wrote:

Yun Tang and Jingsong,

Some flavor of OrderedMapState is certainly feasible, and I do see some appeal in supporting Binary**State.

However, I haven't seen a motivating use case for this generalization, and would rather keep this as simple as possible. By handling Longs, we can already optimize a wide range of use cases.

David

On Tue, Apr 12, 2022 at 9:21 AM Yun Tang <myas...@live.com> wrote:

Hi David,

Could you explain in more detail why SortedMapState cannot work? I just cannot follow what the statement below means:

> This was rejected as being overly difficult to implement in a way that would cleanly leverage RocksDB’s iterators.
Best,
Yun Tang
________________________________
From: Aitozi <gjying1...@gmail.com>
Sent: Tuesday, April 12, 2022 15:00
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-220: Temporal State

Hi David,

I have looked through the doc, and I think it will be a good improvement for this usage pattern; I'm interested in it. Do you have some POC work to share for a closer look?
Besides, I have one question: can we expose the namespace in the different state types, not only in `TemporalState`? That way, users could specify the namespace, and TemporalState would be the special case that uses the timestamp as the namespace. I think it would be more extensible.

What do you think about this?

Best,
Aitozi.

David Anderson <dander...@apache.org> wrote on Mon, Apr 11, 2022 at 20:54:

Greetings, Flink developers.
I would like to open up a discussion of a proposal [1] to add a new kind of state to Flink.

The goal here is to optimize a fairly common pattern, which is using

    MapState<Long, List<Event>>

to store lists of events associated with timestamps. This pattern is used internally in quite a few operators that implement sorting and joins, and it also shows up in user code, for example, when implementing custom windowing in a KeyedProcessFunction.

Nico Kruber, Seth Wiesman, and I have implemented a POC that achieves a more than 2x improvement in throughput when performing these operations on RocksDB by better leveraging the capabilities of the RocksDB state backend.

See FLIP-220 [1] for details.
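To make the pattern concrete: the typical use is "buffer events per timestamp, then emit everything up to the watermark". The plain-Java sketch below uses `Map`/`NavigableMap` as stand-ins for per-key state. It illustrates the access pattern only. It is not Flink's `MapState` API and says nothing about RocksDB costs or the 2x figure from the POC. `BufferedEvents` and its methods are hypothetical. With an unordered map, every buffered timestamp has to be visited and the ready ones sorted. A sorted map can read and delete exactly the prefix up to the watermark.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

/** Hypothetical stand-ins for per-key state; plain Java collections, not Flink state. */
final class BufferedEvents {

    private BufferedEvents() {}

    /** Append an event under its timestamp (the Map<Long, List<Event>> pattern). */
    static <E> void add(Map<Long, List<E>> buffer, long ts, E event) {
        buffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(event);
    }

    /** Unordered map: scan every entry, then sort the ready timestamps explicitly. */
    static <E> List<E> drainUnordered(Map<Long, List<E>> buffer, long watermark) {
        List<Long> ready = new ArrayList<>();
        for (Long ts : buffer.keySet()) {   // visits all buffered timestamps
            if (ts <= watermark) {
                ready.add(ts);
            }
        }
        Collections.sort(ready);            // emit in timestamp order
        List<E> out = new ArrayList<>();
        for (Long ts : ready) {
            out.addAll(buffer.remove(ts));
        }
        return out;
    }

    /** Sorted map: visit only the prefix up to the watermark and delete it in place. */
    static <E> List<E> drainSorted(NavigableMap<Long, List<E>> buffer, long watermark) {
        List<E> out = new ArrayList<>();
        Iterator<List<E>> it = buffer.headMap(watermark, true).values().iterator();
        while (it.hasNext()) {
            out.addAll(it.next());
            it.remove();                    // removal through the view hits the backing map
        }
        return out;
    }
}
```

Both variants return the same events in the same order; the difference is how much of the buffer they have to touch, which is the cost the proposal aims to reduce.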
Best,
David

[1] https://cwiki.apache.org/confluence/x/Xo_FD

Dr. Nico Kruber | Solutions Architect

Follow us @VervericaData Ververica
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton Wehner