I create a stream from Kafka as follows:

    val kafkaDStream = KafkaUtils.createDirectStream[String, KafkaGenericEvent,
        StringDecoder, KafkaGenericEventsDecoder](ssc, kafkaConf, Set(topics))
      .window(Minutes(WINDOW_DURATION), Minutes(SLIDER_DURATION))

I have a map ("intToStringList") of type Map[Int, List[String]]. Using this map I filter the stream and finally convert it into a Map[Int, DStream[KafkaGenericEvent]].

1. On each value of this map (a DStream[KafkaGenericEvent]) I apply a reduceByKeyAndWindow operation. Since reduceByKeyAndWindow also takes a window duration and a slide duration, does that imply that reduceByKeyAndWindow can be applied on the already-windowed DStream with a different window duration and slide duration? For example, say the windowed DStream is created with a window duration of 16 minutes and a slide duration of 1 minute, so I have one RDD per window. If reduceByKeyAndWindow then uses a window duration of 4 minutes and a slide duration of 1 minute, will I get 4 RDDs, since windowDStream_batchDuration / reduceByKeyAndWindow_batchDuration is 4? A simplified sketch of what I mean follows.
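(In the sketch below, the getEventType accessor, the re-keying, and the dummy reduce function are placeholders for my actual logic, not the exact code.)

    import org.apache.spark.streaming.Minutes
    import org.apache.spark.streaming.dstream.DStream

    // Build Map[Int, DStream[KafkaGenericEvent]] by filtering the windowed
    // stream; getEventType stands in for whatever field the List[String]
    // values of intToStringList are matched against.
    val filtered: Map[Int, DStream[KafkaGenericEvent]] =
      intToStringList.map { case (id, names) =>
        id -> kafkaDStream
          .map(_._2)                                   // drop the Kafka message key
          .filter(event => names.contains(event.getEventType))
      }

    // Question 1 concerns this step: a 4-minute window with a 1-minute slide
    // applied on top of the 16-minute / 1-minute window used above.
    val reduced: Map[Int, DStream[(String, KafkaGenericEvent)]] =
      filtered.map { case (id, stream) =>
        id -> stream
          .map(event => (event.getEventType, event))   // re-key before reducing
          .reduceByKeyAndWindow(
            (a: KafkaGenericEvent, b: KafkaGenericEvent) => b, // dummy reducer
            Minutes(4), Minutes(1))
      }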
2. As suggested in the Spark documentation, I am trying to set a checkpointing interval on the kafkaDStream created in the block shown above, in the following way:

    kafkaDStream.checkpoint(Minutes(4))

But when I execute this, I get the error:

    "WindowedDStream has been marked for checkpointing but the storage level
    has not been set to enable persisting. Please use DStream.persist() to set
    the storage level to use memory for better checkpointing performance"

But when I went through the implementation of the checkpoint function in DStream.scala, I see a call to persist(). Do I really have to call persist() on the WindowedDStream myself? Just to give it a shot, I made a call to the persist method on the windowed DStream and then made a call to checkpoint(interval). Even then I am facing the above-mentioned error. How do I solve this? The attempt is sketched below.
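(In case it helps, the attempt boils down to the following; I am showing the no-argument persist(), which for DStreams defaults to MEMORY_ONLY_SER.)

    // kafkaDStream is the windowed stream built in the first snippet
    kafkaDStream.persist()               // sets the default storage level
    kafkaDStream.checkpoint(Minutes(4))  // still fails with the same error

--
/Vamsi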