This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b3a52a0  [Update] Support update syntax (#6230)
b3a52a0 is described below

commit b3a52a05d535bbf1f6d644fe46e71901d55d4482
Author: EmmyMiao87 <[email protected]>
AuthorDate: Tue Jul 27 13:38:15 2021 +0800

    [Update] Support update syntax (#6230)
    
     [Update] Support update syntax
    
        The current update syntax only supports updating the filtered data of a 
single table.
    
        Syntax:
    
         * UPDATE table_reference
         *     SET assignment_list
         *     [WHERE where_condition]
         *
         * value:
         *     {expr}
         *
         * assignment:
         *     col_name = value
         *
         * assignment_list:
         *     assignment [, assignment] ...
    
        Example
        Update unique_table
             set v1=1
             where k1=1
    
        New Frontend Config: enable_concurrent_update
        This configuration is used to control whether multiple update statements 
can be executed concurrently on one table.
        Default value is false, which means a table can only have one update 
task being executed at the same time.
        If users want to update the same table concurrently,
          they need to modify the configuration value to true and restart the 
master frontend.
        Concurrent updates may cause write conflicts, the result is uncertain, 
please be careful.
    
        The main realization principle:
        1. Read the rows that meet the conditions according to the conditions 
set by where clause.
        2. Modify the result of the row according to the set clause.
        3. Write the modified row back to the table.
    
        Some restrictions on the use of update syntax.
        1. Only the unique table can be updated
        2. Only the value column of the unique table can be updated
        3. The where clause currently only supports single tables
    
        Possible risks:
        1. Since the current implementation method is a row update,
             when the same table is updated concurrently, there may be 
concurrency conflicts which may cause the incorrect result.
        2. If the conditions of the where clause cannot make use of an index, 
the statement is likely to cause a full table scan and affect query performance.
           Please pay attention to whether the column in the where clause can 
match the index when using it.
    
        [Docs][Update] Add update document and sql-reference
    
        Fixed #6229
---
 docs/.vuepress/sidebar/en.js                       |   2 +
 docs/.vuepress/sidebar/zh-CN.js                    |   2 +
 docs/en/administrator-guide/update.md              | 126 ++++++++++
 .../Data Manipulation/SHOW EXPORT.md               |  26 +--
 .../sql-statements/Data Manipulation/UPDATE.md     |  75 ++++++
 docs/zh-CN/administrator-guide/update.md           | 126 ++++++++++
 .../Data Manipulation/SHOW EXPORT.md               |   2 +-
 .../sql-statements/Data Manipulation/UPDATE.md     |  75 ++++++
 fe/fe-core/src/main/cup/sql_parser.cup             |  13 +-
 .../java/org/apache/doris/analysis/Analyzer.java   |  29 ++-
 .../main/java/org/apache/doris/analysis/Expr.java  |  38 +++
 .../java/org/apache/doris/analysis/InsertStmt.java |  80 +------
 .../java/org/apache/doris/analysis/IntLiteral.java |   1 -
 .../org/apache/doris/analysis/LargeIntLiteral.java |   3 +-
 .../java/org/apache/doris/analysis/UpdateStmt.java | 216 +++++++++++++++++
 .../java/org/apache/doris/catalog/Catalog.java     |   7 +
 .../main/java/org/apache/doris/catalog/Column.java |  12 +
 .../java/org/apache/doris/catalog/OlapTable.java   |   4 +
 .../main/java/org/apache/doris/common/Config.java  |   3 +
 .../main/java/org/apache/doris/load/ExportJob.java |   6 +-
 .../apache/doris/load/loadv2/LoadLoadingTask.java  |   2 +-
 .../doris/load/loadv2/LoadingTaskPlanner.java      |  22 +-
 .../apache/doris/load/update/UpdateManager.java    |  87 +++++++
 .../apache/doris/load/update/UpdatePlanner.java    | 191 +++++++++++++++
 .../doris/load/update/UpdateStmtExecutor.java      | 256 +++++++++++++++++++++
 .../java/org/apache/doris/planner/EsScanNode.java  |   2 +-
 .../org/apache/doris/planner/OlapScanNode.java     |  20 ++
 .../org/apache/doris/planner/OlapTableSink.java    |  11 +-
 .../java/org/apache/doris/planner/PlanNode.java    |   7 +
 .../java/org/apache/doris/planner/Planner.java     |   2 +-
 .../apache/doris/planner/PredicatePushDown.java    | 122 ++++++++++
 .../java/org/apache/doris/planner/ScanNode.java    | 112 ++++++++-
 .../apache/doris/planner/SingleNodePlanner.java    |  80 +------
 .../apache/doris/planner/StreamLoadPlanner.java    |  65 +++---
 .../java/org/apache/doris/qe/ConnectContext.java   |   4 +
 .../java/org/apache/doris/qe/ConnectProcessor.java |   1 +
 .../main/java/org/apache/doris/qe/Coordinator.java |   7 +-
 .../main/java/org/apache/doris/qe/DdlExecutor.java |   3 +
 .../main/java/org/apache/doris/qe/QueryState.java  |   3 +
 .../java/org/apache/doris/qe/StmtExecutor.java     |   4 +
 .../apache/doris/transaction/TransactionState.java |   2 +-
 fe/fe-core/src/main/jflex/sql_scanner.flex         |   1 +
 .../org/apache/doris/analysis/InsertStmtTest.java  |  13 +-
 .../org/apache/doris/analysis/UpdateStmtTest.java  |  55 +++++
 .../apache/doris/common/ThreadPoolManagerTest.java |   2 -
 .../doris/load/update/UpdateManagerTest.java       |  67 ++++++
 .../doris/load/update/UpdateStmtExecutorTest.java  |  92 ++++++++
 .../org/apache/doris/planner/QueryPlanTest.java    |  12 +-
 .../apache/doris/planner/UpdatePlannerTest.java    | 185 +++++++++++++++
 49 files changed, 2019 insertions(+), 257 deletions(-)

diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js
index 98da472..8f26f16 100644
--- a/docs/.vuepress/sidebar/en.js
+++ b/docs/.vuepress/sidebar/en.js
@@ -187,6 +187,7 @@ module.exports = [
       "sql-mode",
       "time-zone",
       "variables",
+      "update",
     ],
     sidebarDepth: 1,
   },
@@ -534,6 +535,7 @@ module.exports = [
               "STREAM LOAD",
               "alter-routine-load",
               "insert",
+              "UPDATE",
             ],
           },
           {
diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js
index c3f7bcd..3bfdf15 100644
--- a/docs/.vuepress/sidebar/zh-CN.js
+++ b/docs/.vuepress/sidebar/zh-CN.js
@@ -189,6 +189,7 @@ module.exports = [
       "sql-mode",
       "time-zone",
       "variables",
+      "update",
     ],
     sidebarDepth: 1,
   },
