You can just do something like this; the Spark Cassandra Connector handles the
rest:
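    // The streaming import adds saveToCassandra to DStreams.
    import com.datastax.spark.connector.streaming._

    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Map(KafkaTopicRaw -> 10), StorageLevel.DISK_ONLY_2)
      .map { case (_, line) => line.split(",") }   // drop the Kafka key, split the CSV value
      .map(RawWeatherData(_))                      // build one row object per message
      .saveToCassandra(CassandraKeyspace, CassandraTableRaw)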
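
Since the original question is in Java, here is a rough, dependency-free sketch of what the two map steps above do to each Kafka message value: split the line on commas, then build one typed row object from the pieces. The class RawReading, its three fields, and the sample line are all hypothetical stand-ins (the real RawWeatherData fields are not shown in this thread), and the sketch assumes the field names would match your Cassandra column names.

```java
import java.util.Objects;

/** Hypothetical row type; field names are assumed to match the table's column names. */
final class RawReading {
    final String stationId;
    final int year;
    final double temperature;

    RawReading(String stationId, int year, double temperature) {
        this.stationId = Objects.requireNonNull(stationId);
        this.year = year;
        this.temperature = temperature;
    }

    /** Splits one comma-separated message value and converts each field to its type. */
    static RawReading fromCsvLine(String line) {
        String[] parts = line.split(",");
        if (parts.length != 3) {
            // Fail loudly on malformed input rather than writing a partial row.
            throw new IllegalArgumentException(
                "expected 3 fields, got " + parts.length + ": " + line);
        }
        return new RawReading(
            parts[0].trim(),
            Integer.parseInt(parts[1].trim()),
            Double.parseDouble(parts[2].trim()));
    }

    public static void main(String[] args) {
        // Made-up sample message, for illustration only.
        RawReading r = RawReading.fromCsvLine("station-1,2014,21.5");
        System.out.println(r.stationId + " " + r.year + " " + r.temperature);
    }
}
```

In a streaming job, a function like fromCsvLine would be what you call inside the map over your JavaDStream<String>, so that each batch holds row objects instead of raw strings before it is saved.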
- Helena
@helenaedelson
On Dec 4, 2014, at 9:51 AM, [email protected] wrote:
> Hi,
>
> I have written the code below, which streams data from Kafka and prints it
> to the console.
> I want to extend this so that my data goes into a Cassandra table instead.
>
>     JavaStreamingContext jssc = new JavaStreamingContext("local[4]",
>         "SparkStream", new Duration(1000));
>     JavaPairReceiverInputDStream<String, String> messages =
>         KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
>
>     System.out.println("Connection done!");
>     JavaDStream<String> data = messages.map(
>         new Function<Tuple2<String, String>, String>() {
>             public String call(Tuple2<String, String> message) {
>                 return message._2();
>             }
>         });
>     //data.print(); --> output to console
>     data.foreachRDD(saveToCassandra("mykeyspace","mytable"));
>     jssc.start();
>     jssc.awaitTermination();
>
>
> How should I implement the line:
>     data.foreachRDD(saveToCassandra("mykeyspace","mytable"));
> so that data goes into Cassandra in each batch? And how do I specify a
> batch? If I press Ctrl+C on the console of the streaming-job jar, nothing
> will be written to Cassandra, since the job is being killed.
>
> Please help.
>
> Thanks and Regards,
>
> Md. Aiman Sarosh.
> Accenture Services Pvt. Ltd.
> Mob #: (+91) - 9836112841.