Hi Philippe,

There is no particular reason, other than that hash partitioning is a sensible default for most users, and this rarely seems to be an issue. When the number of keys is close to the parallelism, idle partitions are usually not a problem because the data volume is low. I can see it becoming a problem if you had multiple "hotspot" keys, but then you would have a hard time parallelizing the workload anyway.
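
To make the collision effect concrete, here is a minimal, self-contained sketch (not Flink code). It assumes a stand-in mixing function, the MurmurHash3 32-bit finalizer, in place of Flink's MathUtils.murmurHash, and the class name, method names, and sample keys are all hypothetical. It counts how many output channels a small key set actually lands on and compares that with the expected number of idle channels under an idealized uniform hash:

```java
import java.util.HashSet;
import java.util.Set;

class KeyedPartitionSketch {

    // Stand-in bit mixer (the 32-bit finalizer from MurmurHash3).
    // Assumption: this is NOT Flink's MathUtils.murmurHash; it only plays the same role.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Map a key to a channel index in [0, numberOfOutputChannels).
    // floorMod keeps the index non-negative even when the mixed hash is negative.
    static int channelFor(Object key, int numberOfOutputChannels) {
        return Math.floorMod(mix(key.hashCode()), numberOfOutputChannels);
    }

    // Count how many channels receive at least one of the given keys.
    static int occupiedChannels(Object[] keys, int numberOfOutputChannels) {
        Set<Integer> used = new HashSet<>();
        for (Object key : keys) {
            used.add(channelFor(key, numberOfOutputChannels));
        }
        return used.size();
    }

    // Expected number of empty channels if each key picked a channel
    // uniformly at random: n * (1 - 1/n)^k.
    static double expectedIdleChannels(int numKeys, int numberOfOutputChannels) {
        return numberOfOutputChannels
                * Math.pow(1.0 - 1.0 / numberOfOutputChannels, numKeys);
    }

    public static void main(String[] args) {
        String[] keys = {"sensor-a", "sensor-b", "sensor-c", "sensor-d"}; // hypothetical keys
        int parallelism = 4;
        System.out.println("occupied channels: "
                + occupiedChannels(keys, parallelism) + " of " + parallelism);
        System.out.println("expected idle channels (uniform model): "
                + expectedIdleChannels(keys.length, parallelism));
    }
}
```

Under the uniform model, 4 keys on 4 channels leave about 1.27 channels idle on average (4 * 0.75^4), so some empty partitions are to be expected with any hash function when the key count is that small.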
Does this limitation really impact performance for you, or is the question of a theoretical nature? :) In any case, we could file an issue and allow other partitioners for keyed streams.

Best,
Max

On Thu, Aug 11, 2016 at 10:53 PM, Philippe Caparroy <philippe.capar...@orange.fr> wrote:
> Hi there,
>
> It does not seem possible to use a custom partitioner in the context of the
> KeyedStream without modifying the KeyedStream:
>
> protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
>     throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
> }
>
> In some particular situations, such as when the number of keys is small and
> close to the number of partitions, using
>
> keyBy(<keyExtractor>).window(<windowAssigner>).<windowOperation>
>
> might result in collisions in the partition indexes (and hence empty
> partitions) assigned by the HashPartitioner that is imposed on the
> KeyedStream:
>
> public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector,
>         TypeInformation<KEY> keyType) {
>     super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(
>             dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
>     this.keySelector = keySelector;
>     this.keyType = keyType;
> }
>
> due to the characteristics of the underlying (indeed, any) hash function:
>
> returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;
>
> Is there a particular reason to force the KeyedStream to use a
> HashPartitioner?
>
> Thanks in advance and best regards.