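Which Spark master are you using? If you are running locally, use local[4],
not local. The Kafka receiver occupies one thread, so with plain local there
is no thread left to process the data it receives.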
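
To make the thread arithmetic behind that advice concrete, here is a minimal
sketch in plain Java. LocalMasterCheck and its two methods are hypothetical
names made up for illustration; they are not part of Spark. The sketch assumes
only the local, local[N] and local[*] forms of the master URL and treats
anything else as "cannot tell".

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not a Spark API): checks whether a local master URL
// leaves at least one thread for processing once each receiver has taken one.
final class LocalMasterCheck {
    // Matches local[N] and local[*]; other local variants are not handled here.
    private static final Pattern LOCAL_N = Pattern.compile("local\\[(\\d+|\\*)\\]");

    /** Threads requested by a local master URL, or -1 if it is not a local master. */
    static int localThreads(String master) {
        if (master.equals("local")) {
            return 1; // plain "local" runs everything on a single thread
        }
        Matcher m = LOCAL_N.matcher(master);
        if (!m.matches()) {
            return -1; // e.g. spark://host:7077, where the cluster decides the cores
        }
        String n = m.group(1);
        return n.equals("*")
                ? Runtime.getRuntime().availableProcessors()
                : Integer.parseInt(n);
    }

    /** True if some thread remains for processing, or if the master is not local. */
    static boolean hasThreadForProcessing(String master, int receivers) {
        int threads = localThreads(master);
        return threads < 0 || threads > receivers;
    }
}
```

Assuming the quoted program's single createStream call sets up one receiver,
hasThreadForProcessing("local", 1) is false and
hasThreadForProcessing("local[4]", 1) is true. In the quoted code the change
itself would be sparkConf.setMaster("local[4]").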

On Mon, Nov 10, 2014 at 3:01 PM, Something Something
<mailinglist...@gmail.com> wrote:
> I am embarrassed to admit it, but I can't get a basic 'word count' to work
> under Kafka/Spark streaming. My code looks like this. I don't see any word
> counts in the console output. I also don't see any output in the UI.
> Needless to say, I am a newbie in both Spark and Kafka.
>
> Please help.  Thanks.
>
> Here's the code:
>
>     public static void main(String[] args) {
>         if (args.length < 4) {
>             System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
>             System.exit(1);
>         }
>
> //        StreamingExamples.setStreamingLogLevels();
> //        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
>
>         // Location of the Spark directory
>         String sparkHome = "/opt/mapr/spark/spark-1.0.2/";
>
>         // URL of the Spark cluster
>         String sparkUrl = "spark://mymachine:7077";
>
>         // Location of the required JAR files
>         String jarFiles =
> "./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar";
>
>         SparkConf sparkConf = new SparkConf();
>         sparkConf.setAppName("JavaKafkaWordCount");
>         sparkConf.setJars(new String[]{jarFiles});
>         sparkConf.setMaster(sparkUrl);
>         sparkConf.set("spark.ui.port", "2348");
>         sparkConf.setSparkHome(sparkHome);
>
>         Map<String, String> kafkaParams = new HashMap<String, String>();
>         kafkaParams.put("zookeeper.connect", "myedgenode:2181");
>         kafkaParams.put("group.id", "1");
>         kafkaParams.put("metadata.broker.list", "myedgenode:9092");
>         kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder");
>         kafkaParams.put("request.required.acks", "1");
>
>         // Create the context with a 2 second batch size
>         JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
>
>         int numThreads = Integer.parseInt(args[3]);
>         Map<String, Integer> topicMap = new HashMap<String, Integer>();
>         String[] topics = args[2].split(",");
>         for (String topic: topics) {
>             topicMap.put(topic, numThreads);
>         }
>
> //        JavaPairReceiverInputDStream<String, String> messages =
> //                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
>         JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc,
>                 String.class,
>                 String.class,
>                 StringDecoder.class,
>                 StringDecoder.class,
>                 kafkaParams,
>                 topicMap,
>                 StorageLevel.MEMORY_ONLY_SER());
>
>
>         JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
>             @Override
>             public String call(Tuple2<String, String> tuple2) {
>                 return tuple2._2();
>             }
>         });
>
>         JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
>             @Override
>             public Iterable<String> call(String x) {
>                 return Lists.newArrayList(SPACE.split(x));
>             }
>         });
>
>         JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
>                 new PairFunction<String, String, Integer>() {
>                     @Override
>                     public Tuple2<String, Integer> call(String s) {
>                         return new Tuple2<String, Integer>(s, 1);
>                     }
>                 }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>             @Override
>             public Integer call(Integer i1, Integer i2) {
>                 return i1 + i2;
>             }
>         });
>
>         wordCounts.print();
>         jssc.start();
>         jssc.awaitTermination();
>     }
