You could use sc.parallelize... but the offsets are already available at
the driver, and they're a (hopefully) small enough amount of data that
it's probably more straightforward to just use the normal Cassandra client
to save them from the driver.
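
Roughly what I mean, as a minimal sketch in plain Java with no Spark or
Cassandra dependency so it stands alone. Offsets is a hypothetical stand-in
for OffsetRange, OffsetSink is a hypothetical wrapper around whatever
Cassandra client call you use, and the kafka_offsets table and its columns
are assumptions, not anything from your schema.

```java
// Sketch only: writes each offset range from the driver, one statement per range.
class DriverSideOffsetWriter {

    // Hypothetical stand-in for Spark's OffsetRange (topic, partition, offsets).
    static final class Offsets {
        final String topic;
        final int partition;
        final long fromOffset;
        final long untilOffset;

        Offsets(String topic, int partition, long fromOffset, long untilOffset) {
            this.topic = topic;
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }
    }

    // Hypothetical abstraction over the Cassandra client. In a real job this
    // would forward the statement and bind values to your client session.
    interface OffsetSink {
        void execute(String cql, Object... values);
    }

    // Assumed table and column names, for illustration only.
    static final String INSERT_CQL =
        "INSERT INTO kafka_offsets (topic, partition, from_offset, until_offset) "
            + "VALUES (?, ?, ?, ?)";

    // Runs entirely on the driver: no RDD is created for the offsets.
    // Returns the number of offset ranges handed to the sink.
    static int writeOffsets(Offsets[] offsets, OffsetSink sink) {
        int written = 0;
        for (Offsets o : offsets) {
            sink.execute(INSERT_CQL, o.topic, o.partition, o.fromOffset, o.untilOffset);
            written++;
        }
        return written;
    }
}
```

Inside foreachRDD you'd build the array from offsetRanges() and pass a sink
that forwards to your Cassandra session. The loop runs on the driver, so
nothing here needs an RDD.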

On Tue, Feb 16, 2016 at 1:15 AM, Abhishek Anand <abhis.anan...@gmail.com>
wrote:

> I have a Kafka RDD and I need to save the offsets to a Cassandra table at
> the beginning of each batch.
>
> Basically I need to write the offsets of type OffsetRange[] below, which I
> am getting inside foreachRDD, to Cassandra. The javaFunctions API to write
> to Cassandra needs an RDD. How can I create an RDD from the offsets and
> write it to a Cassandra table?
>
>
> public static void writeOffsets(JavaPairDStream<String, String> kafkastream) {
>     kafkastream.foreachRDD((rdd, batchMilliSec) -> {
>         OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>         return null;
>     });
> }
>
>
> Thanks !!
> Abhi
>
>
>
