I think the general idea is worth pursuing.

However, this line:

 override def getPartition(key: Any): Int = {
    key.asInstanceOf[(String, Int)]._2
  }

is using the kafka partition id, not the spark partition index, so it's
going to give you fewer partitions / incorrect index

Cast the rdd to HasOffsetRanges, get the offsetRanges from it.  The index
into the offset range array matches the (spark) partition id.  That will
also tell you what the value of numPartitions should be.







On Tue, Jun 2, 2015 at 11:50 AM, Krot Viacheslav <krot.vyaches...@gmail.com>
wrote:

> Hi all,
> In my streaming job I'm using kafka streaming direct approach and want to
> maintain state with updateStateByKey. My PairRDD has message's topic name +
> partition id as a key. So, I assume that updateByState could work within
> same partition as KafkaRDD and not lead to shuffles. Actually this is not
> true, because updateStateByKey leads to cogroup transformation that thinks,
> that state rdd and kafka rdd are not co-partitioned, as kafka rdd does not
> have partitioner at all. So, dependency is considered to be wide and leads
> to shuffle.
>
> I tried to avoid shuffling by providing custom partitioner to
> updateStateByKey, but KafkaRDD need to use same partitioner. For this I
> created a proxy RDD that just returns my partitioner.
>
> class ProxyRDDWithPartitioner[T: ClassTag](prev: RDD[T], part:
> Partitioner) extends RDD[T](prev) {
>
>   override val partitioner = Some(part)
>
>   override def compute(split: Partition, context: TaskContext):
> Iterator[T] = prev.compute(split, context)
>
>   override protected def getPartitions: Array[Partition] = prev.partitions
>
>   override def getPreferredLocations(thePart: Partition): Seq[String] =
> prev.preferredLocations(thePart)
> }
>
> I use it as:
> val partitioner = new Partitioner {
>   // TODO this should be retrieved from kafka
>   override def numPartitions: Int = 2
>
>   override def getPartition(key: Any): Int = {
>     key.asInstanceOf[(String, Int)]._2
>   }
> }
>
> inputStream
>   .map(m => ((m.topic, m.partition), m.value))
>   .transform(new ProxyRDDWithPartitioner(_, partitioner))
>   .updateStateByKey(func, partitioner)
>   ....
>
> The question is - is it safe to do such trick?
>

Reply via email to