Hi,

I was playing with Spark Streaming and wanted to collect data from MQTT
and publish it to Cassandra.

Here is my code,

package com.wouri.streamly.examples.mqtt;

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.mqtt.MQTTUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraJavaUtil;

/**
 * Counts words in UTF8 encoded, '\n' delimited text received from MQTT
Server.
 *
 * Usage: JavaMQTTStreamWordCount <brokerUrl> <topic> <brokerUrl> and
<topic>
 * describe the MQTT server that Structured Streaming would connect to
receive
 * data.
 *
 * To run this on your local machine, a MQTT Server should be up and
running.
 *
 */
public class JavaMQTTStreamWordCount implements Serializable{

    static Logger log =
LoggerFactory.getLogger(JavaMQTTStreamWordCount.class);

    static String wordTable = "words";

    private transient static SparkConf conf;
    private JavaMQTTStreamWordCount(SparkConf conf) {
        this.conf = conf;
    }
    private static void sendDataToCassandra(JavaSparkContext sc,String
keyspace,Map<String, Long> wordCounts) {
        log.info("In generateData");
        CassandraConnector connector =
CassandraConnector.apply(sc.getConf());
        System.out.println("WordCounts "+wordCounts);

        // Prepare the schema
        try (Session session = connector.openSession()){
            session.execute("CREATE TABLE IF NOT EXISTS " + keyspace +"."
+wordTable + " (id INT PRIMARY KEY, word TEXT, counts INT)");
        }

        log.info("keyspace {} and tables : {}, {} and  {} created
successfully", keyspace, productsTable, salesTable, summariesTable);
        // Prepare the products hierarchy
        List<Word> words = new ArrayList<>();
        int i = 0;
        for (Map.Entry<String, Long> word : wordCounts.entrySet()){
            Word eWord = new Word(i, word.getKey(), (int)
(long)word.getValue());
            words.add(i, eWord);
            i++;
        }

        log.info("Products to add : {}", words);
        JavaRDD<Word> productsRDD = sc.parallelize(words);


CassandraJavaUtil.javaFunctions(productsRDD).writerBuilder(keyspace,
wordTable, CassandraJavaUtil.mapToRow(Word.class)).saveToCassandra();

        log.info("Words successfully added : {}", words);
    }
    static private Map<String,Long> wordCounts = new HashMap<>();

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: JavaMQTTStreamWordCount <brokerUrl>
<topic> <clientId> <username> <password> <keyspace>
<cassandraContactPoints>");
            System.exit(1);
        }

        String brokerUrl = args[0];
        String topic = args[1];
        String clientID = args[2];
        String username = args[3];
        String password = args[4];
        String keyspace = args[5];
        String cassandraContactPoints = args[6];
        SparkConf conf = new SparkConf();


        SparkConf sparkConf = new
SparkConf().setAppName("JavaMQTTStreamWordCount");
        sparkConf.set("spark.driver.allowMultipleContexts", "true");
        sparkConf.set("spark.cores.max", "2");
        sparkConf.set("spark.cassandra.connection.host",
cassandraContactPoints );
        sparkConf.set("spark.cassandra.connection.port", "9042" );
        sparkConf.set("spark.cassandra.auth.username", username);
        sparkConf.set("spark.cassandra.auth.password", password);
        sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true");
        sparkConf.set("spark.cores.max", "2");

        // check Spark configuration for master URL, set it to local if not
        // configured
        if (!sparkConf.contains("spark.master")) {
            sparkConf.setMaster("local[4]");
        }

        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.seconds(5));

        JavaReceiverInputDStream<String> lines =
