Hi Akhil, Vyas, Helena,

Thank you for your suggestions.
As Akhil suggested earlier, I have implemented the batch Duration in JavaStreamingContext and waitForTermination(Duration). The approach Helena suggested is Scala-oriented.

The issue now is that I want to use Cassandra as my output. I have created a Cassandra table "test_table" with columns "key" (text, primary key) and "value" (text). I have successfully mapped the data into a JavaDStream<Tuple2<String,String>> data:

    JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
    JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

    JavaDStream<Tuple2<String, String>> data = messages.map(
        new Function<Tuple2<String, String>, Tuple2<String, String>>() {
            public Tuple2<String, String> call(Tuple2<String, String> message) {
                return new Tuple2<String, String>(message._1(), message._2());
            }
        });

Then I have created a List:

    List<TestTable> list = new ArrayList<TestTable>();

where TestTable is my custom class with the same structure as my Cassandra table, with members "key" and "value":

    class TestTable implements java.io.Serializable {
        String key;
        String value;

        public TestTable() {}
        public TestTable(String k, String v) { key = k; value = v; }

        public String getKey() { return key; }
        public void setKey(String k) { key = k; }
        public String getValue() { return value; }
        public void setValue(String v) { value = v; }

        public String toString() { return "Key:" + key + ",Value:" + value; }
    }

Please suggest how I can add the data from JavaDStream<Tuple2<String,String>> data into List<TestTable> list. I am doing this so that I can subsequently use

    JavaRDD<TestTable> rdd = sc.parallelize(list);
    javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", "test_table");

to save the RDD data into Cassandra.
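A note on the list-based approach: gathering stream data into a driver-side List and then calling sc.parallelize(list) works against the streaming model, because every batch already arrives as an RDD. A more common pattern is to map each (key, value) pair to a TestTable inside the stream, so each batch already consists of rows. The stdlib-only sketch below shows just that conversion step, under stated assumptions: java.util.Map.Entry stands in for scala.Tuple2, a List stands in for one batch, and the helper class PairToRow is hypothetical. The bean names its second property "value" so that, assuming the connector matches JavaBean property names to column names, it lines up with the "value" column (a getVal/setVal pair would expose a property called "val").

```java
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Bean mirroring test_table(key text PRIMARY KEY, value text).
// Assumption: the connector maps bean properties (from getters/setters) to columns by name.
class TestTable implements Serializable {
    private String key;
    private String value;

    public TestTable() {}

    public TestTable(String key, String value) {
        this.key = key;
        this.value = value;
    }

    public String getKey() { return key; }
    public void setKey(String key) { this.key = key; }
    public String getValue() { return value; }
    public void setValue(String value) { this.value = value; }

    @Override
    public String toString() { return "Key:" + key + ",Value:" + value; }
}

// Hypothetical helper (illustration only): the per-record conversion a stream map step would apply.
class PairToRow {
    // Map.Entry stands in for scala.Tuple2 in this stdlib-only sketch.
    static final Function<Map.Entry<String, String>, TestTable> TO_ROW =
            pair -> new TestTable(pair.getKey(), pair.getValue());

    // Converts one batch (a List standing in for an RDD) into rows, record by record.
    static List<TestTable> convertBatch(List<Map.Entry<String, String>> batch) {
        return batch.stream().map(TO_ROW).collect(Collectors.toList());
    }
}
```

In the Spark job, the equivalent step would be something like data.map(t -> new TestTable(t._1(), t._2())), and the resulting stream of TestTable objects would then be written batch by batch instead of being collected into a single list.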
I had tried coding it this way:

    messages.foreachRDD(new Function<Tuple2<String,String>, String>() {
        public List<TestTable> call(Tuple2<String,String> message) {
            String k = message._1();
            String v = message._2();
            TestTable tbl = new TestTable(k, v);
            list.put(tbl);
        }
    });

but there seems to be a type mismatch. Please help.

Thanks and Regards,
Md. Aiman Sarosh.
Accenture Services Pvt. Ltd.
Mob #: (+91) - 9836112841.

________________________________
From: Helena Edelson <helena.edel...@datastax.com>
Sent: Friday, December 5, 2014 6:26 PM
To: Sarosh, M.
Cc: user@spark.apache.org
Subject: Re: Spark-Streaming: output to cassandra

You can just do something like this; the Spark Cassandra Connector handles the rest:

    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Map(KafkaTopicRaw -> 10), StorageLevel.DISK_ONLY_2)
      .map { case (_, line) => line.split(",") }
      .map(RawWeatherData(_))
      .saveToCassandra(CassandraKeyspace, CassandraTableRaw)

- Helena
@helenaedelson

On Dec 4, 2014, at 9:51 AM, m.sar...@accenture.com wrote:

Hi,

I have written the code below, which streams data from Kafka and prints it to the console. I want to extend it so that my data goes into a Cassandra table instead.

    JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(1000));
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    System.out.println("Connection done!");

    JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() {
        public String call(Tuple2<String, String> message) {
            return message._2();
        }
    });

    //data.print(); --> output to console
    data.foreachRDD(saveToCassandra("mykeyspace","mytable"));
    jssc.start();
    jssc.awaitTermination();

How should I implement the line data.foreachRDD(saveToCassandra("mykeyspace","mytable")); so that data goes into Cassandra in each batch?
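On the type mismatch: in the Spark 1.x Java API, foreachRDD takes a function whose argument is a whole batch (for messages, a JavaPairRDD<String, String>) and which returns Void, whereas the attempt declares a Function<Tuple2<String,String>, String> that handles a single record and returns a List<TestTable>. Separately, java.util.List has add, not put. The toy model below is plain Java, not Spark's API, and the class MiniStream is hypothetical; it only illustrates the shape: map works record by record, while the per-batch output callback receives an entire batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Toy model of a micro-batched stream (not Spark's API; names are hypothetical).
// Each inner List plays the role of one batch RDD.
class MiniStream<T> {
    private final List<List<T>> batches;

    MiniStream(List<List<T>> batches) {
        this.batches = batches;
    }

    // Analogous to DStream.map: transforms every record, batch by batch.
    <R> MiniStream<R> map(Function<T, R> f) {
        List<List<R>> out = new ArrayList<>();
        for (List<T> batch : batches) {
            List<R> mapped = new ArrayList<>();
            for (T record : batch) {
                mapped.add(f.apply(record));
            }
            out.add(mapped);
        }
        return new MiniStream<>(out);
    }

    // Analogous to foreachRDD: the callback is invoked once per batch with the whole batch.
    void foreachBatch(Consumer<List<T>> action) {
        for (List<T> batch : batches) {
            action.accept(batch);
        }
    }
}
```

Helena's Scala snippet follows the same shape: record-level map calls, followed by one output operation (saveToCassandra) applied to the whole stream.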
And how do I specify a batch? If I press Ctrl+C in the console running the streaming job jar, I am sure nothing will be written to Cassandra, since the job gets killed.

Please help.
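On batches and Ctrl+C: the Duration passed to JavaStreamingContext is the batch interval, so with new Duration(1000) the input is cut into one-second batches and the output operation runs once per batch. Batches that have finished processing should already be in Cassandra when the job is killed; what an abrupt kill puts at risk is the batch in flight. The sketch below models that idea in plain Java; the MicroBatcher class is hypothetical and not part of Spark. Records are buffered, emitted once per interval, and close() stops the timer and then writes the remaining partial batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical micro-batcher (not part of Spark): buffers records and hands them
// to `sink` once per interval, similar to how a batch interval groups input.
class MicroBatcher<T> implements AutoCloseable {
    private final List<T> buffer = new ArrayList<>();
    private final Consumer<List<T>> sink;
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();

    MicroBatcher(long intervalMillis, Consumer<List<T>> sink) {
        this.sink = sink;
        // Emit a batch every intervalMillis, starting one interval from now.
        timer.scheduleAtFixedRate(this::flush, intervalMillis, intervalMillis,
                TimeUnit.MILLISECONDS);
    }

    synchronized void add(T record) {
        buffer.add(record);
    }

    // Writes the current batch (if any) and starts a new, empty one.
    synchronized void flush() {
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    // Graceful stop: cancel scheduled flushes, wait briefly, then write the partial batch.
    @Override
    public void close() throws InterruptedException {
        timer.shutdown();
        timer.awaitTermination(5, TimeUnit.SECONDS);
        flush();
    }
}
```

To run such a drain on Ctrl+C, one option is a JVM shutdown hook registered with Runtime.getRuntime().addShutdownHook(...), since SIGINT triggers the JVM's shutdown hooks. For the streaming job itself, JavaStreamingContext offers stop(stopSparkContext, stopGracefully); passing true for stopGracefully asks Spark to finish processing the data it has received before stopping.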