Hi all,

Does anyone know a fix for the exception below? The XML parsing function
works fine in a unit test, as the code further down shows, but throws a
NullPointerException when the same function is called inside an RDD map.

new_xml: org.apache.spark.rdd.RDD[List[(String, String)]] = MapPartitionsRDD[119] at map at <console>:57
17/07/10 08:29:54 ERROR Executor: Exception in task 0.0 in stage 31.0 (TID 50)
java.lang.NullPointerException
        at $line103.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.xmlParse(<console>:52)
        at $line109.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:57)
        at $line109.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:57)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:875)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:875)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
17/07/10 08:29:54 ERROR Executor: Exception in task 1.0 in stage 31.0 (TID 51)
java.lang.NullPointerException

cat /home/spark/XML_Project/XML_Prog.scala
println(">>>>>>>START UnitTest for xmlParse")
import com.databricks.spark.xml.XmlReader
def xmlParse (xml:String) = {   
        var xRDD = sc.parallelize(Seq(xml))
        var df = new XmlReader().xmlRdd(spark.sqlContext,xRDD)  
        var out_rdd = df.withColumn("comment",
explode(df("Comments.Comment"))).select($"comment.Description",$"comment.Title").rdd
        out_rdd.collect.map(x=>(x(0).toString,x(1).toString)).toList
}

val xml1 = "<books><Comments><Comment><Title>Title1.1</Title><Description>Descript1.1</Description></Comment><Comment><Title>Title1.2</Title><Description>Descript1.2</Description></Comment><Comment><Title>Title1.3</Title><Description>Descript1.3</Description></Comment></Comments></books>"
val xml_parse = xmlParse(xml1)

println("<<<<<<<<END UnitTest for xmlParse")

val rdd = sc.textFile("file:///home/spark/XML_Project/data.txt")

// Each line of data.txt is: id,name,city,xml
val xml_pRDDs = rdd.map { line =>
  val cols = line.split(',')
  (cols(0).toInt, cols(3))
}

// xmlParse now runs on the executors when the action below fires,
// not on the driver as in the unit test above.
val new_xml = xml_pRDDs.map { case (key, value) => xmlParse(value) }
new_xml.foreach(println)
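
My suspicion is that xmlParse itself is the problem: it calls sc.parallelize, but inside xml_pRDDs.map the function runs on the executors, where the SparkContext (sc) is not available, and the stack trace does point at the first line of xmlParse. If that is right, one workaround would be to parse with the plain scala.xml library inside the map, since it needs no SparkContext. A minimal sketch of what I mean (assuming the same Comments/Comment layout as in data.txt; xmlParseLocal is just my own name for the helper):

import scala.xml.XML

// Parse one XML string with the Scala standard XML library; nothing here
// touches the SparkContext, so it should be safe to call inside rdd.map
// on the executors.
def xmlParseLocal(xml: String): List[(String, String)] = {
  val root = XML.loadString(xml)
  (root \ "Comments" \ "Comment").map { c =>
    ((c \ "Description").text, (c \ "Title").text)
  }.toList
}

val new_xml_local = xml_pRDDs.map { case (_, value) => xmlParseLocal(value) }
new_xml_local.foreach(println)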


cat /home/spark/XML_Project/data.txt
1,Amol,Kolhapur,<books><Comments><Comment><Title>Title1.1</Title><Description>Descript1.1</Description></Comment><Comment><Title>Title1.2</Title><Description>Descript1.2</Description></Comment><Comment><Title>Title1.3</Title><Description>Descript1.3</Description></Comment></Comments></books>
2,Ameet,Bangalore,<books><Comments><Comment><Title>Title2.1</Title><Description>Descript2.1</Description></Comment><Comment><Title>Title2.2</Title><Description>Descript2.2</Description></Comment></Comments></books>
3,Rajesh,Jaipur,<books><Comments><Comment><Title>Title3.1</Title><Description>Descript3.1</Description></Comment><Comment><Title>Title3.2</Title><Description>Descript3.2</Description></Comment><Comment><Title>Title3.3</Title><Description>Descript3.3</Description></Comment><Comment><Title>Title3.4</Title><Description>Descript3.4</Description></Comment></Comments></books>
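
The only other workaround I can think of is to keep xmlParse as it is but call it only on the driver, for example after a collect. That would match the unit test exactly, though it only works while the collected data fits in driver memory (again just a sketch):

// Run xmlParse on the driver, where sc is available; fine for small data.
val parsed = xml_pRDDs.collect().map { case (_, value) => xmlParse(value) }
parsed.foreach(println)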

Regards,
Amol