MQTTUtils.createStream(jssc, brokerUrl, topic, clientID, username,
                password, false);

        JavaDStream<String> words = lines.flatMap(new
FlatMapFunction<String, String>() {
            public Iterator<String> call(String x) {
                return Arrays.asList(x.split(" ")).iterator();
            }
        });



        // Convert RDDs of the words DStream to DataFrame and run SQL query
        words.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
            JavaMQTTStreamWordCount jms = new JavaMQTTStreamWordCount(conf);
            // @Override
            public void call(JavaRDD<String> rdd, Time time) {
                SparkSession spark =
JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
                // Convert JavaRDD[String] to JavaRDD[bean class] to
DataFrame
                JavaRDD<JavaRecord> rowRDD = rdd.map(new Function<String,
JavaRecord>() {
                    // @Override
                    public JavaRecord call(String word) {
                        JavaRecord record = new JavaRecord();
                        record.setWord(word);
                        return record;
                    }
                });
                Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD,
JavaRecord.class);

                // Creates a temporary view using the DataFrame
                wordsDataFrame.createOrReplaceTempView("words");
                // Do word count on table using SQL and print it
                Dataset<Row> wordCountsDataFrame = spark.sql("select word,
count(*) as total from words group by word");
                System.out.println("========= " + time + "=========");
                System.out.println("About to show");
                List<Row> listRows = wordCountsDataFrame.collectAsList();
                for (Row row : listRows){
                    System.out.println(row.get(0));
                    System.out.println(row.get(1));
                    wordCounts.put((String)row.get(0), (Long)row.get(1));
                }
//                sendDataToCassandra(jssc.sparkContext(), keyspace,
wordCounts);
                sendDataToCassandra(new JavaSparkContext(conf), keyspace,
wordCounts);

                System.out.println("Show done");

            }
        });


        jssc.start();
        jssc.awaitTermination();
    }
}

/** Lazily instantiated singleton instance of SparkSession */
class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance =
SparkSession.builder().config(sparkConf).getOrCreate();
        }
        return instance;
    }
}

/**
 * Bean holding one word-count row: a numeric id, the word and its count.
 *
 * <p>Implements {@link java.io.Serializable} so instances can be shipped
 * inside an RDD (they are passed to {@code sc.parallelize} and mapped to rows
 * with {@code CassandraJavaUtil.mapToRow(Word.class)}). The no-arg constructor
 * and the getter/setter pairs keep it usable as a plain JavaBean.
 */
public class Word implements java.io.Serializable {
    private static final long serialVersionUID = 1L;

    private Integer id;
    private String word;
    private Integer counts;

    /** Creates an empty bean; all properties are {@code null}. */
    public Word() { }

    /**
     * @param id     row identifier
     * @param word   the word
     * @param counts number of occurrences of the word
     */
    public Word(Integer id, String word, Integer counts) {
        this.id = id;
        this.word = word;
        this.counts = counts;
    }

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getWord() { return word; }
    // Assign the argument: the previous body was "this.word = word", a
    // self-assignment that silently dropped the new value.
    public void setWord(String name) { this.word = name; }

    public Integer getCounts() { return counts; }
    public void setCounts(Integer counts) { this.counts = counts; }

    /** Renders the bean as {@code Word{id=..., word='...', counts=...}}. */
    @Override
    public String toString() {
        // Fully qualified so the class compiles without an extra import.
        return java.text.MessageFormat.format(
                "Word'{'id={0}, word=''{1}'', counts={2}'}'", id, word, counts);
    }
}

When I run the code above, I get stuck with this exception:

17/01/14 20:38:50 ERROR JobScheduler: Error running job streaming job
1484422730000 ms.0
org.apache.spark.SparkException: Task not serializable
    at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
    at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.map(RDD.scala:365)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:92)
    at
org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:45)
    at
com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2.call(JavaMQTTStreamWordCount.java:194)
    at
com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2.call(JavaMQTTStreamWordCount.java:188)
    at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:280)
    at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:280)
    at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
    at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
    at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
    at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkConf, value:
org.apache.spark.SparkConf@3a501e0b)
    - field (class:
com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2, name: val$conf,
type: class org.apache.spark.SparkConf)
    - object (class
com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2,
com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2@72377c42)
    - field (class:
com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2$1, name: this$0,
type: class com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2)
    - object (class
com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2$1,
com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2$1@5b4544bb)
    - field (class:
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name:
fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1,
<function1>)
    at
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 32 more
Exception in thread "main" org.apache.spark.SparkException: Task not
serializable
    at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
    at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.map(RDD.scala:365)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:92)
    at
org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:45)
    at
com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2.call(JavaMQTTStreamWordCount.java:194)
    at
com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2.call(JavaMQTTStreamWordCount.java:188)
    at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:280)
    at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:280)
    at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
    at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
    at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
    at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkConf, value:
org.apache.spark.SparkConf@3a501e0b)
    - field (class:
com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2, name: val$conf,
type: class org.apache.spark.SparkConf)
    - object (class
com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2,
com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2@72377c42)
    - field (class:
com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2$1, name: this$0,
type: class com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2)
    - object (class
com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2$1,
com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2$1@5b4544bb)
    - field (class:
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name:
fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1,
<function1>)
    at
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)


Any clue please?
Thanks.

Reply via email to