I create a stream from Kafka as below:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.Minutes
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaDStream =
  KafkaUtils.createDirectStream[String, KafkaGenericEvent, StringDecoder, KafkaGenericEventsDecoder](
      ssc, kafkaConf, Set(topics))
    .window(Minutes(WINDOW_DURATION), Minutes(SLIDER_DURATION))

I have a map ("intToStringList") of type Map[Int, List[String]].
Using this map I am filtering the stream and finally converting it into a
Map[Int, DStream[KafkaGenericEvent]].
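
For context, that conversion looks roughly like this (just a sketch;
matchesKey is a hypothetical helper standing in for my actual filter logic):

// Sketch: build one filtered DStream per key of intToStringList.
// matchesKey is hypothetical; it checks an event against the key's strings.
import org.apache.spark.streaming.dstream.DStream

val streamsByKey: Map[Int, DStream[KafkaGenericEvent]] =
  intToStringList.map { case (key, strings) =>
    key -> kafkaDStream
      .map(_._2)                                    // keep the event, drop the Kafka message key
      .filter(event => matchesKey(event, strings))  // events relevant to this map key
  }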

1.
Now, on this map, for each value (which is a
DStream[KafkaGenericEvent]) I am applying a reduceByKeyAndWindow operation;
roughly what I am doing is sketched below.
But since we have to give a window duration and a slide duration to
reduceByKeyAndWindow as well, does that imply that reduceByKeyAndWindow can
be applied on every window of the given DStream with a different window
duration and slide duration?
I.e., let's say the windowed DStream is created with window duration -> 16
minutes and slide duration -> 1 minute, so I have one RDD for every window.
For reduceByKeyAndWindow, if we have a window duration of 4 minutes and a
slide duration of 1 minute, will I then get 4 RDDs, since
windowDStream_batchDuration / reduceByKeyAndWindow_batchDuration is 4?
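
Per value, the sketch (eventKey and mergeEvents are hypothetical stand-ins
for my key extractor and reduce function):

// Sketch: a 4-minute inner window sliding every 1 minute, applied to
// DStreams that were already windowed at 16 min / 1 min.
val reduced: Map[Int, DStream[(String, KafkaGenericEvent)]] =
  streamsByKey.mapValues { stream =>
    stream
      .map(event => (eventKey(event), event))                      // key each event (hypothetical eventKey)
      .reduceByKeyAndWindow(mergeEvents _, Minutes(4), Minutes(1)) // inner window and slide
  }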

2.
As suggested in the Spark docs, I am trying to set a checkpoint interval on
the kafkaDStream created in the block shown above, in the following way:
kafkaDStream.checkpoint(Minutes(4))

But when I execute this, I get the error:
"WindowedDStream has been marked for checkpointing but the storage level
has not been set to enable persisting. Please use DStream.persist() to set
the storage level to use memory for better checkpointing performance"
But when I went through the implementation of the checkpoint function in
DStream.scala, I see a call to the persist() function.
So do I really have to call persist() on the WindowedDStream myself?
Just to give it a shot, I called the persist method on the windowed DStream
and then called checkpoint(interval), roughly as sketched below. Even then
I am facing the above-mentioned error.
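
Concretely (a minimal sketch; MEMORY_ONLY_SER is just the storage level I
picked, not something the docs mandate):

// Sketch of the attempt: persist first, then mark for checkpointing.
import org.apache.spark.storage.StorageLevel

kafkaDStream.persist(StorageLevel.MEMORY_ONLY_SER) // set an explicit storage level
kafkaDStream.checkpoint(Minutes(4))                // then set the checkpoint interval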
How do I solve this?
-- 
/Vamsi
