Thanks for your reply.

I invoked my program with the broker IP and port, and it launched as expected, 
but I see the error below:

./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing 
--master local spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
172.28.161.32:9092 TestTopic
15/09/28 17:45:09 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
15/09/28 17:45:11 WARN StreamingContext: spark.master should be set as 
local[n], n > 1 in local mode if you have receivers to get data, otherwise 
Spark jobs will not get resources to process the received data.
Exception in thread "main" org.apache.spark.SparkException: 
java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for 
Set([TestTopic,0])
        at 
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at 
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at 
org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at 
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
        at 
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
        at 
org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
        at 
org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

When I ran the command below to check the offsets, I got this:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic TestTopic 
--group test-consumer-group --zookeeper localhost:2181
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: 
KeeperErrorCode = NoNode for /consumers/test-consumer-group/offsets/TestTopic 
/0.

Also, I just added the configs below to my kafka/config/consumer.properties and 
restarted Kafka:

auto.offset.reset=smallest
offsets.storage=zookeeper
offsets.channel.backoff.ms=1000
offsets.channel.socket.timeout.ms=10000
offsets.commit.max.retries=5
dual.commit.enabled=true
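
(If it helps: my understanding is that with createDirectStream these settings would normally be 
passed via the kafkaParams map in the job itself rather than consumer.properties, roughly along 
the lines of the sketch below, though please correct me if that's wrong.)

    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);    // broker host:port, not zookeeper
    kafkaParams.put("auto.offset.reset", "smallest");    // start from the earliest available offsets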

From: Cody Koeninger [mailto:c...@koeninger.org]
Sent: Monday, September 28, 2015 7:56 PM
To: Ratika Prasad <rpra...@couponsinc.com>
Cc: dev@spark.apache.org
Subject: Re: Spark-Kafka Connector issue

This is a user list question, not a dev list question.

Looks like your driver is having trouble communicating with the kafka brokers.  
Make sure the broker host and port are reachable from the driver host (using nc 
or telnet); make sure that you're providing the _broker_ host and port to 
createDirectStream, not the zookeeper host; and make sure the topics in question 
actually exist on kafka and the names match what you're providing to 
createDirectStream.
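
If nc or telnet aren't handy, a quick reachability check can also be done from plain Java. This is 
just a rough sketch; the host and port below are examples and should be whatever you pass to 
createDirectStream:

import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachabilityCheck {
  public static void main(String[] args) throws Exception {
    // Example values; replace with the broker host:port from your spark-submit command
    String host = "172.28.161.32";
    int port = 9092;
    try (Socket socket = new Socket()) {
      // Throws an exception if the broker port can't be reached within 5 seconds
      socket.connect(new InetSocketAddress(host, port), 5000);
      System.out.println("Broker port is reachable from this host");
    }
  }
}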





On Sat, Sep 26, 2015 at 11:50 PM, Ratika Prasad <rpra...@couponsinc.com> wrote:
Hi All,

I am trying out Spark Streaming, reading messages from Kafka topics and turning 
them into streams as shown below. I have Kafka set up on a VM with the topics 
created, but when I run the program below from my Spark VM I get an error, even 
though the Kafka server and ZooKeeper are up and running.

./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing 
--master local spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
172.28.161.32:2181 redemption_inbound

Exception in thread "main" org.apache.spark.SparkException: 
java.io.EOFException: Received -1 when reading from channel, socket has likely 
been closed.
        at 
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at 
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at 
org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at 
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
        at 
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
        at 
org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
        at 
org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Program

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.regex.Pattern;

import scala.Tuple2;
import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class JavaKafkaStreamEventProcessing {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Usage: DirectKafkaWordCount <brokers> <topics>\n" +
          "  <brokers> is a list of one or more Kafka brokers\n" +
          "  <topics> is a list of one or more kafka topics to consume 
from\n\n");
      System.exit(1);
    }

    String brokers = args[0];
    String topics = args[1];

    // Create context with 2 second batch interval
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaStreamEventProcessing");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });
    wordCounts.print();
    System.out.println("Word Counts are : " + wordCounts.toString());

    // Start the computation
    jssc.start();
    jssc.awaitTermination();
  }
}
