If you're using the Spark partition id directly as the key, then you don't
need to access the offset ranges at all, right?
You can create a single instance of a partitioner in advance, and all it
needs to know is the number of partitions (which is just the total count of
Kafka topic/partitions).
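
As a rough sketch of what I mean (untested against your job; the class name
PartitionIdPartitioner is made up for illustration, and it assumes the key is
already the Spark partition id set in your earlier map):

```scala
import org.apache.spark.Partitioner

// Hypothetical name. Assumes every key is a Spark partition id,
// e.g. set via TaskContext.get().partitionId() in an earlier map.
class PartitionIdPartitioner(override val numPartitions: Int) extends Partitioner {
  require(numPartitions > 0, "numPartitions must be positive")

  // The key already is the partition index, so just validate and return it.
  override def getPartition(key: Any): Int = key match {
    case id: Int if id >= 0 && id < numPartitions => id
    case other => throw new IllegalArgumentException(s"Unexpected key: $other")
  }

  // Spark compares partitioners with equals when deciding whether two RDDs
  // are co-partitioned, so instances of the same size should compare equal.
  override def equals(other: Any): Boolean = other match {
    case p: PartitionIdPartitioner => p.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}
```

You would build one instance up front from the total topic/partition count and
hand that same instance to both your proxy RDD and updateStateByKey(func,
partitioner), so there is nothing to look up from the rdd at that point.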

On Tue, Jun 2, 2015 at 12:40 PM, Krot Viacheslav <krot.vyaches...@gmail.com>
wrote:

> Cody,
>
> Thanks, good point. I changed how I get the partition id:
>
> class MyPartitioner(offsetRanges: Array[OffsetRange]) extends Partitioner {
>   override def numPartitions: Int = offsetRanges.size
>
>   override def getPartition(key: Any): Int = {
>     // this is set in .map(m => (TaskContext.get().partitionId(), m.value))
>     key.asInstanceOf[Int]
>   }
> }
>
> inputStream
>     .map(m => (TaskContext.get().partitionId(), m.value))
>     .transform { rdd =>
>         val part = new MyPartitioner(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
>         new ProxyRDDWithPartitioner(rdd, part)
>     }
>     ...
>
> But how can I create the same partitioner for the updateStateByKey call? I
> have no idea how to access the rdd when calling updateStateByKey.
>
> On Tue, Jun 2, 2015 at 19:15, Cody Koeninger <c...@koeninger.org> wrote:
>
>> I think the general idea is worth pursuing.
>>
>> However, this line:
>>
>>  override def getPartition(key: Any): Int = {
>>     key.asInstanceOf[(String, Int)]._2
>>   }
>>
>> is using the Kafka partition id, not the Spark partition index, so it's
>> going to give you fewer partitions and an incorrect index.
>>
>> Cast the rdd to HasOffsetRanges, get the offsetRanges from it.  The index
>> into the offset range array matches the (spark) partition id.  That will
>> also tell you what the value of numPartitions should be.
>>
>> On Tue, Jun 2, 2015 at 11:50 AM, Krot Viacheslav <
>> krot.vyaches...@gmail.com> wrote:
>>
>>> Hi all,
>>> In my streaming job I'm using the Kafka direct stream approach and want
>>> to maintain state with updateStateByKey. My PairRDD uses the message's topic
>>> name + partition id as the key, so I assumed that updateStateByKey could work
>>> within the same partitions as the KafkaRDD and not lead to shuffles. This
>>> turns out not to be true: updateStateByKey leads to a cogroup transformation
>>> that treats the state RDD and the Kafka RDD as not co-partitioned, because
>>> the Kafka RDD has no partitioner at all. So the dependency is considered
>>> wide and leads to a shuffle.
>>>
>>> I tried to avoid the shuffle by providing a custom partitioner to
>>> updateStateByKey, but the KafkaRDD needs to use the same partitioner. For
>>> this I created a proxy RDD that just returns my partitioner.
>>>
>>> class ProxyRDDWithPartitioner[T: ClassTag](prev: RDD[T], part: Partitioner)
>>>     extends RDD[T](prev) {
>>>
>>>   override val partitioner = Some(part)
>>>
>>>   override def compute(split: Partition, context: TaskContext): Iterator[T] =
>>>     prev.compute(split, context)
>>>
>>>   override protected def getPartitions: Array[Partition] = prev.partitions
>>>
>>>   override def getPreferredLocations(thePart: Partition): Seq[String] =
>>>     prev.preferredLocations(thePart)
>>> }
>>>
>>> I use it as:
>>> val partitioner = new Partitioner {
>>>   // TODO this should be retrieved from kafka
>>>   override def numPartitions: Int = 2
>>>
>>>   override def getPartition(key: Any): Int = {
>>>     key.asInstanceOf[(String, Int)]._2
>>>   }
>>> }
>>>
>>> inputStream
>>>   .map(m => ((m.topic, m.partition), m.value))
>>>   .transform(new ProxyRDDWithPartitioner(_, partitioner))
>>>   .updateStateByKey(func, partitioner)
>>>   ....
>>>
>>> The question is: is it safe to use a trick like this?
>>>
>>
>>
