This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-website.git


The following commit(s) were added to refs/heads/master by this push:
     new cdffc330088 [doc] Add blog 'principle of Doris Stream Load' (#88)
cdffc330088 is described below

commit cdffc330088a4b33f1c54c6e5f0d7282e2b24ea3
Author: TaoZex <45089228+tao...@users.noreply.github.com>
AuthorDate: Sun Sep 25 10:32:41 2022 +0800

    [doc] Add blog 'principle of Doris Stream Load' (#88)
    
    * principle-of-Doris-Stream-Load
---
 blog/principle-of-Doris-Stream-Load.md             | 155 +++++++++++++++++++++
 .../principle-of-Doris-Stream-Load.md              | 155 +++++++++++++++++++++
 .../principle-of-Doris-Stream-Load/Figure_1_cn.png | Bin 0 -> 58221 bytes
 .../principle-of-Doris-Stream-Load/Figure_1_en.png | Bin 0 -> 58221 bytes
 .../principle-of-Doris-Stream-Load/Figure_2_cn.png | Bin 0 -> 160362 bytes
 .../principle-of-Doris-Stream-Load/Figure_2_en.png | Bin 0 -> 309493 bytes
 .../principle-of-Doris-Stream-Load/Figure_3_cn.png | Bin 0 -> 16237 bytes
 .../principle-of-Doris-Stream-Load/Figure_3_en.png | Bin 0 -> 16237 bytes
 .../principle-of-Doris-Stream-Load/Figure_4_cn.png | Bin 0 -> 134398 bytes
 .../principle-of-Doris-Stream-Load/Figure_4_en.png | Bin 0 -> 134398 bytes
 .../principle-of-Doris-Stream-Load/Figure_5_cn.png | Bin 0 -> 85638 bytes
 .../principle-of-Doris-Stream-Load/Figure_5_en.png | Bin 0 -> 85638 bytes
 .../principle-of-Doris-Stream-Load/Figure_6_cn.png | Bin 0 -> 96350 bytes
 .../principle-of-Doris-Stream-Load/Figure_6_en.png | Bin 0 -> 96350 bytes
 .../principle-of-Doris-Stream-Load/Figure_7_cn.png | Bin 0 -> 109718 bytes
 .../principle-of-Doris-Stream-Load/Figure_7_en.png | Bin 0 -> 109718 bytes
 .../principle-of-Doris-Stream-Load/Figure_8_cn.png | Bin 0 -> 238659 bytes
 .../principle-of-Doris-Stream-Load/Figure_8_en.png | Bin 0 -> 238659 bytes
 18 files changed, 310 insertions(+)

diff --git a/blog/principle-of-Doris-Stream-Load.md 
b/blog/principle-of-Doris-Stream-Load.md
new file mode 100644
index 00000000000..1ccde7c0d69
--- /dev/null
+++ b/blog/principle-of-Doris-Stream-Load.md
@@ -0,0 +1,155 @@
+---
+{
+'title': 'Doris Stream Load Principle Analysis',
+'summary': "Stream Load, one of the most commonly used data import methods for 
Doris users, is a synchronous import method. It allows users to import data 
into Doris in batch through HTTP access and returns the results of data 
import.",
+'date': '2022-09-08',
+'author': 'Apache Doris',
+'tags': ['Tech Sharing'],
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+**Lead:**
+
+Stream Load, one of the most commonly used data import methods for Doris 
users, is a synchronous import method. It allows users to import data into 
Doris in batch through HTTP access and returns the results of data import. The 
user can not only directly judge whether the data import is successful through 
the return body of the HTTP request, but also query the results of historical 
tasks by executing query SQL on the client.
+
+#  **Introduction to Stream Load**
+
+The Doris import (Load) function is to import the user's original data into 
the Doris table. And Doris realizes a unified streaming import framework at the 
bottom. On this basis, Doris provides a very rich import mode to adapt to 
different data sources and data import requirements. Stream Load is one of the 
most commonly used data import methods for Doris users. It is a synchronous 
import method that allows users to import data in CSV format or JSON format 
into Doris in batch through HTT [...]
+
+# 1 Implementation Process
+
+The user submits the HTTP request of Stream Load to the FE, and the FE will 
forward the data import request to a BE node through HTTP Redirect, which will 
be the Coordinator of this Stream Load task. In this process, the FE node 
receiving the request only provides forwarding service. The BE node as the 
Coordinator is actually responsible for the entire import job, such as sending 
transaction requests to the Master FE, obtaining import execution plans from 
the FE, receiving real-time data [...]
+
+In the Coordinator BE, all HTTP requests, including Stream Load requests, are 
processed through a thread pool. A Stream Load task is uniquely identified by 
the imported Label. The principle block diagram of Stream Load is shown in 
Figure 1.
+
+<div align=center>
+<img alt=">Figure 1 The principle block diagram of Stream Load" width="80%" 
src="../static/images/blogs/principle-of-Doris-Stream-Load/Figure_1_en.png"/> 
+</div>
+<p align="center">Figure 1 The principle block diagram of Stream Load</p>      
        
+
+The complete execution process of Stream Load is shown in Figure 2:
+
+(1)The user submits the HTTP request of Stream Load to the FE (the user can 
also directly submit the HTTP request of Stream Load to the Coordinator BE).
+
+(2)FE, after receiving the Stream Load request submitted by the user, will 
perform HTTP Header parsing (including the library, table, Label and other 
information imported by parsing data), and then perform user authentication. If 
the HTTP Header is successfully resolved and the user authentication passes, 
the FE will forward the HTTP request of Stream Load to a BE node, which will be 
the Coordinator of this Stream Load. Otherwise, the FE will directly return the 
failure information of St [...]
+
+(3)After receiving the HTTP request from Stream Load, the Coordinator BE will 
first perform HTTP Header parsing and data verification, including the file 
format of the parsed data, the size of the data body, the HTTP timeout, and 
user authentication. If the Header data verification fails, the Stream Load 
failure information will be directly returned to the user.
+
+(4)After the HTTP Header data verification is passed, the Coordinator BE will 
send a Begin Transaction request to the FE through Thrift RPC.
+
+(5)After the FE receives the Begin Transaction request sent by the Coordinator 
BE, it will start a transaction and return the Transaction ID to the 
Coordinator BE.
+
+(6)After the Coordinator BE receives the Begin Transaction success 
information, it will send a request to get the import plan to the FE through 
Thrift RPC.
+
+(7)After receiving the request for obtaining the import plan sent by the 
Coordinator BE, the FE will generate the import plan for the Stream Load task 
and return it to the Coordinator BE.
+
+(8)After receiving the import plan, the Coordinator BE starts to execute the 
import plan, including receiving the real-time data from HTTP and distributing 
the real-time data to other Executor BE through BRPC.
+
+(9)After receiving the real-time data distributed by the Coordinator BE, the 
Executor BE is responsible for writing the data to the storage layer.
+
+(10)After the Executor BE completes data writing, the Coordinator BE sends a 
Commit Transaction request to the FE through Thrift RPC.
+
+(11)After receiving the Commit Transaction request sent by the Coordinator BE, 
the FE will commit transaction, send the Publish Version task to the Executor 
BE, and wait for the Executor BE to execute the Publish Version.
+
+(12)The Executor BE asynchronously executes the Publish Version to change the 
Rowset generated by data import into a visible data version.
+
+(13)After the Publish Version completes normally or the execution timeout, the 
FE returns the results of the Commit Transaction and the Publish Version to the 
Coordinator BE.
+
+(14)The Coordinator BE returns the final result of Stream Load to the user.
+
+<div align=center>
+<img alt=">Figure 2 The complete execution process of Stream Load" width="80%" 
src="../static/images/blogs/principle-of-Doris-Stream-Load/Figure_2_en.png"/> 
+</div>
+<p align="center">Figure 2 The complete execution process of Stream Load</p>  
+
+# 2 Transaction Management
+
+Doris ensures the atomicity of data import through Transaction. One Stream 
Load task corresponds to one transaction. The FE is responsible for the 
transaction management of Stream Load. The FE receives the Thrift RPC 
transaction request sent by the Coordinator BE node through the 
FrontendService. Transaction request types include Begin Transaction, Commit 
Transaction and Rollback Transaction. The transaction states of Doris include 
PREPARE, COMMITTED, VISIBLE, and ABORTED. The status flo [...]
+
+<div align=center>
+<img alt=">Figure 3 The status flow process of the Stream Load transaction" 
width="80%" 
src="../static/images/blogs/principle-of-Doris-Stream-Load/Figure_3_en.png"/> 
+</div>
+<p align="center">Figure 3 The status flow process of the Stream Load 
transaction</p> 
+
+The Coordinator BE node will send a Begin Transaction request to the FE before 
data import. The FE will check whether the label requested by the Begin 
Transaction already exists. If the label does not exist in the system, it will 
open a new transaction for the current label, assign a Transaction ID to the 
transaction, and set the transaction status to PREPARE, then returns the 
Transaction ID and the success information of the Begin Transaction to the 
Coordinator BE. Otherwise, this trans [...]
+
+After the data is written in all Executor BE nodes, the Coordinator BE node 
will send a Commit Transaction request to the FE. After receiving the Commit 
Transaction request, the FE will execute the Commit Transaction and Publish 
Version operations. First, the FE will judge whether the number of replicas of 
data successfully written by each Tablet exceeds half of the total number of 
replicas of the tablet. If the number of replicas of data successfully written 
by each Tablet exceeds half  [...]
+
+The FE will have a separate thread to execute the Publish Version on the 
Transaction with successful Commit. When the Publish Version is executed, the 
FE will send the Publish Version request to all Executor BE nodes related to 
the Transaction through Thrift RPC. The Publish Version task is executed 
asynchronously on each Executor BE node, and the Rowset generated by data 
import is changed into a visible data version. When all the Publish Version 
tasks on the Executor BE are successfully [...]
+
+When obtaining the import plan from the FE fails, executing data import fails, 
or Commit Transaction fails, the Coordinator BE node will send a Rollback 
Transaction request to the FE to execute transaction rollback. After receiving 
the transaction rollback request, the FE will set the transaction status to 
ABORTED, and send a Clear Transaction request to the Executor BE through Thrift 
RPC. The Clear Transaction task is asynchronously executed at the BE node, 
marking the Rowset generated  [...]
+
+# 3 Execution of the import plan
+In Doris BE, all execution plans are managed by FragmentMgr, and the execution 
of each import plan is managed by PlanFragmentExecutor. After the BE obtains 
the import execution plan from the FE, it will submit the import plan to the 
thread pool of FragmentMgr for execution. The import execution plan of Stream 
Load has only one Fragment, including one BrokerScanNode and one OlapTableSink. 
BrokerScanNode is responsible for reading streaming data in real time and 
converting the data lines i [...]
+
+After importing the execution plan and submitting it to the thread pool of 
FragmentMgr, the Stream Load thread will receive the real-time data transmitted 
through HTTP in chunks and write it to the StreamLoadPipe. The BrokerScanNode 
will read the real-time data in batches from the StreamLoadPipe. OlapTableSink 
will send the batch data read by the BrokerScanNode to the Executor BE through 
BRPC for data writing. After all real-time data is written to the 
StreamLoadPipe, the Stream Load thr [...]
+
+The PlanFragmentExecutor executes a specific import plan process, which 
consists of three stages: Prepare, Open, and Close. In the Prepare stage, the 
import execution plan from the FE is mainly analyzed; In the Open stage, 
BrokerScanNode and OlapTableSink will be opened. BrokerScanNode is responsible 
for reading the real-time data of one Batch at a time, and OlapTableSink is 
responsible for calling BRPC to send the data of each Batch to other Executor 
BE nodes; In the Close stage, it is  [...]
+
+<div align=center>
+<img alt=">Figure 4 The import execution plan of Stream Load" width="80%" 
src="../static/images/blogs/principle-of-Doris-Stream-Load/Figure_4_en.png"/> 
+</div>
+<p align="center">Figure 4 The import execution plan of Stream Load</p> 
+
+OlapTableSink is responsible for the data distribution of the Stream Load 
task. Tables in Doris may have Rollup or Materialized view. Each Table and its 
Rollup and Materialized view are called an Index. In the process of data 
distribution, the IndexChannel will maintain a data distribution channel of the 
Index. The Tablet under the Index may have multiple replicas and are 
distributed on different BE nodes. The NodeChannel will maintain the data 
distribution channel of an Executor BE node [...]
+
+<div align=center>
+<img alt=">Figure 5 The Data distribution channel for Stream Load task" 
width="80%" 
src="../static/images/blogs/principle-of-Doris-Stream-Load/Figure_5_en.png"/> 
+</div>
+<p align="center">Figure 5 The Data distribution channel for Stream Load 
task</p> 
+
+When OlapTableSink distributes data, it will read the data Batch obtained by 
BrokerScanNode row by row, and add the data row to the IndexChannel of each 
Index. The Partition and Tablet of the data row can be determined according to 
the PartitionKey and DistributionKey, and then the corresponding Tablet of the 
data row in other Index can be calculated according to the order of the Tablet 
in the Partition. Each Tablet may have multiple replicas distributed on 
different BE nodes. Therefore, [...]
+
+<div align=center>
+<img alt=">Figure 6 The data distribution process of the Stream Load task" 
width="80%" 
src="../static/images/blogs/principle-of-Doris-Stream-Load/Figure_6_en.png"/> 
+</div>
+<p align="center">Figure 6 The data distribution process of the Stream Load 
task</p> 
+
+# 4 **Data Write**
+
+After receiving the data Batch sent by the Coordinator BE, the BRPC server of 
the Executor BE will submit the data writing task to the thread pool for 
asynchronous execution. In Doris BE, data is written to the storage layer in a 
hierarchical manner. Each Stream Load task corresponds to a LoadChannel on each 
Executor BE. The LoadChannel maintains the data writing channel of a Stream 
Load task and is responsible for the data writing of a Stream Load task on the 
current Executor BE node, L [...]
+
+<div align=center>
+<img alt=">Figure 7 The data write channel of the Stream Load task" 
width="80%" 
src="../static/images/blogs/principle-of-Doris-Stream-Load/Figure_7_en.png"/> 
+</div>
+<p align="center">Figure 7 The data write channel of the Stream Load task</p> 
+
+The Flush operation of MemTable is performed asynchronously by 
MemtableFlushExecutor. After the MemTable Flush task is submitted to the thread 
pool, a new MemTable will be generated to receive the subsequent data writing 
of the current Tablet. When the MemtableFlushExecutor performs data Flush, the 
RowsetWriter will read out all the data in the MemTable and write out multiple 
Segment files through the SegmentWriter. The size of each Segment file is no 
more than 256MB. For a Tablet, each  [...]
+
+<div align=center>
+<img alt=">Figure 8 The data writing process of the Stream Load task" 
width="80%" 
src="../static/images/blogs/principle-of-Doris-Stream-Load/Figure_8_en.png"/> 
+</div>
+<p align="center">Figure 8 The data writing process of the Stream Load 
task</p> 
+
+The TxnManager on the Executor BE node is responsible for transaction 
management of Tablet level data import. When the Delta Writer is initialized, 
the PrepareTransaction will be executed to add the data write transaction of 
the corresponding Tablet in the current Stream Load task to the TxnManager for 
management. When the data write Tablet is completed and the DeltaWriter is 
closed, the Commit Transaction will be executed to add the new Rowset generated 
by the data import to the TxnMana [...]
+
+After the data import is completed, when the Executor BE executes the Publish 
Version task issued by the FE, it will execute the Publish Transaction to 
change the new Rowset generated by the data import into a visible version, and 
delete the data writing task of the corresponding Tablet in the current Stream 
Load task from the TxnManager. This means that the data writing transaction of 
the Tablet in the current Stream Load task ends.
+
+# 5 **Stream Load Operation Audit**
+
+Doris adds the operation audit function to Stream Load. After each Stream Load 
task is completed and the results are returned to the user, the Coordinator BE 
will persistently store the detailed information of this Stream Load task on 
the local RocksDB. The Master FE periodically pulls the information of the 
completed Stream Load task from each BE node of the cluster through Thrift RPC, 
pulls a batch of Stream Load operation records from one BE node at a time, and 
writes the pulled Strea [...]
+
+When the FE writes the pulled Stream Load task information into the Audit log, 
it will keep a copy in the memory. In order to prevent memory expansion, a 
fixed number of Stream Load task information will be kept in the memory. As the 
subsequent data pulling continues, the early Stream Load task information will 
be gradually eliminated from the FE memory. The user can query the latest 
Stream Load task information by executing the SHOW STREAM LOAD command at the 
client.
+
+# **Summary**
+
+In this paper, the implementation principle of Stream Load is deeply analyzed 
from the aspects of execution process, transaction management, implementation 
of import plan, data writing and operation audit of Stream Load. Stream Load is 
one of the most commonly used data import methods for Doris users. It is a 
synchronous import method that allows users to import data into Doris in batch 
through HTTP access and return the results of data import. The user can not 
only directly judge whethe [...]
diff --git 
a/i18n/zh-CN/docusaurus-plugin-content-blog/principle-of-Doris-Stream-Load.md 
b/i18n/zh-CN/docusaurus-plugin-content-blog/principle-of-Doris-Stream-Load.md
new file mode 100644
index 00000000000..b8329eda4b2
--- /dev/null
+++ 
b/i18n/zh-CN/docusaurus-plugin-content-blog/principle-of-Doris-Stream-Load.md
@@ -0,0 +1,155 @@
+---
+{
+'title': 'Doris Stream Load原理解析',
+'summary': "Stream Load是Doris用户最常用的数据导入方式之一,它是一种同步的导入方式, 
允许用户通过Http访问的方式批量地将数据导入Doris,并返回数据导入的结果。",
+'date': '2022-09-08',
+'author': 'Apache Doris',
+'tags': ['技术解析'],
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+**导读:**
+
+Stream Load是Doris用户最常用的数据导入方式之一,它是一种同步的导入方式, 
允许用户通过Http访问的方式批量地将数据导入Doris,并返回数据导入的结果。用户可以直接通过Http请求的返回体判断数据导入是否成功,也可以通过在客户端执行查询SQL来查询历史任务的结果。
+
+#  **Stream Load简介**
+
+Doris的导入(Load)功能就是将用户的原始数据导入到 
Doris表中。Doris底层实现了统一的流式导入框架,在这个框架之上,Doris提供了非常丰富的导入方式以适应不同的数据源和数据导入需求。Stream 
Load是Doris用户最常用的数据导入方式之一,它是一种同步的导入方式, 
允许用户通过Http访问的方式批量地将CSV格式或JSON格式的数据导入Doris,并返回数据导入的结果。用户可以直接通过Http请求的返回体判断数据导入是否成功,也可以通过在客户端执行查询SQL来查询历史任务的结果。另外,Doris还为Stream
 Load提供了操作审计功能,可以通过审计日志对历史的Stream Load任务信息进行审计。本文将从Stream 
Load的执行流程、事务管理、导入计划的执行、数据写入以及操作审计等方面对Stream Load的实现原理进行深入地解析。
+
+# 1 **执行流程**
+
+用户将Stream Load的Http请求提交给FE,FE会通过 Http 
重定向(Redirect)将数据导入请求转发给某一个BE节点,该BE节点将作为本次Stream 
Load任务的Coordinator。在这个过程中,接收请求的FE节点仅仅提供转发服务,由作为 
Coordinator的BE节点实际负责整个导入作业,比如负责向Master 
FE发送事务请求、从FE获取导入执行计划、接收实时数据、分发数据到其他Executor BE节点以及数据导入结束后返回结果给用户。用户也可以将Stream 
Load的Http请求直接提交给某一个指定的BE节点,并由该节点作为本次Stream Load任务的Coordinator。在Stream 
Load过程中,Executor BE节点负责将数据写入存储层。
+
+在Coordinator BE中,通过一个线程池来处理所有的Http请求,其中包括Stream Load请求。一次Stream 
Load任务通过导入的Label唯一标识。Stream Load的原理框图如图1所示。
+
+<div align=center>
+<img alt=">图 1 Stream Load的原理框图" width="80%" 
src="../../../static/images/blogs/principle-of-Doris-Stream-Load/Figure_1_cn.png"/>
 
+</div>
+<p align="center">图 1 Stream Load的原理框图</p>              
+
+Stream Load完整执行流程如图2所示:
+
+(1)用户提交Stream Load的Http请求到FE(用户也可以直接提交Stream Load的Http请求到Coordinator BE)。
+
+(2)FE接收到用户提交的Stream 
Load请求后,会进行Http的Header解析(其中包括解析数据导入的库、表、Label等信息),然后进行用户鉴权。如果Http的Header解析成功并且用户鉴权通过,FE会将Stream
 Load的Http请求转发到一台BE节点,该BE节点将作为本次Stream Load的Coordinator;否则,FE会直接向用户返回Stream 
Load的失败信息。
+
+(3)Coordinator BE接收到Stream 
Load的Http请求后,会首先进行Http的Header解析和数据校验,其中包括解析数据的文件格式、数据body的大小、Http超时时间、进行用户鉴权等。如果Header数据校验失败,会直接向用户返回Stream
 Load的失败信息。
+
+(4)Http Header数据校验通过之后,Coordinator BE会通过Thrift RPC向FE发送Begin Transaction的请求。
+
+(5)FE收到Coordinator BE发送的Begin Transaction的请求之后,会开启一个事务,并向Coordinator 
BE返回Transaction Id。
+
+(6)Coordinator BE收到Begin Transaction成功信息之后,会通过Thrift RPC向 FE发送获取导入计划的请求。
+
+(7)FE收到Coordinator BE发送的获取导入计划的请求之后,会为Stream Load任务生成导入计划,并返回给Coordinator BE。
+
+(8)Coordinator 
BE接收到导入计划之后,开始执行导入计划,其中包括接收Http传来的实时数据以及将实时数据通过BRPC分发到其他Executor BE。
+
+(9)Executor BE接收到Coordinator BE分发的实时数据之后,负责将数据写入存储层。
+
+(10)Executor BE完成数据写入之后,Coordinator BE通过Thrift RPC 向FE发送Commit Transaction的请求。
+
+(11)FE收到Coordinator BE发送的Commit Transaction的请求之后,会对事务进行提交, 并向Executor BE发送 
Publish Version的任务,同时等待Executor BE执行Publish Version完成。
+
+(12)Executor BE异步执行Publish Version,将数据导入生成的Rowset变为可见数据版本。
+
+(13)Publish Version正常完成或执行超时之后,FE向Coordinator BE返回Commit Transaction和Publish 
Version的结果。
+
+(14)Coordinator BE向用户返回Stream Load的最终结果。
+
+<div align=center>
+<img alt=">图 2 Stream Load完整执行流程图" width="80%" 
src="../../../static/images/blogs/principle-of-Doris-Stream-Load/Figure_2_cn.png"/>
 
+</div>
+<p align="center">图 2 Stream Load完整执行流程图</p>  
+
+# 2 事务管理
+
+Doris通过事务(Transaction)来保证数据导入的原子性,一次Stream Load任务对应一个事务。Stream 
Load的事务管理由FE负责,FE通过FrontendService接收Coordinator BE节点发送来的Thrift 
RPC事务请求,事务请求类型包括Begin Transaction、Commit Transaction和Rollback 
Transaction。Doris的事务状态包括:PREPARE、COMMITTED、VISIBLE和ABORTED。Stream 
Load事务的状态流转过程如图3所示。
+
+<div align=center>
+<img alt=">图 3 Stream Load事务的状态流转图" width="80%" 
src="../../../static/images/blogs/principle-of-Doris-Stream-Load/Figure_3_cn.png"/>
 
+</div>
+<p align="center">图 3 Stream Load事务的状态流转图</p> 
+
+数据导入开始之前,Coordinator BE节点会向FE发送Begin Transaction请求,FE会检查本次Begin 
Transaction请求的label是否已经存在,如果label在系统中不存在,则会为当前label开启一个新的事务,并为事务分配Transaction 
Id,同时将事务状态设置为PREPARE,然后将Transaction Id以及Begin Transaction成功的信息返回给Coordinator 
BE;否则,本次事务可能是一次重复的数据导入,FE向Coordinator BE返回Begin Transaction失败的信息,Stream 
Load任务退出。
+
+当数据在所有Executor BE节点完成写入之后,Coordinator BE节点会向FE发送Commit 
Transaction请求,FE收到Commit Transaction请求之后会执行Commit Transaction以及Publish 
Version两个操作。首先,FE会判断每一个Tablet成功写入数据的副本数量是否超过了Tablet副本总数的一半,如果每一个Tablet成功写入数据的副本数量都超过Tablet副本总数的一半(多数成功),则Commit
 Transaction成功,并将事务状态设置为COMMITTED;否则,向Coordinator BE返回Commit 
Transaction失败的信息。COMMITTED状态表示数据已经成功写入,但是数据还不可见,需要继续执行Publish 
Version任务,此后,事务不可被回滚。
+
+FE会有一个单独的线程对Commit成功的Transaction执行Publish Version,FE执行Publish 
Version时会通过Thrift RPC向Transaction相关的所有Executor BE节点下发Publish Version请求,Publish 
Version任务在各个Executor BE节点异步执行,将数据导入生成的Rowset变为可见的数据版本。当Executor BE上所有的Publish 
Version任务执行成功,FE会将事务状态设置为VISIBLE,并向Coordinator BE返回Commit Transaction以及Publish 
Version成功的信息。如果存在某些Publish Version任务失败,FE会向Executor BE节点重复下发Publish 
Version请求直到之前失败的Publish 
Version任务成功。如果在一定超时时间之后,事务状态还没有被设置为VISIBLE,FE就会向Coordinator BE返回Commit 
Transaction成功但Publish Version超时 [...]
+
+当从FE获取导入计划失败、执行数据导入失败或Commit Transaction失败时,Coordinator BE节点会向FE发送Rollback 
Transaction请求,执行事务回滚。FE收到事务回滚的请求之后,会将事务的状态设置为ABORTED,并通过Thrift RPC向Executor 
BE发送Clear Transaction的请求,Clear 
Transaction任务在BE节点异步执行,将数据导入生成的Rowset标记为不可用,这些Rowset在之后会从BE上被删除。状态为COMMITTED的事务(Commit
 Transaction成功但Publish Version超时的事务)不能被回滚。
+
+# 3 导入计划的执行
+在Doris的BE中,所有执行计划由FragmentMgr管理,每一个导入计划的执行由PlanFragmentExecutor负责。BE从FE获取到导入执行计划之后,会将导入计划提交到FragmentMgr的线程池执行。Stream
 Load 的导入执行计划只有一个Fragment, 其中包含一个BrokerScanNode 和 一个 
OlapTableSink。BrokerScanNode负责实时读取流式数据,并将 CSV 格式或JSON格式的数据行转为 Doris 
的Tuple格式;OlapTableSink 负责将实时数据发送到对应的Executor BE节点,每个数据行对应哪个Executor 
BE节点是由数据行所在的Tablet存储在哪些BE上决定的,可以根据数据行的 
PartitionKey和DistributionKey确定该行数据所在的Partition和Tablet,每个Tablet及其副本存储在哪台BE节点上是在Table或Partition创建时就已经确定的。
+
+导入执行计划提交到FragmentMgr的线程池之后,Stream 
Load线程会按块(chunk)接收通过Http传输的实时数据并写入StreamLoadPipe中,BrokerScanNode会从StreamLoadPipe中批量读取实时数据,OlapTableSink会将BrokerScanNode读取的批量数据通过BRPC发送到Executor
 BE进行数据写入。所有实时数据都写入StreamLoadPipe之后,Stream Load线程会等待导入计划执行结束。
+
+PlanFragmentExecutor执行一个具体的导入计划过程由Prepare、Open和Close三个阶段组成。在Prepare阶段,主要对来自FE的导入执行计划进行解析;在Open阶段,会打开BrokerScanNode和OlapTableSink,BrokerScanNode负责每次读取一个Batch的实时数据,OlapTableSink负责调用BRPC将每一个Batch的数据发送到其他Executor
 BE节点;在Close阶段,负责等待数据导入结束,并关闭BrokerScanNode和OlapTableSink。Stream 
Load的导入执行计划如图4所示。
+
+<div align=center>
+<img alt=">图 4 Stream Load的导入执行计划" width="80%" 
src="../../../static/images/blogs/principle-of-Doris-Stream-Load/Figure_4_cn.png"/>
 
+</div>
+<p align="center">图 4 Stream Load的导入执行计划</p> 
+
+OlapTableSink负责Stream 
Load任务的数据分发。Doris中的Table可能会有Rollup或物化视图,每一个Table及其Rollup、物化视图都称为一个Index。数据分发过程中,IndexChannel会维护一个Index的数据分发通道,Index下的Tablet可能会有多个副本(Replica),并分布在不同的BE节点上,NodeChannel会在IndexChannel下维护一个Executor
 
BE节点的数据分发通道,因此,OlapTableSink下包含多个IndexChannel,每一个IndexChannel下包含多个NodeChannel,如图5所示。
+
+<div align=center>
+<img alt=">图 5 Stream Load任务的数据分发通道" width="80%" 
src="../../../static/images/blogs/principle-of-Doris-Stream-Load/Figure_5_cn.png"/>
 
+</div>
+<p align="center">图 5 Stream Load任务的数据分发通道</p> 
+
+OlapTableSink分发数据时,会逐行读取BrokerScanNode获取到的数据Batch,并将数据行添加到每一个Index的IndexChannel中。可以根据
 
PartitionKey和DistributionKey确定数据行所在的Partition和Tablet,进而根据Tablet在Partition中的顺序计算出数据行在其他Index中对应的Tablet。每一个Tablet可能会有多个副本,并分布在不同的BE节点上,因此,在IndexChannel中会将每一个数据行添加到其所在Tablet的每一个副本对应的NodeChannel中。每一个NodeChannel中都会有一个发送队列,当NodeChannel中新增的数据行累积到一定的大小就会作为一个数据Batch被添加到发送队列中。OlapTableSink中会有一个固定的线程依次轮训每一个IndexChannel下的每一个NodeChannel,并调用BRPC将发送队列中的一个数据Batch发送到对应的Executor
 BE上。Stream Load任务的数据分发过程如图6所示。
+
+<div align=center>
+<img alt=">图 6 Stream Load任务的数据分发过程" width="80%" 
src="../../../static/images/blogs/principle-of-Doris-Stream-Load/Figure_6_cn.png"/>
 
+</div>
+<p align="center">图 6 Stream Load任务的数据分发过程</p> 
+
+# 4 **数据写入**
+
+Executor BE的BRPC server接收到Coordinator 
BE发送来的数据Batch之后,会将数据写入任务提交到线程池来异步执行。在Doris的BE中,数据采用分层的方式写入存储层,每一个Stream 
Load任务在每个Executor BE上都对应一个LoadChannel,LoadChannel维护一次Stream 
Load任务的数据写入通道,负责一次Stream Load任务在当前Executor BE节点的数据写入,LoadChannel可以将一次Stream 
Load任务在当前BE节点的数据分批写入存储层,直到Stream Load任务完成。每一个LoadChannel由Load 
Id唯一标识,BE节点上的所有LoadChannel由LoadChannelMgr进行管理。一次Stream 
Load任务对应的Table可能会有多个Index,每一个Index对应一个TabletsChannel,由Index 
Id唯一标识,因此,每一个LoadChannel下会有多个TabletsChannel。TabletsChannel维护一个Index的数 [...]
+
+<div align=center>
+<img alt=">图 7 Stream Load任务的数据写入通道" width="80%" 
src="../../../static/images/blogs/principle-of-Doris-Stream-Load/Figure_7_cn.png"/>
 
+</div>
+<p align="center">图 7 Stream Load任务的数据写入通道</p> 
+
+MemTable的刷写操作由MemtableFlushExecutor异步执行,当MemTable的刷写任务提交到线程池之后,会生成一个新的MemTable来接收当前Tablet的后续数据写入。MemtableFlushExecutor执行数据刷写时,RowsetWriter会读出MemTable中的所有数据,并通过SegmentWriter刷写出多个Segment文件,每个Segment文件大小不超过256MB。对于一个Tablet,每次Stream
 Load任务都会生成一个新的Rowset,生成的Rowset中可以包含多个Segment文件。Stream Load任务的数据写入过程如图8所示。
+
+<div align=center>
+<img alt=">图 8 Stream Load任务的数据写入过程" width="80%" 
src="../../../static/images/blogs/principle-of-Doris-Stream-Load/Figure_8_cn.png"/>
 
+</div>
+<p align="center">图 8 Stream Load任务的数据写入过程</p> 
+
+Executor BE节点上的TxnManager负责Tablet级别数据导入的事务管理,DeltaWriter初始化时,会执行Prepare 
Transaction将对应Tablet在本次Stream 
Load任务中的数据写入事务添加到TxnManager中进行管理;数据写入Tablet完成并关闭DeltaWriter时,会执行Commit 
Transaction将数据导入生成的新的Rowset添加到TxnManager中进行管理。注意,这里的TxnManager只是负责单个BE上的事务,而FE中的事务管理是负责整体导入事务的。
+
+数据导入结束之后,Executor BE执行FE下发的Publish  Version任务时,会执行Publish 
Transaction将数据导入生成的新的Rowset变为可见版本,并从TxnManager中将对应Tablet在本次Stream 
Load任务中的数据写入事务删除,这意味着Tablet在本次Stream Load任务中的数据写入事务结束。
+
+# 5 **Stream Load操作审计**
+
+Doris为Stream Load增加了操作审计功能,每一次Stream Load任务结束并将结果返回给用户之后,Coordinator 
BE会将本次Stream Load任务的详细信息持久化地存储在本地RocksDB上。Master FE定时地通过Thrift 
RPC从集群的各个BE节点上拉取已经结束的Stream Load任务的信息,每次从一个BE节点上拉取一个批次的Stream 
Load操作记录,并将拉取到的Stream Load任务信息写入审计日志(fe.audit.log)中。存储在BE上的每一条Stream 
Load任务信息会设有过期时间(TTL),RocksDB执行Compaction时会将过期的Stream 
Load任务信息进行删除。用户可以通过FE的审计日志对历史的Stream Load任务信息进行审计。
+
+FE将拉取的Stream Load任务信息写入Audit日志的同时,会在内存中保留一份。为防止内存膨胀,内存中会保留固定数量的Stream 
Load任务的信息,随着后续拉取数据地持续进行,会从FE内存中逐渐淘汰掉早期的Stream Load任务信息。用户可以通过客户端执行SHOW STREAM 
LOAD命令来查询最近的Stream Load任务信息。
+
+# 总结
+
+本文从Stream Load的执行流程、事务管理、导入计划的执行、数据写入以及操作审计等方面对Stream Load的实现原理进行了深入地解析。Stream 
Load是Doris用户最常用的数据导入方式之一,它是一种同步的导入方式, 
允许用户通过Http访问的方式批量地将数据导入Doris,并返回数据导入的结果。用户可以直接通过Http请求的返回体判断数据导入是否成功,也可以通过在客户端执行查询SQL来查询历史任务的结果。另外,Doris还为Stream
 Load提供了结果审计功能,可以通过审计日志对历史的Stream Load任务信息进行审计。
diff --git a/static/images/blogs/principle-of-Doris-Stream-Load/Figure_1_cn.png 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_1_cn.png
new file mode 100644
index 00000000000..a5ba2ed8988
Binary files /dev/null and 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_1_cn.png differ
diff --git a/static/images/blogs/principle-of-Doris-Stream-Load/Figure_1_en.png 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_1_en.png
new file mode 100644
index 00000000000..a5ba2ed8988
Binary files /dev/null and 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_1_en.png differ
diff --git a/static/images/blogs/principle-of-Doris-Stream-Load/Figure_2_cn.png 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_2_cn.png
new file mode 100644
index 00000000000..2ecedde084b
Binary files /dev/null and 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_2_cn.png differ
diff --git a/static/images/blogs/principle-of-Doris-Stream-Load/Figure_2_en.png 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_2_en.png
new file mode 100644
index 00000000000..92aafd354fe
Binary files /dev/null and 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_2_en.png differ
diff --git a/static/images/blogs/principle-of-Doris-Stream-Load/Figure_3_cn.png 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_3_cn.png
new file mode 100644
index 00000000000..35b58340f9e
Binary files /dev/null and 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_3_cn.png differ
diff --git a/static/images/blogs/principle-of-Doris-Stream-Load/Figure_3_en.png 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_3_en.png
new file mode 100644
index 00000000000..35b58340f9e
Binary files /dev/null and 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_3_en.png differ
diff --git a/static/images/blogs/principle-of-Doris-Stream-Load/Figure_4_cn.png 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_4_cn.png
new file mode 100644
index 00000000000..49c56a10c3e
Binary files /dev/null and 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_4_cn.png differ
diff --git a/static/images/blogs/principle-of-Doris-Stream-Load/Figure_4_en.png 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_4_en.png
new file mode 100644
index 00000000000..49c56a10c3e
Binary files /dev/null and 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_4_en.png differ
diff --git a/static/images/blogs/principle-of-Doris-Stream-Load/Figure_5_cn.png 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_5_cn.png
new file mode 100644
index 00000000000..90d6be6a5a1
Binary files /dev/null and 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_5_cn.png differ
diff --git a/static/images/blogs/principle-of-Doris-Stream-Load/Figure_5_en.png 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_5_en.png
new file mode 100644
index 00000000000..90d6be6a5a1
Binary files /dev/null and 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_5_en.png differ
diff --git a/static/images/blogs/principle-of-Doris-Stream-Load/Figure_6_cn.png 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_6_cn.png
new file mode 100644
index 00000000000..2b48a9e49fa
Binary files /dev/null and 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_6_cn.png differ
diff --git a/static/images/blogs/principle-of-Doris-Stream-Load/Figure_6_en.png 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_6_en.png
new file mode 100644
index 00000000000..2b48a9e49fa
Binary files /dev/null and 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_6_en.png differ
diff --git a/static/images/blogs/principle-of-Doris-Stream-Load/Figure_7_cn.png 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_7_cn.png
new file mode 100644
index 00000000000..35ffac68677
Binary files /dev/null and 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_7_cn.png differ
diff --git a/static/images/blogs/principle-of-Doris-Stream-Load/Figure_7_en.png 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_7_en.png
new file mode 100644
index 00000000000..35ffac68677
Binary files /dev/null and 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_7_en.png differ
diff --git a/static/images/blogs/principle-of-Doris-Stream-Load/Figure_8_cn.png 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_8_cn.png
new file mode 100644
index 00000000000..bb1d8b3bb83
Binary files /dev/null and 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_8_cn.png differ
diff --git a/static/images/blogs/principle-of-Doris-Stream-Load/Figure_8_en.png 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_8_en.png
new file mode 100644
index 00000000000..bb1d8b3bb83
Binary files /dev/null and 
b/static/images/blogs/principle-of-Doris-Stream-Load/Figure_8_en.png differ


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to