Thanks for your thoughts, Chris! Please find my responses to your points below:

- Rather than a fixed timeout, could we do some sort of exponential backoff? Start with a 10 or 20 second blacklist and increase from there? The nodes with catastrophic errors should quickly hit long blacklist intervals.

+1, I like this idea. It will have some additional cost with respect to tracking the current backoff interval for each executor/node, but it will certainly be very useful. (A rough sketch of what that tracking could look like follows right after my responses.)

- Correct me if I'm wrong, but once a task fails on an executor, even if maxTaskAttemptsPerExecutor > 1, that executor will get a failed task count against it. It looks like "TaskSetBlacklist.updateBlacklistForFailedTask" only adds to the executor failures. If the task recovers on the second attempt on the same executor, there is no way to remove the failure. I'd argue that if the task succeeds on a second attempt on the same executor, then it is definitely transient and the first attempt's failure should not count towards the executor's total stage/application failure count.

I am not sure about this. I think the purpose of blacklisting is to find nodes with transient failures as well and to blacklist them for a short period of time to avoid re-computation. So it is still useful to count a failure against an executor even if the task successfully recovered from that failure later on. And with exponential backoff, blacklisting will be transient in nature, so it will not be a huge penalty if that failure was truly transient.

- W.r.t. turning it on by default: Do we have a sense of how many teams are using blacklisting today using the current default settings? It may be worth changing the defaults for a release or two and gathering feedback to help make a call on turning it on by default. We could potentially get that feedback now: a two-question survey, "Have you enabled blacklisting?" and "What settings did you use?"

I think this email thread was intended for that purpose. Additionally, from the comments on my PR (https://github.com/apache/spark/pull/24208), it seems some teams already have blacklisting enabled by default.
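To make the backoff idea a bit more concrete, here is a minimal sketch of how per-executor interval tracking could work. This is only an illustration: the class and method names below are hypothetical and not part of the existing BlacklistTracker/TaskSetBlacklist code.

  import scala.collection.mutable

  // Hypothetical helper that tracks an exponentially growing blacklist
  // interval per executor, capped at a maximum timeout.
  class ExponentialBackoffTracker(
      initialTimeoutMs: Long = 20 * 1000L,     // e.g. start at 20 seconds
      maxTimeoutMs: Long = 60 * 60 * 1000L) {  // cap at 1 hour

    // executorId -> (current timeout, clock time at which the blacklist expires)
    private val state = mutable.Map.empty[String, (Long, Long)]

    /** Called when an executor crosses the failure threshold; returns the new timeout. */
    def blacklist(executorId: String, nowMs: Long): Long = {
      val nextTimeout = state.get(executorId) match {
        case Some((prevTimeout, _)) => math.min(prevTimeout * 2, maxTimeoutMs)
        case None                   => initialTimeoutMs
      }
      state(executorId) = (nextTimeout, nowMs + nextTimeout)
      nextTimeout
    }

    /** True while the executor is still within its current blacklist interval. */
    def isBlacklisted(executorId: String, nowMs: Long): Boolean =
      state.get(executorId).exists { case (_, expiry) => nowMs < expiry }
  }

With something like this, an executor with catastrophic errors quickly climbs to the maximum interval (20s, 40s, 80s, ...), while a node with a one-off transient failure only sits out for the initial interval.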
On Mon, Apr 1, 2019 at 3:08 PM Chris Stevens <chris.stev...@databricks.com> wrote:

> Hey Ankur,
>
> I think the significant decrease in "spark.blacklist.timeout" (1 hr down to 5 minutes) in your updated suggestion is the key here.
>
> Looking at a few *successful* runs of the application I was debugging, here are the error rates when I did *not* have blacklisting enabled:
>
> Run A: 8 executors with 36 total errors over the last 25 minutes of a 1 hour and 6 minute run.
> Run B: 8 executors with 50 total errors over the last 30 minutes of a 1 hour run.
>
> Increasing "spark.blacklist.application.maxFailedTasksPerExecutor" to 5 would have allowed run A (~3 failures/executor) to pass, but run B (~6 failures/executor) would not have without the change to "spark.blacklist.timeout".
>
> With such a small timeout of 5 minutes, the worst you get is executors flipping between blacklisted and not blacklisted (e.g. fail 5 tasks quickly due to disk failures, wait 5 minutes, fail 5 tasks quickly, wait 5 minutes). For catastrophic errors, this is probably OK. The executor will fail fast each time it comes back online and will effectively be blacklisted 90+% of the time. For transient errors, the executor will come back online and probably be fine. The only trouble you get into is if you run out of executors for a stage due to a high amount of transient errors, but you're right, perhaps that many transient errors is something worth failing for.
>
> In the case I was debugging with fetch failures, only the 5 minute timeout applies, but I don't think it would have mattered. Fetch task attempts were "hanging" for 30+ minutes without failing (it took that long for the netty channel to timeout). As such, there was no opportunity to blacklist. Even reducing the number of fetch retry attempts didn't help, as the first attempt occasionally stalled due to the underlying networking issues.
>
> A few thoughts:
> - Correct me if I'm wrong, but once a task fails on an executor, even if maxTaskAttemptsPerExecutor > 1, that executor will get a failed task count against it. It looks like "TaskSetBlacklist.updateBlacklistForFailedTask" only adds to the executor failures. If the tasks recovers on the second attempt on the same executor, there is no way to remove the failure. I'd argue that if the task succeeds on a second attempt on the same executor, then it is definitely transient and the first attempt's failure should not count towards the executor's total stage/application failure count.
> - Rather than a fixed timeout, could we do some sort of exponential backoff? Start with a 10 or 20 second blacklist and increase from there? The nodes with catastrophic errors should quickly hit long blacklist intervals.
> - W.r.t turning it on by default: Do we have a sense of how many teams are using blacklisting today using the current default settings? It may be worth changing the defaults for a release or two and gather feedback to help make a call on turning it on by default. We could potentially get that feedback now: two question survey "Have you enabled blacklisting?" and "What settings did you use?"
>
> -Chris
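For reference, Chris's first thought above (dropping a failure once a later attempt of the same task succeeds on the same executor) could look roughly like the sketch below. This is purely illustrative: per the message above, today's TaskSetBlacklist.updateBlacklistForFailedTask only adds failures, and the names here are hypothetical rather than the actual scheduler API.

  import scala.collection.mutable

  // Hypothetical per-stage bookkeeping: failures are recorded per (executor, task index)
  // so that a later success of the same task on the same executor can retract them.
  class TransientAwareFailureCounts {
    private val failuresByExecutor = mutable.Map.empty[String, mutable.Set[Int]]

    def recordFailure(executorId: String, taskIndex: Int): Unit =
      failuresByExecutor.getOrElseUpdate(executorId, mutable.Set.empty) += taskIndex

    // If the same task later succeeds on the same executor, treat the earlier
    // failure as transient and remove it from that executor's count.
    def recordSuccess(executorId: String, taskIndex: Int): Unit =
      failuresByExecutor.get(executorId).foreach(_ -= taskIndex)

    def failureCount(executorId: String): Int =
      failuresByExecutor.get(executorId).map(_.size).getOrElse(0)
  }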
> On Mon, Apr 1, 2019 at 9:05 AM Ankur Gupta <ankur.gu...@cloudera.com> wrote:
>
>> Hi Chris,
>>
>> Thanks for sending over the example. As far as I can understand, it seems that this would not have been a problem if "spark.blacklist.application.maxFailedTasksPerExecutor" was set to a higher threshold, as mentioned in my previous email.
>>
>> Though, with 8/7 executors and 2 failedTasksPerExecutor, if the application runs out of executors, that would imply at least 14 task failures in a short period of time. So, I am not sure if the application should still continue to run or fail. If this was not a transient issue, maybe failing was the correct outcome, as it saves a lot of unnecessary computation and also alerts admins to look for transient/permanent hardware failures.
>>
>> Please let me know if you think we should enable the blacklisting feature by default with the higher threshold.
>>
>> Thanks,
>> Ankur
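As a quick illustration of the failure arithmetic above, and of how the higher threshold discussed in this thread changes it (assuming one executor per node and failures spread evenly):

  // Minimum application-level task failures before every executor is blacklisted.
  val executors = 7
  val currentDefault  = 2   // spark.blacklist.application.maxFailedTasksPerExecutor today
  val proposedSetting = 5   // the higher threshold discussed in this thread

  println(executors * currentDefault)   // 14 failures exhaust the cluster
  println(executors * proposedSetting)  // 35 failures would be needed instead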
>> On Fri, Mar 29, 2019 at 3:23 PM Chris Stevens <chris.stev...@databricks.com> wrote:
>>
>>> Hey All,
>>>
>>> My initial reply got lost, because I wasn't on the dev list. Hopefully this goes through.
>>>
>>> Back story for my experiments: customer was hitting network errors due to cloud infrastructure problems. Basically, executor X couldn't fetch from Y. The NIC backing the VM for executor Y was swallowing packets. I wanted to blacklist node Y.
>>>
>>> What I learned:
>>>
>>> 1. `spark.blacklist.application.fetchFailure.enabled` requires `spark.blacklist.enabled` to also be enabled (BlacklistTracker isn't created <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L948> without the latter). This was a problem because the defaults for `spark.blacklist.[task|stage|application].*` are aggressive and don't even apply to fetch failures. Those are always treated as non-transient. It would be nice to have fetch blacklisting without regular blacklisting.
>>>
>>> 2. Due to the conf coupling in #1 and transient cloud storage errors in the job (FileScanRDD was failing due to corrupted files), I had to set the `max*PerExecutor` and `max*PerNode` to really high values (i.e. 1000). Without these high settings, the customer was running out of nodes on the cluster (as we don't have blacklisting enabled by default, we haven't hooked it up to any sort of dynamic cloud VM re-provisioning - something like `killBlacklistedNodes`). Why? The same transient FileScanRDD failure hit over multiple stages, so even though executors were aggressively removed within one stage, `spark.blacklist.application.maxFailedTasksPerExecutor = 2` was reached. The stages were succeeding because the FileScanRDD attempts on other executors succeeded. As such, the 8 node cluster ran out of executors after 3 stages. I did not have `spark.blacklist.killBlacklistedExecutors`. If I did, then `spark.blacklist.application.maxFailedExecutorsPerNode` would have kicked in and the job might have failed after 4-6 stages, depending on how it played out. (FWIW, this was running one executor per node).
>>>
>>> -Chris
>>>
>>> On Fri, Mar 29, 2019 at 1:48 PM Ankur Gupta <ankur.gu...@cloudera.com> wrote:
>>>
>>>> Thanks Reynold! That is certainly useful to know.
>>>>
>>>> @Chris Will it be possible for you to send out those details if you still have them or better create a JIRA, so someone can work on those improvements. If there is already a JIRA, can you please provide a link to the same.
>>>>
>>>> Additionally, if the concern is with the aggressiveness of the blacklisting, then we can enable blacklisting feature by default with higher thresholds for failures. Below is an alternate set of defaults that were also proposed in the design document for max cluster utilization:
>>>>
>>>> 1. spark.blacklist.task.maxTaskAttemptsPerExecutor = 2
>>>> 2. spark.blacklist.task.maxTaskAttemptsPerNode = 2
>>>> 3. spark.blacklist.stage.maxFailedTasksPerExecutor = 5
>>>> 4. spark.blacklist.stage.maxFailedExecutorsPerNode = 4
>>>> 5. spark.blacklist.application.maxFailedTasksPerExecutor = 5
>>>> 6. spark.blacklist.application.maxFailedExecutorsPerNode = 4
>>>> 7. spark.blacklist.timeout = 5 mins
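For anyone who wants to experiment with these alternate thresholds ahead of any change in Spark's defaults, a minimal sketch of the corresponding opt-in configuration (keys and values exactly as listed above) could look like this:

  import org.apache.spark.SparkConf

  // Opt in to blacklisting with the less aggressive thresholds proposed above.
  val conf = new SparkConf()
    .set("spark.blacklist.enabled", "true")
    .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "2")
    .set("spark.blacklist.task.maxTaskAttemptsPerNode", "2")
    .set("spark.blacklist.stage.maxFailedTasksPerExecutor", "5")
    .set("spark.blacklist.stage.maxFailedExecutorsPerNode", "4")
    .set("spark.blacklist.application.maxFailedTasksPerExecutor", "5")
    .set("spark.blacklist.application.maxFailedExecutorsPerNode", "4")
    .set("spark.blacklist.timeout", "5m")

The same settings can, of course, go into spark-defaults.conf or on the spark-submit command line instead.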
>>>> On Fri, Mar 29, 2019 at 11:18 AM Reynold Xin <r...@databricks.com> wrote:
>>>>
>>>>> We tried enabling blacklisting for some customers and in the cloud, very quickly they end up having 0 executors due to various transient errors. So unfortunately I think the current implementation is terrible for cloud deployments, and shouldn't be on by default. The heart of the issue is that the current implementation is not great at dealing with transient errors vs catastrophic errors.
>>>>>
>>>>> +Chris who was involved with those tests.
>>>>>
>>>>> On Thu, Mar 28, 2019 at 3:32 PM, Ankur Gupta <ankur.gu...@cloudera.com.invalid> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> This is a follow-on to my PR: https://github.com/apache/spark/pull/24208, where I aimed to enable blacklisting for fetch failure by default. From the comments, there is interest in the community to enable the overall blacklisting feature by default. I have listed down 3 different things that we can do and would like to gather feedback and see if anyone has objections with regards to this. Otherwise, I will just create a PR for the same.
>>>>>>
>>>>>> 1. *Enable blacklisting feature by default*. The blacklisting feature was added as part of SPARK-8425 and is available since 2.2.0. This feature was deemed experimental and was disabled by default. The feature blacklists an executor/node from running a particular task, any task in a particular stage, or all tasks in the application, based on the number of failures. There are various configurations available which control those thresholds. Additionally, the executor/node is only blacklisted for a configurable time period. The idea is to enable the blacklisting feature with the existing defaults, which are the following:
>>>>>>
>>>>>> 1. spark.blacklist.task.maxTaskAttemptsPerExecutor = 1
>>>>>> 2. spark.blacklist.task.maxTaskAttemptsPerNode = 2
>>>>>> 3. spark.blacklist.stage.maxFailedTasksPerExecutor = 2
>>>>>> 4. spark.blacklist.stage.maxFailedExecutorsPerNode = 2
>>>>>> 5. spark.blacklist.application.maxFailedTasksPerExecutor = 2
>>>>>> 6. spark.blacklist.application.maxFailedExecutorsPerNode = 2
>>>>>> 7. spark.blacklist.timeout = 1 hour
>>>>>>
>>>>>> 2. *Kill blacklisted executors/nodes by default*. This feature was added as part of SPARK-16554 and is available since 2.2.0. This is a follow-on feature to blacklisting, such that if an executor/node is blacklisted for the application, then it also terminates all running tasks on that executor for faster failure recovery.
>>>>>>
>>>>>> 3. *Remove legacy blacklisting timeout config*: spark.scheduler.executorTaskBlacklistTime
>>>>>>
>>>>>> Thanks,
>>>>>> Ankur
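For context, proposals (1) and (2) above can already be tried out today by setting the corresponding flags explicitly; a minimal sketch, assuming the configuration names discussed in this thread and the existing per-threshold defaults, is:

  import org.apache.spark.SparkConf

  // Opt in to what proposals (1) and (2) would make the default:
  // blacklisting with the existing thresholds, plus killing of
  // blacklisted executors/nodes.
  val conf = new SparkConf()
    .set("spark.blacklist.enabled", "true")
    .set("spark.blacklist.killBlacklistedExecutors", "true")

Proposal (3) would only remove the legacy spark.scheduler.executorTaskBlacklistTime setting, so no new configuration is involved there.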