Many thanks Cody, it explains quite a bit. I had a couple of problems with checkpointing and graceful shutdown when moving from working code in Spark 1.3.0 to 1.5.0: InterruptedExceptions, KafkaDirectStream failing to initialize, and some exceptions regarding the WAL even though I'm using the direct stream. Meanwhile I did some major code refactoring and suddenly it seems to work the same as in 1.3.0, without my knowing what actually solved it. But I'm going to put this aside for now, as long as it works as it is, because I plan to write my own recovery at some point.
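
For reference, a minimal sketch of the documented checkpoint-recovery plus graceful-shutdown setup (a sketch only, assuming Spark 1.4+; the checkpoint path, app name, and batch interval are placeholders, and the Kafka stream wiring is omitted):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingApp {
  // Placeholder checkpoint path; a real deployment would use HDFS or similar.
  val checkpointDir = "/tmp/streaming-checkpoint"

  // Everything that defines the DStream graph has to live inside this function,
  // so it only runs when there is no checkpoint to recover from.
  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("streaming-checkpoint-example")
      // Since Spark 1.4 this makes the shutdown hook stop the context gracefully,
      // letting in-flight batches finish instead of interrupting them.
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)
    // ... create the direct Kafka stream(s) and transformations here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Rebuilds the whole context (including offsets) from the checkpoint if one exists.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}

The important part is that the whole DStream graph, including ssc.checkpoint, lives inside createContext, so getOrCreate only runs it when there is no checkpoint to restore from; setting up streams outside that function is a common source of failures on restart.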
Petr

On Fri, Sep 25, 2015 at 12:14 PM, Petr Novak <oss.mli...@gmail.com> wrote:

> Many thanks Cody, it explains quite a bit.
>
> I had a couple of problems with checkpointing and graceful shutdown when
> moving from working code in Spark 1.3.0 to 1.5.0: InterruptedExceptions,
> KafkaDirectStream failing to initialize, and some exceptions regarding the
> WAL even though I'm using the direct stream. Meanwhile I did some major
> code refactoring and suddenly it seems to work the same as in 1.3.0,
> without my knowing what actually solved it. But I'm going to put this
> aside for now, as long as it works as it is, because I plan to write my
> own recovery at some point.
>
> Petr
>
> On Wed, Sep 23, 2015 at 4:26 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> TD can correct me on this, but I believe checkpointing is done after a
>> set of jobs is submitted, not after they are completed. If you fail while
>> processing the jobs, starting over from that checkpoint should put you in
>> the correct state.
>>
>> In any case, are you actually observing a loss of messages when killing /
>> restarting a job?
>>
>> On Wed, Sep 23, 2015 at 3:49 AM, Petr Novak <oss.mli...@gmail.com> wrote:
>>
>>> Hi,
>>> I have two streams and checkpointing with code based on the
>>> documentation. One stream transforms data from Kafka and saves it to a
>>> Parquet file. The other uses the same stream and does updateStateByKey to
>>> compute some aggregations. There is no graceful shutdown.
>>>
>>> Both use roughly this code to save files:
>>>
>>> stream.foreachRDD { (rdd, time) =>
>>>   ...
>>>   rdd.toDF().write.save(...use time for the directory name...)
>>> }
>>>
>>> It is not idempotent at the moment, but let's put that aside for now.
>>>
>>> The strange thing is that when I Ctrl+C the job, I can see a checkpoint
>>> file with the timestamp of the last batch, but there are no stream
>>> files/directories for that timestamp, or only one of the streams has data
>>> saved with a time aligned with the last checkpoint file. I would expect
>>> the checkpoint file to be created only after both streams successfully
>>> finish their saves, i.e. at the end of the batch. Otherwise I don't know
>>> what checkpointing is good for, except maybe cutting lineage. Is file
>>> saving asynchronous, so that Spark checkpointing does not wait for it?
>>>
>>> I actually need to checkpoint both streams atomically at the end of the
>>> batch. It seems to me that the Spark checkpointing facility is quite
>>> unusable in practice except for some simple scenarios, and everybody has
>>> to roll their own.
>>>
>>> Am I wrong? How can I use Spark checkpointing to checkpoint both streams
>>> after they have successfully saved their results to files? This is
>>> actually why I think micro-batch streaming is nice: it has a clearly
>>> defined synchronization barrier. But it doesn't seem like checkpointing
>>> takes advantage of it.
>>>
>>> I can't ensure atomicity when saving multiple files for multiple streams,
>>> and it would require some further cleanup code on job restart. But at
>>> least I would like a guarantee that the existence of a checkpoint file
>>> signals that the batch with that timestamp finished successfully with all
>>> its RDD actions.
>>>
>>> Or is it expected to behave like this and I have something wrong in my
>>> code?
>>>
>>> Many thanks for any insights,
>>> Petr
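
For reference, a minimal sketch of the kind of hand-rolled recovery mentioned at the top, assuming the Spark 1.3+ direct Kafka API: offsets are loaded from an external store at startup and written back only after the batch output has been saved, so the offset store, rather than the checkpoint, is what marks a batch as finished. loadOffsets, saveOffsets, and the output path are hypothetical placeholders.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

object OwnRecoverySketch {
  // Hypothetical offset store: anything atomic would do
  // (ZooKeeper, a database transaction, a file renamed into place).
  def loadOffsets(): Map[TopicAndPartition, Long] = ???
  def saveOffsets(ranges: Array[OffsetRange]): Unit = ???

  def build(ssc: StreamingContext, kafkaParams: Map[String, String]): Unit = {
    // Start the direct stream exactly where the last successful batch ended.
    val fromOffsets = loadOffsets()
    val messageHandler =
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

    val stream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, messageHandler)

    stream.foreachRDD { (rdd, time) =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Write the batch output first (e.g. the Parquet directory keyed by `time`)...
      // rdd.toDF().write.save(s"/data/out/batch-${time.milliseconds}")

      // ...and only record the offsets once the writes have succeeded, so the
      // offset store acts as the "batch finished" marker that the checkpoint
      // file does not provide.
      saveOffsets(ranges)
    }
  }
}

Note the cast to HasOffsetRanges only works directly on the stream returned by createDirectStream, before any transformation that changes the partitioning.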