[
https://issues.apache.org/jira/browse/SPARK-10042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hyukjin Kwon updated SPARK-10042:
---------------------------------
Labels: bulk-closed (was: )
> Use consistent behavior for Internal Accumulators across stage retries
> ----------------------------------------------------------------------
>
> Key: SPARK-10042
> URL: https://issues.apache.org/jira/browse/SPARK-10042
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core, Web UI
> Affects Versions: 1.5.0
> Reporter: Imran Rashid
> Priority: Major
> Labels: bulk-closed
>
> [~andrewor14]
> The internal accumulators introduced in SPARK-8735 aren't counted
> consistently across stage retries. Whether a given accumulator ends up
> counted once or multiple times is effectively nondeterministic.
> First, a little interlude on how stage failures & retries work. When there
> is a fetch failure, Spark looks at the block manager that it failed to fetch
> data from, and it assumes none of the data from that BM is available. It
> fails the stage with the fetch failure, then it goes back to the
> ShuffleMapStage that produced the data. It looks at which partitions were
> stored in the failed BM, and it reruns just those partitions. Meanwhile, all
> currently running tasks for the failed stage keep running, potentially
> producing more fetch failures. In fact, some of those tasks can even keep
> running until the parent stage has been re-run and this stage has been
> restarted. (Yes, this can and does happen under real workloads; it is the
> cause of SPARK-8029, a serious failure in real workloads.)
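> For concreteness, here's a rough sketch of that recovery flow in Scala. The
> method names are entirely hypothetical (this is not the actual DAGScheduler
> code), just an outline of the steps described above:
> {noformat}
> // Hypothetical sketch of fetch-failure handling; names are illustrative only.
> def onFetchFailed(failedStage: Stage, mapStage: ShuffleMapStage,
>                   lostBm: BlockManagerId): Unit = {
>   // Assume *all* map output stored on the lost block manager is gone.
>   unregisterMapOutputsOn(mapStage, lostBm)
>   // Fail this attempt of the downstream stage. Its already-launched
>   // ("zombie") tasks keep running and may report further fetch failures.
>   failCurrentAttempt(failedStage)
>   // Resubmit the map stage, recomputing only the partitions whose
>   // output lived on the lost block manager.
>   resubmitMissingPartitions(mapStage)
> }
> {noformat}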
> If Spark has lost multiple BMs (which might mean it has lost all of the
> shuffle map output of an earlier stage), there are a few different ways that
> shuffle map output will get regenerated. Perhaps there will be enough tasks
> running to trigger fetch failures on all the lost BMs before the earlier
> stage is restarted, so by the time the stage is re-scheduled, the scheduler
> knows to rerun all the tasks. Or maybe it only gets a failure on one block
> manager, so it re-generates the map output for that one block manager, and
> then on trying the downstream stage, it realizes another block manager is
> down, and repeats the process, one BM at a time, until everything has been
> regenerated. Or perhaps, as it's regenerating the map output from the first
> failure, the "zombie" tasks from the failed stage that are still running
> trigger fetch failures from all the other block managers. And then, as soon
> as the shuffle map stage is done regenerating data for one BM, it'll
> immediately regenerate the data for the other lost BMs before trying the
> downstream stage. (And there are assorted combinations as well.)
> This means that it is totally unpredictable how many partitions will get
> rerun for the ShuffleMapStage that was previously successful. E.g., run
> your example program:
> {noformat}
> import org.apache.spark._
> import org.apache.spark.shuffle.FetchFailedException
>
> val data = sc.parallelize(1 to 1e3.toInt, 500).map(identity).groupBy(identity)
> val shuffleHandle =
>   data.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
>
> // Simulate fetch failures
> val mappedData = data.map { case (i, _) =>
>   val taskContext = TaskContext.get
>   if (taskContext.attemptNumber() == 0 && taskContext.partitionId() == 50) {
>     // Cause the post-shuffle stage to fail on its first attempt with a
>     // single task failure
>     val env = SparkEnv.get
>     val bmAddress = env.blockManager.blockManagerId
>     val shuffleId = shuffleHandle.shuffleId
>     val mapId = 0
>     val reduceId = taskContext.partitionId()
>     val message = "Simulated fetch failure"
>     throw new FetchFailedException(bmAddress, shuffleId, mapId, reduceId, message)
>   } else {
>     (i, i)
>   }
> }
> mappedData.reduceByKey({ _ + _ }, 500).count()
> {noformat}
> with the current condition on resetting the accumulators, that is,
> {{(stage.internalAccumulators.isEmpty || allPartitions ==
> partitionsToCompute)}}. In {{local}} mode all partitions will get re-run.
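> (For reference, that condition guards the reset roughly as follows; this is
> a paraphrase of the logic added in SPARK-8735, not the exact source:)
> {noformat}
> // In DAGScheduler.submitMissingTasks (paraphrased): fresh internal
> // accumulators are created only on the first submission of the stage, or
> // when a retry happens to recompute *every* partition.
> if (stage.internalAccumulators.isEmpty || allPartitions == partitionsToCompute) {
>   stage.resetInternalAccumulators()
> }
> {noformat}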
> Then try running it with {{local-cluster[2,1,1024]}} (which will create two
> block managers). Here's some example debug output from when I ran it:
> {noformat}
> === STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
> - all partitions: 0, 1, 2, 3, 4
> - partitions to compute: 0, 1, 2, 3, 4
> - internal accum values:
> === STAGE 0 IS CREATING NEW ACCUMULATORS ===
> === STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
> - all partitions: 0, 1, 2, 3, 4
> - partitions to compute: 0, 1, 2, 3, 4
> - internal accum values:
> === STAGE 1 IS CREATING NEW ACCUMULATORS ===
> 15/08/14 17:17:41 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 6,
> 192.168.1.106): FetchFailed(BlockManagerId(0, 192.168.1.106, 61639),
> shuffleId=0, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: Simulated fetch failure
> ...)
> === STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
> - all partitions: 0, 1, 2, 3, 4
> - partitions to compute: 1, 2, 4
> - internal accum values: 0
> === STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
> - all partitions: 0, 1, 2, 3, 4
> - partitions to compute: 0, 2, 3, 4
> - internal accum values: 3936
> 15/08/14 17:17:41 WARN TaskSetManager: Lost task 0.0 in stage 1.1 (TID 11,
> 192.168.1.106): FetchFailed(BlockManagerId(0, 192.168.1.106, 61639),
> shuffleId=0, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: Simulated fetch failure
> ...
> === STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
> - all partitions: 0, 1, 2, 3, 4
> - partitions to compute: 2
> - internal accum values: 0
> === STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
> - all partitions: 0, 1, 2, 3, 4
> - partitions to compute: 0, 4
> - internal accum values: 7872
> 15/08/14 17:17:41 WARN TaskSetManager: Lost task 0.0 in stage 1.2 (TID 15,
> 192.168.1.106): FetchFailed(BlockManagerId(1, 192.168.1.106, 61640),
> shuffleId=0, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: Simulated fetch failure
> ...
> === STAGE ShuffleMapStage (0) IS SUBMITTING MISSING TASKS ===
> - all partitions: 0, 1, 2, 3, 4
> - partitions to compute: 0, 1, 3, 4
> - internal accum values: 0
> === STAGE ShuffleMapStage (1) IS SUBMITTING MISSING TASKS ===
> - all partitions: 0, 1, 2, 3, 4
> - partitions to compute: 0, 1, 2, 3, 4
> - internal accum values: 9840
> === STAGE 1 IS CREATING NEW ACCUMULATORS ===
> ...
> {noformat}
> As you can see, {{partitionsToCompute != allPartitions}} in most cases. For
> example, in the second submission of stage 1, we would have *double counted*
> the accumulators for partitions 0, 2, 3, 4. By the third submission of
> stage 1, we would have *triple counted* partitions 0 & 4. Or, then again,
> we might just reset the values and count each partition once, as happens in
> the final iteration shown here.
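> (The same double-counting pattern is easy to demonstrate at the user level
> with the public 1.x accumulator API, whenever already-successful work gets
> re-executed; this snippet illustrates the pattern, not the internal
> accumulators themselves:)
> {noformat}
> // Re-running already-successful work adds to the accumulator again.
> val acc = sc.accumulator(0L, "records seen")
> val rdd = sc.parallelize(1 to 100, 4).map { x => acc += 1; x }
> rdd.count()  // acc.value == 100
> rdd.count()  // rdd is recomputed from scratch, so acc.value == 200
> {noformat}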
> I had earlier suggested that we should never reset the value, just
> initialize it once, and have the value keep increasing. But maybe that
> isn't what you want -- maybe you want to *always* reset the value? Then the
> metrics would clearly apply to that one stage *attempt* alone. In any case,
> we are stuck with the fact that skipped stages (which come from a shared
> narrow dependency) do not share the same {{Stage}} object, even though they
> are conceptually the same stage to a user. So retries via skipped stages
> also suggest that our goal should be for each attempt to have a cleared
> value for the accumulators, since that is the behavior we're stuck with on
> retries via a skipped stage in any case. We could either always reset the
> internal accumulators, or make them a property of the stage *attempt*,
> initialized with the attempt and never reset; a sketch of the latter
> follows.
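> (None of these names exist in Spark; this is only the shape of the idea,
> assuming attempt state were tracked in its own object:)
> {noformat}
> // Hypothetical: internal accumulators belong to the attempt, not the stage,
> // so every resubmission starts from zero and is never reset mid-flight.
> class StageAttemptState(val stageId: Int, val attemptId: Int) {
>   // Created exactly once, when this attempt is submitted.
>   val internalAccumulators: Seq[Accumulator[Long]] = createInternalAccums()
> }
> {noformat}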
> Another option would be for the UI to just display the *update* from each
> task, rather than the accumulator value at the end of the task:
> https://github.com/apache/spark/blob/cf016075a006034c24c5b758edb279f3e151d25d/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala#L372
> That would make the global value of the accumulator entirely irrelevant. In
> fact, I'm not certain how to interpret the sum of the memory used in each
> task. If I have 10K tasks running in 20 slots, the sum across all 10K tasks
> probably over-estimates the memory actually in use by 500x. It's even
> stranger to report the quartiles of that partial sum as tasks complete. I
> highly doubt most users will understand what that summary metric means, and
> even if they did understand it, it seems to have very little value.
> (Only using the *update* from each task would also mean that we wouldn't be
> using the accumulators to "accumulate" anything; they would just become the
> place where we happen to cram our per-task metrics.)
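> (The per-task update is already reported to the driver alongside the merged
> total, so showing it is mostly a question of which field the UI reads.
> E.g., a listener can get at it today -- a hedged sketch, field names as of
> 1.5:)
> {noformat}
> import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
>
> // Prints what each task contributed (accum.update) rather than the running
> // total (accum.value), which is what stage retries make unpredictable.
> class PerTaskAccumUpdates extends SparkListener {
>   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
>     for (accum <- taskEnd.taskInfo.accumulables) {
>       println(s"task ${taskEnd.taskInfo.taskId}: " +
>         s"${accum.name} += ${accum.update.getOrElse("?")}")
>     }
>   }
> }
> {noformat}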