This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-website.git
The following commit(s) were added to refs/heads/master by this push:
new 7034f68380 small fix for pipeline execute engine (#1598)
7034f68380 is described below
commit 7034f68380cac341673d732d1a55129d235da7cb
Author: yiguolei <[email protected]>
AuthorDate: Wed Dec 25 15:59:05 2024 +0800
small fix for pipeline execute engine (#1598)
## Versions
- [x] dev
- [x] 3.0
- [x] 2.1
- [ ] 2.0
## Languages
- [ ] Chinese
- [ ] English
## Docs Checklist
- [ ] Checked by AI
- [ ] Test Cases Built
Co-authored-by: yiguolei <[email protected]>
---
.../pipeline-execution-engine.md | 21 ++++++++++---------
.../pipeline-execution-engine.md | 2 +-
.../pipeline-execution-engine.md | 11 +++++-----
.../pipeline-execution-engine.md | 11 +++++-----
.../pipeline-execution-engine.md | 23 +++++++++++----------
.../pipeline-execution-engine.md | 24 +++++++++++-----------
6 files changed, 48 insertions(+), 44 deletions(-)
diff --git
a/docs/query-acceleration/optimization-technology-principle/pipeline-execution-engine.md
b/docs/query-acceleration/optimization-technology-principle/pipeline-execution-engine.md
index eee47ea029..e5e1618f54 100644
---
a/docs/query-acceleration/optimization-technology-principle/pipeline-execution-engine.md
+++
b/docs/query-acceleration/optimization-technology-principle/pipeline-execution-engine.md
@@ -28,7 +28,7 @@ under the License.
-The parallel execution model of Doris is a Pipeline execution model, primarily
inspired by the implementation described in the Hyper paper
(https://db.in.tum.de/~leis/papers/morsels.pdf). The Pipeline execution model
fully leverages the computational power of multi-core CPUs while limiting the
number of query threads in Doris, addressing the issue of thread explosion
during execution. For details on its design, implementation, and effectiveness,
refer to [DSIP-027](DSIP-027: Support Pipe [...]
+The parallel execution model of Doris is a Pipeline execution model, primarily
inspired by the implementation described in the
[Hyper](https://db.in.tum.de/~leis/papers/morsels.pdf) paper. The Pipeline
execution model fully leverages the computational power of multi-core CPUs
while limiting the number of query threads in Doris, addressing the issue of
thread explosion during execution. For details on its design, implementation,
and effectiveness, refer to [DSIP-027](DSIP-027: Support Pip [...]
Starting from Doris 3.0, the Pipeline execution model has completely replaced
the original Volcano model. Based on the Pipeline execution model, Doris
supports the parallel processing of Query, DDL, and DML statements.
@@ -39,7 +39,7 @@ To better understand the Pipeline execution model, it is
first necessary to intr
SELECT k1, SUM(v1) FROM A,B WHERE A.k2 = B.k2 GROUP BY k1 ORDER BY SUM(v1);
```
-The FE will first translate it into the following logical plan, where each
node represents a PlanNode. The specific meaning of each type of node can be
found in the introduction to the physical plan.
+The FE will first translate it into the following logical plan, where each
node represents a PlanNode. The detailed meaning of each node type can be found
in the introduction to the physical plan.

@@ -49,12 +49,13 @@ After the transformation, each PlanFragment corresponds to
a portion of the Plan

-Doris's planning process is divided into three layers:
-PLAN:The execution plan. A SQL statement is translated by the query planner
into an execution plan, which is then provided to the execution engine for
execution.
+Doris's plan is divided into three layers:
-FRAGMENT:Since Doris is a distributed execution engine, a complete execution
plan is divided into multiple single-machine execution fragments. A FRAGMENT
represents a complete single-machine execution fragment. Multiple fragments
combine to form a complete PLAN.
+- PLAN:The execution plan. A SQL statement is translated by the query planner
into an execution plan, which is then provided to the execution engine for
execution.
-PLAN NODE:Operators, which are the smallest units of the execution plan. A
FRAGMENT consists of multiple operators, each responsible for a specific
execution logic, such as aggregation or join operations.
+- FRAGMENT:Since Doris is a distributed execution engine, a complete execution
plan is divided into multiple single-machine execution fragments. A FRAGMENT
represents a complete single-machine execution fragment. Multiple fragments
combine to form a complete PLAN.
+
+- PLAN NODE:Operators, which are the smallest units of the execution plan. A
FRAGMENT consists of multiple operators, each responsible for a specific
execution logic, such as aggregation or join operations.
## Pipeline Execution
A PlanFragment is the smallest unit of a task sent by the FE to the BE for
execution. A BE may receive multiple different PlanFragments for the same
query, and each PlanFragment is processed independently. Upon receiving a
PlanFragment, the BE splits it into multiple Pipelines and then starts multiple
PipelineTasks to achieve parallel execution, thereby improving query efficiency.
@@ -63,14 +64,14 @@ A PlanFragment is the smallest unit of a task sent by the
FE to the BE for execu
### Pipeline
-A Pipeline consists of a SourceOperator, a SinkOperator, and several
intermediate operators. The SourceOperator represents reading data from an
external source, which can be a table (e.g., OlapTable) or a buffer (e.g.,
Exchange). The SinkOperator represents the data output, which can either be
shuffled to other nodes over the network (e.g., DataStreamSinkOperator) or
output to a hash table (e.g., aggregation operators, join build hash tables,
etc.).
+A Pipeline consists of a SourceOperator, a SinkOperator, and several
intermediate operators. The SourceOperator represents reading data from an
external source, which can be a table (e.g., OlapTable) or a buffer (e.g.,
Exchange). The SinkOperator represents the data output, which can either be
shuffled to other nodes over the network (e.g., DataStreamSinkOperator) or
output to a hash table (e.g., aggregation operators, join build hash tables,
etc.).

Multiple Pipelines are actually interdependent. Take the JoinNode as an
example—it is split into two Pipelines. Pipeline-0 reads data from Exchange to
build the hash table, while Pipeline-1 reads data from the table to perform the
probe operation. These two Pipelines are connected by a dependency
relationship, meaning Pipeline-1 can only execute after Pipeline-0 has
completed. This dependency relationship is referred to as a Dependency. Once
Pipeline-0 finishes execution, it calls the se [...]
### PipelineTask
-A Pipeline is actually a logical concept; it is not an executable entity. Once
a Pipeline is defined, it needs to be further instantiated into multiple
PipelineTasks. The data that needs to be read is then distributed to different
PipelineTasks, ultimately achieving parallel processing. The operators within
the multiple PipelineTasks of the same Pipeline are identical, but they differ
in their states. For example, they might read different data or build different
hash tables. These diffe [...]
+A Pipeline is actually a logical concept; it is not an executable entity. Once a
Pipeline is defined, it needs to be further instantiated into multiple
PipelineTasks. The data that needs to be read is then distributed to different
PipelineTasks, ultimately achieving parallel processing. The operators within
the multiple PipelineTasks of the same Pipeline are identical, but they differ
in their states. For example, they might read different data or build different
hash tables. These differi [...]
Each PipelineTask is eventually submitted to a thread pool to be executed as
an independent task. With the Dependency trigger mechanism, this approach
allows better utilization of multi-core CPUs and achieves full parallelism.
@@ -89,7 +90,7 @@ Scanning data is a very heavy I/O operation, as it requires
reading large amount
By using parallel scanning technology, we can effectively avoid issues where
certain ScanOperators take an excessively long time due to improper bucketing
or data skew, which would otherwise slow down the entire query latency.
## Local Shuffle
-In the Pipeline execution model, Local Exchange acts as a Pipeline Breaker, a
technique that redistributes data locally across different execution tasks. It
evenly distributes all the data output by the upstream Pipeline to all the
tasks in the downstream Pipeline using methods like HASH or Round Robin. This
helps solve the problem of data skew during execution, ensuring that the
execution model is no longer limited by data storage or the query plan. Let's
now provide an example to illus [...]
+In the Pipeline execution model, Local Shuffle acts as a Pipeline Breaker, a
technique that redistributes data locally across different execution tasks. It
evenly distributes all the data output by the upstream Pipeline to all the
tasks in the downstream Pipeline using methods like HASH or Round Robin. This
helps solve the problem of data skew during execution, ensuring that the
execution model is no longer limited by data storage or the query plan. Let's
now provide an example to illust [...]
We will further explain how Local Exchange can prevent data skew using
Pipeline-1 from the previous example.
@@ -103,4 +104,4 @@ Now, let's assume the current concurrency level is 3 (each
Pipeline has 3 tasks)
As can be seen from the figure on the right, the amount of data that the
HashJoin and Agg operators need to process changes from (1, 1, 7) to (3, 3, 3),
thereby avoiding data skew.
-In Doris, Local Exchange is planned based on a series of rules. For example,
when a query involves time-consuming operators like Join, Aggregation, or
Window Functions, Local Exchange is used to minimize data skew as much as
possible.
\ No newline at end of file
+Local Shuffle is planned based on a series of rules. For example, when a query
involves time-consuming operators like Join, Aggregation, or Window Functions,
Local Shuffle is used to minimize data skew as much as possible.
\ No newline at end of file
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/query-acceleration/optimization-technology-principle/pipeline-execution-engine.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/query-acceleration/optimization-technology-principle/pipeline-execution-engine.md
index 32df1f590a..d3f848618f 100644
---
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/query-acceleration/optimization-technology-principle/pipeline-execution-engine.md
+++
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/query-acceleration/optimization-technology-principle/pipeline-execution-engine.md
@@ -28,7 +28,7 @@ under the License.
-Doris的并行执行模型是一种Pipeline
执行模型,主要参考了Hyper论文中Pipeline的实现方式(https://db.in.tum.de/~leis/papers/morsels.pdf),Pipeline
执行模型能够充分释放多核 CPU 的计算能力,并对 Doris 的查询线程的数目进行限制,解决 Doris
的执行线程膨胀的问题。它的具体设计、实现和效果可以参阅 [DSIP-027](DSIP-027: Support Pipeline Exec Engine -
DORIS - Apache Software Foundation) 以及 [DSIP-035](DSIP-035: PipelineX Execution
Engine - DORIS - Apache Software Foundation)。
+Doris的并行执行模型是一种Pipeline
执行模型,主要参考了[Hyper](https://db.in.tum.de/~leis/papers/morsels.pdf)论文中Pipeline的实现方式,Pipeline
执行模型能够充分释放多核 CPU 的计算能力,并对 Doris 的查询线程的数目进行限制,解决 Doris
的执行线程膨胀的问题。它的具体设计、实现和效果可以参阅 [DSIP-027](DSIP-027: Support Pipeline Exec Engine -
DORIS - Apache Software Foundation) 以及 [DSIP-035](DSIP-035: PipelineX Execution
Engine - DORIS - Apache Software Foundation)。
Doris 3.0 之后,Pipeline 执行模型彻底替换了原有的火山模型,基于Pipeline 执行模型,Doris 实现了 Query、DDL、DML
语句的并行处理。
## 物理计划
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/query-acceleration/pipeline-execution-engine.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/query-acceleration/pipeline-execution-engine.md
index 0b483ed239..1bd1473456 100644
---
a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/query-acceleration/pipeline-execution-engine.md
+++
b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/query-acceleration/pipeline-execution-engine.md
@@ -28,7 +28,7 @@ under the License.
-Doris的并行执行模型是一种Pipeline
执行模型,主要参考了Hyper论文中Pipeline的实现方式(https://db.in.tum.de/~leis/papers/morsels.pdf),Pipeline
执行模型能够充分释放多核 CPU 的计算能力,并对 Doris 的查询线程的数目进行限制,解决 Doris
的执行线程膨胀的问题。它的具体设计、实现和效果可以参阅 [DSIP-027](DSIP-027: Support Pipeline Exec Engine -
DORIS - Apache Software Foundation) 以及 [DSIP-035](DSIP-035: PipelineX Execution
Engine - DORIS - Apache Software Foundation)。
+Doris的并行执行模型是一种Pipeline
执行模型,主要参考了[Hyper](https://db.in.tum.de/~leis/papers/morsels.pdf)论文中Pipeline的实现方式,Pipeline
执行模型能够充分释放多核 CPU 的计算能力,并对 Doris 的查询线程的数目进行限制,解决 Doris
的执行线程膨胀的问题。它的具体设计、实现和效果可以参阅 [DSIP-027](DSIP-027: Support Pipeline Exec Engine -
DORIS - Apache Software Foundation) 以及 [DSIP-035](DSIP-035: PipelineX Execution
Engine - DORIS - Apache Software Foundation)。
Doris 3.0 之后,Pipeline 执行模型彻底替换了原有的火山模型,基于Pipeline 执行模型,Doris 实现了 Query、DDL、DML
语句的并行处理。
## 物理计划
@@ -45,12 +45,13 @@ FE 首先会把它翻译成下面这种逻辑计划,计划中每个节点就

-所以Doris的规划分为3层:
-PLAN:执行计划,一个SQL会被执行规划器翻译成一个执行计划,之后执行计划会提供给执行引擎执行。
+Doris的规划分为3层:
-FRAGMENT:由于DORIS是一个分布式执行引擎。一个完整的执行计划会被切分为多个单机的执行片段。一个FRAGMENT表是一个完整的单机执行片段。多个FRAGMENT组合在一起,构成一个完整的PLAN。
+- PLAN:执行计划,一个SQL会被执行规划器翻译成一个执行计划,之后执行计划会提供给执行引擎执行。
-PLAN NODE:算子,是执行计划的最小单位。一个FRAGMENT由多个算子构成。每一个算子负责一个实际的执行逻辑,比如聚合,连接等
+-
FRAGMENT:由于DORIS是一个分布式执行引擎。一个完整的执行计划会被切分为多个单机的执行片段。一个FRAGMENT表示一个完整的单机执行片段。多个FRAGMENT组合在一起,构成一个完整的PLAN。
+
+- PLAN NODE:算子,是执行计划的最小单位。一个FRAGMENT由多个算子构成。每一个算子负责一个实际的执行逻辑,比如聚合,连接等
## Pipeline 执行
PlanFragment 是FE 发往BE
执行任务的最小单位。BE可能会收到同一个Query的多个不同的PlanFragment,每个PlanFragment都会被单独的处理。在收到PlanFragment
之后,BE会把PlanFragment 拆分为多个Pipeline,进而启动多个PipelineTask 来实现并行执行,提升查询效率。
diff --git
a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/query-acceleration/pipeline-execution-engine.md
b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/query-acceleration/pipeline-execution-engine.md
index 0b483ed239..1bd1473456 100644
---
a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/query-acceleration/pipeline-execution-engine.md
+++
b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/query-acceleration/pipeline-execution-engine.md
@@ -28,7 +28,7 @@ under the License.
-Doris的并行执行模型是一种Pipeline
执行模型,主要参考了Hyper论文中Pipeline的实现方式(https://db.in.tum.de/~leis/papers/morsels.pdf),Pipeline
执行模型能够充分释放多核 CPU 的计算能力,并对 Doris 的查询线程的数目进行限制,解决 Doris
的执行线程膨胀的问题。它的具体设计、实现和效果可以参阅 [DSIP-027](DSIP-027: Support Pipeline Exec Engine -
DORIS - Apache Software Foundation) 以及 [DSIP-035](DSIP-035: PipelineX Execution
Engine - DORIS - Apache Software Foundation)。
+Doris的并行执行模型是一种Pipeline
执行模型,主要参考了[Hyper](https://db.in.tum.de/~leis/papers/morsels.pdf)论文中Pipeline的实现方式,Pipeline
执行模型能够充分释放多核 CPU 的计算能力,并对 Doris 的查询线程的数目进行限制,解决 Doris
的执行线程膨胀的问题。它的具体设计、实现和效果可以参阅 [DSIP-027](DSIP-027: Support Pipeline Exec Engine -
DORIS - Apache Software Foundation) 以及 [DSIP-035](DSIP-035: PipelineX Execution
Engine - DORIS - Apache Software Foundation)。
Doris 3.0 之后,Pipeline 执行模型彻底替换了原有的火山模型,基于Pipeline 执行模型,Doris 实现了 Query、DDL、DML
语句的并行处理。
## 物理计划
@@ -45,12 +45,13 @@ FE 首先会把它翻译成下面这种逻辑计划,计划中每个节点就

-所以Doris的规划分为3层:
-PLAN:执行计划,一个SQL会被执行规划器翻译成一个执行计划,之后执行计划会提供给执行引擎执行。
+Doris的规划分为3层:
-FRAGMENT:由于DORIS是一个分布式执行引擎。一个完整的执行计划会被切分为多个单机的执行片段。一个FRAGMENT表是一个完整的单机执行片段。多个FRAGMENT组合在一起,构成一个完整的PLAN。
+- PLAN:执行计划,一个SQL会被执行规划器翻译成一个执行计划,之后执行计划会提供给执行引擎执行。
-PLAN NODE:算子,是执行计划的最小单位。一个FRAGMENT由多个算子构成。每一个算子负责一个实际的执行逻辑,比如聚合,连接等
+-
FRAGMENT:由于DORIS是一个分布式执行引擎。一个完整的执行计划会被切分为多个单机的执行片段。一个FRAGMENT表示一个完整的单机执行片段。多个FRAGMENT组合在一起,构成一个完整的PLAN。
+
+- PLAN NODE:算子,是执行计划的最小单位。一个FRAGMENT由多个算子构成。每一个算子负责一个实际的执行逻辑,比如聚合,连接等
## Pipeline 执行
PlanFragment 是FE 发往BE
执行任务的最小单位。BE可能会收到同一个Query的多个不同的PlanFragment,每个PlanFragment都会被单独的处理。在收到PlanFragment
之后,BE会把PlanFragment 拆分为多个Pipeline,进而启动多个PipelineTask 来实现并行执行,提升查询效率。
diff --git
a/versioned_docs/version-2.1/query-acceleration/pipeline-execution-engine.md
b/versioned_docs/version-2.1/query-acceleration/pipeline-execution-engine.md
index 788f1d9385..e5e1618f54 100644
--- a/versioned_docs/version-2.1/query-acceleration/pipeline-execution-engine.md
+++ b/versioned_docs/version-2.1/query-acceleration/pipeline-execution-engine.md
@@ -28,7 +28,7 @@ under the License.
-The parallel execution model of Doris is a Pipeline execution model, primarily
inspired by the implementation described in the Hyper paper
(https://db.in.tum.de/~leis/papers/morsels.pdf). The Pipeline execution model
fully leverages the computational power of multi-core CPUs while limiting the
number of query threads in Doris, addressing the issue of thread explosion
during execution. For details on its design, implementation, and effectiveness,
refer to [DSIP-027](DSIP-027: Support Pipe [...]
+The parallel execution model of Doris is a Pipeline execution model, primarily
inspired by the implementation described in the
[Hyper](https://db.in.tum.de/~leis/papers/morsels.pdf) paper. The Pipeline
execution model fully leverages the computational power of multi-core CPUs
while limiting the number of query threads in Doris, addressing the issue of
thread explosion during execution. For details on its design, implementation,
and effectiveness, refer to [DSIP-027](DSIP-027: Support Pip [...]
Starting from Doris 3.0, the Pipeline execution model has completely replaced
the original Volcano model. Based on the Pipeline execution model, Doris
supports the parallel processing of Query, DDL, and DML statements.
@@ -39,7 +39,7 @@ To better understand the Pipeline execution model, it is
first necessary to intr
SELECT k1, SUM(v1) FROM A,B WHERE A.k2 = B.k2 GROUP BY k1 ORDER BY SUM(v1);
```
-The FE will first translate it into the following logical plan, where each
node represents a PlanNode. The specific meaning of each type of node can be
found in the introduction to the physical plan.
+The FE will first translate it into the following logical plan, where each
node represents a PlanNode. The detailed meaning of each node type can be found
in the introduction to the physical plan.

@@ -49,12 +49,13 @@ After the transformation, each PlanFragment corresponds to
a portion of the Plan

-Doris's planning process is divided into three layers:
-PLAN:The execution plan. A SQL statement is translated by the query planner
into an execution plan, which is then provided to the execution engine for
execution.
+Doris's plan is divided into three layers:
-FRAGMENT:Since Doris is a distributed execution engine, a complete execution
plan is divided into multiple single-machine execution fragments. A FRAGMENT
represents a complete single-machine execution fragment. Multiple fragments
combine to form a complete PLAN.
+- PLAN:The execution plan. A SQL statement is translated by the query planner
into an execution plan, which is then provided to the execution engine for
execution.
-PLAN NODE:Operators, which are the smallest units of the execution plan. A
FRAGMENT consists of multiple operators, each responsible for a specific
execution logic, such as aggregation or join operations.
+- FRAGMENT:Since Doris is a distributed execution engine, a complete execution
plan is divided into multiple single-machine execution fragments. A FRAGMENT
represents a complete single-machine execution fragment. Multiple fragments
combine to form a complete PLAN.
+
+- PLAN NODE:Operators, which are the smallest units of the execution plan. A
FRAGMENT consists of multiple operators, each responsible for a specific
execution logic, such as aggregation or join operations.
## Pipeline Execution
A PlanFragment is the smallest unit of a task sent by the FE to the BE for
execution. A BE may receive multiple different PlanFragments for the same
query, and each PlanFragment is processed independently. Upon receiving a
PlanFragment, the BE splits it into multiple Pipelines and then starts multiple
PipelineTasks to achieve parallel execution, thereby improving query efficiency.
@@ -63,14 +64,14 @@ A PlanFragment is the smallest unit of a task sent by the
FE to the BE for execu
### Pipeline
-A Pipeline consists of a SourceOperator, a SinkOperator, and several
intermediate operators. The SourceOperator represents reading data from an
external source, which can be a table (e.g., OlapTable) or a buffer (e.g.,
Exchange). The SinkOperator represents the data output, which can either be
shuffled to other nodes over the network (e.g., DataStreamSinkOperator) or
output to a hash table (e.g., aggregation operators, join build hash tables,
etc.).
+A Pipeline consists of a SourceOperator, a SinkOperator, and several
intermediate operators. The SourceOperator represents reading data from an
external source, which can be a table (e.g., OlapTable) or a buffer (e.g.,
Exchange). The SinkOperator represents the data output, which can either be
shuffled to other nodes over the network (e.g., DataStreamSinkOperator) or
output to a hash table (e.g., aggregation operators, join build hash tables,
etc.).

Multiple Pipelines are actually interdependent. Take the JoinNode as an
example—it is split into two Pipelines. Pipeline-0 reads data from Exchange to
build the hash table, while Pipeline-1 reads data from the table to perform the
probe operation. These two Pipelines are connected by a dependency
relationship, meaning Pipeline-1 can only execute after Pipeline-0 has
completed. This dependency relationship is referred to as a Dependency. Once
Pipeline-0 finishes execution, it calls the se [...]
### PipelineTask
-A Pipeline is actually a logical concept; it is not an executable entity. Once
a Pipeline is defined, it needs to be further instantiated into multiple
PipelineTasks. The data that needs to be read is then distributed to different
PipelineTasks, ultimately achieving parallel processing. The operators within
the multiple PipelineTasks of the same Pipeline are identical, but they differ
in their states. For example, they might read different data or build different
hash tables. These diffe [...]
+A Pipeline is actually a logical concept; it is not an executable entity. Once a
Pipeline is defined, it needs to be further instantiated into multiple
PipelineTasks. The data that needs to be read is then distributed to different
PipelineTasks, ultimately achieving parallel processing. The operators within
the multiple PipelineTasks of the same Pipeline are identical, but they differ
in their states. For example, they might read different data or build different
hash tables. These differi [...]
Each PipelineTask is eventually submitted to a thread pool to be executed as
an independent task. With the Dependency trigger mechanism, this approach
allows better utilization of multi-core CPUs and achieves full parallelism.
@@ -79,7 +80,7 @@ In most cases, each operator in a Pipeline corresponds to a
PlanNode, but there
* JoinNode is split into JoinBuildOperator and JoinProbeOperator.
* AggNode is split into AggSinkOperator and AggSourceOperator.
* SortNode is split into SortSinkOperator and SortSourceOperator.
- The basic principle is that for certain "breaking" operators (those that
need to collect all the data before performing computation), the data ingestion
part is split into a Sink, while the part that retrieves data from the operator
is referred to as the Source.
+The basic principle is that for certain "breaking" operators (those that need
to collect all the data before performing computation), the data ingestion part
is split into a Sink, while the part that retrieves data from the operator is
referred to as the Source.
## Scan Parallelization
Scanning data is a very heavy I/O operation, as it requires reading large
amounts of data from local disks (or from HDFS or S3 in the case of data lake
scenarios, which introduces even longer latency), consuming a significant
amount of time. Therefore, we have introduced parallel scanning technology in
the ScanOperator. The ScanOperator dynamically generates multiple Scanners,
each of which scans around 1 to 2 million rows of data. While performing the
scan, each Scanner handles tasks su [...]
@@ -89,7 +90,7 @@ Scanning data is a very heavy I/O operation, as it requires
reading large amount
By using parallel scanning technology, we can effectively avoid issues where
certain ScanOperators take an excessively long time due to improper bucketing
or data skew, which would otherwise slow down the entire query latency.
## Local Shuffle
-In the Pipeline execution model, Local Exchange acts as a Pipeline Breaker, a
technique that redistributes data locally across different execution tasks. It
evenly distributes all the data output by the upstream Pipeline to all the
tasks in the downstream Pipeline using methods like HASH or Round Robin. This
helps solve the problem of data skew during execution, ensuring that the
execution model is no longer limited by data storage or the query plan. Let's
now provide an example to illus [...]
+In the Pipeline execution model, Local Shuffle acts as a Pipeline Breaker, a
technique that redistributes data locally across different execution tasks. It
evenly distributes all the data output by the upstream Pipeline to all the
tasks in the downstream Pipeline using methods like HASH or Round Robin. This
helps solve the problem of data skew during execution, ensuring that the
execution model is no longer limited by data storage or the query plan. Let's
now provide an example to illust [...]
We will further explain how Local Exchange can prevent data skew using
Pipeline-1 from the previous example.
@@ -103,4 +104,4 @@ Now, let's assume the current concurrency level is 3 (each
Pipeline has 3 tasks)
As can be seen from the figure on the right, the amount of data that the
HashJoin and Agg operators need to process changes from (1, 1, 7) to (3, 3, 3),
thereby avoiding data skew.
-In Doris, Local Exchange is planned based on a series of rules. For example,
when a query involves time-consuming operators like Join, Aggregation, or
Window Functions, Local Exchange is used to minimize data skew as much as
possible.
+Local Shuffle is planned based on a series of rules. For example, when a query
involves time-consuming operators like Join, Aggregation, or Window Functions,
Local Shuffle is used to minimize data skew as much as possible.
\ No newline at end of file
diff --git
a/versioned_docs/version-3.0/query-acceleration/pipeline-execution-engine.md
b/versioned_docs/version-3.0/query-acceleration/pipeline-execution-engine.md
index 8239554ba0..e5e1618f54 100644
--- a/versioned_docs/version-3.0/query-acceleration/pipeline-execution-engine.md
+++ b/versioned_docs/version-3.0/query-acceleration/pipeline-execution-engine.md
@@ -28,7 +28,7 @@ under the License.
-The parallel execution model of Doris is a Pipeline execution model, primarily
inspired by the implementation described in the Hyper paper
(https://db.in.tum.de/~leis/papers/morsels.pdf). The Pipeline execution model
fully leverages the computational power of multi-core CPUs while limiting the
number of query threads in Doris, addressing the issue of thread explosion
during execution. For details on its design, implementation, and effectiveness,
refer to [DSIP-027](DSIP-027: Support Pipe [...]
+The parallel execution model of Doris is a Pipeline execution model, primarily
inspired by the implementation described in the
[Hyper](https://db.in.tum.de/~leis/papers/morsels.pdf) paper. The Pipeline
execution model fully leverages the computational power of multi-core CPUs
while limiting the number of query threads in Doris, addressing the issue of
thread explosion during execution. For details on its design, implementation,
and effectiveness, refer to [DSIP-027](DSIP-027: Support Pip [...]
Starting from Doris 3.0, the Pipeline execution model has completely replaced
the original Volcano model. Based on the Pipeline execution model, Doris
supports the parallel processing of Query, DDL, and DML statements.
@@ -39,7 +39,7 @@ To better understand the Pipeline execution model, it is
first necessary to intr
SELECT k1, SUM(v1) FROM A,B WHERE A.k2 = B.k2 GROUP BY k1 ORDER BY SUM(v1);
```
-The FE will first translate it into the following logical plan, where each
node represents a PlanNode. The specific meaning of each type of node can be
found in the introduction to the physical plan.
+The FE will first translate it into the following logical plan, where each
node represents a PlanNode. The detailed meaning of each node type can be found
in the introduction to the physical plan.

@@ -49,12 +49,13 @@ After the transformation, each PlanFragment corresponds to
a portion of the Plan

-Doris's planning process is divided into three layers:
-PLAN:The execution plan. A SQL statement is translated by the query planner
into an execution plan, which is then provided to the execution engine for
execution.
+Doris's plan is divided into three layers:
-FRAGMENT:Since Doris is a distributed execution engine, a complete execution
plan is divided into multiple single-machine execution fragments. A FRAGMENT
represents a complete single-machine execution fragment. Multiple fragments
combine to form a complete PLAN.
+- PLAN:The execution plan. A SQL statement is translated by the query planner
into an execution plan, which is then provided to the execution engine for
execution.
-PLAN NODE:Operators, which are the smallest units of the execution plan. A
FRAGMENT consists of multiple operators, each responsible for a specific
execution logic, such as aggregation or join operations.
+- FRAGMENT:Since Doris is a distributed execution engine, a complete execution
plan is divided into multiple single-machine execution fragments. A FRAGMENT
represents a complete single-machine execution fragment. Multiple fragments
combine to form a complete PLAN.
+
+- PLAN NODE:Operators, which are the smallest units of the execution plan. A
FRAGMENT consists of multiple operators, each responsible for a specific
execution logic, such as aggregation or join operations.
## Pipeline Execution
A PlanFragment is the smallest unit of a task sent by the FE to the BE for
execution. A BE may receive multiple different PlanFragments for the same
query, and each PlanFragment is processed independently. Upon receiving a
PlanFragment, the BE splits it into multiple Pipelines and then starts multiple
PipelineTasks to achieve parallel execution, thereby improving query efficiency.
@@ -63,14 +64,14 @@ A PlanFragment is the smallest unit of a task sent by the
FE to the BE for execu
### Pipeline
-A Pipeline consists of a SourceOperator, a SinkOperator, and several
intermediate operators. The SourceOperator represents reading data from an
external source, which can be a table (e.g., OlapTable) or a buffer (e.g.,
Exchange). The SinkOperator represents the data output, which can either be
shuffled to other nodes over the network (e.g., DataStreamSinkOperator) or
output to a hash table (e.g., aggregation operators, join build hash tables,
etc.).
+A Pipeline consists of a SourceOperator, a SinkOperator, and several
intermediate operators. The SourceOperator represents reading data from an
external source, which can be a table (e.g., OlapTable) or a buffer (e.g.,
Exchange). The SinkOperator represents the data output, which can either be
shuffled to other nodes over the network (e.g., DataStreamSinkOperator) or
output to a hash table (e.g., aggregation operators, join build hash tables,
etc.).

Multiple Pipelines are actually interdependent. Take the JoinNode as an
example—it is split into two Pipelines. Pipeline-0 reads data from Exchange to
build the hash table, while Pipeline-1 reads data from the table to perform the
probe operation. These two Pipelines are connected by a dependency
relationship, meaning Pipeline-1 can only execute after Pipeline-0 has
completed. This dependency relationship is referred to as a Dependency. Once
Pipeline-0 finishes execution, it calls the se [...]
### PipelineTask
-A Pipeline is actually a logical concept; it is not an executable entity. Once
a Pipeline is defined, it needs to be further instantiated into multiple
PipelineTasks. The data that needs to be read is then distributed to different
PipelineTasks, ultimately achieving parallel processing. The operators within
the multiple PipelineTasks of the same Pipeline are identical, but they differ
in their states. For example, they might read different data or build different
hash tables. These diffe [...]
+A Pipeline is actually a logical concept; it is not an executable entity. Once a
Pipeline is defined, it needs to be further instantiated into multiple
PipelineTasks. The data that needs to be read is then distributed to different
PipelineTasks, ultimately achieving parallel processing. The operators within
the multiple PipelineTasks of the same Pipeline are identical, but they differ
in their states. For example, they might read different data or build different
hash tables. These differi [...]
Each PipelineTask is eventually submitted to a thread pool to be executed as
an independent task. With the Dependency trigger mechanism, this approach
allows better utilization of multi-core CPUs and achieves full parallelism.
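The thread-pool execution described above can be sketched with Python's standard library. This is an illustrative analogy, not Doris's scheduler: a `threading.Event` stands in for a Dependency, and the probe tasks wait on it before doing any work.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

build_done = threading.Event()  # stands in for a Dependency

def build_task():
    # Pipeline-0: e.g. build the join hash table, then signal readiness.
    build_done.set()
    return "built"

def probe_task(i):
    # Pipeline-1 tasks block on the dependency before probing.
    build_done.wait()
    return f"probe-{i} done"

# All PipelineTasks are submitted to one shared pool; the dependency,
# not submission order, decides when each task makes progress.
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(build_task)]
    futures += [pool.submit(probe_task, i) for i in range(3)]
    results = [f.result() for f in futures]
```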
@@ -79,7 +80,7 @@ In most cases, each operator in a Pipeline corresponds to a
PlanNode, but there
* JoinNode is split into JoinBuildOperator and JoinProbeOperator.
* AggNode is split into AggSinkOperator and AggSourceOperator.
* SortNode is split into SortSinkOperator and SortSourceOperator.
- The basic principle is that for certain "breaking" operators (those that
need to collect all the data before performing computation), the data ingestion
part is split into a Sink, while the part that retrieves data from the operator
is referred to as the Source.
+The basic principle is that for certain "breaking" operators (those that need
to collect all the data before performing computation), the data ingestion part
is split into a Sink, while the part that retrieves data from the operator is
referred to as the Source.
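The Sink/Source split for a "breaking" operator can be sketched with aggregation as the example. This is a simplified Python sketch under the assumption of a single sum aggregate; the `AggSink`/`AggSource` names mirror the operator names in the text but the implementation is purely illustrative:

```python
class AggSink:
    """Ingests all input before any output is possible (a 'breaking' operator)."""

    def __init__(self):
        self.hash_table = {}

    def sink(self, batch):
        # batch: iterable of (group_key, value) pairs; accumulate a sum per key.
        for key, value in batch:
            self.hash_table[key] = self.hash_table.get(key, 0) + value

class AggSource:
    """Reads the finished hash table and produces aggregated rows."""

    def __init__(self, sink):
        self.hash_table = sink.hash_table

    def get_batch(self):
        return sorted(self.hash_table.items())

sink = AggSink()
sink.sink([("a", 1), ("b", 2)])
sink.sink([("a", 3)])
rows = AggSource(sink).get_batch()
```

The Sink side must see every input batch before the Source side can emit a single correct row, which is exactly why the operator is split at this boundary.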
## Scan 并行化
Scanning data is a very heavy I/O operation, as it requires reading large
amounts of data from local disks (or from HDFS or S3 in the case of data lake
scenarios, which introduces even longer latency), consuming a significant
amount of time. Therefore, we have introduced parallel scanning technology in
the ScanOperator. The ScanOperator dynamically generates multiple Scanners,
each of which scans around 1 to 2 million rows of data. While performing the
scan, each Scanner handles tasks su [...]
@@ -89,7 +90,7 @@ Scanning data is a very heavy I/O operation, as it requires
reading large amount
By using parallel scanning technology, we can effectively avoid issues where
certain ScanOperators take an excessively long time due to improper bucketing
or data skew, which would otherwise increase the overall query latency.
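Splitting a scan into fixed-size Scanner chunks, as described above, can be sketched as a simple range-partitioning step. The function name and the 2-million-row default are illustrative; the text only says each Scanner handles "around 1 to 2 million rows":

```python
def plan_scanners(total_rows, rows_per_scanner=2_000_000):
    """Split a scan into row ranges so each Scanner reads a bounded chunk."""
    ranges = []
    start = 0
    while start < total_rows:
        end = min(start + rows_per_scanner, total_rows)
        ranges.append((start, end))
        start = end
    return ranges

# A 5.5M-row tablet yields three Scanners that can run concurrently.
ranges = plan_scanners(5_500_000)
```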
## Local Shuffle
-In the Pipeline execution model, Local Exchange acts as a Pipeline Breaker, a
technique that redistributes data locally across different execution tasks. It
evenly distributes all the data output by the upstream Pipeline to all the
tasks in the downstream Pipeline using methods like HASH or Round Robin. This
helps solve the problem of data skew during execution, ensuring that the
execution model is no longer limited by data storage or the query plan. Let's
now provide an example to illus [...]
+In the Pipeline execution model, Local Shuffle acts as a Pipeline Breaker, a
technique that redistributes data locally across different execution tasks. It
evenly distributes all the data output by the upstream Pipeline to all the
tasks in the downstream Pipeline using methods like HASH or Round Robin. This
helps solve the problem of data skew during execution, ensuring that the
execution model is no longer limited by data storage or the query plan. Let's
now provide an example to illust [...]
We will further explain how Local Shuffle can prevent data skew using
Pipeline-1 from the previous example.
@@ -103,5 +104,4 @@ Now, let's assume the current concurrency level is 3 (each
Pipeline has 3 tasks)
As can be seen from the figure on the right, the amount of data that the
HashJoin and Agg operators need to process changes from (1, 1, 7) to (3, 3, 3),
thereby avoiding data skew.
-In Doris, Local Exchange is planned based on a series of rules. For example,
when a query involves time-consuming operators like Join, Aggregation, or
Window Functions, Local Exchange is used to minimize data skew as much as
possible.
-
+Local Shuffle is planned based on a series of rules. For example, when a query
involves time-consuming operators like Join, Aggregation, or Window Functions,
Local Shuffle is used to minimize data skew as much as possible.
\ No newline at end of file
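The (1, 1, 7) to (3, 3, 3) redistribution described in the doc can be sketched with a round-robin Local Shuffle. This is a minimal Python illustration of the Round Robin strategy the text mentions (the HASH strategy would bucket by key instead); the function name is hypothetical:

```python
def local_shuffle_round_robin(upstream_outputs, num_downstream):
    """Redistribute rows from skewed upstream tasks evenly, round-robin."""
    buckets = [[] for _ in range(num_downstream)]
    i = 0
    for rows in upstream_outputs:
        for row in rows:
            buckets[i % num_downstream].append(row)
            i += 1
    return buckets

# Skewed upstream tasks hold 1, 1, and 7 rows respectively.
upstream = [[1], [2], [3, 4, 5, 6, 7, 8, 9]]
buckets = local_shuffle_round_robin(upstream, 3)
sizes = [len(b) for b in buckets]  # skew removed: every task gets 3 rows
```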
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]