If you're using the spark partition id directly as the key, then you don't need to access offset ranges at all, right? You can create a single instance of a partitioner in advance, and all it needs to know is the number of partitions (which is just the count of all the kafka topic/partitions).
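For illustration, a minimal sketch of that idea (the hard-coded partition count and the update function are placeholders, not taken from this thread; inputStream and ProxyRDDWithPartitioner are the ones defined further down):

  import org.apache.spark.{Partitioner, TaskContext}

  // One partitioner instance built in advance; all it needs to know is the
  // total number of kafka topic/partitions (hard-coded here as a placeholder).
  val numParts = 2
  val partitioner = new Partitioner {
    override def numPartitions: Int = numParts
    // the key already is the spark partition index, set in the map() below
    override def getPartition(key: Any): Int = key.asInstanceOf[Int]
  }

  // placeholder update function: keep the latest value seen for each key
  val updateFunc = (values: Seq[String], state: Option[String]) =>
    values.lastOption.orElse(state)

  inputStream
    .map(m => (TaskContext.get().partitionId(), m.value))
    .transform(rdd => new ProxyRDDWithPartitioner(rdd, partitioner))
    .updateStateByKey(updateFunc, partitioner) // same instance on both sides, no extra shuffle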
On Tue, Jun 2, 2015 at 12:40 PM, Krot Viacheslav <krot.vyaches...@gmail.com> wrote:

> Cody,
>
> Thanks, good point. I fixed getting the partition id to:
>
>   class MyPartitioner(offsetRanges: Array[OffsetRange]) extends Partitioner {
>     override def numPartitions: Int = offsetRanges.size
>
>     override def getPartition(key: Any): Int = {
>       // this is set in .map(m => (TaskContext.get().partitionId(), m.value))
>       key.asInstanceOf[Int]
>     }
>   }
>
>   inputStream
>     .map(m => (TaskContext.get().partitionId(), m.value))
>     .transform { rdd =>
>       val part = new MyPartitioner(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
>       new ProxyRDDWithPartitioner(rdd, part)
>     }
>     ...
>
> But how can I create the same partitioner during the updateStateByKey call?
> I have no idea how to access the rdd when calling updateStateByKey.
>
> On Tue, Jun 2, 2015 at 7:15 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> I think the general idea is worth pursuing.
>>
>> However, this line:
>>
>>   override def getPartition(key: Any): Int = {
>>     key.asInstanceOf[(String, Int)]._2
>>   }
>>
>> is using the kafka partition id, not the spark partition index, so it's
>> going to give you fewer partitions / an incorrect index.
>>
>> Cast the rdd to HasOffsetRanges and get the offsetRanges from it. The
>> index into the offset range array matches the (spark) partition id. That
>> will also tell you what the value of numPartitions should be.
>>
>> On Tue, Jun 2, 2015 at 11:50 AM, Krot Viacheslav
>> <krot.vyaches...@gmail.com> wrote:
>>
>>> Hi all,
>>> In my streaming job I'm using the kafka direct streaming approach and
>>> want to maintain state with updateStateByKey. My PairRDD has the
>>> message's topic name + partition id as the key, so I assumed
>>> updateStateByKey could work within the same partitions as the KafkaRDD
>>> and not lead to shuffles. Actually this is not true: updateStateByKey
>>> leads to a cogroup transformation that treats the state rdd and the
>>> kafka rdd as not co-partitioned, since the kafka rdd has no partitioner
>>> at all. So the dependency is considered wide and causes a shuffle.
>>>
>>> I tried to avoid the shuffle by providing a custom partitioner to
>>> updateStateByKey, but the KafkaRDD needs to use the same partitioner.
>>> For this I created a proxy RDD that just returns my partitioner.
>>>
>>>   class ProxyRDDWithPartitioner[T: ClassTag](prev: RDD[T], part: Partitioner)
>>>     extends RDD[T](prev) {
>>>
>>>     override val partitioner = Some(part)
>>>
>>>     override def compute(split: Partition, context: TaskContext): Iterator[T] =
>>>       prev.compute(split, context)
>>>
>>>     override protected def getPartitions: Array[Partition] = prev.partitions
>>>
>>>     override def getPreferredLocations(thePart: Partition): Seq[String] =
>>>       prev.preferredLocations(thePart)
>>>   }
>>>
>>> I use it as:
>>>
>>>   val partitioner = new Partitioner {
>>>     // TODO this should be retrieved from kafka
>>>     override def numPartitions: Int = 2
>>>
>>>     override def getPartition(key: Any): Int = {
>>>       key.asInstanceOf[(String, Int)]._2
>>>     }
>>>   }
>>>
>>>   inputStream
>>>     .map(m => ((m.topic, m.partition), m.value))
>>>     .transform(new ProxyRDDWithPartitioner(_, partitioner))
>>>     .updateStateByKey(func, partitioner)
>>>     ....
>>>
>>> The question is: is it safe to do such a trick?
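On the "// TODO this should be retrieved from kafka" comment above, one hedged way to count the topic/partitions up front is Kafka's consumer client, roughly as below (the broker address is a placeholder, and this assumes a Kafka client with the 0.9+ consumer API is on the classpath):

  import java.util.Properties
  import scala.collection.JavaConverters._
  import org.apache.kafka.clients.consumer.KafkaConsumer

  // Sketch: sum the partition counts of all subscribed topics before the job starts.
  def countKafkaPartitions(topics: Seq[String], brokers: String): Int = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers) // placeholder, e.g. "localhost:9092"
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](props)
    try {
      topics.map(t => consumer.partitionsFor(t).asScala.size).sum
    } finally {
      consumer.close()
    }
  }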