Hi,
Not sure if this will help at all, and please take it with a pinch of salt, as
I don't have your setup and I am not running on a cluster.

I have tried to run a Kafka example, which was originally working on Spark
1.6.1, on Spark 2.
These are the jars I am using:

spark-streaming-kafka-0-10_2.11_2.0.1.jar
kafka_2.11-0.10.1.1
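
In case it is easier to compare, those jars roughly correspond to the sbt
dependencies below. This is only a sketch: I am assuming an sbt build on
Scala 2.11, and the scalaVersion and the spark-streaming line are my guesses
matched to the jar names, not something checked against your setup.

```scala
// build.sbt (sketch; versions taken from the jar names above)
scalaVersion := "2.11.8" // assumption: any 2.11.x release should match the _2.11 jars

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.0.1",            // assumption: same version as the Kafka integration jar
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.1", // the 0.10 integration, package kafka010
  "org.apache.kafka" %% "kafka" % "0.10.1.1"                    // the Kafka jar listed above
)
```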


And here's the code up to the creation of the direct stream. Apparently,
with the new version of the Kafka libs, some properties have to be specified:


import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel

import java.util.regex.Pattern
import java.util.regex.Matcher

import Utilities._

import org.apache.spark.streaming.kafka010.KafkaUtils
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/** Working example of listening for log data from Kafka's testLogs topic
  * on port 9092. */
object KafkaExample {

  def main(args: Array[String]) {

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))

    setupLogging()

    // Construct a regular expression (regex) to extract fields from raw
    // Apache log lines
    val pattern = apacheLogPattern()

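    // Consumer settings for the 0.10 API. "metadata.broker.list" is presumably
    // left over from the older example; the new consumer reads "bootstrap.servers".
    val kafkaParams = Map(
      "metadata.broker.list" -> "localhost:9092",
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "group1")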
    val topics = List("testLogs").toSet
    val lines = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    ).map(cr => cr.value())
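
To make the point about the properties a bit more concrete, here is a small
sketch in plain Scala (no Spark or Kafka on the classpath needed) that reports
which of the settings I had to add are absent from a parameter map. The object
name KafkaParamCheck, the helper missingConsumerParams and the list of
"required" keys are my own assumptions based on what I had to set, not part
of any Spark or Kafka API.

```scala
object KafkaParamCheck {
  // Assumption: the keys I had to set for the 0.10 direct stream to start.
  val required: Seq[String] = Seq(
    "bootstrap.servers",
    "key.deserializer",
    "value.deserializer",
    "group.id"
  )

  /** Returns the keys from `required` that are absent from `params`,
    * in the order they are listed above. */
  def missingConsumerParams(params: Map[String, Object]): Seq[String] =
    required.filterNot(params.contains)

  def main(args: Array[String]): Unit = {
    // Old-style parameters: only the broker list is set.
    val oldStyle = Map[String, Object]("metadata.broker.list" -> "localhost:9092")
    // Prints the settings that would still need to be added.
    println(missingConsumerParams(oldStyle))
  }
}
```

With the kafkaParams map from my example above, nothing is reported as missing.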

hth

 marco

On Sat, Feb 4, 2017 at 8:33 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> I am getting this error with Spark 2, although the same code works with
> CDH 5.5.1 (Spark 1.5).
>
> Admittedly I am messing around with spark-shell. However, I am surprised
> that this does not work with Spark 2 and is OK with CDH 5.1
>
> scala> val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>
> java.lang.NoClassDefFoundError: Could not initialize class kafka.consumer.FetchRequestAndResponseStatsRegistry$
>   at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
>   at org.apache.spark.streaming.kafka.KafkaCluster.connect(KafkaCluster.scala:52)
>   at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:345)
>   at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
>   at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>   at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>   at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>   at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
>   at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>   ... 74 elided
>
>
> Dr Mich Talebzadeh