This is a user list question, not a dev list question.

It looks like your driver is having trouble communicating with the Kafka
brokers. Make sure the broker host and port are reachable from the driver
host (using nc or telnet); make sure that you're providing the _broker_
host and port to createDirectStream, not the ZooKeeper host (the command
you posted passes port 2181, which is ZooKeeper's default port; Kafka
brokers listen on 9092 by default); and make sure the topics in question
actually exist on Kafka and that their names match what you're providing
to createDirectStream.





On Sat, Sep 26, 2015 at 11:50 PM, Ratika Prasad <rpra...@couponsinc.com>
wrote:

> Hi All,
>
>
>
> I am trying out Spark Streaming, reading messages from Kafka topics
> that will later be turned into streams, as shown below. I have Kafka
> set up on a VM and the topics created; however, when I run the program
> below from my Spark VM, I get an error even though the Kafka server
> and ZooKeeper are up and running.
>
>
>
> ./bin/spark-submit --class
> org.stream.processing.JavaKafkaStreamEventProcessing --master local
> spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar
> 172.28.161.32:2181 redemption_inbound
>
>
>
> Exception in thread "main" org.apache.spark.SparkException:
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed.
>
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>
>         at scala.util.Either.fold(Either.scala:97)
>
>         at
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>
>         at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
>
>         at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
>
>         at
> org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
>
>         at
> org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
>
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>         at java.lang.reflect.Method.invoke(Method.java:497)
>
>         at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
>
>         at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
>
>         at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
>
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
>
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
>
> Program
>
>
>
> *public* *static* *void* main(String[] args) {
>
>     *if* (args.length < 2) {
>
>       System.*err*.println("Usage: DirectKafkaWordCount <brokers>
> <topics>\n" +
>
>           "  <brokers> is a list of one or more Kafka brokers\n" +
>
>           "  <topics> is a list of one or more kafka topics to consume
> from\n\n");
>
>       System.*exit*(1);
>
>     }
>
>
>
>     String brokers = args[0];
>
>     String topics = args[1];
>
>
>
>     // Create context with 2 second batch interval
>
>     SparkConf sparkConf = *new* SparkConf().setAppName(
> "JavaKafkaStreamEventProcessing");
>
>     JavaStreamingContext jssc = *new* JavaStreamingContext(sparkConf,
> Durations.*seconds*(2));
>
>
>
>     HashSet<String> topicsSet = *new* HashSet<String>(Arrays.*asList*
> (topics.split(",")));
>
>     HashMap<String, String> kafkaParams = *new* HashMap<String, String>();
>
>     kafkaParams.put("metadata.broker.list", brokers);
>
>
>
>     // Create direct *kafka* stream with brokers and topics
>
>     JavaPairInputDStream<String, String> messages = KafkaUtils.
> *createDirectStream*(
>
>         jssc,
>
>         String.*class*,
>
>         String.*class*,
>
>         StringDecoder.*class*,
>
>         StringDecoder.*class*,
>
>         kafkaParams,
>
>         topicsSet
>
>     );
>
>
>
>     // Get the lines, split them into words, count the words and print
>
>     JavaDStream<String> lines = messages.map(*new* *Function<Tuple2<String,
> String>, String>()* {
>
>       *public* String call(Tuple2<String, String> tuple2) {
>
>         *return* tuple2._2();
>
>       }
>
>     });
>
>     JavaDStream<String> words = lines.flatMap(*new* *FlatMapFunction<String,
> String>()* {
>
>       *public* Iterable<String> call(String x) {
>
>         *return* Lists.*newArrayList*(*SPACE*.split(x));
>
>       }
>
>     });
>
>     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
>
>       *new* *PairFunction<String, String, Integer>()* {
>
>         *public* Tuple2<String, Integer> call(String s) {
>
>           *return* *new* Tuple2<String, Integer>(s, 1);
>
>         }
>
>       }).reduceByKey(
>
>         *new* *Function2<Integer, Integer, Integer>()* {
>
>         *public* Integer call(Integer i1, Integer i2) {
>
>           *return* i1 + i2;
>
>         }
>
>       });
>
>     wordCounts.print();
>
>     System.*out*.println("Word Counts are : " + wordCounts.toString());
>
>
>
>     // Start the computation
>
>     jssc.start();
>
>     jssc.awaitTermination();
>
>   }
>
> }
>

Reply via email to