I did it and can see the lineage change.

BEFORE calling an action: no success.

Job$ - isCheckpointed? false, getCheckpointFile: None
Job$ - recordsRDD.toDebugString:
(2) MapPartitionsRDD[7] at map at  Job.scala:112 []
 |  MapPartitionsRDD[6] at map at  Job.scala:111 []
 |  MapPartitionsRDD[5] at map at ....scala:40 []
 |  ShuffledRDD[4] at reduceByKey at ....scala:31 []
 +-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
    |  MapPartitionsRDD[2] at map at ...:66 []

AFTER calling an action: nice, it works!
Job$ - isCheckpointed? true, getCheckpointFile: Some(path)
Job$ - recordsRDD.toDebugString:
(2) MapPartitionsRDD[7] at map at  Job.scala:112 []

The lineage now contains only one stage, but I want to get rid of it too.
This stage happens right before the checkpoint. Will Spark try to re-run it
in case of a task failure AFTER the checkpoint?
My expectation is that Spark will read directly from the checkpoint dir; it
shouldn't have to do anything with the previous MapPartitionsRDD[7] at map at
Job.scala:112.
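
One way to check what a recompute would actually start from (a minimal
sketch, not verified output; recordsRDD is the value from the code quoted
below) is to print the RDD's parents after the action:

// After a successful reliable checkpoint, the RDD's original parents should
// be replaced by a single dependency on an RDD backed by the checkpoint
// files, so a failed partition would be recomputed from those files rather
// than from the upstream map/reduceByKey stages.
recordsRDD.dependencies.foreach(dep => println(dep.rdd.toDebugString))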

On Wed, 19 Aug 2020 at 16:01, Russell Spitzer <russell.spit...@gmail.com> wrote:

> Checkpoint is lazy and needs an action to actually do the work. The method
> just marks the RDD, as noted in the doc you posted.
>
> Call an action twice. The second run should use the checkpoint.
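>
> For example, a minimal sketch of the whole lifecycle (the checkpoint dir
> path here is made up):
>
> sc.setCheckpointDir("hdfs:///tmp/checkpoints") // must be set before checkpoint()
> val rdd = sc.parallelize(1 to 10).map(_ * 2)
> rdd.checkpoint() // only marks the RDD; nothing is written yet
> rdd.count()      // first action runs the job and writes the checkpoint files
> rdd.count()      // second action should read from the checkpoint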
>
>
>
> On Wed, Aug 19, 2020, 8:49 AM Ivan Petrov <capacyt...@gmail.com> wrote:
>
>> I think it returns Unit... it won't work.
>>
>> I found another way to make it work: I called an action after the checkpoint.
>> val recordsRDD = convertToRecords(anotherRDD)
>> recordsRDD.checkpoint()
>> logger.info("checkpoint done")
>> recordsRDD.count() // (!!!)
>> logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed}, getCheckpointFile: ${recordsRDD.getCheckpointFile}")
>> logger.info(s"recordsRDD.toDebugString:\n${recordsRDD.toDebugString}")
>>
>>     Output:
>>     Job$ - checkpoint done (!!!)
>>
>>     Job$ - isCheckpointed? true, getCheckpointFile: Some(path)
>>     Job$ - recordsRDD.toDebugString:
>>     (2) MapPartitionsRDD[7] at map at  Job.scala:112 []
>>
>> But it still has a single MapPartitionsRDD in the lineage. The lineage
>> became shorter, but I don't want Spark to rebuild the RDD from
>> MapPartitionsRDD; I want it to take the data directly from the checkpoint
>> dir. MapPartitionsRDD has non-idempotent id generation, and I don't want
>> it to run twice in case of a downstream task failure.
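>>
>> If that last stage also has to go, one workaround I can think of (a
>> sketch only; the output path and the Record element type are
>> placeholders) is to materialize the RDD myself and read it back, so the
>> new lineage starts at the files:
>>
>> recordsRDD.saveAsObjectFile("hdfs:///tmp/records") // write once
>> val reloadedRDD = sc.objectFile[Record]("hdfs:///tmp/records") // fresh lineage, rooted at the files
>>
>> A downstream task failure would then recompute partitions from those
>> files and never re-run the non-idempotent map.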
>>
>>
>>
>>
>> On Wed, 19 Aug 2020 at 14:47, Jacob Lynn <abebopare...@gmail.com> wrote:
>>
>>> Hi Ivan,
>>>
>>> Unlike cache/persist, checkpoint does not operate in-place but requires
>>> the result to be assigned to a new variable. In your case:
>>>
>>> val recordsRDD = convertToRecords(anotherRDD).checkpoint()
>>>
>>> Best,
>>> Jacob
>>>
>>> On Wed, 19 Aug 2020 at 14:39, Ivan Petrov <capacyt...@gmail.com> wrote:
>>>
>>>> Hi!
>>>> It seems like I'm doing something wrong. I call .checkpoint() on an
>>>> RDD, but it's not checkpointed...
>>>> What am I doing wrong?
>>>>
>>>> val recordsRDD = convertToRecords(anotherRDD)
>>>> recordsRDD.checkpoint()
>>>> logger.info("checkpoint done")
>>>>
>>>> logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed},
>>>> getCheckpointFile: ${recordsRDD.getCheckpointFile}")
>>>> logger.info(s"recordsRDD.toDebugString: \n${recordsRDD.toDebugString}")
>>>>
>>>> Output:
>>>> Job$ - checkpoint done (!!!)
>>>>
>>>> But then.....
>>>> Job$ - isCheckpointed? false, getCheckpointFile: None
>>>> Job$ - recordsRDD.toDebugString:
>>>> (2) MapPartitionsRDD[7] at map at  Job.scala:112 []
>>>>  |  MapPartitionsRDD[6] at map at  Job.scala:111 []
>>>>  |  MapPartitionsRDD[5] at map at ....scala:40 []
>>>>  |  ShuffledRDD[4] at reduceByKey at ....scala:31 []
>>>>  +-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
>>>>     |  MapPartitionsRDD[2] at map at ...:66 []
>>>>
>>>
