Hi Chris,

Thanks for sending over the example. As far as I can tell, this would not
have been a problem if
"spark.blacklist.application.maxFailedTasksPerExecutor" had been set to a
higher threshold, as mentioned in my previous email.

That said, with 7-8 executors and maxFailedTasksPerExecutor = 2, running out
of executors implies at least 14 task failures in a short period of time. At
that point I am not sure whether the application should keep running or
fail. If the issue was not transient, maybe failing was the correct outcome:
it avoids a lot of unnecessary computation and also alerts admins to look
for transient or permanent hardware failures.

Please let me know if you think we should enable the blacklisting feature by
default with the higher thresholds.
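
For reference, here is a rough sketch (illustrative only, not a
recommendation) of what those higher thresholds would look like on the user
side; the conf names are the existing blacklist configs, and the values just
mirror the alternate defaults quoted further down:

import org.apache.spark.sql.SparkSession

// Sketch only: turn blacklisting on, but relax the application-level
// thresholds so a short burst of transient failures does not blacklist
// every executor in a small cluster.
val spark = SparkSession.builder()
  .config("spark.blacklist.enabled", "true")
  .config("spark.blacklist.application.maxFailedTasksPerExecutor", "5")
  .config("spark.blacklist.application.maxFailedExecutorsPerNode", "4")
  .config("spark.blacklist.timeout", "5m")
  .getOrCreate()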

Thanks,
Ankur

On Fri, Mar 29, 2019 at 3:23 PM Chris Stevens <chris.stev...@databricks.com>
wrote:

> Hey All,
>
> My initial reply got lost, because I wasn't on the dev list. Hopefully
> this goes through.
>
> Back story for my experiments: the customer was hitting network errors due
> to cloud infrastructure problems. Basically, executor X couldn't fetch from
> executor Y: the NIC backing the VM for executor Y was swallowing packets. I
> wanted to blacklist node Y.
>
> What I learned:
>
> 1. `spark.blacklist.application.fetchFailure.enabled` requires
> `spark.blacklist.enabled` to also be enabled (BlacklistTracker isn't
> created without the latter:
> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L948>).
> This was a problem because the defaults for
> `spark.blacklist.[task|stage|application].*` are aggressive and don't even
> apply to fetch failures. Those are always treated as non-transient. It
> would be nice to have fetch blacklisting without regular blacklisting.
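>
> To make the coupling concrete, a minimal sketch of the pair of flags that
> have to be set together today (illustrative only):
>
> import org.apache.spark.SparkConf
>
> // Sketch: fetch-failure blacklisting only takes effect when regular
> // blacklisting is also enabled, because BlacklistTracker isn't created
> // otherwise.
> val conf = new SparkConf()
>   .set("spark.blacklist.enabled", "true") // required for the next flag to matter
>   .set("spark.blacklist.application.fetchFailure.enabled", "true")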
>
> 2. Due to the conf coupling in #1 and transient cloud storage errors in
> the job (FileScanRDD was failing due to corrupted files), I had to set the
> `max*PerExecutor` and `max*PerNode` confs to really high values (e.g.
> 1000). Without these high settings, the customer was running out of nodes
> on the cluster (since we don't have blacklisting enabled by default, we
> haven't hooked it up to any sort of dynamic cloud VM re-provisioning -
> something like `killBlacklistedNodes`). Why? The same transient FileScanRDD
> failure hit over multiple stages, so even though executors were
> aggressively removed within one stage,
> `spark.blacklist.application.maxFailedTasksPerExecutor = 2` was still
> reached. The stages were succeeding because the FileScanRDD attempts on
> other executors succeeded. As such, the 8-node cluster ran out of executors
> after 3 stages. I did not have `spark.blacklist.killBlacklistedExecutors`
> enabled. If I had, then `spark.blacklist.application.maxFailedExecutorsPerNode`
> would have kicked in and the job might have failed after 4-6 stages,
> depending on how it played out. (FWIW, this was running one executor per
> node.)
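>
> For concreteness, the workaround looked roughly like the following sketch
> (values are illustrative; I'm only showing the stage- and application-level
> thresholds here):
>
> import org.apache.spark.SparkConf
>
> // Sketch: keep fetch-failure blacklisting, but push the regular stage- and
> // application-level thresholds high enough that transient FileScanRDD
> // failures don't consume the cluster.
> val conf = new SparkConf()
>   .set("spark.blacklist.enabled", "true")
>   .set("spark.blacklist.application.fetchFailure.enabled", "true")
>   .set("spark.blacklist.stage.maxFailedTasksPerExecutor", "1000")
>   .set("spark.blacklist.stage.maxFailedExecutorsPerNode", "1000")
>   .set("spark.blacklist.application.maxFailedTasksPerExecutor", "1000")
>   .set("spark.blacklist.application.maxFailedExecutorsPerNode", "1000")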
>
> -Chris
>
> On Fri, Mar 29, 2019 at 1:48 PM Ankur Gupta <ankur.gu...@cloudera.com>
> wrote:
>
>> Thanks Reynold! That is certainly useful to know.
>>
>> @Chris, will it be possible for you to send out those details if you still
>> have them, or better yet to create a JIRA so someone can work on those
>> improvements? If there is already a JIRA, could you please share a link to
>> it?
>>
>> Additionally, if the concern is the aggressiveness of the blacklisting,
>> then we can enable the blacklisting feature by default with higher failure
>> thresholds. Below is an alternate set of defaults that was also proposed
>> in the design document to maximize cluster utilization:
>>
>>    1. spark.blacklist.task.maxTaskAttemptsPerExecutor = 2
>>    2. spark.blacklist.task.maxTaskAttemptsPerNode = 2
>>    3. spark.blacklist.stage.maxFailedTasksPerExecutor = 5
>>    4. spark.blacklist.stage.maxFailedExecutorsPerNode = 4
>>    5. spark.blacklist.application.maxFailedTasksPerExecutor = 5
>>    6. spark.blacklist.application.maxFailedExecutorsPerNode = 4
>>    7. spark.blacklist.timeout = 5 mins
>>
>>
>>
>> On Fri, Mar 29, 2019 at 11:18 AM Reynold Xin <r...@databricks.com> wrote:
>>
>>> We tried enabling blacklisting for some customers in the cloud, and very
>>> quickly they ended up with 0 executors due to various transient errors. So
>>> unfortunately I think the current implementation is terrible for cloud
>>> deployments, and it shouldn't be on by default. The heart of the issue is
>>> that the current implementation is not good at distinguishing transient
>>> errors from catastrophic ones.
>>>
>>> +Chris who was involved with those tests.
>>>
>>>
>>>
>>> On Thu, Mar 28, 2019 at 3:32 PM, Ankur Gupta <
>>> ankur.gu...@cloudera.com.invalid> wrote:
>>>
>>>> Hi all,
>>>>
>>>> This is a follow-on to my PR
>>>> https://github.com/apache/spark/pull/24208, where I aimed to enable
>>>> blacklisting on fetch failures by default. From the comments, there is
>>>> interest in the community in enabling the overall blacklisting feature by
>>>> default. I have listed three things we could do and would like to gather
>>>> feedback and see whether anyone has objections. Otherwise, I will create
>>>> a PR for this.
>>>>
>>>> 1. *Enable blacklisting feature by default*. The blacklisting feature
>>>> was added as part of SPARK-8425 and has been available since 2.2.0. It
>>>> was deemed experimental and is disabled by default. The feature blacklists
>>>> an executor/node from running a particular task, any task in a particular
>>>> stage, or all tasks in the application, based on the number of failures.
>>>> Various configurations control those thresholds, and an executor/node is
>>>> only blacklisted for a configurable time period. The idea is to enable
>>>> the feature with the existing defaults, which are the following (a
>>>> minimal opt-in sketch follows the list):
>>>>
>>>>    1. spark.blacklist.task.maxTaskAttemptsPerExecutor = 1
>>>>    2. spark.blacklist.task.maxTaskAttemptsPerNode = 2
>>>>    3. spark.blacklist.stage.maxFailedTasksPerExecutor = 2
>>>>    4. spark.blacklist.stage.maxFailedExecutorsPerNode = 2
>>>>    5. spark.blacklist.application.maxFailedTasksPerExecutor = 2
>>>>    6. spark.blacklist.application.maxFailedExecutorsPerNode = 2
>>>>    7. spark.blacklist.timeout = 1 hour
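>>>>
>>>> As a minimal sketch, opting in only requires the feature flag, since the
>>>> thresholds above are already the built-in defaults:
>>>>
>>>> import org.apache.spark.SparkConf
>>>>
>>>> // Sketch: opting in to blacklisting; the thresholds listed above already
>>>> // apply as the defaults once the feature is enabled.
>>>> val conf = new SparkConf().set("spark.blacklist.enabled", "true")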
>>>>
>>>> 2. *Kill blacklisted executors/nodes by default*. This feature was
>>>> added as part of SPARK-16554 and has been available since 2.2.0. It is a
>>>> follow-on to blacklisting: if an executor/node is blacklisted for the
>>>> application, it is also killed, which terminates all tasks running on it,
>>>> for faster failure recovery.
>>>>
>>>> 3. *Remove legacy blacklisting timeout config*:
>>>> spark.scheduler.executorTaskBlacklistTime
>>>>
>>>> Thanks,
>>>> Ankur
>>>>
>>>
>>>
