Hi, I am getting a NullPointerException while saving the data into Hadoop.

The code is below. If I change the last line to

    sorted_tup.take(count.toInt).foreach { case ((a, b, c), l) =>
      sc.parallelize(l.toSeq).coalesce(1).saveAsTextFile(hdfsDir + a + "/" + b + "/" + c)
    }

then I am able to save it, but for larger files I get a heap space error. I
think that is because "take" brings all of the grouped data back to the
driver. Can someone please help me with this?
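
Would toLocalIterator be a safer replacement for take here? As far as I
understand, it pulls one partition at a time back to the driver instead of
materialising everything at once, and the foreach would still run on the
driver where sc is available. Just a rough sketch based on the take version
above, I have not tested it:

    sorted_tup.toLocalIterator.foreach { case ((a, b, c), l) =>
      sc.parallelize(l.toSeq).coalesce(1).saveAsTextFile(hdfsDir + a + "/" + b + "/" + c)
    }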

Thanks,
Durga 

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
    val conf = new SparkConf()
      .setMaster(master)
      .setAppName(appName)
      .set("spark.cores.max", numCores)
     
.setJars(Seq("/home/hadoopuser/testing/Rest_1/FileSplitter/target/scala-2.10/filesplitter_2.10-1.0.jar")).set("spark.executor.memory",
"5g")
    val sc = new SparkContext(conf)
    val action_results = sc.textFile(inputData)

    import scala.util.parsing.json.JSON

    // Parse each input line as JSON and cache the parsed records.
    val actions = action_results.map(l => JSON.parseFull(l).get).cache()

    // Key each record by (deviceId, actionName, date) and keep the full record as the value.
    val tuples = actions.map { l =>
      val m = l.asInstanceOf[Map[Any, Any]]
      ((m("deviceId").asInstanceOf[Map[Any, Any]]("$numberLong").asInstanceOf[String],
        m("actionName").asInstanceOf[String],
        m("timestamp").asInstanceOf[Map[Any, Any]]("$date").asInstanceOf[String].substring(0, 10)),
        l)
    }

    val tup_grp = tuples.groupByKey

    val tup_counts = tup_grp.map { case ((d: String, a: String, t: String), g) =>
      ((d, a, t), g.toArray)
    }

    val sorted_tup = tup_counts.sortByKey(true)
//    val count = sorted_tup.count
//    println("Sorted Tuples: " + sorted_tup.count)

    // The NullPointerException is thrown here while saving.
    sorted_tup.foreach { case ((a, b, c), l: Array[Any]) =>
      val lines = sc.parallelize(l.toSeq)
      lines.coalesce(2, true).saveAsTextFile(hdfsDir + a + "/" + b + "/" + c)
    }
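
Alternatively, would it be better to avoid calling sc.parallelize inside
foreach at all, and instead key every record by its target directory and
write everything in a single pass with Hadoop's MultipleTextOutputFormat?
Again just a rough sketch, not tested; KeyBasedOutput is only a name I made
up here:

    import org.apache.hadoop.io.NullWritable
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

    // Write each (pathKey, line) pair into a sub-directory named after the key,
    // and drop the key so only the original line ends up in the output file.
    class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
      override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
        key.asInstanceOf[String] + "/" + name
      override def generateActualKey(key: Any, value: Any): Any =
        NullWritable.get()
    }

    sorted_tup
      .flatMap { case ((a, b, c), l) =>
        l.map(line => (a + "/" + b + "/" + c, line.toString))
      }
      .saveAsHadoopFile(hdfsDir, classOf[String], classOf[String], classOf[KeyBasedOutput])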




