Todd's withSessionDo suggestion seems like a better idea.

On Wed, Feb 17, 2016 at 12:25 AM, Abhishek Anand <abhis.anan...@gmail.com> wrote:
> Hi Cody,
>
> I am able to do it using this piece of code:
>
>     kafkaStreamRdd.foreachRDD((rdd, batchMilliSec) -> {
>         Date currentBatchTime = new Date();
>         currentBatchTime.setTime(batchMilliSec.milliseconds());
>         List<OffsetsClass> r = new ArrayList<>();
>         OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>         for (int partition = 0; partition < offsetRanges.length; partition++) {
>             // Add offsets to the list
>         }
>         JavaSparkContext ctx = new JavaSparkContext(rdd.context());
>         JavaRDD<OffsetsClass> currentBatchOffsets = ctx.parallelize(r);
>         // write currentBatchOffsets RDD to Cassandra
>         return null;
>     });
>
> Is this the correct way of doing this?
>
> Thanks !!
> Abhi
>
> On Tue, Feb 16, 2016 at 9:31 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> You could use sc.parallelize... but the offsets are already available at
>> the driver, and they're a (hopefully) small enough amount of data that
>> it's probably more straightforward to just use the normal Cassandra client
>> to save them from the driver.
>>
>> On Tue, Feb 16, 2016 at 1:15 AM, Abhishek Anand <abhis.anan...@gmail.com> wrote:
>>
>>> I have a Kafka RDD and I need to save the offsets to a Cassandra table at
>>> the beginning of each batch.
>>>
>>> Basically, I need to write the offsets of the type Offsets below, which I
>>> am getting inside foreachRDD, to Cassandra. The javaFunctions API for writing
>>> to Cassandra needs an RDD. How can I create an RDD from the offsets and write it
>>> to a Cassandra table?
>>>
>>>     public static void writeOffsets(JavaPairDStream<String, String> kafkastream) {
>>>         kafkastream.foreachRDD((rdd, batchMilliSec) -> {
>>>             OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>>>             return null;
>>>         });
>>>     }
>>>
>>> Thanks !!
>>> Abhi
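For reference, the driver-side approach Cody describes (and the withSessionDo variant Todd suggested, from the Spark Cassandra Connector) amounts to iterating the OffsetRange array directly on the driver and issuing plain inserts, with no RDD or parallelize involved. Below is a minimal, self-contained Java sketch of that idea. The nested OffsetRange record, the kafka_offsets table, and the buildInserts helper are all hypothetical stand-ins (not from the thread), and the actual session.execute/withSessionDo call is shown only as a comment since it needs a live cluster.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetWriterSketch {

    // Hypothetical stand-in for org.apache.spark.streaming.kafka.OffsetRange.
    record OffsetRange(String topic, int partition, long fromOffset, long untilOffset) {}

    // Build one CQL INSERT per partition's offset range. The table and
    // column names are illustrative only.
    static List<String> buildInserts(long batchTimeMs, OffsetRange[] ranges) {
        List<String> stmts = new ArrayList<>();
        for (OffsetRange r : ranges) {
            stmts.add(String.format(
                "INSERT INTO kafka_offsets (batch_time, topic, partition, from_offset, until_offset) "
                    + "VALUES (%d, '%s', %d, %d, %d)",
                batchTimeMs, r.topic(), r.partition(), r.fromOffset(), r.untilOffset()));
        }
        return stmts;
    }

    public static void main(String[] args) {
        OffsetRange[] ranges = {
            new OffsetRange("events", 0, 100L, 250L),
            new OffsetRange("events", 1, 90L, 240L)
        };
        // In a real job this loop would run inside foreachRDD, on the driver,
        // e.g. connector.withSessionDo(session -> { stmts.forEach(session::execute); return null; });
        for (String stmt : buildInserts(1455600000000L, ranges)) {
            System.out.println(stmt);
        }
    }
}
```

The point of the sketch is that the offset metadata is a handful of small records already sitting on the driver, so a plain client call is simpler (and cheaper) than creating a JavaSparkContext and an RDD just to write them.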