Hi devs,

The current DataStream API does not support SortedMapState. Many use cases
rely on sorted time-series data, such as range queries or fetching the
next higher/lower key, and ordered data is a natural fit for time-series
stream processing. Therefore, we propose adding a KeyedSortedMapState
feature.

There was a previous discussion [1] about SortedMapState, and the thread
was closed because the blink code might cover this feature. However, the
blink code [2] has not been merged into the master branch since then. The
major concern is the inconsistent comparison between heap and off-heap
state backends. In RocksDB, keys are compared by their serialized bytes,
which makes supporting generic key types challenging, whereas in the heap
state backend the comparison relies on the Comparable interface.
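
To make the mismatch concrete, here is a minimal sketch in plain Java (no
Flink or RocksDB dependency). The class and method names are hypothetical,
and the encoding is assumed to be a plain big-endian two's-complement
long, compared byte by byte as unsigned values the way a bytewise
comparator would. Under those assumptions, -1 sorts before 1 via
Comparable but after it by bytes:

```java
import java.nio.ByteBuffer;

public class ByteOrderMismatchDemo {

    // Assumption: plain big-endian two's-complement encoding of a long.
    public static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Unsigned lexicographic comparison of two byte arrays.
    public static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        long x = -1L;
        long y = 1L;
        // Heap-style ordering: Comparable says x < y (negative result).
        System.out.println("Comparable: " + Long.compare(x, y));
        // Byte-style ordering: 0xFF... sorts after 0x00..., so x > y here.
        System.out.println("Bytewise:   " + compareBytes(encode(x), encode(y)));
    }
}
```

For non-negative timestamps the two orderings happen to agree with this
encoding; the disagreement shows up with negative values and, more
generally, with any type whose serialized form is not order-preserving.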

In my opinion, there are two possible solutions to this issue:
1. We could limit the key type to Long, since most use cases use a
timestamp as the key. This is easier to implement but rules out generic
key types.
2. We keep the different sorting behavior of the different state backends
and, in off-heap state backends, default to byte comparison of the
serialized key. Users provide their own serializer if they want a custom
type to sort in a specific order on RocksDB.
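
As an illustration of what a user-provided, order-preserving serialization
could look like for option 2, here is a minimal sketch for Long keys. It
is not a Flink TypeSerializer and the class name is hypothetical; it only
shows the encoding such a serializer might use, assuming the backend
compares keys as unsigned bytes. Flipping the sign bit before writing the
value big-endian makes the byte order match the numeric order:

```java
import java.nio.ByteBuffer;

public class OrderPreservingLongCodec {

    // Flip the sign bit so negative values sort before non-negative ones
    // when the big-endian bytes are compared as unsigned values.
    public static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    // Inverse of encode: read the long and flip the sign bit back.
    public static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    // Unsigned lexicographic comparison of two byte arrays.
    public static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        // Samples listed in ascending numeric order.
        long[] samples = {Long.MIN_VALUE, -42L, -1L, 0L, 1L, 1_600_000_000_000L, Long.MAX_VALUE};
        for (int i = 0; i + 1 < samples.length; i++) {
            byte[] lo = encode(samples[i]);
            byte[] hi = encode(samples[i + 1]);
            // Byte order should agree with numeric order for every adjacent pair.
            if (compareBytes(lo, hi) >= 0 || decode(lo) != samples[i]) {
                throw new IllegalStateException("ordering broken at index " + i);
            }
        }
        System.out.println("byte order matches numeric order for all samples");
    }
}
```

The same idea would have to be worked out per type (for example strings,
doubles, or composite keys), which is the cost of leaving the ordering to
the serializer.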

I look forward to discussing this feature. Please share your ideas if you
have context on this. Thanks!

Best,
Xinghan

[1] https://issues.apache.org/jira/browse/FLINK-6219
[2] https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/state/keyed/KeyedSortedMapState.java
