You'll resume and re-process the RDD that didn't finish.
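Roughly, the recovery path looks like the sketch below. This is a minimal sketch assuming the Spark 1.x Java API that JavaStreamingContextFactory belongs to; the class name, app name, checkpoint URI and 1-second batch interval are placeholders taken from this thread, and the Kafka stream setup is only indicated by a comment:

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

    public class CheckpointedDriver {
      public static void main(String[] args) throws Exception {
        final String checkpointDir = "file:///mnt/dir1/dir2";  // placeholder URI

        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
          @Override
          public JavaStreamingContext create() {
            SparkConf conf = new SparkConf().setAppName("kafka-consumer");
            JavaStreamingContext jssc =
                new JavaStreamingContext(conf, Durations.seconds(1));
            jssc.checkpoint(checkpointDir);
            // Build the Kafka input DStream and all transformations/output
            // operations here. This factory only runs on a clean start; on a
            // restart, the context, its DAG and any generated-but-unfinished
            // batches are reconstructed from the checkpoint instead.
            return jssc;
          }
        };

        // Loads the checkpoint if one exists, otherwise calls factory.create().
        JavaStreamingContext context =
            JavaStreamingContext.getOrCreate(checkpointDir, factory);
        context.start();
        context.awaitTermination();
      }
    }

Note that this gives you at-least-once behavior: a batch that had started but not completed when the driver died is run again after restart, so downstream writes should be idempotent or deduplicated.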
On Fri, Aug 14, 2015 at 1:31 PM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:

> Our additional question on checkpointing is basically the logistics of it --
>
> At which point does the data get written into checkpointing? Is it written
> as soon as the driver program retrieves an RDD from Kafka (or another
> source)? Or, is it written after that RDD has been processed and we're
> basically moving on to the next RDD?
>
> What I'm driving at is, what happens if the driver program is killed? The
> next time it's started, will it know, from Spark Streaming's checkpointing,
> to resume from the same RDD that was being processed at the time of the
> program getting killed? In other words, will we, upon restarting the
> consumer, resume from the RDD that was unfinished, or will we be looking at
> the next RDD?
>
> Will we pick up from the last known *successfully processed* topic offset?
>
> Thanks.
>
>
> On Fri, Jul 31, 2015 at 1:52 PM, Sean Owen <so...@cloudera.com> wrote:
>
>> If you've set the checkpoint dir, it seems like indeed the intent is
>> to use a default checkpoint interval in DStream:
>>
>>   private[streaming] def initialize(time: Time) {
>>     ...
>>     // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
>>     if (mustCheckpoint && checkpointDuration == null) {
>>       checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
>>       logInfo("Checkpoint interval automatically set to " + checkpointDuration)
>>     }
>>
>> Do you see that log message? What's the interval? That could at least
>> explain why it's not doing anything, if it's quite long.
>>
>> It sort of seems wrong, though, since
>> https://spark.apache.org/docs/latest/streaming-programming-guide.html
>> suggests it was intended to be a multiple of the batch interval. The
>> slide duration wouldn't always be relevant anyway.
>>
>> On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg
>> <dgoldenberg...@gmail.com> wrote:
>> > I've instrumented checkpointing per the programming guide, and I can tell
>> > that Spark Streaming is creating the checkpoint directories, but I'm not
>> > seeing any content being created in those directories, nor am I seeing
>> > the effects I'd expect from checkpointing. I'd expect any data that comes
>> > into Kafka while the consumers are down to get picked up when the
>> > consumers are restarted; I'm not seeing that.
>> >
>> > For now my checkpoint directory is set to the local file system, with the
>> > directory URI being in this form: file:///mnt/dir1/dir2. I see a
>> > subdirectory named with a UUID being created under there, but no files.
>> >
>> > I'm using a custom JavaStreamingContextFactory which creates a
>> > JavaStreamingContext with the directory set into it via the
>> > checkpoint(String) method.
>> >
>> > I'm currently not invoking the checkpoint(Duration) method on the DStream
>> > since I want to first rely on Spark's default checkpointing interval. My
>> > streaming batch duration is set to 1 second.
>> >
>> > Anyone have any idea what might be going wrong?
>> >
>> > Also, at which point does Spark delete files from checkpointing?
>> >
>> > Thanks.
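On the interval question quoted above: with a 1-second batch (and hence slide) duration, the automatic setting in DStream.initialize works out to slideDuration * ceil(10s / 1s) = 10 seconds, so the "Checkpoint interval automatically set to ..." log line should report roughly that. If you'd rather not rely on the default, here is a minimal sketch of setting it explicitly; the helper name is made up and the 10-second figure is just an example multiple of the batch interval:

    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;

    public final class CheckpointInterval {
      // Illustrative helper: pin the RDD checkpoint interval for a stream
      // instead of relying on the slideDuration-based default discussed above.
      static <T> JavaDStream<T> withExplicitCheckpointing(JavaDStream<T> stream) {
        stream.checkpoint(Durations.seconds(10));
        return stream;
      }
    }

Also keep in mind that checkpoint(Duration) governs RDD data checkpointing; the metadata checkpoint that drives driver recovery is written each batch once the checkpoint directory is set on the context.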