Hi,

I am getting a null pointer exception while saving data into Hadoop. The code is as follows. If I change the last line to

  sorted_tup.take(count.toInt).foreach { case ((a, b, c), l) =>
    sc.parallelize(l.toSeq).coalesce(1).saveAsTextFile(hdfsDir + a + "/" + b + "/" + c)
  }

I am able to save the data, but for larger files I get a heap space error, which I think is due to the "take". Can someone please help me with this?
Thanks,
Durga

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import scala.util.parsing.json.JSON

val conf = new SparkConf()
  .setMaster(master)
  .setAppName(appName)
  .set("spark.cores.max", numCores)
  .setJars(Seq("/home/hadoopuser/testing/Rest_1/FileSplitter/target/scala-2.10/filesplitter_2.10-1.0.jar"))
  .set("spark.executor.memory", "5g")
val sc = new SparkContext(conf)

// Read the input and parse each line as JSON
val action_results = sc.textFile(inputData)
val actions = action_results.map(l => JSON.parseFull(l).get).cache()

// Key each record by (deviceId, actionName, date)
val tuples = actions.map { l =>
  val m = l.asInstanceOf[Map[Any, Any]]
  ((m("deviceId").asInstanceOf[Map[Any, Any]]("$numberLong").asInstanceOf[String],
    m("actionName").asInstanceOf[String],
    m("timestamp").asInstanceOf[Map[Any, Any]]("$date").asInstanceOf[String].substring(0, 10)),
   l)
}

// Group by key and sort by key
val tup_grp = tuples.groupByKey
val tup_counts = tup_grp.map { case ((d: String, a: String, t: String), g) => ((d, a, t), g.toArray) }
val sorted_tup = tup_counts.sortByKey(true)

// val count = sorted_tup.count
// println("Sorted Tuples: " + sorted_tup.count)

// Save each group to its own directory (the null pointer exception happens here, during saving)
sorted_tup.foreach { case ((a, b, c), l: Array[Any]) =>
  val lines = sc.parallelize(l.toSeq)
  lines.coalesce(2, true).saveAsTextFile(hdfsDir + a + "/" + b + "/" + c)
}
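For reference, each line of the input file is a JSON document with "deviceId.$numberLong", "actionName" and "timestamp.$date" fields, as read by the map step above. A sample record (the values here are made up for illustration only) would look like this:

  // Hypothetical input line; field names match the accesses in the map step above,
  // but the values are invented for illustration.
  val sampleLine =
    """{"deviceId":{"$numberLong":"1234567890"},"actionName":"login","timestamp":{"$date":"2015-01-15T08:30:00.000Z"}}"""
  // JSON.parseFull(sampleLine).get yields a Map, and the "$date" value becomes
  // "2015-01-15" after substring(0, 10).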