Hi all,
In my streaming job I'm using the Kafka direct stream approach and want to
maintain state with updateStateByKey. My pair RDD is keyed by the message's
topic name plus partition id, so I assumed that updateStateByKey would work
within the same partitions as the KafkaRDD and not cause a shuffle. That turns
out not to be true: updateStateByKey is implemented with a cogroup, and the
cogroup considers the state RDD and the KafkaRDD not co-partitioned because
the KafkaRDD has no partitioner at all. The dependency is therefore treated
as wide, which leads to a shuffle.
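To make the co-partitioning rule concrete, here is a simplified, Spark-free model of the per-parent decision, written after my reading of the check in Spark's CoGroupedRDD.getDependencies (a parent is read one-to-one only if its partitioner equals the cogroup's partitioner, otherwise it is shuffled). Everything in it (CoGroupModel, Part, FakeRDD, dependencyKinds) is a hypothetical stand-in, not Spark API.

```scala
// Simplified model of how a cogroup picks a dependency type per parent.
// All names are hypothetical stand-ins for Spark classes; only the
// partitioner comparison is modeled.
object CoGroupModel {
  sealed trait Dep
  case object OneToOne extends Dep // parent partition i feeds output partition i
  case object Shuffle extends Dep  // parent records are redistributed by key

  // Stand-in for a Partitioner: only its equality matters here.
  final case class Part(numPartitions: Int)

  // Stand-in for an RDD: only its optional partitioner matters here.
  final case class FakeRDD(name: String, partitioner: Option[Part])

  // A parent is reused as-is only when its partitioner equals the one the
  // cogroup was given; a parent with no partitioner always gets a shuffle.
  def dependencyKinds(parents: Seq[FakeRDD], part: Part): Seq[Dep] =
    parents.map(p => if (p.partitioner == Some(part)) OneToOne else Shuffle)
}
```

With a state RDD partitioned by `part` and a Kafka-like RDD with no partitioner, the model yields one OneToOne and one Shuffle dependency, which matches the behaviour described above. Note that the comparison is `==`: an anonymous `new Partitioner { ... }` only equals itself, so the very same instance has to be used on both sides unless the partitioner defines its own equals.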

I tried to avoid the shuffle by passing a custom partitioner to
updateStateByKey, but then the KafkaRDD has to report the same partitioner.
For this I created a proxy RDD that wraps the parent RDD unchanged and just
returns my partitioner:

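import scala.reflect.ClassTag
import org.apache.spark.{Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD

// Same partitions and data as `prev`; the only change is that it
// reports `part` as its partitioner.
class ProxyRDDWithPartitioner[T: ClassTag](prev: RDD[T], part: Partitioner)
  extends RDD[T](prev) {

  override val partitioner: Option[Partitioner] = Some(part)

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    prev.compute(split, context)

  override protected def getPartitions: Array[Partition] = prev.partitions

  override def getPreferredLocations(thePart: Partition): Seq[String] =
    prev.preferredLocations(thePart)
}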

I use it like this:
val partitioner = new Partitioner {
  // TODO this should be retrieved from kafka
  override def numPartitions: Int = 2

  override def getPartition(key: Any): Int = {
    key.asInstanceOf[(String, Int)]._2
  }
}
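The partitioner above keys only on the partition id, so with more than one topic two keys such as ("a", 0) and ("b", 0) land in the same partition. One possible way to address the TODO is sketched below, assuming the list of (topic, partition) pairs is fetched from Kafka elsewhere: give each pair a dense index. TopicPartitionIndex is a hypothetical helper, kept Spark-free so it can be tested on its own; a real Partitioner would delegate getPartition to indexOf and numPartitions to size. Whether this index agrees with the order of the KafkaRDD's own partitions is exactly the assumption the proxy trick relies on, and this sketch does not check it.

```scala
// Hypothetical helper: assigns each Kafka (topic, partition) pair a dense
// index 0..n-1 so keys from different topics never collide.
final case class TopicPartitionIndex(topicPartitions: Seq[(String, Int)]) {
  // Sorted so the assignment does not depend on the order the list was built in.
  private val index: Map[(String, Int), Int] =
    topicPartitions.distinct.sorted.zipWithIndex.toMap

  // Would back Partitioner.numPartitions.
  def size: Int = index.size

  // Would back Partitioner.getPartition; fails loudly on unexpected keys
  // instead of silently routing them to a wrong partition.
  def indexOf(key: Any): Int = key match {
    case (topic: String, partition: Int) =>
      index.getOrElse((topic, partition),
        throw new IllegalArgumentException(s"unknown topic-partition: $key"))
    case other =>
      throw new IllegalArgumentException(s"unexpected key: $other")
  }
}
```

Because it is a case class, two instances built from the same list compare equal, which is the property the cogroup's `==` check needs if the partitioner is ever reconstructed rather than shared.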

inputStream
  .map(m => ((m.topic, m.partition), m.value))
  .transform(new ProxyRDDWithPartitioner(_, partitioner))
  .updateStateByKey(func, partitioner)
  ....

My question is: is it safe to use this trick?
