Hi Max,

Thanks for the answer. I needed to ensure that, in a parallel window operation (which relies on a KeyedStream), each partition of the window's output stream contains a single key. I can obtain this with a custom partitioner applied just after the window, but relying on the partitioner of the KeyedStream would avoid that extra transformation. I was just wondering whether there was a particular reason to limit the partitioner of the KeyedStream to a HashPartitioner. In any case, I have no bottleneck or performance problems.
Best regards.

> On 12 August 2016 at 12:06, Maximilian Michels <m...@apache.org> wrote:
>
> Hi Philippe,
>
> There is no particular reason other than that hash partitioning is a
> sensible default for most users. It seems like this is rarely an
> issue. When the number of keys is close to the parallelism, having
> idle partitions is usually not a problem due to low data volume. I see
> that it could be a problem if you had multiple "hotspot" keys, but then
> you will have a hard time parallelizing the workload anyway.
>
> Does this limitation really impact performance for you, or is this
> question of a theoretical nature? :) In any case, we could file an issue
> and allow other partitioners for keyed streams.
>
> Best,
> Max
>
>
> On Thu, Aug 11, 2016 at 10:53 PM, Philippe Caparroy
> <philippe.capar...@orange.fr> wrote:
>> Hi there,
>>
>> It does not seem possible to use a custom partitioner in the context of
>> the KeyedStream without modifying the KeyedStream:
>>
>> protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
>>     throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
>> }
>>
>> In some particular situations, such as when the number of keys is small
>> and close to the number of partitions, using
>> keyBy(<keyExtractor>).window(<windowAssigner>).<windowOperation>
>> might result in collisions in the partition indexes (and hence empty
>> partitions) assigned by the HashPartitioner that is imposed on the
>> KeyedStream:
>>
>> public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
>>     super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(
>>         dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
>>     this.keySelector = keySelector;
>>     this.keyType = keyType;
>> }
>>
>> due to the characteristics of the underlying (or any) hash function:
>>
>> returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;
>>
>> Is there a particular reason to force the KeyedStream to use a
>> HashPartitioner?
>>
>> Thanks in advance and best regards.
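
To make the collision effect discussed in this thread concrete, here is a minimal, standalone sketch. It does not use Flink: `mix` is a stand-in bit mixer (the MurmurHash3 32-bit finalizer) and is only assumed to behave similarly to `MathUtils.murmurHash`, and the class name, method names, and sample keys are hypothetical. It contrasts hash-modulo channel selection, where several keys may share a channel and leave others empty, with an explicit key-to-channel assignment of the kind a custom partitioner could apply when the key set is small and known in advance.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

class PartitionCollisionSketch {

    // Stand-in bit mixer (MurmurHash3 32-bit finalizer); NOT Flink's MathUtils.murmurHash.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Hash-based selection: mix the key's hashCode, then reduce modulo the channel count.
    // floorMod keeps the result in [0, numberOfOutputChannels) even for negative hashes.
    static int hashPartition(Object key, int numberOfOutputChannels) {
        return Math.floorMod(mix(key.hashCode()), numberOfOutputChannels);
    }

    // Set of channels that receive at least one key under hash-based selection.
    static TreeSet<Integer> occupiedChannels(List<?> keys, int numberOfOutputChannels) {
        TreeSet<Integer> occupied = new TreeSet<>();
        for (Object key : keys) {
            occupied.add(hashPartition(key, numberOfOutputChannels));
        }
        return occupied;
    }

    // Explicit assignment: the i-th known key gets channel i, so distinct keys never collide.
    // Assumes the keys are distinct and that there are at most as many keys as channels.
    static Map<Object, Integer> explicitAssignment(List<?> keys, int numberOfOutputChannels) {
        if (keys.size() > numberOfOutputChannels) {
            throw new IllegalArgumentException("more keys than channels");
        }
        Map<Object, Integer> assignment = new LinkedHashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            assignment.put(keys.get(i), i);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Hypothetical keys; the number of keys equals the number of channels.
        List<String> keys = Arrays.asList("sensor-a", "sensor-b", "sensor-c", "sensor-d");
        int channels = 4;
        for (String key : keys) {
            System.out.println(key + " -> channel " + hashPartition(key, channels));
        }
        System.out.println("channels occupied by hashing: " + occupiedChannels(keys, channels));
        System.out.println("explicit assignment: " + explicitAssignment(keys, channels));
    }
}
```

Whenever the set of occupied channels is smaller than the number of keys, at least two keys share a channel and at least one channel stays idle, which is the situation described in the original question. The explicit mapping avoids this only because the key set is assumed to be small and fixed.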