Hi,
I am integrating Kafka->SparkStreaming->Cassandra, i.e. reading the streaming messages from Kafka and saving them into a Cassandra table using Spark Streaming as an intermediary. So far I have been able to read streaming data from Kafka into Spark and display it on the console. My objective now is to output the JavaDStream data to a Cassandra table instead of the console. I have written the code below, but it is throwing an error. package com.spark; import scala.Tuple2; import org.apache.log4j.Logger; import org.apache.log4j.Level; import kafka.serializer.Decoder; import kafka.serializer.Encoder; import org.apache.spark.streaming.Duration; import org.apache.spark.*; import org.apache.spark.api.java.function.*; import org.apache.spark.api.java.*; import org.apache.spark.streaming.kafka.KafkaUtils; import org.apache.spark.streaming.kafka.*; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; import java.util.Map; import java.util.HashMap; import java.util.List; import com.datastax.spark.connector.*; import com.datastax.driver.core.Session; import com.datastax.spark.connector.cql.CassandraConnector; import static com.datastax.spark.connector.CassandraJavaUtil.*; public class SparkStream { public static void main(String args[]) throws Exception { int counter = 0; if(args.length != 3) { System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>"); System.exit(1); } Logger.getLogger("org").setLevel(Level.OFF); Logger.getLogger("akka").setLevel(Level.OFF); Map<String,Integer> topicMap = new HashMap<String,Integer>(); String[] topic = args[2].split(","); for(String t: topic) { topicMap.put(t, new Integer(3)); } SparkConf conf = new SparkConf(); JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream",conf); JavaStreamingContext jssc = new JavaStreamingContext(sc, new 
Duration(3000)); JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap ); System.out.println("Connection done!+++++++++++++++++++++++++++"); JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() { public String call(Tuple2<String, String> message) { return message._2(); } } ); //data.print(); /* Creating table in cassandra to store kafka streamed messages*/ conf.set("spark.cassandra.connection.host", "127.0.0.1"); CassandraConnector connector = CassandraConnector.apply(sc.getConf()); Session session = connector.openSession(); session.execute("CREATE TABLE testkeyspace.test_table (value TEXT PRIMARY KEY)"); /* Writing to Cassandra */ javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(String.class)).saveToCassandra(); jssc.start(); jssc.awaitTermination(new Duration(60*1000)); } } Error which I am getting: [INFO] 3 errors [INFO] ------------------------------------------------------------- [INFO] ------------------------------------------------------------------------ [INFO] BUILD FAILURE [INFO] ------------------------------------------------------------------------ [INFO] Total time: 9.225 s [INFO] Finished at: 2014-12-06T05:29:19-05:00 [INFO] Final Memory: 25M/247M [INFO] ------------------------------------------------------------------------ [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project SparkStreamSample: Compilation failure: Compilation failure: [ERROR] /root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[72,81] cannot find symbol [ERROR] symbol: method mapToRow(java.lang.Class<java.lang.String>) [ERROR] location: class com.spark.SparkStream [ERROR] /root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[72,17] no suitable method found for javaFunctions(org.apache.spark.streaming.api.java.JavaDStream<java.lang.String>) [ERROR] method 
com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.streaming.api.java.JavaDStream<T>,java.lang.Class<T>) is not applicable [ERROR] (cannot infer type-variable(s) T [ERROR] (actual and formal argument lists differ in length)) [ERROR] method com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.streaming.dstream.DStream<T>,java.lang.Class<T>) is not applicable [ERROR] (cannot infer type-variable(s) T [ERROR] (actual and formal argument lists differ in length)) [ERROR] method com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.api.java.JavaRDD<T>,java.lang.Class<T>) is not applicable [ERROR] (cannot infer type-variable(s) T [ERROR] (actual and formal argument lists differ in length)) [ERROR] method com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.rdd.RDD<T>,java.lang.Class<T>) is not applicable [ERROR] (cannot infer type-variable(s) T [ERROR] (actual and formal argument lists differ in length)) [ERROR] method com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.streaming.api.java.JavaStreamingContext) is not applicable [ERROR] (argument mismatch; org.apache.spark.streaming.api.java.JavaDStream<java.lang.String> cannot be converted to org.apache.spark.streaming.api.java.JavaStreamingContext) [ERROR] method com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.streaming.StreamingContext) is not applicable [ERROR] (argument mismatch; org.apache.spark.streaming.api.java.JavaDStream<java.lang.String> cannot be converted to org.apache.spark.streaming.StreamingContext) [ERROR] method com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.api.java.JavaSparkContext) is not applicable [ERROR] (argument mismatch; org.apache.spark.streaming.api.java.JavaDStream<java.lang.String> cannot be converted to org.apache.spark.api.java.JavaSparkContext) [ERROR] method 
com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.SparkContext) is not applicable [ERROR] (argument mismatch; org.apache.spark.streaming.api.java.JavaDStream<java.lang.String> cannot be converted to org.apache.spark.SparkContext) [ERROR] /root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[75,39] incompatible types: org.apache.spark.streaming.Duration cannot be converted to long [ERROR] -> [Help 1] [ERROR] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch. [ERROR] Re-run Maven using the -X switch to enable full debug logging. Please help me out. I have been struggling with this for a long time. Thanks and Regards, Md. Aiman Sarosh. Accenture Services Pvt. Ltd. Mob #: (+91) - 9836112841.