I think what you are looking for is something like:

JavaRDD<Double> pricesRDD = javaFunctions(sc).cassandraTable("ks", "tab",
        mapColumnTo(Double.class)).select("price");
JavaRDD<Person> rdd = javaFunctions(sc).cassandraTable("ks", "people",
        mapRowTo(Person.class));

noted here: 
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md

?
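
The linked page covers the write direction as well; since the thread is about output, a minimal write-side sketch may help. This assumes the connector's Java API of that era (`CassandraJavaUtil.javaFunctions(rdd, Class)`); `Person`, `ks`, and `people` are placeholders from the read examples above, and the `Person` constructor shown is an assumption for illustration:

```java
// Sketch: saving a JavaRDD of beans to Cassandra with the connector's Java API.
// Assumes spark-cassandra-connector-java on the classpath and
// import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
// The bean's property names must match the table's column names.
JavaRDD<Person> people = sc.parallelize(Arrays.asList(
        new Person(1, "Anna"), new Person(2, "Ben"))); // hypothetical constructor
javaFunctions(people, Person.class).saveToCassandra("ks", "people");
```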

- Helena
@helenaedelson

On Dec 5, 2014, at 10:15 AM, <m.sar...@accenture.com> wrote:

> Hi Akhil, Vyas, Helena,
> 
> Thank you for your suggestions.
> 
> As Akhil suggested earlier, I have passed the batch Duration to the 
> JavaStreamingContext constructor and used awaitTermination().
> The approach Helena suggested is Scala-oriented. 
> 
> But the issue now is that I want Cassandra as my output.
> I have created a Cassandra table "test_table" with columns "key text 
> primary key" and "value text".
> I have mapped the data successfully into JavaDStream<Tuple2<String,String>> 
> data:
> 
> JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream", conf);
> JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(3000));
> JavaPairReceiverInputDStream<String, String> messages =
>     KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
> JavaDStream<Tuple2<String, String>> data =
>     messages.map(new Function<Tuple2<String, String>, Tuple2<String, String>>() {
>         public Tuple2<String, String> call(Tuple2<String, String> message) {
>             return new Tuple2<String, String>(message._1(), message._2());
>         }
>     });
> 
> Then I have created a List:
> List<TestTable> list = new ArrayList<TestTable>();
> where TestTable is my custom class having the same structure as my Cassandra 
> table, with members "key" and "value":
> class TestTable
> {
>     String key;
>     String val;
> 
>     public TestTable() {}
> 
>     public TestTable(String k, String v)
>     {
>         key = k;
>         val = v;
>     }
> 
>     public String getKey() { return key; }
>     public void setKey(String k) { key = k; }
>     public String getVal() { return val; }
>     public void setVal(String v) { val = v; }
> 
>     public String toString() { return "Key:" + key + ",Val:" + val; }
> }
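
One detail worth checking in the bean above: the table's column is named "value" but the bean's property is "val". As far as I know, the connector matches bean property names to column names by default, so these likely need to agree. The bean itself behaves as expected; a self-contained sanity check (the class is a copy of the one quoted in the thread):

```java
public class TestTableCheck {
    // Copy of the TestTable bean from the thread, same fields and toString.
    static class TestTable {
        String key;
        String val;

        TestTable() {}

        TestTable(String k, String v) {
            key = k;
            val = v;
        }

        public String getKey() { return key; }
        public void setKey(String k) { key = k; }
        public String getVal() { return val; }
        public void setVal(String v) { val = v; }

        public String toString() { return "Key:" + key + ",Val:" + val; }
    }

    public static void main(String[] args) {
        TestTable t = new TestTable("k1", "v1");
        System.out.println(t); // Key:k1,Val:v1
    }
}
```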
> 
> Please suggest how I can add the data from 
> JavaDStream<Tuple2<String,String>> data into the List<TestTable> list.
> I am doing this so that I can subsequently use 
> JavaRDD<TestTable> rdd = sc.parallelize(list); 
> javaFunctions(rdd, TestTable.class).saveToCassandra("testkeyspace", 
> "test_table"); 
> to save the RDD data into Cassandra.
> 
> I had tried coding it this way:
> messages.foreachRDD(new Function<Tuple2<String, String>, String>()
> {
>     public List<TestTable> call(Tuple2<String, String> message)
>     {
>         String k = message._1();
>         String v = message._2();
>         TestTable tbl = new TestTable(k, v);
>         list.put(tbl);
>     }
> }
> );
> but it seems some type mismatch is happening.
> Please help.
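
For what it's worth, the mismatch is that in the Java API `foreachRDD` hands the function a whole `JavaRDD<Tuple2<String, String>>` per batch (not a single tuple) and expects it to return `Void`. A hedged sketch of the usual fix, transforming each batch's RDD and saving it directly rather than collecting into a driver-side `List`, assuming the `javaFunctions(rdd, Class)` API already used in this thread:

```java
// Assumes org.apache.spark.api.java.function.Function and
// CassandraJavaUtil.javaFunctions are imported.
data.foreachRDD(new Function<JavaRDD<Tuple2<String, String>>, Void>() {
    public Void call(JavaRDD<Tuple2<String, String>> rdd) {
        // Map each (key, value) pair in this batch to a TestTable bean...
        JavaRDD<TestTable> tableRdd = rdd.map(
            new Function<Tuple2<String, String>, TestTable>() {
                public TestTable call(Tuple2<String, String> t) {
                    return new TestTable(t._1(), t._2());
                }
            });
        // ...and write the batch straight to Cassandra; no List needed.
        javaFunctions(tableRdd, TestTable.class)
            .saveToCassandra("testkeyspace", "test_table");
        return null;
    }
});
```

This keeps the work on the executors, whereas `sc.parallelize(list)` would only ever see data collected on the driver.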
> 
> 
> 
> 
> Thanks and Regards,
> 
> Md. Aiman Sarosh.
> Accenture Services Pvt. Ltd.
> Mob #:  (+91) - 9836112841.
> From: Helena Edelson <helena.edel...@datastax.com>
> Sent: Friday, December 5, 2014 6:26 PM
> To: Sarosh, M.
> Cc: user@spark.apache.org
> Subject: Re: Spark-Streaming: output to cassandra
>  
> You can just do something like this; the Spark Cassandra Connector handles 
> the rest:
> 
> KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>   ssc, kafkaParams, Map(KafkaTopicRaw -> 10), StorageLevel.DISK_ONLY_2)
>   .map { case (_, line) => line.split(",")}
>   .map(RawWeatherData(_)) 
>   .saveToCassandra(CassandraKeyspace, CassandraTableRaw)
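
Stripped of Kafka, Spark, and the connector, the per-line step in the snippet above is just a comma split whose fields feed a constructor (`RawWeatherData` in Helena's example). A plain-Java illustration of that step; the sample line and field count are assumptions for illustration:

```java
public class LineSplitCheck {
    // Mirrors the .map { case (_, line) => line.split(",") } step above.
    static String[] toFields(String line) {
        return line.split(",");
    }

    public static void main(String[] args) {
        String[] fields = toFields("725030,2008,12,31,72.0");
        System.out.println(fields.length); // 5
        System.out.println(fields[0]);     // 725030
    }
}
```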
> - Helena
> @helenaedelson
> 
> On Dec 4, 2014, at 9:51 AM, m.sar...@accenture.com wrote:
> 
>> Hi,
>> 
>> I have written the code below which is streaming data from kafka, and 
>> printing to the console.
>> I want to extend this, and want my data to go into Cassandra table instead.
>> 
>> JavaStreamingContext jssc = new JavaStreamingContext("local[4]",
>>     "SparkStream", new Duration(1000));
>> JavaPairReceiverInputDStream<String, String> messages =
>>     KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
>> 
>> System.out.println("Connection done!");
>> JavaDStream<String> data =
>>     messages.map(new Function<Tuple2<String, String>, String>() {
>>         public String call(Tuple2<String, String> message) {
>>             return message._2();
>>         }
>>     });
>> //data.print();   --> output to console
>> data.foreachRDD(saveToCassandra("mykeyspace","mytable"));
>> jssc.start();
>> jssc.awaitTermination();
>> 
>> 
>> How should I implement the line:
>> data.foreachRDD(saveToCassandra("mykeyspace","mytable"));
>> so that the data goes into Cassandra on each batch. And how do I specify a 
>> batch? Because if I do Ctrl+C on the console of the streaming job, nothing 
>> will be entered into Cassandra, since the job is getting killed.
>> 
>> Please help.
>> 
>> Thanks and Regards,
>> 
>> Md. Aiman Sarosh.
>> Accenture Services Pvt. Ltd.
>> Mob #:  (+91) - 9836112841.
>> 
>> 
> 
