Chandana Kithalagama created ZEPPELIN-4074:
----------------------------------------------

             Summary: Spark interpreter failed with "Not a version: 9" error
                 Key: ZEPPELIN-4074
                 URL: https://issues.apache.org/jira/browse/ZEPPELIN-4074
             Project: Zeppelin
          Issue Type: Bug
          Components: Interpreters
    Affects Versions: 0.8.1
         Environment: $ java -version
java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)

zeppelin-0.8.1-bin-netinst running on local machine

Spark interpreter: 2 external dependencies added to the Spark interpreter configs
- org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0
- com.typesafe.play:play-json_2.11:2.6.8

Connecting to Spark 2.4.0 running on local machine
            Reporter: Chandana Kithalagama


The following Spark code connects to a Kafka queue running on my local machine (inside the devicehive platform) and reads sensor data messages.

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferBrokers
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.{Seconds, StreamingContext}
import play.api.libs.json._

case class SenseData(hash: String, value: Int, updated: String)

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "sensor_data",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("plugin_topic_536cc303-eb2f-4ff9-b546-d8c59b6c5466")
val streamingContext = new StreamingContext(sc, Seconds(120))
val stream = KafkaUtils.createDirectStream(
  streamingContext,
  PreferBrokers,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map( record => {
  var json: JsValue = Json.parse(record.value)
  SenseData(
    json("b")("notification")("deviceId").as[String],
    json("b")("notification")("parameters")("t").as[Int],
    json("b")("notification")("timestamp").as[String]
  )
}).foreachRDD( rdd => {
  println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")
  rdd.foreach( record => {
    record.productIterator.foreach { i => println(i) }
  })
  rdd.toDF().registerTempTable("sensedata")
})

streamingContext.start()
{code}

When this code is added to a Zeppelin paragraph and executed, the following error is reported.

{code:java}
WARN [2019-03-15 16:22:35,054] ({pool-2-thread-3} NotebookServer.java[afterStatusChange]:2316) - Job 20190304-085834_2122705886 is finished, status: ERROR, exception: null, result: %text java.lang.NumberFormatException: Not a version: 9
	at scala.util.PropertiesTrait$class.parts$1(Properties.scala:184)
	at scala.util.PropertiesTrait$class.isJavaAtLeast(Properties.scala:187)
{code}

It looks like some dependency on JDK 9 is being checked during class loading. However, I am running JDK 8. Is there a way to fix this?

Some observations:

1) I could run the exact same code in spark-shell without any error, which suggests the issue may be in the Zeppelin code base.
{code:java}
./bin/spark-shell --packages "org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0,com.typesafe.play:play-json_2.11:2.7.0"
{code}
2) The error started to appear when com.typesafe.play:play-json_2.11:2.6.8 (or any version below 2.6.11) was added as a dependency, to facilitate conversion between JSON and Scala objects in my code.
3) However, if I use com.typesafe.play:play-json_2.11:2.7.0, it throws a different error, which I was able to fix by excluding conflicting dependencies.
{code:java}
WARN [2019-03-15 16:11:29,203] ({pool-2-thread-2} NotebookServer.java[afterStatusChange]:2316) - Job 20190304-085834_2122705886 is finished, status: ERROR, exception: null, result: %text warning: there was one deprecation warning; re-run with -deprecation for details
com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.8
	at com.fasterxml.jackson.module.scala.JacksonModule$class.setupModule(JacksonModule.scala:64)
	at com.fasterxml.jackson.module.scala.DefaultScalaModule.setupModule(DefaultScalaModule.scala:19)
	at com.fasterxml.jackson.databind.ObjectMapper.registerModule(ObjectMapper.java:751)
	at org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:82)
	at org.apache.spark.rdd.RDDOperationScope$.<clinit>(RDDOperationScope.scala)
	at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:80)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:57)
	at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:147)
	at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:124)
{code}
When the following exclusions were added to the Spark interpreter dependencies against com.typesafe.play:play-json_2.11:2.7.0, this error disappeared.
{code:java}
com.fasterxml.jackson.core:jackson-databind,com.fasterxml.jackson.datatype:jackson-datatype
{code}

Complete stack trace:

{code:java}
INFO [2019-03-15 16:22:27,440] ({pool-2-thread-3} RemoteInterpreterManagedProcess.java[start]:190) - Run interpreter process [/Users/ckit/platforms/zeppelin-0.8.1-bin-netinst/bin/interpreter.sh, -d, /Users/ckit/platforms/zeppelin-0.8.1-bin-netinst/interpreter/spark, -c, 10.76.26.5, -p, 62197, -r, :, -l, /Users/ckit/platforms/zeppelin-0.8.1-bin-netinst/local-repo/spark, -g, spark]
INFO [2019-03-15 16:22:32,991] ({pool-9-thread-1} RemoteInterpreterManagedProcess.java[callback]:123) - RemoteInterpreterServer Registered: CallbackInfo(host:10.76.26.5, port:62200)
INFO [2019-03-15 16:22:32,995] ({pool-2-thread-3} RemoteInterpreter.java[call]:168) - Create RemoteInterpreter org.apache.zeppelin.spark.SparkInterpreter
INFO [2019-03-15 16:22:33,170] ({pool-2-thread-3} RemoteInterpreter.java[call]:168) - Create RemoteInterpreter org.apache.zeppelin.spark.SparkSqlInterpreter
INFO [2019-03-15 16:22:33,173] ({pool-2-thread-3} RemoteInterpreter.java[call]:168) - Create RemoteInterpreter org.apache.zeppelin.spark.DepInterpreter
INFO [2019-03-15 16:22:33,181] ({pool-2-thread-3} RemoteInterpreter.java[call]:168) - Create RemoteInterpreter org.apache.zeppelin.spark.PySparkInterpreter
INFO [2019-03-15 16:22:33,189] ({pool-2-thread-3} RemoteInterpreter.java[call]:168) - Create RemoteInterpreter org.apache.zeppelin.spark.IPySparkInterpreter
INFO [2019-03-15 16:22:33,198] ({pool-2-thread-3} RemoteInterpreter.java[call]:168) - Create RemoteInterpreter org.apache.zeppelin.spark.SparkRInterpreter
INFO [2019-03-15 16:22:33,200] ({pool-2-thread-3} RemoteInterpreter.java[call]:142) - Open RemoteInterpreter org.apache.zeppelin.spark.SparkInterpreter
INFO [2019-03-15 16:22:33,201] ({pool-2-thread-3} RemoteInterpreter.java[pushAngularObjectRegistryToRemote]:436) - Push local angular object registry from ZeppelinServer to remote interpreter group spark:shared_process
WARN [2019-03-15 16:22:35,054] ({pool-2-thread-3} NotebookServer.java[afterStatusChange]:2316) - Job 20190304-085834_2122705886 is finished, status: ERROR, exception: null, result: %text java.lang.NumberFormatException: Not a version: 9
	at scala.util.PropertiesTrait$class.parts$1(Properties.scala:184)
	at scala.util.PropertiesTrait$class.isJavaAtLeast(Properties.scala:187)
	at scala.util.Properties$.isJavaAtLeast(Properties.scala:17)
	at scala.tools.util.PathResolverBase$Calculated$.javaBootClasspath(PathResolver.scala:276)
	at scala.tools.util.PathResolverBase$Calculated$.basis(PathResolver.scala:283)
	at scala.tools.util.PathResolverBase$Calculated$.containers$lzycompute(PathResolver.scala:293)
	at scala.tools.util.PathResolverBase$Calculated$.containers(PathResolver.scala:293)
	at scala.tools.util.PathResolverBase.containers(PathResolver.scala:309)
	at scala.tools.util.PathResolver.computeResult(PathResolver.scala:341)
	at scala.tools.util.PathResolver.computeResult(PathResolver.scala:332)
	at scala.tools.util.PathResolverBase.result(PathResolver.scala:314)
	at scala.tools.nsc.backend.JavaPlatform$class.classPath(JavaPlatform.scala:28)
	at scala.tools.nsc.Global$GlobalPlatform.classPath(Global.scala:115)
	at scala.tools.nsc.Global.scala$tools$nsc$Global$$recursiveClassPath(Global.scala:131)
	at scala.tools.nsc.Global$GlobalMirror.rootLoader(Global.scala:64)
	at scala.reflect.internal.Mirrors$Roots$RootClass.<init>(Mirrors.scala:307)
	at scala.reflect.internal.Mirrors$Roots.RootClass$lzycompute(Mirrors.scala:321)
	at scala.reflect.internal.Mirrors$Roots.RootClass(Mirrors.scala:321)
	at scala.reflect.internal.Mirrors$Roots$EmptyPackageClass.<init>(Mirrors.scala:330)
	at scala.reflect.internal.Mirrors$Roots.EmptyPackageClass$lzycompute(Mirrors.scala:336)
	at scala.reflect.internal.Mirrors$Roots.EmptyPackageClass(Mirrors.scala:336)
	at scala.reflect.internal.Mirrors$Roots.EmptyPackageClass(Mirrors.scala:276)
	at scala.reflect.internal.Mirrors$RootsBase.init(Mirrors.scala:250)
	at scala.tools.nsc.Global.rootMirror$lzycompute(Global.scala:73)
	at scala.tools.nsc.Global.rootMirror(Global.scala:71)
	at scala.tools.nsc.Global.rootMirror(Global.scala:39)
	at scala.reflect.internal.Definitions$DefinitionsClass.ObjectClass$lzycompute(Definitions.scala:257)
	at scala.reflect.internal.Definitions$DefinitionsClass.ObjectClass(Definitions.scala:257)
	at scala.reflect.internal.Definitions$DefinitionsClass.init(Definitions.scala:1390)
	at scala.tools.nsc.Global$Run.<init>(Global.scala:1242)
	at scala.tools.nsc.interpreter.IMain.scala$tools$nsc$interpreter$IMain$$_initialize(IMain.scala:139)
	at scala.tools.nsc.interpreter.IMain.initializeSynchronous(IMain.scala:161)
	at org.apache.zeppelin.spark.SparkScala211Interpreter.open(SparkScala211Interpreter.scala:85)
	at org.apache.zeppelin.spark.NewSparkInterpreter.open(NewSparkInterpreter.java:102)
	at org.apache.zeppelin.spark.SparkInterpreter.open(SparkInterpreter.java:62)
	at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:69)
	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:616)
	at org.apache.zeppelin.scheduler.Job.run(Job.java:188)
	at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:140)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
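For context on the "Not a version: 9" failure above: in older scala-library releases, the helper that scala.util.Properties.isJavaAtLeast uses to parse version strings insists on a dotted "major.minor" form, so a caller that passes the bare string "9" (as newer, JDK 9-aware compiler code does when probing the JVM) hits exactly this exception even on JDK 8. That commonly points to mismatched Scala artifacts on the interpreter classpath (e.g. an old scala-library next to a newer scala-compiler) — an assumption here, not something verified against this installation. A minimal re-implementation of the old parsing logic (Python used purely for illustration; parts and is_java_at_least are hypothetical names mirroring the Scala originals) shows the failure mode:

```python
def parts(version: str):
    """Mimics the dotted-version parser in older scala-library
    (scala.util.PropertiesTrait): it requires a '.' separator and
    rejects bare major versions like "9"."""
    i = version.find(".")
    if i < 0:
        raise ValueError("Not a version: " + version)
    major = int(version[:i])
    # Take only the first component after the dot ("1.8.0" -> minor 8).
    minor = int(version[i + 1:].split(".")[0])
    return major, minor

def is_java_at_least(wanted: str, java_spec_version: str) -> bool:
    """Compare the JVM's java.specification.version against the
    requested minimum, both parsed with the strict helper above."""
    return parts(java_spec_version) >= parts(wanted)

# On JDK 8 the spec version is "1.8", so dotted checks succeed:
print(is_java_at_least("1.8", "1.8"))  # True
# But a check for JDK 9 with the bare string "9" fails inside the
# parser, before any comparison happens:
try:
    is_java_at_least("9", "1.8")
except ValueError as e:
    print(e)  # Not a version: 9
```

If that reading is right, the direction to investigate would be which scala-library version the interpreter's external dependencies pull in transitively, rather than the JDK itself.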