Sean Z created FLINK-19709:
------------------------------

             Summary: Support KeyedSortedMapState in DataStream API
                 Key: FLINK-19709
                 URL: https://issues.apache.org/jira/browse/FLINK-19709
             Project: Flink
          Issue Type: New Feature
          Components: API / DataStream, Runtime / State Backends
            Reporter: Sean Z


The DataStream API currently does not support SortedMapState. Many use cases 
rely on sorted time-series data, such as range queries or fetching the next 
higher or lower key, and ordered data is a natural fit for time-series stream 
processing. Therefore, we propose adding a KeyedSortedMapState feature.
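
To make the target queries concrete, here is a minimal plain-Java sketch using java.util.TreeMap. It is not a Flink API and not the proposed interface; the class name, method names, and sample values are made up for illustration. It only shows the kind of range query and higher/lower key lookups a sorted map state would be expected to offer.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Plain-Java illustration of sorted-map queries; hypothetical names, not a Flink API. */
class SortedQueriesSketch {

    /** Builds a small map of event timestamp (ms) to reading; the values are made up. */
    static NavigableMap<Long, Double> sampleReadings() {
        NavigableMap<Long, Double> readings = new TreeMap<>();
        readings.put(1_000L, 20.5);
        readings.put(2_000L, 21.0);
        readings.put(3_500L, 19.8);
        readings.put(5_000L, 22.3);
        return readings;
    }

    /** Range query: all entries with from <= timestamp < to. */
    static NavigableMap<Long, Double> range(
            NavigableMap<Long, Double> map, long from, long to) {
        return map.subMap(from, true, to, false);
    }

    public static void main(String[] args) {
        NavigableMap<Long, Double> readings = sampleReadings();

        // Range query over the window [2000, 5000).
        System.out.println(range(readings, 2_000L, 5_000L));

        // Nearest entries strictly after and strictly before a given timestamp.
        Map.Entry<Long, Double> next = readings.higherEntry(2_000L);
        Map.Entry<Long, Double> previous = readings.lowerEntry(2_000L);
        System.out.println(next.getKey() + " / " + previous.getKey());
    }
}
```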
 
SortedMapState was discussed previously in 
[FLINK-6219|https://issues.apache.org/jira/browse/FLINK-6219], and that thread 
was closed because the blink code might cover this feature. However, the 
[blink code|https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/state/keyed/KeyedSortedMapState.java] 
has not been merged into the master branch since then. The major concern is 
that key comparison is inconsistent between the heap and off-heap state 
backends. In RocksDB, comparison is based on the serialized bytes, which makes 
supporting generic key types challenging, whereas in the heap state backend, 
comparison relies on the Comparable interface.
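
The mismatch can be reproduced in a few lines of plain Java. The sketch below assumes keys are serialized as plain big-endian 8-byte longs and that the off-heap backend orders keys by unsigned lexicographic byte comparison (as RocksDB's default bytewise comparator does). The class and method names are hypothetical and not taken from Flink.

```java
import java.nio.ByteBuffer;

/** Illustrative only: shows why Comparable order and byte order can disagree. */
class ComparisonMismatchSketch {

    /** Plain big-endian 8-byte encoding of a long (an assumed serialization format). */
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Lexicographic comparison that treats each byte as unsigned. */
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        long negative = -1L;
        long positive = 1L;

        // Comparable order: -1 sorts before 1, so the result is negative.
        System.out.println(Long.compare(negative, positive));

        // Byte order: 0xFF... sorts after 0x00...01, so the result is positive.
        System.out.println(compareBytes(encode(negative), encode(positive)));
    }
}
```

Under these assumptions, a heap-backed sorted map would iterate -1 before 1, while a byte-ordered store would iterate 1 before -1.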
 
There are two possible solutions to this issue, as discussed on the dev mailing list:
 
1. There is a prototype feature called TemporalState, which limits the key 
type to Long. This makes sense because most use cases use a timestamp as the 
key, but it rules out generic key types. We would also need to handle negative 
long values gracefully.
 
2. We keep the different sorting behavior of the different state backends and, 
in off-heap state backends, default to byte-wise comparison of the serialized 
keys. Users can provide their own serializer if they want to sort a custom 
type on RocksDB. This solution could start from the blink implementation, 
which already supports serializing various key types (almost all the atomic 
types) into order-preserving bytes: 
[code|https://github.com/apache/flink/tree/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/typeutils/ordered]
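
As a rough illustration of what "ordered bytes" means for long keys, including negative values, one common technique is to flip the sign bit before writing the value big-endian. The sketch below is an assumption about how such an encoding could look, not the blink or TemporalState implementation, and all names in it are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Sketch of an order-preserving long encoding; hypothetical, not Flink or blink code. */
class OrderedLongEncodingSketch {

    /** Flips the sign bit so that unsigned byte order matches signed numeric order. */
    static byte[] encodeOrdered(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    /** Reverses encodeOrdered by flipping the sign bit back. */
    static long decodeOrdered(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    /** Lexicographic comparison that treats each byte as unsigned. */
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        long[] keys = {42L, -7L, 0L, Long.MIN_VALUE, Long.MAX_VALUE};
        byte[][] encoded = new byte[keys.length][];
        for (int i = 0; i < keys.length; i++) {
            encoded[i] = encodeOrdered(keys[i]);
        }

        // Sort by bytes only, as a byte-ordered store would.
        Arrays.sort(encoded, OrderedLongEncodingSketch::compareBytes);

        // Decoding in byte order yields the keys in ascending numeric order.
        for (byte[] bytes : encoded) {
            System.out.println(decodeOrdered(bytes));
        }
    }
}
```

With an encoding like this, byte-wise iteration and Comparable-based iteration agree for long keys. Other key types would each need their own order-preserving encoding.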
 
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
