Hi Davood,

We have done some explicit partitioning in the past, but it’s pretty fragile.
See FlinkUtils#makeKeyForOperatorIndex <https://github.com/ScaleUnlimited/flink-crawler/blob/master/src/main/java/com/scaleunlimited/flinkcrawler/utils/FlinkUtils.java#L153>

I haven’t tried it with Flink 1.7/1.8, though, and I’m guessing Fabian would spot some issues if he reviewed it :)

-- Ken

> On Apr 8, 2019, at 1:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Davood,
>
> Flink uses hash partitioning to assign keys to key groups. Each key group is
> then assigned to a task for processing (a task might process multiple key
> groups).
> There is no way to directly assign a key to a particular key group or task.
> All you can do is experiment with different custom KeySelectors that
> return keys which hash into different key groups.
>
> Best,
> Fabian
>
> On Sat, Apr 6, 2019 at 11:43 AM, Congxian Qiu <qcx978132...@gmail.com> wrote:
> Hi Davood,
> Maybe a custom KeySelector can help: it lets you define the key used to
> partition the stream. You can refer to the code [1] for details.
>
> [1] https://github.com/apache/flink/blob/8d05e91945c6c8d83f9924c00890ccf350f1f36f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java#L58
>
> Best,
> Congxian
>
> On Apr 5, 2019, 06:35 +0800, Davood Rafiei <rafieidavo...@gmail.com> wrote:
>> Hi all,
>>
>> I partition a DataStream (say, dsA) with parallelism 2 and get a KeyedStream
>> (say, ksA) with parallelism 2.
>> Depending on the keys in dsA, one partition in ksA can remain empty.
>> For example, when my keys are 10 and 20, both partitions in ksA receive
>> data.
>> However, with keys 1000 and 1001, only one partition in ksA receives all of
>> the upstream data.
>> Is there any way to get information about the key ranges for each downstream
>> partition?
>> Or is there any way to overcome this issue?
>> We can assume that I know all possible keys (in this case, 2 different keys)
>> in dsA, and therefore I want all partitions in ksA to be fully utilized.
>>
>> Thanks,
>> Davood

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
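Davood's first question (which key ranges each downstream partition gets) can be answered at the level of key groups rather than key values. The Java sketch below reproduces, as far as I know, the range arithmetic in Flink's `KeyGroupRangeAssignment` (`computeOperatorIndexForKeyGroup` and `computeKeyGroupRangeForOperatorIndex`). Please check it against the Flink version you run. The class `KeyGroupRanges` is hypothetical, and `maxParallelism = 128` assumes Flink's default for a small parallelism such as 2; substitute your job's configured value if you set one.

```java
// Hypothetical sketch of how Flink (1.x, as far as I know) splits key groups
// across parallel operator instances. Mirrors the arithmetic believed to be in
// KeyGroupRangeAssignment; verify against your Flink version.
final class KeyGroupRanges {

    // Operator (subtask) index that owns a given key group.
    static int operatorForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Inclusive [start, end] range of key groups owned by one operator index.
    static int[] keyGroupRange(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed Flink default for small parallelism
        int parallelism = 2;
        for (int op = 0; op < parallelism; op++) {
            int[] r = keyGroupRange(maxParallelism, parallelism, op);
            System.out.println("operator " + op + " -> key groups " + r[0] + ".." + r[1]);
        }
    }
}
```

Under those assumptions it prints key groups 0..63 for operator 0 and 64..127 for operator 1. Each partition owns a contiguous block of key groups, not a contiguous range of key values, so two keys such as 1000 and 1001 can end up in the same half after hashing.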
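The workaround Fabian and Ken describe (choose keys that happen to hash to different tasks) can be sketched in plain Java, since Davood knows all keys up front. `murmurHash` and `operatorIndexFor` reimplement what I believe Flink's `MathUtils.murmurHash` and `KeyGroupRangeAssignment.assignKeyToParallelOperator` compute; in a real job it is safer to call `KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism)` itself so the logic cannot drift from Flink's. `findRoutingKey` and `routingTable` are hypothetical helpers in the same spirit as `FlinkUtils#makeKeyForOperatorIndex`, not a copy of it, and `maxParallelism = 128` is again an assumed default.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RoutingKeys {

    // Assumed re-implementation of Flink's MathUtils.murmurHash: one murmur3
    // round over a single int, the murmur3 finalizer, then made non-negative.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4; // input length in bytes
        // finalizer ("bitMix")
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) return code;
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // key -> key group -> operator index (assumed to match assignKeyToParallelOperator).
    static int operatorIndexFor(Object key, int maxParallelism, int parallelism) {
        int keyGroup = murmurHash(key.hashCode()) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    // Hypothetical helper: scan Integer candidates until one lands on the target
    // operator. Terminates quickly if the hash spreads keys roughly evenly.
    static int findRoutingKey(int targetIndex, int maxParallelism, int parallelism) {
        for (int candidate = 0; candidate < Integer.MAX_VALUE; candidate++) {
            if (operatorIndexFor(candidate, maxParallelism, parallelism) == targetIndex) {
                return candidate;
            }
        }
        throw new IllegalStateException("no key found for index " + targetIndex);
    }

    // Hypothetical helper: spread the known original keys round-robin over the
    // operator indexes and pick a routing key for each one.
    static Map<Object, Integer> routingTable(List<?> knownKeys, int maxParallelism, int parallelism) {
        Map<Object, Integer> table = new LinkedHashMap<>();
        for (int i = 0; i < knownKeys.size(); i++) {
            int target = i % parallelism;
            table.put(knownKeys.get(i), findRoutingKey(target, maxParallelism, parallelism));
        }
        return table;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed Flink default for parallelism 2
        int parallelism = 2;
        Map<Object, Integer> table = routingTable(Arrays.asList(1000, 1001), maxParallelism, parallelism);
        table.forEach((k, v) -> System.out.println(k + " -> routing key " + v
                + " (operator " + operatorIndexFor(v, maxParallelism, parallelism) + ")"));
    }
}
```

A custom `KeySelector` (Congxian's suggestion) would then return `table.get(originalKey)` instead of the original key. As Ken says, this is fragile: the table silently becomes wrong if the parallelism, `maxParallelism`, the key type's `hashCode`, or Flink's hashing changes.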