Hi,

Thanks for bringing this discussion.

I think limiting the key type to Long can't resolve the comparison problem,
because the bytes order and value order of negative numbers is different.
Unless, we limit the key type to positive Long. But how to check this
before submitting a job?

In Blink code, we keep different sorting behavior in different
statebackends.
We also supported sorted map state for various key types (almost all the
atomic types).
The idea is serializing the given type value into an ordered bytes, see
more:

https://github.com/apache/flink/tree/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/typeutils/ordered

Best,
Jark

On Thu, 15 Oct 2020 at 06:46, Sean Z <xingha...@gmail.com> wrote:

> Thanks for the reply! Look forward to learning more about this prototype.
> Is there any way that we could track this TemporalState like Jira issue? or
> should we start to create one in Jira? so anyone has interest like me,
> could be part of the loop. Besides, is there any written docs/code about
> the prototype so we could have more context about that?
>
> Best,
> Xinghan
>
>
> On Wed, Oct 14, 2020 at 11:09 AM David Anderson <da...@alpinegizmo.com>
> wrote:
>
> > I'm very interested in this topic, and have even done some prototyping of
> > solution 1 -- limiting the key type to Long -- which Nico Kruber and I
> > called TemporalState in our prototype.
> >
> > I look forward to sharing what we learned, and to discussing this
> further,
> > but I am completely overwhelmed with Flink Forward preparations at the
> > moment.
> >
> > Best,
> > David
> >
> > On Wed, Oct 14, 2020 at 9:30 AM Sean Z <xingha...@gmail.com> wrote:
> >
> > > Hi devs,
> > >
> > > Current DataStream API doesn't have SortedMapState supported. There are
> > > lots of use cases based on sorted time-series data like range-query or
> > > higher/lower key fetch, and ordered data seems like a nature of
> > time-series
> > > stream processing. Therefore, we propose to support the
> > KeyedSortedMapState
> > > feature.
> > >
> > > There were some previous discussions [1] about SortedMapState, and the
> > > thread was closed because blink code might cover this feature. However,
> > the
> > > blink code[2] wasn't merged into the master branch since then. The
> major
> > > concern is the inconsistent comparison between heap/off-heap state
> > > backends. In RocksDB, the comparison should be based on bytes, which
> > makes
> > > generic key types support challenging, and in heap state backend, the
> > > comparison is more about Comparable interface.
> > >
> > > There are two possible solutions to this issue in my opinion,
> > > 1. We could limit the key type to Long type, for most of the use cases
> > are
> > > about timestamp as a key. It's easier to implement but brings
> limitations
> > > to support generic key types.
> > > 2. We keep the different sorting behavior of different state backends
> and
> > > set it to bytes comparison for given serialization by default in
> off-heap
> > > state backends. Let users provide their own specific serializer if they
> > > want to sort some customized type on RocksDB.
> > >
> > > Look forward to having some discussions about this feature. Please
> share
> > > your ideas if anyone has context on this. Thanks!
> > >
> > > Best,
> > > Xinghan
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-6219
> > > [2]
> > >
> > >
> >
> https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/state/keyed/KeyedSortedMapState.java
> > >
> >
>

Reply via email to