Cody, thanks, good point. I fixed the partition id lookup to:
class MyPartitioner(offsetRanges: Array[OffsetRange]) extends Partitioner {
  override def numPartitions: Int = offsetRanges.size

  override def getPartition(key: Any): Int = {
    // the key is set in .map(m => (TaskContext.get().partitionId(), m.value))
    key.asInstanceOf[Int]
  }
}

inputStream
  .map(m => (TaskContext.get().partitionId(), m.value))
  .transform { rdd =>
    val part = new MyPartitioner(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
    new ProxyRDDWithPartitioner(rdd, part)
  }
  ...

But how can I create the same partitioner during the updateStateByKey call? I have no idea how to access the RDD when calling updateStateByKey. (A sketch of the direction I have in mind is at the bottom of this mail, below the quoted thread.)

Tue, Jun 2, 2015 at 19:15, Cody Koeninger <c...@koeninger.org>:

> I think the general idea is worth pursuing.
>
> However, this line:
>
>   override def getPartition(key: Any): Int = {
>     key.asInstanceOf[(String, Int)]._2
>   }
>
> is using the Kafka partition id, not the Spark partition index, so it's
> going to give you fewer partitions / an incorrect index.
>
> Cast the RDD to HasOffsetRanges and get the offsetRanges from it. The
> index into the offset range array matches the (Spark) partition id. That
> will also tell you what the value of numPartitions should be.
>
> On Tue, Jun 2, 2015 at 11:50 AM, Krot Viacheslav
> <krot.vyaches...@gmail.com> wrote:
>
>> Hi all,
>> In my streaming job I'm using the Kafka direct streaming approach and
>> want to maintain state with updateStateByKey. My pair RDD has the
>> message's topic name plus partition id as the key, so I assumed
>> updateStateByKey could work within the same partitions as the KafkaRDD
>> and avoid shuffles. That is actually not true, because updateStateByKey
>> leads to a cogroup transformation that considers the state RDD and the
>> Kafka RDD not co-partitioned, since the Kafka RDD has no partitioner at
>> all. The dependency is therefore treated as wide and leads to a shuffle.
>>
>> I tried to avoid the shuffle by providing a custom partitioner to
>> updateStateByKey, but the KafkaRDD needs to use the same partitioner.
>> For this I created a proxy RDD that just returns my partitioner:
>>
>> class ProxyRDDWithPartitioner[T: ClassTag](prev: RDD[T], part: Partitioner)
>>     extends RDD[T](prev) {
>>
>>   override val partitioner = Some(part)
>>
>>   override def compute(split: Partition, context: TaskContext): Iterator[T] =
>>     prev.compute(split, context)
>>
>>   override protected def getPartitions: Array[Partition] = prev.partitions
>>
>>   override def getPreferredLocations(thePart: Partition): Seq[String] =
>>     prev.preferredLocations(thePart)
>> }
>>
>> I use it as:
>>
>> val partitioner = new Partitioner {
>>   // TODO this should be retrieved from kafka
>>   override def numPartitions: Int = 2
>>
>>   override def getPartition(key: Any): Int = {
>>     key.asInstanceOf[(String, Int)]._2
>>   }
>> }
>>
>> inputStream
>>   .map(m => ((m.topic, m.partition), m.value))
>>   .transform(new ProxyRDDWithPartitioner(_, partitioner))
>>   .updateStateByKey(func, partitioner)
>>   ....
>>
>> The question is - is it safe to do such a trick?
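P.S. A minimal sketch of the direction I have in mind, under two assumptions this thread has not verified: the total number of topic-partitions is known up front and stays fixed for the lifetime of the job (the value 2 below is hypothetical), and the partitioner implements value-based equals/hashCode, since otherwise the cogroup inside updateStateByKey compares partitioner instances by reference, still sees them as different, and shuffles anyway. It reuses ProxyRDDWithPartitioner from the quoted message; the update function is a trivial placeholder, and message values are assumed to be Strings.

import org.apache.spark.{Partitioner, TaskContext}

// Keys are Spark partition indices (set by the .map below), so
// getPartition just returns the key itself.
class IndexPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]

  // Value-based equality, so the instance on the proxied RDD and the
  // instance passed to updateStateByKey count as the same partitioner.
  override def equals(other: Any): Boolean = other match {
    case p: IndexPartitioner => p.numPartitions == numPartitions
    case _                   => false
  }
  override def hashCode: Int = numPartitions
}

// Hypothetical: 2 topic-partitions, assumed never to change while the job runs.
val partitioner = new IndexPartitioner(2)

// Placeholder update function: counts messages per key across batches.
val updateFunc = (values: Seq[String], state: Option[Long]) =>
  Some(state.getOrElse(0L) + values.size)

inputStream
  .map(m => (TaskContext.get().partitionId(), m.value))
  .transform(rdd => new ProxyRDDWithPartitioner(rdd, partitioner))
  .updateStateByKey(updateFunc, partitioner)

If that equality trick holds, one partitioner instance created on the driver can be handed to both the transform and updateStateByKey, which would sidestep needing access to the RDD at updateStateByKey time at all. Whether it is actually safe still hinges on the partition count really being stable: repartitioning the topic would silently break the key-to-partition mapping.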