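Looks like you are missing the Kafka client dependency. The class in the
error, org.apache.kafka.common.serialization.ByteArrayDeserializer, lives in
the kafka-clients jar, which is not among the jars you listed.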
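
One way to avoid hand-picking jars is to let the build tool resolve the Kafka source together with its transitive dependencies, which include kafka-clients. Below is a minimal sketch of an sbt build definition, assuming an sbt project on Scala 2.11; the Scala patch version and the "provided" scope are assumptions for illustration and are not taken from this thread.

```scala
// build.sbt (sketch; adjust versions to match your cluster)
scalaVersion := "2.11.8" // assumption: any 2.11.x release matching the _2.11 artifacts

libraryDependencies ++= Seq(
  // Spark SQL is normally supplied by the cluster at runtime, hence "provided" (assumption)
  "org.apache.spark" %% "spark-sql" % "2.1.0" % "provided",
  // The Kafka source for Structured Streaming; pulls in kafka-clients transitively
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.0"
)
```

If you launch with spark-submit instead, passing --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 should resolve the same artifact and its dependencies at submit time.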

On Tue, May 16, 2017 at 1:04 PM, kant kodali <kanth...@gmail.com> wrote:

> I am now getting the following runtime exception. I am using Spark
> 2.1.0 with the following jars:
>
> *spark-sql_2.11-2.1.0.jar*
>
> *spark-sql-kafka-0-10_2.11-2.1.0.jar*
>
> *spark-streaming_2.11-2.1.0.jar*
>
>
> Exception in thread "stream execution thread for [id = fcfe1fa6-dab3-4769-9e15-e074af622cc1, runId = 7c54940a-e453-41de-b256-049b539b59b1]"
> java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
>     at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:74)
>     at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:245)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:127)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:123)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>
>
> On Tue, May 16, 2017 at 10:30 AM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> The default "startingOffsets" is "latest". If you don't push any data
>> after starting the query, it won't fetch anything. You can set it to
>> "earliest" like ".option("startingOffsets", "earliest")" to start the
>> stream from the beginning.
>>
>> On Tue, May 16, 2017 at 12:36 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I have the following code.
>>>
>>>  val ds = sparkSession.readStream
>>>                 .format("kafka")
>>>                 .option("kafka.bootstrap.servers", bootstrapServers)
>>>                 .option("subscribe", topicName)
>>>                 .option("checkpointLocation", hdfsCheckPointDir)
>>>                 .load()
>>>
>>>  val ds1 = ds.select($"value")
>>>  val query = ds1.writeStream.outputMode("append").format("console").start()
>>>  query.awaitTermination()
>>>
>>> There are no errors when I execute this code, but I don't see any
>>> data being printed to the console. When I run my standalone test Kafka
>>> consumer jar, I can see that it is receiving messages, so I am not sure
>>> what is going on with the above code. Any ideas?
>>>
>>> Thanks!
>>>
>>
>>
>
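
Putting the two suggestions from this thread together, the query might look roughly like the sketch below. It assumes the Kafka source and kafka-clients are on the classpath; the broker address, topic name, checkpoint path, and object name are placeholders rather than values from the thread. The checkpoint location is set on the write side here, where it is normally configured, and the value column is cast to a string so the console shows readable text instead of raw bytes.

```scala
import org.apache.spark.sql.SparkSession

object KafkaConsoleExample { // hypothetical name, for illustration only
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-console-example")
      .getOrCreate()

    // Placeholder values (assumptions); replace with your own settings
    val bootstrapServers = "localhost:9092"
    val topicName = "test-topic"
    val checkpointDir = "/tmp/kafka-console-checkpoint"

    val ds = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topicName)
      // Read from the beginning of the topic instead of the default "latest"
      .option("startingOffsets", "earliest")
      .load()

    // Kafka values arrive as binary; cast them for readable console output
    val values = ds.selectExpr("CAST(value AS STRING) AS value")

    val query = values.writeStream
      .outputMode("append")
      .format("console")
      .option("checkpointLocation", checkpointDir)
      .start()

    query.awaitTermination()
  }
}
```

Note that "startingOffsets" only applies when a query starts for the first time. If the checkpoint directory already holds progress from an earlier run, the query resumes from the recorded offsets.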