@@ -538,6 +539,7 @@ module.exports = [
               "STREAM LOAD",
               "alter-routine-load",
               "insert",
+              "UPDATE",
             ],
           },
           {
diff --git a/docs/en/administrator-guide/update.md 
b/docs/en/administrator-guide/update.md
new file mode 100644
index 0000000..b3efc45
--- /dev/null
+++ b/docs/en/administrator-guide/update.md
@@ -0,0 +1,126 @@
+---
+{
+    "title": "update",
+    "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Update
+
+If we need to modify or update the data in Doris, we can use the UPDATE 
command.
+
+## Applicable scenarios
+
++ To modify the value of a row that meets certain conditions.
++ Point updates, small updates, where the rows to be updated are preferably a 
very small part of the entire table.
++ Can only be used on Unique model tables
+
+## Explanation of terms
+
+1. Unique model: A data model in the Doris system. When the user imports rows 
with the same Key, the Value of the latter overrides the existing Value, in the 
same sense as Unique in MySQL.
+
+## Fundamentals
+
+Use the query engine's own where filtering logic to filter the rows that need 
to be updated from the table to be updated. Then use the Unique model's own 
Value column replacement logic to change the rows to be updated and reinsert 
them into the table. This enables row-level updates.
+
+### Example
+
+Suppose there is an order table in Doris, where order id is the Key column, 
order status, and order amount are the Value columns. The data state is as 
follows.
+
+| order id | order amount | order status |
+|--|--|--|
+| 1 | 100| Pending Payment |
+
+At this time, after the user clicks the payment button, the Doris system needs 
to change the status of the order whose order id is '1' to 'pending shipment'. 
This requires the Update function.
+
+```
+UPDATE order SET order status='To be shipped' WHERE order id=1;
+```
+
+After the user executes the UPDATE command, the system performs the following 
three steps.
+
++ Step 1: Read the rows that satisfy WHERE order id=1
+        (1, 100, 'pending payment')
++ Step 2: Change the order status of the row from 'Pending Payment' to 
'Pending Shipping'
+        (1, 100, 'Pending shipment')
++ Step 3: Insert the updated row back into the table to achieve the updated 
effect.
+        | order id | order amount | order status |
+        | ---| ---| ---|
+        | 1 | 100| Pending Payment |
+        | 1 | 100 | Pending shipments |
+        Since the table order is a UNIQUE model, for rows with the same Key 
only the latter one takes effect, so the final result is as follows.
+        | order id | order amount | order status |
+        |--|--|--|
+        | 1 | 100 | Pending shipments |
+
+## Basic operations
+
+### UPDATE syntax
+
+```UPDATE table_name SET value=xxx WHERE condition;```
+
++ ``table_name``: the table to be updated, must be a UNIQUE model table to 
update.
+
++ value=xxx: The column to be updated, the left side of the equation must be 
the value column of the table. The right side of the equation can be a constant 
or an expression transformation of a column in a table.
+        For example, if value = 1, then the value of the column to be updated 
will be 1.
+        For example, if value = value + 1, the value of the column to be 
updated is incremented by 1.
+
++ condition: Only rows that satisfy the condition will be updated. condition 
must be an expression that results in a Boolean type.
+        For example, if k1 = 1, only rows with a k1 column value of 1 will be 
updated.
+        For example, if k1 = k2, only rows with the same value in column k1 as 
in column k2 will be updated.
+        Omitting the condition is not supported, i.e., full table updates 
are not supported.
+
+### Synchronization
+
+The Update syntax is a synchronization syntax in Doris. If the Update 
statement succeeds, the update succeeds and the data is visible.
+
+### Performance
+
+The performance of the Update statement is closely related to the number of 
rows to be updated and the retrieval efficiency of the condition.
+
++ Number of rows to be updated: The more rows to be updated, the slower the 
Update statement will be. This is consistent with the principle of importing.
+        Doris updates are more suitable for occasional update scenarios, such 
as changing the values of individual rows.
+        Doris is not suitable for large batches of data changes. Large 
modifications can make Update statements take a long time to run.
+
++ Condition retrieval efficiency: Doris Update implements the principle of 
reading the rows that satisfy the condition first, so if the condition 
retrieval efficiency is high, the Update will be faster.
+        The condition column should ideally hit an index or allow partition 
and bucket pruning. This way Doris does not need to scan the entire table and 
can quickly locate the rows that need to be updated. This improves update efficiency.
+        It is strongly discouraged to include the UNIQUE model value column in 
the condition column.
+
+### Concurrency Control
+
+By default, multiple concurrent Update operations on the same table are not 
allowed at the same time.
+
+The main reason for this is that Doris currently supports row updates, which 
means that even if the user declares ``SET v2 = 1``, virtually all other Value 
columns will be overwritten (even though the values are not changed).
+
+This presents a problem in that if two Update operations update the same row 
at the same time, the behavior may be indeterminate. That is, there may be 
dirty data.
+
+However, in practice, the concurrency limit can be turned on manually if the 
user himself can guarantee that even if concurrent updates are performed, they 
will not operate on the same row at the same time. This is done by modifying 
the FE configuration ``enable_concurrent_update``. When the configuration value 
is true, there is no limit on concurrent updates.
+
+## Risks of use
+
+Since Doris currently supports row updates and uses a two-step read-and-write 
operation, there is uncertainty about the outcome of an Update statement if it 
modifies the same row as another Import or Delete statement.
+
+Therefore, when using Doris, you must be careful to control the concurrency of 
Update statements and other DML statements on the *user side itself*.
+
+## Version
+
+Doris Version 0.15.x +
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW 
EXPORT.md b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW 
EXPORT.md
index 0df0431..1cda8f8 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW EXPORT.md     
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW EXPORT.md     
@@ -28,15 +28,15 @@ under the License.
 ## Description
 This statement is used to show the execution of the specified export task
 Grammar:
-SHOW EXPORT
-[FROM both names]
-[
-WHERE
-[ID = your_job_id]
-[STATE = ["PENDING"|"EXPORTING"|"FINISHED"|"CANCELLED"]]
-]
-[ORDER BY ...]
-[LIMIT limit];
+        SHOW EXPORT
+        [FROM db_name]
+        [
+            WHERE
+            [ID = your_job_id]
+            [STATE = ["PENDING"|"EXPORTING"|"FINISHED"|"CANCELLED"]]
+        ]
+        [ORDER BY ...]
+        [LIMIT limit];
 
 Explain:
 1) If db_name is not specified, use the current default DB
@@ -46,16 +46,16 @@ Explain:
 
 ## example
 1. Show all export tasks of default DB
-SHOW EXPORT;
+    SHOW EXPORT;
 
 2. Show the export tasks of the specified db, sorted in descending order by 
StartTime
-SHOW EXPORT FROM example_db ORDER BY StartTime DESC;
+    SHOW EXPORT FROM example_db ORDER BY StartTime DESC;
 
 3. Show the export task of the specified db, state is "exporting" and sorted 
in descending order by StartTime
-SHOW EXPORT FROM example_db WHERE STATE = "exporting" ORDER BY StartTime DESC;
+    SHOW EXPORT FROM example_db WHERE STATE = "exporting" ORDER BY StartTime 
DESC;
 
 4. Show the export task of specifying dB and job_id
-SHOW EXPORT FROM example_db WHERE ID = job_id;
+    SHOW EXPORT FROM example_db WHERE ID = job_id;
 
 ## keyword
 SHOW,EXPORT
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/UPDATE.md 
b/docs/en/sql-reference/sql-statements/Data Manipulation/UPDATE.md
new file mode 100644
index 0000000..d087659
--- /dev/null
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/UPDATE.md  
@@ -0,0 +1,75 @@
+---
+{
+    "title": "UPDATE",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# UPDATE
+## description
+### Syntax
+
+```
+UPDATE table_name 
+    SET assignment_list
+    WHERE expression
+
+value:
+    {expr | DEFAULT}
+
+assignment:
+    col_name = value
+
+assignment_list:
+    assignment [, assignment] ...
+```
+
+### Parameters
+
++ table_name: The target table of the data to be updated. Can be in the form 
of 'db_name.table_name'
++ assignment_list: The target column to be updated. Can be in the form of 
'col_name = value, col_name = value'
++ where expression: The condition to be updated is an expression that returns 
true or false
+
+### Note
+
+The current UPDATE statement only supports row updates on the Unique model, 
and there may be data conflicts caused by concurrent updates.
+Currently Doris does not deal with such problems, and users are required to 
avoid such problems from the business side.
+
+## example
+
+The `test` table is a unique model table, which contains four columns: k1, k2, 
v1, v2. Among them, k1, k2 are keys, v1, v2 are values, and the aggregation 
method is Replace.
+
+1. Update the v1 column that satisfies the conditions k1 =1 and k2 = 2 in 
the'test' table to 1
+
+```
+UPDATE test SET v1 = 1 WHERE k1=1 and k2=2;
+```
+
+2. Increment the v1 column of the column with k1=1 in the'test' table by 1
+
+```
+UPDATE test SET v1 = v1+1 WHERE k1=1;
+```
+
+## keyword
+
+    UPDATE
diff --git a/docs/zh-CN/administrator-guide/update.md 
b/docs/zh-CN/administrator-guide/update.md
new file mode 100644
index 0000000..c994c8a
--- /dev/null
+++ b/docs/zh-CN/administrator-guide/update.md
@@ -0,0 +1,126 @@
+---
+{
+    "title": "更新",
+    "language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# 更新 
+
+如果我们需要修改或更新 Doris 中的数据,就可以使用 UPDATE 命令来操作。
+
+## 适用场景
+
++ 对满足某些条件的行,修改他的取值。
++ 点更新,小范围更新,待更新的行最好是整个表的非常小一部分。
++ update 命令只能在 Unique 数据模型的表中操作。
+
+## 名词解释
+
+1. Unique 模型:Doris 系统中的一种数据模型。将列分为两类,Key 和 Value。当用户导入相同 Key 的行时,后者的 Value 
会覆盖已有的 Value。与 MySQL 中的 Unique 含义一致。
+
+## 基本原理
+
+利用查询引擎自身的 where 过滤逻辑,从待更新表中筛选出需要被更新的行。再利用 Unique 模型自带的 Value 
列新数据替换旧数据的逻辑,将待更新的行变更后,再重新插入到表中。从而实现行级别更新。
+
+举例说明
+
+假设 Doris 中存在一张订单表,其中 订单id 是 Key 列,订单状态,订单金额是 Value 列。数据状态如下:
+
+|订单id | 订单金额| 订单状态|
+|---|---|---|
+| 1 | 100| 待付款 |
+
+这时候,用户点击付款后,Doris 系统需要将订单id 为 '1' 的订单状态变更为 '待发货', 就需要用到 Update 功能。
+
+```
+UPDATE order SET 订单状态='待发货' WHERE 订单id=1;
+```
+
+用户执行 UPDATE 命令后,系统会进行如下三步:
+
++ 第一步:读取满足 WHERE 订单id=1 的行
+       (1,100,'待付款')
++ 第二步:变更该行的订单状态,从'待付款'改为'待发货'
+       (1,100,'待发货')
++ 第三步:将更新后的行再插入回表中,从而达到更新的效果。
+        |订单id | 订单金额| 订单状态|
+        |---|---|---|
+        | 1 | 100| 待付款 |
+        | 1 | 100 | 待发货 |
+        由于表 order 是 UNIQUE 模型,所以相同 Key 的行,只有后者会生效,最终效果如下:
+        |订单id | 订单金额| 订单状态|
+        |---|---|---|
+        | 1 | 100 | 待发货 |
+
+## 基本操作
+
+### UPDATE 语法
+
+```UPDATE table_name SET value=xxx WHERE condition;```
+
++ `table_name`: 待更新的表,必须是 UNIQUE 模型的表才能进行更新。
+
++ value=xxx: 待更新的列,等式左边必须是表的 value 列。等式右边可以是常量,也可以是某个表中某列的表达式变换。
+       比如 value = 1, 则待更新的列值会变为1。
+       比如 value = value +1, 则待更新的列值会自增1。
+
++ condition:只有满足 condition 的行才会被更新。condition 必须是一个结果为 Boolean 类型的表达式。
+       比如 k1 = 1, 则只有当 k1 列值为1的行才会被更新。
+       比如 k1 = k2, 则只有 k1 列值和 k2 列一样的行才会被更新。
+       不支持不填写condition,也就是不支持全表更新。 
+
+### 同步
+
+Update 语法在 Doris 中是一个同步语法,即 Update 语句成功,更新就成功了,数据可见。
+
+### 性能
+
+Update 语句的性能和待更新的行数,以及 condition 的检索效率密切相关。
+
++ 待更新的行数:待更新的行数越多,Update 语句的速度就会越慢。这和导入的原理是一致的。
+       Doris 的更新比较合适偶发更新的场景,比如修改个别行的值。
+       Doris 并不适合大批量的修改数据。大批量修改会使得 Update 语句运行时间很久。
+
++ condition 的检索效率:Doris 的 Update 实现原理是先将满足 condition 的行读取出来,所以如果 condition 
的检索效率高,则 Update 的速度也会快。
+       condition 列最好能命中,索引或者分区分桶裁剪。这样 Doris 就不需要扫全表,可以快速定位到需要更新的行。从而提升更新效率。
+       强烈不推荐 condition 列中包含 UNIQUE 模型的 value 列。
+
+### 并发控制
+
+默认情况下,并不允许同一时间对同一张表并发进行多个 Update 操作。
+
+主要原因是,Doris 目前支持的是行更新,这意味着,即使用户声明的是 ```SET v2 = 1```,实际上,其他所有的 Value 
列也会被覆盖一遍(尽管值没有变化)。
+
+这就会存在一个问题,如果同时有两个 Update 操作对同一行进行更新,那么其行为可能是不确定的。也就是可能存在脏数据。
+
+但在实际应用中,如果用户自己可以保证即使并发更新,也不会同时对同一行进行操作的话,就可以手动打开并发限制。通过修改 FE 配置 
```enable_concurrent_update```。当配置值为 true 时,则对更新并发无限制。
+
+## 使用风险
+
+由于 Doris 目前支持的是行更新,并且采用的是读取后再写入的两步操作,则如果 Update 语句和其他导入或 Delete 
语句刚好修改的是同一行时,存在不确定的数据结果。
+
+所以用户在使用的时候,一定要注意*用户侧自己*进行 Update 语句和其他 DML 语句的并发控制。
+
+## 版本
+
+Doris Version 0.15.x +
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW 
EXPORT.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW 
EXPORT.md
index 91d9238..d4ad10b 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW EXPORT.md  
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW EXPORT.md  
@@ -55,7 +55,7 @@ under the License.
         SHOW EXPORT FROM example_db WHERE STATE = "exporting" ORDER BY 
StartTime DESC;
     
     4. 展示指定db,指定job_id的导出任务
-            SHOW EXPORT FROM example_db WHERE ID = job_id;
+        SHOW EXPORT FROM example_db WHERE ID = job_id;
 
 ## keyword
     SHOW,EXPORT
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data 
Manipulation/UPDATE.md b/docs/zh-CN/sql-reference/sql-statements/Data 
Manipulation/UPDATE.md
new file mode 100644
index 0000000..861cd4b
--- /dev/null
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/UPDATE.md       
@@ -0,0 +1,75 @@
+---
+{
+    "title": "UPDATE",
+    "language": "zh-CN"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# UPDATE
+## description
+### Syntax
+
+```
+UPDATE table_name 
+    SET assignment_list
+    WHERE expression
+
+value:
+    {expr | DEFAULT}
+
+assignment:
+    col_name = value
+
+assignment_list:
+    assignment [, assignment] ...
+```
+
+### Parameters
+
++ table_name: 待更新数据的目标表。可以是 'db_name.table_name' 形式
++ assignment_list: 待更新的目标列,形如 'col_name = value, col_name = value' 格式
++ where expression: 期望更新的条件,一个返回 true 或者 false 的表达式即可
+
+### Note
+
+当前 UPDATE 语句仅支持在 Unique 模型上的行更新,存在并发更新导致的数据冲突可能。
+目前 Doris 并不处理这类问题,需要用户从业务侧规避这类问题。
+
+## example
+
+`test` 表是一个 unique 模型的表,包含: k1, k2, v1, v2  四个列。其中 k1, k2 是 key,v1, v2 
是value,聚合方式是 Replace。
+
+1. 将 'test' 表中满足条件 k1 =1 , k2 =2 的 v1 列更新为 1
+
+```
+UPDATE test SET v1 = 1 WHERE k1=1 and k2=2;
+```
+
+2. 将 'test' 表中 k1=1 的列的 v1 列自增1
+
+```
+UPDATE test SET v1 = v1+1 WHERE k1=1;
+```
+
+## keyword
+
+    UPDATE
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup 
b/fe/fe-core/src/main/cup/sql_parser.cup
index 68cfca3..5b670f3 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -239,7 +239,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, 
KW_ALL, KW_ALTER, KW_A
     KW_COLLATE, KW_COLLATION, KW_COLUMN, KW_COLON, KW_COLUMNS, KW_COMMENT, 
KW_COMMIT, KW_COMMITTED,
     KW_CONFIG, KW_CONNECTION, KW_CONNECTION_ID, KW_CONSISTENT, KW_CONVERT, 
KW_COUNT, KW_CREATE, KW_CROSS, KW_CUBE, KW_CURRENT, KW_CURRENT_USER,
     KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_DAY, 
KW_DECIMAL, KW_DECOMMISSION, KW_DEFAULT, KW_DESC, KW_DESCRIBE,
-    KW_DELETE, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, KW_DISTRIBUTED, 
KW_DISTRIBUTION, KW_DYNAMIC, KW_BUCKETS, KW_DIV, KW_DOUBLE, KW_DROP, KW_DROPP, 
KW_DUPLICATE,
+    KW_DELETE, KW_UPDATE, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, 
KW_DISTRIBUTED, KW_DISTRIBUTION, KW_DYNAMIC, KW_BUCKETS, KW_DIV, KW_DOUBLE, 
KW_DROP, KW_DROPP, KW_DUPLICATE,
     KW_ELSE, KW_ENABLE, KW_ENCRYPTKEY, KW_ENCRYPTKEYS, KW_END, KW_ENGINE, 
KW_ENGINES, KW_ENTER, KW_ERRORS, KW_EVENTS, KW_EXCEPT, KW_EXCLUDE,
     KW_EXISTS, KW_EXPORT, KW_EXTERNAL, KW_EXTRACT,
     KW_FALSE, KW_FEATURE, KW_FOLLOWER, KW_FOLLOWING, KW_FREE, KW_FROM, 
KW_FILE, KW_FILTER, KW_FIRST, KW_FLOAT, KW_FOR, KW_FORCE, KW_FORMAT, 
KW_FRONTEND, KW_FRONTENDS, KW_FULL, KW_FUNCTION, KW_FUNCTIONS,
@@ -331,6 +331,7 @@ nonterminal QueryStmt set_operation_with_order_by_or_limit;
 nonterminal InsertStmt insert_stmt;
 nonterminal InsertTarget insert_target;
 nonterminal InsertSource insert_source;
+nonterminal UpdateStmt update_stmt;
 
 nonterminal BackupStmt backup_stmt;
 nonterminal AbstractBackupTableRefClause opt_backup_table_ref_list;
@@ -686,6 +687,8 @@ stmt ::=
     {: RESULT = stmt; :}
     | insert_stmt : stmt
     {: RESULT = stmt; :}
+    | update_stmt : stmt
+    {: RESULT = stmt; :}
     | backup_stmt : stmt
     {: RESULT = stmt; :}
     | restore_stmt : stmt
@@ -3121,6 +3124,14 @@ insert_source ::=
     :}
     ;
 
+// update stmt
+update_stmt ::=
+    KW_UPDATE table_name:tbl KW_SET expr_list:setExprs where_clause:whereClause
+    {:
+        RESULT = new UpdateStmt(tbl, setExprs, whereClause);
+    :}
+    ;
+
 // backup stmt
 backup_stmt ::=
     KW_BACKUP KW_SNAPSHOT job_label:label
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index 4230c18..7c7888c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -457,7 +457,7 @@ public class Analyzer {
         result.setAliases(aliases, ref.hasExplicitAlias());
 
         // Register all legal aliases.
-        for (String alias: aliases) {
+        for (String alias : aliases) {
             // TODO(zc)
             // aliasMap_.put(alias, result);
             tupleByAlias.put(alias, result);
@@ -467,6 +467,33 @@ public class Analyzer {
         return result;
     }
 
+    /**
+     * Create an new tuple descriptor for the given table, register all table 
columns.
+     * Using this method requires external table read locks in advance.
+     */
+    public TupleDescriptor registerOlapTable(Table table, TableName tableName, 
List<String> partitions) {
+        TableRef ref = new TableRef(tableName, null, partitions == null ? null 
: new PartitionNames(false, partitions));
+        BaseTableRef tableRef = new BaseTableRef(ref, table, tableName);
+        TupleDescriptor result = globalState.descTbl.createTupleDescriptor();
+        result.setTable(table);
+        result.setRef(tableRef);
+        result.setAliases(tableRef.getAliases(), ref.hasExplicitAlias());
+        for (Column col : table.getBaseSchema(true)) {
+            SlotDescriptor slot = 
globalState.descTbl.addSlotDescriptor(result);
+            slot.setIsMaterialized(true);
+            slot.setColumn(col);
+            slot.setIsNullable(col.isAllowNull());
+            String key = tableRef.aliases_[0] + "." + col.getName();
+            slotRefMap.put(key, slot);
+        }
+        globalState.descTbl.computeStatAndMemLayout();
+        tableRefMap_.put(result.getId(), ref);
+        for (String alias : tableRef.getAliases()) {
+            tupleByAlias.put(alias, result);
+        }
+        return result;
+    }
+
     public List<TupleId> getAllTupleIds() {
         return new ArrayList<>(tableRefMap_.keySet());
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
index b6b503f..6b29f8d 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
@@ -20,6 +20,7 @@ package org.apache.doris.analysis;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Function;
 import org.apache.doris.catalog.FunctionSet;
+import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
@@ -1234,6 +1235,43 @@ abstract public class Expr extends TreeNode<Expr> 
implements ParseNode, Cloneabl
         }
     }
 
+    public Expr checkTypeCompatibility(Type targetType) throws 
AnalysisException {
+        if (targetType.getPrimitiveType().equals(type.getPrimitiveType())) {
+            return this;
+        }
+        // bitmap must match exactly
+        if (targetType.getPrimitiveType() == PrimitiveType.BITMAP) {
+            throw new AnalysisException("bitmap column require the function 
return type is BITMAP");
+        }
+        // TargetTable's hll column must be hll_hash's result
+        if (targetType.getPrimitiveType() == PrimitiveType.HLL) {
+            checkHllCompatibility();
+            return this;
+        }
+        Expr newExpr = castTo(targetType);
+        newExpr.checkValueValid();
+        return newExpr;
+    }
+
+    private void checkHllCompatibility() throws AnalysisException {
+        final String hllMismatchLog = "Column's type is HLL,"
+                + " SelectList must contains HLL or hll_hash or hll_empty 
function's result";
+        if (this instanceof SlotRef) {
+            final SlotRef slot = (SlotRef) this;
+            if (!slot.getType().equals(Type.HLL)) {
+                throw new AnalysisException(hllMismatchLog);
+            }
+        } else if (this instanceof FunctionCallExpr) {
+            final FunctionCallExpr functionExpr = (FunctionCallExpr) this;
+            if 
(!functionExpr.getFnName().getFunction().equalsIgnoreCase("hll_hash") &&
+                    
!functionExpr.getFnName().getFunction().equalsIgnoreCase("hll_empty")) {
+                throw new AnalysisException(hllMismatchLog);
+            }
+        } else {
+            throw new AnalysisException(hllMismatchLog);
+        }
+    }
+
     /**
      * Checks validity of cast, and
      * calls uncheckedCastTo() to
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
index 3b961bd..3a3cd91 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -18,7 +18,6 @@
 package org.apache.doris.analysis;
 
 import org.apache.doris.alter.SchemaChangeHandler;
-import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.BrokerTable;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
@@ -29,7 +28,6 @@ import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.catalog.Table;
-import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
@@ -92,8 +90,7 @@ public class InsertStmt extends DdlStmt {
     private final TableName tblName;
     private final PartitionNames targetPartitionNames;
     // parsed from targetPartitionNames.
-    // if targetPartitionNames is not set, add all formal partitions' id of 
the table into it
-    private List<Long> targetPartitionIds = Lists.newArrayList();
+    private List<Long> targetPartitionIds;
     private final List<String> targetColumnNames;
     private QueryStmt queryStmt;
     private final List<String> planHints;
@@ -344,6 +341,7 @@ public class InsertStmt extends DdlStmt {
 
             // partition
             if (targetPartitionNames != null) {
+                targetPartitionIds = Lists.newArrayList();
                 if (olapTable.getPartitionInfo().getType() == 
PartitionType.UNPARTITIONED) {
                     
ErrorReport.reportAnalysisException(ErrorCode.ERR_PARTITION_CLAUSE_NO_ALLOWED);
                 }
@@ -355,14 +353,6 @@ public class InsertStmt extends DdlStmt {
                     }
                     targetPartitionIds.add(part.getId());
                 }
-            } else {
-                for (Partition partition : olapTable.getPartitions()) {
-                    targetPartitionIds.add(partition.getId());
-                }
-                if (targetPartitionIds.isEmpty()) {
-                    ErrorReport.reportAnalysisException(
-                            ErrorCode.ERR_EMPTY_PARTITION_IN_TABLE, 
targetTable.getName());
-                }
             }
             // need a descriptor
             DescriptorTable descTable = analyzer.getDescTbl();
@@ -559,15 +549,8 @@ public class InsertStmt extends DdlStmt {
             // check compatibility
             for (int i = 0; i < targetColumns.size(); ++i) {
                 Column column = targetColumns.get(i);
-                if (column.getType().isHllType()) {
-                    Expr expr = queryStmt.getResultExprs().get(i);
-                    checkHllCompatibility(column, expr);
-                }
-
-                if (column.getAggregationType() == AggregateType.BITMAP_UNION) 
{
-                    Expr expr = queryStmt.getResultExprs().get(i);
-                    checkBitmapCompatibility(column, expr);
-                }
+                Expr expr = queryStmt.getResultExprs().get(i);
+                queryStmt.getResultExprs().set(i, 
expr.checkTypeCompatibility(column.getType()));
             }
         }
 
@@ -647,11 +630,6 @@ public class InsertStmt extends DdlStmt {
             Expr expr = row.get(i);
             Column col = targetColumns.get(i);
 
-            // TargetTable's hll column must be hll_hash's result
-            if (col.getType().equals(Type.HLL)) {
-                checkHllCompatibility(col, expr);
-            }
-
             if (expr instanceof DefaultValueExpr) {
                 if (targetColumns.get(i).getDefaultValue() == null) {
                     throw new AnalysisException("Column has no default value, 
column=" + targetColumns.get(i).getName());
@@ -661,11 +639,7 @@ public class InsertStmt extends DdlStmt {
 
             expr.analyze(analyzer);
 
-            if (col.getAggregationType() == AggregateType.BITMAP_UNION) {
-                checkBitmapCompatibility(col, expr);
-            }
-
-            row.set(i, checkTypeCompatibility(col, expr));
+            row.set(i, expr.checkTypeCompatibility(col.getType()));
         }
     }
 
@@ -697,41 +671,6 @@ public class InsertStmt extends DdlStmt {
             }
         }
     }
-    private void checkHllCompatibility(Column col, Expr expr) throws 
AnalysisException {
-        final String hllMismatchLog = "Column's type is HLL,"
-                + " SelectList must contains HLL or hll_hash or hll_empty 
function's result, column=" + col.getName();
-        if (expr instanceof SlotRef) {
-            final SlotRef slot = (SlotRef) expr;
-            if (!slot.getType().equals(Type.HLL)) {
-                throw new AnalysisException(hllMismatchLog);
-            }
-        } else if (expr instanceof FunctionCallExpr) {
-            final FunctionCallExpr functionExpr = (FunctionCallExpr) expr;
-            if 
(!functionExpr.getFnName().getFunction().equalsIgnoreCase("hll_hash") &&
-                    
!functionExpr.getFnName().getFunction().equalsIgnoreCase("hll_empty")) {
-                throw new AnalysisException(hllMismatchLog);
-            }
-        } else {
-            throw new AnalysisException(hllMismatchLog);
-        }
-    }
-
-    private void checkBitmapCompatibility(Column col, Expr expr) throws 
AnalysisException {
-        String errorMsg = String.format("bitmap column %s require the function 
return type is BITMAP",
-                col.getName());
-        if (!expr.getType().isBitmapType()) {
-            throw new AnalysisException(errorMsg);
-        }
-    }
-
-    private Expr checkTypeCompatibility(Column col, Expr expr) throws 
AnalysisException {
-        if (col.getDataType().equals(expr.getType().getPrimitiveType())) {
-            return expr;
-        }
-        Expr newExpr = expr.castTo(col.getType());
-        newExpr.checkValueValid();
-        return newExpr;
-    }
 
     public void prepareExpressions() throws UserException {
         List<Expr> selectList = 
Expr.cloneList(queryStmt.getBaseTblResultExprs());
@@ -739,7 +678,7 @@ public class InsertStmt extends DdlStmt {
         int numCols = targetColumns.size();
         for (int i = 0; i < numCols; ++i) {
             Column col = targetColumns.get(i);
-            Expr expr = checkTypeCompatibility(col, selectList.get(i));
+            Expr expr = 
selectList.get(i).checkTypeCompatibility(col.getType());
             selectList.set(i, expr);
             exprByName.put(col.getName(), expr);
         }
@@ -758,7 +697,8 @@ public class InsertStmt extends DdlStmt {
                     resultExprs.add(NullLiteral.create(col.getType()));
                 }
                 else {
-                    resultExprs.add(checkTypeCompatibility(col, new 
StringLiteral(col.getDefaultValue())));
+                    StringLiteral defaultValueExpr = new 
StringLiteral(col.getDefaultValue());
+                    
resultExprs.add(defaultValueExpr.checkTypeCompatibility(col.getType()));
                 }
             }
         }
@@ -814,7 +754,9 @@ public class InsertStmt extends DdlStmt {
     @Override
     public void reset() {
         super.reset();
-        targetPartitionIds.clear();
+        if (targetPartitionIds != null) {
+            targetPartitionIds.clear();
+        }
         queryStmt.reset();
         resultExprs.clear();
         exprByName.clear();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java
index 65aee31..9a43c82 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java
@@ -126,7 +126,6 @@ public class IntLiteral extends LiteralExpr {
                 valid = false;
                 break;
         }
-
         if (!valid) {
             throw new AnalysisException("Number out of range[" + longValue + 
"]. type: " + type);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/LargeIntLiteral.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/LargeIntLiteral.java
index 3e0b241..0e3fd0c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LargeIntLiteral.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LargeIntLiteral.java
@@ -20,7 +20,6 @@ package org.apache.doris.analysis;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.NotImplementedException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.thrift.TExprNode;
 import org.apache.doris.thrift.TExprNodeType;
@@ -204,7 +203,7 @@ public class LargeIntLiteral extends LiteralExpr {
     }
 
     @Override
-    public void swapSign() throws NotImplementedException {
+    public void swapSign() {
         // swapping sign does not change the type
         value = value.negate();
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java
new file mode 100644
index 0000000..4853b3f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.AggregateType;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+
+import com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * UPDATE is a DML statement that modifies rows in a table.
+ * The current update syntax only supports updating the filtered data of a 
single table.
+ *
+ * UPDATE table_reference
+ *     SET assignment_list
+ *     [WHERE where_condition]
+ *
+ * value:
+ *     {expr}
+ *
+ * assignment:
+ *     col_name = value
+ *
+ * assignment_list:
+ *     assignment [, assignment] ...
+ */
+public class UpdateStmt extends DdlStmt {
+
+    private TableName tableName;
+    private List<Expr> setExprs;
+    private Expr whereExpr;
+
+    // After analyzed
+    private Table targetTable;
+    private TupleDescriptor srcTupleDesc;
+
+    public UpdateStmt(TableName tableName, List<Expr> setExprs, Expr 
whereExpr) {
+        this.tableName = tableName;
+        this.setExprs = setExprs;
+        this.whereExpr = whereExpr;
+    }
+
+    public TableName getTableName() {
+        return tableName;
+    }
+
+    public List<Expr> getSetExprs() {
+        return setExprs;
+    }
+
+    public Expr getWhereExpr() {
+        return whereExpr;
+    }
+
+    public Table getTargetTable() {
+        return targetTable;
+    }
+
+    public TupleDescriptor getSrcTupleDesc() {
+        return srcTupleDesc;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws UserException {
+        super.analyze(analyzer);
+        analyzeTargetTable(analyzer);
+        analyzeSetExprs(analyzer);
+        analyzeWhereExpr(analyzer);
+    }
+
+    private void analyzeTargetTable(Analyzer analyzer) throws 
AnalysisException {
+        // step1: analyze table name
+        tableName.analyze(analyzer);
+        // step2: resolve table name with catalog; only a unique olap table 
can be updated
+        String dbName = tableName.getDb();
+        String targetTableName = tableName.getTbl();
+        Preconditions.checkNotNull(dbName);
+        Preconditions.checkNotNull(targetTableName);
+        Database database = Catalog.getCurrentCatalog().getDb(dbName);
+        if (database == null) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR, 
dbName);
+        }
+        targetTable = database.getTable(tableName.getTbl());
+        if (targetTable == null) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_TABLE_ERROR, 
tableName.getTbl());
+        }
+        if (targetTable.getType() != Table.TableType.OLAP
+                || ((OlapTable) targetTable).getKeysType() != 
KeysType.UNIQUE_KEYS) {
+            throw new AnalysisException("Only unique olap table could be 
updated.");
+        }
+        // step3: register tuple desc
+        targetTable.readLock();
+        try {
+            srcTupleDesc = analyzer.registerOlapTable(targetTable, tableName, 
null);
+        } finally {
+            targetTable.readUnlock();
+        }
+    }
+
+    private void analyzeSetExprs(Analyzer analyzer) throws AnalysisException {
+        // step1: analyze set exprs
+        Set<String> columnMappingNames = new 
TreeSet<>(String.CASE_INSENSITIVE_ORDER);
+        // the column expr only supports an equality binary predicate whose 
child(0) must be a SlotRef.
+        // duplicate SlotRef column names are forbidden.
+        for (Expr setExpr : setExprs) {
+            if (!(setExpr instanceof BinaryPredicate)) {
+                throw new AnalysisException("Set function expr only support eq 
binary predicate. "
+                        + "Expr: " + setExpr.toSql());
+            }
+            BinaryPredicate predicate = (BinaryPredicate) setExpr;
+            if (predicate.getOp() != BinaryPredicate.Operator.EQ) {
+                throw new AnalysisException("Set function expr only support eq 
binary predicate. "
+                        + "The predicate operator error, op: " + 
predicate.getOp());
+            }
+            Expr lhs = predicate.getChild(0);
+            if (!(lhs instanceof SlotRef)) {
+                throw new AnalysisException("Set function expr only support eq 
binary predicate "
+                        + "which's child(0) must be a column name. "
+                        + "The child(0) expr error. expr: " + lhs.toSql());
+            }
+            String column = ((SlotRef) lhs).getColumnName();
+            if (!columnMappingNames.add(column)) {
+                throw new AnalysisException("Duplicate column setting: " + 
column);
+            }
+        }
+        // step2: resolve target columns with catalog,
+        //        only value columns that belong to the target table can be 
updated.
+        for (Expr setExpr : setExprs) {
+            Preconditions.checkState(setExpr instanceof BinaryPredicate);
+            // check target column
+            // 1. columns must belong to target table
+            // 2. only value columns could be updated
+            Expr lhs = setExpr.getChild(0);
+            if (!(lhs instanceof SlotRef)) {
+                throw new AnalysisException("The left side of the set expr 
must be the column name");
+            }
+            lhs.analyze(analyzer);
+            if (((SlotRef) lhs).getColumn().getAggregationType() != 
AggregateType.REPLACE) {
+                throw new AnalysisException("Only value columns of unique 
table could be updated.");
+            }
+            // check set expr of target column
+            Expr rhs = setExpr.getChild(1);
+            checkLargeIntOverflow(rhs);
+            rhs.analyze(analyzer);
+            if (lhs.getType() != rhs.getType()) {
+                setExpr.setChild(1, rhs.checkTypeCompatibility(lhs.getType()));
+            }
+        }
+    }
+
+    /*
+   The overflow detection of LargeInt needs to be verified again here.
+   The reason is: the first overflow detection (in the constructor) cannot 
filter out 2^127.
+   Therefore, a second verification is required here.
+    */
+    private void checkLargeIntOverflow(Expr expr) throws AnalysisException {
+        if (expr instanceof LargeIntLiteral) {
+            expr.analyzeImpl(analyzer);
+        }
+    }
+
+    private void analyzeWhereExpr(Analyzer analyzer) throws AnalysisException {
+        if (whereExpr == null) {
+            throw new AnalysisException("Where clause is required");
+        }
+        whereExpr = analyzer.getExprRewriter().rewrite(whereExpr, analyzer);
+        whereExpr.analyze(analyzer);
+        if (!whereExpr.getType().equals(Type.BOOLEAN)) {
+            throw new AnalysisException("Where clause is not a valid statement 
return bool");
+        }
+        analyzer.registerConjunct(whereExpr, srcTupleDesc.getId());
+    }
+
+    @Override
+    public String toSql() {
+        StringBuilder sb = new StringBuilder("UPDATE ");
+        sb.append(tableName.toSql()).append("\n");
+        sb.append("  ").append("SET ");
+        for (Expr setExpr : setExprs) {
+            sb.append(setExpr.toSql()).append(", ");
+        }
+        sb.append("\n");
+        if (whereExpr != null) {
+            sb.append("  ").append("WHERE ").append(whereExpr.toSql());
+        }
+        return sb.toString();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 7dc43c6..63ab000 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -161,6 +161,7 @@ import org.apache.doris.load.loadv2.LoadTimeoutChecker;
 import org.apache.doris.load.routineload.RoutineLoadManager;
 import org.apache.doris.load.routineload.RoutineLoadScheduler;
 import org.apache.doris.load.routineload.RoutineLoadTaskScheduler;
+import org.apache.doris.load.update.UpdateManager;
 import org.apache.doris.master.Checkpoint;
 import org.apache.doris.master.MetaHelper;
 import org.apache.doris.master.PartitionInMemoryInfoCollector;
@@ -306,6 +307,7 @@ public class Catalog {
     private ConsistencyChecker consistencyChecker;
     private BackupHandler backupHandler;
     private PublishVersionDaemon publishVersionDaemon;
+    private UpdateManager updateManager;
     private DeleteHandler deleteHandler;
     private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector;
     private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector;
@@ -493,6 +495,7 @@ public class Catalog {
         this.backupHandler = new BackupHandler(this);
         this.metaDir = Config.meta_dir;
         this.publishVersionDaemon = new PublishVersionDaemon();
+        this.updateManager = new UpdateManager();
         this.deleteHandler = new DeleteHandler();
         this.dbUsedDataQuotaInfoCollector = new DbUsedDataQuotaInfoCollector();
         this.partitionInMemoryInfoCollector = new 
PartitionInMemoryInfoCollector();
@@ -4805,6 +4808,10 @@ public class Catalog {
         return this.backupHandler;
     }
 
+    public UpdateManager getUpdateManager() {
+        return updateManager;
+    }
+
     public DeleteHandler getDeleteHandler() {
         return this.deleteHandler;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index fc2511b..48d85b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -20,6 +20,8 @@ package org.apache.doris.catalog;
 import org.apache.doris.alter.SchemaChangeHandler;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.CaseSensibility;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeMetaVersion;
@@ -273,6 +275,16 @@ public class Column implements Writable {
         return this.defaultValue;
     }
 
+    public Expr getDefaultValueExpr() throws AnalysisException {
+        StringLiteral defaultValueLiteral = new StringLiteral(defaultValue);
+        if (getDataType() == PrimitiveType.VARCHAR) {
+            return defaultValueLiteral;
+        }
+        Expr result = defaultValueLiteral.castTo(getType());
+        result.checkValueValid();
+        return result;
+    }
+
     public void setStats(ColumnStats stats) {
         this.stats = stats;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 844c2b7..d409acc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -826,6 +826,10 @@ public class OlapTable extends Table {
         return Sets.newHashSet(nameToPartition.keySet());
     }
 
+    public List<Long> getPartitionIds() {
+        return getPartitions().stream().map(entity -> 
entity.getId()).collect(Collectors.toList());
+    }
+
     public Set<String> getCopiedBfColumns() {
         if (bfColumns == null) {
             return null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 3f0155e..e1012dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1450,4 +1450,7 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = false, masterOnly = true)
     public static int partition_in_memory_update_interval_secs = 300;
+    
+    @ConfField(masterOnly = true)
+    public static boolean enable_concurrent_update = false;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index a07286b..30c0f45 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -376,9 +376,7 @@ public class ExportJob implements Writable {
         switch (exportTable.getType()) {
             case OLAP:
                 scanNode = new OlapScanNode(new PlanNodeId(0), 
exportTupleDesc, "OlapScanNodeForExport");
-                ((OlapScanNode) scanNode).setColumnFilters(Maps.newHashMap());
-                ((OlapScanNode) scanNode).setIsPreAggregation(false, "This an 
export operation");
-                ((OlapScanNode) scanNode).setCanTurnOnPreAggr(false);
+                ((OlapScanNode) scanNode).closePreAggregation("This an export 
operation");
                 ((OlapScanNode) 
scanNode).selectBestRollupByRollupSelector(analyzer);
                 break;
             case ODBC:
@@ -461,7 +459,7 @@ public class ExportJob implements Writable {
             ScanNode scanNode = nodes.get(i);
             TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits() + 
i, uuid.getLeastSignificantBits());
             Coordinator coord = new Coordinator(
-                    id, queryId, desc, Lists.newArrayList(fragment), 
Lists.newArrayList(scanNode), clusterName,
+                    id, queryId, desc, Lists.newArrayList(fragment), 
Lists.newArrayList(scanNode),
                     TimeUtils.DEFAULT_TIME_ZONE);
             coord.setExecMemoryLimit(getExecMemLimit());
             this.coordList.add(coord);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 4e0054c..7510e61 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -115,7 +115,7 @@ public class LoadLoadingTask extends LoadTask {
     private void executeOnce() throws Exception {
         // New one query id,
         Coordinator curCoordinator = new Coordinator(callback.getCallbackId(), 
loadId, planner.getDescTable(),
-                planner.getFragments(), planner.getScanNodes(), 
db.getClusterName(), planner.getTimezone());
+                planner.getFragments(), planner.getScanNodes(), 
planner.getTimezone());
         curCoordinator.setQueryType(TQueryType.LOAD);
         curCoordinator.setExecMemoryLimit(execMemLimit);
         /*
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index 2d8d574..4d2dbe4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -26,7 +26,6 @@ import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Partition;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.NotImplementedException;
@@ -171,30 +170,19 @@ public class LoadingTaskPlanner {
     }
 
     private List<Long> getAllPartitionIds() throws LoadException, 
MetaNotFoundException {
-        Set<Long> partitionIds = Sets.newHashSet();
+        Set<Long> specifiedPartitionIds = Sets.newHashSet();
         for (BrokerFileGroup brokerFileGroup : fileGroups) {
             if (brokerFileGroup.getPartitionIds() != null) {
-                partitionIds.addAll(brokerFileGroup.getPartitionIds());
+                
specifiedPartitionIds.addAll(brokerFileGroup.getPartitionIds());
             }
             // all file groups in fileGroups should have the same partitions, so 
we only need to get partition ids
             // from one of these file groups
             break;
         }
-
-        if (partitionIds.isEmpty()) {
-            for (Partition partition : table.getPartitions()) {
-                partitionIds.add(partition.getId());
-            }
+        if (specifiedPartitionIds.isEmpty()) {
+            return null;
         }
-
-        // If this is a dynamic partitioned table, it will take some time to 
create the partition after the
-        // table is created, a exception needs to be thrown here
-        if (partitionIds.isEmpty()) {
-            throw new LoadException("data cannot be inserted into table with 
empty partition. " +
-                    "Use `SHOW PARTITIONS FROM " + table.getName() + "` to see 
the currently partitions of this table. ");
-        }
-
-        return Lists.newArrayList(partitionIds);
+        return Lists.newArrayList(specifiedPartitionIds);
     }
 
     // when retry load by reusing this plan in load process, the load_id 
should be changed
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateManager.java
new file mode 100644
index 0000000..3199df7
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateManager.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.update;
+
+import org.apache.doris.analysis.UpdateStmt;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class UpdateManager {
+    private final boolean enableConcurrentUpdate = 
Config.enable_concurrent_update;
+    private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private Map<Long, List<UpdateStmtExecutor>> tableIdToCurrentUpdate = 
Maps.newConcurrentMap();
+
+    private void writeLock() {
+        rwLock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        rwLock.writeLock().unlock();
+    }
+
+    public void handleUpdate(UpdateStmt updateStmt) throws UserException {
+        UpdateStmtExecutor updateStmtExecutor = addUpdateExecutor(updateStmt);
+        try {
+            updateStmtExecutor.execute();
+        } finally {
+            removeUpdateExecutor(updateStmtExecutor);
+        }
+    }
+
+    private UpdateStmtExecutor addUpdateExecutor(UpdateStmt updateStmt) throws 
AnalysisException, DdlException {
+        writeLock();
+        try {
+            List<UpdateStmtExecutor> currentUpdateList = 
tableIdToCurrentUpdate.get(updateStmt.getTargetTable().getId());
+            if (!enableConcurrentUpdate && currentUpdateList != null && 
currentUpdateList.size() > 0) {
+                throw new DdlException("There is an update operation in 
progress for the current table. "
+                        + "Please try again later, or set 
enable_concurrent_update in fe.conf to true");
+            }
+            UpdateStmtExecutor updateStmtExecutor = 
UpdateStmtExecutor.fromUpdateStmt(updateStmt);
+            if (currentUpdateList == null) {
+                currentUpdateList = Lists.newArrayList();
+                
tableIdToCurrentUpdate.put(updateStmtExecutor.getTargetTableId(), 
currentUpdateList);
+            }
+            currentUpdateList.add(updateStmtExecutor);
+            return updateStmtExecutor;
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    private void removeUpdateExecutor(UpdateStmtExecutor updateStmtExecutor) {
+        writeLock();
+        try {
+            List<UpdateStmtExecutor> currentUpdateList = 
tableIdToCurrentUpdate.get(updateStmtExecutor.getTargetTableId());
+            if (currentUpdateList == null) {
+                return;
+            }
+            currentUpdateList.remove(updateStmtExecutor);
+        } finally {
+            writeUnlock();
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
new file mode 100644
index 0000000..0ffe906
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.update;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.DescriptorTable;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.NullLiteral;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.OlapTableSink;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.Planner;
+import org.apache.doris.planner.ScanNode;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.doris.alter.SchemaChangeHandler.SHADOW_NAME_PRFIX;
+
+
+public class UpdatePlanner extends Planner {
+
+    private final IdGenerator<PlanNodeId> nodeIdGenerator_ = 
PlanNodeId.createGenerator();
+    private final IdGenerator<PlanFragmentId> fragmentIdGenerator_ =
+            PlanFragmentId.createGenerator();
+
+    private long targetDBId;
+    private OlapTable targetTable;
+    private List<Expr> setExprs;
+    private TupleDescriptor srcTupleDesc;
+    private Analyzer analyzer;
+
+    private List<ScanNode> scanNodeList = Lists.newArrayList();
+
+    public UpdatePlanner(long dbId, OlapTable targetTable, List<Expr> setExprs,
+                         TupleDescriptor srcTupleDesc, Analyzer analyzer) {
+        this.targetDBId = dbId;
+        this.targetTable = targetTable;
+        this.setExprs = setExprs;
+        this.srcTupleDesc = srcTupleDesc;
+        this.analyzer = analyzer;
+    }
+
+    @Override
+    public List<ScanNode> getScanNodes() {
+        return scanNodeList;
+    }
+
+    public void plan(long txnId) throws UserException {
+        // 1. gen scan node
+        OlapScanNode olapScanNode = new 
OlapScanNode(nodeIdGenerator_.getNextId(), srcTupleDesc, "OlapScanNode");
+        /* BEGIN: Temporary code, this part of the code needs to be refactored 
*/
+        olapScanNode.closePreAggregation("This an update operation");
+        olapScanNode.useBaseIndexId();
+        /* END */
+        olapScanNode.init(analyzer);
+        olapScanNode.finalize(analyzer);
+        scanNodeList.add(olapScanNode);
+        // 2. gen olap table sink
+        OlapTableSink olapTableSink = new OlapTableSink(targetTable, 
computeTargetTupleDesc(), null);
+        olapTableSink.init(analyzer.getContext().queryId(), txnId, targetDBId,
+                analyzer.getContext().getSessionVariable().queryTimeoutS);
+        olapTableSink.complete();
+        // 3. gen plan fragment
+        PlanFragment planFragment = new 
PlanFragment(fragmentIdGenerator_.getNextId(), olapScanNode,
+                DataPartition.RANDOM);
+        planFragment.setSink(olapTableSink);
+        planFragment.setOutputExprs(computeOutputExprs());
+        planFragment.finalize(analyzer, false);
+        fragments.add(planFragment);
+    }
+
+    private TupleDescriptor computeTargetTupleDesc() {
+        DescriptorTable descTable = analyzer.getDescTbl();
+        TupleDescriptor targetTupleDesc = descTable.createTupleDescriptor();
+        for (Column col : targetTable.getFullSchema()) {
+            SlotDescriptor slotDesc = 
descTable.addSlotDescriptor(targetTupleDesc);
+            slotDesc.setIsMaterialized(true);
+            slotDesc.setType(col.getType());
+            slotDesc.setColumn(col);
+            if (col.isAllowNull()) {
+                slotDesc.setIsNullable(true);
+            } else {
+                slotDesc.setIsNullable(false);
+            }
+        }
+        targetTupleDesc.computeStatAndMemLayout();
+        return targetTupleDesc;
+    }
+
+    /**
+     * There are three Rules of output exprs:
+     * RuleA: columns that need to be updated,
+     * use the right child of a set expr
+     *     base column: (k1, v1)
+     *     update stmt: set v1=1
+     *     output expr: k1, 1(use 1 as output expr)
+     * RuleB: columns that do not need to be updated,
+     * just add the original value of column -> slot ref
+     *     base column: (k1, v1)
+     *     update stmt: set v1 = 1
+     *     output expr: k1(use k1 slot ref as output expr), 1
+     * RuleC: the output columns is being added by the schema change job,
+     * need to add default value expr in output expr
+     *     base column: (k1, v1)
+     *     schema change job: add v2 column
+     *     full column: (k1, v1, v2)
+     *     output expr: k1, v1, default_value(v2)
+     */
+    private List<Expr> computeOutputExprs() throws AnalysisException {
+        Map<String, Expr> columnNameToSetExpr = Maps.newHashMap();
+        for (Expr setExpr : setExprs) {
+            Preconditions.checkState(setExpr instanceof BinaryPredicate);
+            Preconditions.checkState(setExpr.getChild(0) instanceof SlotRef);
+            SlotRef slotRef = (SlotRef) setExpr.getChild(0);
+            // pay attention to case ignore of column name
+            columnNameToSetExpr.put(slotRef.getColumnName().toLowerCase(), 
setExpr.getChild(1));
+        }
+        Map<String, SlotDescriptor> columnNameToSrcSlotDesc = 
Maps.newHashMap();
+        for (SlotDescriptor srcSlotDesc : srcTupleDesc.getSlots()) {
+            // pay attention to case ignore of column name
+            
columnNameToSrcSlotDesc.put(srcSlotDesc.getColumn().getName().toLowerCase(), 
srcSlotDesc);
+        }
+
+        // compute output expr
+        List<Expr> outputExprs = Lists.newArrayList();
+        for (int i = 0; i < targetTable.getFullSchema().size(); i++) {
+            Column column = targetTable.getFullSchema().get(i);
+            // pay attention to case ignore of column name
+            String originColumnName = 
(column.getName().startsWith(SHADOW_NAME_PRFIX) ?
+                    column.getName().substring(SHADOW_NAME_PRFIX.length()) : 
column.getName())
+                    .toLowerCase();
+            Expr setExpr = columnNameToSetExpr.get(originColumnName);
+            SlotDescriptor srcSlotDesc = 
columnNameToSrcSlotDesc.get(originColumnName);
+            if (setExpr != null) {
+                // RuleA
+                outputExprs.add(setExpr);
+            } else if (srcSlotDesc != null) {
+                // RuleB
+                SlotRef slotRef = new SlotRef(srcSlotDesc);
+                outputExprs.add(slotRef);
+            } else {
+                // RuleC
+                Expr defaultExpr;
+                if (column.getDefaultValue() != null) {
+                    defaultExpr = column.getDefaultValueExpr();
+                } else {
+                    if (column.isAllowNull()) {
+                        defaultExpr = NullLiteral.create(column.getType());
+                    } else {
+                        throw new AnalysisException("column has no source 
field, column=" + column.getName());
+                    }
+                }
+                defaultExpr.analyze(analyzer);
+                outputExprs.add(defaultExpr);
+            }
+        }
+        return outputExprs;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateStmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateStmtExecutor.java
new file mode 100644
index 0000000..1e8988d
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateStmtExecutor.java
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.update;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.UpdateStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.DuplicatedRequestException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.QuotaExceedException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.qe.Coordinator;
+import org.apache.doris.qe.QeProcessorImpl;
+import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.task.LoadEtlTask;
+import org.apache.doris.thrift.TQueryType;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.BeginTransactionException;
+import org.apache.doris.transaction.GlobalTransactionMgr;
+import org.apache.doris.transaction.TabletCommitInfo;
+import org.apache.doris.transaction.TransactionCommitFailedException;
+import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
+import org.apache.doris.transaction.TransactionState.TxnCoordinator;
+import org.apache.doris.transaction.TransactionState.TxnSourceType;
+import org.apache.doris.transaction.TransactionStatus;
+
+import com.google.common.collect.Lists;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+public class UpdateStmtExecutor {
+    private static final Logger LOG = 
LogManager.getLogger(UpdateStmtExecutor.class);
+
+    private OlapTable targetTable;
+    private Expr whereExpr;
+    private List<Expr> setExprs;
+    private long dbId;
+    private TUniqueId queryId;
+    private int timeoutSecond;
+    private Analyzer analyzer;
+    private UpdatePlanner updatePlanner;
+
+    private String label;
+    private long txnId;
+    private Coordinator coordinator;
+    private long effectRows;
+
+
+    public long getTargetTableId() {
+        return targetTable.getId();
+    }
+
+    public void execute() throws UserException {
+        // 0. empty set
+        // A where clause with a constant equal to false will not execute the 
update directly
+        // Example: update xxx set v1=0 where 1=2
+        if (analyzer.hasEmptyResultSet()) {
+            QeProcessorImpl.INSTANCE.unregisterQuery(queryId);
+            analyzer.getContext().getState().setOk();
+            return;
+        }
+
+        // 1. begin txn
+        beginTxn();
+
+        // 2. plan
+        targetTable.readLock();
+        try {
+            updatePlanner.plan(txnId);
+        } catch (Throwable e) {
+            LOG.warn("failed to plan update stmt, query id:{}", 
DebugUtil.printId(queryId), e);
+            Catalog.getCurrentGlobalTransactionMgr().abortTransaction(dbId, 
txnId, e.getMessage());
+            QeProcessorImpl.INSTANCE.unregisterQuery(queryId);
+            throw new DdlException("failed to execute update stmt, query id:" 
+ DebugUtil.printId(queryId), e);
+        } finally {
+            targetTable.readUnlock();
+        }
+
+        // 3. execute plan
+        try {
+            executePlan();
+        } catch (DdlException e) {
+            LOG.warn("failed to execute update stmt, query id:{}", 
DebugUtil.printId(queryId), e);
+            Catalog.getCurrentGlobalTransactionMgr().abortTransaction(dbId, 
txnId, e.getMessage());
+            throw e;
+        } catch (Throwable e) {
+            LOG.warn("failed to execute update stmt, query id:{}", 
DebugUtil.printId(queryId), e);
+            Catalog.getCurrentGlobalTransactionMgr().abortTransaction(dbId, 
txnId, e.getMessage());
+            throw new DdlException("failed to execute update stmt, query id:" 
+ DebugUtil.printId(queryId), e);
+        } finally {
+            QeProcessorImpl.INSTANCE.unregisterQuery(queryId);
+        }
+        
+        // 4. commit and publish
+        commitAndPublishTxn();
+    }
+
+    private void beginTxn() throws LabelAlreadyUsedException, 
AnalysisException, BeginTransactionException,
+            DuplicatedRequestException, QuotaExceedException, 
MetaNotFoundException {
+        LOG.info("begin transaction for update stmt, query id:{}", 
DebugUtil.printId(queryId));
+        MetricRepo.COUNTER_LOAD_ADD.increase(1L);
+        label = "update_" + DebugUtil.printId(queryId);
+        txnId = Catalog.getCurrentGlobalTransactionMgr()
+                .beginTransaction(dbId, 
Lists.newArrayList(targetTable.getId()), label,
+                        new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                        LoadJobSourceType.INSERT_STREAMING, timeoutSecond);
+    }
+
+    // TODO(ML): Abstract the logic of executing the coordinater and retrying.
+    //           It makes stmt such as insert, load, update and export can be 
reused
+    private void executePlan() throws Exception {
+        LOG.info("begin execute update stmt, query id:{}", 
DebugUtil.printId(queryId));
+        coordinator = new Coordinator(Catalog.getCurrentCatalog().getNextId(), 
queryId, analyzer.getDescTbl(),
+                updatePlanner.getFragments(), updatePlanner.getScanNodes(), 
TimeUtils.DEFAULT_TIME_ZONE);
+        coordinator.setQueryType(TQueryType.LOAD);
+        QeProcessorImpl.INSTANCE.registerQuery(queryId, coordinator);
+        analyzer.getContext().getExecutor().setCoord(coordinator);
+
+        // execute
+        coordinator.setTimeout(timeoutSecond);
+        coordinator.exec();
+        if (coordinator.join(timeoutSecond)) {
+            if (!coordinator.isDone()) {
+                coordinator.cancel();
+                ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT);
+            }
+            if (!coordinator.getExecStatus().ok()) {
+                String errMsg = "update failed: " + 
coordinator.getExecStatus().getErrorMsg();
+                LOG.warn(errMsg);
+                throw new DdlException(errMsg);
+            }
+            LOG.info("finish to execute update stmt, query id:{}", 
DebugUtil.printId(queryId));
+        } else {
+            String errMsg = "coordinator could not finished before update 
timeout: "
+                    + coordinator.getExecStatus().getErrorMsg();
+            LOG.warn(errMsg);
+            throw new DdlException(errMsg);
+        }
+
+        // counter
+        if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != 
null) {
+            effectRows = 
Long.valueOf(coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL));
+            if 
(Long.valueOf(coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL)) 
!= 0) {
+                throw new DdlException("update failed, some rows did not take 
effect");
+            }
+        }
+    }
+
+    private void commitAndPublishTxn() throws UserException {
+        GlobalTransactionMgr globalTransactionMgr = 
Catalog.getCurrentGlobalTransactionMgr();
+        // situation1: no data is updated, abort transaction
+        if (effectRows == 0) {
+            LOG.info("abort transaction for update stmt, query id:{}, reason: 
{}", DebugUtil.printId(queryId),
+                    TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+            globalTransactionMgr.abortTransaction(dbId, txnId, 
TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
+            StringBuilder sb = new StringBuilder();
+            sb.append("{'label':'").append(label);
+            sb.append(", 'txnId':'").append(txnId).append("'");
+            sb.append(", 
'queryId':'").append(DebugUtil.printId(queryId)).append("'");
+            sb.append("}");
+            analyzer.getContext().getState().setOk(effectRows, 0, 
sb.toString());
+            return;
+        }
+        TransactionStatus txnStatus;
+        // situation2: data is updated, commit and publish transaction
+        boolean isPublished;
+        try {
+            LOG.info("commit and publish transaction for update stmt, query 
id: {}", DebugUtil.printId(queryId));
+            isPublished = 
globalTransactionMgr.commitAndPublishTransaction(Catalog.getCurrentCatalog().getDb(dbId),
+                    Lists.newArrayList(targetTable), txnId,
+                    TabletCommitInfo.fromThrift(coordinator.getCommitInfos()),
+                    
analyzer.getContext().getSessionVariable().getInsertVisibleTimeoutMs());
+        } catch (Throwable e) {
+            // situation2.1: publish error, throw exception
+            String errMsg = "failed to commit and publish transaction for 
update stmt, query id:"
+                    + DebugUtil.printId(queryId);
+            LOG.warn(errMsg, e);
+            globalTransactionMgr.abortTransaction(dbId, txnId, e.getMessage());
+            throw new DdlException(errMsg, e);
+        }
+        String errMsg = null;
+        if (isPublished) {
+            // situation2.2: publish successful
+            txnStatus = TransactionStatus.VISIBLE;
+            MetricRepo.COUNTER_LOAD_FINISHED.increase(1L);
+        } else {
+            // situation2.3: be published later
+            txnStatus = TransactionStatus.COMMITTED;
+            errMsg = "transaction will be published later, data will be 
visible later";
+            LOG.warn("transaction will be published later, query id: {}", 
DebugUtil.printId(queryId));
+        }
+
+        // set context
+        StringBuilder sb = new StringBuilder();
+        sb.append("{'label':'").append(label).append("', 
'status':'").append(txnStatus.name()).append("'");
+        sb.append(", 'txnId':'").append(txnId).append("'");
+        sb.append(", 
'queryId':'").append(DebugUtil.printId(queryId)).append("'");
+        if (errMsg != null) {
+            sb.append(", 'err':'").append(errMsg).append("'");
+        }
+        sb.append("}");
+        analyzer.getContext().getState().setOk(effectRows, 0, sb.toString());
+    }
+
+    public static UpdateStmtExecutor fromUpdateStmt(UpdateStmt updateStmt) 
throws AnalysisException {
+        UpdateStmtExecutor updateStmtExecutor = new UpdateStmtExecutor();
+        updateStmtExecutor.targetTable = (OlapTable) 
updateStmt.getTargetTable();
+        updateStmtExecutor.whereExpr = updateStmt.getWhereExpr();
+        updateStmtExecutor.setExprs = updateStmt.getSetExprs();
+        Database database = 
Catalog.getCurrentCatalog().getDb(updateStmt.getTableName().getDb());
+        if (database == null) {
+            String errMsg = "Database does not exists in update stmt, db:" + 
updateStmt.getTableName().getDb();
+            LOG.info(errMsg);
+            throw new AnalysisException(errMsg);
+        }
+        updateStmtExecutor.dbId = database.getId();
+        updateStmtExecutor.analyzer = updateStmt.getAnalyzer();
+        updateStmtExecutor.queryId = 
updateStmtExecutor.analyzer.getContext().queryId();
+        updateStmtExecutor.timeoutSecond = 
updateStmtExecutor.analyzer.getContext()
+                .getSessionVariable().getQueryTimeoutS();
+        updateStmtExecutor.updatePlanner = new 
UpdatePlanner(updateStmtExecutor.dbId, updateStmtExecutor.targetTable,
+                updateStmt.getSetExprs(), updateStmt.getSrcTupleDesc(),
+                updateStmt.getAnalyzer());
+        return updateStmtExecutor;
+    }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
index e8c2b71..2ed1a4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java
@@ -80,7 +80,7 @@ public class EsScanNode extends ScanNode {
    @Override
    public void init(Analyzer analyzer) throws UserException {
        super.init(analyzer);
        // populate columnFilters from the conjuncts before backends are chosen
        // NOTE(review): computeColumnFilter lives in the ScanNode base class — confirm
        computeColumnFilter();
        assignBackends();
        computeStats(analyzer);
    }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index aab988f..7916dca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -162,6 +162,11 @@ public class OlapScanNode extends ScanNode {
         this.canTurnOnPreAggr = canChangePreAggr;
     }
 
    /**
     * Turns pre-aggregation off for this scan and forbids re-enabling it.
     * The reason string is kept for diagnostics (it is passed to
     * setIsPreAggregation along with the flag).
     */
    public void closePreAggregation(String reason) {
        setIsPreAggregation(false, reason);
        setCanTurnOnPreAggr(false);
    }
+
    /** Returns whether pre-aggregation is forcibly kept open for this scan. */
    public boolean getForceOpenPreAgg() {
        return forceOpenPreAgg;
    }
@@ -175,6 +180,20 @@ public class OlapScanNode extends ScanNode {
     }
 
     /**
+     * The function is used to directly select the index id of the base table 
as the selectedIndexId.
+     * It makes sure that the olap scan node must scan the base data rather 
than scan the materialized view data.
+     *
+     * This function is mainly used to update stmt.
+     * Update stmt also needs to scan data like normal queries.
+     * But its syntax is different from ordinary queries,
+     *   so planner cannot use the logic of query to automatically match the 
best index id.
+     * So, here it need to manually specify the index id to scan the base 
table directly.
+     */
    public void useBaseIndexId() {
        // Pin the scan to the base table's index so update reads raw rows,
        // never a materialized view (see the Javadoc above this method).
        this.selectedIndexId = olapTable.getBaseIndexId();
    }
+
+    /**
      * This method is mainly used to update scan range info in OlapScanNode by 
the new materialized selector.
      * Situation1:
      * If the new scan range is same as the old scan range which determined by 
the old materialized selector,
@@ -295,6 +314,7 @@ public class OlapScanNode extends ScanNode {
         super.init(analyzer);
 
         filterDeletedRows(analyzer);
+        computeColumnFilter();
         computePartitionInfo();
         computeTupleState(analyzer);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 8ca0cca..3268374 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -67,7 +67,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Range;
 
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -82,7 +81,7 @@ public class OlapTableSink extends DataSink {
     // input variables
     private OlapTable dstTable;
     private TupleDescriptor tupleDescriptor;
-    // specified partition ids. this list should not be empty and should 
contains all related partition ids
+    // specified partition ids.
     private List<Long> partitionIds;
 
     // set after init called
@@ -91,8 +90,6 @@ public class OlapTableSink extends DataSink {
    public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds) {
        this.dstTable = dstTable;
        this.tupleDescriptor = tupleDescriptor;
        // partitionIds may now be null — NOTE(review): a null list appears to mean
        // "all partitions", resolved later via dstTable.getPartitionIds(); confirm
        this.partitionIds = partitionIds;
    }
 
@@ -106,6 +103,12 @@ public class OlapTableSink extends DataSink {
         tDataSink.setType(TDataSinkType.OLAP_TABLE_SINK);
         tDataSink.setOlapTableSink(tSink);
 
+        if (partitionIds == null) {
+            partitionIds = dstTable.getPartitionIds();
+            if (partitionIds.isEmpty()) {
+                
ErrorReport.reportAnalysisException(ErrorCode.ERR_EMPTY_PARTITION_IN_TABLE, 
dstTable.getName());
+            }
+        }
         for (Long partitionId : partitionIds) {
             Partition part = dstTable.getPartition(partitionId);
             if (part == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 983a1a7..e7b82c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -293,6 +293,13 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
         this.conjuncts.addAll(conjuncts);
     }
 
    /**
     * Appends one conjunct to this node, lazily creating the conjunct list on
     * first use. Unlike addConjuncts, performs no deduplication.
     */
    public void addConjunct(Expr conjunct) {
        if (conjuncts == null) {
            conjuncts = Lists.newArrayList();
        }
        conjuncts.add(conjunct);
    }
     public void setAssignedConjuncts(Set<ExprId> conjuncts) {
         assignedConjuncts = conjuncts;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
index 86b2fd2..bb2efa4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
@@ -56,7 +56,7 @@ public class Planner {
 
     private boolean isBlockQuery = false;
 
-    private ArrayList<PlanFragment> fragments = Lists.newArrayList();
+    protected ArrayList<PlanFragment> fragments = Lists.newArrayList();
 
     private PlannerContext plannerContext;
     private SingleNodePlanner singleNodePlanner;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/PredicatePushDown.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PredicatePushDown.java
new file mode 100644
index 0000000..28abd58
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PredicatePushDown.java
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.JoinOperator;
+import org.apache.doris.analysis.Predicate;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleId;
+
+import org.apache.directory.api.util.Strings;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+public class PredicatePushDown {
+    private final static Logger LOG = 
LogManager.getLogger(PredicatePushDown.class);
+
+    public static PlanNode visitScanNode(ScanNode scanNode, JoinOperator 
joinOp, Analyzer analyzer) {
+        switch (joinOp) {
+            case INNER_JOIN:
+            case LEFT_OUTER_JOIN:
+                predicateFromLeftSidePropagatesToRightSide(scanNode, analyzer);
+                break;
+            // TODO
+            default:
+                break;
+        }
+        return scanNode;
+    }
+
+    private static void predicateFromLeftSidePropagatesToRightSide(ScanNode 
scanNode, Analyzer analyzer) {
+        List<TupleId> tupleIdList = scanNode.getTupleIds();
+        if (tupleIdList.size() != 1) {
+            LOG.info("The predicate pushdown is not reflected "
+                            + "because the scan node involves more then one 
tuple:{}",
+                    Strings.listToString(tupleIdList));
+            return;
+        }
+        TupleId rightSideTuple = tupleIdList.get(0);
+        List<Expr> unassignedRightSideConjuncts = 
analyzer.getUnassignedConjuncts(scanNode);
+        List<Expr> eqJoinPredicates = 
analyzer.getEqJoinConjuncts(rightSideTuple);
+        if (eqJoinPredicates != null) {
+            List<Expr> allConjuncts = 
analyzer.getConjuncts(analyzer.getAllTupleIds());
+            allConjuncts.removeAll(unassignedRightSideConjuncts);
+            for (Expr conjunct : allConjuncts) {
+                if (!Predicate.canPushDownPredicate(conjunct)) {
+                    continue;
+                }
+                for (Expr eqJoinPredicate : eqJoinPredicates) {
+                    // we can ensure slot is left node, because 
NormalizeBinaryPredicatesRule
+                    SlotRef otherSlot = conjunct.getChild(0).unwrapSlotRef();
+
+                    // ensure the children for eqJoinPredicate both be SlotRef
+                    if (eqJoinPredicate.getChild(0).unwrapSlotRef() == null
+                            || eqJoinPredicate.getChild(1).unwrapSlotRef() == 
null) {
+                        continue;
+                    }
+
+                    SlotRef leftSlot = 
eqJoinPredicate.getChild(0).unwrapSlotRef();
+                    SlotRef rightSlot = 
eqJoinPredicate.getChild(1).unwrapSlotRef();
+                    // ensure the type is match
+                    if 
(!leftSlot.getDesc().getType().matchesType(rightSlot.getDesc().getType())) {
+                        continue;
+                    }
+
+                    // example: t1.id = t2.id and t1.id = 1  => t2.id =1
+                    if (otherSlot.isBound(leftSlot.getSlotId())
+                            && rightSlot.isBound(rightSideTuple)) {
+                        Expr pushDownConjunct = rewritePredicate(analyzer, 
conjunct, rightSlot);
+                        LOG.debug("pushDownConjunct: {}", pushDownConjunct);
+                        scanNode.addConjunct(pushDownConjunct);
+                    } else if (otherSlot.isBound(rightSlot.getSlotId())
+                            && leftSlot.isBound(rightSideTuple)) {
+                        Expr pushDownConjunct = rewritePredicate(analyzer, 
conjunct, leftSlot);
+                        LOG.debug("pushDownConjunct: {}", pushDownConjunct);
+                        scanNode.addConjunct(pushDownConjunct);
+                    }
+                }
+            }
+        }
+    }
+
+    // Rewrite the oldPredicate with new leftChild
+    // For example: oldPredicate is t1.id = 1, leftChild is t2.id, will return 
t2.id = 1
+    private static Expr rewritePredicate(Analyzer analyzer, Expr oldPredicate, 
Expr leftChild) {
+        if (oldPredicate instanceof BinaryPredicate) {
+            BinaryPredicate oldBP = (BinaryPredicate) oldPredicate;
+            BinaryPredicate bp = new BinaryPredicate(oldBP.getOp(), leftChild, 
oldBP.getChild(1));
+            bp.analyzeNoThrow(analyzer);
+            return bp;
+        }
+
+        if (oldPredicate instanceof InPredicate) {
+            InPredicate oldIP = (InPredicate) oldPredicate;
+            InPredicate ip = new InPredicate(leftChild, 
oldIP.getListChildren(), oldIP.isNotIn());
+            ip.analyzeNoThrow(analyzer);
+            return ip;
+        }
+
+        return oldPredicate;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 649d1c5..a8bce22 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -18,15 +18,26 @@
 package org.apache.doris.planner;
 
 import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BinaryPredicate;
 import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.IsNullPredicate;
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.common.UserException;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TScanRangeLocations;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.Maps;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
@@ -35,8 +46,9 @@ import java.util.Map;
  * Representation of the common elements of all scan nodes.
  */
 abstract public class ScanNode extends PlanNode {
+    private final static Logger LOG = LogManager.getLogger(ScanNode.class);
     protected final TupleDescriptor desc;
-    protected Map<String, PartitionColumnFilter> columnFilters;
+    protected Map<String, PartitionColumnFilter> columnFilters = 
Maps.newHashMap();
     protected String sortColumn = null;
 
     public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
@@ -65,10 +77,6 @@ abstract public class ScanNode extends PlanNode {
 
     public TupleDescriptor getTupleDesc() { return desc; }
 
-    public void setColumnFilters(Map<String, PartitionColumnFilter> 
columnFilters) {
-        this.columnFilters = columnFilters;
-    }
-
     public void setSortColumn(String column) {
         sortColumn = column;
     }
@@ -96,6 +104,100 @@ abstract public class ScanNode extends PlanNode {
      */
     abstract public List<TScanRangeLocations> getScanRangeLocations(long 
maxScanRangeLength);
 
+    // TODO(ML): move it into PrunerOptimizer
+    public void computeColumnFilter() {
+        for (Column column : desc.getTable().getBaseSchema()) {
+            SlotDescriptor slotDesc = desc.getColumnSlot(column.getName());
+            if (null == slotDesc) {
+                continue;
+            }
+            PartitionColumnFilter keyFilter = createPartitionFilter(slotDesc, 
conjuncts);
+            if (null != keyFilter) {
+                columnFilters.put(column.getName(), keyFilter);
+            }
+        }
+    }
+
    /**
     * Builds a {@link PartitionColumnFilter} for the slot described by {@code desc}
     * from the binary, IN and IS NULL predicates in {@code conjuncts} that are
     * bound to that slot. Returns null when no usable predicate is found.
     */
    private PartitionColumnFilter createPartitionFilter(SlotDescriptor desc, List<Expr> conjuncts) {
        PartitionColumnFilter partitionColumnFilter = null;
        for (Expr expr : conjuncts) {
            // Skip predicates that do not reference this slot at all.
            if (!expr.isBound(desc.getId())) {
                continue;
            }
            if (expr instanceof BinaryPredicate) {
                BinaryPredicate binPredicate = (BinaryPredicate) expr;
                Expr slotBinding = binPredicate.getSlotBinding(desc.getId());
                // Only constant comparisons can be turned into bounds.
                if (slotBinding == null || !slotBinding.isConstant()) {
                    continue;
                }
                // != gives no usable range; non-literal constants are also skipped.
                if (binPredicate.getOp() == BinaryPredicate.Operator.NE
                        || !(slotBinding instanceof LiteralExpr)) {
                    continue;
                }

                if (null == partitionColumnFilter) {
                    partitionColumnFilter = new PartitionColumnFilter();
                }
                LiteralExpr literal = (LiteralExpr) slotBinding;
                BinaryPredicate.Operator op = binPredicate.getOp();
                // Normalize "literal OP slot" to "slot OP literal" by flipping the operator.
                if (!binPredicate.slotIsLeft()) {
                    op = op.commutative();
                }
                switch (op) {
                    case EQ:
                        // Equality pins both bounds to the same literal.
                        partitionColumnFilter.setLowerBound(literal, true);
                        partitionColumnFilter.setUpperBound(literal, true);
                        break;
                    case LE:
                        partitionColumnFilter.setUpperBound(literal, true);
                        // NOTE(review): lowerBoundInclusive is forced true here with no
                        // lower bound set — presumably marks an open lower range for the
                        // pruner; confirm against PartitionColumnFilter's semantics.
                        partitionColumnFilter.lowerBoundInclusive = true;
                        break;
                    case LT:
                        partitionColumnFilter.setUpperBound(literal, false);
                        // NOTE(review): same lowerBoundInclusive quirk as the LE case.
                        partitionColumnFilter.lowerBoundInclusive = true;
                        break;
                    case GE:
                        partitionColumnFilter.setLowerBound(literal, true);
                        break;
                    case GT:
                        partitionColumnFilter.setLowerBound(literal, false);
                        break;
                    default:
                        // Other operators contribute nothing to the range.
                        break;
                }
            } else if (expr instanceof InPredicate) {
                InPredicate inPredicate = (InPredicate) expr;
                // Only positive IN lists of pure literals can prune partitions.
                if (!inPredicate.isLiteralChildren() || inPredicate.isNotIn()) {
                    continue;
                }
                if (!(inPredicate.getChild(0).unwrapExpr(false) instanceof SlotRef)) {
                    // If child(0) of the in predicate is not a SlotRef,
                    // then other children of in predicate should not be used as a condition for partition prune.
                    continue;
                }
                if (null == partitionColumnFilter) {
                    partitionColumnFilter = new PartitionColumnFilter();
                }
                partitionColumnFilter.setInPredicate(inPredicate);
            } else if (expr instanceof IsNullPredicate) {
                IsNullPredicate isNullPredicate = (IsNullPredicate) expr;
                // Only "col IS NULL" with a plain slot child is usable; IS NOT NULL is skipped.
                if (!isNullPredicate.isSlotRefChildren() || isNullPredicate.isNotNull()) {
                    continue;
                }

                // If we meet a IsNull predicate on partition column, then other predicates are useless
                // eg: (xxxx) and (col is null), only the IsNull predicate has an effect on partition pruning.
                // A fresh filter replaces anything accumulated so far, and the loop stops here.
                partitionColumnFilter = new PartitionColumnFilter();
                NullLiteral nullLiteral = new NullLiteral();
                partitionColumnFilter.setLowerBound(nullLiteral, true);
                partitionColumnFilter.setUpperBound(nullLiteral, true);
                break;
            }
        }
        LOG.debug("partitionColumnFilter: {}", partitionColumnFilter);
        return partitionColumnFilter;
    }
+
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this).add("tid", 
desc.getId().asInt()).add("tblName",
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index f11fbf7..801ba0a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -1228,7 +1228,6 @@ public class SingleNodePlanner {
                         || !(slotBinding instanceof LiteralExpr)) {
                     continue;
                 }
-
                 if (null == partitionColumnFilter) {
                     partitionColumnFilter = new PartitionColumnFilter();
                 }
@@ -1696,65 +1695,8 @@ public class SingleNodePlanner {
                 break;
         }
         if (scanNode instanceof OlapScanNode || scanNode instanceof 
EsScanNode) {
-            Map<String, PartitionColumnFilter> columnFilters = 
Maps.newHashMap();
-            List<Expr> conjuncts = analyzer.getUnassignedConjuncts(scanNode);
-
-            // push down join predicate
-            List<Expr> pushDownConjuncts = Lists.newArrayList();
-            TupleId tupleId = tblRef.getId();
-            List<Expr> eqJoinPredicates = analyzer.getEqJoinConjuncts(tupleId);
-            if (eqJoinPredicates != null) {
-                // only inner and left outer join
-                if ((tblRef.getJoinOp().isInnerJoin() || 
tblRef.getJoinOp().isLeftOuterJoin())) {
-                    List<Expr> allConjuncts = 
analyzer.getConjuncts(analyzer.getAllTupleIds());
-                    allConjuncts.removeAll(conjuncts);
-                    for (Expr conjunct : allConjuncts) {
-                        if 
(org.apache.doris.analysis.Predicate.canPushDownPredicate(conjunct)) {
-                            for (Expr eqJoinPredicate : eqJoinPredicates) {
-                                // we can ensure slot is left node, because 
NormalizeBinaryPredicatesRule
-                                SlotRef otherSlot = 
conjunct.getChild(0).unwrapSlotRef();
-
-                                // ensure the children for eqJoinPredicate 
both be SlotRef
-                                if 
(eqJoinPredicate.getChild(0).unwrapSlotRef() == null || 
eqJoinPredicate.getChild(1).unwrapSlotRef() == null) {
-                                    continue;
-                                }
-
-                                SlotRef leftSlot = 
eqJoinPredicate.getChild(0).unwrapSlotRef();
-                                SlotRef rightSlot = 
eqJoinPredicate.getChild(1).unwrapSlotRef();
-
-                                // ensure the type is match
-                                if 
(!leftSlot.getDesc().getType().matchesType(rightSlot.getDesc().getType())) {
-                                    continue;
-                                }
-
-                                // example: t1.id = t2.id and t1.id = 1  => 
t2.id =1
-                                if (otherSlot.isBound(leftSlot.getSlotId()) && 
rightSlot.isBound(tupleId)) {
-                                    
pushDownConjuncts.add(rewritePredicate(analyzer, conjunct, rightSlot));
-                                } else if 
(otherSlot.isBound(rightSlot.getSlotId()) && leftSlot.isBound(tupleId)) {
-                                    
pushDownConjuncts.add(rewritePredicate(analyzer, conjunct, leftSlot));
-                                }
-                            }
-                        }
-                    }
-                }
-
-                LOG.debug("pushDownConjuncts: {}", pushDownConjuncts);
-                conjuncts.addAll(pushDownConjuncts);
-            }
-
-            for (Column column : tblRef.getTable().getBaseSchema()) {
-                SlotDescriptor slotDesc = 
tblRef.getDesc().getColumnSlot(column.getName());
-                if (null == slotDesc) {
-                    continue;
-                }
-                PartitionColumnFilter keyFilter = 
createPartitionFilter(slotDesc, conjuncts);
-                if (null != keyFilter) {
-                    columnFilters.put(column.getName(), keyFilter);
-                }
-            }
-            scanNode.setColumnFilters(columnFilters);
+            PredicatePushDown.visitScanNode(scanNode, tblRef.getJoinOp(), 
analyzer);
             scanNode.setSortColumn(tblRef.getSortColumn());
-            scanNode.addConjuncts(pushDownConjuncts);
         }
 
         scanNodes.add(scanNode);
@@ -1767,26 +1709,6 @@ public class SingleNodePlanner {
         return scanNode;
     }
 
-    // Rewrite the oldPredicate with new leftChild
-    // For example: oldPredicate is t1.id = 1, leftChild is t2.id, will return 
t2.id = 1
-    private Expr rewritePredicate(Analyzer analyzer, Expr oldPredicate, Expr 
leftChild) {
-        if (oldPredicate instanceof BinaryPredicate) {
-            BinaryPredicate oldBP = (BinaryPredicate) oldPredicate;
-            BinaryPredicate bp = new BinaryPredicate(oldBP.getOp(), leftChild, 
oldBP.getChild(1));
-            bp.analyzeNoThrow(analyzer);
-            return bp;
-        }
-
-        if (oldPredicate instanceof InPredicate) {
-            InPredicate oldIP = (InPredicate) oldPredicate;
-            InPredicate ip = new InPredicate(leftChild, 
oldIP.getListChildren(), oldIP.isNotIn());
-            ip.analyzeNoThrow(analyzer);
-            return ip;
-        }
-
-        return oldPredicate;
-    }
-
     /**
      * Return join conjuncts that can be used for hash table lookups. - for 
inner joins, those are equi-join predicates
      * in which one side is fully bound by lhsIds and the other by rhs' id; - 
for outer joins: same type of conjuncts as
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 3377330..f232334 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -211,7 +211,7 @@ public class StreamLoadPlanner {
     }
 
     // get all specified partition ids.
-    // if no partition specified, return all partitions
+    // if no partition specified, return null
     private List<Long> getAllPartitionIds() throws DdlException, 
AnalysisException {
         List<Long> partitionIds = Lists.newArrayList();
 
@@ -224,46 +224,37 @@ public class StreamLoadPlanner {
                 }
                 partitionIds.add(part.getId());
             }
-        } else {
-            List<Expr> conjuncts = scanNode.getConjuncts();
-            if (destTable.getPartitionInfo().getType() != 
PartitionType.UNPARTITIONED && !conjuncts.isEmpty()) {
-                PartitionInfo partitionInfo = destTable.getPartitionInfo();
-                Map<Long, PartitionItem> itemById = 
partitionInfo.getIdToItem(false);
-                Map<String, PartitionColumnFilter> columnFilters = 
Maps.newHashMap();
-                for (Column column : partitionInfo.getPartitionColumns()) {
-                    SlotDescriptor slotDesc = 
tupleDesc.getColumnSlot(column.getName());
-                    if (null == slotDesc) {
-                        continue;
-                    }
-                    PartitionColumnFilter keyFilter = 
SingleNodePlanner.createPartitionFilter(slotDesc, conjuncts);
-                    if (null != keyFilter) {
-                        columnFilters.put(column.getName(), keyFilter);
-                    }
-                }
-                if (columnFilters.isEmpty()) {
-                    partitionIds.addAll(itemById.keySet());
-                } else {
-                    PartitionPruner partitionPruner = null;
-                    if (destTable.getPartitionInfo().getType() == 
PartitionType.RANGE) {
-                        partitionPruner = new RangePartitionPruner(itemById,
-                                partitionInfo.getPartitionColumns(), 
columnFilters);
-                    } else if (destTable.getPartitionInfo().getType() == 
PartitionType.LIST) {
-                        partitionPruner = new ListPartitionPruner(itemById,
-                                partitionInfo.getPartitionColumns(), 
columnFilters);
-                    }
-                    partitionIds.addAll(partitionPruner.prune());
+            return partitionIds;
+        }
+        List<Expr> conjuncts = scanNode.getConjuncts();
+        if (destTable.getPartitionInfo().getType() != 
PartitionType.UNPARTITIONED && !conjuncts.isEmpty()) {
+            PartitionInfo partitionInfo = destTable.getPartitionInfo();
+            Map<Long, PartitionItem> itemById = 
partitionInfo.getIdToItem(false);
+            Map<String, PartitionColumnFilter> columnFilters = 
Maps.newHashMap();
+            for (Column column : partitionInfo.getPartitionColumns()) {
+                SlotDescriptor slotDesc = 
tupleDesc.getColumnSlot(column.getName());
+                if (null == slotDesc) {
+                    continue;
                 }
-            } else {
-                for (Partition partition : destTable.getPartitions()) {
-                    partitionIds.add(partition.getId());
+                PartitionColumnFilter keyFilter = 
SingleNodePlanner.createPartitionFilter(slotDesc, conjuncts);
+                if (null != keyFilter) {
+                    columnFilters.put(column.getName(), keyFilter);
                 }
             }
-
-            if (partitionIds.isEmpty()) {
-                
ErrorReport.reportDdlException(ErrorCode.ERR_EMPTY_PARTITION_IN_TABLE, 
destTable.getName());
+            if (columnFilters.isEmpty()) {
+                return null;
             }
+            PartitionPruner partitionPruner = null;
+            if (destTable.getPartitionInfo().getType() == PartitionType.RANGE) 
{
+                partitionPruner = new RangePartitionPruner(itemById,
+                        partitionInfo.getPartitionColumns(), columnFilters);
+            } else if (destTable.getPartitionInfo().getType() == 
PartitionType.LIST) {
+                partitionPruner = new ListPartitionPruner(itemById,
+                        partitionInfo.getPartitionColumns(), columnFilters);
+            }
+            partitionIds.addAll(partitionPruner.prune());
+            return partitionIds;
         }
-
-        return partitionIds;
+        return null;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 8f1dad8..0aeadd8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -369,6 +369,10 @@ public class ConnectContext {
         this.executor = executor;
     }
 
+    public StmtExecutor getExecutor() {
+        return executor;
+    }
+
     public void cleanup() {
         mysqlChannel.close();
         threadLocalInfo.remove();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 0f567ab..6ce8906 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -482,6 +482,7 @@ public class ConnectProcessor {
             // 0 for compatibility.
             int idx = request.isSetStmtIdx() ? request.getStmtIdx() : 0;
             executor = new StmtExecutor(ctx, new 
OriginStatement(request.getSql(), idx), true);
+            ctx.setExecutor(executor);
             TUniqueId queryId; // This query id will be set in ctx
             if (request.isSetQueryId()) {
                 queryId = request.getQueryId();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 11375f4..9cefa7d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -201,7 +201,6 @@ public class Coordinator {
     private TResourceInfo tResourceInfo;
     private boolean needReport;
 
-    private String clusterName;
     // parallel execute
     private final TUniqueId nextInstanceId;
 
@@ -235,16 +234,15 @@ public class Coordinator {
         this.tResourceInfo = new TResourceInfo(context.getQualifiedUser(),
                 context.getSessionVariable().getResourceGroup());
         this.needReport = context.getSessionVariable().isReportSucc();
-        this.clusterName = context.getClusterName();
         this.nextInstanceId = new TUniqueId();
         nextInstanceId.setHi(queryId.hi);
         nextInstanceId.setLo(queryId.lo + 1);
         this.assignedRuntimeFilters = analyzer.getAssignedRuntimeFilter();
     }
 
-    // Used for broker load task/export task coordinator
+    // Used for broker load task/export task/update coordinator
     public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable 
descTable,
-            List<PlanFragment> fragments, List<ScanNode> scanNodes, String 
cluster, String timezone) {
+            List<PlanFragment> fragments, List<ScanNode> scanNodes, String 
timezone) {
         this.isBlockQuery = true;
         this.jobId = jobId;
         this.queryId = queryId;
@@ -257,7 +255,6 @@ public class Coordinator {
         this.queryGlobals.setTimeZone(timezone);
         this.tResourceInfo = new TResourceInfo("", "");
         this.needReport = true;
-        this.clusterName = cluster;
         this.nextInstanceId = new TUniqueId();
         nextInstanceId.setHi(queryId.hi);
         nextInstanceId.setLo(queryId.lo + 1);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index e3b082a..1045caf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -78,6 +78,7 @@ import org.apache.doris.analysis.StopRoutineLoadStmt;
 import org.apache.doris.analysis.SyncStmt;
 import org.apache.doris.analysis.TruncateTableStmt;
 import org.apache.doris.analysis.UninstallPluginStmt;
+import org.apache.doris.analysis.UpdateStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.EncryptKeyHelper;
 import org.apache.doris.common.Config;
@@ -163,6 +164,8 @@ public class DdlExecutor {
             
catalog.getRoutineLoadManager().stopRoutineLoadJob((StopRoutineLoadStmt) 
ddlStmt);
         } else if (ddlStmt instanceof AlterRoutineLoadStmt) {
             
catalog.getRoutineLoadManager().alterRoutineLoadJob((AlterRoutineLoadStmt) 
ddlStmt);
+        } else if (ddlStmt instanceof UpdateStmt) {
+            catalog.getUpdateManager().handleUpdate((UpdateStmt) ddlStmt);
         } else if (ddlStmt instanceof DeleteStmt) {
             catalog.getDeleteHandler().process((DeleteStmt) ddlStmt);
         } else if (ddlStmt instanceof CreateUserStmt) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryState.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryState.java
index f753e4a..cf0376a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryState.java
@@ -70,6 +70,9 @@ public class QueryState {
     }
 
     public void setOk() {
+        if (stateType == MysqlStateType.OK) {
+            return;
+        }
         setOk(0, 0, null);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 5938712..4cf5033 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -181,6 +181,10 @@ public class StmtExecutor implements ProfileWriter {
         this.isProxy = false;
     }
 
+    public void setCoord(Coordinator coord) {
+        this.coord = coord;
+    }
+
     // At the end of query execution, we begin to add up profile
     private void initProfile(QueryPlannerProfile plannerProfile, boolean 
waiteBeReport) {
         long currentTimestamp = System.currentTimeMillis();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 28a77e3..426d0be 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -65,7 +65,7 @@ public class TransactionState implements Writable {
     public enum LoadJobSourceType {
         FRONTEND(1),        // old dpp load, mini load, insert stmt(not 
streaming type) use this type
         BACKEND_STREAMING(2),         // streaming load use this type
-        INSERT_STREAMING(3), // insert stmt (streaming type) use this type
+        INSERT_STREAMING(3), // insert stmt (streaming type), update stmt use 
this type
         ROUTINE_LOAD_TASK(4), // routine load task use this type
         BATCH_LOAD_JOB(5); // load job v2 for broker load
         
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex 
b/fe/fe-core/src/main/jflex/sql_scanner.flex
index 2fede3b..a8c1d7e 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -382,6 +382,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("user", new Integer(SqlParserSymbols.KW_USER));
         keywordMap.put("using", new Integer(SqlParserSymbols.KW_USING));
         keywordMap.put("uninstall", new 
Integer(SqlParserSymbols.KW_UNINSTALL));
+        keywordMap.put("update", new Integer(SqlParserSymbols.KW_UPDATE));
         keywordMap.put("value", new Integer(SqlParserSymbols.KW_VALUE));
         keywordMap.put("values", new Integer(SqlParserSymbols.KW_VALUES));
         keywordMap.put("varchar", new Integer(SqlParserSymbols.KW_VARCHAR));
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java
index cbd4b67..095b882 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java
@@ -32,11 +32,6 @@ import org.apache.doris.utframe.UtFrameUtils;
 
 import com.google.common.collect.Lists;
 
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.List;
@@ -44,6 +39,10 @@ import java.util.UUID;
 
 import mockit.Expectations;
 import mockit.Injectable;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 public class InsertStmtTest {
     private static String runningDir = "fe/mocked/DemoTest/" + 
UUID.randomUUID().toString() + "/";
@@ -259,7 +258,9 @@ public class InsertStmtTest {
         List<Expr> slots = Lists.newArrayList();
         expr4.collect(SlotRef.class, slots);
         Assert.assertEquals(1, slots.size());
-        Assert.assertEquals(queryStmtSubstitue.getResultExprs().get(0), 
slots.get(0));
+        Assert.assertTrue(queryStmtSubstitue.getResultExprs().get(0) 
instanceof CastExpr);
+        CastExpr resultExpr0 = (CastExpr) 
queryStmtSubstitue.getResultExprs().get(0);
+        Assert.assertEquals(resultExpr0.getChild(0), slots.get(0));
 
         Assert.assertTrue(queryStmtSubstitue.getResultExprs().get(5) 
instanceof FunctionCallExpr);
         FunctionCallExpr expr5 = (FunctionCallExpr) 
queryStmtSubstitue.getResultExprs().get(5);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/UpdateStmtTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/UpdateStmtTest.java
new file mode 100644
index 0000000..0688021
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/UpdateStmtTest.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.common.UserException;
+
+import java.util.List;
+
+import com.clearspring.analytics.util.Lists;
+import mockit.Expectations;
+import mockit.Injectable;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class UpdateStmtTest {
+
+    @Test
+    public void testAnalyze(@Injectable Analyzer analyzer) {
+        TableName tableName = new TableName("db", "table");
+        IntLiteral intLiteral = new IntLiteral(1);
+        SlotRef slotRef = new SlotRef(tableName, "c1");
+        BinaryPredicate binaryPredicate = new 
BinaryPredicate(BinaryPredicate.Operator.EQ, intLiteral, slotRef);
+        List<Expr> setExprs = Lists.newArrayList();
+        setExprs.add(binaryPredicate);
+
+        new Expectations() {
+            {
+                analyzer.getClusterName();
+                result = "default";
+            }
+        };
+        UpdateStmt updateStmt = new UpdateStmt(tableName, 
Lists.newArrayList(setExprs), null);
+        try {
+            updateStmt.analyze(analyzer);
+            Assert.fail();
+        } catch (UserException e) {
+            System.out.println(e.getMessage());
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java
index 7092630..5298042 100755
--- 
a/fe/fe-core/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java
@@ -81,7 +81,5 @@ public class ThreadPoolManagerTest {
         Assert.assertEquals(0, testFixedThreaddPool.getActiveCount());
         Assert.assertEquals(0, testFixedThreaddPool.getQueue().size());
         Assert.assertEquals(4, testFixedThreaddPool.getCompletedTaskCount());
-
-
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/update/UpdateManagerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/load/update/UpdateManagerTest.java
new file mode 100644
index 0000000..d75f7c1
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/update/UpdateManagerTest.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.update;
+
+import org.apache.doris.analysis.UpdateStmt;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.jmockit.Deencapsulation;
+
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+
+import com.clearspring.analytics.util.Lists;
+import mockit.Expectations;
+import mockit.Injectable;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class UpdateManagerTest {
+
+    @Test
+    public void testDisableConcurrentUpdate(@Injectable UpdateStmt updateStmt,
+                                            @Injectable UpdateStmtExecutor 
updateStmtExecutor) {
+        Config.enable_concurrent_update = false;
+        Map<Long, List<UpdateStmtExecutor>> tableIdToCurrentUpdate = 
Maps.newConcurrentMap();
+        List<UpdateStmtExecutor> currentUpdate = Lists.newArrayList();
+        currentUpdate.add(updateStmtExecutor);
+        tableIdToCurrentUpdate.put(new Long(1), currentUpdate);
+        UpdateManager updateManager = new UpdateManager();
+        Assert.assertFalse(Deencapsulation.getField(updateManager, 
"enableConcurrentUpdate"));
+        Deencapsulation.setField(updateManager, "tableIdToCurrentUpdate", 
tableIdToCurrentUpdate);
+        new Expectations() {
+            {
+                updateStmt.getTargetTable().getId();
+                result = 1;
+            }
+        };
+
+        try {
+            Deencapsulation.invoke(updateManager, "addUpdateExecutor", 
updateStmt);
+            Assert.fail();
+        } catch (Exception e) {
+            if (e instanceof DdlException) {
+                System.out.println(e.getMessage());
+            } else {
+                throw e;
+            }
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/update/UpdateStmtExecutorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/update/UpdateStmtExecutorTest.java
new file mode 100644
index 0000000..c70ddb5
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/update/UpdateStmtExecutorTest.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.update;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.IntLiteral;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.analysis.UpdateStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.GlobalTransactionMgr;
+
+import java.util.List;
+
+import com.clearspring.analytics.util.Lists;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class UpdateStmtExecutorTest {
+
+    @Test
+    public void testCommitAndPublishTxn(@Injectable Analyzer analyzer,
+                                        @Mocked GlobalTransactionMgr 
globalTransactionMgr) {
+        UpdateStmtExecutor updateStmtExecutor = new UpdateStmtExecutor();
+        Deencapsulation.setField(updateStmtExecutor, "effectRows", 0);
+        Deencapsulation.setField(updateStmtExecutor, "analyzer", analyzer);
+        Deencapsulation.invoke(updateStmtExecutor, "commitAndPublishTxn");
+    }
+
+    @Test
+    public void testFromUpdateStmt(@Injectable OlapTable olapTable,
+                                   @Mocked Catalog catalog,
+                                   @Injectable Database db,
+                                   @Injectable Analyzer analyzer) throws 
AnalysisException {
+        TableName tableName = new TableName("db", "test");
+        List<Expr> setExprs = Lists.newArrayList();
+        SlotRef slotRef = new SlotRef(tableName, "v1");
+        IntLiteral intLiteral = new IntLiteral(1);
+        BinaryPredicate binaryPredicate = new 
BinaryPredicate(BinaryPredicate.Operator.EQ,
+                slotRef, intLiteral);
+        setExprs.add(binaryPredicate);
+        SlotRef keySlotRef = new SlotRef(tableName, "k1");
+        Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ, 
keySlotRef, intLiteral);
+        UpdateStmt updateStmt = new UpdateStmt(tableName, setExprs, whereExpr);
+        Deencapsulation.setField(updateStmt, "targetTable", olapTable);
+        Deencapsulation.setField(updateStmt, "analyzer", analyzer);
+        new Expectations() {
+            {
+                catalog.getDb("db");
+                result = db;
+                db.getId();
+                result = 1;
+                analyzer.getContext().queryId();
+                result = new TUniqueId(1, 2);
+                analyzer.getContext().getSessionVariable().getQueryTimeoutS();
+                result = 1000;
+                olapTable.getId();
+                result = 2;
+            }
+        };
+        UpdateStmtExecutor executor = 
UpdateStmtExecutor.fromUpdateStmt(updateStmt);
+        Assert.assertEquals(new Long(2), new 
Long(executor.getTargetTableId()));
+        Assert.assertEquals(whereExpr, Deencapsulation.getField(executor, 
"whereExpr"));
+        Assert.assertEquals(setExprs, Deencapsulation.getField(executor, 
"setExprs"));
+        Assert.assertEquals(new Long(1), Deencapsulation.getField(executor, 
"dbId"));
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index d6535a3..418ce90 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -41,21 +41,21 @@ import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState.MysqlStateType;
-import org.apache.doris.thrift.TRuntimeFilterMode;
 import org.apache.doris.utframe.UtFrameUtils;
 
 import com.google.common.collect.Lists;
 
 import org.apache.commons.lang3.StringUtils;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
 
 import java.io.File;
 import java.util.List;
 import java.util.UUID;
 
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
 public class QueryPlanTest {
     // use a unique dir so that it won't be conflict with other unit test which
     // may also start a Mocked Frontend
@@ -446,7 +446,7 @@ public class QueryPlanTest {
 
         queryStr = "explain insert into test.bitmap_table select id, id from 
test.bitmap_table_2;";
         String errorMsg = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr);
-        Assert.assertTrue(errorMsg.contains("bitmap column id2 require the 
function return type is BITMAP"));
+        Assert.assertTrue(errorMsg.contains("bitmap column require the 
function return type is BITMAP"));
     }
 
     private static void testBitmapQueryPlan(String sql, String result) throws 
Exception {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/UpdatePlannerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/planner/UpdatePlannerTest.java
new file mode 100644
index 0000000..d1e0e3c
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/UpdatePlannerTest.java
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.IntLiteral;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotId;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.load.update.UpdatePlanner;
+
+import java.util.List;
+
+import com.clearspring.analytics.util.Lists;
+import mockit.Expectations;
+import mockit.Injectable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.doris.alter.SchemaChangeHandler.SHADOW_NAME_PRFIX;
+
+public class UpdatePlannerTest {
+
+    private final IdGenerator<TupleId> tupleIdGenerator_ = 
TupleId.createGenerator();
+    private final IdGenerator<SlotId> slotIdGenerator_ = 
SlotId.createGenerator();
+
+    /**
+     * Full columns: k1, k2 v1, shadow_column
+     * Shadow column: SHADOW_NAME_PRFIX + v1
+     * Set expr: v1=1
+     * Expect output exprs: k1, k2, 1, 1
+     */
+    @Test
+    public void testComputeOutputExprsWithShadowColumnAndSetExpr(@Injectable 
OlapTable targetTable,
+                                                                 @Injectable 
Column k1,
+                                                                 @Injectable 
Column k2,
+                                                                 @Injectable 
Column v1,
+                                                                 @Injectable 
Column shadow_v1,
+                                                                 @Injectable 
Analyzer analyzer) {
+        List<Expr> setExprs = Lists.newArrayList();
+        TableName tableName = new TableName(null, "test");
+        SlotRef slotRef = new SlotRef(tableName, "V1");
+        IntLiteral intLiteral = new IntLiteral(1);
+        BinaryPredicate binaryPredicate = new 
BinaryPredicate(BinaryPredicate.Operator.EQ,
+                slotRef, intLiteral);
+        setExprs.add(binaryPredicate);
+        TupleDescriptor srcTupleDesc = new 
TupleDescriptor(tupleIdGenerator_.getNextId());
+        SlotDescriptor k1SlotDesc = new 
SlotDescriptor(slotIdGenerator_.getNextId(), srcTupleDesc);
+        k1SlotDesc.setColumn(k1);
+        srcTupleDesc.addSlot(k1SlotDesc);
+        SlotDescriptor k2SlotDesc = new 
SlotDescriptor(slotIdGenerator_.getNextId(), srcTupleDesc);
+        k2SlotDesc.setColumn(k2);
+        srcTupleDesc.addSlot(k2SlotDesc);
+        SlotDescriptor v1SlotDesc = new 
SlotDescriptor(slotIdGenerator_.getNextId(), srcTupleDesc);
+        v1SlotDesc.setColumn(v1);
+        srcTupleDesc.addSlot(v1SlotDesc);
+        List<Column> fullSchema = Lists.newArrayList();
+        fullSchema.add(k1);
+        fullSchema.add(k2);
+        fullSchema.add(v1);
+        fullSchema.add(shadow_v1);
+
+        new Expectations(){
+            {
+                targetTable.getFullSchema();
+                result = fullSchema;
+                k1.getName();
+                result = "k1";
+                k2.getName();
+                result = "k2";
+                v1.getName();
+                result = "v1";
+                shadow_v1.getName();
+                result = SHADOW_NAME_PRFIX + "v1";
+            }
+        };
+
+        UpdatePlanner updatePlanner = new UpdatePlanner(1, targetTable, 
setExprs, srcTupleDesc, analyzer);
+        List<Expr> outputExpr = Deencapsulation.invoke(updatePlanner, 
"computeOutputExprs");
+        Assert.assertEquals(4, outputExpr.size());
+        Expr outputExpr1 = outputExpr.get(0);
+        Assert.assertTrue(outputExpr1 instanceof SlotRef);
+        Assert.assertEquals(((SlotRef) 
outputExpr1).getDesc().getColumn().getName(), "k1");
+        Expr outputExpr2 = outputExpr.get(1);
+        Assert.assertTrue(outputExpr2 instanceof SlotRef);
+        Assert.assertEquals(((SlotRef) 
outputExpr2).getDesc().getColumn().getName(), "k2");
+        Expr outputExpr3 = outputExpr.get(2);
+        Assert.assertTrue(outputExpr3 instanceof IntLiteral);
+        Assert.assertEquals(((IntLiteral) outputExpr3).getValue(), 1);
+        Expr outputExpr4 = outputExpr.get(3);
+        Assert.assertTrue(outputExpr4 instanceof IntLiteral);
+        Assert.assertEquals(((IntLiteral) outputExpr4).getValue(), 1);
+    }
+
+    @Test
+    public void testNewColumnBySchemaChange(@Injectable OlapTable targetTable,
+                                            @Injectable Column k1,
+                                            @Injectable Column k2,
+                                            @Injectable Column v1,
+                                            @Injectable Column new_v2,
+                                            @Injectable Analyzer analyzer) 
throws AnalysisException {
+        List<Expr> setExprs = Lists.newArrayList();
+        TableName tableName = new TableName(null, "test");
+        SlotRef slotRef = new SlotRef(tableName, "V1");
+        IntLiteral intLiteral = new IntLiteral(1);
+        BinaryPredicate binaryPredicate = new 
BinaryPredicate(BinaryPredicate.Operator.EQ,
+                slotRef, intLiteral);
+        setExprs.add(binaryPredicate);
+        TupleDescriptor srcTupleDesc = new 
TupleDescriptor(tupleIdGenerator_.getNextId());
+        SlotDescriptor k1SlotDesc = new 
SlotDescriptor(slotIdGenerator_.getNextId(), srcTupleDesc);
+        k1SlotDesc.setColumn(k1);
+        srcTupleDesc.addSlot(k1SlotDesc);
+        SlotDescriptor k2SlotDesc = new 
SlotDescriptor(slotIdGenerator_.getNextId(), srcTupleDesc);
+        k2SlotDesc.setColumn(k2);
+        srcTupleDesc.addSlot(k2SlotDesc);
+        SlotDescriptor v1SlotDesc = new 
SlotDescriptor(slotIdGenerator_.getNextId(), srcTupleDesc);
+        v1SlotDesc.setColumn(v1);
+        srcTupleDesc.addSlot(v1SlotDesc);
+        List<Column> fullSchema = Lists.newArrayList();
+        fullSchema.add(k1);
+        fullSchema.add(k2);
+        fullSchema.add(v1);
+        fullSchema.add(new_v2);
+
+        new Expectations(){
+            {
+                targetTable.getFullSchema();
+                result = fullSchema;
+                k1.getName();
+                result = "k1";
+                k2.getName();
+                result = "k2";
+                v1.getName();
+                result = "v1";
+                new_v2.getName();
+                result = "v2";
+                new_v2.getDefaultValue();
+                result = "1";
+                new_v2.getDefaultValueExpr();
+                result = new IntLiteral(1);
+            }
+        };
+
+        UpdatePlanner updatePlanner = new UpdatePlanner(1, targetTable, 
setExprs, srcTupleDesc, analyzer);
+        List<Expr> outputExpr = Deencapsulation.invoke(updatePlanner, 
"computeOutputExprs");
+        Assert.assertEquals(4, outputExpr.size());
+        Expr outputExpr1 = outputExpr.get(0);
+        Assert.assertTrue(outputExpr1 instanceof SlotRef);
+        Assert.assertEquals(((SlotRef) 
outputExpr1).getDesc().getColumn().getName(), "k1");
+        Expr outputExpr2 = outputExpr.get(1);
+        Assert.assertTrue(outputExpr2 instanceof SlotRef);
+        Assert.assertEquals(((SlotRef) 
outputExpr2).getDesc().getColumn().getName(), "k2");
+        Expr outputExpr3 = outputExpr.get(2);
+        Assert.assertTrue(outputExpr3 instanceof IntLiteral);
+        Assert.assertEquals(((IntLiteral) outputExpr3).getValue(), 1);
+        Expr outputExpr4 = outputExpr.get(3);
+        Assert.assertTrue(outputExpr4 instanceof IntLiteral);
+        Assert.assertEquals(((IntLiteral) outputExpr4).getValue(), 1);
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to