Hi,

   I am very new to Spark and Spark Streaming. I am planning to use Spark
Streaming for real-time processing.

   I have created a streaming context and am checkpointing to an HDFS
directory for recovery in case of executor and driver failures.

I am creating the DStream with an offset map to read the data from Kafka. I am
simply ignoring the offsets to understand the behavior. Whenever I restart the
application, the driver is restored from the checkpoint as expected, but the
DStream is not starting from the initial offsets. The DStream is created with
the last consumed offsets instead of starting from offset 0 for each topic
partition, even though I am not storing the offsets anywhere.

import kafka.common.TopicAndPartition
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

def main(args: Array[String]): Unit = {

    // Reuse the checkpointed context if one exists; otherwise build a new one
    val sparkStreamingContext =
      StreamingContext.getOrCreate(SparkConstants.CHECKPOINT_DIR_LOCATION,
        () => creatingFunc())

    ...

}

// getOrCreate expects the factory to return the new StreamingContext
def creatingFunc(): StreamingContext = {

    ...

    // Explicitly start every partition of the topic from offset 0
    val offsets: Map[TopicAndPartition, Long] =
      Map(TopicAndPartition("sample_sample3_json", 0) -> 0L)

    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
      String](sparkStreamingContext, kafkaParams, offsets, messageHandler)

    ...
}

I want control over offset management at the event level rather than the RDD
level, so that I can guarantee at-least-once delivery to the end system.
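
Something along these lines is what I have in mind (just a rough sketch,
reusing kafkaParams, offsets and sparkStreamingContext from above;
deliverToEndSystem and saveOffset are placeholders for whatever external
offset store I end up using):

import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// Put each message's own offset into the record so offsets can be tracked
// per event rather than per RDD.
val handler = (mmd: MessageAndMetadata[String, String]) => (mmd.offset, mmd.message())

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder, (Long, String)](sparkStreamingContext, kafkaParams, offsets, handler)

stream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val range = ranges(TaskContext.get.partitionId)
    iter.foreach { case (offset, message) =>
      // deliverToEndSystem(message)                        // write the event first
      // saveOffset(range.topic, range.partition, offset)   // then persist its offset
      println(s"${range.topic}-${range.partition} @ $offset: $message")
    }
  }
}

Since an offset would be persisted only after the event is written, a restart
would replay at most the events whose offsets were not yet saved, which is
what I understand at-least-once delivery to mean.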

As per my understanding, every RDD (or RDD partition) will be stored in HDFS
as a separate file if I choose HDFS as the output. With a 1-second batch
interval, I will end up with a huge number of small files in HDFS, and having
lots of small files in HDFS leads to lots of other issues.
Is there any way to write multiple RDDs into a single file? I don't have much
idea about *coalesce* usage. In the worst case, I can merge all the small
files in HDFS at regular intervals.
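
For example, would something like this be a reasonable use of coalesce
(again only a sketch, reusing the stream from the sketch above; the output
path is purely illustrative)?

// Coalesce each micro-batch down to one partition before writing, so each
// batch produces a single file instead of one file per Kafka partition.
// This still leaves one file per batch; merging across batches would need a
// separate compaction job.
stream.map(_._2).foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {
    rdd.coalesce(1)
       .saveAsTextFile(s"hdfs:///tmp/streaming-output/batch-${time.milliseconds}")
  }
}

My understanding is that coalesce(1) funnels the whole batch through a single
task, so it trades write parallelism for fewer files; coalescing to a small
number greater than one would be a middle ground.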

Thanks...

------
Thanks
Raju Bairishetti
www.lazada.com
