Thanks for the discussion. I think this is a very good question for Flink,
and we can benefit a lot from it. The default IntervalJoinOperator is really
inefficient. We have optimized it by using RocksDB's upper and lower bounds;
for details, see https://issues.apache.org/jira/browse/FLINK-10949. A more
generic approach would be useful.
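
To illustrate why bounded scans help an interval join, here is a minimal sketch. It is not the code from FLINK-10949: a plain TreeMap stands in for the sorted store, subMap() stands in for an iterator restricted by a lower and an upper bound, and the class and method names (BoundedIntervalLookup, addRight, matches) are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for the sorted right-side buffer of an interval join.
// The TreeMap plays the role of the sorted store; it is not Flink or RocksDB code.
class BoundedIntervalLookup {
    private final NavigableMap<Long, List<String>> rightBuffer = new TreeMap<>();

    // Buffers a right-side value under its timestamp.
    void addRight(long timestamp, String value) {
        rightBuffer.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(value);
    }

    // Returns all right-side values whose timestamp lies in
    // [leftTimestamp + lowerBound, leftTimestamp + upperBound].
    // Only entries inside that range are visited, instead of the whole buffer.
    // Assumes lowerBound <= upperBound and no long overflow.
    List<String> matches(long leftTimestamp, long lowerBound, long upperBound) {
        List<String> result = new ArrayList<>();
        for (List<String> values : rightBuffer
                .subMap(leftTimestamp + lowerBound, true, leftTimestamp + upperBound, true)
                .values()) {
            result.addAll(values);
        }
        return result;
    }
}
```

With a sorted store, the cost of a lookup depends on the number of entries in the interval rather than on the size of the buffer, which is the property a generic sorted state would expose.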

On Fri, Apr 22, 2022 at 20:28, David Morávek <david.mora...@gmail.com> wrote:

> >
> > With that in mind, we could only offer a couple of selected
> > temporal/sorted state implementations that are handled internally, but
> > not really a generic one - even if you let the user explicitly handle
> > binary keys...
>
>
> If we want to have a generic interface that is portable between different
> state backends and allows for all the use-cases described above,
> lexicographical binary sort sounds reasonable, because you need to be able
> to push sorting out of the JVM boundary.
>
> The only trade-off I can think of is that as long as you stay within the
> JVM (heap state backend), you need to pay a slight key serialization cost,
> which is IMO ok-ish.
>
> Do you have any future state backend ideas in mind, that might not work
> with this assumption?
>
> -
>
> I'm really starting to like the idea of having a BinarySortedMapState +
> higher level / composite states.
>
> D.
>
>
> On Fri, Apr 22, 2022 at 1:58 PM David Morávek <david.mora...@gmail.com>
> wrote:
>
> >> Isn't allowing a TemporalValueState just a special case of b.III? So if a
> >> user of the state wants that, then they can leverage a simple API vs. if
> >> you want fancier duplicate handling, you'd just go with TemporalListState
> >> and implement the logic you want?
> >
> >
> > > Yes it is. But IMO it doesn't justify adding a new state primitive. My
> > > take would be that as long as we can build TVS using other existing state
> > > primitives (TLS) we should treat it as a "composite state". We currently
> > > don't have a good user-facing API to do that, but it could be added in a
> > > separate FLIP.
> >
> > eg. something along the lines of
> >
> > TemporalValueState<String> state = getRuntimeContext().getCompositeState(
> >         new CompositeStateDescriptor<>(
> >                 "composite", new TemporalValueState(type)));
> >
> > On Fri, Apr 22, 2022 at 1:44 PM Nico Kruber <n...@apache.org> wrote:
> >
> >> David,
> >>
> >> 1) Good points on the possibility to make the TemporalListState generic
> >> -> actually, if you think about it more, we are currently assuming that
> >> all state backends use the same comparison on the binary level because we
> >> add an appropriate serializer at an earlier abstraction level. This may
> >> not actually hold for all (future) state backends and can limit further
> >> implementations (if you think this is something to keep in mind!).
> >>
> >> So we may have to push this serializer implementation further down the
> >> stack, i.e. our current implementation is one that fits RocksDB and that
> >> alone...
> >>
> >> With that in mind, we could only offer a couple of selected
> >> temporal/sorted state implementations that are handled internally, but
> >> not really a generic one - even if you let the user explicitly handle
> >> binary keys...
> >>
> >>
> >> 2) Duplicates
> >>
> >> Isn't allowing a TemporalValueState just a special case of b.III? So if a
> >> user of the state wants that, then they can leverage a simple API vs. if
> >> you want fancier duplicate handling, you'd just go with TemporalListState
> >> and implement the logic you want?
> >>
> >>
> >>
> >> Nico
> >>
> >> On Friday, 22 April 2022 10:43:48 CEST David Morávek wrote:
> >> >  Hi Yun & Nico,
> >> >
> >> > few thoughts on the discussion
> >> >
> >> > 1) Making the TemporalListState generic
> >> >
> >> > This is just not possible with the current infrastructure w.r.t type
> >> > serializers as the sorting key *needs to be comparable on the binary
> >> > level* (serialized form).
> >> >
> >> > What I could imagine, is introducing some kind of
> >> > `Sorted(List|Map)State` with explicit binary keys. User would either
> >> > have to work directly with `byte[]` keys or provide a function for
> >> > transforming keys into the binary representation that could be sorted
> >> > (this would have to be different from `TypeSerializer` which could get
> >> > more fancy with the binary representation, eg. to save up space ->
> >> > varints).
> >> >
> >> > This kind of interface might be really hard to grasp by the pipeline
> >> > authors. There needs to be a deeper understanding how the byte
> >> > comparison works (eg. it needs to be different from the java byte
> >> > comparison which compares bytes as `signed`). This could be maybe
> >> > partially mitigated by providing predefined `to binary sorting key`
> >> > functions for the common primitives / types.
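
Inline note on the `to binary sorting key` idea: a minimal sketch of what such a predefined function could look like for `long`, assuming a fixed 8-byte big-endian encoding with the sign bit flipped. SortableLongKey and its methods are hypothetical names, not an existing Flink API. The comparison masks each byte with 0xFF because Java bytes are signed, which is exactly the pitfall mentioned above.

```java
// Hypothetical helper, not part of Flink: encodes a long so that unsigned
// lexicographic byte order matches the numeric order of the original values.
class SortableLongKey {
    // Big-endian encoding with the sign bit flipped, so negative values
    // sort before non-negative ones when bytes are compared as unsigned.
    static byte[] encode(long value) {
        long flipped = value ^ Long.MIN_VALUE;
        byte[] out = new byte[8];
        for (int i = 7; i >= 0; i--) {
            out[i] = (byte) flipped;
            flipped >>>= 8;
        }
        return out;
    }

    // Inverse of encode(): reassembles the big-endian bytes and restores the sign bit.
    static long decode(byte[] bytes) {
        long v = 0;
        for (int i = 0; i < 8; i++) {
            v = (v << 8) | (bytes[i] & 0xFF);
        }
        return v ^ Long.MIN_VALUE;
    }

    // Lexicographic comparison treating each byte as unsigned (0..255),
    // unlike Java's default signed byte comparison.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }
}
```

A variable-length encoding (for example varints or length-prefixed strings) would not keep this property, which is why such a function has to be separate from a space-optimising serializer.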
> >> >
> >> > 2) Duplicates
> >> >
> >> > > I guess, this all boils down to dealing with duplicates / values for
> >> > > the same timestamp.
> >> >
> >> > We should never have duplicates. Let's try to elaborate on what having
> >> > the duplicates really means in this context.
> >> >
> >> > a) Temporal Table point of view
> >> >
> >> > There could be only a single snapshot of the table at any given point
> >> > in time (~ physics). If we allow for duplicates we violate this, as
> >> > it's not certain what the actual state of the table is at that point in
> >> > time. In case of the temporal join, what should the elements from the
> >> > other side join against?
> >> >
> >> > If we happen to have a duplicate, it actually brings us to b) causality
> >> > (which could actually answer the previous question).
> >> >
> >> > b) Causality
> >> >
> >> > When building any kind of state machine, it's important to think about
> >> > causality (if we switch the order of events, state transitions no
> >> > longer result in the same state). Temporal table is a specific type of
> >> > the state machine.
> >> >
> >> > There are several approaches to mitigate this:
> >> > I) nano-second precision -> the chance that two events affecting the
> >> > same thing happen at the exactly same nanosecond is negligible (from
> >> > the physical standpoint)
> >> > II) the sorting key is tuple of (timestamp, sequential id) -> an
> >> > example could be early firings (you get speculative results from a
> >> > windowed aggregation with timestamp = EOW, but you can easily assign
> >> > the order in which these have happened)
> >> > III) last write with the same timestamp wins -> this is a special case
> >> > of II) when we're sure that elements with the duplicate timestamp come
> >> > in order
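
Inline note on approaches II) and III): a minimal sketch of a (timestamp, sequential id) sort key, assuming the sequence id is simply assigned in arrival order. VersionedValues, Key and valueAtOrBefore are hypothetical names for illustration and a TreeMap stands in for the sorted state. With arrival-order ids, the lookup degenerates to "last write with the same timestamp wins".

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of a (timestamp, sequenceId) sort key; not Flink code.
class VersionedValues {
    // Composite key ordered by timestamp first, then by sequence id.
    static final class Key implements Comparable<Key> {
        final long timestamp;
        final long sequenceId;

        Key(long timestamp, long sequenceId) {
            this.timestamp = timestamp;
            this.sequenceId = sequenceId;
        }

        @Override
        public int compareTo(Key other) {
            int byTime = Long.compare(timestamp, other.timestamp);
            return byTime != 0 ? byTime : Long.compare(sequenceId, other.sequenceId);
        }
    }

    private final TreeMap<Key, String> entries = new TreeMap<>();
    private long nextSequenceId = 0;

    // Assumption: the sequence id reflects arrival order of the writes.
    void put(long timestamp, String value) {
        entries.put(new Key(timestamp, nextSequenceId++), value);
    }

    // Latest value at or before the timestamp; among entries that share a
    // timestamp, the one with the highest sequence id wins. Returns null if
    // there is no such entry.
    String valueAtOrBefore(long timestamp) {
        Map.Entry<Key, String> entry = entries.floorEntry(new Key(timestamp, Long.MAX_VALUE));
        return entry == null ? null : entry.getValue();
    }
}
```

The secondary component keeps all duplicates addressable, so a different duplicate policy only changes the lookup, not the stored layout.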
> >> >
> >> > c) Secondary Sort
> >> >
> >> > Handling the actual duplicates requires secondary sort key, which
> >> > might complicate the `to binary sorting key` interface discussed in 1) -
> >> > basically some kind of user provided duplicate handler.
> >> >
> >> > d) Temporal Value State
> >> >
> >> > The above points apply to the temporal value state as well, it just
> >> > pushes the responsibility away from the state interface. I'm still not
> >> > convinced that this is a right direction.
> >> >
> >> > -
> >> >
> >> > For most use cases I've seen, the millisecond precision is more than
> >> > enough (+ the last write wins as a fallback). Supporting the use cases
> >> > where it's not actually enough (I've seen that as well in the past,
> >> > especially with the early firings), might be actually a good case for a
> >> > more generic form of the state, that we've discussed in 1).
> >> >
> >> > 3) Map vs List
> >> >
> >> > I think this also boils down to the discussion of how to handle
> >> > duplicates. From the commonly accepted contracts, list implies that
> >> > there could be duplicates and map implies otherwise. One concern about
> >> > `Map` is that it also implies that you should be able to do a point
> >> > query.
> >> >
> >> > Best,
> >> > D.
> >> >
> >> >
> >> > On Fri, Apr 22, 2022 at 9:21 AM Yun Tang <myas...@live.com> wrote:
> >> > > Hi Nico,
> >> > >
> >> > > I did not mean that we need to support all APIs in NavigableMap, and
> >> > > it is indeed too heavy to implement them all.
> >> > > Moreover, I also prefer an iterator-like API instead of the original
> >> > > #tailMap-like API. I just used NavigableMap's API to show examples.
> >> > >
> >> > > I think we can introduce new APIs like:
> >> > >
> >> > > SortedMapState<UK, UV> extends State
> >> > >
> >> > >   Map.Entry<UK, UV> firstEntry() throws Exception;
> >> > >   Map.Entry<UK, UV> lastEntry() throws Exception;
> >> > >   Iterator<Map.Entry<UK, UV>> headIterator(UK endUserKey) throws Exception;
> >> > >   Iterator<Map.Entry<UK, UV>> tailIterator(UK startUserKey) throws Exception;
> >> > >   Iterator<Map.Entry<UK, UV>> subIterator(UK startUserKey, UK endUserKey) throws Exception;
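
Inline note: to make the proposed methods concrete, here is a minimal heap-only sketch backed by a TreeMap. It is not a Flink state implementation; HeapSortedMapSketch and put() are hypothetical, and the bound semantics (start inclusive, end exclusive) are an assumption, since the proposal above does not specify them.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical heap-only sketch of the proposed SortedMapState methods.
// Assumption: start keys are inclusive and end keys are exclusive.
class HeapSortedMapSketch<UK extends Comparable<UK>, UV> {
    private final TreeMap<UK, UV> entries = new TreeMap<>();

    void put(UK key, UV value) {
        entries.put(key, value);
    }

    // Entry with the smallest key, or null if the map is empty.
    Map.Entry<UK, UV> firstEntry() {
        return entries.firstEntry();
    }

    // Entry with the largest key, or null if the map is empty.
    Map.Entry<UK, UV> lastEntry() {
        return entries.lastEntry();
    }

    // Iterates in ascending order over all entries with key < endUserKey.
    Iterator<Map.Entry<UK, UV>> headIterator(UK endUserKey) {
        return entries.headMap(endUserKey, false).entrySet().iterator();
    }

    // Iterates in ascending order over all entries with key >= startUserKey.
    Iterator<Map.Entry<UK, UV>> tailIterator(UK startUserKey) {
        return entries.tailMap(startUserKey, true).entrySet().iterator();
    }

    // Iterates in ascending order over entries with startUserKey <= key < endUserKey.
    Iterator<Map.Entry<UK, UV>> subIterator(UK startUserKey, UK endUserKey) {
        return entries.subMap(startUserKey, true, endUserKey, false).entrySet().iterator();
    }
}
```

The iterators here are views over the underlying map, so nothing is copied up front; a RocksDB-backed variant would need the keys to sort correctly in their serialized form instead of relying on Comparable.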
> >> > >
> >> > > Since SortedMapState has several new APIs, I prefer to introduce a new
> >> > > state descriptor to distinguish it from the original map state.
> >> > >
> >> > > For the API of SortedMapOfListsState, I am not strongly against it; I
> >> > > just want to know the actual benefits if we really want to introduce
> >> > > this API.
> >> > >
> >> > > When talking about the changelog state backend, my concern is about
> >> > > how to group keys together in the changelog logger.
> >> > > I can share a problem, but before that I need to spend some time on
> >> > > how to implement a serializer that keeps the order of the serialized
> >> > > bytes the same as that of the original Java objects.
> >> > > For a fixed-length serializer, such as LongSerializer, we just need to
> >> > > ensure that all numbers are positive, or invert the sign bit.
> >> > > However, a non-fixed-length serializer, such as StringSerializer, will
> >> > > write the length of the bytes first, which breaks the natural order
> >> > > when comparing the bytes. Thus, we might need to avoid writing the
> >> > > length in the serialized bytes.
> >> > > On the other hand, the changelog logger would record operations per
> >> > > key one by one in the logs. We need to consider how to distinguish
> >> > > each key in the combined serialized byte arrays.
> >> > >
> >> > > Best
> >> > > Yun Tang
> >> > >
> >> > > ------------------------------
> >> > > *From:* Nico Kruber <n...@apache.org>
> >> > > *Sent:* Thursday, April 21, 2022 23:50
> >> > > *To:* dev <dev@flink.apache.org>
> >> > > *Cc:* David Morávek <david.mora...@gmail.com>; Yun Tang <
> >> myas...@live.com>
> >> > > *Subject:* Re: [DISCUSS] FLIP-220: Temporal State
> >> > >
> >> > > Thanks Yun Tang for your clarifications.
> >> > > Let me keep my original structure and reply in these points...
> >> > >
> >> > > 3. Should we generalise the Temporal***State to offer arbitrary key
> >> > > types and not just Long timestamps?
> >> > >
> >> > > The use cases you detailed do indeed look similar to the ones we were
> >> > > optimising in our TemporalState PoC...
> >> > >
> >> > > I don't think, I'd like to offer a full implementation of NavigableMap
> >> > > though because that seems quite some overhead to implement while we
> >> > > can cover the mentioned examples with the proposed APIs already when
> >> > > using iterators as well as single-value retrievals.
> >> > > So far, when we were iterating from the smallest key, we could just
> >> > > use Long.MIN_VALUE and start from there. That would be difficult to
> >> > > generalise for arbitrary data types because you may not always know
> >> > > the smallest possible value for a certain serialized type (unless we
> >> > > put this into the appropriate serializer interface).
> >> > >
> >> > > I see two options here:
> >> > > a) a slim API but using NULL as an indicator for smallest/largest
> >> > > depending on the context, e.g.
> >> > >
> >> > >   - `readRange(null, key)` means from beginning to key
> >> > >   - `readRange(key, null)` means from key to end
> >> > >   - `readRange(null, null)` means from beginning to end
> >> > >   - `value[AtOr]Before(null)` means largest available key
> >> > >   - `value[AtOr]After(null)` means smallest available key
> >> > >
> >> > > b) a larger API with special methods for each of these use cases
> >> > > similar to what NavigableMap has but based on iterators and
> >> > > single-value functions only
> >> > >
> >> > > > BTW, I prefer to introduce another state descriptor instead of
> >> > > > current map state descriptor.
> >> > >
> >> > > Can you elaborate on this? We currently don't need extra
> >> > > functionality, so this would be a plain copy of the
> >> > > MapStateDescriptor...
> >> > >
> >> > > > For the API of SortedMapOfListsState, I think this is a bit bounded
> >> > > > to current implementation of RocksDB state-backend.
> >> > >
> >> > > Actually, I don't think this is special to RocksDB but generic to all
> >> > > state backends that do not hold values in memory and allow fast
> >> > > append-like operations.
> >> > > Additionally, since this is a very common use case and RocksDB is also
> >> > > widely used, I wouldn't want to continue without this specialization.
> >> > > For a similar reason, we offer ListState and not just
> >> > > ValueState<List>...
> >> > >
> >> > >
> >> > > 4. ChangelogStateBackend
> >> > >
> >> > > > For the discussion of ChangelogStateBackend, you can think of
> >> > > > changelog state-backend as a write-ahead-log service. And we need to
> >> > > > record the changes to any state, thus this should be included in the
> >> > > > design doc as we need to introduce another kind of state, especially
> >> > > > you might need to consider how to store key bytes serialized by the
> >> > > > new serializer (as we might not be able to write the length in the
> >> > > > beginning of serialized bytes to make the order of bytes same as
> >> > > > natural order).
> >> > >
> >> > > Since the ChangelogStateBackend "holds the working state in the
> >> > > underlying delegatedStateBackend, and forwards state changes to State
> >> > > Changelog", I honestly still don't see how this needs special
> >> > > handling. As long as the delegated state backend supports sorted
> >> > > state, ChangelogStateBackend doesn't have to do anything special
> >> > > except for recording changes to state. Our PoC simply uses the
> >> > > namespace for these keys and that's the same thing the Window API is
> >> > > already using - so there's nothing special here. The order in the log
> >> > > doesn't have to follow the natural order because this is only required
> >> > > inside the delegatedStateBackend, isn't it?
> >> > >
> >> > >
> >> > > Nico
> >> > >
> >> > > On Wednesday, 20 April 2022 17:03:11 CEST Yun Tang wrote:
> >> > > > Hi Nico,
> >> > > >
> >> > > > Thanks for your clarification.
> >> > > > For the discussion about generalizing Temporal state to sorted map
> >> > > > state, I could give some examples of how to use sorted map state in
> >> > > > min/max with retract functions.
> >> > > >
> >> > > > As you know, NavigableMap in java has several APIs like:
> >> > > >     Map.Entry<K,V> firstEntry();
> >> > > >     Map.Entry<K,V> lastEntry();
> >> > > >     NavigableMap<K,V> tailMap(K fromKey, boolean inclusive)
> >> > > >
> >> > > > The #firstEntry API could be used in
> >> > > > MinWithRetractAggFunction#updateMin, #lastEntry could be used in
> >> > > > MaxWithRetractAggFunction#updateMax, and #tailMap could be used in
> >> > > > FirstValueWithRetractAggFunction#retract. If we can introduce a
> >> > > > SortedMap-like state, these functions could benefit.
> >> > > > BTW, I prefer to introduce another state descriptor instead of the
> >> > > > current map state descriptor.
> >> > > > For the API of SortedMapOfListsState, I think this is a bit bound to
> >> > > > the current implementation of the RocksDB state-backend.
> >> > > >
> >> > > > For the discussion of ChangelogStateBackend, you can think of
> >> > > > changelog state-backend as a write-ahead-log service. And we need to
> >> > > > record the changes to any state, thus this should be included in the
> >> > > > design doc as we need to introduce another kind of state, especially
> >> > > > you might need to consider how to store key bytes serialized by the
> >> > > > new serializer (as we might not be able to write the length in the
> >> > > > beginning of serialized bytes to make the order of bytes same as
> >> > > > natural order).
> >> > > >
> >> > > > Best
> >> > > > Yun Tang.
> >> > > > ________________________________
> >> > > > From: Nico Kruber <n...@apache.org>
> >> > > > Sent: Wednesday, April 20, 2022 0:38
> >> > > > To: dev <dev@flink.apache.org>
> >> > > > Cc: Yun Tang <myas...@live.com>; David Morávek <
> >> david.mora...@gmail.com>
> >> > > > Subject: Re: [DISCUSS] FLIP-220: Temporal State
> >> > > >
> >> > > > Hi all,
> >> > > > I have read the discussion points from the last emails and would
> >> > > > like to add my two cents on what I believe are the remaining points
> >> > > > to solve:
> >> > > >
> >> > > > 1. Do we need a TemporalValueState?
> >> > > >
> >> > > > I guess, this all boils down to dealing with duplicates / values for
> >> > > > the same timestamp. Either you always have to account for them and
> >> > > > thus always have to store a list anyway, or you only need to join
> >> > > > with "the latest" (or the only) value for a given timestamp and get a
> >> > > > nicer API and lower overhead for that use case.
> >> > > > At the easiest, you can make an assumption that there is only a
> >> > > > single value for each timestamp by contract, e.g. by increasing the
> >> > > > timestamp precision and interpreting them as nanoseconds, or maybe
> >> > > > milliseconds are already good enough. If that contract breaks,
> >> > > > however, you will get into undefined behaviour.
> >> > > > The TemporalRowTimeJoinOperator, for example, currently just assumes
> >> > > > that there is only a single value on the right side of the join
> >> > > > (rightState) and I believe many use cases can make that assumption or
> >> > > > otherwise you'd have to define the expected behaviour for multiple
> >> > > > values at the same timestamp, e.g. "join with the most recent value
> >> > > > at the time of the left side and if there are multiple values, choose
> >> > > > X".
> >> > > >
> >> > > > I lean towards having a ValueState implementation as well (in
> >> > > > addition to lists).
> >> > > >
> >> > > >
> >> > > > 2. User-facing API (Iterators vs. valueAtOr[Before|After])
> >> > > >
> >> > > > I like the iterable-based APIs that David M was proposing, i.e.
> >> > > > - Iterable<TimestampedValue<T>> readRange(long minTimestamp, long
> >> > > >   limitTimestamp);
> >> > > > - void clearRange(long minTimestamp, long limitTimestamp);
> >> > > >
> >> > > > However, I find Iterables rather cumbersome to work with if you
> >> > > > actually only need a single value, e.g. the most recent one.
> >> > > > For iterating over a range of values, however, they feel more natural
> >> > > > to me than our proposal.
> >> > > >
> >> > > > Actually, if we generalise the key type (see below), we may also need
> >> > > > to offer additional value[Before|After] functions to cover "+1"
> >> > > > iterations where we cannot simply add 1 as we do now.
> >> > > >
> >> > > > (a) How about offering both Iterables and
> >> > > > value[AtOrBefore|AtOrAfter|Before|After]?
> >> > > > This would be similar to what NavigableMap [2] is offering but with a
> >> > > > more explicit API than "ceiling", "floor",...
> >> > > >
> >> > > > (b) Our API proposal currently also allows iterating backwards which
> >> > > > is not covered by the readRange proposal - we could, however, just do
> >> > > > that if minTimestamp > limitTimestamp. What do you think?
> >> > > >
> >> > > > (c) When implementing the iterators, I actually also see two
> >> > > > different modes which may differ in performance: I call them
> >> > > > iteration with eager vs. lazy value retrieval. Eager retrieval may
> >> > > > retrieve all values in a range at once and make them available in
> >> > > > memory, e.g. for smaller data sets similar to what
> >> > > > TemporalRowTimeJoinOperator is doing for the right side of the join.
> >> > > > This can spare a lot of Java<->JNI calls and let RocksDB iterate only
> >> > > > once (as long as things fit into memory). Lazy retrieval would fetch
> >> > > > results one-by-one. -> We could set one as default and allow the user
> >> > > > to override that behaviour.
> >> > > >
> >> > > > 3. Should we generalise the Temporal***State to offer arbitrary key
> >> > > > types and not just Long timestamps?
> >> > > >
> >> > > > @Yun Tang: can you describe in more detail where you think this would
> >> > > > be needed for SQL users? I don't quite get how this would be
> >> > > > beneficial. The example you linked doesn't quite show the same
> >> > > > behaviour.
> >> > > >
> >> > > > Other than this, I could see that you can leverage such a
> >> > > > generalisation for arbitrary joins between, for example, IDs and ID
> >> > > > ranges which don't have a time component attached to them. Given
> >> > > > that, this shouldn't be too difficult to expose (the functionality
> >> > > > has to exist anyway, but is otherwise buried in Flink's internals).
> >> > > > We'd just have to find suitable names.
> >> > > >
> >> > > > (a) I don't think TemporalListState<T> is actually
> >> > > > SortedMapState<Long, List<T>> because we need efficient "add to list"
> >> > > > primitives which cannot easily be made available with a single
> >> > > > generic SortedMapState...
> >> > > >
> >> > > > (b) So the most expressive (yet kind-of ugly) names could be
> >> > > > - SortedMapState<Long, ValueType>
> >> > > > - SortedMapOfListsState<Long, List<ValueType>>
> >> > > >
> >> > > > (c) For both of these, we could then re-use the existing
> >> > > > MapStateDescriptor to define key and value/list-element types and
> >> > > > require that the key type / serializer implements a certain
> >> > > > RetainingSortOrderSerializer interface (and think about a better name
> >> > > > for this) which defines the contract that the binary sort order is
> >> > > > the same as the Java Object one.
> >> > > > -> that can also be verified at runtime to fail early.
> >> > > >
> >> > > >
> >> > > > 4. ChangelogStateBackend: we don't think this needs special attention
> >> > > > - it is just delegating to the other backends anyway and these
> >> > > > methods are already adapted in our POC code
> >> > > >
> >> > > >
> >> > > > @David M, Yun Tang: let me/us know what you think about these
> >> proposals
> >> > > >
> >> > > >
> >> > > >
> >> > > > Nico
> >> > > >
> >> > > >
> >> > > > [2] https://docs.oracle.com/javase/8/docs/api/java/util/NavigableMap.html
> >> > >
> >> > > > On Thursday, 14 April 2022 14:15:53 CEST Yun Tang wrote:
> >> > > > > Hi David Anderson,
> >> > > > >
> >> > > > > I doubt that there is no motivating use case for this generalization
> >> > > > > to SortedMapState. From our internal stats, SQL users use min/max
> >> > > > > with retract functions [1] much more often than interval joins.
> >> > > > > From my understanding, the TemporalListState<T> is actually
> >> > > > > SortedMapState<Long, List<T>>, while TemporalValueState<T> is
> >> > > > > SortedMapState<Long, T>. As you can see, if we just restrict
> >> > > > > SortedMapState with the key type as Long, both of the two new
> >> > > > > interfaces could be replaced.
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > Moreover, once we introduce temporal state, the extension would be
> >> > > > > limited. Apart from Long, many other types could be comparable, e.g.
> >> > > > > TimeStamp, Int, Float and so on. How could we handle these feature
> >> > > > > requests after TemporalState is merged? I don't think introducing too
> >> > > > > many state types is a good idea. We can support only the Long type in
> >> > > > > the first version when introducing SortedMapState, and then extend it
> >> > > > > to more types in the future. This could balance the feature requests
> >> > > > > with a clean interface design. And thus, we can also use sorted map
> >> > > > > state in the popular min/max functions.
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > By the way, the current FLIP lacks consideration of the work on the
> >> > > > > changelog state-backend once two new state types are introduced.
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > [1]
> >> > > > > https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/MinWithRetractAggFunction.java
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > Best,
> >> > > > > Yun Tang
> >> > > > > ________________________________
> >> > > > > From: David Morávek <david.mora...@gmail.com>
> >> > > > > Sent: Wednesday, April 13, 2022 19:50
> >> > > > > To: dev <dev@flink.apache.org>
> >> > > > > Subject: Re: [DISCUSS] FLIP-220: Temporal State
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > Here is a very naive implementation [1] from a prototype I did a few
> >> > > > > months back that uses list and insertion sort. Since the list is
> >> > > > > sorted we can use binary search to create sub-list, that could
> >> > > > > leverage the same thing I've described above.
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > I think back then I didn't go for the SortedMap as it would be hard
> >> > > > > to implement with the current heap state backend internals and would
> >> > > > > have bigger memory overhead.
> >> > > > >
> >> > > > > The ideal solution would probably use skip list [2] to lower the
> >> > > > > overhead of the binary search, while maintaining a reasonable memory
> >> > > > > footprint. Other than that it could be pretty much the same as the
> >> > > > > prototype implementation [1].
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > [1]
> >> > > > > https://github.com/dmvk/flink/blob/ecdbc774b13b515e8c0943b2c143fb1e34eca6f0/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapTemporalListState.java
> >> > > > > [2] https://en.wikipedia.org/wiki/Skip_list
> >> > > >
> >> > > > > Best,
> >> > > > > D.
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > On Wed, Apr 13, 2022 at 1:27 PM David Morávek
> >> > > > > <david.mora...@gmail.com> wrote:
> >> > > > > > Hi David,
> >> > > > > >
> >> > > > > >> It seems to me that at least with the heap-based state backend,
> >> > > > > >> readRange is going to have to do a lot of unnecessary work to
> >> > > > > >> implement this isEmpty() operation, since it will have to consider
> >> > > > > >> the entire range from MIN_VALUE to MAX_VALUE. (Maybe we should add
> >> > > > > >> an explicit isEmpty method? I'm not convinced we need it, but it
> >> > > > > >> would be cheaper to implement. Or perhaps this join can be
> >> > > > > >> rewritten to not need this operation; I haven't thought enough
> >> > > > > >> about that alternative.)
> >> > > > > >
> >> > > > > > I think this really boils down to how the returned iterable is
> >> > > > > > going to be implemented. Basically for checking whether state is
> >> > > > > > empty, you need to do something along the lines of:
> >> > > > > >
> >> > > > > > // basically checking `hasNext() == false` or `isEmpty()` in case of
> >> > > > > > // `Collection`
> >> > > > > > Iterables.isEmpty(state.readRange(Long.MIN_VALUE, Long.MAX_VALUE));
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > Few notes:
> >> > > > > > 1) It could be lazy (the underlying collection doesn't have to be
> >> > > > > > materialized - eg. in case of RocksDB);
> >> > > > > > 2) For HeapStateBackend it depends on the underlying
> >> > > > > > implementation. I'd probably do something along the lines of sorted
> >> > > > > > tree (eg. SortedMap / NavigableMap), that allows effective range
> >> > > > > > scans / range deletes. Then you could simply do something like
> >> > > > > > (from top of the head):
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > @Value
> >> > > > > > class TimestampedKey<K> {
> >> > > > > >   K key;
> >> > > > > >   long timestamp;
> >> > > > > > }
> >> > > > > >
> >> > > > > > SortedMap<TimestampedKey<K>, V> internalState;
> >> > > > > >
> >> > > > > > Iterable<TimestampedValue<V>> readRange(long min, long max) {
> >> > > > > >   return toIterable(internalState.subMap(
> >> > > > > >       new TimestampedKey<>(currentKey(), min),
> >> > > > > >       new TimestampedKey<>(currentKey(), max)));
> >> > > > > > }
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > This should be fairly cheap. The important bit is that the
> >> returned
> >> > > > > > iterator is always non-null, but could be empty.
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > Does that answer your question?
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > D.
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > On Wed, Apr 13, 2022 at 12:21 PM David Anderson
> >> > > > > > <da...@alpinegizmo.com> wrote:
> >> > > > > >> Yun Tang and Jingsong,
> >> > > > > >>
> >> > > > > >>
> >> > > > > >>
> >> > > > > >>
> >> > > > > >>
> >> > > > > >> Some flavor of OrderedMapState is certainly feasible, and I do see
> >> > > > > >> some appeal in supporting Binary**State.
> >> > > > > >>
> >> > > > > >>
> >> > > > > >>
> >> > > > > >>
> >> > > > > >>
> >> > > > > >> However, I haven't seen a motivating use case for this generalization,
> >> > > > > >> and would rather keep this as simple as possible. By handling Longs we
> >> > > > > >> can already optimize a wide range of use cases.
> >> > > > > >>
> >> > > > > >>
> >> > > > > >>
> >> > > > > >>
> >> > > > > >>
> >> > > > > >> David
> >> > > > > >>
> >> > > > > >> On Tue, Apr 12, 2022 at 9:21 AM Yun Tang <myas...@live.com> wrote:
> >> > > > > >> >  Hi David,
> >> > > > > >> >
> >> > > > > >> > Could you explain in more detail why SortedMapState cannot work? I
> >> > > > > >> > don't quite follow what the statement below means:
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> > This was rejected as being overly difficult to implement in a way
> >> > > > > >> > that would cleanly leverage RocksDB’s iterators.
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> > Best
> >> > > > > >> > Yun Tang
> >> > > > > >> > ________________________________
> >> > > > > >> > From: Aitozi <gjying1...@gmail.com>
> >> > > > > >> > Sent: Tuesday, April 12, 2022 15:00
> >> > > > > >> > To: dev@flink.apache.org <dev@flink.apache.org>
> >> > > > > >> > Subject: Re: [DISCUSS] FLIP-220: Temporal State
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> > Hi David
> >> > > > > >> >
> >> > > > > >> > I have looked through the doc. I think it will be a good improvement
> >> > > > > >> > for this usage pattern, and I'm interested in it. Do you have some POC
> >> > > > > >> > work to share for a closer look?
> >> > > > > >> > Besides, I have one question: can we expose the namespace in the
> >> > > > > >> > different state types, not limited to `TemporalState`? That way the
> >> > > > > >> > user can specify the namespace, and TemporalState becomes a special
> >> > > > > >> > case that uses the timestamp as the namespace. I think that would be
> >> > > > > >> > more extensible.
> >> > > > > >> >
> >> > > > > >> > What do you think about this?
> >> > > > > >> >
> >> > > > > >> > Best,
> >> > > > > >> > Aitozi.
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> > On Mon, Apr 11, 2022 at 20:54, David Anderson <dander...@apache.org>
> >> > > > > >> > wrote:
> >> > > > > >> >
> >> > > > > >> > > Greetings, Flink developers.
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > > I would like to open up a discussion of a proposal [1] to add a new
> >> > > > > >> > > kind of state to Flink.
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > > The goal here is to optimize a fairly common pattern, which is using
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > > MapState<Long, List<Event>>
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > > to store lists of events associated with timestamps. This pattern is
> >> > > > > >> > > used internally in quite a few operators that implement sorting and
> >> > > > > >> > > joins, and it also shows up in user code, for example, when
> >> > > > > >> > > implementing custom windowing in a KeyedProcessFunction.
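To make the pattern described above concrete without pulling in Flink, here is a rough plain-Java sketch. It is an analogy only: a `HashMap<Long, List<String>>` stands in for `MapState<Long, List<Event>>`, events are plain strings, and `TimestampBufferSketch`, `add` and `drainUpTo` are hypothetical names that do not exist in Flink.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the MapState<Long, List<Event>> pattern; not Flink API.
class TimestampBufferSketch {

    // Events buffered per timestamp; plain Strings stand in for Event objects.
    private final Map<Long, List<String>> buffer = new HashMap<>();

    public void add(long timestamp, String event) {
        buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(event);
    }

    // Removes and returns all events with timestamp <= watermark, in timestamp order.
    public List<String> drainUpTo(long watermark) {
        // The map is unordered, so every key has to be inspected.
        List<Long> ready = new ArrayList<>();
        for (Long ts : buffer.keySet()) {
            if (ts <= watermark) {
                ready.add(ts);
            }
        }
        Collections.sort(ready);

        List<String> out = new ArrayList<>();
        for (Long ts : ready) {
            out.addAll(buffer.remove(ts));
        }
        return out;
    }
}
```

In this stand-in, draining up to a watermark has to look at every buffered timestamp and sort the matches, which is the kind of work a sorted, timestamp-aware state could avoid.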
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > > Nico Kruber, Seth Wiesman, and I have implemented a POC that achieves
> >> > > > > >> > > a more than 2x improvement in throughput when performing these
> >> > > > > >> > > operations on RocksDB by better leveraging the capabilities of the
> >> > > > > >> > > RocksDB state backend. See FLIP-220 [1] for details.
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > > Best,
> >> > > > > >> > > David
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > >
> >> > > > > >> > > [1] https://cwiki.apache.org/confluence/x/Xo_FD
> >> > > >
> >> > > > Dr. Nico Kruber | Solutions Architect
> >> > > >
> >> > > > Follow us @VervericaData Ververica
> >> > > > --
> >> > > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> >> > > > --
> >> > > > Ververica GmbH
> >> > > > Registered at Amtsgericht Charlottenburg: HRB 158244 B
> >> > > > Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang,
> >> > > > Karl Anton Wehner
>
