Hello,

As I need to generate the same number of keys as there are partitions, I also suffer from this problem [1]. My current solution is to keep generating keys until I have at least one key per partition, which looks very stupid to me (I copy and paste my code below). If Flink changes the way it computes a partition from a given key in the keyBy operator, I will have to modify my vulnerable key generator. Stephan once mentioned in [2] that a custom partitioner for keyBy can make things complicated.
Hope Flink can provide a way to specify a custom partitioner for keyBy. I know Flink primarily targets data-intensive applications, as mentioned in [3], but compute-intensive applications (especially in the machine learning / deep learning domain) can require this feature to distribute a small number of keys evenly over an equally small number of partitions.

Below is my *vulnerable* key generator written in Scala:

>>>>
import org.apache.flink.util.MathUtils

import scala.collection.mutable

class KeyGenerator(val partitions: Int, val maxPartitions: Int) {

  def this(partitions: Int) = this(partitions, 128)

  val ids = Stream.from(1).iterator
  val cache = mutable.HashMap[Int, mutable.Queue[Int]]()

  // Draw fresh ids until one hashes to the target partition; ids that land
  // on other partitions are cached so they are not wasted.
  def next(targetPartition: Int): Int = {
    val queue = cache.getOrElseUpdate(targetPartition, mutable.Queue[Int]())
    if (queue.isEmpty) {
      var found = false
      while (!found) {
        val id = ids.next
        val partition =
          (MathUtils.murmurHash(id) % maxPartitions) * partitions / maxPartitions
        cache
          .getOrElseUpdate(partition, mutable.Queue[Int]())
          .enqueue(id)
        if (partition == targetPartition) {
          found = true
        }
      }
    }
    queue.dequeue()
  }
}
<<<<

I use it like this:

>>>>
import org.apache.flink.runtime.state.KeyGroupRangeAssignment

...

val numPartitions = 10
val numKeys = 10
val parallelism = 10
val keyGenerator = new KeyGenerator(numPartitions,
  KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism))
val desiredKeys = (0 until numKeys) map keyGenerator.next

...
<<<<

Thanks,

[1] https://image.slidesharecdn.com/pdm-with-apache-flink-flinkforward-170919021613/95/predictive-maintenance-with-deep-learning-and-apache-flink-41-638.jpg?cb=1505787617
[2] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/keyBy-using-custom-partitioner-tt5379.html#a5389
[3] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Using-CustomPartitionerWrapper-with-KeyedStream-td8481.htm

- Dongwon

On Thu, Nov 2, 2017 at 8:00 PM, m@xi <makisnt...@gmail.com> wrote:
> Hello Tony,
>
> Thanks a lot for your answer. Now I know exactly what happens with the keyBy
> function, yet I still haven't figured out a proper (non-hard-coded) way to
> deterministically send a tuple to each key.
>
> If someone from the Flink team could help, it would be great!
>
> Max
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
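P.S. For anyone who wants to play with the assignment scheme my generator inverts without Flink on the classpath, here is a minimal sketch. It assumes the two-step mapping (key -> key group via a murmur hash, key group -> operator index via `keyGroup * parallelism / maxParallelism`) and uses Scala's built-in MurmurHash3 as a stand-in for Flink's MathUtils.murmurHash, so the concrete partition numbers will NOT match Flink's — only the shape of the computation does:

>>>>
import scala.util.hashing.MurmurHash3

object KeyGroupDemo {

  // Stand-in for Flink's key-group computation. MurmurHash3.stringHash
  // replaces MathUtils.murmurHash here, so actual group numbers differ
  // from what Flink would compute for the same key.
  def keyGroup(key: Int, maxParallelism: Int): Int =
    Math.floorMod(MurmurHash3.stringHash(key.toString), maxParallelism)

  // key group -> operator (partition) index, as in the generator above.
  def operatorIndex(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
    keyGroup * parallelism / maxParallelism

  def main(args: Array[String]): Unit = {
    val maxParallelism = 128
    val parallelism = 10
    // Count how many of the first 1000 integer keys land on each operator,
    // to see how uneven the spread can be for small key counts.
    val counts = (1 to 1000)
      .map(k => operatorIndex(keyGroup(k, maxParallelism), maxParallelism, parallelism))
      .groupBy(identity)
      .map { case (p, ks) => (p, ks.size) }
    counts.toSeq.sortBy(_._1).foreach { case (p, n) =>
      println(s"operator $p <- $n keys")
    }
  }
}
<<<<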