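Hi Ivan,

RDD.checkpoint() does not checkpoint anything on its own. It only marks the RDD for checkpointing and returns Unit. The data is written to the checkpoint directory (the one set with sc.setCheckpointDir) the first time an action runs on that RDD, which is why isCheckpointed is still false right after the call. The RDD is computed once for the action and again to write the checkpoint, so it is a good idea to persist it first. In your case:

val recordsRDD = convertToRecords(anotherRDD)
recordsRDD.cache()
recordsRDD.checkpoint()
recordsRDD.count() // any action triggers the actual checkpoint
// recordsRDD.isCheckpointed is now true

Assigning the result to a new variable is only needed with Dataset.checkpoint(). Unlike cache/persist, that method does not operate in place: it checkpoints eagerly by default and returns a new Dataset.

Best,
Jacob

On Wed, 19 Aug 2020 at 14:39, Ivan Petrov <capacyt...@gmail.com> wrote:

> Hi!
> It seems like I'm doing something wrong. I call .checkpoint() on an RDD, but it's not
> checkpointed...
> What am I doing wrong?
>
> val recordsRDD = convertToRecords(anotherRDD)
> recordsRDD.checkpoint()
> logger.info("checkpoint done")
>
> logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed}, getCheckpointFile: ${recordsRDD.getCheckpointFile}")
> logger.info(s"recordsRDD.toDebugString: \n${recordsRDD.toDebugString}")
>
> Output:
> Job$ - checkpoint done (!!!)
>
> But then.....
> Job$ - isCheckpointed? false, getCheckpointFile: None
> Job$ - recordsRDD.toDebugString:
> (2) MapPartitionsRDD[7] at map at Job.scala:112 []
>  |  MapPartitionsRDD[6] at map at Job.scala:111 []
>  |  MapPartitionsRDD[5] at map at ....scala:40 []
>  |  ShuffledRDD[4] at reduceByKey at ....scala:31 []
>  +-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
>     |  MapPartitionsRDD[2] at map at ...:66 []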
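A minimal self-contained sketch of the RDD checkpoint lifecycle, assuming Spark 2.x or 3.x on the classpath and local mode. The object CheckpointDemo, the sample word data (a hypothetical stand-in for convertToRecords(anotherRDD)) and the temporary checkpoint directory are illustrative only. It records isCheckpointed right after checkpoint() and again after the first action:

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointDemo {
  /** Returns isCheckpointed (before, after) the first action on the RDD. */
  def demo(): (Boolean, Boolean) = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("checkpoint-demo")
    val sc = new SparkContext(conf)
    try {
      // checkpoint() throws if no checkpoint directory is set; a local temp
      // directory stands in for a real (e.g. HDFS) path in this sketch.
      sc.setCheckpointDir(Files.createTempDirectory("spark-ckpt").toString)

      // Hypothetical stand-in for convertToRecords(anotherRDD): a short lineage with a shuffle.
      val recordsRDD = sc.parallelize(Seq("a b", "b c", "c a"), 2)
        .flatMap(_.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)

      recordsRDD.cache()      // avoids recomputing the RDD when the checkpoint is written
      recordsRDD.checkpoint() // only marks the RDD for checkpointing; returns Unit
      val before = recordsRDD.isCheckpointed

      recordsRDD.count()      // first action: runs the job, then writes the checkpoint
      val after = recordsRDD.isCheckpointed
      println(s"getCheckpointFile: ${recordsRDD.getCheckpointFile}")
      println(recordsRDD.toDebugString)
      (before, after)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = demo()
    println(s"isCheckpointed before action: $before, after action: $after")
  }
}
```

Calling CheckpointDemo.demo() should return (false, true). After the action, getCheckpointFile should hold a Some(...) path, and the debug string should no longer list the original parent RDDs, since checkpointing truncates the lineage.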