Hi,

I am trying to implement a variation of FLINK-1725 [1]. Basically, I want to track the frequency of the keys and, if there is skew, split a skewed key across different key groups.
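Here is a minimal standalone sketch of the idea in plain Java, with no Flink dependencies. It counts how often each key occurs and spreads a "hot" key over several key groups. Everything in it is hypothetical: `HotKeyAssigner`, the `hotThreshold` and `splitFactor` parameters, and the round-robin split are illustrative assumptions, not the actual `IFrequency` implementation. `baseKeyGroup` is also a simplified stand-in for `KeyGroupRangeAssignment`, which additionally applies a murmur hash to `hashCode()` before taking the modulo.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: track key frequencies and spread skewed ("hot") keys
// over several consecutive key groups. Not Flink's API.
final class HotKeyAssigner {
    private final int maxParallelism;  // number of key groups
    private final long hotThreshold;   // assumption: count above which a key counts as hot
    private final int splitFactor;     // assumption: how many key groups a hot key is spread over
    private final Map<Object, Long> counts = new HashMap<>();
    private final Map<Object, Integer> nextSlot = new HashMap<>();

    HotKeyAssigner(int maxParallelism, long hotThreshold, int splitFactor) {
        if (maxParallelism < 1 || splitFactor < 1) {
            throw new IllegalArgumentException("maxParallelism and splitFactor must be >= 1");
        }
        this.maxParallelism = maxParallelism;
        this.hotThreshold = hotThreshold;
        this.splitFactor = splitFactor;
    }

    /** Records one occurrence of the key and returns its updated count. */
    long record(Object key) {
        return counts.merge(key, 1L, Long::sum);
    }

    /** A key is hot once its count exceeds the threshold. */
    boolean isHot(Object key) {
        return counts.getOrDefault(key, 0L) > hotThreshold;
    }

    /** Simplified key-group computation (Flink also murmur-hashes the hashCode). */
    int baseKeyGroup(Object key) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Cold keys always map to one key group; hot keys rotate over splitFactor groups. */
    int assignKeyGroup(Object key) {
        int base = baseKeyGroup(key);
        if (!isHot(key)) {
            return base;
        }
        int slot = nextSlot.getOrDefault(key, 0);
        nextSlot.put(key, (slot + 1) % splitFactor);  // round-robin over 0..splitFactor-1
        return (base + slot) % maxParallelism;
    }
}
```

A hot key's records now land in several key groups, so any per-key state for it is only partial and needs a later merge step. The sketch also illustrates the coordination issue. If the sending side and the state backend each keep their own counter, they may classify the same key differently and compute different key groups for the same record.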
My first tests (word count) are working. I introduced a new operator, keyByPartial [2][3], and then I decide in the KeyGroupStreamPartitioner which strategy to pick [4]. My problem is that I am using two "IFrequency frequency" objects in two different places: the first in the AbstractKeyedStateBackend [5] and the second in the StreamPartitioner [6]. Then I compute the final result from both of them in the KeyGroupRangeAssignment [7].

I would like to have just one instance of my "IFrequency frequency" for the keyByPartial transformation. Is that possible? Where would you suggest putting it? If my question is not clear, please let me know.

[1] https://issues.apache.org/jira/browse/FLINK-1725
[2] https://github.com/felipegutierrez/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/partitioning/WordCountKeyPartitioning.java#L108
[3] https://github.com/felipegutierrez/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L320
[4] https://github.com/felipegutierrez/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java#L70
[5] https://github.com/felipegutierrez/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java#L181
[6] https://github.com/felipegutierrez/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitioner.java#L39
[7] https://github.com/felipegutierrez/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L89

Thanks,
Felipe

--
Felipe Gutierrez
skype: felipe.o.gutierrez
https://felipeogutierrez.blogspot.com