This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c6ac60b  [SegmentV2] Optimize the upgrade logic of SegmentV2 (#3340)
c6ac60b is described below

commit c6ac60bab9563a77b052a2766b0de859528aee41
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Apr 21 10:45:29 2020 +0800

    [SegmentV2] Optimize the upgrade logic of SegmentV2 (#3340)
    
    This CL mainly makes the following changes:
    
    1. Reorganize the SegmentV2 upgrade document.
    2. When the session variable `use_v2_rollup` is set to true, force queries to hit the v2-format rollup of the base table so that the v2 data can be verified.
    3. Fix a bug where the storage format information was not persisted for schema change operations that perform the v2 conversion.
    4. Allow users to create v2-format tables directly.
---
 .../cn/administrator-guide/segment-v2-usage.md     | 215 ++++++++++++---------
 .../apache/doris/alter/SchemaChangeHandler.java    |   5 +-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |  19 +-
 .../apache/doris/analysis/InstallPluginStmt.java   |   3 +-
 .../apache/doris/analysis/UninstallPluginStmt.java |   3 +-
 .../java/org/apache/doris/catalog/Catalog.java     |  60 +++---
 .../java/org/apache/doris/catalog/OlapTable.java   |  15 ++
 .../org/apache/doris/catalog/TableProperty.java    |  24 ++-
 .../java/org/apache/doris/common/ErrorCode.java    |   4 +-
 .../org/apache/doris/common/FeMetaVersion.java     |   4 +-
 .../apache/doris/common/util/PropertyAnalyzer.java |  17 +-
 .../doris/planner/MaterializedViewSelector.java    |  45 ++---
 .../org/apache/doris/planner/RollupSelector.java   |  30 +--
 .../apache/doris/planner/SingleNodePlanner.java    |  17 +-
 .../java/org/apache/doris/plugin/AuditEvent.java   |   2 +-
 .../main/java/org/apache/doris/qe/QueryState.java  |   2 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |  13 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |   1 +
 .../org/apache/doris/system/HeartbeatFlags.java    |   3 +-
 .../org/apache/doris/alter/AlterJobV2Test.java     |  89 ++++++++-
 .../doris/plugin/audit/AuditLoaderPlugin.java      |   4 +-
 .../doris/plugin/audit/DorisStreamLoader.java      |   3 +-
 tools/show_segment_status/README.md                |   4 +-
 23 files changed, 384 insertions(+), 198 deletions(-)

diff --git a/docs/documentation/cn/administrator-guide/segment-v2-usage.md b/docs/documentation/cn/administrator-guide/segment-v2-usage.md
index faf6b59..1c821a1 100644
--- a/docs/documentation/cn/administrator-guide/segment-v2-usage.md
+++ b/docs/documentation/cn/administrator-guide/segment-v2-usage.md
@@ -1,118 +1,147 @@
-# Doris Segment V2 Rollout and Trial Manual
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Segment V2 Upgrade Manual
 
 ## Background
 
-Doris 0.12 implements segment v2, a new storage format that introduces dictionary compression, bitmap indexes, page cache, and other optimizations to improve system performance. An alpha release of 0.12 has been published and is being rolled out internally; the rollout plan and trial method are recorded below.
+Doris 0.12 implements a new storage format, Segment V2, which introduces dictionary compression, bitmap indexes, page cache, and other optimizations to improve system performance.
+
+Doris 0.12 supports reading and writing both the original Segment V1 format (V1 for short) and the new Segment V2 format (V2 for short). To use V2 features on existing data, it must first be converted from V1 to V2 with a command.
+
+This document describes how to convert to and use the V2 format after upgrading from 0.11 to 0.12.
 
-## Rollout
+Tables in the V2 format support the following new features:
 
-To keep the rollout stable, it is divided into three phases:
-Phase 1: deploy 0.12 without enabling segment v2 globally; create segment v2 tables (or indexes) only for verification.
-Phase 2: enable segment v2 globally, replacing the existing segment storage format. New tables are then created with segment v2 storage files, while old tables rely on processes such as compaction and schema change to convert their format.
-Phase 3: convert the old segment format to the new segment v2 format. Once segment v2's correctness and performance have been verified, this can proceed gradually at the user's discretion.
+1. bitmap index
+2. in-memory table
+3. page cache
+4. dictionary compression
+5. lazy materialization
 
-### Rollout Verification
+## Cluster Upgrade
 
-- Correctness
+Version 0.12 only supports upgrading from 0.11; upgrading from versions earlier than 0.11 is not supported. Make sure the Doris cluster is on version 0.11 before upgrading.
 
-Correctness is the metric that most needs to be guaranteed for the segment v2 rollout. In phase 1, to keep the production environment stable while verifying segment v2's correctness, the approach is as follows:
-1. Pick a few tables to verify and create a segment v2 rollup for each with the following statement; the rollup has the same schema as the base table.
+0.12 has two important V2-related parameters:
 
-       alter table table_name add rollup table_name (columns) properties ("storage_format" = "v2");
+* `default_rowset_type`: an FE global variable; defaults to "alpha", i.e. the V1 format.
+* `default_rowset_type`: a BE configuration item; defaults to "ALPHA", i.e. the V1 format.
 
-       Here, the index name after rollup is set directly to the base table's name; the statement automatically generates a rollup named __v2_table_name.
+If these defaults are kept, the storage format of existing cluster data does not change after a regular cluster upgrade: it stays V1. If you have no need for the V2 format, simply keep using the cluster as usual; no extra steps are needed, and all existing data as well as newly loaded data remains in V1 format.
 
-       Any single column name can be given for columns; this is partly to stay compatible with the existing syntax and partly for convenience.
+## V2 Format Conversion
 
-2. The rollup index created above has a name of the form: __v2_table_name
+### Converting existing table data to V2
 
-       Use the command:
+Doris provides two ways to convert the format of existing table data:
 
-       `desc table_name all;`
+1. Create a special V2-format Rollup
 
-       to check whether the table has a rollup named __v2_table_name. Creating a segment v2 rollup is an asynchronous operation and does not complete immediately; if the command does not show the newly created rollup, it may still be in progress.
+    This creates a special V2-format Rollup for the specified table. Once created, the new V2-format Rollup coexists with the table's original data, and users can explicitly query the V2-format Rollup for verification.
+    
+    This method is mainly used to validate the V2 format. Because it does not modify the original table data, V2 data can be verified safely without worrying about corrupting the table through format conversion. Typically, you first verify the data this way, then convert the whole table with method 2.
+    
+    The steps are as follows:
+    
+    ```
+    ## Create a V2-format Rollup
+    
+    ALTER TABLE table_name ADD ROLLUP table_name (columns) PROPERTIES ("storage_format" = "v2");
+    ```
 
-    You can check the in-progress rollup with the command below.
+    Here, the Rollup name must be the table name. The columns field can be filled in arbitrarily; the system does not check it. The statement automatically generates a Rollup named `__v2_table_name` that contains all of the table's columns.
+    
+    Check the creation progress with:
+    
+    ```
+    SHOW ALTER TABLE ROLLUP;
+    ```
+    
+    Once creation finishes, a Rollup named `__v2_table_name` is visible via `DESC table_name ALL;`.
+    
+    Then switch to querying the V2 format with:
 
-       `show alter table rollup;`
+    ```
+    set use_v2_rollup = true;
+    select * from table_name limit 10;
+    ```
+    
+    The `use_v2_rollup` variable forces the query to hit the Rollup named `__v2_table_name` and ignores the hit conditions of all other Rollups, so it should only be used for verifying V2-format data.
 
-       You will see a rollup named __v2_table_name being created.
+2. Convert the existing table data format
 
-3. Query both the base table and the segment v2 rollup and verify that the results are identical (queries can be taken from the audit log).
+    This method sends a schema change job for the specified table; when the job finishes, all of the table's data has been converted to V2 format. It does not keep the original v1 data, so verify the format with method 1 first.
+    
+    ```
+    ALTER TABLE table_name SET ("storage_format" = "v2");
+    ```
 
-       To make queries use the segment v2 rollup, a session variable was added; use the following statement in a query to read the segment v2 index:
+    Then check the job progress with:
+    
+    ```
+    SHOW ALTER TABLE COLUMN;
+    ```
+    
+    When the job completes, all of the table's data (including Rollups) has been converted to V2 and the V1 data has been deleted. If the table is partitioned, partitions created afterwards are also in V2 format.
+    
+    **Tables in V2 format cannot be converted back to V1.**
+    
+### Creating a new table in V2 format
 
-       set use_v2_rollup = true;
+Without changing the default configuration parameters, users can create a V2-format table directly:
 
-       For example, suppose the current db has a table named simple; query it directly as follows:
-
-       ```
-               mysql> select * from simple;         
-               +------+-------+
-               | key  | value |
-               +------+-------+
-               |    2 |     6 |
-               |    1 |     6 |
-               |    4 |     5 |
-               +------+-------+
-               3 rows in set (0.01 sec)
-       ```
-
-       Then query the __v2_simple rollup with the following query:
-
-       ```
-               mysql> set use_v2_rollup = true;
-               Query OK, 0 rows affected (0.04 sec)
-
-               mysql> select * from simple;
-               +------+-------+        
-               | key  | value |
-               +------+-------+
-               |    4 |     5 |
-               |    1 |     6 |
-               |    2 |     6 |
-               +------+-------+
-               3 rows in set (0.01 sec)
-       ```
-
-       The expected outcome is that the two queries return the same result.
-       If the results differ, investigate. First, determine whether the mismatch is caused by a new load between the two queries; if so, retry. If not, dig further into the cause.
-
-4. Compare whether the same query returns consistent results on the base table and the segment v2 rollup.
-
-- Query latency
-
-       segment v2 optimizes v1's random reads and adds bitmap indexes, lazy materialization, and other improvements, so query latency is expected to drop.
-
-- Load latency
-- page cache hit rate
-- compression ratio of string columns
-- performance gain and space usage of bitmap indexes
-
-### Enabling segment v2 globally
-
-There are two ways:
-1. FE has a global variable, default_rowset_type; setting it makes the BEs use segment v2 as the default storage format:
-
-       set global default_rowset_type = beta;
-
-       With this approach, just run the command above and wait about 10s for FE to sync the setting to the BEs; no per-BE configuration is needed, so this is the recommended method. However, there is currently no simple way to verify that the default rowset type on the BEs has changed. One workaround is to create a table and inspect its metadata, but that is cumbersome; a status view can be added later if needed.
-
-2. Set default_rowset_type to BETA in each BE's configuration file. This requires adding the setting on every BE.
-
-
-### Converting segment v1 to v2
-
-If you do not want to wait for the system to convert the storage format in the background (for example, to use bitmap indexes or dictionary compression), you can manually trigger the storage format conversion for a table. This is done via schema change, which converts the v1 format to v2:
-
-       alter table table_name set ("storage_format" = "v2");
-
-       This converts the storage format to v2.
-
-To create a v2 table while the default storage format is not yet v2, follow these steps:
-1. Create a v1 table with create table
-2. Convert the format with the schema change command above
+```
+CREATE TABLE tbl_name
+(
+    k1 INT,
+    k2 INT
+)
+DISTRIBUTED BY HASH(k1) BUCKETS 1
+PROPERTIES
+(
+    "storage_format" = "v2"
+);
+```
 
+After specifying `"storage_format" = "v2"` in `properties`, the table is created in V2 format. If it is a partitioned table, partitions created later are also in V2 format.
 
+### Full Format Conversion (experimental)
 
+The following methods enable a full data format conversion (V1 -> V2) for the entire cluster. The conversion runs asynchronously through the BEs' background compaction process.
+
+1. Enable full conversion from the BE side
+
+    Add `default_rowset_type=BETA` to `be.conf` and restart the BE nodes. In subsequent compactions, data is automatically converted from V1 to V2.
+    
+2. Enable full conversion from the FE side
+
+    Connect to Doris with a mysql client and execute:
+    
+    `SET GLOBAL default_rowset_type = beta;`
+
+    After this, FE sends the setting to the BEs via heartbeat; in subsequent compactions, the BEs automatically convert data from V1 to V2.
+    
+    The FE setting takes precedence over the BE configuration: even if a BE's `default_rowset_type` is ALPHA, once FE is set to beta, the BE still starts converting data from V1 to V2.
+
+    **It is recommended to verify the conversion on individual tables first, then start the full conversion. The full conversion takes a long time, and its progress depends on compaction progress.** Compaction may never cover some data, in which case individual tables need to be converted explicitly with `ALTER TABLE`.
+
+3. Check full-conversion progress
+
+    The full conversion progress must be checked with a script, located in the `tools/show_segment_status/` directory of the code repository. See the `README` there for usage.
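The rollup-naming convention used throughout the manual above (a special rollup prefixed with `__v2_`) can be sketched as follows. This is an illustrative Python sketch, not Doris source; the real prefix constant lives in the FE's `MaterializedViewHandler`.

```python
# Illustrative sketch of the V2 rollup naming convention described above.
# The "__v2_" prefix mirrors NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX in the FE.
V2_PREFIX = "__v2_"

def v2_rollup_name(table_name: str) -> str:
    """Return the name of the auto-generated V2-format rollup for a table."""
    return V2_PREFIX + table_name

print(v2_rollup_name("simple"))  # __v2_simple
```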
diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 2fc03a4..2783666 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -871,6 +871,7 @@ public class SchemaChangeHandler extends AlterHandler {
         
         // property 3: timeout
         long timeoutSecond = PropertyAnalyzer.analyzeTimeout(propertyMap, Config.alter_table_timeout_second);
+
         TStorageFormat storageFormat = PropertyAnalyzer.analyzeStorageFormat(propertyMap);
 
         // create job
@@ -944,7 +945,9 @@ public class SchemaChangeHandler extends AlterHandler {
             } else if (hasIndexChange) {
                 needAlter = true;
             } else if (storageFormat == TStorageFormat.V2) {
-                needAlter = true;
+                if (olapTable.getTableProperty().getStorageFormat() != TStorageFormat.V2) {
+                    needAlter = true;
+                }
             }
 
             if (!needAlter) {
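The guard added in the hunk above can be modeled in isolation: a "convert to v2" schema change is only scheduled when the table is not already in V2 format. A hypothetical Python sketch (not the actual FE code):

```python
# Sketch of the needAlter guard above: requesting storage_format = V2 only
# triggers a schema change job if the table is not already in V2 format.
def need_alter(requested_format: str, current_format: str) -> bool:
    return requested_format == "V2" and current_format != "V2"

print(need_alter("V2", "DEFAULT"))  # True: conversion job is needed
print(need_alter("V2", "V2"))      # False: already converted, nothing to do
```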
diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 9c8c688..d1ca93b 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -110,7 +110,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
     // The schema change job will wait all transactions before this txn id finished, then send the schema change tasks.
     protected long watershedTxnId = -1;
 
-    private TStorageFormat storageFormat = null;
+    private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
 
     // save all schema change tasks
     private AgentBatchTask schemaChangeBatchTask = new AgentBatchTask();
@@ -571,6 +571,11 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
             tbl.setIndexes(indexes);
         }
 
+        // set storage format of table, only set if format is v2
+        if (storageFormat == TStorageFormat.V2) {
+            tbl.setStorageFormat(storageFormat);
+        }
+
         tbl.setState(OlapTableState.NORMAL);
     }
 
@@ -890,6 +895,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                 out.writeBoolean(false);
             }
         }
+
+        Text.writeString(out, storageFormat.name());
     }
 
     /**
@@ -968,6 +975,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                 }
             }
         }
+
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_84) {
+            storageFormat = TStorageFormat.valueOf(Text.readString(in));
+        }
     }
 
     /**
@@ -1013,6 +1024,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                 out.writeBoolean(false);
             }
         }
+
+        Text.writeString(out, storageFormat.name());
     }
 
     /**
@@ -1060,6 +1073,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                 this.indexes = null;
             }
         }
+
+        if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_84) {
+            storageFormat = TStorageFormat.valueOf(Text.readString(in));
+        }
     }
 
     @Override
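The hunks above follow Doris FE's version-gated persistence pattern: the new `storageFormat` field is always written, but is only read back when the journal was produced by a new enough meta version. A simplified Python sketch of the pattern (hypothetical, not the actual FE serialization code):

```python
# Sketch (not the actual FE code) of the version-gated persistence pattern:
# the new storageFormat field is always written, but only consumed on read
# when the journal's meta version is >= VERSION_84.
VERSION_84 = 84

def write_job(name: str, storage_format: str) -> list:
    out = [name]
    out.append(storage_format)  # new field, added in VERSION_84
    return out

def read_job(data: list, journal_version: int) -> dict:
    fields = iter(data)
    job = {"name": next(fields), "storage_format": "DEFAULT"}
    if journal_version >= VERSION_84:  # older journals lack the field
        job["storage_format"] = next(fields)
    return job

print(read_job(write_job("sc_job", "V2"), 84))
```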
diff --git a/fe/src/main/java/org/apache/doris/analysis/InstallPluginStmt.java b/fe/src/main/java/org/apache/doris/analysis/InstallPluginStmt.java
index 801eb21..73b1331 100644
--- a/fe/src/main/java/org/apache/doris/analysis/InstallPluginStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/InstallPluginStmt.java
@@ -43,7 +43,8 @@ public class InstallPluginStmt extends DdlStmt {
         super.analyze(analyzer);
 
         if (!Config.plugin_enable) {
-            ErrorReport.reportAnalysisException(ErrorCode.ERR_INVALID_OPERATION, "INSTALL PLUGIN");
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_OPERATION_DISABLED, "INSTALL PLUGIN",
+                    "Please enable it by setting 'plugin_enable' = 'true'");
         }
 
         // check operation privilege
diff --git a/fe/src/main/java/org/apache/doris/analysis/UninstallPluginStmt.java b/fe/src/main/java/org/apache/doris/analysis/UninstallPluginStmt.java
index 08d6fe3..6ba4301 100644
--- a/fe/src/main/java/org/apache/doris/analysis/UninstallPluginStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/UninstallPluginStmt.java
@@ -43,7 +43,8 @@ public class UninstallPluginStmt extends DdlStmt {
         super.analyze(analyzer);
 
         if (!Config.plugin_enable) {
-            ErrorReport.reportAnalysisException(ErrorCode.ERR_INVALID_OPERATION, "UNINSTALL PLUGIN");
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_OPERATION_DISABLED, "UNINSTALL PLUGIN",
+                    "Please enable it by setting 'plugin_enable' = 'true'");
         }
 
         // check operation privilege
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index 858399c..bc6e827 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -199,6 +199,7 @@ import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.CreateReplicaTask;
 import org.apache.doris.task.MasterTaskExecutor;
 import org.apache.doris.task.PullLoadJobMgr;
+import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
 import org.apache.doris.thrift.TTaskType;
@@ -3070,7 +3071,8 @@ public class Catalog {
                     singlePartitionDesc.getVersionInfo(),
                     bfColumns, olapTable.getBfFpp(),
                     tabletIdSet, olapTable.getCopiedIndexes(),
-                    singlePartitionDesc.isInMemory());
+                    singlePartitionDesc.isInMemory(),
+                    olapTable.getTableProperty().getStorageFormat());
 
             // check again
             db.writeLock();
@@ -3381,7 +3383,8 @@ public class Catalog {
                                                  double bfFpp,
                                                  Set<Long> tabletIdSet,
                                                  List<Index> indexes,
-                                                 boolean isInMemory) throws DdlException {
+                                                 boolean isInMemory,
+                                                 TStorageFormat storageFormat) throws DdlException {
         // create base index first.
         Preconditions.checkArgument(baseIndexId != -1);
         MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, IndexState.NORMAL);
@@ -3446,6 +3449,7 @@ public class Catalog {
                             countDownLatch,
                             indexes,
                             isInMemory);
+                    task.setStorageFormat(storageFormat);
                     batchTask.addTask(task);
                     // add to AgentTaskQueue for handling finish report.
                     // not for resending task
@@ -3684,6 +3688,15 @@ public class Catalog {
         }
         Preconditions.checkNotNull(versionInfo);
 
+        // get storage format
+        TStorageFormat storageFormat = TStorageFormat.DEFAULT; // default means it's up to BE's config
+        try {
+            storageFormat = PropertyAnalyzer.analyzeStorageFormat(properties);
+        } catch (AnalysisException e) {
+            throw new DdlException(e.getMessage());
+        }
+        olapTable.setStorageFormat(storageFormat);
+
         // a set to record every new tablet created when create table
         // if failed in any step, use this set to do clear things
         Set<Long> tabletIdSet = new HashSet<Long>();
@@ -3706,7 +3719,7 @@ public class Catalog {
                         partitionInfo.getReplicationNum(partitionId),
                         versionInfo, bfColumns, bfFpp,
                         tabletIdSet, olapTable.getCopiedIndexes(),
-                        isInMemory);
+                        isInMemory, storageFormat);
                 olapTable.addPartition(partition);
             } else if (partitionInfo.getType() == PartitionType.RANGE) {
                 try {
@@ -3735,7 +3748,7 @@ public class Catalog {
                             partitionInfo.getReplicationNum(entry.getValue()),
                             versionInfo, bfColumns, bfFpp,
                             tabletIdSet, olapTable.getCopiedIndexes(),
-                            isInMemory);
+                            isInMemory, storageFormat);
                     olapTable.addPartition(partition);
                 }
             } else {
@@ -3914,22 +3927,21 @@
             // properties
             sb.append("\nPROPERTIES (\n");
 
-            // 1. storage type
-            sb.append("\"").append(PropertyAnalyzer.PROPERTIES_STORAGE_TYPE).append("\" = \"");
-            TStorageType storageType = olapTable.getStorageTypeByIndexId(
-                    olapTable.getIndexIdByName(olapTable.getName()));
-            sb.append(storageType.name()).append("\"");
+            // replicationNum
+            Short replicationNum = olapTable.getDefaultReplicationNum();
+            sb.append("\"").append(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM).append("\" = \"");
+            sb.append(replicationNum).append("\"");
 
-            // 2. bloom filter
+            // bloom filter
             Set<String> bfColumnNames = olapTable.getCopiedBfColumns();
             if (bfColumnNames != null) {
-                sb.append(",\n \"").append(PropertyAnalyzer.PROPERTIES_BF_COLUMNS).append("\" = \"");
+                sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_BF_COLUMNS).append("\" = \"");
                 sb.append(Joiner.on(", ").join(olapTable.getCopiedBfColumns())).append("\"");
             }
 
             if (separatePartition) {
-                // 3. version info
-                sb.append(",\n \"").append(PropertyAnalyzer.PROPERTIES_VERSION_INFO).append("\" = \"");
+                // version info
+                sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_VERSION_INFO).append("\" = \"");
                 Partition partition = null;
                 if (olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED) {
                     partition = olapTable.getPartition(olapTable.getName());
@@ -3941,27 +3953,26 @@ public class Catalog {
                         .append("\"");
             }
 
-            // 5. colocateTable
+            // colocateTable
             String colocateTable = olapTable.getColocateGroup();
             if (colocateTable != null) {
-                sb.append(",\n \"").append(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH).append("\" = \"");
+                sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH).append("\" = \"");
                 sb.append(colocateTable).append("\"");
             }
 
-            // 6. dynamic partition
+            // dynamic partition
             if (olapTable.dynamicPartitionExists()) {
                 sb.append(olapTable.getTableProperty().getDynamicPartitionProperty().toString());
             }
 
-            // 7. replicationNum
-            Short replicationNum = olapTable.getDefaultReplicationNum();
-            sb.append(",\n \"").append(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM).append("\" = \"");
-            sb.append(replicationNum).append("\"");
-
-            // 8. in memory
-            sb.append(",\n \"").append(PropertyAnalyzer.PROPERTIES_INMEMORY).append("\" = \"");
+            // in memory
+            sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_INMEMORY).append("\" = \"");
             sb.append(olapTable.isInMemory()).append("\"");
 
+            // storage type
+            sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT).append("\" = \"");
+            sb.append(olapTable.getTableProperty().getStorageFormat()).append("\"");
+
             sb.append("\n)");
         } else if (table.getType() == TableType.MYSQL) {
             MysqlTable mysqlTable = (MysqlTable) table;
@@ -6158,7 +6169,8 @@ public class Catalog {
                         copiedTbl.getBfFpp(),
                         tabletIdSet,
                         copiedTbl.getCopiedIndexes(),
-                        copiedTbl.isInMemory());
+                        copiedTbl.isInMemory(),
+                        copiedTbl.getTableProperty().getStorageFormat());
                 newPartitions.add(newPartition);
             }
         } catch (DdlException e) {
diff --git a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
index 725c640..719e29e 100644
--- a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.catalog;
 
+import org.apache.doris.alter.MaterializedViewHandler;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.backup.Status;
 import org.apache.doris.backup.Status.ErrCode;
@@ -39,6 +40,7 @@ import org.apache.doris.common.util.RangeUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TOlapTable;
+import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
 import org.apache.doris.thrift.TTableDescriptor;
@@ -309,6 +311,11 @@ public class OlapTable extends Table {
         return indexNameToId.get(indexName);
     }
 
+    public Long getSegmentV2FormatIndexId() {
+        String v2RollupIndexName = MaterializedViewHandler.NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + getName();
+        return indexNameToId.get(v2RollupIndexName);
+    }
+
     public String getIndexNameById(long indexId) {
         for (Map.Entry<String, Long> entry : indexNameToId.entrySet()) {
             if (entry.getValue() == indexId) {
@@ -1455,4 +1462,12 @@ public class OlapTable extends Table {
     public boolean existTempPartitions() {
         return !tempPartitions.isEmpty();
     }
+
+    public void setStorageFormat(TStorageFormat storageFormat) {
+        if (tableProperty == null) {
+            tableProperty = new TableProperty(new HashMap<>());
+        }
+        tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT, storageFormat.name());
+        tableProperty.buildStorageFormat();
+    }
 }
diff --git a/fe/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/src/main/java/org/apache/doris/catalog/TableProperty.java
index 64d5564..51614cc 100644
--- a/fe/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.persist.OperationType;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.thrift.TStorageFormat;
 
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
@@ -50,6 +51,16 @@ public class TableProperty implements Writable {
 
     private boolean isInMemory = false;
 
+    /*
+     * the default storage format of this table.
+     * DEFAULT: depends on BE's config 'default_rowset_type'
+     * V1: alpha rowset
+     * V2: beta rowset
+     * 
+     * This property should be set when creating the table, and can only be changed to V2 using Alter Table stmt.
+     */
+    private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
+
     public TableProperty(Map<String, String> properties) {
         this.properties = properties;
     }
@@ -101,6 +112,12 @@ public class TableProperty implements Writable {
         return this;
     }
 
+    public TableProperty buildStorageFormat() {
+        storageFormat = TStorageFormat.valueOf(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT,
+                TStorageFormat.DEFAULT.name()));
+        return this;
+    }
+
     public void modifyTableProperties(Map<String, String> modifyProperties) {
         properties.putAll(modifyProperties);
     }
@@ -125,6 +142,10 @@ public class TableProperty implements Writable {
         return isInMemory;
     }
 
+    public TStorageFormat getStorageFormat() {
+        return storageFormat;
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         Text.writeString(out, GsonUtils.GSON.toJson(this));
@@ -134,6 +155,7 @@ public class TableProperty implements Writable {
         return GsonUtils.GSON.fromJson(Text.readString(in), TableProperty.class)
                 .buildDynamicProperty()
                 .buildReplicationNum()
-                .buildInMemory();
+                .buildInMemory()
+                .buildStorageFormat();
     }
 }
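The `TableProperty` change above uses a chainable build pattern: after deserializing the raw property map, each `buildXxx()` call derives a typed field, with the storage format defaulting to `DEFAULT` when the property is absent. A simplified Python sketch of the pattern (hypothetical, not the FE code):

```python
# Sketch (not the actual FE code) of TableProperty's chainable build pattern:
# typed fields are derived from the raw property map after deserialization,
# and storage_format falls back to DEFAULT when the property is missing.
class TableProperty:
    def __init__(self, properties: dict):
        self.properties = properties
        self.storage_format = "DEFAULT"

    def build_storage_format(self):
        # mirrors properties.getOrDefault(PROPERTIES_STORAGE_FORMAT, DEFAULT)
        self.storage_format = self.properties.get("storage_format", "DEFAULT")
        return self  # chainable, like the Java builder methods

tp = TableProperty({"storage_format": "V2"}).build_storage_format()
print(tp.storage_format)  # V2
```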
diff --git a/fe/src/main/java/org/apache/doris/common/ErrorCode.java b/fe/src/main/java/org/apache/doris/common/ErrorCode.java
index 9a6ecc8..7b416a3 100644
--- a/fe/src/main/java/org/apache/doris/common/ErrorCode.java
+++ b/fe/src/main/java/org/apache/doris/common/ErrorCode.java
@@ -230,7 +230,9 @@ public enum ErrorCode {
     ERROR_DYNAMIC_PARTITION_ENABLE(5068, new byte[] {'4', '2', '0', '0', '0'},
             "Invalid dynamic partition enable: %s. Expected true or false"),
     ERROR_DYNAMIC_PARTITION_PREFIX(5069, new byte[] {'4', '2', '0', '0', '0'},
-            "Invalid dynamic partition prefix: %s.");
+            "Invalid dynamic partition prefix: %s."),
+    ERR_OPERATION_DISABLED(5070, new byte[] {'4', '2', '0', '0', '0'},
+            "Operation %s is disabled. %s");
 
     ErrorCode(int code, byte[] sqlState, String errorMsg) {
         this.code = code;
diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
index fd3bd1d..bebaccf 100644
--- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -177,6 +177,8 @@ public final class FeMetaVersion {
     public static final int VERSION_82 = 82;
     // modify TransactionState Field
     public static final int VERSION_83 = 83;
+    // add storage format in schema change job
+    public static final int VERSION_84 = 84;
     // note: when increment meta version, should assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_83;
+    public static final int VERSION_CURRENT = VERSION_84;
 }
diff --git a/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 118288c..49129bb 100644
--- a/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ b/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -360,18 +360,23 @@ public class PropertyAnalyzer {
     // analyzeStorageFormat will parse the storage format from properties
     // sql: alter table tablet_name set ("storage_format" = "v2")
     // Use this sql to convert all tablets(base and rollup index) to a new format segment
-    public static TStorageFormat analyzeStorageFormat(Map<String, String> properties) {
-        String storage_format = "";
+    public static TStorageFormat analyzeStorageFormat(Map<String, String> properties) throws AnalysisException {
+        String storageFormat = "";
         if (properties != null && properties.containsKey(PROPERTIES_STORAGE_FORMAT)) {
-            storage_format = properties.get(PROPERTIES_STORAGE_FORMAT);
+            storageFormat = properties.get(PROPERTIES_STORAGE_FORMAT);
             properties.remove(PROPERTIES_STORAGE_FORMAT);
+        } else {
+            return TStorageFormat.DEFAULT;
         }
-        if (storage_format.equalsIgnoreCase("v1")) {
+
+        if (storageFormat.equalsIgnoreCase("v1")) {
             return TStorageFormat.V1;
-        } else if(storage_format.equalsIgnoreCase("v2")) {
+        } else if (storageFormat.equalsIgnoreCase("v2")) {
             return TStorageFormat.V2;
-        } else {
+        } else if (storageFormat.equalsIgnoreCase("default")) {
             return TStorageFormat.DEFAULT;
+        } else {
+            throw new AnalysisException("unknown storage format: " + storageFormat);
         }
     }
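The parsing behavior introduced above can be sketched in isolation: a missing `storage_format` property falls back to `DEFAULT`, and an unrecognized value now raises an error instead of being silently treated as `DEFAULT`. A hypothetical Python sketch (not the actual FE code):

```python
# Sketch (not the actual FE code) of analyzeStorageFormat's new behavior.
def analyze_storage_format(properties) -> str:
    if not properties or "storage_format" not in properties:
        return "DEFAULT"
    value = properties.pop("storage_format").lower()  # property is consumed
    if value in ("v1", "v2"):
        return value.upper()
    if value == "default":
        return "DEFAULT"
    raise ValueError("unknown storage format: " + value)

print(analyze_storage_format({"storage_format": "v2"}))  # V2
print(analyze_storage_format(None))                      # DEFAULT
```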
 
diff --git 
a/fe/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java 
b/fe/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
index dcfe276..11ba01b 100644
--- a/fe/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
+++ b/fe/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.planner;
 
-import org.apache.doris.alter.MaterializedViewHandler;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.CastExpr;
 import org.apache.doris.analysis.Expr;
@@ -109,9 +108,10 @@ public class MaterializedViewSelector {
         long start = System.currentTimeMillis();
         Preconditions.checkState(scanNode instanceof OlapScanNode);
         OlapScanNode olapScanNode = (OlapScanNode) scanNode;
+
         Map<Long, List<Column>> candidateIndexIdToSchema = predicates(olapScanNode);
         long bestIndexId = priorities(olapScanNode, candidateIndexIdToSchema);
-        LOG.info("The best materialized view is {} for scan node {} in query {}, cost {}",
+        LOG.debug("The best materialized view is {} for scan node {} in query {}, cost {}",
                  bestIndexId, scanNode.getId(), selectStmt.toSql(), (System.currentTimeMillis() - start));
         return new BestIndexInfo(bestIndexId, isPreAggregation, reasonOfDisable);
     }
@@ -159,6 +159,23 @@ public class MaterializedViewSelector {
     }
 
     private long priorities(OlapScanNode scanNode, Map<Long, List<Column>> candidateIndexIdToSchema) {
+        OlapTable tbl = scanNode.getOlapTable();
+        Long v2RollupIndexId = tbl.getSegmentV2FormatIndexId();
+        if (v2RollupIndexId != null) {
+            ConnectContext connectContext = ConnectContext.get();
+            if (connectContext != null && connectContext.getSessionVariable().isUseV2Rollup()) {
+                // if the user sets `use_v2_rollup` to true and there is a segment v2 rollup,
+                // just return it, because the user wants to verify the v2 format data.
+                if (candidateIndexIdToSchema.containsKey(v2RollupIndexId)) {
+                    return v2RollupIndexId;
+                }
+            } else {
+                // `use_v2_rollup` is not set, so the v2 format rollup should not be selected;
+                // remove it from candidateIndexIdToSchema.
+                candidateIndexIdToSchema.remove(v2RollupIndexId);
+            }
+        }
+
         // Step1: the candidate indexes that satisfies the most prefix index
         final Set<String> equivalenceColumns = Sets.newHashSet();
         final Set<String> unequivalenceColumns = Sets.newHashSet();
@@ -231,27 +248,6 @@ public class MaterializedViewSelector {
                 }
             }
         }
-        String tableName = olapTable.getName();
-        String v2RollupIndexName = MaterializedViewHandler.NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + tableName;
-        Long v2RollupIndex = olapTable.getIndexIdByName(v2RollupIndexName);
-        long baseIndexId = olapTable.getBaseIndexId();
-        ConnectContext connectContext = ConnectContext.get();
-        boolean useV2Rollup = false;
-        if (connectContext != null) {
-            useV2Rollup = connectContext.getSessionVariable().getUseV2Rollup();
-        }
-        if (baseIndexId == selectedIndexId && v2RollupIndex != null && useV2Rollup) {
-            // if the selectedIndexId is baseIndexId
-            // check whether there is a V2 rollup index and useV2Rollup flag is true,
-            // if both true, use v2 rollup index
-            selectedIndexId = v2RollupIndex;
-        }
-        if (!useV2Rollup && v2RollupIndex != null && v2RollupIndex == selectedIndexId) {
-            // if the selectedIndexId is v2RollupIndex
-            // but useV2Rollup is false, use baseIndexId as selectedIndexId
-            // just make sure to use baseIndex instead of v2RollupIndex if the useV2Rollup is false
-            selectedIndexId = baseIndexId;
-        }
         return selectedIndexId;
     }
 
@@ -393,8 +389,7 @@ public class MaterializedViewSelector {
     }
 
     private void compensateCandidateIndex(Map<Long, MaterializedIndexMeta> candidateIndexIdToMeta, Map<Long,
-            MaterializedIndexMeta> allVisibleIndexes,
-                                 OlapTable table) {
+            MaterializedIndexMeta> allVisibleIndexes, OlapTable table) {
         isPreAggregation = false;
         reasonOfDisable = "The aggregate operator does not match";
         int keySizeOfBaseIndex = table.getKeyColumnsByIndexId(table.getBaseIndexId()).size();
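
[Editor's illustration] The new candidate-filtering rule in `priorities()` above can be sketched in plain Java. The names here (`chooseIndex`, a `Map<Long, String>` of candidates) are illustrative, not Doris APIs: when `use_v2_rollup` is enabled and the v2 rollup is among the candidates it is chosen directly for data verification; otherwise the v2 rollup is removed so ordinary selection can never pick it.

```java
import java.util.HashMap;
import java.util.Map;

public class V2RollupChoiceDemo {
    // Sketch of the selection rule: v2RollupIndexId may be null when the
    // table has no segment v2 rollup at all.
    public static long chooseIndex(Map<Long, String> candidates, Long v2RollupIndexId,
                                   boolean useV2Rollup, long normallySelectedId) {
        if (v2RollupIndexId != null) {
            if (useV2Rollup && candidates.containsKey(v2RollupIndexId)) {
                // User explicitly asked to verify v2 data: short-circuit to it.
                return v2RollupIndexId;
            }
            if (!useV2Rollup) {
                // Hide the v2 rollup from the normal selection path.
                candidates.remove(v2RollupIndexId);
            }
        }
        return normallySelectedId;
    }

    public static void main(String[] args) {
        Map<Long, String> candidates = new HashMap<>();
        candidates.put(1L, "base");
        candidates.put(2L, "__v2_tbl");
        System.out.println(chooseIndex(candidates, 2L, true, 1L));  // 2
    }
}
```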
diff --git a/fe/src/main/java/org/apache/doris/planner/RollupSelector.java b/fe/src/main/java/org/apache/doris/planner/RollupSelector.java
index 1a5fee7..88ed703 100644
--- a/fe/src/main/java/org/apache/doris/planner/RollupSelector.java
+++ b/fe/src/main/java/org/apache/doris/planner/RollupSelector.java
@@ -64,9 +64,20 @@ public final class RollupSelector {
             Collection<Long> partitionIds, List<Expr> conjuncts, boolean isPreAggregation)
             throws UserException {
         Preconditions.checkArgument(partitionIds != null , "Paritition can't be null.");
+        
+        ConnectContext connectContext = ConnectContext.get();
+        if (connectContext != null && connectContext.getSessionVariable().isUseV2Rollup()) {
+            // if the user sets `use_v2_rollup` to true and there is a segment v2 rollup,
+            // just return it, because the user wants to verify the v2 format data.
+            String v2RollupIndexName = MaterializedViewHandler.NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + table.getName();
+            Long v2RollupIndexId = table.getIndexIdByName(v2RollupIndexName);
+            if (v2RollupIndexId != null) {
+                return v2RollupIndexId;
+            }
+        }
+
         // Get first partition to select best prefix index rollups, because MaterializedIndex ids in one rollup's partitions are all same.
-        final List<Long> bestPrefixIndexRollups =
-                selectBestPrefixIndexRollup(conjuncts, isPreAggregation);
+        final List<Long> bestPrefixIndexRollups = selectBestPrefixIndexRollup(conjuncts, isPreAggregation);
         return selectBestRowCountRollup(bestPrefixIndexRollups, partitionIds);
     }
 
@@ -93,20 +104,9 @@ public final class RollupSelector {
         }
         String tableName = table.getName();
         String v2RollupIndexName = MaterializedViewHandler.NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + tableName;
-        Long v2RollupIndex = table.getIndexIdByName(v2RollupIndexName);
+        Long v2RollupIndexId = table.getIndexIdByName(v2RollupIndexName);
         long baseIndexId = table.getBaseIndexId();
-        ConnectContext connectContext = ConnectContext.get();
-        boolean useV2Rollup = false;
-        if (connectContext != null) {
-            useV2Rollup = connectContext.getSessionVariable().getUseV2Rollup();
-        }
-        if (baseIndexId == selectedIndexId && v2RollupIndex != null && useV2Rollup) {
-            // if the selectedIndexId is baseIndexId
-            // check whether there is a V2 rollup index and useV2Rollup flag is true,
-            // if both true, use v2 rollup index
-            selectedIndexId = v2RollupIndex;
-        }
-        if (!useV2Rollup && v2RollupIndex != null && v2RollupIndex == selectedIndexId) {
+        if (v2RollupIndexId != null && v2RollupIndexId == selectedIndexId) {
             // if the selectedIndexId is v2RollupIndex
             // but useV2Rollup is false, use baseIndexId as selectedIndexId
             // just make sure to use baseIndex instead of v2RollupIndex if the useV2Rollup is false
diff --git a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 0d2c14d..f4dcf68 100644
--- a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -17,12 +17,6 @@
 
 package org.apache.doris.planner;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 import org.apache.doris.analysis.AggregateInfo;
 import org.apache.doris.analysis.AnalyticInfo;
 import org.apache.doris.analysis.Analyzer;
@@ -63,6 +57,13 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Reference;
 import org.apache.doris.common.UserException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -772,8 +773,8 @@ public class SingleNodePlanner {
                 if (olapScanNode.getSelectedPartitionIds().size() == 0 && !FeConstants.runningUnitTest) {
                     continue;
                 }
-                MaterializedViewSelector.BestIndexInfo bestIndexInfo = materializedViewSelector.selectBestMV
-                        (olapScanNode);
+                MaterializedViewSelector.BestIndexInfo bestIndexInfo = materializedViewSelector.selectBestMV(
+                        olapScanNode);
                 olapScanNode.updateScanRangeInfoByNewMVSelector(bestIndexInfo.getBestIndexId(), bestIndexInfo.isPreAggregation(),
                         bestIndexInfo.getReasonOfDisable());
             }
diff --git a/fe/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/src/main/java/org/apache/doris/plugin/AuditEvent.java
index 302134e..ebaf3da 100644
--- a/fe/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -71,7 +71,7 @@ public class AuditEvent {
     public String queryId = "";
     @AuditField(value = "IsQuery")
     public boolean isQuery = false;
-    @AuditField(value = "fe_ip")
+    @AuditField(value = "feIp")
     public String feIp = "";
     @AuditField(value = "Stmt")
     public String stmt = "";
diff --git a/fe/src/main/java/org/apache/doris/qe/QueryState.java b/fe/src/main/java/org/apache/doris/qe/QueryState.java
index ecaeb3f..bc6e424 100644
--- a/fe/src/main/java/org/apache/doris/qe/QueryState.java
+++ b/fe/src/main/java/org/apache/doris/qe/QueryState.java
@@ -100,7 +100,7 @@ public class QueryState {
         return errType;
     }
 
-    public void setQuery(boolean isQuery) {
+    public void setIsQuery(boolean isQuery) {
         this.isQuery = isQuery;
     }
 
diff --git a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java
index 63b56a0..1a0e454 100644
--- a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -220,7 +220,7 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = LOAD_MEM_LIMIT)
     private long loadMemLimit = 0L;
 
-    // the default rowset type flag which will be passed to Backends througth heartbeat
+    // the default rowset type flag which will be passed to Backends through heartbeat
     @VariableMgr.VarAttr(name = DEFAULT_ROWSET_TYPE)
     private String defaultRowsetType = "alpha";
 
@@ -408,7 +408,12 @@ public class SessionVariable implements Serializable, Writable {
         return forwardToMaster;
     }
 
-    public boolean getUseV2Rollup() { return useV2Rollup; }
+    public boolean isUseV2Rollup() { return useV2Rollup; }
+
+    // for unit test
+    public void setUseV2Rollup(boolean useV2Rollup) {
+        this.useV2Rollup = useV2Rollup;
+    }
 
     public boolean getTestMaterializedView() {
         return this.testMaterializedView;
@@ -446,8 +451,8 @@ public class SessionVariable implements Serializable, Writable {
         return divPrecisionIncrement;
     }
 
-    public void setDivPrecisionIncrement(int divPrecisionIncrement) {
-        this.divPrecisionIncrement = divPrecisionIncrement;
+    public String getDefaultRowsetType() {
+        return defaultRowsetType;
     }
 
     // Serialize to thrift object
diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 109b737..8852b16 100644
--- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -234,6 +234,7 @@ public class StmtExecutor {
             }
 
             if (parsedStmt instanceof QueryStmt) {
+                context.getState().setIsQuery(true);
                 int retryTime = Config.max_query_retry_time;
                 for (int i = 0; i < retryTime; i ++) {
                     try {
diff --git a/fe/src/main/java/org/apache/doris/system/HeartbeatFlags.java b/fe/src/main/java/org/apache/doris/system/HeartbeatFlags.java
index 6a80e8a..fc3930e 100644
--- a/fe/src/main/java/org/apache/doris/system/HeartbeatFlags.java
+++ b/fe/src/main/java/org/apache/doris/system/HeartbeatFlags.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.VariableMgr;
 import org.apache.doris.thrift.HeartbeatServiceConstants;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -37,7 +38,7 @@ public class HeartbeatFlags {
         return "alpha".equalsIgnoreCase(rowsetType) || "beta".equalsIgnoreCase(rowsetType);
     }
 
-    public long getHeartbeatFlags () {
+    public long getHeartbeatFlags() {
         long heartbeatFlags = 0;
         try {
             String defaultRowsetType = VariableMgr.getValue(null, new SysVariableDesc(SessionVariable.DEFAULT_ROWSET_TYPE, SetType.GLOBAL));
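
[Editor's illustration] `getHeartbeatFlags()` above packs the global `default_rowset_type` session variable into a bit field shipped to Backends in each heartbeat. A minimal sketch of that packing follows; the flag constant and method shape here are hypothetical stand-ins (the real constant lives in the generated `HeartbeatServiceConstants` thrift class).

```java
public class HeartbeatFlagsDemo {
    // Hypothetical bit value for illustration only.
    static final long IS_SET_DEFAULT_ROWSET_TO_BETA_BIT = 0x1L;

    // Sketch: map the session variable value to a heartbeat bit flag.
    // "alpha" (segment v1) leaves the bit clear; "beta" (segment v2) sets it.
    public static long getHeartbeatFlags(String defaultRowsetType) {
        long heartbeatFlags = 0;
        if ("beta".equalsIgnoreCase(defaultRowsetType)) {
            heartbeatFlags |= IS_SET_DEFAULT_ROWSET_TO_BETA_BIT;
        }
        return heartbeatFlags;
    }

    public static void main(String[] args) {
        System.out.println(getHeartbeatFlags("alpha")); // 0
        System.out.println(getHeartbeatFlags("beta"));  // 1
    }
}
```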
diff --git a/fe/src/test/java/org/apache/doris/alter/AlterJobV2Test.java b/fe/src/test/java/org/apache/doris/alter/AlterJobV2Test.java
index c2cea4c..543fe05 100644
--- a/fe/src/test/java/org/apache/doris/alter/AlterJobV2Test.java
+++ b/fe/src/test/java/org/apache/doris/alter/AlterJobV2Test.java
@@ -23,25 +23,20 @@ import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.ShowAlterStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.MaterializedIndexMeta;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ShowExecutor;
 import org.apache.doris.qe.ShowResultSet;
-import org.apache.doris.utframe.MockedFrontend.EnvVarNotSetException;
-import org.apache.doris.utframe.MockedFrontend.FeStartException;
-import org.apache.doris.utframe.MockedFrontend.NotInitException;
-
+import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.utframe.UtFrameUtils;
+
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.IOException;
 import java.util.Map;
 import java.util.UUID;
 
@@ -54,6 +49,9 @@ public class AlterJobV2Test {
 
     @BeforeClass
     public static void beforeClass() throws Exception {
+        FeConstants.default_scheduler_interval_millisecond = 1000;
+        FeConstants.runningUnitTest = true;
+
         UtFrameUtils.createMinDorisCluster(runningDir);
 
         // create connect context
@@ -64,6 +62,8 @@ public class AlterJobV2Test {
         Catalog.getCurrentCatalog().createDb(createDbStmt);
 
         createTable("CREATE TABLE test.schema_change_test(k1 int, k2 int, k3 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');");
+
+        createTable("CREATE TABLE test.segmentv2(k1 int, k2 int, v1 int sum) distributed by hash(k1) buckets 3 properties('replication_num' = '1');");
     }
 
     @AfterClass
@@ -110,7 +110,6 @@
         Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
         // 2. check alter job
         Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
-        Assert.assertEquals(1, alterJobs.size());
         for (AlterJobV2 alterJobV2 : alterJobs.values()) {
             while (!alterJobV2.getJobState().isFinalState()) {
                 System.out.println("alter job " + alterJobV2.getDbId() + " is running. state: " + alterJobV2.getJobState());
@@ -127,4 +126,78 @@ public class AlterJobV2Test {
         System.out.println(showResultSet.getMetaData());
         System.out.println(showResultSet.getResultRows());
     }
+
+    @Test
+    public void testAlterSegmentV2() throws Exception {
+        Database db = Catalog.getCurrentCatalog().getDb("default_cluster:test");
+        Assert.assertNotNull(db);
+        OlapTable tbl = (OlapTable) db.getTable("segmentv2");
+        Assert.assertNotNull(tbl);
+        Assert.assertEquals(TStorageFormat.DEFAULT, tbl.getTableProperty().getStorageFormat());
+
+        // 1. create a rollup r1
+        String alterStmtStr = "alter table test.segmentv2 add rollup r1(k2, v1)";
+        AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
+        Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
+        Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
+        for (AlterJobV2 alterJobV2 : alterJobs.values()) {
+            while (!alterJobV2.getJobState().isFinalState()) {
+                System.out.println("alter job " + alterJobV2.getDbId() + " is running. state: " + alterJobV2.getJobState());
+                Thread.sleep(1000);
+            }
+            System.out.println("alter job " + alterJobV2.getDbId() + " is done. state: " + alterJobV2.getJobState());
+            Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState());
+        }
+
+        String sql = "select k2, sum(v1) from test.segmentv2 group by k2";
+        String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql);
+        Assert.assertTrue(explainString.contains("rollup: r1"));
+
+        // 2. create a rollup with segment v2
+        alterStmtStr = "alter table test.segmentv2 add rollup segmentv2(k2, v1) properties('storage_format' = 'v2')";
+        alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
+        Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
+        alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
+        for (AlterJobV2 alterJobV2 : alterJobs.values()) {
+            while (!alterJobV2.getJobState().isFinalState()) {
+                System.out.println("alter job " + alterJobV2.getDbId() + " is running. state: " + alterJobV2.getJobState());
+                Thread.sleep(1000);
+            }
+            System.out.println("alter job " + alterJobV2.getDbId() + " is done. state: " + alterJobV2.getJobState());
+            Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState());
+        }
+
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql);
+        Assert.assertTrue(explainString.contains("rollup: r1"));
+
+        // set use_v2_rollup = true;
+        connectContext.getSessionVariable().setUseV2Rollup(true);
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql);
+        Assert.assertTrue(explainString.contains("rollup: __v2_segmentv2"));
+
+        // 3. process alter segment v2
+        alterStmtStr = "alter table test.segmentv2 set ('storage_format' = 'v2');";
+        alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
+        Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
+        // 4. check alter job
+        alterJobs = Catalog.getCurrentCatalog().getSchemaChangeHandler().getAlterJobsV2();
+        for (AlterJobV2 alterJobV2 : alterJobs.values()) {
+            while (!alterJobV2.getJobState().isFinalState()) {
+                System.out.println("alter job " + alterJobV2.getDbId() + " is running. state: " + alterJobV2.getJobState());
+                Thread.sleep(1000);
+            }
+            System.out.println("alter job " + alterJobV2.getDbId() + " is done. state: " + alterJobV2.getJobState());
+            Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState());
+        }
+        // 5. check storage format of table
+        Assert.assertEquals(TStorageFormat.V2, tbl.getTableProperty().getStorageFormat());
+
+        // 6. alter again, check that no job will be created
+        try {
+            Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
+            Assert.fail();
+        } catch (DdlException e) {
+            Assert.assertTrue(e.getMessage().contains("Nothing is changed"));
+        }
+    }
 }
diff --git a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
index 8d7fe95..a45cdd9 100755
--- a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
+++ b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
@@ -190,8 +190,8 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
         public String frontendHostPort = "127.0.0.1:9030";
         public String user = "root";
         public String password = "";
-        public String database = "__doris_audit_db";
-        public String table = "__doris_audit_tbl";
+        public String database = "doris_audit_db__";
+        public String table = "doris_audit_tbl__";
 
         public void init(Map<String, String> properties) throws PluginException {
             try {
diff --git a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
index 8dbac8f..d7fd87d 100644
--- a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
+++ b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
@@ -94,6 +94,7 @@ public class DorisStreamLoader {
 
             conn.addRequestProperty("label", label);
             conn.addRequestProperty("max_fiter_ratio", "1.0");
+            conn.addRequestProperty("columns", "query_id, time, client_ip, user, db, state, query_time, scan_bytes, scan_rows, return_rows, stmt_id, is_query, frontend_ip, stmt");
 
             conn.setDoOutput(true);
             conn.setDoInput(true);
@@ -148,4 +149,4 @@ public class DorisStreamLoader {
             return sb.toString();
         }
     }
-}
\ No newline at end of file
+}
diff --git a/tools/show_segment_status/README.md b/tools/show_segment_status/README.md
index 5eacd87..ba00e7c 100644
--- a/tools/show_segment_status/README.md
+++ b/tools/show_segment_status/README.md
@@ -50,8 +50,9 @@ $ python setup.py install
 
 # Output Example Format
 
-```
+In each line, the first number is the segment v2 count, and the second one is the total count.
 
+```
 ==========SUMMARY()===========
 rowset_count: 289845 / 289845
 rowset_disk_size: 84627551189 / 84627551189
@@ -67,5 +68,4 @@ rowset_count: 79650 / 79650
 rowset_disk_size: 24473921575 / 24473921575
 rowset_row_count: 331449328 / 331449328
 ===========================================================
-
 ```
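
[Editor's illustration] Each summary line above reports "v2 count / total count", so the conversion progress of a tablet or the whole cluster is simply the first number divided by the second. A small sketch of parsing such a line (the method name and the empty-total convention are illustrative assumptions, not part of the tool):

```java
public class SegmentStatusDemo {
    // Parse one summary line like "rowset_count: 79650 / 79650" and return
    // the fraction already in segment v2 format (first number / total).
    public static double v2Ratio(String line) {
        String[] counts = line.split(":")[1].split("/");
        long v2 = Long.parseLong(counts[0].trim());
        long total = Long.parseLong(counts[1].trim());
        // Assumption: a zero total (nothing to convert) counts as fully converted.
        return total == 0 ? 1.0 : (double) v2 / total;
    }

    public static void main(String[] args) {
        System.out.println(v2Ratio("rowset_count: 289845 / 289845")); // 1.0
    }
}
```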

