Hi, I was experimenting with Spark Streaming: I wanted to collect data from an MQTT broker and store it in Cassandra.
Here is my code, package com.wouri.streamly.examples.mqtt; import java.io.Serializable; import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; import java.util.UUID; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction2; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.Time; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.mqtt.MQTTUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.datastax.driver.core.Session; import com.datastax.spark.connector.cql.CassandraConnector; import com.datastax.spark.connector.japi.CassandraJavaUtil; /** * Counts words in UTF8 encoded, '\n' delimited text received from MQTT Server. * * Usage: JavaMQTTStreamWordCount <brokerUrl> <topic> <brokerUrl> and <topic> * describe the MQTT server that Structured Streaming would connect to receive * data. * * To run this on your local machine, a MQTT Server should be up and running. 
* */ public class JavaMQTTStreamWordCount implements Serializable{ static Logger log = LoggerFactory.getLogger(JavaMQTTStreamWordCount.class); static String wordTable = "words"; private transient static SparkConf conf; private JavaMQTTStreamWordCount(SparkConf conf) { this.conf = conf; } private static void sendDataToCassandra(JavaSparkContext sc,String keyspace,Map<String, Long> wordCounts) { log.info("In generateData"); CassandraConnector connector = CassandraConnector.apply(sc.getConf()); System.out.println("WordCounts "+wordCounts); // Prepare the schema try (Session session = connector.openSession()){ session.execute("CREATE TABLE IF NOT EXISTS " + keyspace +"." +wordTable + " (id INT PRIMARY KEY, word TEXT, counts INT)"); } log.info("keyspace {} and tables : {}, {} and {} created successfully", keyspace, productsTable, salesTable, summariesTable); // Prepare the products hierarchy List<Word> words = new ArrayList<>(); int i = 0; for (Map.Entry<String, Long> word : wordCounts.entrySet()){ Word eWord = new Word(i, word.getKey(), (int) (long)word.getValue()); words.add(i, eWord); i++; } log.info("Products to add : {}", words); JavaRDD<Word> productsRDD = sc.parallelize(words); CassandraJavaUtil.javaFunctions(productsRDD).writerBuilder(keyspace, wordTable, CassandraJavaUtil.mapToRow(Word.class)).saveToCassandra(); log.info("Words successfully added : {}", words); } static private Map<String,Long> wordCounts = new HashMap<>(); public static void main(String[] args) throws Exception { if (args.length < 2) { System.err.println("Usage: JavaMQTTStreamWordCount <brokerUrl> <topic> <clientId> <username> <password> <keyspace> <cassandraContactPoints>"); System.exit(1); } String brokerUrl = args[0]; String topic = args[1]; String clientID = args[2]; String username = args[3]; String password = args[4]; String keyspace = args[5]; String cassandraContactPoints = args[6]; SparkConf conf = new SparkConf(); SparkConf sparkConf = new 
SparkConf().setAppName("JavaMQTTStreamWordCount"); sparkConf.set("spark.driver.allowMultipleContexts", "true"); sparkConf.set("spark.cores.max", "2"); sparkConf.set("spark.cassandra.connection.host", cassandraContactPoints ); sparkConf.set("spark.cassandra.connection.port", "9042" ); sparkConf.set("spark.cassandra.auth.username", username); sparkConf.set("spark.cassandra.auth.password", password); sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true"); sparkConf.set("spark.cores.max", "2"); // check Spark configuration for master URL, set it to local if not // configured if (!sparkConf.contains("spark.master")) { sparkConf.setMaster("local[4]"); } JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(5)); JavaReceiverInputDStream<String> lines = MQTTUtils.createStream(jssc, brokerUrl, topic, clientID, username, password, false); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { public Iterator<String> call(String x) { return Arrays.asList(x.split(" ")).iterator(); } }); // Convert RDDs of the words DStream to DataFrame and run SQL query words.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() { JavaMQTTStreamWordCount jms = new JavaMQTTStreamWordCount(conf); // @Override public void call(JavaRDD<String> rdd, Time time) { SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf()); // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame JavaRDD<JavaRecord> rowRDD = rdd.map(new Function<String, JavaRecord>() { // @Override public JavaRecord call(String word) { JavaRecord record = new JavaRecord(); record.setWord(word); return record; } }); Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class); // Creates a temporary view using the DataFrame wordsDataFrame.createOrReplaceTempView("words"); // Do word count on table using SQL and print it Dataset<Row> wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word"); 
System.out.println("========= " + time + "========="); System.out.println("About to show"); List<Row> listRows = wordCountsDataFrame.collectAsList(); for (Row row : listRows){ System.out.println(row.get(0)); System.out.println(row.get(1)); wordCounts.put((String)row.get(0), (Long)row.get(1)); } // sendDataToCassandra(jssc.sparkContext(), keyspace, wordCounts); sendDataToCassandra(new JavaSparkContext(conf), keyspace, wordCounts); System.out.println("Show done"); } }); jssc.start(); jssc.awaitTermination(); } } /** Lazily instantiated singleton instance of SparkSession */ class JavaSparkSessionSingleton { private static transient SparkSession instance = null; public static SparkSession getInstance(SparkConf sparkConf) { if (instance == null) { instance = SparkSession.builder().config(sparkConf).getOrCreate(); } return instance; } } public class Word { private Integer id; private String word; private Integer counts; public Word() { } public Word(Integer id, String word, Integer counts) { this.id = id; this.word = word; this.counts = counts; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getWord() { return word; } public void setWord(String name) { this.word = word; } public Integer getCounts() { return counts; } public void setCounts(Integer counts) { this.counts = counts; } @Override public String toString() { return MessageFormat.format("Word'{'id={0}, word=''{1}'', counts={2}'}'", id, word, counts); } } When I ran the following code, I get stuck with this exception, 17/01/14 20:38:50 ERROR JobScheduler: Error running job streaming job 1484422730000 ms.0 org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) at 
org.apache.spark.SparkContext.clean(SparkContext.scala:2037) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) at org.apache.spark.rdd.RDD.map(RDD.scala:365) at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:92) at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:45) at com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2.call(JavaMQTTStreamWordCount.java:194) at com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2.call(JavaMQTTStreamWordCount.java:188) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:280) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:280) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245) at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf Serialization stack: - object not serializable (class: org.apache.spark.SparkConf, value: org.apache.spark.SparkConf@3a501e0b) - field (class: com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2, name: val$conf, type: class org.apache.spark.SparkConf) - object (class com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2, com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2@72377c42) - field (class: com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2$1, name: this$0, type: class com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2) - object (class com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2$1, com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2$1@5b4544bb) - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function) - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) ... 
32 more Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) at org.apache.spark.SparkContext.clean(SparkContext.scala:2037) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) at org.apache.spark.rdd.RDD.map(RDD.scala:365) at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:92) at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:45) at com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2.call(JavaMQTTStreamWordCount.java:194) at com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2.call(JavaMQTTStreamWordCount.java:188) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:280) at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:280) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf Serialization stack: - object not serializable (class: org.apache.spark.SparkConf, value: org.apache.spark.SparkConf@3a501e0b) - field (class: com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2, name: val$conf, type: class org.apache.spark.SparkConf) - object (class com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2, com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2@72377c42) - field (class: com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2$1, name: this$0, type: class com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2) - object (class com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2$1, com.wouri.streamly.examples.mqtt.JavaMQTTStreamWordCount$2$1@5b4544bb) - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function) - object (class 
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) Any clue please? Thanks.