Hi there,

It does not seem to be possible to use a custom partitioner with a 
KeyedStream without modifying KeyedStream itself, because it overrides 
setConnectionType as follows:


protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
        throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
}

In some particular situations, such as when the number of keys is small and 
close to the number of partitions, using

keyBy(<keyExtractor>).window(<windowAssigner>).<windowOperation>

might result in collisions among the partition indexes (and hence empty 
partitions) assigned by the HashPartitioner that is imposed on the 
KeyedStream:

public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector,
                TypeInformation<KEY> keyType) {
        super(dataStream.getExecutionEnvironment(),
                        new PartitionTransformation<>(
                                        dataStream.getTransformation(),
                                        new HashPartitioner<>(keySelector)));
        this.keySelector = keySelector;
        this.keyType = keyType;
}

This is due to the characteristics of the underlying hash function (or of 
any hash function, for that matter):

returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;
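
To illustrate the effect, here is a minimal, self-contained sketch that does 
not depend on Flink. The mix function is only a stand-in (the MurmurHash3 
32-bit finalizer) and is an assumption, not Flink's MathUtils.murmurHash; the 
class and method names are made up for illustration. It counts how many of n 
channels receive at least one of n integer keys under hash-then-modulo, and 
compares that with a direct key % n assignment.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.IntUnaryOperator;

public class PartitionCollisionSketch {

    // Stand-in bit mixer (MurmurHash3 32-bit finalizer).
    // Assumption for illustration only: this is NOT Flink's MathUtils.murmurHash.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Hash-then-modulo channel selection, mirroring the shape of the formula above.
    // floorMod keeps the index non-negative even when the mixed hash is negative.
    static int hashChannel(int keyHash, int numChannels) {
        return Math.floorMod(mix(keyHash), numChannels);
    }

    // Counts the distinct channels hit by the keys 0 .. numKeys-1.
    static int usedChannels(int numKeys, IntUnaryOperator keyToChannel) {
        Set<Integer> used = new HashSet<>();
        for (int key = 0; key < numKeys; key++) {
            used.add(keyToChannel.applyAsInt(key));
        }
        return used.size();
    }

    public static void main(String[] args) {
        final int n = 4; // as many keys as channels
        int hashed = usedChannels(n, k -> hashChannel(Integer.hashCode(k), n));
        int direct = usedChannels(n, k -> k % n);
        System.out.println("hash partitioning uses " + hashed + " of " + n + " channels");
        System.out.println("direct assignment uses " + direct + " of " + n + " channels");
    }
}
```

With n keys and n channels, the direct assignment always uses all n channels, 
whereas the hashed assignment can use fewer, since nothing prevents two keys 
from being mapped to the same index.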

Is there a particular reason to force the KeyedStream to use a HashPartitioner?

Thanks in advance and best regards.





 

