Hi there,

It does not seem possible to use a custom partitioner with a KeyedStream without modifying the KeyedStream class itself:

    protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
        throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
    }

In some situations, for example when the number of keys is small and close to the number of partitions, using keyBy(<keyExtractor>).window(<windowAssigner>).<windowOperation> can result in collisions among the partition indexes assigned by the HashPartitioner that is imposed on the KeyedStream, and hence in empty partitions:

    public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
        super(dataStream.getExecutionEnvironment(),
              new PartitionTransformation<>(
                  dataStream.getTransformation(),
                  new HashPartitioner<>(keySelector)));
        this.keySelector = keySelector;
        this.keyType = keyType;
    }

This follows from the nature of the underlying hash function (it would hold for any hash function):

    returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;

Is there a particular reason to force the KeyedStream to use a HashPartitioner?

Thanks in advance and best regards.
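
P.S. To make the collision effect concrete, here is a minimal standalone sketch (no Flink dependency). It assumes a hash-then-modulo assignment like the line quoted above. The class HashCollisionSketch and its methods are hypothetical names for this illustration, and mix() is a generic stand-in (the murmur3 32-bit finalizer), not Flink's MathUtils.murmurHash, so the exact channel indexes will differ from what Flink produces. The last method gives the expected number of empty channels if keys were spread uniformly at random, n * (1 - 1/n)^k, which is about 1.27 for 4 keys on 4 channels.

```java
import java.util.Arrays;
import java.util.List;

public class HashCollisionSketch {

    // Stand-in mixing function (murmur3 32-bit finalizer).
    // Assumption: this is NOT Flink's MathUtils.murmurHash, only a similar-style mixer.
    public static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Hash-then-modulo channel selection; floorMod keeps the index non-negative.
    public static int channelFor(Object key, int numberOfOutputChannels) {
        return Math.floorMod(mix(key.hashCode()), numberOfOutputChannels);
    }

    // Counts how many distinct keys land on each output channel.
    public static int[] keysPerChannel(List<?> keys, int numberOfOutputChannels) {
        int[] counts = new int[numberOfOutputChannels];
        for (Object key : keys) {
            counts[channelFor(key, numberOfOutputChannels)]++;
        }
        return counts;
    }

    // Expected number of empty channels if each key picked a channel uniformly at random.
    public static double expectedEmptyChannels(int numberOfKeys, int numberOfOutputChannels) {
        return numberOfOutputChannels
                * Math.pow(1.0 - 1.0 / numberOfOutputChannels, numberOfKeys);
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c", "d");
        int channels = 4;
        // A zero in this array is a channel that receives no key at all.
        System.out.println(Arrays.toString(keysPerChannel(keys, channels)));
        System.out.println(expectedEmptyChannels(keys.size(), channels));
    }
}
```

With as many keys as channels, a perfectly even assignment (one key per channel) is the exception, not the rule, which is why a custom partitioner would be useful in this case.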