zhuzhurk commented on code in PR #26168:
URL: https://github.com/apache/flink/pull/26168#discussion_r1962796362


##########
docs/content.zh/docs/deployment/adaptive_batch.md:
##########
@@ -0,0 +1,172 @@
+---
+title: 自适应批处理
+weight: 5
+type: docs
+
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# 自适应批处理
+本文档描述了自适应批处理的背景,使用方法,以及限制条件。
+
+## 背景
+
+传统的 Flink 批作业执行流程,作业的执行计划是在提交之前确定的。如果想要对执行计划进行调优,就需要用户和 Flink 
静态执行计划优化器在运行作业前,了解作业逻辑并能够准确判断作业运行过程,包括各个节点所处理的数据特性和连接边的数据分发方式。
+
+然而在现实情况中,这些数据特性在作业运行前是无法被预判的。虽然在输入数据有丰富的统计信息的前提下,用户和 Flink 
静态执行计划优化器可以将这些统计信息,与执行计划中的各个算子特性结合起来,进行一些适度的推理优化。
+然而实际生产中,输入数据的统计信息往往不全或者不准确,而且 Flink 
作业的中间节点的输入难以估算。依据静态的信息来优化的作业执行计划并不能很好的在这些场景下进行作业执行计划的优化。
+
+为了解决这个问题,Flink 引入了 **AdaptiveBatchScheduler** 调度器,该调度器是一种可以**自动调整执行计划**的批作业调度器。
+它会随着作业运行逐步确定作业执行计划,并根据确定下来的执行计划来增量式生成 JobGraph。未确定下来的执行计划将允许 Flink 
根据具体的优化策略和中间运行结果的特点,来进行运行时的执行计划动态调整。
+目前,该调度器支持的优化策略有:
+- [自动推导算子并行度](#自动推导并发度)
+- [自动均衡数据分发](#自动均衡数据分发)
+- [自适应 Broadcast Join](#自适应-broadcast-join)
+- [自适应 Skewed Join Optimization](#自适应-skewed-join-优化)
+
+## 自动推导并发度
+
+Adaptive Batch Scheduler 
支持自动推导算子并行度,如果算子未设置并行度,调度器将根据其消费的数据量的大小来推导其并行度。这可以带来诸多好处:
+- 批作业用户可以从并行度调优中解脱出来
+- 根据数据量自动推导并行度可以更好地适应每天变化的数据量
+- SQL作业中的算子也可以分配不同的并行度
+
+### 用法
+
+使用 Adaptive Batch Scheduler 自动推导算子的并行度,需要:
+- 启用自动并行度推导:
+
+  Adaptive Batch Scheduler 默认启用了自动并行度推导,你可以通过配置 
[`execution.batch.adaptive.auto-parallelism.enabled`]({{< ref 
"docs/deployment/config" >}}#execution-batch-adaptive-auto-parallelism-enabled) 
来开关此功能。
+  除此之外,你也可以根据作业的情况调整以下配置:
+    - [`execution.batch.adaptive.auto-parallelism.min-parallelism`]({{< ref 
"docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-min-parallelism): 允许自动设置的并行度最小值。
+    - [`execution.batch.adaptive.auto-parallelism.max-parallelism`]({{< ref 
"docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-max-parallelism): 
允许自动设置的并行度最大值,如果该配置项没有配置将使用通过 [`parallelism.default`]({{< ref 
"docs/deployment/config" >}}) 或者 `StreamExecutionEnvironment#setParallelism()` 
设置的默认并行度作为允许自动设置的并行度最大值。
+    - 
[`execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task`]({{< ref 
"docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-avg-data-volume-per-ta): 
期望每个任务平均处理的数据量大小。请注意,当出现数据倾斜,或者确定的并行度达到最大并行度(由于数据过多)时,一些任务实际处理的数据可能会远远超过这个值。
+    - 
[`execution.batch.adaptive.auto-parallelism.default-source-parallelism`]({{< 
ref "docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-default-source-paralle): source 
算子可动态推导的最大并行度,若该配置项没有配置将优先使用 
[`execution-batch-adaptive-auto-parallelism-max-parallelism`]({{< ref 
"docs/deployment/config" >}})作为允许动态推导的并行度最大值,若该配置项也没有配置,将使用 
[`parallelism.default`]({{< ref "docs/deployment/config" >}}) 或者 
`StreamExecutionEnvironment#setParallelism()` 设置的默认并行度。
+- 不要指定算子的并行度:
+
+  Adaptive Batch Scheduler 只会为用户未指定并行度的算子推导并行度。 所以如果你想算子的并行度被自动推导,需要避免通过算子的 
`setParallelism()` 方法来为其指定并行度。
+
+  除此之外,对于 DataSet 作业还需要进行以下配置:
+    - 配置 `parallelism.default: -1`
+    - 不要通过 `ExecutionEnvironment` 的 `setParallelism()` 方法来指定并行度
+
+### 让 Source 支持动态并行度推导
+如果你的作业有用到自定义 {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java"
 name="Source" >}},
+你需要让 Source 实现接口 {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/api/connector/source/DynamicParallelismInference.java"
 name="DynamicParallelismInference" >}}。
+```java
+public interface DynamicParallelismInference {
+    int inferParallelism(Context context);
+}
+```
+其中 Context 会提供可推导并行度上界、期望每个任务平均处理的数据量大小、动态过滤信息来协助并行度推导。
+Adaptive Batch Scheduler 将会在调度 Source 节点之前调用上述接口,需注意实现中应尽量避免高耗时的操作。
+
+若 Source 
未实现上述接口,[`execution.batch.adaptive.auto-parallelism.default-source-parallelism`]({{<
 ref "docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-default-source-paralle) 将会作为 
Source 节点的并行度。
+
+需注意,Source 动态并行度推导也只会为用户未指定并行度的 Source 算子推导并行度。
+
+### 性能调优
+
+1. 建议使用 [Sort 
Shuffle](https://flink.apache.org/2021/10/26/sort-shuffle-part1.html) 并且设置 
[`taskmanager.network.memory.buffers-per-channel`]({{< ref 
"docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) 为 
`0`。 这会解耦并行度与需要的网络内存,对于大规模作业,这样可以降低遇到 "Insufficient number of network buffers" 
错误的可能性。
+2. 建议将 [`execution.batch.adaptive.auto-parallelism.max-parallelism`]({{< ref 
"docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-max-parallelism) 
设置为最坏情况下预期需要的并行度。不建议配置太大的值,否则可能会影响性能。这个配置项会影响上游任务产出的 subpartition 的数量,过多的 
subpartition 可能会影响 hash shuffle 的性能,或者由于小包影响网络传输的性能。
+
+## 自动均衡数据分发
+
+Adaptive Batch Scheduler 支持自动均衡数据分发。调度器会尝试将数据均匀分配给下游子任务,确保各个下游子任务消耗的数据量大致相同。
+该优化无需用户手动配置,点对点连接类型(如 Rescale)和全联接连接类型(如 Hash、Rebalance、Custom)均适用。
+
+### 局限性
+
+- 
目前仅支持对[自动推导算子并行度](#自动推导并发度)的节点进行自动均衡数据分发。因此,用户需要开启[自动推导算子并行度](#自动推导并发度),并避免手动设置节点的并发度,才能享受到自动均衡数据分发的优化。
+- 目前自动均衡数据分发无法完全解决单 key 数据热点问题。当单个 key 的数据远远多于其他 key 
的数据时,仍然会有热点。然而为了数据的正确性,我们并不能拆分这个 key 的数据,将其分配给不同的子任务处理。不过,在一些特定的情况下,单 key 
问题是可以被解决的,见 [自适应 Skewed Join Optimization](#自适应-skewed-join-优化)。
+
+## 自适应 Broadcast Join
+
+在分布式数据处理中,broadcast join 是一个比较常见的优化,其实现原理是在 join 
的两个表中,如果有一个超小的表(可以放到单个计算节点的内存中),那对于这个超小表可以不做 shuffle,而是直接将其全量数据 broadcast 
到每个处理大表的分布式计算节点上,直接在内存中完成 join 操作。broadcast join 优化能大量减少大表 shuffle 
和排序,非常明显的提升作业运行性能。然而静态优化方法对此优化的判断生效条件往往不准,且在生产中应用有限,原因是:
+- 源表的统计信息**完整性与准确性**在生产中往往不足 
+- 对于**非源表的输入**无法实现很好的判断,因为中间数据的大小往往需要在作业运行过程中才能准确得知。当 join 
操作离源表数据比较远时,这种预判基本是不可用的。 
+- 如果静态优化方法错误的判断了生效条件,则可能造成比较严重的后果。这是因为错误的将大表判定为小表实际不小并无法放进单节点内存,那么 broadcast 
join 算子在试图建立内存中的 hash 表时就会因为 **Out of Memory** 而导致任务失败,从而**需要任务重跑**。
+   
+因此,虽然静态 broadcast join 在正确使用时可以带来较大的性能提升,但实际上优应用有限。而自适应 Broadcast Join 则可以让 
Flink 在运行时根据实际的数据输入来自适应的将 Join 算子转为 Broadcast Join。
+
+**为保证 Join 的正确性语义**,自适应 Broadcsat Join 会根据 Join 类型来决策输入边是否能够被广播,可广播的情况如下:

Review Comment:
   Typo: `Broadcsat` -> `Broadcast`



##########
docs/content/docs/deployment/adaptive_batch.md:
##########
@@ -0,0 +1,176 @@
+---
+title: Adaptive Batch
+weight: 5
+type: docs
+
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Adaptive Batch Execution
+This document describes the background, usage, and limitations of adaptive 
batch execution.
+
+## Background
+
+In the traditional Flink batch job execution process, the execution plan of a 
job is determined before submission. To optimize the execution plan, users and 
Flink's static execution plan optimizer need to understand the job logic and 
accurately evaluate how the job will execute, including the data 
characteristics processed by each node and the data distribution of the 
connecting edges.
+
+However, in real-world scenarios, these data characteristics cannot be 
predicted before the job is executed. 
+Although, if there is rich statistical information about the input data, users 
and Flink's static execution plan optimizer can combine these statistics with 
the characteristics of each operator in the execution plan to conduct some 
moderate inferential optimization. But in actual production environments, the 
statistical information on input data is often incomplete or inaccurate, making 
it difficult to estimate the intermediate data in a Flink job.
+
+To address this issue, Flink introduced the **AdaptiveBatchScheduler**, a 
batch job scheduler that can **automatically adjust execution plan**. 
+This scheduler gradually determines the job execution plan as the job executes 
and incrementally generates the JobGraph based on the determined execution 
plan. For the part of undecided execution plans, Flink is allowed to 
dynamically adjust execution plans at runtime based on specific optimization 
strategies and the characteristics of intermediate data. 
+Currently, the optimization strategies supported by the scheduler include:
+- [Automatically decide parallelisms for 
operators](#automatically-decide-parallelisms-for-operators)
+- [Automatic load balancing of data 
distribution](#automatic-balancing-of-data-distribution)
+- [Adaptive Broadcast Join](#adaptive-broadcast-join)
+- [Adaptive Skewed Join Optimization](#adaptive-skewed-join-optimization)
+
+## Automatically decide parallelisms for operators
+
+The Adaptive Batch Scheduler supports automatically deciding parallelisms of 
operators for batch jobs. If an operator is not set with a parallelism, 
+the scheduler will decide parallelism for it according to the size of its 
consumed datasets. This can bring many benefits:
+- Batch job users can be relieved from parallelism tuning
+- Automatically tuned parallelisms can better fit consumed datasets which have 
a varying volume size every day
+- Operators from SQL batch jobs can be assigned with different parallelisms 
which are automatically tuned
+
+### Usage
+
+To automatically decide parallelisms for operators with Adaptive Batch 
Scheduler, you need to:
+- Toggle the feature on:
+
+  Adaptive Batch Scheduler enables automatic parallelism derivation by 
default. You can configure 
[`execution.batch.adaptive.auto-parallelism.enabled`]({{< ref 
"docs/deployment/config" >}}#execution-batch-adaptive-auto-parallelism-enabled) 
to toggle this feature.
+  In addition, there are several related configuration options that may need 
adjustment when using Adaptive Batch Scheduler to automatically decide 
parallelisms for operators:
+    - [`execution.batch.adaptive.auto-parallelism.min-parallelism`]({{< ref 
"docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-min-parallelism): The lower bound 
of allowed parallelism to set adaptively.
+    - [`execution.batch.adaptive.auto-parallelism.max-parallelism`]({{< ref 
"docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-max-parallelism): The upper bound 
of allowed parallelism to set adaptively. The default parallelism set via 
[`parallelism.default`]({{< ref "docs/deployment/config" >}}) or 
`StreamExecutionEnvironment#setParallelism()` will be used as upper bound of 
allowed parallelism if this configuration is not configured.
+    - 
[`execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task`]({{< ref 
"docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-avg-data-volume-per-ta): The 
average size of data volume to expect each task instance to process. Note that 
when data skew occurs, or the decided parallelism reaches the max parallelism 
(due to too much data), the data actually processed by some tasks may far 
exceed this value.
+    - 
[`execution.batch.adaptive.auto-parallelism.default-source-parallelism`]({{< 
ref "docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-default-source-paralle): The 
default parallelism of data source or the upper bound of source parallelism to 
set adaptively. The upper bound of allowed parallelism set via 
[`execution.batch.adaptive.auto-parallelism.max-parallelism`]({{< ref 
"docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-max-parallelism) will be used if 
this configuration is not configured. If the upper bound of allowed parallelism 
is also not configured, the default parallelism set via 
[`parallelism.default`]({{< ref "docs/deployment/config" >}}) or 
`StreamExecutionEnvironment#setParallelism()` will be used instead.
+
+- Avoid setting the parallelism of operators:
+
+  The Adaptive Batch Scheduler only decides the parallelism for operators 
which do not have a parallelism set. So if you want the parallelism of an 
operator to be automatically decided, you need to avoid setting the parallelism 
for the operator through the 'setParallelism()' method.
+
+#### Enable dynamic parallelism inference support for Sources
+New {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java"
 name="Source" >}}
+can implement the interface {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/api/connector/source/DynamicParallelismInference.java"
 name="DynamicParallelismInference" >}} to enable dynamic parallelism inference.
+```java
+public interface DynamicParallelismInference {
+    int inferParallelism(Context context);
+}
+```
+The Context will provide the upper bound for the inferred parallelism, the 
expected average data size to be processed by each task, and dynamic filtering 
information to assist with parallelism inference.
+
+The Adaptive Batch Scheduler will invoke the interface before scheduling the 
source vertices, and it should be noted that implementations should avoid 
time-consuming operations as much as possible.
+
+If the Source does not implement the interface, the configuration setting 
[`execution.batch.adaptive.auto-parallelism.default-source-parallelism`]({{< 
ref "docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-default-source-paralle) will be 
used as the parallelism of the source vertices.
+
+Note that the dynamic source parallelism inference only decides the 
parallelism for source operators which do not already have a specified 
parallelism.
+
+#### Performance tuning
+
+1. It's recommended to use [Sort 
Shuffle](https://flink.apache.org/2021/10/26/sort-shuffle-part1.html) and set 
[`taskmanager.network.memory.buffers-per-channel`]({{< ref 
"docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) to 
`0`. This can decouple the required network memory from parallelism, so that 
for large scale jobs, the "Insufficient number of network buffers" errors are 
less likely to happen.
+2. It's recommended to set 
[`execution.batch.adaptive.auto-parallelism.max-parallelism`]({{< ref 
"docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-max-parallelism) to the 
parallelism you expect to need in the worst case. Values larger than that are 
not recommended, because excessive value may affect the performance. This 
option can affect the number of subpartitions produced by upstream tasks, large 
number of subpartitions may degrade the performance of hash shuffle and the 
performance of network transmission due to small packets.
+
+## Automatic Balancing of Data Distribution
+
+The Adaptive Batch Scheduler supports automatic balancing of data 
distribution. The scheduler will attempt to evenly distribute data to 
downstream subtasks, ensuring that the amount of data consumed by each 
downstream subtask is roughly the same. 
+This optimization requires no manual configuration by the user and is 
applicable to various connect edges, including point-wise connections (e.g., 
Rescale) and all-to-all connections (e.g., Hash, Rebalance, Custom).
+
+### Limitations
+
+- Currently, automatic balancing of data distribution only supports operators 
with [Automatically decide parallelisms for 
operators](#automatically-decide-parallelisms-for-operators). Therefore, users 
need to enable [Automatically decide parallelisms for 
operators](#automatically-decide-parallelisms-for-operators) and avoid manually 
setting the parallelism of operators in order to benefit from the optimization 
of automatic data distribution balancing.
+- This optimization does not fully address scenarios involving single-key 
hotspots. When the data for a single key far exceeds the data for other keys, 
hotspots may still occur. However, for the correctness of the data, we cannot 
split the data for this key and assign it to different subtasks for processing. 
However, in certain specific situations, the single key issue can be addressed, 
as seen in the [Adaptive Skewed Join 
Optimization](#adaptive-skewed-join-optimization).
+
+## Adaptive Broadcast Join
+
+In distributed data processing, broadcast join is a common optimization. A 
broadcast join works on the principle that if one of the two tables is very 
small—small enough to fit in the memory of a single compute node—then its 
entire dataset can be broadcast to each distributed compute node. 
+This join operation can be performed directly in memory. The broadcast join 
optimization can significantly reduce shuffling and sorting of the large table. 
+However, static optimizers often misjudge the conditions that determine when 
this optimization is effective, which limits their application in production 
for the following reasons:
+- The **completeness and accuracy** of statistical information about the 
source tables is often inadequate in production environments.
+- It is difficult to make accurate judgments regarding inputs that are **not 
from the source tables**. This is because the size of intermediate data cannot 
be accurately determined until the job is running. When the join operation is 
far from the source table data, such early evaluation is usually not feasible.
+- If static optimization incorrectly assesses the effective conditions, it can 
lead to serious problems. For instance, if a large table is mistakenly 
classified as small and cannot fit in a single node's memory, the broadcast 
join operator may fail due to an **Out of Memory** error when trying to create 
a hash table in memory, resulting in the need to **restart the task**.
+
+Therefore, while broadcast joins can provide significant performance 
improvements when used correctly, their practical applications are limited. In 
contrast, adaptive Broadcast Join allows Flink to dynamically convert the Join 
operator to a Broadcast Join at runtime based on the actual data input. 
+
+**To ensure the correctness of the join semantics**, adaptive Broadcast Join 
will decide whether the input edge can be broadcast based on the type of join. 
The conditions under which broadcasting is possible are as follows:
+
+| **Join Type** | **Left Input** | **Right Input** |
+|:--------------|:---------------|:----------------|
+| Inner         | ✅              | ✅               |
+| FullOuter     | ❌              | ❌               |
+| Semi          | ❌              | ✅               |
+| Anti          | ❌              | ✅               |
+| LeftOuter     | ❌              | ✅               |
+| RightOuter    | ✅              | ❌               |
+
+### Usage
+
+The Adaptive Batch Scheduler defaults to **simultaneously enabling** both 
compile-time static Broadcast Join and runtime dynamic adaptive Broadcast Join. 
You can control the timing of the Broadcast Join by configuring 
[`table.optimizer.adaptive-broadcast-join.strategy`]({{< ref 
"docs/dev/table/config" >}}#table-optimizer-adaptive-broadcast-join-strategy). 
For example, you can set the value to be RUNTIME_ONLY so that the adaptive 
Broadcast Join is effective only at runtime. 
+Additionally, you can adjust the following configuration based on the job's 
specific requirements:
+- [`table.optimizer.join.broadcast-threshold`]({{< ref "docs/dev/table/config" 
>}}#table-optimizer-join-broadcast-threshold):The threshold for the amount of 
data that can be broadcast. When the memory of the TaskManager is large, this 
value can be increased appropriately; conversely, it can be decreased if the 
memory is limited.
+
+### Limitations
+
+- Adaptive Broadcast Join does not support optimization of Join operators 
contained within MultiInput operators.
+- Adaptive Broadcast Join does not support being enabled simultaneously with 
[Batch Job Recovery Progress]({{< ref 
"docs/ops/batch/recovery_from_job_master_failure" >}}). Therefore, after [Batch 
Job Recovery Progress]({{< ref 
"docs/ops/batch/recovery_from_job_master_failure" >}}) enabled, Adaptive 
Broadcast Join will not take effect.
+
+## Adaptive Skewed Join Optimization
+
+In Join queries, when certain keys appear frequently, it may lead to 
significant variations in the amount of data processed by each Join task. 
+This can lead to a significant reduction in the performance of individual 
processing tasks, thereby degrading the overall performance of the job. 
+However, because the two input sides of the Join operator are related, the 
same keyGroup needs to be processed by the same downstream sub-task. 
+Therefore, simply relying on automatic load balancing to solve data skew 
issues in Join operations is not enough. 
+And Adaptive Skewed Join optimization allows the Join operator to 
**dynamically split skewed and splittable data partitions** based on runtime 
input statistics, thereby alleviating the tail latency problem caused by skewed 
data. 
+
+**To ensure the correctness of the Join semantics**, the Adaptive Skewed Join 
optimization decides whether the input edges can be dynamically split based on 
the Join type. 
+The scenarios where splitting is possible are as follows:
+
+| **Join Type** | **Left Input** | **Right Input** |
+|:--------------|:---------------|:----------------|
+| Inner         | ✅              | ✅               |
+| FullOuter     | ✅              | ❌               |

Review Comment:
   IIUC, it should also be ❌ for the Left Input side of a FullOuter join.
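
   To illustrate the concern, here is a minimal sketch, not Flink code. It assumes that when one input of a skewed partition is split across subtasks, the matching partition of the other input is sent in full to each of those subtasks. The class `FullOuterSplitSketch` and its methods are hypothetical names made up for this example. Under that assumption, a full outer join would emit the null-padded rows for unmatched right-side records once per subtask:

```java
import java.util.ArrayList;
import java.util.List;

public class FullOuterSplitSketch {

    /** Naive full outer equi-join over integer keys; "null" marks the padded side. */
    public static List<String> fullOuterJoin(List<Integer> left, List<Integer> right) {
        List<String> out = new ArrayList<>();
        for (Integer l : left) {
            boolean matched = false;
            for (Integer r : right) {
                if (l.equals(r)) {
                    out.add("(" + l + "," + r + ")");
                    matched = true;
                }
            }
            if (!matched) {
                out.add("(" + l + ",null)"); // left row without a partner
            }
        }
        for (Integer r : right) {
            if (!left.contains(r)) {
                out.add("(null," + r + ")"); // right row without a partner
            }
        }
        return out;
    }

    /** Joins every left chunk against a full copy of the right side, one chunk per "subtask". */
    public static List<String> joinWithSplitLeft(List<List<Integer>> leftChunks, List<Integer> right) {
        List<String> out = new ArrayList<>();
        for (List<Integer> chunk : leftChunks) {
            out.addAll(fullOuterJoin(chunk, right));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> right = List.of(1, 2);
        // Unsplit: [(1,1), (1,1), (null,2)]
        System.out.println(fullOuterJoin(List.of(1, 1), right));
        // Left split in two: [(1,1), (null,2), (1,1), (null,2)]
        System.out.println(joinWithSplitLeft(List.of(List.of(1), List.of(1)), right));
    }
}
```

   The unsplit join yields 3 rows, while the split variant yields 4 because `(null,2)` is emitted by both subtasks. By symmetry the same happens when the right side is split, which is why I'd expect ❌ on both sides for FullOuter.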



##########
docs/content.zh/docs/deployment/adaptive_batch.md:
##########
@@ -0,0 +1,172 @@
+## 自动推导并发度

Review Comment:
   并发度 (concurrency) -> 并行度 (parallelism), to be consistent with the term used in the rest of the document.
   
   This also applies to a few other statements.



##########
docs/content/docs/deployment/adaptive_batch.md:
##########
@@ -0,0 +1,176 @@
+To address this issue, Flink introduced the **AdaptiveBatchScheduler**, a 
batch job scheduler that can **automatically adjust execution plan**. 
+This scheduler gradually determines the job execution plan as the job executes 
and incrementally generates the JobGraph based on the determined execution 
plan. For the part of undecided execution plans, Flink is allowed to 
dynamically adjust execution plans at runtime based on specific optimization 
strategies and the characteristics of intermediate data. 

Review Comment:
   "the part of" -> "parts of"



##########
docs/content/docs/deployment/adaptive_batch.md:
##########
@@ -0,0 +1,176 @@
+---
+title: Adaptive Batch
+weight: 5
+type: docs
+
+---
+
+# Adaptive Batch Execution
+This document describes the background, usage, and limitations of adaptive 
batch execution.
+
+## Background
+
+In the traditional Flink batch job execution process, the execution plan of a 
job is determined before submission. To optimize the execution plan, users and 
Flink's static execution plan optimizer need to understand the job logic and 
accurately evaluate how the job will execute, including the data 
characteristics processed by each node and the data distribution of the 
connecting edges.
+
+However, in real-world scenarios, these data characteristics cannot be 
predicted before the job is executed. 
+Although, if there is rich statistical information about the input data, users 
and Flink's static execution plan optimizer can combine these statistics with 
the characteristics of each operator in the execution plan to conduct some 
moderate inferential optimization. But in actual production environments, the 
statistical information on input data is often incomplete or inaccurate, making 
it difficult to estimate the intermediate data in a Flink job.
+
+To address this issue, Flink introduced the **AdaptiveBatchScheduler**, a 
batch job scheduler that can **automatically adjust execution plan**. 
+This scheduler gradually determines the job execution plan as the job executes 
and incrementally generates the JobGraph based on the determined execution 
plan. For the part of undecided execution plans, Flink is allowed to 
dynamically adjust execution plans at runtime based on specific optimization 
strategies and the characteristics of intermediate data. 
+Currently, the optimization strategies supported by the scheduler include:
+- [Automatically decide parallelisms for 
operators](#automatically-decide-parallelisms-for-operators)
+- [Automatic load balancing of data 
distribution](#automatic-balancing-of-data-distribution)
+- [Adaptive Broadcast Join](#adaptive-broadcast-join)
+- [Adaptive Skewed Join Optimization](#adaptive-skewed-join-optimization)
+
+## Automatically decide parallelisms for operators
+
+The Adaptive Batch Scheduler supports automatically deciding parallelisms of 
operators for batch jobs. If an operator is not set with a parallelism, 
+the scheduler will decide parallelism for it according to the size of its 
consumed datasets. This can bring many benefits:
+- Batch job users can be relieved from parallelism tuning
+- Automatically tuned parallelisms can better fit consumed datasets which have 
a varying volume size every day
+- Operators from SQL batch jobs can be assigned with different parallelisms 
which are automatically tuned
+
+### Usage
+
+To automatically decide parallelisms for operators with Adaptive Batch 
Scheduler, you need to:
+- Toggle the feature on:
+
+  Adaptive Batch Scheduler enables automatic parallelism derivation by 
default. You can configure 
[`execution.batch.adaptive.auto-parallelism.enabled`]({{< ref 
"docs/deployment/config" >}}#execution-batch-adaptive-auto-parallelism-enabled) 
to toggle this feature.
+  In addition, there are several related configuration options that may need 
adjustment when using Adaptive Batch Scheduler to automatically decide 
parallelisms for operators:
+    - [`execution.batch.adaptive.auto-parallelism.min-parallelism`]({{< ref 
"docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-min-parallelism): The lower bound 
of allowed parallelism to set adaptively.
+    - [`execution.batch.adaptive.auto-parallelism.max-parallelism`]({{< ref 
"docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-max-parallelism): The upper bound 
of allowed parallelism to set adaptively. The default parallelism set via 
[`parallelism.default`]({{< ref "docs/deployment/config" >}}) or 
`StreamExecutionEnvironment#setParallelism()` will be used as upper bound of 
allowed parallelism if this configuration is not configured.
+    - 
[`execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task`]({{< ref 
"docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-avg-data-volume-per-ta): The 
average size of data volume to expect each task instance to process. Note that 
when data skew occurs, or the decided parallelism reaches the max parallelism 
(due to too much data), the data actually processed by some tasks may far 
exceed this value.
+    - 
[`execution.batch.adaptive.auto-parallelism.default-source-parallelism`]({{< 
ref "docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-default-source-paralle): The 
default parallelism of data source or the upper bound of source parallelism to 
set adaptively. If this option is not set, the upper bound of allowed 
parallelism set via 
[`execution.batch.adaptive.auto-parallelism.max-parallelism`]({{< ref 
"docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-max-parallelism) is used. If 
that upper bound is not set either, the default parallelism set via 
[`parallelism.default`]({{< ref "docs/deployment/config" >}}) or 
`StreamExecutionEnvironment#setParallelism()` is used instead.
+
+- Avoid setting the parallelism of operators:
+
+  The Adaptive Batch Scheduler only decides the parallelism for operators 
which do not have a parallelism set. So if you want the parallelism of an 
operator to be decided automatically, avoid setting its parallelism through 
the `setParallelism()` method.
+
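As a rough illustration of how the options above interact, the sketch below
divides the consumed data volume by the expected average volume per task and
clamps the result to the configured bounds. It is a simplified model in plain
Java, not the scheduler's actual algorithm; the class `ParallelismSketch`, the
method `decideParallelism`, and the example values are hypothetical.

```java
public class ParallelismSketch {

    /**
     * Hypothetical helper: estimates a parallelism from the consumed data volume.
     * The parameters mirror the min-parallelism, max-parallelism and
     * avg-data-volume-per-task options; the real scheduler logic may differ.
     */
    public static int decideParallelism(
            long consumedBytes, long avgBytesPerTask, int min, int max) {
        if (avgBytesPerTask <= 0 || min < 1 || max < min) {
            throw new IllegalArgumentException("invalid bounds or data volume per task");
        }
        // Ceiling division: tasks needed so that each handles about avgBytesPerTask.
        long wanted = (consumedBytes + avgBytesPerTask - 1) / avgBytesPerTask;
        // Clamp into [min, max]; once the cap is hit, tasks process more than the average.
        return (int) Math.max(min, Math.min(max, wanted));
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        // 10 GiB at 16 MiB per task would need 640 tasks, so the upper bound applies.
        System.out.println(decideParallelism(10L * 1024 * mib, 16 * mib, 1, 128)); // 128
        // 100 MiB at 16 MiB per task rounds up to 7 tasks.
        System.out.println(decideParallelism(100 * mib, 16 * mib, 1, 128)); // 7
    }
}
```

The first call shows the case mentioned above where the decided parallelism
reaches the max parallelism, so each task ends up processing more than the
configured average.
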
+#### Enable dynamic parallelism inference support for Sources
+New {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java"
 name="Source" >}}
+can implement the interface {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/api/connector/source/DynamicParallelismInference.java"
 name="DynamicParallelismInference" >}} to enable dynamic parallelism inference.
+```java
+public interface DynamicParallelismInference {
+    int inferParallelism(Context context);
+}
+```
+The Context will provide the upper bound for the inferred parallelism, the 
expected average data size to be processed by each task, and dynamic filtering 
information to assist with parallelism inference.
+
+The Adaptive Batch Scheduler invokes this interface before scheduling the 
source vertices, so implementations should avoid time-consuming operations as 
much as possible.
+
+If the Source does not implement the interface, the configuration setting 
[`execution.batch.adaptive.auto-parallelism.default-source-parallelism`]({{< 
ref "docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-default-source-paralle) will be 
used as the parallelism of the source vertices.
+
+Note that the dynamic source parallelism inference only decides the 
parallelism for source operators which do not already have a specified 
parallelism.
+
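A minimal sketch of what such an inference could look like, assuming the source
can cheaply estimate its input size (for example from file metadata). To stay
self-contained it uses a local stand-in `Context` interface rather than the
Flink type; the accessor names `getParallelismInferenceUpperBound()` and
`getDataVolumePerTask()` and the `estimatedInputBytes` parameter are
assumptions made for illustration.

```java
public class SourceInferenceSketch {

    /** Local stand-in for the context passed to inferParallelism; not the Flink type. */
    public interface Context {
        int getParallelismInferenceUpperBound();

        long getDataVolumePerTask();
    }

    /** Hypothetical source-side logic: one task per chunk of data, capped by the upper bound. */
    public static int inferParallelism(Context context, long estimatedInputBytes) {
        long perTask = Math.max(1L, context.getDataVolumePerTask());
        // Ceiling division, so that a partial chunk still gets its own task.
        long wanted = (estimatedInputBytes + perTask - 1) / perTask;
        long capped = Math.min(context.getParallelismInferenceUpperBound(), wanted);
        // Always return at least one task, even for an empty input.
        return (int) Math.max(1L, capped);
    }

    public static void main(String[] args) {
        Context context = new Context() {
            @Override
            public int getParallelismInferenceUpperBound() {
                return 8;
            }

            @Override
            public long getDataVolumePerTask() {
                return 100L;
            }
        };
        System.out.println(inferParallelism(context, 250L)); // 3
        System.out.println(inferParallelism(context, 10_000L)); // 8
    }
}
```

The computation only uses values that are already at hand, in line with the
advice to keep the inference cheap.
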
+#### Performance tuning
+
+1. It's recommended to use [Sort 
Shuffle](https://flink.apache.org/2021/10/26/sort-shuffle-part1.html) and set 
[`taskmanager.network.memory.buffers-per-channel`]({{< ref 
"docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) to 
`0`. This can decouple the required network memory from parallelism, so that 
for large scale jobs, the "Insufficient number of network buffers" errors are 
less likely to happen.
+2. It's recommended to set 
[`execution.batch.adaptive.auto-parallelism.max-parallelism`]({{< ref 
"docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-max-parallelism) to the 
parallelism you expect to need in the worst case. Larger values are not 
recommended, because an excessive value may hurt performance: this option 
affects the number of subpartitions produced by upstream tasks, and a large 
number of subpartitions may degrade the performance of hash shuffle and, due 
to small packets, of network transmission.
+
+## Automatic Balancing of Data Distribution
+
+The Adaptive Batch Scheduler supports automatic balancing of data 
distribution. The scheduler will attempt to evenly distribute data to 
downstream subtasks, ensuring that the amount of data consumed by each 
downstream subtask is roughly the same. 
+This optimization requires no manual configuration by the user and is 
applicable to various connect edges, including point-wise connections (e.g., 
Rescale) and all-to-all connections (e.g., Hash, Rebalance, Custom).
+
+### Limitations
+
+- Currently, automatic balancing of data distribution only applies to 
operators whose parallelism is decided automatically (see [Automatically 
decide parallelisms for 
operators](#automatically-decide-parallelisms-for-operators)). Therefore, users 
need to enable that feature and avoid manually setting the parallelism of 
operators in order to benefit from automatic data distribution balancing.
+- This optimization does not fully address scenarios involving single-key 
hotspots. When the data for a single key far exceeds the data for other keys, 
hotspots may still occur, because, for the correctness of the data, the data 
for this key cannot be split and assigned to different subtasks for processing. 
In certain specific situations, however, the single-key issue can be 
addressed, as described in [Adaptive Skewed Join 
Optimization](#adaptive-skewed-join-optimization).
+
+## Adaptive Broadcast Join
+
+In distributed data processing, broadcast join is a common optimization. A 
broadcast join works on the principle that if one of the two tables is very 
small—small enough to fit in the memory of a single compute node—then its 
entire dataset can be broadcast to each distributed compute node. 
+The join can then be performed directly in memory. The broadcast join 
optimization can significantly reduce shuffling and sorting of the large table. 
+However, static optimizers often misjudge the conditions under which this 
optimization is effective, which limits its use in production for the 
following reasons:
+- The **completeness and accuracy** of statistical information about the 
source tables is often inadequate in production environments.
+- It is difficult to make accurate judgments regarding inputs that are **not 
from the source tables**. This is because the size of intermediate data cannot 
be accurately determined until the job is running. When the join operation is 
far from the source table data, such early evaluation is usually not feasible.
+- If static optimization incorrectly assesses the effective conditions, it can 
lead to serious problems. For instance, if a large table is mistakenly 
classified as small and cannot fit in a single node's memory, the broadcast 
join operator may fail due to an **Out of Memory** error when trying to create 
a hash table in memory, resulting in the need to **restart the task**.
+
+Therefore, while broadcast joins can provide significant performance 
improvements when used correctly, their practical applications are limited. In 
contrast, adaptive Broadcast Join allows Flink to dynamically convert the Join 
operator to a Broadcast Join at runtime based on the actual data input. 
+
+**To ensure the correctness of the join semantics**, adaptive Broadcast Join 
will decide whether the input edge can be broadcast based on the type of join. 
The conditions under which broadcasting is possible are as follows:
+
+| **Join Type** | **Left Input** | **Right Input** |
+|:--------------|:---------------|:----------------|
+| Inner         | ✅              | ✅               |
+| FullOuter     | ❌              | ❌               |
+| Semi          | ❌              | ✅               |
+| Anti          | ❌              | ✅               |
+| LeftOuter     | ❌              | ✅               |
+| RightOuter    | ✅              | ❌               |
+
+### Usage
+
+The Adaptive Batch Scheduler defaults to **simultaneously enabling** both 
compile-time static Broadcast Join and runtime dynamic adaptive Broadcast Join. 
You can control the timing of the Broadcast Join by configuring 
[`table.optimizer.adaptive-broadcast-join.strategy`]({{< ref 
"docs/dev/table/config" >}}#table-optimizer-adaptive-broadcast-join-strategy). 
For example, you can set the value to `RUNTIME_ONLY` so that only the runtime 
adaptive Broadcast Join takes effect. 
+Additionally, you can adjust the following configuration based on the job's 
specific requirements:
+- [`table.optimizer.join.broadcast-threshold`]({{< ref "docs/dev/table/config" 
>}}#table-optimizer-join-broadcast-threshold): The threshold for the amount of 
data that can be broadcast. When the memory of the TaskManager is large, this 
value can be increased appropriately; conversely, it can be decreased if the 
memory is limited.
+
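The eligibility table and the broadcast threshold above can be read together as
a single runtime check. The sketch below encodes that reading in plain Java; it
is an illustration rather than Flink's implementation. The class, enum, and
method names are hypothetical, and treating the threshold as inclusive is an
assumption.

```java
public class BroadcastJoinSketch {

    public enum JoinType { INNER, FULL_OUTER, SEMI, ANTI, LEFT_OUTER, RIGHT_OUTER }

    public enum Side { LEFT, RIGHT }

    /** Encodes the table: whether the given input side may be broadcast for a join type. */
    public static boolean isBroadcastable(JoinType type, Side side) {
        switch (type) {
            case INNER:
                return true; // either side may be broadcast
            case FULL_OUTER:
                return false; // neither side may be broadcast
            case RIGHT_OUTER:
                return side == Side.LEFT;
            default: // SEMI, ANTI, LEFT_OUTER: only the right input
                return side == Side.RIGHT;
        }
    }

    /** Hypothetical runtime decision: eligible side and actual size within the threshold. */
    public static boolean shouldBroadcast(
            JoinType type, Side side, long actualBytes, long thresholdBytes) {
        // Assumption: a size exactly equal to the threshold still qualifies.
        return isBroadcastable(type, side) && actualBytes <= thresholdBytes;
    }

    public static void main(String[] args) {
        long threshold = 1024L * 1024L; // example threshold of 1 MiB
        System.out.println(shouldBroadcast(JoinType.LEFT_OUTER, Side.RIGHT, 4096L, threshold));
        System.out.println(shouldBroadcast(JoinType.LEFT_OUTER, Side.LEFT, 4096L, threshold));
    }
}
```

Keeping the semantic check separate from the size check reflects the text
above: the join type decides whether broadcasting is allowed at all, and the
threshold only decides whether it is worthwhile.
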
+### Limitations
+
+- Adaptive Broadcast Join does not support optimization of Join operators 
contained within MultiInput operators.
+- Adaptive Broadcast Join cannot be enabled together with 
[Batch Job Recovery Progress]({{< ref 
"docs/ops/batch/recovery_from_job_master_failure" >}}). Once Batch Job 
Recovery Progress is enabled, Adaptive Broadcast Join will not take effect.
+
+## Adaptive Skewed Join Optimization
+
+In Join queries, when certain keys appear frequently, the amount of data 
processed by each Join task may vary significantly. 
+This can considerably slow down individual tasks and thereby degrade the 
overall performance of the job. 
+However, because the two input sides of the Join operator are related, the 
same keyGroup needs to be processed by the same downstream sub-task. 
+Therefore, automatic load balancing alone is not enough to solve data skew 
issues in Join operations. 
+Adaptive Skewed Join optimization allows the Join operator to 
**dynamically split skewed and splittable data partitions** based on runtime 
input statistics, thereby alleviating the tail latency problem caused by skewed 
data. 
+
+**To ensure the correctness of the Join semantics**, the Adaptive Skewed Join 
optimization decides whether the input edges can be dynamically split based on 
the Join type. 
+The scenarios where splitting is possible are as follows:
+
+| **Join Type** | **Left Input** | **Right Input** |
+|:--------------|:---------------|:----------------|
+| Inner         | ✅              | ✅               |
+| FullOuter     | ❌              | ❌               |
+| Semi          | ✅              | ❌               |
+| Anti          | ✅              | ❌               |
+| LeftOuter     | ✅              | ❌               |
+| RightOuter    | ❌              | ✅               |

Review Comment:
   It's better to place LeftOuter/RightOuter and FullOuter together.
   
   Maybe order the types as below:
   Inner
   LeftOuter
   RightOuter
   FullOuter
   Semi
   Anti
   


