Hi, not sure if this will help at all, and please take it with a pinch of salt, as I don't have your setup and I am not running on a cluster.
I have tried to run a Kafka example, which was originally working on Spark 1.6.1, on Spark 2. These are the jars I am using:

spark-streaming-kafka-0-10_2.11_2.0.1.jar
kafka_2.11-0.10.1.1

And here's the code up to the creation of the direct stream. Apparently, with the new version of the Kafka libs, some properties have to be specified:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel

import java.util.regex.Pattern
import java.util.regex.Matcher

import Utilities._

import org.apache.spark.streaming.kafka010.KafkaUtils
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/** Working example of listening for log data from Kafka's testLogs topic on port 9092. */
object KafkaExample {

  def main(args: Array[String]) {

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))

    setupLogging()

    // Construct a regular expression (regex) to extract fields from raw Apache log lines
    val pattern = apacheLogPattern()

    val kafkaParams = Map(
      "metadata.broker.list" -> "localhost:9092",
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "group1")

    val topics = List("testLogs").toSet

    val lines = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    ).map(cr => cr.value())

hth
marco

On Sat, Feb 4, 2017 at 8:33 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> I am getting this error with Spark 2. which works with CDH 5.5.1 (Spark
> 1.5).
>
> Admittedly I am messing around with Spark-shell.
> However, I am surprised why this does not work with Spark 2 and is ok
> with CDH 5.1
>
> scala> val dstream = KafkaUtils.createDirectStream[String, String,
>   StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>
> java.lang.NoClassDefFoundError: Could not initialize class kafka.consumer.FetchRequestAndResponseStatsRegistry$
>   at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:39)
>   at org.apache.spark.streaming.kafka.KafkaCluster.connect(KafkaCluster.scala:52)
>   at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:345)
>   at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
>   at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>   at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>   at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>   at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
>   at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>   ... 74 elided
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
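
PS: since the reply above says that some properties have to be specified with the new Kafka libs, here is a rough sketch of a check you could run in spark-shell before creating the stream. It needs only plain Scala, no Spark or Kafka on the classpath. KafkaParamsCheck, expectedKeys and missingKeys are hypothetical names made up for illustration, and the list of expected keys is an assumption taken from the settings used in the reply, not an authoritative list from the Kafka documentation.

```scala
// Sketch only: KafkaParamsCheck and its members are hypothetical helpers,
// not part of Spark or Kafka.
object KafkaParamsCheck {

  // Assumption: these are the consumer settings the reply above specifies
  // for the kafka010 direct stream.
  val expectedKeys: Set[String] = Set(
    "bootstrap.servers",
    "key.deserializer",
    "value.deserializer",
    "group.id")

  // Returns the expected keys that are absent from the given params.
  def missingKeys(params: Map[String, Object]): Set[String] =
    expectedKeys.filterNot(params.contains)

  private val stringDeserializer =
    "org.apache.kafka.common.serialization.StringDeserializer"

  // Same values as in the reply, without the older "metadata.broker.list" entry.
  val kafkaParams: Map[String, Object] = Map(
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> stringDeserializer,
    "value.deserializer" -> stringDeserializer,
    "group.id" -> "group1")

  def main(args: Array[String]): Unit = {
    val missing = missingKeys(kafkaParams)
    if (missing.isEmpty) println("all expected consumer settings are present")
    else println("missing settings: " + missing.toSeq.sorted.mkString(", "))
  }
}
```

If the check passes, the same kafkaParams map can be handed to Subscribe[String, String](topics, kafkaParams) as in the example above.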