Hi list,
I have installed Zeppelin 0.9.0 and it is working fine. Lately I have
been trying Structured Streaming with a Kafka topic in Zeppelin. I can
read the Kafka topic without any issues:

--------------------------------------
val df = spark.readStream
           .format("kafka")
           .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS)
           .option("subscribe", KAFKA_TOPIC_NAME_CONS)
           ...
           ...
df: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more
fields]
--------------------------------------
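
For completeness, the elided lines are just the standard reader
boilerplate. A minimal, self-contained version would look roughly like
this (load() and the casts are standard Spark APIs; startingOffsets is
only an illustrative option, not necessarily what I use):

--------------------------------------
// Minimal sketch of the full reader (assumptions noted above):
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS)
  .option("subscribe", KAFKA_TOPIC_NAME_CONS)
  .option("startingOffsets", "latest")  // illustrative only
  .load()

// Kafka delivers key/value as binary, so cast them for inspection.
val decoded = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
--------------------------------------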

However, when I try to write the stream (data_stream, read as above) to
the console, I am facing the following issue:

--------------------------------------
// Imports needed for the trigger below:
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._  // provides 0.seconds

val d = data_stream
    .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime(0.seconds))
    .option("truncate", "false")
    .start()
    .awaitTermination()

org.apache.spark.sql.streaming.StreamingQueryException: Query [id = e8444fb6-ed9e-40e5-a867-2433aba0836a, runId = 6480cd9b-1ad8-4517-922c-6da024bc8617] terminated with exception: 28499
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:355)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 28499
  at com.thoughtworks.paranamer.BytecodeReadingParanamer$ClassReader.accept(BytecodeReadingParanamer.java:563)
  at com.thoughtworks.paranamer.BytecodeReadingParanamer$ClassReader.access$200(BytecodeReadingParanamer.java:338)
  at com.thoughtworks.paranamer.BytecodeReadingParanamer.lookupParameterNames(BytecodeReadingParanamer.java:103)
  at com.thoughtworks.paranamer.CachingParanamer.lookupParameterNames(CachingParanamer.java:90)
  at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.getCtorParams(BeanIntrospector.scala:45)
  at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$1(BeanIntrospector.scala:59)
  at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$1$adapted(BeanIntrospector.scala:59)
  at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
  at scala.collection.Iterator.foreach(Iterator.scala:941)
  at scala.collection.Iterator.foreach$(Iterator.scala:941)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
  at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
  at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.findConstructorParam$1(BeanIntrospector.scala:59)
  at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$19(BeanIntrospector.scala:181)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$14(BeanIntrospector.scala:175)
  at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.$anonfun$apply$14$adapted(BeanIntrospector.scala:174)
  at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
  at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
  at scala.collection.immutable.List.flatMap(List.scala:355)
  at com.fasterxml.jackson.module.scala.introspect.BeanIntrospector$.apply(BeanIntrospector.scala:174)
  at com.fasterxml.jackson.module.scala.introspect.ScalaAnnotationIntrospector$._descriptorFor(ScalaAnnotationIntrospectorModule.scala:21)
  at com.fasterxml.jackson.module.scala.introspect.ScalaAnnotationIntrospector$.fieldName(ScalaAnnotationIntrospectorModule.scala:29)
  at com.fasterxml.jackson.module.scala.introspect.ScalaAnnotationIntrospector$.findImplicitPropertyName(ScalaAnnotationIntrospectorModule.scala:77)
  at com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.findImplicitPropertyName(AnnotationIntrospectorPair.java:490)
  at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector._addFields(POJOPropertiesCollector.java:380)
  at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.collectAll(POJOPropertiesCollector.java:308)
  at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.getJsonValueAccessor(POJOPropertiesCollector.java:196)
  at com.fasterxml.jackson.databind.introspect.BasicBeanDescription.findJsonValueAccessor(BasicBeanDescription.java:252)
  at com.fasterxml.jackson.databind.ser.BasicSerializerFactory.findSerializerByAnnotations(BasicSerializerFactory.java:346)
  at com.fasterxml.jackson.databind.ser.BeanSerializerFactory._createSerializer2(BeanSerializerFactory.java:216)
  at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.createSerializer(BeanSerializerFactory.java:165)
  at com.fasterxml.jackson.databind.SerializerProvider._createUntypedSerializer(SerializerProvider.java:1388)
  at com.fasterxml.jackson.databind.SerializerProvider._createAndCacheUntypedSerializer(SerializerProvider.java:1336)
  at com.fasterxml.jackson.databind.SerializerProvider.findValueSerializer(SerializerProvider.java:510)
  at com.fasterxml.jackson.databind.SerializerProvider.findTypedValueSerializer(SerializerProvider.java:713)
  at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:308)
  at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:4094)
  at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:3404)
  at org.apache.spark.rdd.RDDOperationScope.toJson(RDDOperationScope.scala:52)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:142)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
  at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:363)
  at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
  at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:322)
  at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:329)
  at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
  at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
  at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
  at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2940)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2940)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:581)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:581)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
  at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
  ... 1 more
--------------------------------------

I think the issue is related to
https://issues.apache.org/jira/browse/SPARK-22128
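
If I read that ticket correctly, a pre-2.8 paranamer cannot parse class
files emitted by newer compilers and fails with exactly this kind of
ArrayIndexOutOfBoundsException while jackson-module-scala introspects a
class. A quick way to check which copy the interpreter actually loaded
(plain JVM API, runnable in a notebook paragraph):

--------------------------------------
// Print the jar that paranamer was loaded from; if this points at an
// old version (e.g. 2.3) rather than 2.8, classpath ordering is the issue.
println(classOf[com.thoughtworks.paranamer.BytecodeReadingParanamer]
  .getProtectionDomain.getCodeSource.getLocation)
--------------------------------------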

I am using Spark 3.0.2, which already ships paranamer 2.8 among its
libraries, and Structured Streaming with Kafka works just fine in the
Spark shell. I also tried adding the paranamer JAR in the Zeppelin Spark
interpreter settings, without any success.
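
For the record, this is the variant of that workaround I would expect to
take effect (my assumption of the right knobs; the path is a placeholder
for wherever the JAR lives on the driver and executor machines):

--------------------------------------
# Spark properties set in the Zeppelin Spark interpreter settings
spark.driver.extraClassPath    /path/to/paranamer-2.8.jar
spark.executor.extraClassPath  /path/to/paranamer-2.8.jar
--------------------------------------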

I have added the following dependencies in the Spark interpreter
settings (an exclusion sketch follows the list):

org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2
org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.2
org.apache.spark:spark-avro_2.12:3.0.2
org.apache.spark:spark-sql_2.12:3.0.2
za.co.absa:abris_2.12:4.2.0
org.apache.spark:spark-streaming_2.12:3.0.2
org.apache.spark:spark-core_2.12:3.0.2
com.fasterxml.jackson.module:jackson-module-paranamer:2.12.2
com.fasterxml.jackson.module:jackson-module-scala_2.12:2.12.3
com.thoughtworks.paranamer:paranamer:2.8
com.github.everit-org.json-schema:org.everit.json.schema:1.12.2
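
If the stale paranamer arrives transitively through one of the artifacts
above, the dependency table in the interpreter settings also has an
exclude field per artifact; something like the following is what I would
try next (illustrative only, as I have not confirmed which artifact
actually drags in the old copy):

--------------------------------------
artifact: org.apache.spark:spark-avro_2.12:3.0.2
exclude:  com.thoughtworks.paranamer:paranamer
--------------------------------------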

Kafka version: Confluent kafka_2.13-6.1.0

Thanks in advance for a quick response.

Regards
