Hi all, I am trying out Spark Streaming by reading messages from Kafka topics, which are turned into streams in the program below. Kafka is set up on a VM and the topics are created. However, when I run the program from my Spark VM with the command below, I get an error even though the Kafka server and ZooKeeper are both up and running.
Command:

./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing --master local \
  spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar 172.28.161.32:2181 redemption_inbound

Exception:

Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
	at scala.util.Either.fold(Either.scala:97)
	at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
	at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
	at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
	at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
	at org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Program:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.regex.Pattern;

import scala.Tuple2;
import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public final class JavaKafkaStreamEventProcessing {

  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Usage: JavaKafkaStreamEventProcessing <brokers> <topics>\n"
          + "  <brokers> is a list of one or more Kafka brokers\n"
          + "  <topics> is a list of one or more Kafka topics to consume from\n\n");
      System.exit(1);
    }

    String brokers = args[0];
    String topics = args[1];

    // Create context with a 2 second batch interval
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaStreamEventProcessing");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);

    // Create a direct Kafka stream with the given brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });
    wordCounts.print();
    System.out.println("Word Counts are : " + wordCounts.toString());

    // Start the computation
    jssc.start();
    jssc.awaitTermination();
  }
}
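One thing I am not sure about: I am passing the ZooKeeper address and port (172.28.161.32:2181) as the <brokers> argument. As far as I understand, the direct stream connects straight to the Kafka brokers through metadata.broker.list rather than going through ZooKeeper, so it may need the broker's own host:port instead of ZooKeeper's 2181. If so, the invocation would look something like this (assuming the broker runs on the same VM and on the default port 9092, which I have not changed):

./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing --master local \
  spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar 172.28.161.32:9092 redemption_inbound

Is that the likely cause of the EOFException above, or does the error point to something else?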