I think what you are looking for is something like:

JavaRDD<Double> pricesRDD = javaFunctions(sc)
    .cassandraTable("ks", "tab", mapColumnTo(Double.class))
    .select("price");

JavaRDD<Person> rdd = javaFunctions(sc)
    .cassandraTable("ks", "people", mapRowTo(Person.class));
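mapRowTo expects a JavaBean-style class: a public no-argument constructor plus getter/setter pairs matching the column names. A minimal sketch of such a class, outside Spark, with made-up columns "name" and "price":

```java
// JavaBean-style class of the shape mapRowTo expects: public no-arg
// constructor plus getters/setters named after the columns.
// The columns "name" and "price" are illustrative, not from the thread.
public class Person {
    private String name;
    private Double price;

    public Person() {}  // required no-arg constructor

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public Double getPrice() { return price; }
    public void setPrice(Double price) { this.price = price; }

    public static void main(String[] args) {
        Person p = new Person();
        p.setName("book");
        p.setPrice(9.99);
        System.out.println(p.getName() + ":" + p.getPrice());
    }
}
```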
As noted here: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md

- Helena
@helenaedelson

On Dec 5, 2014, at 10:15 AM, <m.sar...@accenture.com> wrote:

> Hi Akhil, Vyas, Helena,
>
> Thank you for your suggestions.
>
> As Akhil suggested earlier, I have implemented the batch Duration in the
> JavaStreamingContext and awaitTermination(timeout).
> The approach Helena suggested is Scala oriented.
>
> But the issue now is that I want to set Cassandra as my output.
> I have created a table in Cassandra, "test_table", with columns "key:text
> primary key" and "value:text".
> I have successfully mapped the data into a
> JavaDStream<Tuple2<String,String>> data:
>
> JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
> JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));
> JavaPairReceiverInputDStream<String, String> messages =
>     KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
> JavaDStream<Tuple2<String,String>> data =
>     messages.map(new Function<Tuple2<String,String>, Tuple2<String,String>>()
>     {
>         public Tuple2<String,String> call(Tuple2<String, String> message)
>         {
>             return new Tuple2<String,String>(message._1(), message._2());
>         }
>     });
>
> Then I have created a List:
>
> List<TestTable> list = new ArrayList<TestTable>();
>
> where TestTable is my custom class with the same structure as my Cassandra
> table, with members "key" and "value":
>
> class TestTable
> {
>     String key;
>     String val;
>
>     public TestTable() {}
>
>     public TestTable(String k, String v)
>     {
>         key = k;
>         val = v;
>     }
>
>     public String getKey() {
>         return key;
>     }
>
>     public void setKey(String k) {
>         key = k;
>     }
>
>     public String getVal() {
>         return val;
>     }
>
>     public void setVal(String v) {
>         val = v;
>     }
>
>     public String toString() {
>         return "Key:" + key + ",Val:" + val;
>     }
> }
>
> Please suggest a way how do I add the data from the
> JavaDStream<Tuple2<String,String>> data into the List<TestTable> list.
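The per-record conversion being asked about is just `new TestTable(k, v)` applied to each pair. A plain-Java sketch of that step (no Spark; `Map.Entry` stands in for `Tuple2`, and the `TestTable` shape mirrors the class in the thread):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TupleToList {
    // Same shape as the TestTable class from the thread.
    static class TestTable {
        String key;
        String val;
        TestTable(String k, String v) { key = k; val = v; }
        public String toString() { return "Key:" + key + ",Val:" + val; }
    }

    public static void main(String[] args) {
        // Stand-in for the (key, value) pairs carried by the DStream.
        List<Map.Entry<String, String>> pairs = new ArrayList<>();
        pairs.add(new SimpleEntry<>("k1", "v1"));
        pairs.add(new SimpleEntry<>("k2", "v2"));

        // The conversion the question asks about: one TestTable per pair.
        // (In Spark this loop would live inside a foreachRDD function over
        // each batch, not mutate a driver-side list.)
        List<TestTable> list = new ArrayList<>();
        for (Map.Entry<String, String> message : pairs) {
            list.add(new TestTable(message.getKey(), message.getValue()));
        }
        System.out.println(list);
    }
}
```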
> I am doing this so that I can subsequently use
>
> JavaRDD<TestTable> rdd = sc.parallelize(list);
> javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace",
>     "test_table");
>
> to save the RDD data into Cassandra.
>
> I had tried coding this way:
>
> messages.foreachRDD(new Function<Tuple2<String,String>, String>()
> {
>     public List<TestTable> call(Tuple2<String,String> message)
>     {
>         String k = message._1();
>         String v = message._2();
>         TestTable tbl = new TestTable(k, v);
>         list.put(tbl);
>     }
> });
>
> but it seems some type mismatch is happening.
> Please help.
>
> Thanks and Regards,
>
> Md. Aiman Sarosh.
> Accenture Services Pvt. Ltd.
> Mob #: (+91) - 9836112841.
>
> From: Helena Edelson <helena.edel...@datastax.com>
> Sent: Friday, December 5, 2014 6:26 PM
> To: Sarosh, M.
> Cc: user@spark.apache.org
> Subject: Re: Spark-Streaming: output to cassandra
>
> You can just do something like this; the Spark Cassandra Connector handles
> the rest:
>
> KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>     ssc, kafkaParams, Map(KafkaTopicRaw -> 10), StorageLevel.DISK_ONLY_2)
>   .map { case (_, line) => line.split(",") }
>   .map(RawWeatherData(_))
>   .saveToCassandra(CassandraKeyspace, CassandraTableRaw)
>
> - Helena
> @helenaedelson
>
> On Dec 4, 2014, at 9:51 AM, m.sar...@accenture.com wrote:
>
>> Hi,
>>
>> I have written the code below, which streams data from Kafka and
>> prints it to the console.
>> I want to extend this so that my data goes into a Cassandra table instead.
>>
>> JavaStreamingContext jssc = new JavaStreamingContext("local[4]",
>>     "SparkStream", new Duration(1000));
>> JavaPairReceiverInputDStream<String, String> messages =
>>     KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
>>
>> System.out.println("Connection done!");
>>
>> JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>()
>> {
>>     public String call(Tuple2<String, String> message)
>>     {
>>         return message._2();
>>     }
>> });
>> //data.print(); --> output to console
>> data.foreachRDD(saveToCassandra("mykeyspace","mytable"));
>> jssc.start();
>> jssc.awaitTermination();
>>
>> How should I implement the line
>> data.foreachRDD(saveToCassandra("mykeyspace","mytable"));
>> so that the data goes into Cassandra in each batch? And how do I specify a
>> batch? Because if I do Ctrl+C on the console of the streaming job jar,
>> nothing will be entered into Cassandra, since the job is getting killed.
>>
>> Please help.
>>
>> Thanks and Regards,
>>
>> Md. Aiman Sarosh.
>> Accenture Services Pvt. Ltd.
>> Mob #: (+91) - 9836112841.
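On the type mismatch reported earlier in the thread: foreachRDD hands its function a whole RDD (one batch of records), not a single Tuple2, and Java's List has add(), not put(). A plain-Java sketch of that batch-vs-record distinction (no Spark; List<String[]> stands in for the batch RDD, and the interface name is illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchVsRecord {
    static class TestTable {
        final String key;
        final String val;
        TestTable(String k, String v) { key = k; val = v; }
    }

    // foreachRDD-style: the function receives a whole batch, not one record.
    interface BatchFunction {
        void call(List<String[]> batch);
    }

    public static void main(String[] args) {
        List<TestTable> sink = new ArrayList<>();

        // A function over the *batch*; records are looped over inside it.
        BatchFunction perBatch = batch -> {
            for (String[] message : batch) {
                sink.add(new TestTable(message[0], message[1]));  // add(), not put()
            }
        };

        List<String[]> batch = new ArrayList<>();
        batch.add(new String[] {"k1", "v1"});
        batch.add(new String[] {"k2", "v2"});
        perBatch.call(batch);

        System.out.println(sink.size());
    }
}
```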