I did it and I see the lineage change.

BEFORE calling the action: no success.
Job$ - isCheckpointed? false, getCheckpointFile: None
Job$ - recordsRDD.toDebugString:
(2) MapPartitionsRDD[7] at map at Job.scala:112 []
 |  MapPartitionsRDD[6] at map at Job.scala:111 []
 |  MapPartitionsRDD[5] at map at ....scala:40 []
 |  ShuffledRDD[4] at reduceByKey at ....scala:31 []
 +-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
    |  MapPartitionsRDD[2] at map at ...:66 []

AFTER calling the action: nice, it works!

Job$ - isCheckpointed? true, getCheckpointFile: Some(path)
Job$ - recordsRDD.toDebugString:
(2) MapPartitionsRDD[7] at map at Job.scala:112 []

The lineage now contains only one stage, but I want to get rid of it too. This stage runs right before the checkpoint. Will Spark try to re-run it in case of a task failure AFTER the checkpoint? My expectation is that Spark will read directly from the checkpoint dir; it shouldn't have to do anything with the previous MapPartitionsRDD[7] at map at Job.scala:112.

On Wed, Aug 19, 2020 at 16:01, Russell Spitzer <russell.spit...@gmail.com> wrote:

> Checkpoint is lazy and needs an action to actually do the work. The method
> just marks the RDD, as noted in the doc you posted.
>
> Call an action twice. The second run should use the checkpoint.
>
> On Wed, Aug 19, 2020, 8:49 AM Ivan Petrov <capacyt...@gmail.com> wrote:
>
>> I think it returns Unit... it won't work.
>> [image: image.png]
>>
>> I found another way to make it work: I called an action after the checkpoint.
>>
>> val recordsRDD = convertToRecords(anotherRDD)
>> recordsRDD.checkpoint()
>> logger.info("checkpoint done")
>> recordsRDD.count() // (!!!)
>> logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed}, getCheckpointFile: ${recordsRDD.getCheckpointFile}")
>> logger.info(s"recordsRDD.toDebugString: \n${recordsRDD.toDebugString}")
>>
>> Output:
>> Job$ - checkpoint done (!!!)
>> Job$ - isCheckpointed? true, getCheckpointFile: Some(path)
>> Job$ - recordsRDD.toDebugString:
>> (2) MapPartitionsRDD[7] at map at Job.scala:112 []
>>
>> But it still has a single MapPartitionsRDD in the lineage. The lineage
>> became shorter, but I don't want Spark to rebuild the RDD from
>> MapPartitionsRDD; I want it to take the data directly from the checkpoint
>> dir. MapPartitionsRDD has non-idempotent id generation, and I don't want
>> to run it twice in case of a downstream task failure.
>>
>> On Wed, Aug 19, 2020 at 14:47, Jacob Lynn <abebopare...@gmail.com> wrote:
>>
>>> Hi Ivan,
>>>
>>> Unlike cache/persist, checkpoint does not operate in-place but requires
>>> the result to be assigned to a new variable. In your case:
>>>
>>> val recordsRDD = convertToRecords(anotherRDD).checkpoint()
>>>
>>> Best,
>>> Jacob
>>>
>>> On Wed, 19 Aug 2020 at 14:39, Ivan Petrov <capacyt...@gmail.com> wrote:
>>>
>>>> Hi!
>>>> It seems like I'm doing something wrong. I call .checkpoint() on an
>>>> RDD, but it's not checkpointed... What am I doing wrong?
>>>>
>>>> val recordsRDD = convertToRecords(anotherRDD)
>>>> recordsRDD.checkpoint()
>>>> logger.info("checkpoint done")
>>>>
>>>> logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed}, getCheckpointFile: ${recordsRDD.getCheckpointFile}")
>>>> logger.info(s"recordsRDD.toDebugString: \n${recordsRDD.toDebugString}")
>>>>
>>>> Output:
>>>> Job$ - checkpoint done (!!!)
>>>>
>>>> But then...
>>>> Job$ - isCheckpointed? false, getCheckpointFile: None
>>>> Job$ - recordsRDD.toDebugString:
>>>> (2) MapPartitionsRDD[7] at map at Job.scala:112 []
>>>>  |  MapPartitionsRDD[6] at map at Job.scala:111 []
>>>>  |  MapPartitionsRDD[5] at map at ....scala:40 []
>>>>  |  ShuffledRDD[4] at reduceByKey at ....scala:31 []
>>>>  +-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
>>>>     |  MapPartitionsRDD[2] at map at ...:66 []
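
A minimal end-to-end sketch of the pattern that emerges from this thread, assuming a SparkContext named sc is in scope; convertToRecords and anotherRDD are the thread's own placeholders, and the checkpoint path is invented for illustration:

import org.apache.spark.storage.StorageLevel

// Must be set before checkpoint() is called, or Spark throws an exception.
sc.setCheckpointDir("hdfs:///tmp/checkpoints")

val recordsRDD = convertToRecords(anotherRDD)

// checkpoint() is lazy: it only marks the RDD. The checkpoint files are
// written by a separate job that runs after the first action, and that job
// recomputes the lineage unless the RDD is persisted first -- persisting is
// what keeps a non-idempotent map from being executed twice.
recordsRDD.persist(StorageLevel.MEMORY_AND_DISK)
recordsRDD.checkpoint()
recordsRDD.count()     // first action: computes once (into the cache) and writes the checkpoint
recordsRDD.unpersist() // the cached copy is no longer needed afterwards

As for the remaining MapPartitionsRDD[7] in the lineage: once isCheckpointed is true, the RDD's references to its parents are removed and reads go through the checkpoint data, so the map at Job.scala:112 should not be re-executed on a downstream task failure; the entry left in toDebugString is the checkpointed RDD itself, not a stage that will be re-run.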