Hi Jark,

Thanks for the reply and for sharing your thoughts.
Yes, negative longs make things complicated. We had the exact same
issue when implementing our own sortedMapState.
This can be solved with special pre-defined serializers, and it looks
like that's what Blink did [1], as you mentioned.
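
To make the negative-long point concrete, here is a minimal sketch of the usual
trick: write the long big-endian with the sign bit flipped, so that unsigned
byte-wise comparison agrees with signed numeric order. This is only an
illustration under my own assumptions, not the Blink OrderedBytes code; the
class name OrderedLongCodec and its methods are hypothetical.

```java
// Hypothetical helper (not part of Flink or Blink): order-preserving
// encoding for long keys.
final class OrderedLongCodec {

    // Encode as 8 big-endian bytes with the sign bit flipped, so that
    // unsigned lexicographic byte order matches signed numeric order.
    static byte[] encode(long value) {
        long flipped = value ^ Long.MIN_VALUE;
        byte[] out = new byte[8];
        for (int i = 7; i >= 0; i--) {
            out[i] = (byte) flipped;
            flipped >>>= 8;
        }
        return out;
    }

    // Inverse of encode: rebuild the long and flip the sign bit back.
    static long decode(byte[] bytes) {
        long v = 0;
        for (int i = 0; i < 8; i++) {
            v = (v << 8) | (bytes[i] & 0xFF);
        }
        return v ^ Long.MIN_VALUE;
    }

    // Unsigned lexicographic comparison, i.e. how a byte-wise comparator
    // would order two serialized keys.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    }
}
```

Without the sign flip, -1L serializes to eight 0xFF bytes and sorts after every
positive value in byte order; with it, compareBytes(encode(a), encode(b)) has
the same sign as Long.compare(a, b).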

The Blink implementation could be a good starting point, and it would be better
to extend it to support arbitrary key types as long as users provide their own
serializer, which is what I proposed as option 2. I'm more in favor of
this approach too. But support for arbitrary key types seems to be the main
blocker that prevents this Blink feature from being merged into Flink, from
what I understand.
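
Roughly, option 2 could look like the sketch below. Everything here is
hypothetical and only meant to illustrate the idea: OrderedKeySerializer is an
assumed user-facing contract, OrderedIntSerializer is a sample implementation,
and ByteOrderedMap is an in-memory stand-in for a backend that sorts keys by
their serialized bytes. None of these are existing Flink or Blink classes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Hypothetical contract: the user guarantees that the unsigned byte order of
// serialize(k) matches the intended sort order of k.
interface OrderedKeySerializer<K> {
    byte[] serialize(K key);
    K deserialize(byte[] bytes);
}

// Sample user-provided serializer: big-endian int with the sign bit flipped.
final class OrderedIntSerializer implements OrderedKeySerializer<Integer> {
    @Override
    public byte[] serialize(Integer key) {
        int f = key ^ Integer.MIN_VALUE;
        return new byte[] {(byte) (f >>> 24), (byte) (f >>> 16), (byte) (f >>> 8), (byte) f};
    }

    @Override
    public Integer deserialize(byte[] b) {
        int f = ((b[0] & 0xFF) << 24) | ((b[1] & 0xFF) << 16)
                | ((b[2] & 0xFF) << 8) | (b[3] & 0xFF);
        return f ^ Integer.MIN_VALUE;
    }
}

// In-memory stand-in for a byte-ordered backend: entries are sorted purely by
// the serialized key bytes, never by Comparable.
final class ByteOrderedMap<K, V> {
    private static final Comparator<byte[]> UNSIGNED_BYTES = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    private final OrderedKeySerializer<K> serializer;
    private final TreeMap<byte[], V> entries = new TreeMap<>(UNSIGNED_BYTES);

    ByteOrderedMap(OrderedKeySerializer<K> serializer) {
        this.serializer = serializer;
    }

    void put(K key, V value) {
        entries.put(serializer.serialize(key), value);
    }

    V get(K key) {
        return entries.get(serializer.serialize(key));
    }

    // Smallest stored key strictly greater than the given key, or null.
    K higherKey(K key) {
        byte[] next = entries.higherKey(serializer.serialize(key));
        return next == null ? null : serializer.deserialize(next);
    }

    // All stored keys, in serialized-byte order.
    List<K> keysInOrder() {
        List<K> keys = new ArrayList<>();
        for (byte[] raw : entries.keySet()) {
            keys.add(serializer.deserialize(raw));
        }
        return keys;
    }
}
```

The point of the sketch is that the backend never needs to know the key type:
ordering correctness becomes the responsibility of whoever supplies the
serializer, which is also why it is hard to validate before a job is submitted.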

In my opinion, limiting the key type might be easier to implement, and it is
actually not a bad idea to start with a small, consistent, fixed feature set.
But I'm not sure what TemporalState is trying to solve here, since I
don't have much context on it.


[1]
https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/typeutils/ordered/OrderedBytes.java


Best,
Xinghan

On Wed, Oct 14, 2020 at 7:19 PM Jark Wu <imj...@gmail.com> wrote:

> Hi,
>
> Thanks for bringing this discussion.
>
> I think limiting the key type to Long can't resolve the comparison problem,
> because the byte order and the value order of negative numbers differ,
> unless we limit the key type to positive Long. But how would we check this
> before submitting a job?
>
> In the Blink code, we keep different sorting behavior in different
> state backends.
> We also supported sorted map state for various key types (almost all the
> atomic types).
> The idea is to serialize a value of the given type into order-preserving
> bytes; see:
>
>
> https://github.com/apache/flink/tree/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/typeutils/ordered
>
> Best,
> Jark
>
> On Thu, 15 Oct 2020 at 06:46, Sean Z <xingha...@gmail.com> wrote:
>
> > Thanks for the reply! Looking forward to learning more about this prototype.
> > Is there any way we could track TemporalState, for example in a Jira
> > issue, or should we create one? That way anyone interested, like me,
> > could stay in the loop. Besides, are there any written docs or code for
> > the prototype, so we could have more context?
> >
> > Best,
> > Xinghan
> >
> >
> > On Wed, Oct 14, 2020 at 11:09 AM David Anderson <da...@alpinegizmo.com>
> > wrote:
> >
> > > I'm very interested in this topic, and have even done some prototyping
> > > of solution 1 -- limiting the key type to Long -- which Nico Kruber and I
> > > called TemporalState in our prototype.
> > >
> > > I look forward to sharing what we learned, and to discussing this
> > > further, but I am completely overwhelmed with Flink Forward preparations
> > > at the moment.
> > >
> > > Best,
> > > David
> > >
> > > On Wed, Oct 14, 2020 at 9:30 AM Sean Z <xingha...@gmail.com> wrote:
> > >
> > > > Hi devs,
> > > >
> > > > The current DataStream API doesn't support SortedMapState. There are
> > > > lots of use cases based on sorted time-series data, like range queries
> > > > or higher/lower key fetches, and ordered data seems natural to
> > > > time-series stream processing. Therefore, we propose to support the
> > > > KeyedSortedMapState feature.
> > > >
> > > > There were some previous discussions [1] about SortedMapState, and the
> > > > thread was closed because the Blink code might cover this feature.
> > > > However, the Blink code [2] hasn't been merged into the master branch
> > > > since then. The major concern is the inconsistent comparison between
> > > > heap and off-heap state backends. In RocksDB, the comparison is based
> > > > on bytes, which makes supporting generic key types challenging, while
> > > > in the heap state backend the comparison relies on the Comparable
> > > > interface.
> > > >
> > > > There are two possible solutions to this issue, in my opinion:
> > > > 1. We could limit the key type to Long, since most of the use cases
> > > > use a timestamp as the key. It's easier to implement but rules out
> > > > generic key types.
> > > > 2. We keep the different sorting behavior of the different state
> > > > backends and default to byte comparison of the serialized key in
> > > > off-heap state backends. Users provide their own serializer if they
> > > > want to sort a custom type on RocksDB.
> > > >
> > > > Looking forward to discussing this feature. Please share
> > > > your ideas if anyone has context on this. Thanks!
> > > >
> > > > Best,
> > > > Xinghan
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-6219
> > > > [2]
> > > > https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/state/keyed/KeyedSortedMapState.java
> > > >
> > >
> >
>
