Re: updateStateByKey and kafka direct approach without shuffle

2015-06-02 Thread Krot Viacheslav
Thanks, this works. Hopefully I didn't miss something important with this approach.

On Tue, 2 June 2015 at 20:15, Cody Koeninger wrote:
> If you're using the spark partition id directly as the key, then you don't
> need to access offset ranges at all, right?
> You can create a single instance of a parti

Re: updateStateByKey and kafka direct approach without shuffle

2015-06-02 Thread Cody Koeninger
If you're using the spark partition id directly as the key, then you don't need to access offset ranges at all, right? You can create a single instance of a partitioner in advance, and all it needs to know is the number of partitions (which is just the count of all the kafka topic/partitions). On
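
A minimal sketch of the kind of partitioner described above, assuming every key is already the Spark partition index as an Int. The class name PartitionIdPartitioner is hypothetical and does not appear in the thread; only org.apache.spark.Partitioner and its two abstract members come from Spark itself.

```scala
import org.apache.spark.Partitioner

// Hypothetical name, not from the thread. The only state is the partition
// count, so a single instance can be created in advance and reused.
class PartitionIdPartitioner(override val numPartitions: Int) extends Partitioner {

  // Assumption: the key is the Spark partition index the record came from,
  // so the record is sent back to the same index.
  override def getPartition(key: Any): Int = key match {
    case i: Int if i >= 0 && i < numPartitions => i
    case other =>
      throw new IllegalArgumentException(s"Unexpected partition key: $other")
  }

  // Two instances with the same partition count are treated as the same
  // partitioner when compared.
  override def equals(other: Any): Boolean = other match {
    case p: PartitionIdPartitioner => p.numPartitions == numPartitions
    case _                         => false
  }

  override def hashCode: Int = numPartitions
}
```

Defining equals and hashCode is a design choice rather than something the thread asks for: Spark compares partitioners when deciding whether data is already laid out as required, so value equality lets separately constructed instances with the same count match.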

Re: updateStateByKey and kafka direct approach without shuffle

2015-06-02 Thread Krot Viacheslav
Cody,

Thanks, good point. I fixed getting partition id to:

class MyPartitioner(offsetRanges: Array[OffsetRange]) extends Partitioner {
  override def numPartitions: Int = offsetRanges.size
  override def getPartition(key: Any): Int = {
    // this is set in .map(m => (TaskContext.get().partition

Re: updateStateByKey and kafka direct approach without shuffle

2015-06-02 Thread Cody Koeninger
I think the general idea is worth pursuing. However, this line:

override def getPartition(key: Any): Int = { key.asInstanceOf[(String, Int)]._2 }

is using the kafka partition id, not the spark partition index, so it's going to give you fewer partitions / incorrect index.

Cast the rdd to H
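
To illustrate the distinction drawn above, here is a small Spark-free sketch. TopicPartitionRange is a hypothetical stand-in for an offset range (it is not Spark's OffsetRange), and the two-topic layout is invented for the example. It assumes the Spark partition index corresponds to the position of the range in the array, while the Kafka partition id is only unique within its own topic.

```scala
// Hypothetical stand-in for an offset range: just a topic and the Kafka
// partition id within that topic. Not a Spark class.
final case class TopicPartitionRange(topic: String, kafkaPartition: Int)

object PartitionIndexDemo {
  // Assumed layout: two topics with two Kafka partitions each.
  val ranges: Array[TopicPartitionRange] = Array(
    TopicPartitionRange("a", 0),
    TopicPartitionRange("a", 1),
    TopicPartitionRange("b", 0),
    TopicPartitionRange("b", 1)
  )

  // Kafka partition ids repeat across topics.
  val kafkaIds: Seq[Int] = ranges.map(_.kafkaPartition).toSeq

  // Positions in the array are unique, one per topic/partition pair.
  val sparkIndices: Seq[Int] = ranges.indices

  def main(args: Array[String]): Unit = {
    println(s"distinct kafka partition ids: ${kafkaIds.distinct.size}")
    println(s"distinct array positions:     ${sparkIndices.distinct.size}")
  }
}
```

Under these assumptions, a partitioner keyed on the Kafka partition id would collapse four topic/partitions onto two indices, which matches the "fewer partitions / incorrect index" concern in the message.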