Hi,
I am integrating Kafka with Spark, using Spark Streaming. I have created a topic as follows:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
I am publishing messages to Kafka and trying to read them with Spark Streaming Java code and display them on screen.
The daemons are all up: Spark master and worker, ZooKeeper, and Kafka.
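(A quick way to confirm they are up, assuming a standard JDK install, is jps; the process names below are what these daemons typically report, but may vary by version:)

```shell
jps
# Typically lists: Master, Worker, QuorumPeerMain, Kafka
```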
I have written Java code for this, using KafkaUtils.createStream. The code is below:
package com.spark;

import java.util.HashMap;
import java.util.Map;

import scala.Tuple2;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class SparkStream {
    public static void main(String[] args) {
        if (args.length != 3) {
            System.out.println("Usage: spark-submit --class com.spark.SparkStream "
                    + "target/SparkStream-with-dependencies.jar "
                    + "<zookeeper_ip> <group_name> <topic1,topic2,...>");
            System.exit(1);
        }

        // One receiver thread per topic
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        for (String topic : args[2].split(",")) {
            topicMap.put(topic, 1);
        }

        // 3-second batch interval, running against the standalone master
        JavaStreamingContext jssc = new JavaStreamingContext(
                "spark://192.168.88.130:7077", "SparkStream", new Duration(3000));

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
        System.out.println("Connection done");

        // Keep only the message value (the Kafka payload), dropping the key
        JavaDStream<String> data = messages.map(
                new Function<Tuple2<String, String>, String>() {
                    public String call(Tuple2<String, String> message) {
                        System.out.println("NewMessage: " + message._2()); // for debugging
                        return message._2();
                    }
                });

        data.print();
        jssc.start();
        jssc.awaitTermination();
    }
}
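(For completeness, this is roughly how the job is submitted, following the usage string in the code; the group name "test-group" here is just an example value:)

```shell
bin/spark-submit --class com.spark.SparkStream \
  target/SparkStream-with-dependencies.jar \
  localhost:2181 test-group test
```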
I am running the job, and in another terminal I am running the Kafka console producer to
publish messages:
#bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> Hi kafka
> second message
> another message
But the output logs at the Spark Streaming console don't show the messages;
they show zero blocks received:
-------------------------------------------
Time: 1417107363000 ms
-------------------------------------------
14/11/27 11:56:03 INFO scheduler.JobScheduler: Starting job streaming job
1417107363000 ms.0 from job set of time 1417107363000 ms
14/11/27 11:56:03 INFO scheduler.JobScheduler: Finished job streaming job
1417107363000 ms.0 from job set of time 1417107363000 ms
14/11/27 11:56:03 INFO scheduler.JobScheduler: Total delay: 0.008 s for
time 1417107363000 ms (execution: 0.000 s)
14/11/27 11:56:03 INFO scheduler.JobScheduler: Added jobs for time
1417107363000 ms
14/11/27 11:56:03 INFO rdd.BlockRDD: Removing RDD 13 from persistence list
14/11/27 11:56:03 INFO storage.BlockManager: Removing RDD 13
14/11/27 11:56:03 INFO kafka.KafkaInputDStream: Removing blocks of RDD
BlockRDD[13] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417107363000
ms
14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient memory
14/11/27 11:56:06 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
Why isn't any data block being received? I have tried the Kafka
producer and consumer on the console (bin/kafka-console-producer.... and
bin/kafka-console-consumer...) and they work perfectly, so why not the code above?
Please help me.
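(For reference, the Kafka 0.8-era ZooKeeper-based console consumer invocation I mean typically looks like the sketch below; the --from-beginning flag is optional and just replays existing messages:)

```shell
bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
  --topic test --from-beginning
```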
Regards,
Aiman Sarosh