Cody,

Thanks, good point. I changed the partitioner to use the Spark partition id:

class MyPartitioner(offsetRanges: Array[OffsetRange]) extends Partitioner {
  override def numPartitions: Int = offsetRanges.size

  override def getPartition(key: Any): Int = {
    // this is set in .map(m => (TaskContext.get().partitionId(), m.value))
    key.asInstanceOf[Int]
  }
}

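inputStream
    .transform { rdd =>
        // cast before any other transformation: only the rdd produced
        // directly by the direct stream implements HasOffsetRanges
        val part = new MyPartitioner(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
        val keyed = rdd.map(m => (TaskContext.get().partitionId(), m.value))
        new ProxyRDDWithPartitioner(keyed, part)
    }
    ...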

But how can I create the same partitioner for the updateStateByKey call? I
have no idea how to access the rdd when calling updateStateByKey.
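
One direction I can imagine, as a rough sketch only: as far as I understand,
Spark compares partitioners with ==, so two separately created instances only
count as "the same" if equals is defined. The class below depends only on the
partition count, so it could be built without access to the rdd. The name
PartitionIdPartitioner is hypothetical, and it assumes the number of Kafka
partitions is known up front and does not change while the job runs.

```scala
import org.apache.spark.Partitioner

// Hypothetical partitioner that needs only the partition count,
// not the offsetRanges of a concrete rdd.
class PartitionIdPartitioner(override val numPartitions: Int) extends Partitioner {

  // The key is assumed to be the Spark partition index set in the map step.
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]

  // Two instances with the same partition count are treated as equal,
  // so instances created in different places can still match.
  override def equals(other: Any): Boolean = other match {
    case p: PartitionIdPartitioner => p.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}
```

The same count would then be passed in both places, i.e. to the partitioner
created inside transform and to the one given to updateStateByKey. I have not
verified that this is enough to avoid the shuffle.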

On Tue, Jun 2, 2015 at 19:15, Cody Koeninger <c...@koeninger.org> wrote:

> I think the general idea is worth pursuing.
>
> However, this line:
>
>  override def getPartition(key: Any): Int = {
>     key.asInstanceOf[(String, Int)]._2
>   }
>
> is using the kafka partition id, not the spark partition index, so it's
> going to give you fewer partitions / incorrect index
>
> Cast the rdd to HasOffsetRanges, get the offsetRanges from it.  The index
> into the offset range array matches the (spark) partition id.  That will
> also tell you what the value of numPartitions should be.
>
> On Tue, Jun 2, 2015 at 11:50 AM, Krot Viacheslav <
> krot.vyaches...@gmail.com> wrote:
>
>> Hi all,
>> In my streaming job I'm using the Kafka direct streaming approach and want to
>> maintain state with updateStateByKey. My PairRDD has the message's topic name +
>> partition id as its key. So I assumed that updateStateByKey could work within
>> the same partitions as the KafkaRDD and not lead to shuffles. Actually this is
>> not true, because updateStateByKey leads to a cogroup transformation that
>> considers the state rdd and the kafka rdd not co-partitioned, as the kafka rdd
>> does not have a partitioner at all. So the dependency is considered wide and
>> leads to a shuffle.
>>
>> I tried to avoid shuffling by providing a custom partitioner to
>> updateStateByKey, but the KafkaRDD needs to use the same partitioner. For this I
>> created a proxy RDD that just returns my partitioner.
>>
>> class ProxyRDDWithPartitioner[T: ClassTag](prev: RDD[T], part: Partitioner)
>>     extends RDD[T](prev) {
>>
>>   override val partitioner = Some(part)
>>
>>   override def compute(split: Partition, context: TaskContext): Iterator[T] =
>>     prev.compute(split, context)
>>
>>   override protected def getPartitions: Array[Partition] = prev.partitions
>>
>>   override def getPreferredLocations(thePart: Partition): Seq[String] =
>>     prev.preferredLocations(thePart)
>> }
>>
>> I use it as:
>> val partitioner = new Partitioner {
>>   // TODO this should be retrieved from kafka
>>   override def numPartitions: Int = 2
>>
>>   override def getPartition(key: Any): Int = {
>>     key.asInstanceOf[(String, Int)]._2
>>   }
>> }
>>
>> inputStream
>>   .map(m => ((m.topic, m.partition), m.value))
>>   .transform(new ProxyRDDWithPartitioner(_, partitioner))
>>   .updateStateByKey(func, partitioner)
>>   ....
>>
>> The question is: is it safe to use such a trick?
>>
>
>
