Thanks for your reply. I invoked my program with the broker IP and port, and it triggered as expected, but I see the error below:
./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing --master local spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar 172.28.161.32:9092 TestTopic

15/09/28 17:45:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/28 17:45:11 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([TestTopic,0])
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
        at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
        at org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

When I ran the command below to check the offsets, I got this:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic TestTopic --group test-consumer-group --zookeeper localhost:2181
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/test-consumer-group/offsets/TestTopic/0.

Also, I just added the configs below to my kafka/config/consumer.properties and restarted Kafka:

auto.offset.reset=smallest
offsets.storage=zookeeper
offsets.channel.backoff.ms=1000
offsets.channel.socket.timeout.ms=10000
offsets.commit.max.retries=5
dual.commit.enabled=true

From: Cody Koeninger [mailto:c...@koeninger.org]
Sent: Monday, September 28, 2015 7:56 PM
To: Ratika Prasad <rpra...@couponsinc.com>
Cc: dev@spark.apache.org
Subject: Re: Spark-Kafka Connector issue

This is a user list question, not a dev list question.

It looks like your driver is having trouble communicating with the Kafka brokers. Make sure the broker host and port are reachable from the driver host (using nc or telnet); make sure that you're providing the _broker_ host and port to createDirectStream, not the ZooKeeper host; and make sure the topics in question actually exist on Kafka and the names match what you're providing to createDirectStream.
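To make the nc/telnet reachability check above concrete, here is a minimal, self-contained Java sketch (equivalent to `nc -vz 172.28.161.32 9092`) that just attempts a TCP connection to the broker from the driver host. The broker address is taken from the spark-submit command above; the class name and the 5-second timeout are my own illustration, not part of the original program:

import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachabilityCheck {
    public static void main(String[] args) throws Exception {
        String host = "172.28.161.32"; // broker host from the spark-submit command above
        int port = 9092;               // broker port, NOT the ZooKeeper port 2181
        try (Socket socket = new Socket()) {
            // Fail fast if the broker port is unreachable from this host.
            socket.connect(new InetSocketAddress(host, port), 5000);
            System.out.println("Broker " + host + ":" + port + " is reachable");
        }
    }
}

If this throws a ConnectException or times out, the problem is network-level (firewall, wrong host/port, broker not listening on that interface) rather than anything in the Spark program.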
On Sat, Sep 26, 2015 at 11:50 PM, Ratika Prasad <rpra...@couponsinc.com> wrote:

Hi All,

I am trying out Spark Streaming, reading messages from Kafka topics that are then turned into streams as below. I have Kafka set up on a VM and the topics created; however, when I run the program below from my Spark VM, I get an error even though the Kafka server and ZooKeeper are up and running:

./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing --master local spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar 172.28.161.32:2181 redemption_inbound

Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
        at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
        at org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Program:

public static void main(String[] args) {
    if (args.length < 2) {
        System.err.println("Usage: DirectKafkaWordCount <brokers> <topics>\n"
            + "  <brokers> is a list of one or more Kafka brokers\n"
            + "  <topics> is a list of one or more kafka topics to consume from\n\n");
        System.exit(1);
    }

    String brokers = args[0];
    String topics = args[1];

    // Create context with 2 second batch interval
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaStreamEventProcessing");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        public Iterable<String> call(String x) {
            return Lists.newArrayList(SPACE.split(x));
        }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        }).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });
    wordCounts.print();
    System.out.println("Word Counts are : " + wordCounts.toString());

    // Start the computation
    jssc.start();
    jssc.awaitTermination();
  }
}
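One follow-up note on the consumer.properties change mentioned earlier in the thread: as far as I know, the direct stream does not read kafka/config/consumer.properties (that file is used by Kafka's own console tools); consumer settings such as auto.offset.reset go into the kafkaParams map handed to createDirectStream. A minimal sketch, replacing the kafkaParams block of the program above:

    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    // Broker host:port (e.g. 172.28.161.32:9092), not the ZooKeeper address.
    kafkaParams.put("metadata.broker.list", brokers);
    // Assumption: with the Spark 1.x direct stream, auto.offset.reset belongs
    // here rather than in consumer.properties; "smallest" makes the stream
    // start from the earliest available offset when no prior offsets exist.
    kafkaParams.put("auto.offset.reset", "smallest");

This would also explain the NoNodeException from ConsumerOffsetChecker above: the direct stream keeps track of offsets itself rather than committing them to ZooKeeper under /consumers, so no node exists for that consumer group.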