Hello, I have an interesting use case for a pre-filtered RDD. I have two solutions that I am not entirely happy with and would like to get some feedback and thoughts. Perhaps it is a use case that could be more explicitly supported in Spark.
My data has well-defined semantics for the key values, which I can use to pre-filter an RDD so that the partitions and records I will not need are never loaded at all. In most cases this is a significant savings. Essentially the dataset is geographic image tiles, as you would see on Google Maps. The entire dataset could be huge, covering an entire continent at high resolution. But if I want to work with a subset, let's say a single city, it makes no sense to load all the partitions into memory just so I can filter them out as a first step.

My first attempt was to extend NewHadoopRDD as follows:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.InputFormat
    import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
    import org.apache.spark.rdd.NewHadoopRDD

    abstract class PreFilteredHadoopRDD[K, V](
        sc: SparkContext,
        inputFormatClass: Class[_ <: InputFormat[K, V]],
        keyClass: Class[K],
        valueClass: Class[V],
        @transient conf: Configuration)
      extends NewHadoopRDD[K, V](sc, inputFormatClass, keyClass, valueClass, conf) {

      /** Returns true if the specific partition has relevant keys. */
      def includePartition(p: Partition): Boolean

      /** Returns true if the specific key in the partition passes the filter. */
      def includeKey(key: K): Boolean

      /** Drop partitions that cannot contain relevant keys. */
      override def getPartitions: Array[Partition] =
        super.getPartitions.filter(includePartition)

      /** Filter records by key after the underlying reader has already produced them. */
      override def compute(theSplit: Partition, context: TaskContext) = {
        val ii = super.compute(theSplit, context)
        new InterruptibleIterator(ii.context, ii.delegate.filter { case (k, _) => includeKey(k) })
      }
    }

NewHadoopRDD for reference:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

This is nice and handles the partition side of the problem well enough, but by the time the iterator is created by super.compute there is no way to avoid reading the values of records that do not pass my filter. Since I am actually using SequenceFileInputFormat as my InputFormat I can do better and avoid deserializing the values entirely, if only I could get my hands on the reader and re-implement compute() (a rough sketch of the kind of key-first read I have in mind is in the P.S. below). But this does not seem possible to do through extension, because both NewHadoopRDD.confBroadcast and NewHadoopPartition are private. There does not seem to be any choice but to copy/paste NewHadoopRDD rather than extend it.

The two solutions that seem apparent are:

  1. Remove those private modifiers.
  2. Factor reader creation out into a method that a sub-class can use to re-implement compute().

I would be curious to hear if anybody has had a similar problem, and any thoughts on the issue. If you think there is a PR in this I'd be happy to code it up and submit it.

Thank you,
--
Eugene Cheipesh
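P.S. To make the SequenceFileInputFormat point concrete, here is a rough, untested sketch of the kind of key-first read I mean, going straight at SequenceFile.Reader rather than through the InputFormat machinery: reader.next(key) deserializes only the key, and getCurrentValue(value) is paid for only when the key passes the filter. This is only an illustration of what a re-implemented compute() could do if the reader were reachable; the object and method names below are made up for the example, and the usual Hadoop caveat applies that the key/value Writables are reused between records.

    import java.util.NoSuchElementException

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{SequenceFile, Writable}

    object KeyFilteredSequenceFile {
      /** Iterate over a sequence file, deserializing values only for keys that pass the filter. */
      def filteredPairs[K <: Writable, V <: Writable](
          conf: Configuration,
          path: Path,
          key: K,                  // reusable Writable instance, same pattern as a RecordReader
          value: V,                // reusable Writable instance
          includeKey: K => Boolean): Iterator[(K, V)] =
        new Iterator[(K, V)] {
          private val reader = new SequenceFile.Reader(path.getFileSystem(conf), path, conf)
          private var ready = false
          private var done = false

          private def advance(): Unit =
            while (!ready && !done) {
              if (!reader.next(key)) {           // reads only the key
                done = true
                reader.close()
              } else if (includeKey(key)) {
                reader.getCurrentValue(value)    // value is deserialized only for keys we keep
                ready = true
              }                                  // otherwise move on; the value is never touched
            }

          def hasNext: Boolean = { advance(); ready }

          def next(): (K, V) = {
            if (!hasNext) throw new NoSuchElementException("end of sequence file")
            ready = false
            (key, value)
          }
        }
    }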