Hello,

As I need to generate the same number of keys as partitions, I also
suffer from this problem [1]:
My current solution is to keep generating keys until I have at least one
key per partition, which looks very clumsy to me (I copy and paste my code
below).
If Flink ever changes the way it computes a partition from a given key in
the keyBy operator, I will have to modify my fragile key generator.
Stephan once mentioned in [2] that a custom partitioner for keyBy can make
things complicated.

I hope Flink can provide a way to specify a custom partitioner for keyBy.
I know Flink primarily targets data-intensive applications, as mentioned
in [3],
but compute-intensive applications (especially in the machine
learning / deep learning domain) need this feature to distribute a small
number of keys evenly over a similarly small number of partitions.
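For reference, the same brute-force search can be sketched without any Flink dependency. Note that `toyHash` below is a hypothetical stand-in mixer (the murmur3 32-bit finalizer), NOT Flink's actual MathUtils.murmurHash, so the concrete keys it finds will differ from what Flink would need; only the rejection-sampling structure is the point:

```scala
object KeySearchSketch {
  // Hypothetical stand-in for Flink's MathUtils.murmurHash (this is the
  // murmur3 fmix32 finalizer, not Flink's actual hash).
  def toyHash(x: Int): Int = {
    var h = x
    h ^= h >>> 16
    h *= 0x85ebca6b
    h ^= h >>> 13
    h *= 0xc2b2ae35
    h ^= h >>> 16
    h
  }

  // The partition a key would land on, mirroring the shape of
  // (hash % maxParallelism) * parallelism / maxParallelism.
  def partitionOf(key: Int, parallelism: Int, maxParallelism: Int): Int = {
    val keyGroup = ((toyHash(key) % maxParallelism) + maxParallelism) % maxParallelism
    keyGroup * parallelism / maxParallelism
  }

  // Brute-force search: try candidate keys 1, 2, 3, ... until every
  // partition has at least one key, then return one key per partition.
  def keysPerPartition(parallelism: Int, maxParallelism: Int): Map[Int, Int] = {
    var found = Map.empty[Int, Int]
    var candidate = 1
    while (found.size < parallelism) {
      val p = partitionOf(candidate, parallelism, maxParallelism)
      if (!found.contains(p)) found += (p -> candidate)
      candidate += 1
    }
    found
  }
}
```

With a well-mixing hash the search terminates quickly, but nothing bounds it in general; that is exactly the fragility I am complaining about.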

Below is my *fragile* key generator written in Scala:
>>>>
import org.apache.flink.util.MathUtils
import scala.collection.mutable

class KeyGenerator(val partitions: Int,
                   val maxPartitions: Int) {

  // 128 is Flink's default maximum parallelism.
  def this(partitions: Int) = this(partitions, 128)

  // Candidate key ids, tried in increasing order.
  val ids = Stream.from(1).iterator
  // Keys found so far, queued per target partition.
  val cache = mutable.HashMap[Int, mutable.Queue[Int]]()

  def next(targetPartition: Int): Int = {
    val queue = cache.getOrElseUpdate(targetPartition, mutable.Queue[Int]())
    if (queue.isEmpty) {
      // Keep drawing candidate keys, filing each one under the partition
      // it hashes to, until one lands on the target partition.
      var found = false
      while (!found) {
        val id = ids.next()
        val partition =
          (MathUtils.murmurHash(id) % maxPartitions) * partitions / maxPartitions

        cache
          .getOrElseUpdate(partition, mutable.Queue[Int]())
          .enqueue(id)

        if (partition == targetPartition) {
          found = true
        }
      }
    }
    queue.dequeue()
  }
}
<<<<

I use it like this:
>>>>
import org.apache.flink.runtime.state.KeyGroupRangeAssignment
...
    val numPartitions = 10
    val numKeys = 10
    val parallelism = 10

    val keyGenerator = new KeyGenerator(
      numPartitions,
      KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism))
    val desiredKeys = (0 until numKeys) map keyGenerator.next
...
<<<<

Thanks,

[1] https://image.slidesharecdn.com/pdm-with-apache-flink-flinkforward-170919021613/95/predictive-maintenance-with-deep-learning-and-apache-flink-41-638.jpg?cb=1505787617
[2] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/keyBy-using-custom-partitioner-tt5379.html#a5389
[3] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Using-CustomPartitionerWrapper-with-KeyedStream-td8481.htm

- Dongwon


On Thu, Nov 2, 2017 at 8:00 PM, m@xi <makisnt...@gmail.com> wrote:

> Hello Tony,
>
> Thanks a lot for your answer. Now I know exactly what happens with the keyBy
> function, yet I still haven't figured out a proper (non-hard-coded) way to
> deterministically send a tuple to each key.
>
> If someone from the Flink team could help, it would be great!
>
> Max
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>