[ https://issues.apache.org/jira/browse/SPARK-10042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-10042:
---------------------------------
    Labels: bulk-closed  (was: )

> Use consistent behavior for Internal Accumulators across stage retries
> ----------------------------------------------------------------------
>
>                 Key: SPARK-10042
>                 URL: https://issues.apache.org/jira/browse/SPARK-10042
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, Web UI
>    Affects Versions: 1.5.0
>            Reporter: Imran Rashid
>            Priority: Major
>              Labels: bulk-closed
>
> [~andrewor14]
> The internal accumulators introduced in SPARK-8735 aren't counted in a 
> consistent manner across stage retries.  Whether the accumulators are counted 
> once or multiple times is effectively random.
> First, a little interlude on how stage failures & retries work.  When there is 
> a fetch failure, Spark looks at the block manager (BM) it failed to fetch 
> data from and assumes that none of the data from that BM is available.  It 
> fails the stage with the fetch failure, then goes back to the 
> ShuffleMapStage that produced the data.  It looks at which partitions were 
> stored on the failed BM and reruns just those partitions.  Meanwhile, all 
> currently running tasks for the current stage keep running, potentially 
> producing more fetch failures.  In fact, some of those tasks can even keep 
> running until the parent stage has been re-run and this stage has been 
> restarted.  (Yes, this can and does happen under real workloads; it is the 
> cause of SPARK-8029, a serious failure in production.)
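> To make that bookkeeping concrete, here is a minimal, self-contained toy 
> model of the invalidation step (plain Scala with invented types, not the 
> real DAGScheduler / MapOutputTracker code): a fetch failure from one BM 
> invalidates every map output stored on that BM, and only those partitions 
> are recomputed on resubmission.
> {noformat}
> // Toy stand-in for Spark's BlockManagerId
> case class BlockManagerId(host: String)
>
> // map partition -> block manager holding its shuffle output (None = lost)
> var mapOutputs: Map[Int, Option[BlockManagerId]] =
>   (0 to 4).map(p => p -> Option(BlockManagerId(if (p % 2 == 0) "bm0" else "bm1"))).toMap
>
> // On a fetch failure, assume *all* outputs on the failed BM are gone
> def handleFetchFailure(failed: BlockManagerId): Unit = {
>   mapOutputs = mapOutputs.map {
>     case (p, Some(bm)) if bm == failed => p -> None
>     case other => other
>   }
> }
>
> def partitionsToCompute: Seq[Int] =
>   mapOutputs.collect { case (p, None) => p }.toSeq.sorted
>
> handleFetchFailure(BlockManagerId("bm0"))
> println(partitionsToCompute)  // List(0, 2, 4): only the lost outputs rerun
> {noformat}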
> If Spark has lost multiple BMs (which might mean it's lost all the shuffle map 
> output of an earlier stage), there are a few different ways that shuffle map 
> output will get regenerated.  Perhaps there will be enough tasks running to 
> trigger fetch failures on all the lost BMs before the earlier stage is 
> restarted, so by the time the stage is re-scheduled, the scheduler knows to 
> rerun all the tasks.  Or maybe it only gets a failure on one block manager, 
> so it regenerates the map output for that one block manager, and then on 
> trying the downstream stage, it realizes another block manager is down, and 
> repeats the process, one BM at a time, until everything has been regenerated.  
> Or perhaps, as it's regenerating the map output from the first failure, the 
> "zombie" tasks from the failed stage that are still running trigger fetch 
> failures from all the other block managers.  Then, as soon as the shuffle map 
> stage is done regenerating data for one BM, it will immediately regenerate the 
> data for the other lost BMs before retrying the downstream stage.  (And there 
> are assorted combinations as well.)
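> Continuing the toy model from the sketch above, the "one BM at a time" path 
> looks roughly like this (again an illustration, not the real scheduler): each 
> attempt of the downstream stage discovers at most one more dead BM, so the 
> map stage is resubmitted once per lost BM before the job makes progress.
> {noformat}
> val lostBMs = Seq(BlockManagerId("bm0"), BlockManagerId("bm1"))
> for (bm <- lostBMs) {
>   handleFetchFailure(bm)           // a downstream attempt hits this BM
>   val rerun = partitionsToCompute  // only this BM's partitions are rerun
>   println(s"resubmitting map stage for partitions $rerun after losing $bm")
>   rerun.foreach(p => mapOutputs += p -> Some(bm))  // pretend they finished
> }
> {noformat}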
> This means that it is totally unpredictable how many partitions will get 
> rerun for a ShuffleMapStage that was previously successful.  E.g., run the 
> following example program:
> {noformat}
> import org.apache.spark._
> import org.apache.spark.shuffle.FetchFailedException
> val data = sc.parallelize(1 to 1e3.toInt, 500).map(identity).groupBy(identity)
> val shuffleHandle = data.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
> // Simulate fetch failures
> val mappedData = data.map { case (i, _) =>
>   val taskContext = TaskContext.get
>   if (taskContext.attemptNumber() == 0 && taskContext.partitionId() == 50) {
>     // Cause the post-shuffle stage to fail on its first attempt with a single task failure
>     val env = SparkEnv.get
>     val bmAddress = env.blockManager.blockManagerId
>     val shuffleId = shuffleHandle.shuffleId
>     val mapId = 0
>     val reduceId = taskContext.partitionId()
>     val message = "Simulated fetch failure"
>     throw new FetchFailedException(bmAddress, shuffleId, mapId, reduceId, message)
>   } else {
>     (i, i)
>   }
> }
> mappedData.reduceByKey({ _ + _ }, 500).count()
> {noformat}
> with the current condition on resetting the accumulators, namely 
> {{(stage.internalAccumulators.isEmpty || allPartitions == partitionsToCompute)}}.  
> In {{local}} mode, all partitions will get re-run.  Then try running it with 
> {{local-cluster[2,1,1024]}} (which will create two block managers).  Here's 
> some example debug output from when I ran it:
> {noformat}
> === STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
>   - all partitions: 0, 1, 2, 3, 4
>   - partitions to compute: 0, 1, 2, 3, 4
>   - internal accum values:
> === STAGE 0 IS CREATING NEW ACCUMULATORS ===
> === STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
>   - all partitions: 0, 1, 2, 3, 4
>   - partitions to compute: 0, 1, 2, 3, 4
>   - internal accum values:
> === STAGE 1 IS CREATING NEW ACCUMULATORS ===
> 15/08/14 17:17:41 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 6, 192.168.1.106): FetchFailed(BlockManagerId(0, 192.168.1.106, 61639), shuffleId=0, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: Simulated fetch failure
> ...)
> === STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
>   - all partitions: 0, 1, 2, 3, 4
>   - partitions to compute: 1, 2, 4
>   - internal accum values: 0
> === STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
>   - all partitions: 0, 1, 2, 3, 4
>   - partitions to compute: 0, 2, 3, 4
>   - internal accum values: 3936
> 15/08/14 17:17:41 WARN TaskSetManager: Lost task 0.0 in stage 1.1 (TID 11, 192.168.1.106): FetchFailed(BlockManagerId(0, 192.168.1.106, 61639), shuffleId=0, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: Simulated fetch failure
> ...
> === STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
>   - all partitions: 0, 1, 2, 3, 4
>   - partitions to compute: 2
>   - internal accum values: 0
> === STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
>   - all partitions: 0, 1, 2, 3, 4
>   - partitions to compute: 0, 4
>   - internal accum values: 7872
> 15/08/14 17:17:41 WARN TaskSetManager: Lost task 0.0 in stage 1.2 (TID 15, 192.168.1.106): FetchFailed(BlockManagerId(1, 192.168.1.106, 61640), shuffleId=0, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: Simulated fetch failure
> ...
> === STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
>   - all partitions: 0, 1, 2, 3, 4
>   - partitions to compute: 0, 1, 3, 4
>   - internal accum values: 0
> === STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
>   - all partitions: 0, 1, 2, 3, 4
>   - partitions to compute: 0, 1, 2, 3, 4
>   - internal accum values: 9840
> === STAGE 1 IS CREATING NEW ACCUMULATORS ===
> ...
> {noformat}
> As you can see, {{partitionsToCompute != allPartitions}} in most cases.  For 
> example, in the second submission of stage 1, we would have *double 
> counted* the accumulators for partitions 0, 2, 3, 4.  By the third submission 
> of stage 1, we would have *triple counted* partitions 0 & 4.  Or then 
> again, we might just reset the values and count them once, as we do in the 
> final iteration shown here.
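> For reference, the reset condition quoted above behaves like this sketch (a 
> paraphrase, not the actual code): accumulators are reset only when *every* 
> partition is being recomputed, so a partial resubmission keeps -- and then 
> adds to -- the values from the previous attempt.
> {noformat}
> def shouldReset(
>     internalAccumValues: Seq[Long],
>     allPartitions: Seq[Int],
>     partitionsToCompute: Seq[Int]): Boolean =
>   internalAccumValues.isEmpty || allPartitions == partitionsToCompute
>
> // Second submission of stage 1 in the log above: only 4 of the 5 partitions
> // are rerun, so the 3936 from the first attempt gets added to again.
> shouldReset(Seq(3936L), Seq(0, 1, 2, 3, 4), Seq(0, 2, 3, 4))  // false => double count
> {noformat}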
> I had earlier suggested that we should never reset the value: just initialize 
> it once, and have the value keep increasing.  But maybe that isn't what you 
> want -- maybe you want to *always* reset the value?  Then the metrics would 
> clearly apply to that one stage *attempt* alone.  In any case, we are stuck 
> with the fact that skipped stages (which come from a shared narrow 
> dependency) do not share the same {{Stage}} object, even though they are 
> conceptually the same stage to a user.  So retries from skipped stages also 
> suggest that our goal should be for each attempt to have a cleared value for 
> the accumulators, since that is the behavior we're stuck with on retries via 
> a skipped stage in any case.  We could either always reset internal 
> accumulators, or make them a property of the stage *attempt*, which just 
> gets initialized with the attempt and is never reset.
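> A hypothetical sketch of that second option, keying internal accumulators by 
> (stage, attempt) so each attempt starts from zero and is never reset 
> mid-flight (names invented for illustration):
> {noformat}
> import scala.collection.mutable
>
> case class StageAttempt(stageId: Int, attemptId: Int)
>
> val accumsByAttempt = mutable.Map.empty[StageAttempt, Long].withDefaultValue(0L)
>
> def recordTaskUpdate(stageId: Int, attemptId: Int, update: Long): Unit =
>   accumsByAttempt(StageAttempt(stageId, attemptId)) += update
>
> recordTaskUpdate(stageId = 1, attemptId = 0, update = 3936L)
> recordTaskUpdate(stageId = 1, attemptId = 1, update = 3936L)  // retry: fresh counter
> println(accumsByAttempt(StageAttempt(1, 1)))  // 3936 -- no double counting
> {noformat}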
> Another option would be for the UI to just display the *update* from each 
> task, rather than the accumulator value at the end of the task:
> https://github.com/apache/spark/blob/cf016075a006034c24c5b758edb279f3e151d25d/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala#L372
> That would make the global value of the accumulator entirely irrelevant.  In 
> fact, I'm not certain how to interpret the sum of the memory used in each 
> task.  If I have 10K tasks running in 20 slots, the sum across all 10K tasks 
> probably over-estimates the memory actually in use at any moment by 500x 
> (10K / 20).  It's even stranger to report the quartiles of that partial sum 
> as tasks complete.  I highly doubt most users will understand what that 
> summary metric means, and even if they did, it has very little value.
> (Only using the *update* from each task would also mean that we wouldn't be 
> using the accumulators to "accumulate" anything; they would just become the 
> place we happen to cram our per-task metrics.)
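> To illustrate that last point, a toy rendering that uses each task's *update* 
> (its delta) instead of the running total, so stage retries can't distort what 
> a row in the UI means (types and values invented for the example):
> {noformat}
> case class TaskAccumRow(taskId: Long, update: Long, total: Long)
>
> val rows = Seq(
>   TaskAccumRow(taskId = 6L,  update = 984L, total = 984L),
>   TaskAccumRow(taskId = 11L, update = 984L, total = 4920L)  // total polluted by a retry
> )
>
> // The per-task update is meaningful; the accumulated total is not.
> rows.foreach(r => println(s"task ${r.taskId}: memory used by this task = ${r.update}"))
> {noformat}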


