I did that and I can see the lineage change.
BEFORE calling an action: not checkpointed yet.
Job$ - isCheckpointed? false, getCheckpointFile: None
Job$ - recordsRDD.toDebugString:
(2) MapPartitionsRDD[7] at map at Job.scala:112 []
| MapPartitionsRDD[6] at map at Job.scala:111 []
| MapPartitionsRDD[5] at map at ....scala:40 []
| ShuffledRDD[4] at reduceByKey at ....scala:31 []
+-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
| MapPartitionsRDD[2] at map at ...:66 []
AFTER calling an action. Nice, it works!
Job$ - isCheckpointed? true, getCheckpointFile: Some(path)
Job$ - recordsRDD.toDebugString:
(2) MapPartitionsRDD[7] at map at Job.scala:112 []
The lineage now contains only one stage, but I want to get rid of it too. This
stage runs right before the checkpoint. Will Spark try to re-run it if a
task fails AFTER the checkpoint?
My expectation is that Spark will read directly from the checkpoint dir and
will not have to do anything with the previous MapPartitionsRDD[7] at map at
Job.scala:112.
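
To check that expectation, here is a minimal sketch, not my real job. It assumes local mode and spark-core on the classpath; the object name CheckpointSketch, the accumulator mapCalls and the toy data are all hypothetical. It counts how many times the map function runs before and after the checkpoint is materialized. The persist() call follows the recommendation in the RDD.checkpoint() scaladoc, which says that otherwise saving the RDD to a file requires recomputation.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("checkpoint-sketch")
    val sc = new SparkContext(conf)
    // Temporary local directory for the sketch; on a cluster this should be
    // a reliable store such as HDFS.
    sc.setCheckpointDir(Files.createTempDirectory("ckpt").toString)

    // Hypothetical stand-in for the non-idempotent map: counts its own invocations.
    val mapCalls = sc.longAccumulator("mapCalls")
    val records = sc.parallelize(1 to 100, 2).map { x =>
      mapCalls.add(1)
      x * 2
    }

    records.persist()    // lets the checkpoint write reuse the cached partitions
    records.checkpoint() // only marks the RDD; nothing is written yet
    println(s"before action: isCheckpointed=${records.isCheckpointed}")

    records.count()      // first action computes the RDD and writes the checkpoint
    println(s"after action: isCheckpointed=${records.isCheckpointed}, " +
      s"getCheckpointFile=${records.getCheckpointFile}")

    val callsAfterFirstAction = mapCalls.value
    // Drop the cache so the next action cannot be served from memory.
    records.unpersist(blocking = true)
    records.count()
    println(s"map calls after first action: $callsAfterFirstAction, " +
      s"after second action: ${mapCalls.value}")

    sc.stop()
  }
}
```

If the second count() is served from the checkpoint files, the two printed values should be equal, which would mean the map function was not called again. That is only what I expect from this sketch; it does not prove anything about task failures on a real cluster.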
On Wed, Aug 19, 2020 at 16:01, Russell Spitzer <[email protected]> wrote:
> Checkpoint is lazy and needs an action to actually do the work. The method
> just marks the rdd as noted in the doc you posted.
>
> Call an action twice. The second run should use the checkpoint.
>
>
>
> On Wed, Aug 19, 2020, 8:49 AM Ivan Petrov <[email protected]> wrote:
>
>> I think it returns Unit... it won't work
>>
>> I found another way to make it work: I called an action after checkpoint().
>> val recordsRDD = convertToRecords(anotherRDD)
>> recordsRDD.checkpoint()
>> logger.info("checkpoint done")
>> recordsRDD.count() // (!!!)
>> logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed}, getCheckpointFile: ${recordsRDD.getCheckpointFile}")
>> logger.info(s"recordsRDD.toDebugString: \n${recordsRDD.toDebugString}")
>>
>> Output:
>> Job$ - checkpoint done (!!!)
>>
>> Job$ - isCheckpointed? true, getCheckpointFile: Some(path)
>> Job$ - recordsRDD.toDebugString:
>> (2) MapPartitionsRDD[7] at map at Job.scala:112 []
>>
>> But it still has a single MapPartitionsRDD in the lineage. The lineage
>> became shorter, but I don't want Spark to rebuild the RDD from
>> MapPartitionsRDD; I want it to take the data directly from the checkpoint dir.
>> MapPartitionsRDD has non-idempotent id generation. I don't want to run
>> it twice in case of a downstream task failure.
>>
>>
>>
>>
>> On Wed, Aug 19, 2020 at 14:47, Jacob Lynn <[email protected]> wrote:
>>
>>> Hi Ivan,
>>>
>>> Unlike cache/persist, checkpoint does not operate in-place but requires
>>> the result to be assigned to a new variable. In your case:
>>>
>>> val recordsRDD = convertToRecords(anotherRDD).checkpoint()
>>>
>>> Best,
>>> Jacob
>>>
>>> On Wed, Aug 19, 2020 at 14:39, Ivan Petrov <[email protected]> wrote:
>>>
>>>> Hi!
>>>> It seems I am doing something wrong. I call .checkpoint() on an RDD, but
>>>> it's not checkpointed...
>>>> What am I doing wrong?
>>>>
>>>> val recordsRDD = convertToRecords(anotherRDD)
>>>> recordsRDD.checkpoint()
>>>> logger.info("checkpoint done")
>>>>
>>>> logger.info(s"isCheckpointed? ${recordsRDD.isCheckpointed}, getCheckpointFile: ${recordsRDD.getCheckpointFile}")
>>>> logger.info(s"recordsRDD.toDebugString: \n${recordsRDD.toDebugString}")
>>>>
>>>> Output:
>>>> Job$ - checkpoint done (!!!)
>>>>
>>>> But then.....
>>>> Job$ - isCheckpointed? false, getCheckpointFile: None
>>>> Job$ - recordsRDD.toDebugString:
>>>> (2) MapPartitionsRDD[7] at map at Job.scala:112 []
>>>> | MapPartitionsRDD[6] at map at Job.scala:111 []
>>>> | MapPartitionsRDD[5] at map at ....scala:40 []
>>>> | ShuffledRDD[4] at reduceByKey at ....scala:31 []
>>>> +-(2) MapPartitionsRDD[3] at flatMap at ...scala:30 []
>>>> | MapPartitionsRDD[2] at map at ...:66 []
>>>>
>>>