Hi,

I am integrating Kafka -> Spark Streaming -> Cassandra, i.e. reading the
streaming messages from Kafka and saving them into a Cassandra table,
with Spark Streaming as the intermediary. So far I am able to read the
streaming data from Kafka into Spark and display it on the console.


My objective now is to write the JavaDStream data to a Cassandra table
instead of the console.

I have written the code below, but it fails to compile with the errors shown further down.


package com.spark;

import scala.Tuple2;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import org.apache.spark.streaming.Duration;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.*;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
import com.datastax.spark.connector.*;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import static com.datastax.spark.connector.CassandraJavaUtil.*;


public class SparkStream {
    public static void main(String args[]) throws Exception
    {
        int counter = 0;
        if(args.length != 3)
        {
            System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
            System.exit(1);
        }

        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
        Map<String,Integer> topicMap = new HashMap<String,Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(3));
        }

        SparkConf conf = new SparkConf();
        JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
        JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
        System.out.println("Connection done!+++++++++++++++++++++++++++");


        JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> message) {
                return message._2();
            }
        });

        //data.print();

        /* Create a table in Cassandra to store the Kafka-streamed messages */
        conf.set("spark.cassandra.connection.host", "127.0.0.1");
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());
        Session session = connector.openSession();
        session.execute("CREATE TABLE testkeyspace.test_table (value TEXT PRIMARY KEY)");

        /* Writing to Cassandra */
        javaFunctions(data)
                .writerBuilder("testkeyspace", "test_table", mapToRow(String.class))
                .saveToCassandra();

        jssc.start();
        jssc.awaitTermination(new Duration(60*1000));
    }
}
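
(One thing I noticed myself, separate from the compile errors: the CREATE TABLE above
will fail on a second run once the table already exists. A small sketch of how I could
build the statement with IF NOT EXISTS instead — the helper class and method here are
purely illustrative, not part of my job:)

```java
public class CreateTableCql {
    // Illustrative helper: builds the CQL with IF NOT EXISTS so that
    // re-running the job does not fail when the table already exists
    static String createTableCql(String keyspace, String table) {
        return "CREATE TABLE IF NOT EXISTS " + keyspace + "." + table
                + " (value TEXT PRIMARY KEY)";
    }

    public static void main(String[] args) {
        // The string passed to session.execute() would then be:
        System.out.println(createTableCql("testkeyspace", "test_table"));
    }
}
```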


The error I am getting:

[INFO] 3 errors
[INFO] -------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.225 s
[INFO] Finished at: 2014-12-06T05:29:19-05:00
[INFO] Final Memory: 25M/247M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal
org.apache.maven.plugins:maven-compiler-plugin:3.1:compile
(default-compile) on project SparkStreamSample: Compilation failure:
Compilation failure:
[ERROR] 
/root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[72,81]
cannot find symbol
[ERROR] symbol:   method mapToRow(java.lang.Class<java.lang.String>)
[ERROR] location: class com.spark.SparkStream
[ERROR] 
/root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[72,17]
no suitable method found for
javaFunctions(org.apache.spark.streaming.api.java.JavaDStream<java.lang.String>)
[ERROR] method 
com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.streaming.api.java.JavaDStream<T>,java.lang.Class<T>)
is not applicable
[ERROR] (cannot infer type-variable(s) T
[ERROR] (actual and formal argument lists differ in length))
[ERROR] method 
com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.streaming.dstream.DStream<T>,java.lang.Class<T>)
is not applicable
[ERROR] (cannot infer type-variable(s) T
[ERROR] (actual and formal argument lists differ in length))
[ERROR] method 
com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.api.java.JavaRDD<T>,java.lang.Class<T>)
is not applicable
[ERROR] (cannot infer type-variable(s) T
[ERROR] (actual and formal argument lists differ in length))
[ERROR] method 
com.datastax.spark.connector.CassandraJavaUtil.<T>javaFunctions(org.apache.spark.rdd.RDD<T>,java.lang.Class<T>)
is not applicable
[ERROR] (cannot infer type-variable(s) T
[ERROR] (actual and formal argument lists differ in length))
[ERROR] method 
com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.streaming.api.java.JavaStreamingContext)
is not applicable
[ERROR] (argument mismatch;
org.apache.spark.streaming.api.java.JavaDStream<java.lang.String>
cannot be converted to
org.apache.spark.streaming.api.java.JavaStreamingContext)
[ERROR] method 
com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.streaming.StreamingContext)
is not applicable
[ERROR] (argument mismatch;
org.apache.spark.streaming.api.java.JavaDStream<java.lang.String>
cannot be converted to org.apache.spark.streaming.StreamingContext)
[ERROR] method 
com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.api.java.JavaSparkContext)
is not applicable
[ERROR] (argument mismatch;
org.apache.spark.streaming.api.java.JavaDStream<java.lang.String>
cannot be converted to org.apache.spark.api.java.JavaSparkContext)
[ERROR] method 
com.datastax.spark.connector.CassandraJavaUtil.javaFunctions(org.apache.spark.SparkContext)
is not applicable
[ERROR] (argument mismatch;
org.apache.spark.streaming.api.java.JavaDStream<java.lang.String>
cannot be converted to org.apache.spark.SparkContext)
[ERROR] 
/root/Documents/SparkStreamSample/src/main/java/com/spark/SparkStream.java:[75,39]
incompatible types: org.apache.spark.streaming.Duration cannot be
converted to long
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with
the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
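
For the third error at least, the message suggests that awaitTermination in the version
on my classpath takes a timeout in milliseconds as a plain long, not a Duration. A
trivial sketch of the value I believe it expects (the helper method is purely
illustrative):

```java
public class AwaitTimeoutDemo {
    // Illustrative only: the 60-second timeout expressed as milliseconds (long),
    // which is the type the "Duration cannot be converted to long" error points to
    static long secondsToMillis(long seconds) {
        return seconds * 1000L;
    }

    public static void main(String[] args) {
        System.out.println(secondsToMillis(60)); // 60000
    }
}
```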

Please help me out.
I have been struggling with this for a long time.




Thanks and Regards,

Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #:  (+91) - 9836112841.
