Did you try:

val data = indexed_files.groupByKey

val modified_data = data.map { a =>
  val name = a._2.mkString(",")
  (a._1, name)
}

// Collect to the driver before reading: sc only exists on the driver,
// so calling sc.textFile inside an RDD foreach runs on the executors,
// where sc is null.
modified_data.collect().foreach { a =>
  val file = sc.textFile(a._2) // a._2 is a comma-separated list of paths
  println(file.count)
}
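That should also explain the NullPointerException: data.map returns a new
RDD (the original is never modified, so the result has to be assigned and
used), and sc is only available on the driver, so referencing it inside
data.foreach executes on the workers, where sc is null. That is also why
sc.textFile(data.first._2).count works from the shell - there it runs on
the driver.

As a rough, untested sketch of an alternative: since sc.wholeTextFiles has
already loaded the file contents, you could compute per-prefix line counts
without calling sc.textFile again at all:

// `files` is the RDD[(path, content)] from your sc.wholeTextFiles("*.csv")
val counts = files
  .map { case (path, content) => (path.split("_")(0), content.split("\n").length) }
  .reduceByKey(_ + _)

// Bring the small per-prefix result back to the driver and print it.
counts.collect().foreach { case (prefix, n) => println(s"$prefix: $n") }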
Thanks
Best Regards

On Wed, Jul 22, 2015 at 2:18 AM, MorEru <hsb.sh...@gmail.com> wrote:
> I have a number of CSV files and need to combine them into an RDD by part
> of their filenames.
>
> For example, for the below files
> $ ls
> 20140101_1.csv 20140101_3.csv 20140201_2.csv 20140301_1.csv
> 20140301_3.csv 20140101_2.csv 20140201_1.csv 20140201_3.csv
>
> I need to combine files with names 20140101*.csv into an RDD to work on
> it, and so on.
>
> I am using sc.wholeTextFiles to read the entire directory and then
> grouping the filenames by their patterns to form a string of filenames.
> I am then passing the string to sc.textFile to open the files as a
> single RDD.
>
> This is the code I have -
>
> val files = sc.wholeTextFiles("*.csv")
> val indexed_files = files.map(a => (a._1.split("_")(0), a._1))
> val data = indexed_files.groupByKey
>
> data.map { a =>
>   var name = a._2.mkString(",")
>   (a._1, name)
> }
>
> data.foreach { a =>
>   var file = sc.textFile(a._2)
>   println(file.count)
> }
>
> And I get a SparkException (NullPointerException) when I try to call
> textFile. The error stack refers to an Iterator inside the RDD. I am not
> able to understand the error -
>
> 15/07/21 15:37:37 INFO TaskSchedulerImpl: Removed TaskSet 65.0, whose
> tasks have all completed, from pool
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
> in stage 65.0 failed 4 times, most recent failure: Lost task 1.3 in stage
> 65.0 (TID 115, 10.132.8.10): java.lang.NullPointerException
>   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:33)
>   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:32)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:870)
>   at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:870)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
>
> However, when I do sc.textFile(data.first._2).count in the Spark shell,
> I am able to form the RDD and retrieve the count.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-inside-RDD-when-calling-sc-textFile-tp23943.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org