Hi Chris,

Thanks for sending over the example. As far as I can tell, this would not have been a problem if "spark.blacklist.application.maxFailedTasksPerExecutor" had been set to a higher threshold, as mentioned in my previous email.
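To make that concrete, here is a rough sketch of what I mean (values are illustrative only, not something I have tuned), raising the application-level threshold when the SparkSession is built:

    // Illustrative sketch only: turn blacklisting on and raise the
    // application-level threshold so a couple of transient task failures
    // on one executor do not blacklist it for the whole application.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("blacklist-threshold-sketch") // hypothetical app name
      .config("spark.blacklist.enabled", "true")
      .config("spark.blacklist.application.maxFailedTasksPerExecutor", "5") // example value; current default is 2
      .getOrCreate()

The same keys can also go into spark-defaults.conf or be passed with --conf to spark-submit.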
That said, with 8/7 executors and 2 failedTasksPerExecutor, running out of executors would imply at least 14 task failures in a short period of time, so I am not sure whether the application should keep running or fail. If the issue was not transient, failing may well have been the correct outcome: it saves a lot of unnecessary computation and also alerts admins to look for transient/permanent hardware failures. Please let me know if you think we should enable the blacklisting feature by default with the higher thresholds.

Thanks,
Ankur

On Fri, Mar 29, 2019 at 3:23 PM Chris Stevens <chris.stev...@databricks.com> wrote:

> Hey All,
>
> My initial reply got lost because I wasn't on the dev list. Hopefully this goes through.
>
> Back story for my experiments: the customer was hitting network errors due to cloud infrastructure problems. Basically, executor X couldn't fetch from Y. The NIC backing the VM for executor Y was swallowing packets. I wanted to blacklist node Y.
>
> What I learned:
>
> 1. `spark.blacklist.application.fetchFailure.enabled` requires `spark.blacklist.enabled` to also be enabled (BlacklistTracker isn't created <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L948> without the latter). This was a problem because the defaults for `spark.blacklist.[task|stage|application].*` are aggressive and don't even apply to fetch failures; those are always treated as non-transient. It would be nice to have fetch blacklisting without regular blacklisting.
>
> 2. Due to the conf coupling in #1 and transient cloud storage errors in the job (FileScanRDD was failing due to corrupted files), I had to set the `max*PerExecutor` and `max*PerNode` confs to really high values (i.e. 1000). Without these high settings, the customer was running out of nodes on the cluster (since we don't have blacklisting enabled by default, we haven't hooked it up to any sort of dynamic cloud VM re-provisioning - something like `killBlacklistedNodes`). Why? The same transient FileScanRDD failure hit over multiple stages, so even though executors were aggressively removed within one stage, `spark.blacklist.application.maxFailedTasksPerExecutor = 2` was reached. The stages were succeeding because the FileScanRDD attempts on other executors succeeded. As such, the 8-node cluster ran out of executors after 3 stages. I did not have `spark.blacklist.killBlacklistedExecutors` enabled. If I had, then `spark.blacklist.application.maxFailedExecutorsPerNode` would have kicked in and the job might have failed after 4-6 stages, depending on how it played out. (FWIW, this was running one executor per node.)
>
> -Chris
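To restate Chris's #1 and #2 in config form: the exact set of `max*` keys he raised isn't spelled out above, so treat the sketch below as indicative only (1000 is the value he mentions):

    // Sketch of the workaround described above (indicative keys/values only).
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Fetch-failure blacklisting only takes effect when the main feature
      // is on, since BlacklistTracker is not created otherwise (see #1).
      .set("spark.blacklist.enabled", "true")
      .set("spark.blacklist.application.fetchFailure.enabled", "true")
      // Effectively neutralize the aggressive failure thresholds so transient
      // FileScanRDD failures do not exhaust the cluster (see #2).
      .set("spark.blacklist.stage.maxFailedTasksPerExecutor", "1000")
      .set("spark.blacklist.stage.maxFailedExecutorsPerNode", "1000")
      .set("spark.blacklist.application.maxFailedTasksPerExecutor", "1000")
      .set("spark.blacklist.application.maxFailedExecutorsPerNode", "1000")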
> On Fri, Mar 29, 2019 at 1:48 PM Ankur Gupta <ankur.gu...@cloudera.com> wrote:
>
>> Thanks Reynold! That is certainly useful to know.
>>
>> @Chris Would it be possible for you to send out those details if you still have them, or better yet create a JIRA, so someone can work on those improvements? If there is already a JIRA, can you please provide a link to it?
>>
>> Additionally, if the concern is with the aggressiveness of the blacklisting, then we can enable the blacklisting feature by default with higher thresholds for failures. Below is an alternate set of defaults that was also proposed in the design document for max cluster utilization:
>>
>> 1. spark.blacklist.task.maxTaskAttemptsPerExecutor = 2
>> 2. spark.blacklist.task.maxTaskAttemptsPerNode = 2
>> 3. spark.blacklist.stage.maxFailedTasksPerExecutor = 5
>> 4. spark.blacklist.stage.maxFailedExecutorsPerNode = 4
>> 5. spark.blacklist.application.maxFailedTasksPerExecutor = 5
>> 6. spark.blacklist.application.maxFailedExecutorsPerNode = 4
>> 7. spark.blacklist.timeout = 5 mins
>>
>> On Fri, Mar 29, 2019 at 11:18 AM Reynold Xin <r...@databricks.com> wrote:
>>
>>> We tried enabling blacklisting for some customers, and in the cloud they very quickly end up having 0 executors due to various transient errors. So unfortunately I think the current implementation is terrible for cloud deployments and shouldn't be on by default. The heart of the issue is that the current implementation is not great at distinguishing transient errors from catastrophic errors.
>>>
>>> +Chris, who was involved with those tests.
>>>
>>> On Thu, Mar 28, 2019 at 3:32 PM, Ankur Gupta <ankur.gu...@cloudera.com.invalid> wrote:
>>>
>>>> Hi all,
>>>>
>>>> This is a follow-on to my PR: https://github.com/apache/spark/pull/24208, where I aimed to enable blacklisting for fetch failures by default. From the comments, there is interest in the community in enabling the overall blacklisting feature by default. I have listed 3 different things that we can do and would like to gather feedback and see if anyone has objections to them. Otherwise, I will just create a PR for the same.
>>>>
>>>> 1. *Enable blacklisting feature by default*. The blacklisting feature was added as part of SPARK-8425 and has been available since 2.2.0. It was deemed experimental and was disabled by default. The feature blacklists an executor/node from running a particular task, any task in a particular stage, or all tasks in the application, based on the number of failures. There are various configurations available which control those thresholds. Additionally, the executor/node is only blacklisted for a configurable time period. The idea is to enable the blacklisting feature with the existing defaults, which are the following:
>>>>
>>>> 1. spark.blacklist.task.maxTaskAttemptsPerExecutor = 1
>>>> 2. spark.blacklist.task.maxTaskAttemptsPerNode = 2
>>>> 3. spark.blacklist.stage.maxFailedTasksPerExecutor = 2
>>>> 4. spark.blacklist.stage.maxFailedExecutorsPerNode = 2
>>>> 5. spark.blacklist.application.maxFailedTasksPerExecutor = 2
>>>> 6. spark.blacklist.application.maxFailedExecutorsPerNode = 2
>>>> 7. spark.blacklist.timeout = 1 hour
>>>>
>>>> 2. *Kill blacklisted executors/nodes by default*. This feature was added as part of SPARK-16554 and has been available since 2.2.0. It is a follow-on feature to blacklisting, such that if an executor/node is blacklisted for the application, then it also terminates all running tasks on that executor for faster failure recovery.
>>>>
>>>> 3. *Remove the legacy blacklisting timeout config*: spark.scheduler.executorTaskBlacklistTime
>>>>
>>>> Thanks,
>>>> Ankur
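P.S. If anyone wants to try the alternate defaults quoted above on a current build before anything changes in Spark itself, they map to ordinary user configuration roughly as follows (a sketch, not committed defaults; note that spark.blacklist.timeout takes Spark's duration syntax, so "5 mins" would be written "5min", and the last line corresponds to item 2 of the original proposal):

    // Sketch of the proposed alternate defaults applied as user config
    // (values copied from the list quoted above).
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.blacklist.enabled", "true")
      .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "2")
      .set("spark.blacklist.task.maxTaskAttemptsPerNode", "2")
      .set("spark.blacklist.stage.maxFailedTasksPerExecutor", "5")
      .set("spark.blacklist.stage.maxFailedExecutorsPerNode", "4")
      .set("spark.blacklist.application.maxFailedTasksPerExecutor", "5")
      .set("spark.blacklist.application.maxFailedExecutorsPerNode", "4")
      .set("spark.blacklist.timeout", "5min")
      .set("spark.blacklist.killBlacklistedExecutors", "true") // item 2 of the proposal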