wanglijie95 commented on code in PR #20507: URL: https://github.com/apache/flink/pull/20507#discussion_r940972330
########## docs/content/docs/deployment/speculative_execution.md: ########## @@ -0,0 +1,100 @@ +--- +title: Speculative Execution +weight: 5 +type: docs + +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Speculative Execution +Apache Flink supports speculative execution of batch jobs. + +This page describes the background of speculative execution and how to use it. + +## Background +Speculative execution is a mechanism to mitigate job slowness which is caused by problematic nodes. +A problematic node may have hardware problems, accident I/O busy, or high CPU load. These problems may +make the hosted tasks run much slower than tasks on other nodes, and affects the overall execution time +of a batch job. + +In such cases, speculative execution will start a new attempt of the slow task on nodes that are not +detected as problematic/slow. The new attempt processes the same input data and produces the same data +as the old one. The old attempt will not be affected and will keep running. The first finished attempt +will be admitted, its output will be seen and consumed by the downstream tasks, and the remaining attempts +will be canceled. + +To achieve this, Flink uses the slow task detector to detect slow tasks. 
The nodes that the slow tasks +locate in will be identified as problematic nodes and get blocked via the blocklist handler. The scheduler +will create new attempts for the slow tasks and deploy them on nodes that are not blocked. + +## Usage +This section describes how to use speculative execution, including how to enable it, how to tuning it, and +how to develop/improve custom sources to work with speculative execution. + +{{< hint warning >}} +Note: Flink does not support speculative execution of sinks yet and will support it in follow-up releases. +{{< /hint >}} + +{{< hint warning >}} +Note: Flink does not support speculative execution of DataSet jobs because DataSet will be deprecated +in near future. DataStream API is now the recommended low level API to develop Flink batch jobs. +{{< /hint >}} + +### Enable Speculative Execution +To enable speculative execution, you need to set the following configuration options: +- `jobmanager.scheduler: AdaptiveBatch` + - Because only [Adaptive Batch Scheduler]({{< ref "docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler) supports speculative execution. Review Comment: I think we should describe this more clearly here, because the documentation of `Adaptive Batch Scheduler` does not describe its relationship with speculative execution. Maybe add a description like `speculative execution works along with adaptive batch scheduling ...` ########## docs/content/docs/deployment/speculative_execution.md: ########## @@ -0,0 +1,100 @@ +--- +title: Speculative Execution +weight: 5 +type: docs + +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Speculative Execution +Apache Flink supports speculative execution of batch jobs. + +This page describes the background of speculative execution and how to use it. + +## Background +Speculative execution is a mechanism to mitigate job slowness which is caused by problematic nodes. +A problematic node may have hardware problems, accident I/O busy, or high CPU load. These problems may +make the hosted tasks run much slower than tasks on other nodes, and affects the overall execution time +of a batch job. + +In such cases, speculative execution will start a new attempt of the slow task on nodes that are not +detected as problematic/slow. The new attempt processes the same input data and produces the same data +as the old one. The old attempt will not be affected and will keep running. The first finished attempt +will be admitted, its output will be seen and consumed by the downstream tasks, and the remaining attempts +will be canceled. + +To achieve this, Flink uses the slow task detector to detect slow tasks. The nodes that the slow tasks +locate in will be identified as problematic nodes and get blocked via the blocklist handler. The scheduler Review Comment: Maybe `that the slow tasks locate in` -> `where the slow tasks located in` ########## docs/content/docs/deployment/speculative_execution.md: ########## @@ -0,0 +1,100 @@ +--- +title: Speculative Execution +weight: 5 +type: docs + +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Speculative Execution +Apache Flink supports speculative execution of batch jobs. + +This page describes the background of speculative execution and how to use it. + +## Background +Speculative execution is a mechanism to mitigate job slowness which is caused by problematic nodes. +A problematic node may have hardware problems, accident I/O busy, or high CPU load. These problems may +make the hosted tasks run much slower than tasks on other nodes, and affects the overall execution time +of a batch job. + +In such cases, speculative execution will start a new attempt of the slow task on nodes that are not +detected as problematic/slow. The new attempt processes the same input data and produces the same data +as the old one. The old attempt will not be affected and will keep running. The first finished attempt +will be admitted, its output will be seen and consumed by the downstream tasks, and the remaining attempts +will be canceled. + +To achieve this, Flink uses the slow task detector to detect slow tasks. The nodes that the slow tasks +locate in will be identified as problematic nodes and get blocked via the blocklist handler. The scheduler +will create new attempts for the slow tasks and deploy them on nodes that are not blocked. 
+ +## Usage +This section describes how to use speculative execution, including how to enable it, how to tuning it, and +how to develop/improve custom sources to work with speculative execution. + +{{< hint warning >}} +Note: Flink does not support speculative execution of sinks yet and will support it in follow-up releases. +{{< /hint >}} + +{{< hint warning >}} +Note: Flink does not support speculative execution of DataSet jobs because DataSet will be deprecated +in near future. DataStream API is now the recommended low level API to develop Flink batch jobs. +{{< /hint >}} + +### Enable Speculative Execution +To enable speculative execution, you need to set the following configuration options: +- `jobmanager.scheduler: AdaptiveBatch` + - Because only [Adaptive Batch Scheduler]({{< ref "docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler) supports speculative execution. +- `jobmanager.adaptive-batch-scheduler.speculative.enabled: true` + +### Tuning Configuration +To make speculative execution work better for different jobs, you can tune below configuration options of the scheduler: +- [`jobmanager.adaptive-batch-scheduler.speculative.max-concurrent-executions`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-speculative-max-concurrent-e): +Controls the maximum number of execution attempts of each operator that can execute concurrently, including the original one and speculative ones. +- [`jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-speculative-block-slow-node): +Controls how long an detected slow node should be blocked for. + +You can also need to tune below configuration options to control the slow task detector: +- [`slow-task-detector.check-interval`]({{< ref "docs/deployment/config" >}}#slow-task-detector-check-interval): +The interval to check slow tasks. 
+- [`slow-task-detector.execution-time.baseline-lower-bound`]({{< ref "docs/deployment/config" >}}#slow-task-detector-execution-time-baseline-lower-bound): +The lower bound of slow task detection baseline. +- [`slow-task-detector.execution-time.baseline-multiplier`]({{< ref "docs/deployment/config" >}}#slow-task-detector-execution-time-baseline-multiplier): +The multiplier to calculate the slow tasks detection baseline. Given that the parallelism is N and the ratio is R, define T as the median of the first N*R finished tasks' execution time. The baseline will be T*M, where M is the multiplier of the baseline. Review Comment: `N*R` should be wrapped in backticks; otherwise the `*` will be interpreted as an italic marker. ########## docs/content/docs/deployment/speculative_execution.md: ########## @@ -0,0 +1,100 @@ +--- +title: Speculative Execution +weight: 5 +type: docs + +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Speculative Execution +Apache Flink supports speculative execution of batch jobs. + +This page describes the background of speculative execution and how to use it. + +## Background +Speculative execution is a mechanism to mitigate job slowness which is caused by problematic nodes. 
+A problematic node may have hardware problems, accident I/O busy, or high CPU load. These problems may +make the hosted tasks run much slower than tasks on other nodes, and affects the overall execution time +of a batch job. + +In such cases, speculative execution will start a new attempt of the slow task on nodes that are not +detected as problematic/slow. The new attempt processes the same input data and produces the same data +as the old one. The old attempt will not be affected and will keep running. The first finished attempt +will be admitted, its output will be seen and consumed by the downstream tasks, and the remaining attempts +will be canceled. + +To achieve this, Flink uses the slow task detector to detect slow tasks. The nodes that the slow tasks +locate in will be identified as problematic nodes and get blocked via the blocklist handler. The scheduler +will create new attempts for the slow tasks and deploy them on nodes that are not blocked. + +## Usage +This section describes how to use speculative execution, including how to enable it, how to tuning it, and +how to develop/improve custom sources to work with speculative execution. + +{{< hint warning >}} +Note: Flink does not support speculative execution of sinks yet and will support it in follow-up releases. +{{< /hint >}} + +{{< hint warning >}} +Note: Flink does not support speculative execution of DataSet jobs because DataSet will be deprecated +in near future. DataStream API is now the recommended low level API to develop Flink batch jobs. +{{< /hint >}} + +### Enable Speculative Execution +To enable speculative execution, you need to set the following configuration options: +- `jobmanager.scheduler: AdaptiveBatch` + - Because only [Adaptive Batch Scheduler]({{< ref "docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler) supports speculative execution. 
+- `jobmanager.adaptive-batch-scheduler.speculative.enabled: true` + +### Tuning Configuration +To make speculative execution work better for different jobs, you can tune below configuration options of the scheduler: +- [`jobmanager.adaptive-batch-scheduler.speculative.max-concurrent-executions`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-speculative-max-concurrent-e): +Controls the maximum number of execution attempts of each operator that can execute concurrently, including the original one and speculative ones. +- [`jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-speculative-block-slow-node): +Controls how long an detected slow node should be blocked for. + +You can also need to tune below configuration options to control the slow task detector: +- [`slow-task-detector.check-interval`]({{< ref "docs/deployment/config" >}}#slow-task-detector-check-interval): +The interval to check slow tasks. +- [`slow-task-detector.execution-time.baseline-lower-bound`]({{< ref "docs/deployment/config" >}}#slow-task-detector-execution-time-baseline-lower-bound): +The lower bound of slow task detection baseline. +- [`slow-task-detector.execution-time.baseline-multiplier`]({{< ref "docs/deployment/config" >}}#slow-task-detector-execution-time-baseline-multiplier): +The multiplier to calculate the slow tasks detection baseline. Given that the parallelism is N and the ratio is R, define T as the median of the first N*R finished tasks' execution time. The baseline will be T*M, where M is the multiplier of the baseline. +- [`slow-task-detector.execution-time.baseline-ratio`]({{< ref "docs/deployment/config" >}}#slow-task-detector-execution-time-baseline-ratio): +The finished execution ratio threshold to calculate the slow tasks detection baseline. 
Given that the parallelism is N and the ratio is R, define T as the median of the first N*R finished tasks' execution time. The baseline will be T*M, where M is the multiplier of the baseline. + +### Enable Sources for Speculative Execution +If your job uses a custom {{< gh_link file="/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java" name="Source" >}}, +and the source uses custom {{< gh_link file="/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceEvent.java" name="SourceEvent" >}}, +you need to change the {{< gh_link file="/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java" name="SplitEnumerator" >}} +of that source to implement {{< gh_link file="/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsHandleExecutionAttemptSourceEvent.java" name="SupportsHandleExecutionAttemptSourceEvent" >}} +interface. +```java +public interface SupportsHandleExecutionAttemptSourceEvent { + void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent); +} +``` +This means the SplitEnumerator should be aware of the attempt which sends the event. Otherwise, exceptions +will happen when the job manager receives a source event from the tasks and lead to job failures. + +No extra change is required for other sources to work with speculative execution. All the source connectors Review Comment: Maybe describe the `other sources` more clearly: they include SourceFunction sources, InputFormat sources, and the new sources. ########## docs/content/docs/deployment/speculative_execution.md: ########## @@ -0,0 +1,100 @@ +--- +title: Speculative Execution +weight: 5 +type: docs + +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Speculative Execution +Apache Flink supports speculative execution of batch jobs. + +This page describes the background of speculative execution and how to use it. + +## Background +Speculative execution is a mechanism to mitigate job slowness which is caused by problematic nodes. +A problematic node may have hardware problems, accident I/O busy, or high CPU load. These problems may +make the hosted tasks run much slower than tasks on other nodes, and affects the overall execution time +of a batch job. + +In such cases, speculative execution will start a new attempt of the slow task on nodes that are not +detected as problematic/slow. The new attempt processes the same input data and produces the same data Review Comment: `problematic/slow` -> `problematic`. I think `problematic` alone is enough, and it matches the wording of the description above. ########## docs/content/docs/deployment/speculative_execution.md: ########## @@ -0,0 +1,100 @@ +--- +title: Speculative Execution +weight: 5 +type: docs + +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Speculative Execution +Apache Flink supports speculative execution of batch jobs. + +This page describes the background of speculative execution and how to use it. + +## Background +Speculative execution is a mechanism to mitigate job slowness which is caused by problematic nodes. +A problematic node may have hardware problems, accident I/O busy, or high CPU load. These problems may +make the hosted tasks run much slower than tasks on other nodes, and affects the overall execution time +of a batch job. + +In such cases, speculative execution will start a new attempt of the slow task on nodes that are not +detected as problematic/slow. The new attempt processes the same input data and produces the same data +as the old one. The old attempt will not be affected and will keep running. The first finished attempt +will be admitted, its output will be seen and consumed by the downstream tasks, and the remaining attempts +will be canceled. + +To achieve this, Flink uses the slow task detector to detect slow tasks. The nodes that the slow tasks +locate in will be identified as problematic nodes and get blocked via the blocklist handler. The scheduler +will create new attempts for the slow tasks and deploy them on nodes that are not blocked. 
+ +## Usage +This section describes how to use speculative execution, including how to enable it, how to tuning it, and Review Comment: `tuning` -> `tune` ########## docs/content/docs/deployment/speculative_execution.md: ########## @@ -0,0 +1,100 @@ +--- +title: Speculative Execution +weight: 5 +type: docs + +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Speculative Execution +Apache Flink supports speculative execution of batch jobs. + +This page describes the background of speculative execution and how to use it. + +## Background +Speculative execution is a mechanism to mitigate job slowness which is caused by problematic nodes. +A problematic node may have hardware problems, accident I/O busy, or high CPU load. These problems may +make the hosted tasks run much slower than tasks on other nodes, and affects the overall execution time +of a batch job. + +In such cases, speculative execution will start a new attempt of the slow task on nodes that are not +detected as problematic/slow. The new attempt processes the same input data and produces the same data +as the old one. The old attempt will not be affected and will keep running. 
The first finished attempt +will be admitted, its output will be seen and consumed by the downstream tasks, and the remaining attempts +will be canceled. + +To achieve this, Flink uses the slow task detector to detect slow tasks. The nodes that the slow tasks +locate in will be identified as problematic nodes and get blocked via the blocklist handler. The scheduler +will create new attempts for the slow tasks and deploy them on nodes that are not blocked. + +## Usage +This section describes how to use speculative execution, including how to enable it, how to tuning it, and +how to develop/improve custom sources to work with speculative execution. + +{{< hint warning >}} +Note: Flink does not support speculative execution of sinks yet and will support it in follow-up releases. +{{< /hint >}} + +{{< hint warning >}} +Note: Flink does not support speculative execution of DataSet jobs because DataSet will be deprecated +in near future. DataStream API is now the recommended low level API to develop Flink batch jobs. +{{< /hint >}} + +### Enable Speculative Execution +To enable speculative execution, you need to set the following configuration options: +- `jobmanager.scheduler: AdaptiveBatch` + - Because only [Adaptive Batch Scheduler]({{< ref "docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler) supports speculative execution. +- `jobmanager.adaptive-batch-scheduler.speculative.enabled: true` + +### Tuning Configuration +To make speculative execution work better for different jobs, you can tune below configuration options of the scheduler: +- [`jobmanager.adaptive-batch-scheduler.speculative.max-concurrent-executions`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-speculative-max-concurrent-e): +Controls the maximum number of execution attempts of each operator that can execute concurrently, including the original one and speculative ones. 
+- [`jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-speculative-block-slow-node): +Controls how long an detected slow node should be blocked for. + +You can also need to tune below configuration options to control the slow task detector: Review Comment: Maybe `You can also need` -> `You may also need` ########## docs/content/docs/deployment/speculative_execution.md: ########## @@ -0,0 +1,100 @@ +--- +title: Speculative Execution +weight: 5 +type: docs + +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Speculative Execution +Apache Flink supports speculative execution of batch jobs. + +This page describes the background of speculative execution and how to use it. + +## Background +Speculative execution is a mechanism to mitigate job slowness which is caused by problematic nodes. +A problematic node may have hardware problems, accident I/O busy, or high CPU load. These problems may +make the hosted tasks run much slower than tasks on other nodes, and affects the overall execution time +of a batch job. + +In such cases, speculative execution will start a new attempt of the slow task on nodes that are not +detected as problematic/slow. 
The new attempt processes the same input data and produces the same data +as the old one. The old attempt will not be affected and will keep running. The first finished attempt +will be admitted, its output will be seen and consumed by the downstream tasks, and the remaining attempts +will be canceled. + +To achieve this, Flink uses the slow task detector to detect slow tasks. The nodes that the slow tasks +locate in will be identified as problematic nodes and get blocked via the blocklist handler. The scheduler Review Comment: `blocklist handler` -> `blocklist mechanism`, and I'm not sure if we should add a FLIP-224 link here. ########## docs/content/docs/deployment/speculative_execution.md: ########## @@ -0,0 +1,100 @@ +--- +title: Speculative Execution +weight: 5 +type: docs + +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Speculative Execution +Apache Flink supports speculative execution of batch jobs. + +This page describes the background of speculative execution and how to use it. + +## Background +Speculative execution is a mechanism to mitigate job slowness which is caused by problematic nodes. +A problematic node may have hardware problems, accident I/O busy, or high CPU load. 
These problems may +make the hosted tasks run much slower than tasks on other nodes, and affects the overall execution time Review Comment: `affects` -> `affect` ########## docs/content/docs/deployment/speculative_execution.md: ########## @@ -0,0 +1,100 @@ +--- +title: Speculative Execution +weight: 5 +type: docs + +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Speculative Execution +Apache Flink supports speculative execution of batch jobs. + +This page describes the background of speculative execution and how to use it. + +## Background +Speculative execution is a mechanism to mitigate job slowness which is caused by problematic nodes. +A problematic node may have hardware problems, accident I/O busy, or high CPU load. These problems may +make the hosted tasks run much slower than tasks on other nodes, and affects the overall execution time +of a batch job. + +In such cases, speculative execution will start a new attempt of the slow task on nodes that are not +detected as problematic/slow. The new attempt processes the same input data and produces the same data +as the old one. The old attempt will not be affected and will keep running. 
The first finished attempt +will be admitted, its output will be seen and consumed by the downstream tasks, and the remaining attempts +will be canceled. + +To achieve this, Flink uses the slow task detector to detect slow tasks. The nodes that the slow tasks +locate in will be identified as problematic nodes and get blocked via the blocklist handler. The scheduler +will create new attempts for the slow tasks and deploy them on nodes that are not blocked. + +## Usage +This section describes how to use speculative execution, including how to enable it, how to tuning it, and +how to develop/improve custom sources to work with speculative execution. + +{{< hint warning >}} +Note: Flink does not support speculative execution of sinks yet and will support it in follow-up releases. +{{< /hint >}} + +{{< hint warning >}} +Note: Flink does not support speculative execution of DataSet jobs because DataSet will be deprecated +in near future. DataStream API is now the recommended low level API to develop Flink batch jobs. +{{< /hint >}} + +### Enable Speculative Execution +To enable speculative execution, you need to set the following configuration options: +- `jobmanager.scheduler: AdaptiveBatch` + - Because only [Adaptive Batch Scheduler]({{< ref "docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler) supports speculative execution. +- `jobmanager.adaptive-batch-scheduler.speculative.enabled: true` + +### Tuning Configuration +To make speculative execution work better for different jobs, you can tune below configuration options of the scheduler: +- [`jobmanager.adaptive-batch-scheduler.speculative.max-concurrent-executions`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-speculative-max-concurrent-e): +Controls the maximum number of execution attempts of each operator that can execute concurrently, including the original one and speculative ones. 
+- [`jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-speculative-block-slow-node): +Controls how long an detected slow node should be blocked for. + +You can also need to tune below configuration options to control the slow task detector: +- [`slow-task-detector.check-interval`]({{< ref "docs/deployment/config" >}}#slow-task-detector-check-interval): +The interval to check slow tasks. +- [`slow-task-detector.execution-time.baseline-lower-bound`]({{< ref "docs/deployment/config" >}}#slow-task-detector-execution-time-baseline-lower-bound): +The lower bound of slow task detection baseline. +- [`slow-task-detector.execution-time.baseline-multiplier`]({{< ref "docs/deployment/config" >}}#slow-task-detector-execution-time-baseline-multiplier): +The multiplier to calculate the slow tasks detection baseline. Given that the parallelism is N and the ratio is R, define T as the median of the first N*R finished tasks' execution time. The baseline will be T*M, where M is the multiplier of the baseline. Review Comment: Same as T*M