Thanks, this works.
Hopefully I didn't miss something important with this approach.
Tue, 2 Jun 2015 at 20:15, Cody Koeninger:
If you're using the spark partition id directly as the key, then you don't
need to access offset ranges at all, right?
You can create a single instance of a partitioner in advance, and all it
needs to know is the number of partitions (which is just the count of all
the kafka topic/partitions).
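
A minimal sketch of such a partitioner, assuming the key is already the
spark partition index; the class name and constructor are illustrative,
not code from this thread:

  import org.apache.spark.Partitioner

  // created once, in advance; all it needs is the total partition count
  // (the count of all kafka topic/partitions in the direct stream)
  class IdentityPartitioner(numParts: Int) extends Partitioner {
    override def numPartitions: Int = numParts
    // assumes the key was set to the spark partition index (an Int)
    override def getPartition(key: Any): Int = key.asInstanceOf[Int]
  }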
Cody,
Thanks, good point. I fixed how the partition id is obtained:
class MyPartitioner(offsetRanges: Array[OffsetRange]) extends Partitioner {
  override def numPartitions: Int = offsetRanges.size
  override def getPartition(key: Any): Int = {
    // the key is set upstream in .map(m => (TaskContext.get().partitionId, m))
    key.asInstanceOf[Int]
  }
}
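
For context, a hedged sketch of how this partitioner might be wired up;
the rdd variable, the record type, and the choice of groupByKey are
assumptions, not code from this thread:

  import org.apache.spark.TaskContext

  // key each record by the spark partition index of the task that read it;
  // MyPartitioner then maps each key straight back to that same index
  val keyed = rdd.map(m => (TaskContext.get().partitionId, m))
  val grouped = keyed.groupByKey(new MyPartitioner(offsetRanges))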
I think the general idea is worth pursuing.
However, this line:
override def getPartition(key: Any): Int = {
  key.asInstanceOf[(String, Int)]._2
}
is using the kafka partition id, not the spark partition index, so it's
going to give you fewer partitions / an incorrect index.
Cast the rdd to HasOffsetRanges to get the offset ranges.
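
A sketch of that cast, using the HasOffsetRanges trait from the Spark
Kafka integration; the stream variable is an assumption:

  import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

  stream.foreachRDD { rdd =>
    // rdds produced by the direct kafka stream implement HasOffsetRanges
    val offsetRanges: Array[OffsetRange] =
      rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // offsetRanges.size is the number of spark partitions in this rdd, and
    // offsetRanges(i) describes the kafka topic/partition behind partition i
  }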