Hi Philippe,

There is no particular reason other than that hash partitioning is a
sensible default for most users, and in practice this rarely seems to be
an issue. When the number of keys is close to the parallelism, idle
partitions are usually not a problem because the data volume is low. I
see that it could be a problem if you had multiple "hotspot" keys, but
then you would have a hard time parallelizing the workload anyway.
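
To put a rough number on "idle partitions": under the idealised assumption that the hash spreads k keys uniformly and independently over p parallel channels (an assumption, not a property guaranteed by any particular hash function), a given channel is missed by one key with probability 1 - 1/p and by all k keys with probability (1 - 1/p)^k, so by linearity of expectation:

```latex
\mathbb{E}[\text{idle channels}] = p\left(1 - \frac{1}{p}\right)^{k}
\qquad
k = p = 8:\quad 8\left(\tfrac{7}{8}\right)^{8} \approx 2.75
\qquad
k = p \text{ large}:\quad p\left(1 - \tfrac{1}{p}\right)^{p} \approx \frac{p}{e} \approx 0.37\,p
```

So with eight keys on eight channels, roughly a third of the channels are expected to stay idle; whether that hurts depends on how much data each key carries.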

Does this limitation really impact performance for you, or is this
question of a theoretical nature? :) In any case, we could file an issue
to allow other partitioners for keyed streams.
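
A small simulation shows the effect discussed in this thread. The sketch below is a standalone Java illustration, not Flink code: it routes the integer keys 0..k-1 either with hash-then-modulo, mirroring the quoted murmurHash(key.hashCode()) % numberOfOutputChannels line, or with a hypothetical direct mapping key % p, and counts how many channels receive at least one key. The mixing function is the MurmurHash3 32-bit finalizer used as a stand-in; it is assumed, not verified, to behave like Flink's MathUtils.murmurHash, and the class and method names are made up for illustration.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.IntUnaryOperator;

/**
 * Hypothetical sketch: counts how many of p channels receive at least one
 * key under (a) hash-then-modulo routing and (b) a direct key -> key mod p map.
 */
public class PartitionOccupancy {

    // MurmurHash3 fmix32 finalizer: a stand-in for a murmur-style hash,
    // not claimed to be identical to Flink's MathUtils.murmurHash.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Hash-then-modulo, like the quoted HashPartitioner line; the sign bit is
    // cleared so the resulting channel index is never negative.
    static int hashChannel(Object key, int channels) {
        return (mix(key.hashCode()) & 0x7fffffff) % channels;
    }

    // Hypothetical alternative partitioner: only sensible when the keys are
    // known small integers. floorMod keeps negative keys in range.
    static int directChannel(int key, int channels) {
        return Math.floorMod(key, channels);
    }

    // Routes keys 0..numKeys-1 and returns the number of distinct channels hit.
    static int occupied(int numKeys, IntUnaryOperator route) {
        Set<Integer> used = new HashSet<>();
        for (int key = 0; key < numKeys; key++) {
            used.add(route.applyAsInt(key));
        }
        return used.size();
    }

    static int occupiedWithHash(int numKeys, int channels) {
        return occupied(numKeys, key -> hashChannel(key, channels));
    }

    static int occupiedWithDirect(int numKeys, int channels) {
        return occupied(numKeys, key -> directChannel(key, channels));
    }

    public static void main(String[] args) {
        int channels = 8;
        System.out.println("hash-mod occupied:   "
                + occupiedWithHash(channels, channels) + " of " + channels);
        System.out.println("direct map occupied: "
                + occupiedWithDirect(channels, channels) + " of " + channels);
    }
}
```

The direct mapping occupies min(k, p) channels by construction, while the hash-mod count depends on the hash and, for a well-mixing hash, is usually lower when k = p. The catch is that the direct mapping only works when the key domain is known in advance, which is the kind of trade-off a pluggable partitioner for keyed streams would expose.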


On Thu, Aug 11, 2016 at 10:53 PM, Philippe Caparroy
<philippe.capar...@orange.fr> wrote:
> Hi there,
> It does not seem possible to use a custom partitioner with a
> KeyedStream without modifying KeyedStream itself:
> protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
>     throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
> }
> In some particular situations, such as when the number of keys is small
> and close to the number of partitions, using
> keyBy(<keyExtractor>).window(<windowAssigner>).<windowOperation>
> might result in collisions in the partition indexes (and hence empty
> partitions) assigned by the HashPartitioner that is imposed on the
> KeyedStream:
> public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector,
>         TypeInformation<KEY> keyType) {
>     super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(
>             dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
>     this.keySelector = keySelector;
>     this.keyType = keyType;
> }
> due to the characteristics of the underlying hash function (true of any
> hash function):
> returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;
> Is there a particular reason to force the KeyedStream to use a
> HashPartitioner?
> Thanks in advance and best regards.
