Do we think this will be useful for users or do we first want to introduce this for internal use cases, such as the Table API/SQL runner?

Aljoscha

On 15.10.20 10:35, Sean Z wrote:
Hi Jark,

Thanks for the reply and sharing thoughts.
Yes, negative longs make things complicated. We had exactly the same
issue when implementing our own SortedMapState.
This can be solved with special predefined serializers, and it looks
like that's what Blink did [1], as you mentioned.

The Blink implementation could be a good starting point, and it would be
better to extend it to support arbitrary key types as long as users provide
their own serializer, which is what I proposed in option 2. I'm more in
favor of this approach too. But from what I understand, support for
arbitrary key types seems to be the main blocker preventing this Blink
feature from being merged into Flink.

In my opinion, limiting the key type might be easier to implement, and it
is actually not a bad idea to start with a small, consistent, fixed feature
set. But I'm not sure what TemporalState is trying to solve here, since I
don't have much context on it.


[1]
https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/typeutils/ordered/OrderedBytes.java


Best,
Xinghan

On Wed, Oct 14, 2020 at 7:19 PM Jark Wu <[email protected]> wrote:

Hi,

Thanks for bringing up this discussion.

I think limiting the key type to Long can't resolve the comparison problem,
because the byte order and the value order of negative numbers are
different, unless we limit the key type to positive Long. But how would we
check this before submitting a job?
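
To make the mismatch concrete, here is a minimal sketch. It assumes a plain
big-endian two's-complement encoding of the long and an unsigned bytewise
key comparison; the class and method names are hypothetical and are not
Flink or RocksDB APIs.

```java
import java.nio.ByteBuffer;

final class ByteOrderDemo {
    // Plain big-endian two's-complement encoding (ByteBuffer's default byte order).
    static byte[] toBytes(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    // Unsigned lexicographic comparison, the way a bytewise key comparator orders keys.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        // By value, -1 sorts before 1.
        System.out.println("value order, -1 < 1: " + (Long.compare(-1L, 1L) < 0));
        // By bytes, -1 is FF..FF and 1 is 00..01, so -1 sorts after 1.
        System.out.println("byte order,  -1 < 1: "
                + (compareBytes(toBytes(-1L), toBytes(1L)) < 0));
    }
}
```

Under these assumptions the value comparison reports -1 before 1, while
the byte comparison places -1 after every non-negative key.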

In the Blink code, we keep different sorting behavior in different
state backends.
We also supported sorted map state for various key types (almost all the
atomic types).
The idea is to serialize a value of the given type into order-preserving
bytes; see:


https://github.com/apache/flink/tree/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/typeutils/ordered
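
As an illustration of the general idea for a single type, here is a sketch
of one common order-preserving encoding for long: flip the sign bit before
writing big-endian bytes. This is an assumption-level example with
hypothetical names; it is not claimed to match the linked Blink code.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class OrderedLongCodec {
    // Flipping the sign bit maps Long.MIN_VALUE..Long.MAX_VALUE onto
    // 0x00..00 .. 0xFF..FF, so unsigned byte order matches value order.
    static byte[] encode(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v ^ Long.MIN_VALUE).array();
    }

    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    // Unsigned lexicographic comparison of two byte arrays.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    // Sorts using only the encoded bytes, then decodes the result.
    static List<Long> sortByEncodedBytes(List<Long> values) {
        List<byte[]> encoded = new ArrayList<>();
        for (long v : values) {
            encoded.add(encode(v));
        }
        encoded.sort(OrderedLongCodec::compareBytes);
        List<Long> sorted = new ArrayList<>();
        for (byte[] b : encoded) {
            sorted.add(decode(b));
        }
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(sortByEncodedBytes(
                Arrays.asList(5L, -3L, 0L, Long.MIN_VALUE, Long.MAX_VALUE, -1L)));
    }
}
```

With this encoding, sorting by bytes alone yields the same order as sorting
by value, including for negative keys.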

Best,
Jark

On Thu, 15 Oct 2020 at 06:46, Sean Z <[email protected]> wrote:

Thanks for the reply! I look forward to learning more about this prototype.
Is there any way we could track TemporalState, such as a Jira issue, or
should we create one? That way anyone interested, like me, could stay in
the loop. Also, are there any written docs or code for the prototype so we
could get more context?

Best,
Xinghan


On Wed, Oct 14, 2020 at 11:09 AM David Anderson <[email protected]> wrote:

I'm very interested in this topic, and have even done some prototyping of
solution 1 -- limiting the key type to Long -- which Nico Kruber and I
called TemporalState in our prototype.

I look forward to sharing what we learned, and to discussing this further,
but I am completely overwhelmed with Flink Forward preparations at the
moment.

Best,
David

On Wed, Oct 14, 2020 at 9:30 AM Sean Z <[email protected]> wrote:

Hi devs,

The current DataStream API doesn't support SortedMapState. There are many
use cases based on sorted time-series data, such as range queries or
higher/lower key lookups, and ordered data is natural to time-series
stream processing. Therefore, we propose supporting a KeyedSortedMapState
feature.
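
For readers unfamiliar with these operations, here is a small sketch of
what range queries and higher/lower key lookups look like on an in-memory
sorted map. It uses java.util.TreeMap purely as an illustration; it is not
the proposed Flink API, and the class name and sample data are made up.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

final class SortedLookupDemo {
    // Hypothetical timestamp -> event-name sample data.
    static NavigableMap<Long, String> sample() {
        NavigableMap<Long, String> events = new TreeMap<>();
        events.put(1000L, "a");
        events.put(2000L, "b");
        events.put(3000L, "c");
        events.put(4000L, "d");
        return events;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> events = sample();
        // Range query: all entries with 2000 <= timestamp < 4000.
        System.out.println(events.subMap(2000L, true, 4000L, false));
        // Closest key strictly above and strictly below a given timestamp.
        System.out.println(events.higherKey(2500L));
        System.out.println(events.lowerKey(2500L));
    }
}
```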

There were some previous discussions [1] about SortedMapState, and the
thread was closed because the Blink code might cover this feature.
However, the Blink code [2] hasn't been merged into the master branch
since then. The major concern is the inconsistent comparison between heap
and off-heap state backends. In RocksDB, comparison is based on bytes,
which makes supporting generic key types challenging, while in the heap
state backend, comparison relies on the Comparable interface.

There are two possible solutions to this issue, in my opinion:
1. We could limit the key type to Long, since most use cases use a
timestamp as the key. It's easier to implement but rules out generic key
types.
2. We keep the different sorting behavior of different state backends and
default to byte comparison of the serialized key in off-heap state
backends. Users provide their own serializer if they want to sort a
custom type on RocksDB.

I look forward to discussing this feature. Please share your ideas if you
have context on this. Thanks!

Best,
Xinghan

[1] https://issues.apache.org/jira/browse/FLINK-6219
[2] https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/state/keyed/KeyedSortedMapState.java





