Hi Nico,

I did not mean that we need to support all APIs in NavigableMap, and it would
indeed be too heavy to implement them all.
Moreover, I also prefer an iterator-like API over the original #tailMap-like
API. I only used NavigableMap's API to illustrate the examples.

I think we can introduce new APIs like:

import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.state.State;

public interface SortedMapState<UK, UV> extends State {
  Map.Entry<UK, UV> firstEntry() throws Exception;
  Map.Entry<UK, UV> lastEntry() throws Exception;
  // Range iterators over entries in ascending user-key order.
  Iterator<Map.Entry<UK, UV>> headIterator(UK endUserKey) throws Exception;
  Iterator<Map.Entry<UK, UV>> tailIterator(UK startUserKey) throws Exception;
  Iterator<Map.Entry<UK, UV>> subIterator(UK startUserKey, UK endUserKey)
      throws Exception;
}

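To make the intended semantics of these methods concrete, here is a minimal heap-only sketch on top of java.util.TreeMap. The class name HeapSortedMapSketch is hypothetical, and the bound semantics (start inclusive, end exclusive, as in java.util.SortedMap) are an assumption, since the proposal above does not fix them yet.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical heap-only sketch of the proposed SortedMapState methods.
// Assumed bound semantics (as in java.util.SortedMap): start inclusive,
// end exclusive. The proposal itself does not fix these yet.
class HeapSortedMapSketch<UK extends Comparable<UK>, UV> {
    private final TreeMap<UK, UV> map = new TreeMap<>();

    void put(UK key, UV value) {
        map.put(key, value);
    }

    // Entry with the smallest / largest user key, or null if empty.
    Map.Entry<UK, UV> firstEntry() {
        return map.firstEntry();
    }

    Map.Entry<UK, UV> lastEntry() {
        return map.lastEntry();
    }

    // Entries with key < endUserKey, ascending.
    Iterator<Map.Entry<UK, UV>> headIterator(UK endUserKey) {
        return map.headMap(endUserKey, false).entrySet().iterator();
    }

    // Entries with key >= startUserKey, ascending.
    Iterator<Map.Entry<UK, UV>> tailIterator(UK startUserKey) {
        return map.tailMap(startUserKey, true).entrySet().iterator();
    }

    // Entries with startUserKey <= key < endUserKey, ascending.
    Iterator<Map.Entry<UK, UV>> subIterator(UK startUserKey, UK endUserKey) {
        return map.subMap(startUserKey, true, endUserKey, false).entrySet().iterator();
    }
}
```

A RocksDB-backed variant would presumably map the same calls onto a RocksDB iterator seeking to the serialized start key, which is why the byte order of serialized keys discussed below matters.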
Since SortedMapState has several new APIs, I prefer to introduce a new state
descriptor to distinguish it from the original map state.

As for the SortedMapOfListsState API, I am not strongly against it; I just
want to understand its actual benefits before we really introduce it.

Regarding the changelog state backend, my concern is how to group keys
together in the changelog logger.
To explain the problem, I first need to describe how to implement a serializer
that keeps the order of the serialized bytes the same as that of the original
Java objects.
For fixed-length serializers such as LongSerializer, we just need to either
ensure that all numbers are non-negative or invert the sign bit (with the
bytes written in big-endian order).
However, variable-length serializers such as StringSerializer write the length
first, which breaks the natural order when comparing the bytes. Thus, we might
need to avoid writing the length into the serialized bytes.
On the other hand, the changelog logger records operations key by key in the
log. Without a length prefix, we need to consider how to tell where each key
ends in the combined serialized byte arrays.

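A minimal sketch of both ideas, assuming keys are compared byte-wise as unsigned bytes (as RocksDB's default comparator does). The class OrderedKeyCodec and its escaping scheme are hypothetical, not an existing Flink serializer: a zero byte inside a key is written as 0x00 0xFF and every key ends with 0x00 0x01, so no length prefix is needed and concatenated keys can still be split apart again.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical order-preserving key encodings, compared as unsigned bytes.
final class OrderedKeyCodec {
    private OrderedKeyCodec() {}

    // Fixed length: flip the sign bit so negative values sort first, then
    // write big-endian (ByteBuffer's default byte order).
    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    static long decodeLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    // Variable length, no length prefix: escape 0x00 as 0x00 0xFF and
    // terminate with 0x00 0x01. The terminator sorts before any continuation,
    // so a key sorts before every longer key it is a prefix of.
    static void encodeBytes(byte[] key, ByteArrayOutputStream out) {
        for (byte b : key) {
            out.write(b);
            if (b == 0) {
                out.write(0xFF);
            }
        }
        out.write(0x00);
        out.write(0x01);
    }

    static byte[] encodeString(String key) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        encodeBytes(key.getBytes(StandardCharsets.UTF_8), out);
        return out.toByteArray();
    }

    // Splits a concatenation of encoded keys (e.g. one log record) into keys.
    static List<byte[]> splitKeys(byte[] log) {
        List<byte[]> keys = new ArrayList<>();
        ByteArrayOutputStream current = new ByteArrayOutputStream();
        int i = 0;
        while (i < log.length) {
            byte b = log[i++];
            if (b != 0) {
                current.write(b);
            } else if (i < log.length && log[i] == (byte) 0xFF) {
                current.write(0); // escaped zero byte inside a key
                i++;
            } else if (i < log.length && log[i] == 0x01) {
                keys.add(current.toByteArray()); // end of one key
                current.reset();
                i++;
            } else {
                throw new IllegalArgumentException("malformed escape at offset " + (i - 1));
            }
        }
        if (current.size() > 0) {
            throw new IllegalArgumentException("trailing bytes without terminator");
        }
        return keys;
    }
}
```

For comparison, with a simple 4-byte big-endian length prefix, "ab" becomes 00 00 00 02 61 62 and "b" becomes 00 00 00 01 62, so "ab" would sort after "b" even though "ab" < "b" (this illustrates the general problem, not Flink's exact StringSerializer format). Note that UTF-8 bytes follow Unicode code point order, which can differ from String#compareTo (UTF-16 code units) for characters outside the Basic Multilingual Plane.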
Best
Yun Tang

________________________________
From: Nico Kruber <n...@apache.org>
Sent: Thursday, April 21, 2022 23:50
To: dev <dev@flink.apache.org>
Cc: David Morávek <david.mora...@gmail.com>; Yun Tang <myas...@live.com>
Subject: Re: [DISCUSS] FLIP-220: Temporal State

Thanks Yun Tang for your clarifications.
Let me keep my original structure and reply point by point...

3. Should we generalise the Temporal***State to offer arbitrary key types and
not just Long timestamps?

The use cases you detailed do indeed look similar to the ones we were
optimising in our TemporalState PoC...

I don't think I'd like to offer a full implementation of NavigableMap, though,
because that seems like quite some implementation overhead, while the proposed
APIs already cover the mentioned examples when using iterators as well as
single-value retrievals.
So far, when we were iterating from the smallest key, we could just use
Long.MIN_VALUE and start from there. That would be difficult to generalise for
arbitrary data types because you may not always know the smallest possible
value for a certain serialized type (unless we put this into the appropriate
serializer interface).

I see two options here:
a) a slim API but using NULL as an indicator for smallest/largest depending on
the context, e.g.
  - `readRange(null, key)` means from beginning to key
  - `readRange(key, null)` means from key to end
  - `readRange(null, null)` means from beginning to end
  - `value[AtOr]Before(null)` means largest available key
  - `value[AtOr]After(null)` means smallest available key
b) a larger API with special methods for each of these use cases similar to
what NavigableMap has but based on iterators and single-value functions only

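A minimal heap-based sketch of option a), assuming readRange(from, to) covers [from, to) and that the single-value lookups return the whole entry so callers also see the matching key. The class name NullBoundedSortedState is hypothetical; the lookups map directly onto java.util.NavigableMap's floorEntry, lowerEntry, ceilingEntry and higherEntry.

```java
import java.util.Collections;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of option a): a null bound means "unbounded".
// Assumes readRange(from, to) covers [from, to).
class NullBoundedSortedState<K extends Comparable<K>, V> {
    private final NavigableMap<K, V> map = new TreeMap<>();

    void put(K key, V value) {
        map.put(key, value);
    }

    Iterable<Map.Entry<K, V>> readRange(K from, K to) {
        if (from != null && to != null && from.compareTo(to) > 0) {
            return Collections.emptySet(); // inverted range: empty in this sketch
        }
        NavigableMap<K, V> view = map;
        if (from != null) {
            view = view.tailMap(from, true); // from inclusive
        }
        if (to != null) {
            view = view.headMap(to, false); // to exclusive
        }
        return view.entrySet();
    }

    // Largest entry with key <= key; null means "largest available key".
    Map.Entry<K, V> valueAtOrBefore(K key) {
        return key == null ? map.lastEntry() : map.floorEntry(key);
    }

    // Largest entry with key < key; null means "largest available key".
    Map.Entry<K, V> valueBefore(K key) {
        return key == null ? map.lastEntry() : map.lowerEntry(key);
    }

    // Smallest entry with key >= key; null means "smallest available key".
    Map.Entry<K, V> valueAtOrAfter(K key) {
        return key == null ? map.firstEntry() : map.ceilingEntry(key);
    }

    // Smallest entry with key > key; null means "smallest available key".
    Map.Entry<K, V> valueAfter(K key) {
        return key == null ? map.firstEntry() : map.higherEntry(key);
    }
}
```

The price of option a) is that null can no longer double as a real key; option b) avoids that by giving each case its own method.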
> BTW, I prefer to introduce another state descriptor instead of current map
> state descriptor.

Can you elaborate on this? We currently don't need extra functionality, so
this would be a plain copy of the MapStateDescriptor...

> For the API of SortedMapOfListsState, I think this is a bit bound to the
> current implementation of the RocksDB state backend.

Actually, I don't think this is specific to RocksDB but generic to all state
backends that do not hold values in memory and allow fast append-like
operations.
Additionally, since this is a very common use case and RocksDB is also widely
used, I wouldn't want to continue without this specialization. For a similar
reason, we offer ListState and not just ValueState<List>...


4. ChangelogStateBackend

> For the discussion of ChangelogStateBackend, you can think of the changelog
> state backend as a write-ahead-log service. Since we need to record the
> changes to any state, this should be included in the design doc, as we
> need to introduce another kind of state. In particular, you might need to
> consider how to store key bytes serialized by the new serializer (as we
> might not be able to write the length at the beginning of the serialized
> bytes if their order should match the natural order).

Since the ChangelogStateBackend "holds the working state in the underlying
delegatedStateBackend, and forwards state changes to State Changelog", I
honestly still don't see how this needs special handling. As long as the
delegated state backend supports sorted state, ChangelogStateBackend doesn't
have to do anything special except for recording changes to state. Our PoC
simply uses the namespace for these keys and that's the same thing the Window
API is already using - so there's nothing special here. The order in the log
doesn't have to follow the natural order because this is only required inside
the delegatedStateBackend, isn't it?


Nico

On Wednesday, 20 April 2022 17:03:11 CEST Yun Tang wrote:
> Hi Nico,
>
> Thanks for your clarification.
> Regarding the generalization of temporal state to sorted map state, I can
> give some examples of how sorted map state could be used in the min/max
> with retract functions.

> As you know, NavigableMap in Java has several APIs like:
>
>     Map.Entry<K,V> firstEntry();
>     Map.Entry<K,V> lastEntry();
>     NavigableMap<K,V> tailMap(K fromKey, boolean inclusive);
>
> The #firstEntry API could be used in MinWithRetractAggFunction#updateMin,
> #lastEntry could be used in MaxWithRetractAggFunction#updateMax, and
> #tailMap could be used in FirstValueWithRetractAggFunction#retract.
> If we can introduce SortedMap-like state, these functions could benefit.
> BTW, I prefer to introduce another state descriptor instead of the current
> map state descriptor.
> For the API of SortedMapOfListsState, I think this is a bit bound to the
> current implementation of the RocksDB state backend.

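The following sketch illustrates the min/max idea with a plain java.util.TreeMap standing in for the proposed sorted map state. MinMaxWithRetractSketch is a hypothetical, simplified accumulator (value -> occurrence count), not Flink's actual MinWithRetractAggFunction: after a retraction, firstEntry()/lastEntry() return the new extreme directly instead of scanning all stored values.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical min/max-with-retract accumulator: a sorted multiset stored as
// value -> occurrence count.
class MinMaxWithRetractSketch {
    private final TreeMap<Long, Integer> counts = new TreeMap<>();

    void accumulate(long value) {
        counts.merge(value, 1, Integer::sum);
    }

    void retract(long value) {
        // Decrement the count; returning null removes the key at zero.
        counts.computeIfPresent(value, (k, c) -> c == 1 ? null : c - 1);
    }

    // Current minimum, or null if every value was retracted.
    Long min() {
        Map.Entry<Long, Integer> e = counts.firstEntry();
        return e == null ? null : e.getKey();
    }

    // Current maximum, or null if every value was retracted.
    Long max() {
        Map.Entry<Long, Integer> e = counts.lastEntry();
        return e == null ? null : e.getKey();
    }
}
```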
> For the discussion of ChangelogStateBackend, you can think of the changelog
> state backend as a write-ahead-log service. Since we need to record the
> changes to any state, this should be included in the design doc, as we
> need to introduce another kind of state. In particular, you might need to
> consider how to store key bytes serialized by the new serializer (as we
> might not be able to write the length at the beginning of the serialized
> bytes if their order should match the natural order).

> Best
> Yun Tang.
> ________________________________
> From: Nico Kruber <n...@apache.org>
> Sent: Wednesday, April 20, 2022 0:38
> To: dev <dev@flink.apache.org>
> Cc: Yun Tang <myas...@live.com>; David Morávek <david.mora...@gmail.com>
> Subject: Re: [DISCUSS] FLIP-220: Temporal State
>
> Hi all,
> I have read the discussion points from the last emails and would like to
> add my two cents on what I believe are the remaining points to solve:
> 1. Do we need a TemporalValueState?
>
> I guess this all boils down to dealing with duplicates / values for the
> same timestamp. Either you always have to account for them and thus always
> have to store a list anyway, or you only need to join with "the latest" (or
> the only) value for a given timestamp and get a nicer API and lower
> overhead for that use case.
> At the easiest, you can assume by contract that there is only a single
> value for each timestamp, e.g. by increasing the timestamp precision and
> interpreting timestamps as nanoseconds, or maybe milliseconds are already
> good enough. If that contract breaks, however, you will run into undefined
> behaviour.
> The TemporalRowTimeJoinOperator, for example, currently just assumes that
> there is only a single value on the right side of the join (rightState),
> and I believe many use cases can make that assumption; otherwise, you'd
> have to define the expected behaviour for multiple values at the same
> timestamp, e.g. "join with the most recent value at the time of the left
> side and if there are multiple values, choose X".
>
> I lean towards having a ValueState implementation as well (in addition to
> lists).
>
>
> 2. User-facing API (Iterators vs. valueAtOr[Before|After])
>
> I like the iterable-based APIs that David M was proposing, i.e.
> - Iterable<TimestampedValue<T>> readRange(long minTimestamp, long
> limitTimestamp);
> - void clearRange(long minTimestamp, long limitTimestamp);
>
> However, I find Iterables rather cumbersome to work with if you actually
> only need a single value, e.g. the most recent one.
> For iterating over a range of values, however, they feel more natural to me
> than our proposal.
>
> Actually, if we generalise the key type (see below), we may also need to
> offer additional value[Before|After] functions to cover "+1" iterations
> where we cannot simply add 1 as we do now.
>
> (a) How about offering both Iterables and
> value[AtOrBefore|AtOrAfter|Before|After]?
> This would be similar to what NavigableMap [2] is offering but with a more
> explicit API than "ceiling", "floor",...
>
> (b) Our API proposal currently also allows iterating backwards, which is not
> covered by the readRange proposal (we could, however, just do that if
> minTimestamp > limitTimestamp). What do you think?
>
> (c) When implementing the iterators, I actually also see two different
> modes which may differ in performance: I call them iteration with eager
> vs. lazy value retrieval. Eager retrieval may retrieve all values in a
> range at once and make them available in memory, e.g. for smaller data sets
> similar to what TemporalRowTimeJoinOperator is doing for the right side of
> the join. This can spare a lot of Java<->JNI calls and let RocksDB
> iterate only once (as long as things fit into memory). Lazy retrieval would
> fetch results one by one. -> We could set one as the default and allow the
> user to override that behaviour.
>
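A heap-based sketch of points (b) and (c) above, under stated assumptions: readRange(min, limit) covers [min, limit); when min > limit it iterates backwards over (limit, min], which is only one possible reading of the backwards proposal; and the eager flag copies the range into a list up front, while lazy mode walks a live TreeMap view. RangeReadSketch and its eager parameter are hypothetical; on RocksDB the trade-off would be about JNI calls and iterator passes, which a heap sketch cannot show.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical readRange with backwards iteration and eager/lazy retrieval.
class RangeReadSketch<V> {
    private final NavigableMap<Long, V> map = new TreeMap<>();

    void put(long timestamp, V value) {
        map.put(timestamp, value);
    }

    Iterator<Map.Entry<Long, V>> readRange(long min, long limit, boolean eager) {
        // Forward over [min, limit), or backwards over (limit, min].
        NavigableMap<Long, V> view = min <= limit
                ? map.subMap(min, true, limit, false)
                : map.subMap(limit, false, min, true).descendingMap();
        if (!eager) {
            return view.entrySet().iterator(); // lazy: walks the live view
        }
        // Eager: materialize snapshots of all entries in the range at once.
        List<Map.Entry<Long, V>> copy = new ArrayList<>(view.size());
        for (Map.Entry<Long, V> e : view.entrySet()) {
            copy.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        return copy.iterator();
    }
}
```

Besides saving calls, the eager copy is unaffected by later modifications, whereas TreeMap's view iterators are fail-fast and throw ConcurrentModificationException if the map is structurally modified during iteration.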
> 3. Should we generalise the Temporal***State to offer arbitrary key types
> and not just Long timestamps?
>
> @Yun Tang: can you describe in more detail where you think this would be
> needed for SQL users? I don't quite get how this would be beneficial. The
> example you linked doesn't quite show the same behaviour.
>
> Other than this, I could see that you can leverage such a generalisation
> for arbitrary joins between, for example, IDs and ID ranges which don't
> have a time component attached to them. This shouldn't be too difficult to
> expose: the functionality has to exist anyway, but is otherwise buried in
> Flink's internals. We'd just have to find suitable names.
>
> (a) I don't think TemporalListState<T> is actually SortedMapState<Long,
> List<T>> because we need efficient "add to list" primitives which cannot
> easily be made available with a single generic SortedMapState...
>
> (b) So the most expressive (yet kind-of ugly) names could be
> - SortedMapState<Long, ValueType>
> - SortedMapOfListsState<Long, List<ValueType>>
>
> (c) For both of these, we could then re-use the existing MapStateDescriptor
> to define key and value/list-element types and require that the key type /
> serializer implements a certain RetainingSortOrderSerializer interface (and
> think about a better name for this) which defines the contract that the
> binary sort order is the same as the Java Object one.
> -> that can also be verified at runtime to fail early.
>
>
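A sketch of such a fail-early check, assuming the key serializer can be treated as a function to byte[] and that the state backend compares keys as unsigned bytes (as RocksDB's default comparator does). SortOrderCheck.verify is a hypothetical helper, not an existing Flink API.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

// Hypothetical runtime check of the sort-order contract on sample values:
// the unsigned byte-wise order of the serialized form must match the
// order defined by the Java comparator.
final class SortOrderCheck {
    private SortOrderCheck() {}

    static <T> void verify(List<T> samples, Comparator<? super T> comparator,
                           Function<? super T, byte[]> serializer) {
        for (T a : samples) {
            for (T b : samples) {
                int objectOrder = Integer.signum(comparator.compare(a, b));
                int byteOrder = Integer.signum(
                        Arrays.compareUnsigned(serializer.apply(a), serializer.apply(b)));
                if (objectOrder != byteOrder) {
                    throw new IllegalStateException(
                            "Serializer does not retain sort order for " + a + " vs " + b);
                }
            }
        }
    }
}
```

A sampled check like this can only detect violations; it cannot prove that the contract holds for all values. With plain big-endian longs, for example, -1 serializes to 0xFF... and sorts after 1, so the check fails.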
> 4. ChangelogStateBackend: we don't think this needs special attention - it
> is just delegating to the other backends anyway and these methods are
> already adapted in our POC code
>
>
> @David M, Yun Tang: let me/us know what you think about these proposals
>
> Nico
>
> [2] https://docs.oracle.com/javase/8/docs/api/java/util/NavigableMap.html
>
> On Thursday, 14 April 2022 14:15:53 CEST Yun Tang wrote:
>
> > Hi David Anderson,
> >
> > I doubt that there is no motivating use case for this generalization to
> > SortedMapState. From our internal stats, SQL users use min/max with
> > retract functions [1] much more often than interval joins.
> >
> > From my understanding, TemporalListState<T> is actually
> > SortedMapState<Long, List<T>>, while TemporalValueState<T> is
> > SortedMapState<Long, T>. As you can see, if we just restrict
> > SortedMapState to Long keys, both of the two new interfaces could be
> > replaced.
> >
> > Moreover, once we introduce temporal state, its extensibility would be
> > limited. Apart from Long, many other types are comparable, e.g.
> > Timestamp, Int, Float and so on. How could we handle these feature
> > requests after TemporalState is merged? I don't think introducing too many
> > state types is a good idea. We could support only the Long type in the
> > first version of SortedMapState and then extend it to more types in the
> > future. This would balance the feature requests with a clean interface
> > design, and we could also use sorted map state in the popular min/max
> > functions.
> >
> > By the way, the current FLIP does not consider the work needed on the
> > changelog state backend once the two new state types are introduced.
> >
> > [1]
> > https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/MinWithRetractAggFunction.java
> >
> > Best,
> > Yun Tang
> > ________________________________
> > From: David Morávek <david.mora...@gmail.com>
> > Sent: Wednesday, April 13, 2022 19:50
> > To: dev <dev@flink.apache.org>
> > Subject: Re: [DISCUSS] FLIP-220: Temporal State
> >
> > Here is a very naive implementation [1] from a prototype I did a few
> > months back that uses a list and insertion sort. Since the list is sorted,
> > we can use binary search to create a sub-list, which could leverage the
> > same thing I've described above.
> >
> > I think back then I didn't go for the SortedMap as it would be hard to
> > implement with the current heap state backend internals and would have
> > bigger memory overhead.
> >
> > The ideal solution would probably use skip list [2] to lower the overhead
> > of the binary search, while maintaining a reasonable memory footprint.
> > Other than that it could be pretty much the same as the prototype
> > implementation [1].
> >
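A minimal sketch of the sorted-list approach, assuming a half-open range [min, limit). SortedListTemporalSketch is hypothetical and is not the linked prototype: binary search finds both the insertion slot and the sub-list boundaries, and values with equal timestamps keep their insertion order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sorted-list-backed temporal list state for a single key.
class SortedListTemporalSketch<V> {
    private final List<Long> timestamps = new ArrayList<>();
    private final List<V> values = new ArrayList<>();

    // First index whose timestamp is >= t (strict == false) or > t (strict == true).
    private int bound(long t, boolean strict) {
        int lo = 0;
        int hi = timestamps.size();
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            long m = timestamps.get(mid);
            if (m < t || (strict && m == t)) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }

    // Inserts after existing entries with the same timestamp.
    void add(long timestamp, V value) {
        int idx = bound(timestamp, true);
        timestamps.add(idx, timestamp);
        values.add(idx, value);
    }

    // Values with min <= timestamp < limit, as a view backed by the list.
    List<V> readRange(long min, long limit) {
        if (min >= limit) {
            return Collections.emptyList();
        }
        return values.subList(bound(min, false), bound(limit, false));
    }
}
```

Each insert into an ArrayList shifts the later elements, so inserts are O(n) in the worst case; that cost is what a skip list avoids. The JDK's java.util.concurrent.ConcurrentSkipListMap is an example of a skip-list-based sorted map.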
> > [1]
> > https://github.com/dmvk/flink/blob/ecdbc774b13b515e8c0943b2c143fb1e34eca6f0/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapTemporalListState.java
> > [2] https://en.wikipedia.org/wiki/Skip_list
> >
> > Best,
> > D.
> >
> > On Wed, Apr 13, 2022 at 1:27 PM David Morávek <david.mora...@gmail.com>
> > wrote:
> >
> > > Hi David,
> > >
> > >> It seems to me that at least with the heap-based state backend,
> > >> readRange is going to have to do a lot of unnecessary work to implement
> > >> this isEmpty() operation, since it will have to consider the entire range
> > >> from MIN_VALUE to MAX_VALUE. (Maybe we should add an explicit isEmpty
> > >> method? I'm not convinced we need it, but it would be cheaper to
> > >> implement. Or perhaps this join can be rewritten to not need this
> > >> operation; I haven't thought enough about that alternative.)
> > >
> > > I think this really boils down to how the returned iterable is going to
> > > be implemented. Basically, for checking whether state is empty, you need
> > > to do something along the lines of:
> > >
> > > Iterables.isEmpty(state.readRange(Long.MIN_VALUE, Long.MAX_VALUE));
> > > // basically checking `hasNext() == false`, or `isEmpty()` in case of a
> > > // `Collection`
> > >
> > > A few notes:
> > > 1) It could be lazy (the underlying collection doesn't have to be
> > > materialized, e.g. in case of RocksDB);
> > > 2) For HeapStateBackend it depends on the underlying implementation. I'd
> > > probably do something along the lines of a sorted tree (e.g. SortedMap /
> > > NavigableMap) that allows efficient range scans / range deletes. Then you
> > > could simply do something like (off the top of my head):
> > >
> > > @Value // Lombok: immutable value class with an all-args constructor
> > > class TimestampedKey<K> {
> > >   K key;
> > >   long timestamp;
> > > }
> > >
> > > // The map must order keys by (key, timestamp), e.g. via a Comparator.
> > > SortedMap<TimestampedKey<K>, V> internalState;
> > >
> > > Iterable<TimestampedValue<V>> readRange(long min, long max) {
> > >   // subMap: lower bound inclusive, upper bound exclusive
> > >   return toIterable(internalState.subMap(
> > >       new TimestampedKey<>(currentKey(), min),
> > >       new TimestampedKey<>(currentKey(), max)));
> > > }
> > >
> > > This should be fairly cheap. The important bit is that the returned
> > > iterator is always non-null, but could be empty.
> > >
> > > Does that answer your question?
> > >
> > > D.
> > >
> > > On Wed, Apr 13, 2022 at 12:21 PM David Anderson <da...@alpinegizmo.com>
> > > wrote:
> > >
> > >> Yun Tang and Jingsong,
> > >>
> > >> Some flavor of OrderedMapState is certainly feasible, and I do see some
> > >> appeal in supporting Binary**State.
> > >>
> > >> However, I haven't seen a motivating use case for this generalization,
> > >> and would rather keep this as simple as possible. By handling Longs, we
> > >> can already optimize a wide range of use cases.
> > >>
> > >> David
> > >>
> > >> On Tue, Apr 12, 2022 at 9:21 AM Yun Tang <myas...@live.com> wrote:
> > >>
> > >> > Hi David,
> > >> >
> > >> > Could you explain in more detail why SortedMapState cannot work? I
> > >> > don't quite follow what the statement below means:
> > >> >
> > >> > This was rejected as being overly difficult to implement in a way that
> > >> > would cleanly leverage RocksDB’s iterators.
> > >> >
> > >> > Best
> > >> > Yun Tang
> > >> > ________________________________
> > >> > From: Aitozi <gjying1...@gmail.com>
> > >> > Sent: Tuesday, April 12, 2022 15:00
> > >> > To: dev@flink.apache.org <dev@flink.apache.org>
> > >> > Subject: Re: [DISCUSS] FLIP-220: Temporal State
> > >> >
> > >> > Hi David
> > >> >
> > >> > I have looked through the doc, and I think it will be a good
> > >> > improvement for this usage pattern; I'm interested in it. Do you have
> > >> > some POC work to share for a closer look?
> > >> > Besides, I have one question: can we support exposing the namespace in
> > >> > the different state types, not limited to `TemporalState`? That way,
> > >> > users can specify the namespace, and TemporalState is one special case
> > >> > that uses the timestamp as the namespace. I think this would be more
> > >> > extensible.
> > >> >
> > >> > What do you think about this?
> > >> >
> > >> > Best,
> > >> > Aitozi.
> > >> >
> > >> > On Mon, Apr 11, 2022 at 20:54, David Anderson <dander...@apache.org>
> > >> > wrote:
> > >> >
> > >> > > Greetings, Flink developers.
> > >> > >
> > >> > > I would like to open up a discussion of a proposal [1] to add a new
> > >> > > kind of state to Flink.
> > >> > >
> > >> > > The goal here is to optimize a fairly common pattern, which is using
> > >> > >
> > >> > > MapState<Long, List<Event>>
> > >> > >
> > >> > > to store lists of events associated with timestamps. This pattern is
> > >> > > used internally in quite a few operators that implement sorting and
> > >> > > joins, and it also shows up in user code, for example, when
> > >> > > implementing custom windowing in a KeyedProcessFunction.
> > >> > >
> > >> > > Nico Kruber, Seth Wiesman, and I have implemented a POC that achieves
> > >> > > a more than 2x improvement in throughput when performing these
> > >> > > operations on RocksDB by better leveraging the capabilities of the
> > >> > > RocksDB state backend.
> > >> > >
> > >> > > See FLIP-220 [1] for details.
> > >> > >
> > >> > > Best,
> > >> > > David
> > >> > >
> > >> > > [1] https://cwiki.apache.org/confluence/x/Xo_FD
>
> Dr. Nico Kruber | Solutions Architect
>
> Follow us @VervericaData Ververica
> --
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
> Wehner
>