Chandana Kithalagama created ZEPPELIN-4079:
----------------------------------------------
             Summary: SparkShims.getNoteId returns NPE
                 Key: ZEPPELIN-4079
                 URL: https://issues.apache.org/jira/browse/ZEPPELIN-4079
             Project: Zeppelin
          Issue Type: Bug
          Components: zeppelin-interpreter
    Affects Versions: 0.8.1
         Environment: zeppelin-0.8.1-bin-netinst running on local machine, spark interpreter, 2 external dependencies added to the spark interpreter configs:
- org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0
- com.typesafe.play:play-json_2.11:2.6.8

Connecting to Spark 2.4.0 running on local machine

$ java -version
java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)
            Reporter: Chandana Kithalagama


When I run the following Scala program in a Zeppelin notebook, an NPE is shown in the logs/zeppelin-interpreter-spark-<name>.log file.

{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferBrokers
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.kafka.common.serialization.StringDeserializer
import play.api.libs.json._

val PREFIX = "CK-LOG ====------> "

case class SenseData(hash: String, value: Float, updated: String)

/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {
  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "sensor_data-2019",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("plugin_topic_536cc303-eb2f-4ff9-b546-d8c59b6c5466")

val streamingContext = new StreamingContext(sc, Seconds(60))
println(PREFIX + "streamContext created")

val stream = KafkaUtils.createDirectStream(
  streamingContext,
  PreferBrokers,
  Subscribe[String, String](topics, kafkaParams)
)
println(PREFIX + "DStream created")

// val msgs = stream.window(Seconds(10))

stream.map(record => {
  val json: JsValue = Json.parse(record.value)
  SenseData(
    json("b")("notification")("deviceId").as[String],
    json("b")("notification")("parameters")("temperature").as[Float],
    json("b")("notification")("timestamp").as[String])
}).foreachRDD((rdd: RDD[SenseData], time: Time) => {
  println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")

  // Get the singleton instance of SparkSession
  val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
  import spark.implicits._

  // this is how to print the rdd, per
  // https://spark.apache.org/docs/latest/rdd-programming-guide.html#printing-elements-of-an-rdd
  // rdd.take(100).foreach(println)
  rdd.collect().foreach(println)

  // rdd.toDF() won't work without spark session and imported implicits
  // https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
  rdd.toDF().createOrReplaceTempView("sensedata")
  val senseDataDF = spark.sql("select value, updated from sensedata")
  println(s"========= $time =========")
  senseDataDF.show()
})

streamingContext.start()
{code}
Error:
{code:java}
ERROR [2019-03-18 17:02:00,021] ({spark-listener-group-shared} Logging.scala[logError]:91) - Listener threw an exception
java.lang.NullPointerException
	at org.apache.zeppelin.spark.SparkShims.getNoteId(SparkShims.java:96)
	at org.apache.zeppelin.spark.SparkShims.buildSparkJobUrl(SparkShims.java:117)
	at org.apache.zeppelin.spark.Spark2Shims$1.onJobStart(Spark2Shims.java:44)
	at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92)
	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:92)
	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
{code}

It looks like this issue is related to [ZEPPELIN-3242|https://jira.apache.org/jira/browse/ZEPPELIN-3242], which was fixed in 0.8.0. The issue is at [line 96|https://github.com/apache/zeppelin/blob/c46f55f1d81df944fd1b69a7ccb68d0647294543/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java#L96] of SparkShims.java and is caused by _jobgroupId_ being null, which in turn stems from [line 109|https://github.com/apache/zeppelin/blob/c46f55f1d81df944fd1b69a7ccb68d0647294543/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java#L109] returning null for a missing _spark.jobGroup.id_ property. That property is set/passed when a job [starts|https://github.com/apache/zeppelin/blob/c46f55f1d81df944fd1b69a7ccb68d0647294543/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java#L41]. I am not sure why it is not passed, or why it is not taken from the interpreter properties. Setting the 'spark.jobGroup.id' property to an arbitrary string under Zeppelin Interpreters -> Spark -> Properties didn't sort it out either.
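For reference, below is a minimal, self-contained sketch of the failing path. The class name _JobGroupIdSketch_, the null guard, and the job-group string format _zeppelin-<noteId>-<paragraphId>_ are my own illustrations inferred from the indexOf/substring parsing at the linked line 96, not code copied from Zeppelin. Since _spark.jobGroup.id_ is a thread-local property set via SparkContext.setJobGroup, jobs submitted by the Spark Streaming scheduler thread rather than by the paragraph's own thread plausibly arrive without it, and a guard like the one sketched here would at least avoid the NPE:

{code:java}
import java.util.Properties;

public class JobGroupIdSketch {

  // Paraphrase of SparkShims.getNoteId: pull the note id out of a job-group
  // string shaped like "zeppelin-<noteId>-<paragraphId>" (format inferred
  // from the parsing, not taken from Zeppelin sources).
  static String getNoteId(String jobgroupId) {
    if (jobgroupId == null) {
      // Hypothetical guard, not present in 0.8.1: without it, the indexOf
      // call below throws the NPE reported above whenever "spark.jobGroup.id"
      // was never set on the thread that submitted the job.
      return null;
    }
    int first = jobgroupId.indexOf("-");
    int second = jobgroupId.indexOf("-", first + 1);
    return jobgroupId.substring(first + 1, second);
  }

  public static void main(String[] args) {
    // Job submitted from the paragraph's own thread: the thread-local
    // property is present and parsing succeeds.
    Properties fromParagraph = new Properties();
    fromParagraph.setProperty("spark.jobGroup.id", "zeppelin-2EAB1234C-20190318-170200_123");
    System.out.println(getNoteId(fromParagraph.getProperty("spark.jobGroup.id"))); // 2EAB1234C

    // Job submitted from the streaming scheduler thread: the property is
    // absent, getProperty returns null, and 0.8.1 would NPE right here.
    Properties fromStreaming = new Properties();
    System.out.println(getNoteId(fromStreaming.getProperty("spark.jobGroup.id"))); // null
  }
}
{code}

If that reading is right, it would also explain why setting 'spark.jobGroup.id' in the interpreter settings has no effect: the listener reads the submitting thread's local properties off the SparkListenerJobStart event, not the interpreter's configuration.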