This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new c6ac60b [SegmentV2] Optimize the upgrade logic of SegmentV2 (#3340)
c6ac60b is described below
commit c6ac60bab9563a77b052a2766b0de859528aee41
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Apr 21 10:45:29 2020 +0800
[SegmentV2] Optimize the upgrade logic of SegmentV2 (#3340)
This CL makes the following changes:
1. Reorganize the SegmentV2 upgrade document.
2. When the session variable `use_v2_rollup` is set to true, force queries to
use the v2-format rollup of the base table so that its data can be verified.
3. Fix a problem where the schema change job that performs the v2 conversion
did not persist the storage format.
4. Allow users to create v2-format tables directly.
---
.../cn/administrator-guide/segment-v2-usage.md | 215 ++++++++++++---------
.../apache/doris/alter/SchemaChangeHandler.java | 5 +-
.../org/apache/doris/alter/SchemaChangeJobV2.java | 19 +-
.../apache/doris/analysis/InstallPluginStmt.java | 3 +-
.../apache/doris/analysis/UninstallPluginStmt.java | 3 +-
.../java/org/apache/doris/catalog/Catalog.java | 60 +++---
.../java/org/apache/doris/catalog/OlapTable.java | 15 ++
.../org/apache/doris/catalog/TableProperty.java | 24 ++-
.../java/org/apache/doris/common/ErrorCode.java | 4 +-
.../org/apache/doris/common/FeMetaVersion.java | 4 +-
.../apache/doris/common/util/PropertyAnalyzer.java | 17 +-
.../doris/planner/MaterializedViewSelector.java | 45 ++---
.../org/apache/doris/planner/RollupSelector.java | 30 +--
.../apache/doris/planner/SingleNodePlanner.java | 17 +-
.../java/org/apache/doris/plugin/AuditEvent.java | 2 +-
.../main/java/org/apache/doris/qe/QueryState.java | 2 +-
.../java/org/apache/doris/qe/SessionVariable.java | 13 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 1 +
.../org/apache/doris/system/HeartbeatFlags.java | 3 +-
.../org/apache/doris/alter/AlterJobV2Test.java | 89 ++++++++-
.../doris/plugin/audit/AuditLoaderPlugin.java | 4 +-
.../doris/plugin/audit/DorisStreamLoader.java | 3 +-
tools/show_segment_status/README.md | 4 +-
23 files changed, 384 insertions(+), 198 deletions(-)
diff --git a/docs/documentation/cn/administrator-guide/segment-v2-usage.md
b/docs/documentation/cn/administrator-guide/segment-v2-usage.md
index faf6b59..1c821a1 100644
--- a/docs/documentation/cn/administrator-guide/segment-v2-usage.md
+++ b/docs/documentation/cn/administrator-guide/segment-v2-usage.md
@@ -1,118 +1,147 @@
-# Doris Segment V2 Rollout and Trial Guide
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
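+# Segment V2 Upgrade Guide
## Background
-Doris 0.12 implements segment v2 (a new storage format), which introduces optimizations such as dictionary compression, bitmap indexes, and page cache to improve system performance. An alpha build of 0.12 has been released and is being rolled out internally; the rollout plan and trial procedure are recorded below.
+Doris 0.12 implements a new storage format, Segment V2, which introduces optimizations such as dictionary compression, bitmap indexes, and page cache to improve system performance.
+
+Version 0.12 can read and write both the original Segment V1 format (V1 below) and the new Segment V2 format (V2 below). To use V2 features on existing data, convert it from V1 to V2 with the commands described here.
+
+This document describes how to convert to and use the V2 format after upgrading from version 0.11 to 0.12.
-## Rollout
+Tables in V2 format support the following new features:
-To keep the rollout stable, it is split into three phases:
-Phase 1 deploys version 0.12 without enabling segment v2 globally; segment v2 tables (or indexes) are created only for verification.
-Phase 2 enables segment v2 globally, replacing the existing segment storage format. New tables get segment v2 storage files, while old tables are converted through processes such as compaction and schema change.
-Phase 3 converts the old segment format to segment v2. This happens after the correctness and performance of segment v2 have been verified, and can proceed gradually as users choose.
+1. Bitmap indexes
+2. In-memory tables
+3. Page cache
+4. Dictionary compression
+5. Lazy materialization
-### Rollout verification
+## Cluster upgrade
-- Correctness
+Version 0.12 can only be upgraded from 0.11; upgrading from versions earlier than 0.11 is not supported. Make sure the Doris cluster is on version 0.11 before upgrading.
-Correctness is the most important property to guarantee when rolling out segment v2. In phase 1, to keep the production environment stable while verifying segment v2 correctness, use the following procedure:
-1. Pick a few tables to verify and create a rollup in segment v2 format with the statement below. The rollup has the same schema as the base table.
+Version 0.12 has two important V2-related parameters: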
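- alter table table_name add rollup table_name (columns) properties ("storage_format" = "v2");
+* `default_rowset_type`: an FE global variable; the default is "alpha", i.e. V1.
+* `default_rowset_type`: a BE configuration item; the default is "ALPHA", i.e. V1.
- where:
- The index name after rollup is set to the table name of the base table; the statement automatically generates a rollup named __v2_table_name.
+With the defaults above, a routine cluster upgrade does not change the storage format of existing data; it stays in V1. If you do not need V2, keep using the cluster as usual with no extra steps. All existing and newly loaded data remains in V1.
- columns can be any single column name; this keeps the statement compatible with the existing syntax and is convenient.
+## V2 format conversion
-2. The rollup index created above has a name of the form __v2_table_name.
+### Converting existing table data to V2
- Run:
+Doris provides two ways to convert the format of existing table data:
- `desc table_name all;`
+1. Create a special rollup in V2 format
- to check whether the table has a rollup named __v2_table_name. Creating a segment v2 rollup is asynchronous, so it does not complete immediately. If the command above does not show the new rollup, it may still be under creation.
+ This creates a special rollup in V2 format for the given table. Once created, the V2 rollup coexists with the table's data in its original format, and users can direct queries at the V2 rollup to verify it.
+
+ This method is mainly for verifying the V2 format. Because it does not modify the original table data, V2 data can be verified safely without risk of the table being damaged by the conversion. Typically, verify the data this way first, then convert the whole table with method 2.
+
+ Steps: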
+
+ ```
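+ ## Create a rollup in V2 format
+
+ ALTER TABLE table_name ADD ROLLUP table_name (columns) PROPERTIES ("storage_format" = "v2");
+ ```
- Use the command below to list rollups that are in progress.
+ The rollup name must be the table name. The columns field can be anything; the system does not validate it. The statement automatically creates a rollup named `__v2_table_name` that contains all columns of the table.
+
+ Check the creation progress with: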
+
+ ```
+ SHOW ALTER TABLE ROLLUP;
+ ```
+
+ When creation finishes, `DESC table_name ALL;` shows a rollup named `__v2_table_name`.
+
+ Then switch queries to the V2 format with:
- `show alter table rollup;`
+ ```
+ set use_v2_rollup = true;
+ select * from table_name limit 10;
+ ```
+
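+ The `use_v2_rollup` variable forces queries to use the rollup named `__v2_table_name` and ignores the hit conditions of other rollups, so use it only for verifying V2 data.
- A rollup named __v2_table_name is shown as being created.
+2. Convert the data format of an existing table
-3. Query the base table and the segment v2 rollup and check that the results match (queries can be taken from the audit log)
+ This is equivalent to submitting a schema change job for the table. When the job finishes, all data of the table is in V2 format. The original V1 data is not kept, so verify the format with method 1 first.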
+
+ ```
+ ALTER TABLE table_name SET ("storage_format" = "v2");
+ ```
- To make queries use the segment v2 rollup, a session variable was added. Put the following statement before the query to read the segment v2 index:
+ Then check the job progress with:
+
+ ```
+ SHOW ALTER TABLE COLUMN;
+ ```
+
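+ When the job finishes, all data of the table (including rollups) has been converted to V2 and the V1 data has been deleted. If the table is partitioned, partitions created afterwards are also in V2 format.
+
+ **A V2 table cannot be converted back to V1.**
+
+### Creating a new table in V2 format
- set use_v2_rollup = true;
+Without changing the default configuration, users can create a table in V2 format:
- For example, if the current db has a table named simple, a direct query looks like this: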
-
- ```
- mysql> select * from simple;
- +------+-------+
- | key | value |
- +------+-------+
- | 2 | 6 |
- | 1 | 6 |
- | 4 | 5 |
- +------+-------+
- 3 rows in set (0.01 sec)
- ```
-
- Then query the __v2_simple rollup with the following query:
-
- ```
- mysql> set use_v2_rollup = true;
- Query OK, 0 rows affected (0.04 sec)
-
- mysql> select * from simple;
- +------+-------+
- | key | value |
- +------+-------+
- | 4 | 5 |
- | 1 | 6 |
- | 2 | 6 |
- +------+-------+
- 3 rows in set (0.01 sec)
- ```
-
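- The two queries are expected to return the same result.
- If the results differ, investigate. First determine whether a new load between the two queries caused the difference; if so, retry. If not, dig further for the cause.
-
-4. Compare the results of the same queries on the base table and on the segment v2 rollup
-
-- Query latency
-
- Segment v2 optimizes the random reads of v1 and adds optimizations such as bitmap indexes and lazy materialization, so query latency is expected to drop.
-
-- Load latency
-- Page cache hit rate
-- Compression ratio of string columns
-- Performance gain and space usage of bitmap indexes
-
-### Enabling segment v2 globally
-
-There are two ways:
-1. FE has a global variable, default_rowset_type; setting it makes BE use segment v2 as the default storage format:
-
- set global default_rowset_type = beta;
-
- With this method, run the command above and wait about 10s for FE to sync the setting to the BEs; no per-BE configuration is needed. This is the recommended method. However, there is currently no simple way to check whether the default rowset type on a BE has changed. One option is to create a table and inspect its metadata, which is cumbersome; a status view may be added later if needed.
-
-2. Set default_rowset_type to BETA in the BE configuration file. This requires adding the setting on every BE.
-
-
-### Converting segment v1 format to v2
-
-If you do not want to wait for the background conversion (for example, to use bitmap indexes or dictionary compression), you can manually trigger the conversion of a table. This uses a schema change to convert v1 to v2:
-
- alter table table_name set ("storage_format" = "v2");
-
- to convert the storage format to v2.
-
-To create a new v2 table while the default storage format is not v2, follow these steps:
-1. Create a v1 table with create table
-2. Convert the format with the schema change command above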
+```
+CREATE TABLE tbl_name
+(
+ k1 INT,
+ k2 INT
+)
+DISTRIBUTED BY HASH(k1) BUCKETS 1
+PROPERTIES
+(
+ "storage_format" = "v2"
+);
+```
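+Specifying `"storage_format" = "v2"` in `properties` creates the table in V2 format. If the table is partitioned, partitions created afterwards are also in V2 format.
+### Full format conversion (experimental)
+The following enables conversion of all data in the cluster (V1 -> V2). The conversion runs asynchronously as part of the BE background compaction process.
+
+1. Enable full conversion from BE
+
+ Add `default_rowset_type=BETA` to `be.conf` and restart the BE node. Subsequent compactions convert data from V1 to V2 automatically.
+
+2. Enable full conversion from FE
+
+ Connect to Doris with a mysql client and run:
+
+ `SET GLOBAL default_rowset_type = beta;`
+
+ FE then sends the setting to the BEs via heartbeat, and subsequent BE compactions convert data from V1 to V2 automatically.
+
+ The FE setting takes precedence over the BE configuration. Even if `default_rowset_type` is ALPHA on a BE, once FE is set to beta the BE still starts converting data from V1 to V2.
+
+ **Verify the conversion on individual tables before running a full conversion. A full conversion takes a long time and its progress depends on compaction progress.** Compaction may fail to complete for some tables; in that case, convert those tables explicitly with `ALTER TABLE`.
+
+3. Check full conversion progress
+
+ Progress of the full conversion is checked with a script located in the `tools/show_segment_status/` directory of the repository. See the `README` there for usage.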
diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 2fc03a4..2783666 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -871,6 +871,7 @@ public class SchemaChangeHandler extends AlterHandler {
// property 3: timeout
long timeoutSecond = PropertyAnalyzer.analyzeTimeout(propertyMap, Config.alter_table_timeout_second);
+
TStorageFormat storageFormat = PropertyAnalyzer.analyzeStorageFormat(propertyMap);
// create job
@@ -944,7 +945,9 @@ public class SchemaChangeHandler extends AlterHandler {
} else if (hasIndexChange) {
needAlter = true;
} else if (storageFormat == TStorageFormat.V2) {
- needAlter = true;
+ if (olapTable.getTableProperty().getStorageFormat() != TStorageFormat.V2) {
+ needAlter = true;
+ }
}
if (!needAlter) {
diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 9c8c688..d1ca93b 100644
--- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -110,7 +110,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
// The schema change job will wait all transactions before this txn id
finished, then send the schema change tasks.
protected long watershedTxnId = -1;
- private TStorageFormat storageFormat = null;
+ private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
// save all schema change tasks
private AgentBatchTask schemaChangeBatchTask = new AgentBatchTask();
@@ -571,6 +571,11 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
tbl.setIndexes(indexes);
}
+ // set storage format of table, only set if format is v2
+ if (storageFormat == TStorageFormat.V2) {
+ tbl.setStorageFormat(storageFormat);
+ }
+
tbl.setState(OlapTableState.NORMAL);
}
@@ -890,6 +895,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
out.writeBoolean(false);
}
}
+
+ Text.writeString(out, storageFormat.name());
}
/**
@@ -968,6 +975,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
}
}
}
+
+ if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_84) {
+ storageFormat = TStorageFormat.valueOf(Text.readString(in));
+ }
}
/**
@@ -1013,6 +1024,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
out.writeBoolean(false);
}
}
+
+ Text.writeString(out, storageFormat.name());
}
/**
@@ -1060,6 +1073,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
this.indexes = null;
}
}
+
+ if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_84) {
+ storageFormat = TStorageFormat.valueOf(Text.readString(in));
+ }
}
@Override
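
The version-gated read added to `SchemaChangeJobV2` above can be illustrated with a minimal, self-contained sketch. It is not the Doris code: the class `VersionGatedReadSketch`, its methods, and the use of `writeUTF`/`readUTF` in place of `Text.writeString`/`Text.readString` are assumptions made for illustration. The point it shows is that an image written before the field existed must be read without consuming the extra field, which then falls back to the default.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a persisted job; not part of Doris.
final class VersionGatedReadSketch {
    // Mirrors FeMetaVersion.VERSION_84 from the diff.
    static final int VERSION_WITH_STORAGE_FORMAT = 84;

    // Serializes a job id and, for new images only, the storage format name.
    static byte[] write(long jobId, String storageFormat, boolean includeStorageFormat) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(jobId);
            if (includeStorageFormat) {
                out.writeUTF(storageFormat);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the storage format only when the image is new enough to contain it.
    static String readStorageFormat(byte[] image, int journalVersion) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(image))) {
            in.readLong(); // skip the job id
            String storageFormat = "DEFAULT"; // same default as the field initializer
            if (journalVersion >= VERSION_WITH_STORAGE_FORMAT) {
                storageFormat = in.readUTF();
            }
            return storageFormat;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readStorageFormat(write(1L, "V2", true), 84));  // new image
        System.out.println(readStorageFormat(write(1L, "V2", false), 83)); // old image
    }
}
```

Initializing the field to a non-null default, as the diff does with `TStorageFormat.DEFAULT`, is what keeps the unconditional write safe for jobs that never set a format.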
diff --git a/fe/src/main/java/org/apache/doris/analysis/InstallPluginStmt.java
b/fe/src/main/java/org/apache/doris/analysis/InstallPluginStmt.java
index 801eb21..73b1331 100644
--- a/fe/src/main/java/org/apache/doris/analysis/InstallPluginStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/InstallPluginStmt.java
@@ -43,7 +43,8 @@ public class InstallPluginStmt extends DdlStmt {
super.analyze(analyzer);
if (!Config.plugin_enable) {
- ErrorReport.reportAnalysisException(ErrorCode.ERR_INVALID_OPERATION, "INSTALL PLUGIN");
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_OPERATION_DISABLED, "INSTALL PLUGIN",
+ "Please enable it by setting 'plugin_enable' = 'true'");
}
// check operation privilege
diff --git
a/fe/src/main/java/org/apache/doris/analysis/UninstallPluginStmt.java
b/fe/src/main/java/org/apache/doris/analysis/UninstallPluginStmt.java
index 08d6fe3..6ba4301 100644
--- a/fe/src/main/java/org/apache/doris/analysis/UninstallPluginStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/UninstallPluginStmt.java
@@ -43,7 +43,8 @@ public class UninstallPluginStmt extends DdlStmt {
super.analyze(analyzer);
if (!Config.plugin_enable) {
- ErrorReport.reportAnalysisException(ErrorCode.ERR_INVALID_OPERATION, "UNINSTALL PLUGIN");
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_OPERATION_DISABLED, "UNINSTALL PLUGIN",
+ "Please enable it by setting 'plugin_enable' = 'true'");
}
// check operation privilege
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index 858399c..bc6e827 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -199,6 +199,7 @@ import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.task.PullLoadJobMgr;
+import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TTaskType;
@@ -3070,7 +3071,8 @@ public class Catalog {
singlePartitionDesc.getVersionInfo(),
bfColumns, olapTable.getBfFpp(),
tabletIdSet, olapTable.getCopiedIndexes(),
- singlePartitionDesc.isInMemory());
+ singlePartitionDesc.isInMemory(),
+ olapTable.getTableProperty().getStorageFormat());
// check again
db.writeLock();
@@ -3381,7 +3383,8 @@ public class Catalog {
double bfFpp,
Set<Long> tabletIdSet,
List<Index> indexes,
- boolean isInMemory) throws
DdlException {
+ boolean isInMemory,
+ TStorageFormat storageFormat)
throws DdlException {
// create base index first.
Preconditions.checkArgument(baseIndexId != -1);
MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId,
IndexState.NORMAL);
@@ -3446,6 +3449,7 @@ public class Catalog {
countDownLatch,
indexes,
isInMemory);
+ task.setStorageFormat(storageFormat);
batchTask.addTask(task);
// add to AgentTaskQueue for handling finish report.
// not for resending task
@@ -3684,6 +3688,15 @@ public class Catalog {
}
Preconditions.checkNotNull(versionInfo);
+ // get storage format
+ TStorageFormat storageFormat = TStorageFormat.DEFAULT; // default means it's up to BE's config
+ try {
+ storageFormat = PropertyAnalyzer.analyzeStorageFormat(properties);
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+ olapTable.setStorageFormat(storageFormat);
+
// a set to record every new tablet created when create table
// if failed in any step, use this set to do clear things
Set<Long> tabletIdSet = new HashSet<Long>();
@@ -3706,7 +3719,7 @@ public class Catalog {
partitionInfo.getReplicationNum(partitionId),
versionInfo, bfColumns, bfFpp,
tabletIdSet, olapTable.getCopiedIndexes(),
- isInMemory);
+ isInMemory, storageFormat);
olapTable.addPartition(partition);
} else if (partitionInfo.getType() == PartitionType.RANGE) {
try {
@@ -3735,7 +3748,7 @@ public class Catalog {
partitionInfo.getReplicationNum(entry.getValue()),
versionInfo, bfColumns, bfFpp,
tabletIdSet, olapTable.getCopiedIndexes(),
- isInMemory);
+ isInMemory, storageFormat);
olapTable.addPartition(partition);
}
} else {
@@ -3914,22 +3927,21 @@ public class Catalog {
// properties
sb.append("\nPROPERTIES (\n");
- // 1. storage type
- sb.append("\"").append(PropertyAnalyzer.PROPERTIES_STORAGE_TYPE).append("\" = \"");
- TStorageType storageType = olapTable.getStorageTypeByIndexId(
- olapTable.getIndexIdByName(olapTable.getName()));
- sb.append(storageType.name()).append("\"");
+ // replicationNum
+ Short replicationNum = olapTable.getDefaultReplicationNum();
+ sb.append("\"").append(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM).append("\" = \"");
+ sb.append(replicationNum).append("\"");
- // 2. bloom filter
+ // bloom filter
Set<String> bfColumnNames = olapTable.getCopiedBfColumns();
if (bfColumnNames != null) {
- sb.append(",\n \"").append(PropertyAnalyzer.PROPERTIES_BF_COLUMNS).append("\" = \"");
+ sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_BF_COLUMNS).append("\" = \"");
sb.append(Joiner.on(", ").join(olapTable.getCopiedBfColumns())).append("\"");
}
if (separatePartition) {
- // 3. version info
- sb.append(",\n \"").append(PropertyAnalyzer.PROPERTIES_VERSION_INFO).append("\" = \"");
+ // version info
+ sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_VERSION_INFO).append("\" = \"");
Partition partition = null;
if (olapTable.getPartitionInfo().getType() ==
PartitionType.UNPARTITIONED) {
partition = olapTable.getPartition(olapTable.getName());
@@ -3941,27 +3953,26 @@ public class Catalog {
.append("\"");
}
- // 5. colocateTable
+ // colocateTable
String colocateTable = olapTable.getColocateGroup();
if (colocateTable != null) {
- sb.append(",\n \"").append(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH).append("\" = \"");
+ sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH).append("\" = \"");
sb.append(colocateTable).append("\"");
}
- // 6. dynamic partition
+ // dynamic partition
if (olapTable.dynamicPartitionExists()) {
sb.append(olapTable.getTableProperty().getDynamicPartitionProperty().toString());
}
- // 7. replicationNum
- Short replicationNum = olapTable.getDefaultReplicationNum();
- sb.append(",\n \"").append(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM).append("\" = \"");
- sb.append(replicationNum).append("\"");
-
- // 8. in memory
- sb.append(",\n \"").append(PropertyAnalyzer.PROPERTIES_INMEMORY).append("\" = \"");
+ // in memory
+ sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_INMEMORY).append("\" = \"");
sb.append(olapTable.isInMemory()).append("\"");
+ // storage type
+ sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT).append("\" = \"");
+ sb.append(olapTable.getTableProperty().getStorageFormat()).append("\"");
+
sb.append("\n)");
} else if (table.getType() == TableType.MYSQL) {
MysqlTable mysqlTable = (MysqlTable) table;
@@ -6158,7 +6169,8 @@ public class Catalog {
copiedTbl.getBfFpp(),
tabletIdSet,
copiedTbl.getCopiedIndexes(),
- copiedTbl.isInMemory());
+ copiedTbl.isInMemory(),
+ copiedTbl.getTableProperty().getStorageFormat());
newPartitions.add(newPartition);
}
} catch (DdlException e) {
diff --git a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
index 725c640..719e29e 100644
--- a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -17,6 +17,7 @@
package org.apache.doris.catalog;
+import org.apache.doris.alter.MaterializedViewHandler;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.backup.Status;
import org.apache.doris.backup.Status.ErrCode;
@@ -39,6 +40,7 @@ import org.apache.doris.common.util.RangeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TOlapTable;
+import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TTableDescriptor;
@@ -309,6 +311,11 @@ public class OlapTable extends Table {
return indexNameToId.get(indexName);
}
+ public Long getSegmentV2FormatIndexId() {
+ String v2RollupIndexName =
MaterializedViewHandler.NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + getName();
+ return indexNameToId.get(v2RollupIndexName);
+ }
+
public String getIndexNameById(long indexId) {
for (Map.Entry<String, Long> entry : indexNameToId.entrySet()) {
if (entry.getValue() == indexId) {
@@ -1455,4 +1462,12 @@ public class OlapTable extends Table {
public boolean existTempPartitions() {
return !tempPartitions.isEmpty();
}
+
+ public void setStorageFormat(TStorageFormat storageFormat) {
+ if (tableProperty == null) {
+ tableProperty = new TableProperty(new HashMap<>());
+ }
+ tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT, storageFormat.name());
+ tableProperty.buildStorageFormat();
+ }
}
diff --git a/fe/src/main/java/org/apache/doris/catalog/TableProperty.java
b/fe/src/main/java/org/apache/doris/catalog/TableProperty.java
index 64d5564..51614cc 100644
--- a/fe/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.persist.OperationType;
import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.thrift.TStorageFormat;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
@@ -50,6 +51,16 @@ public class TableProperty implements Writable {
private boolean isInMemory = false;
+ /*
+ * the default storage format of this table.
+ * DEFAULT: depends on BE's config 'default_rowset_type'
+ * V1: alpha rowset
+ * V2: beta rowset
+ *
+ * This property should be set when creating the table, and can only be changed to V2 using Alter Table stmt.
+ */
+ private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
+
public TableProperty(Map<String, String> properties) {
this.properties = properties;
}
@@ -101,6 +112,12 @@ public class TableProperty implements Writable {
return this;
}
+ public TableProperty buildStorageFormat() {
+ storageFormat = TStorageFormat.valueOf(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT,
+ TStorageFormat.DEFAULT.name()));
+ return this;
+ }
+
public void modifyTableProperties(Map<String, String> modifyProperties) {
properties.putAll(modifyProperties);
}
@@ -125,6 +142,10 @@ public class TableProperty implements Writable {
return isInMemory;
}
+ public TStorageFormat getStorageFormat() {
+ return storageFormat;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
@@ -134,6 +155,7 @@ public class TableProperty implements Writable {
return GsonUtils.GSON.fromJson(Text.readString(in),
TableProperty.class)
.buildDynamicProperty()
.buildReplicationNum()
- .buildInMemory();
+ .buildInMemory()
+ .buildStorageFormat();
}
}
diff --git a/fe/src/main/java/org/apache/doris/common/ErrorCode.java
b/fe/src/main/java/org/apache/doris/common/ErrorCode.java
index 9a6ecc8..7b416a3 100644
--- a/fe/src/main/java/org/apache/doris/common/ErrorCode.java
+++ b/fe/src/main/java/org/apache/doris/common/ErrorCode.java
@@ -230,7 +230,9 @@ public enum ErrorCode {
ERROR_DYNAMIC_PARTITION_ENABLE(5068, new byte[] {'4', '2', '0', '0', '0'},
"Invalid dynamic partition enable: %s. Expected true or false"),
ERROR_DYNAMIC_PARTITION_PREFIX(5069, new byte[] {'4', '2', '0', '0', '0'},
- "Invalid dynamic partition prefix: %s.");
+ "Invalid dynamic partition prefix: %s."),
+ ERR_OPERATION_DISABLED(5070, new byte[] {'4', '2', '0', '0', '0'},
+ "Operation %s is disabled. %s");
ErrorCode(int code, byte[] sqlState, String errorMsg) {
this.code = code;
diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
index fd3bd1d..bebaccf 100644
--- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -177,6 +177,8 @@ public final class FeMetaVersion {
public static final int VERSION_82 = 82;
// modify TransactionState Field
public static final int VERSION_83 = 83;
+ // add storage format in schema change job
+ public static final int VERSION_84 = 84;
// note: when increment meta version, should assign the latest version to
VERSION_CURRENT
- public static final int VERSION_CURRENT = VERSION_83;
+ public static final int VERSION_CURRENT = VERSION_84;
}
diff --git
a/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
b/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 118288c..49129bb 100644
--- a/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ b/fe/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -360,18 +360,23 @@ public class PropertyAnalyzer {
// analyzeStorageFormat will parse the storage format from properties
// sql: alter table tablet_name set ("storage_format" = "v2")
// Use this sql to convert all tablets(base and rollup index) to a new
format segment
- public static TStorageFormat analyzeStorageFormat(Map<String, String>
properties) {
- String storage_format = "";
+ public static TStorageFormat analyzeStorageFormat(Map<String, String>
properties) throws AnalysisException {
+ String storageFormat = "";
if (properties != null &&
properties.containsKey(PROPERTIES_STORAGE_FORMAT)) {
- storage_format = properties.get(PROPERTIES_STORAGE_FORMAT);
+ storageFormat = properties.get(PROPERTIES_STORAGE_FORMAT);
properties.remove(PROPERTIES_STORAGE_FORMAT);
+ } else {
+ return TStorageFormat.DEFAULT;
}
- if (storage_format.equalsIgnoreCase("v1")) {
+
+ if (storageFormat.equalsIgnoreCase("v1")) {
return TStorageFormat.V1;
- } else if(storage_format.equalsIgnoreCase("v2")) {
+ } else if (storageFormat.equalsIgnoreCase("v2")) {
return TStorageFormat.V2;
- } else {
+ } else if (storageFormat.equalsIgnoreCase("default")) {
return TStorageFormat.DEFAULT;
+ } else {
+ throw new AnalysisException("unknown storage format: " +
storageFormat);
}
}
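
The parsing rules that `analyzeStorageFormat` follows after this change can be summarized in a small standalone sketch. The enum `StorageFormat`, the class `StorageFormatSketch`, and the use of `IllegalArgumentException` in place of `AnalysisException` are stand-ins assumed here so the example compiles on its own; only the branching mirrors the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for PropertyAnalyzer; not part of Doris.
final class StorageFormatSketch {
    enum StorageFormat { DEFAULT, V1, V2 }

    static final String PROPERTIES_STORAGE_FORMAT = "storage_format";

    // Absent key -> DEFAULT; "v1"/"v2"/"default" (any case) -> matching value;
    // anything else is rejected instead of silently becoming DEFAULT.
    static StorageFormat analyzeStorageFormat(Map<String, String> properties) {
        if (properties == null || !properties.containsKey(PROPERTIES_STORAGE_FORMAT)) {
            return StorageFormat.DEFAULT;
        }
        // The key is consumed so later property checks do not see it again.
        String value = properties.remove(PROPERTIES_STORAGE_FORMAT);
        if ("v1".equalsIgnoreCase(value)) {
            return StorageFormat.V1;
        } else if ("v2".equalsIgnoreCase(value)) {
            return StorageFormat.V2;
        } else if ("default".equalsIgnoreCase(value)) {
            return StorageFormat.DEFAULT;
        }
        throw new IllegalArgumentException("unknown storage format: " + value);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(PROPERTIES_STORAGE_FORMAT, "V2");
        System.out.println(analyzeStorageFormat(props)); // V2
        System.out.println(props.isEmpty());             // true
    }
}
```

Rejecting unknown values matters once `CREATE TABLE` accepts the property: a typo such as `"v3"` fails the statement rather than quietly creating a table in the default format.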
diff --git
a/fe/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
b/fe/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
index dcfe276..11ba01b 100644
--- a/fe/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
+++ b/fe/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java
@@ -17,7 +17,6 @@
package org.apache.doris.planner;
-import org.apache.doris.alter.MaterializedViewHandler;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.Expr;
@@ -109,9 +108,10 @@ public class MaterializedViewSelector {
long start = System.currentTimeMillis();
Preconditions.checkState(scanNode instanceof OlapScanNode);
OlapScanNode olapScanNode = (OlapScanNode) scanNode;
+
Map<Long, List<Column>> candidateIndexIdToSchema =
predicates(olapScanNode);
long bestIndexId = priorities(olapScanNode, candidateIndexIdToSchema);
- LOG.info("The best materialized view is {} for scan node {} in query
{}, cost {}",
+ LOG.debug("The best materialized view is {} for scan node {} in query
{}, cost {}",
bestIndexId, scanNode.getId(), selectStmt.toSql(),
(System.currentTimeMillis() - start));
return new BestIndexInfo(bestIndexId, isPreAggregation,
reasonOfDisable);
}
@@ -159,6 +159,23 @@ public class MaterializedViewSelector {
}
private long priorities(OlapScanNode scanNode, Map<Long, List<Column>>
candidateIndexIdToSchema) {
+ OlapTable tbl = scanNode.getOlapTable();
+ Long v2RollupIndexId = tbl.getSegmentV2FormatIndexId();
+ if (v2RollupIndexId != null) {
+ ConnectContext connectContext = ConnectContext.get();
+ if (connectContext != null &&
connectContext.getSessionVariable().isUseV2Rollup()) {
+ // if user set `use_v2_rollup` variable to true, and there is
a segment v2 rollup,
+ // just return the segment v2 rollup, because user want to
check the v2 format data.
+ if (candidateIndexIdToSchema.containsKey(v2RollupIndexId)) {
+ return v2RollupIndexId;
+ }
+ } else {
+ // `use_v2_rollup` is not set, so v2 format rollup should not
be selected, remove it from
+ // candidateIndexIdToSchema
+ candidateIndexIdToSchema.remove(v2RollupIndexId);
+ }
+ }
+
// Step1: the candidate indexes that satisfies the most prefix index
final Set<String> equivalenceColumns = Sets.newHashSet();
final Set<String> unequivalenceColumns = Sets.newHashSet();
@@ -231,27 +248,6 @@ public class MaterializedViewSelector {
}
}
}
- String tableName = olapTable.getName();
- String v2RollupIndexName =
MaterializedViewHandler.NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + tableName;
- Long v2RollupIndex = olapTable.getIndexIdByName(v2RollupIndexName);
- long baseIndexId = olapTable.getBaseIndexId();
- ConnectContext connectContext = ConnectContext.get();
- boolean useV2Rollup = false;
- if (connectContext != null) {
- useV2Rollup = connectContext.getSessionVariable().getUseV2Rollup();
- }
- if (baseIndexId == selectedIndexId && v2RollupIndex != null &&
useV2Rollup) {
- // if the selectedIndexId is baseIndexId
- // check whether there is a V2 rollup index and useV2Rollup flag
is true,
- // if both true, use v2 rollup index
- selectedIndexId = v2RollupIndex;
- }
- if (!useV2Rollup && v2RollupIndex != null && v2RollupIndex ==
selectedIndexId) {
- // if the selectedIndexId is v2RollupIndex
- // but useV2Rollup is false, use baseIndexId as selectedIndexId
- // just make sure to use baseIndex instead of v2RollupIndex if the
useV2Rollup is false
- selectedIndexId = baseIndexId;
- }
return selectedIndexId;
}
@@ -393,8 +389,7 @@ public class MaterializedViewSelector {
}
private void compensateCandidateIndex(Map<Long, MaterializedIndexMeta>
candidateIndexIdToMeta, Map<Long,
- MaterializedIndexMeta> allVisibleIndexes,
- OlapTable table) {
+ MaterializedIndexMeta> allVisibleIndexes, OlapTable table) {
isPreAggregation = false;
reasonOfDisable = "The aggregate operator does not match";
int keySizeOfBaseIndex =
table.getKeyColumnsByIndexId(table.getBaseIndexId()).size();
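
The pre-selection step added at the top of `priorities` in `MaterializedViewSelector` can be sketched in isolation as follows. The class `V2RollupPreselectSketch`, the method `preselect`, and the `Map<Long, String>` candidate type are assumptions for illustration; in the diff the flag comes from the session variable `use_v2_rollup` and the candidates map index ids to column lists.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the selector's first step; not part of Doris.
final class V2RollupPreselectSketch {
    // Returns the v2 rollup id when it must be used, or null when normal
    // selection should continue on the (possibly pruned) candidate map.
    static Long preselect(Map<Long, String> candidates, Long v2RollupIndexId, boolean useV2Rollup) {
        if (v2RollupIndexId == null) {
            return null; // the table has no v2 rollup
        }
        if (useV2Rollup) {
            // The user wants to verify v2 data: take the v2 rollup if it is a candidate.
            return candidates.containsKey(v2RollupIndexId) ? v2RollupIndexId : null;
        }
        // Flag not set: the v2 rollup must never be chosen, so drop it.
        candidates.remove(v2RollupIndexId);
        return null;
    }

    public static void main(String[] args) {
        Map<Long, String> candidates = new HashMap<>();
        candidates.put(1L, "base");
        candidates.put(2L, "__v2_tbl");
        System.out.println(preselect(new HashMap<>(candidates), 2L, true)); // 2
        System.out.println(preselect(candidates, 2L, false));               // null
        System.out.println(candidates.keySet());                            // [1]
    }
}
```

Doing this before the prefix-index and row-count steps replaces the removed post-selection swap, which only redirected to the v2 rollup when the base index happened to win.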
diff --git a/fe/src/main/java/org/apache/doris/planner/RollupSelector.java
b/fe/src/main/java/org/apache/doris/planner/RollupSelector.java
index 1a5fee7..88ed703 100644
--- a/fe/src/main/java/org/apache/doris/planner/RollupSelector.java
+++ b/fe/src/main/java/org/apache/doris/planner/RollupSelector.java
@@ -64,9 +64,20 @@ public final class RollupSelector {
Collection<Long> partitionIds, List<Expr> conjuncts, boolean
isPreAggregation)
throws UserException {
Preconditions.checkArgument(partitionIds != null , "Paritition can't
be null.");
+
+ ConnectContext connectContext = ConnectContext.get();
+ if (connectContext != null &&
connectContext.getSessionVariable().isUseV2Rollup()) {
+ // if user set `use_v2_rollup` variable to true, and there is a
segment v2 rollup,
+ // just return the segment v2 rollup, because user want to check
the v2 format data.
+ String v2RollupIndexName =
MaterializedViewHandler.NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + table.getName();
+ Long v2RollupIndexId = table.getIndexIdByName(v2RollupIndexName);
+ if (v2RollupIndexId != null) {
+ return v2RollupIndexId;
+ }
+ }
+
// Get first partition to select best prefix index rollups, because
MaterializedIndex ids in one rollup's partitions are all same.
- final List<Long> bestPrefixIndexRollups =
- selectBestPrefixIndexRollup(conjuncts, isPreAggregation);
+ final List<Long> bestPrefixIndexRollups =
selectBestPrefixIndexRollup(conjuncts, isPreAggregation);
return selectBestRowCountRollup(bestPrefixIndexRollups, partitionIds);
}
@@ -93,20 +104,9 @@ public final class RollupSelector {
}
String tableName = table.getName();
String v2RollupIndexName =
MaterializedViewHandler.NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + tableName;
- Long v2RollupIndex = table.getIndexIdByName(v2RollupIndexName);
+ Long v2RollupIndexId = table.getIndexIdByName(v2RollupIndexName);
long baseIndexId = table.getBaseIndexId();
- ConnectContext connectContext = ConnectContext.get();
- boolean useV2Rollup = false;
- if (connectContext != null) {
- useV2Rollup = connectContext.getSessionVariable().getUseV2Rollup();
- }
- if (baseIndexId == selectedIndexId && v2RollupIndex != null && useV2Rollup) {
- // if the selectedIndexId is baseIndexId
- // check whether there is a V2 rollup index and useV2Rollup flag is true,
- // if both true, use v2 rollup index
- selectedIndexId = v2RollupIndex;
- }
- if (!useV2Rollup && v2RollupIndex != null && v2RollupIndex == selectedIndexId) {
+ if (v2RollupIndexId != null && v2RollupIndexId == selectedIndexId) {
// if the selectedIndexId is v2RollupIndex
// but useV2Rollup is false, use baseIndexId as selectedIndexId
// just make sure to use baseIndex instead of v2RollupIndex if the useV2Rollup is false
diff --git a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 0d2c14d..f4dcf68 100644
--- a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -17,12 +17,6 @@
package org.apache.doris.planner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
import org.apache.doris.analysis.AggregateInfo;
import org.apache.doris.analysis.AnalyticInfo;
import org.apache.doris.analysis.Analyzer;
@@ -63,6 +57,13 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Reference;
import org.apache.doris.common.UserException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -772,8 +773,8 @@ public class SingleNodePlanner {
if (olapScanNode.getSelectedPartitionIds().size() == 0 &&
!FeConstants.runningUnitTest) {
continue;
}
- MaterializedViewSelector.BestIndexInfo bestIndexInfo = materializedViewSelector.selectBestMV
- (olapScanNode);
+ MaterializedViewSelector.BestIndexInfo bestIndexInfo = materializedViewSelector.selectBestMV(
+ olapScanNode);
olapScanNode.updateScanRangeInfoByNewMVSelector(bestIndexInfo.getBestIndexId(),
bestIndexInfo.isPreAggregation(),
bestIndexInfo.getReasonOfDisable());
}
diff --git a/fe/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/src/main/java/org/apache/doris/plugin/AuditEvent.java
index 302134e..ebaf3da 100644
--- a/fe/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -71,7 +71,7 @@ public class AuditEvent {
public String queryId = "";
@AuditField(value = "IsQuery")
public boolean isQuery = false;
- @AuditField(value = "fe_ip")
+ @AuditField(value = "feIp")
public String feIp = "";
@AuditField(value = "Stmt")
public String stmt = "";
diff --git a/fe/src/main/java/org/apache/doris/qe/QueryState.java b/fe/src/main/java/org/apache/doris/qe/QueryState.java
index ecaeb3f..bc6e424 100644
--- a/fe/src/main/java/org/apache/doris/qe/QueryState.java
+++ b/fe/src/main/java/org/apache/doris/qe/QueryState.java
@@ -100,7 +100,7 @@ public class QueryState {
return errType;
}
- public void setQuery(boolean isQuery) {
+ public void setIsQuery(boolean isQuery) {
this.isQuery = isQuery;
}
diff --git a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java
index 63b56a0..1a0e454 100644
--- a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -220,7 +220,7 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = LOAD_MEM_LIMIT)
private long loadMemLimit = 0L;
- // the default rowset type flag which will be passed to Backends througth heartbeat
+ // the default rowset type flag which will be passed to Backends through heartbeat
@VariableMgr.VarAttr(name = DEFAULT_ROWSET_TYPE)
private String defaultRowsetType = "alpha";
@@ -408,7 +408,12 @@ public class SessionVariable implements Serializable, Writable {
return forwardToMaster;
}
- public boolean getUseV2Rollup() { return useV2Rollup; }
+ public boolean isUseV2Rollup() { return useV2Rollup; }
+
+ // for unit test
+ public void setUseV2Rollup(boolean useV2Rollup) {
+ this.useV2Rollup = useV2Rollup;
+ }
public boolean getTestMaterializedView() {
return this.testMaterializedView;
@@ -446,8 +451,8 @@ public class SessionVariable implements Serializable, Writable {
return divPrecisionIncrement;
}
- public void setDivPrecisionIncrement(int divPrecisionIncrement) {
- this.divPrecisionIncrement = divPrecisionIncrement;
+ public String getDefaultRowsetType() {
+ return defaultRowsetType;
}
// Serialize to thrift object
diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 109b737..8852b16 100644
--- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -234,6 +234,7 @@ public class StmtExecutor {
}
if (parsedStmt instanceof QueryStmt) {
+ context.getState().setIsQuery(true);
int retryTime = Config.max_query_retry_time;
for (int i = 0; i < retryTime; i ++) {
try {
diff --git a/fe/src/main/java/org/apache/doris/system/HeartbeatFlags.java b/fe/src/main/java/org/apache/doris/system/HeartbeatFlags.java
index 6a80e8a..fc3930e 100644
--- a/fe/src/main/java/org/apache/doris/system/HeartbeatFlags.java
+++ b/fe/src/main/java/org/apache/doris/system/HeartbeatFlags.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.thrift.HeartbeatServiceConstants;
+
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -37,7 +38,7 @@ public class HeartbeatFlags {
return "alpha".equalsIgnoreCase(rowsetType) ||
"beta".equalsIgnoreCase(rowsetType);
}
- public long getHeartbeatFlags () {
+ public long getHeartbeatFlags() {
long heartbeatFlags = 0;
try {
String defaultRowsetType = VariableMgr.getValue(null, new
SysVariableDesc(SessionVariable.DEFAULT_ROWSET_TYPE, SetType.GLOBAL));
diff --git a/fe/src/test/java/org/apache/doris/alter/AlterJobV2Test.java b/fe/src/test/java/org/apache/doris/alter/AlterJobV2Test.java
index c2cea4c..543fe05 100644
--- a/fe/src/test/java/org/apache/doris/alter/AlterJobV2Test.java
+++ b/fe/src/test/java/org/apache/doris/alter/AlterJobV2Test.java
@@ -23,25 +23,20 @@ import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.ShowAlterStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowExecutor;
import org.apache.doris.qe.ShowResultSet;
-import org.apache.doris.utframe.MockedFrontend.EnvVarNotSetException;
-import org.apache.doris.utframe.MockedFrontend.FeStartException;
-import org.apache.doris.utframe.MockedFrontend.NotInitException;
-
+import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.utframe.UtFrameUtils;
+
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.File;
-import java.io.IOException;
import java.util.Map;
import java.util.UUID;
@@ -54,6 +49,9 @@ public class AlterJobV2Test {
@BeforeClass
public static void beforeClass() throws Exception {
+ FeConstants.default_scheduler_interval_millisecond = 1000;
+ FeConstants.runningUnitTest = true;
+
UtFrameUtils.createMinDorisCluster(runningDir);
// create connect context
@@ -64,6 +62,8 @@ public class AlterJobV2Test {
Catalog.getCurrentCatalog().createDb(createDbStmt);
createTable("CREATE TABLE test.schema_change_test(k1 int, k2 int, k3 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');");
+
+ createTable("CREATE TABLE test.segmentv2(k1 int, k2 int, v1 int sum) distributed by hash(k1) buckets 3 properties('replication_num' = '1');");
}
@AfterClass
@@ -110,7 +110,6 @@ public class AlterJobV2Test {
Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
// 2. check alter job
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
- Assert.assertEquals(1, alterJobs.size());
for (AlterJobV2 alterJobV2 : alterJobs.values()) {
while (!alterJobV2.getJobState().isFinalState()) {
System.out.println("alter job " + alterJobV2.getDbId() + " is running. state: " + alterJobV2.getJobState());
@@ -127,4 +126,78 @@ public class AlterJobV2Test {
System.out.println(showResultSet.getMetaData());
System.out.println(showResultSet.getResultRows());
}
+
+ @Test
+ public void testAlterSegmentV2() throws Exception {
+ Database db = Catalog.getCurrentCatalog().getDb("default_cluster:test");
+ Assert.assertNotNull(db);
+ OlapTable tbl = (OlapTable) db.getTable("segmentv2");
+ Assert.assertNotNull(tbl);
+ Assert.assertEquals(TStorageFormat.DEFAULT, tbl.getTableProperty().getStorageFormat());
+
+ // 1. create a rollup r1
+ String alterStmtStr = "alter table test.segmentv2 add rollup r1(k2, v1)";
+ AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
+ Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
+ Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
+ for (AlterJobV2 alterJobV2 : alterJobs.values()) {
+ while (!alterJobV2.getJobState().isFinalState()) {
+ System.out.println("alter job " + alterJobV2.getDbId() + " is running. state: " + alterJobV2.getJobState());
+ Thread.sleep(1000);
+ }
+ System.out.println("alter job " + alterJobV2.getDbId() + " is done. state: " + alterJobV2.getJobState());
+ Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState());
+ }
+
+ String sql = "select k2, sum(v1) from test.segmentv2 group by k2";
+ String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql);
+ Assert.assertTrue(explainString.contains("rollup: r1"));
+
+ // 2. create a rollup with segment v2
+ alterStmtStr = "alter table test.segmentv2 add rollup segmentv2(k2, v1) properties('storage_format' = 'v2')";
+ alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
+ Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
+ alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
+ for (AlterJobV2 alterJobV2 : alterJobs.values()) {
+ while (!alterJobV2.getJobState().isFinalState()) {
+ System.out.println("alter job " + alterJobV2.getDbId() + " is running. state: " + alterJobV2.getJobState());
+ Thread.sleep(1000);
+ }
+ System.out.println("alter job " + alterJobV2.getDbId() + " is done. state: " + alterJobV2.getJobState());
+ Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState());
+ }
+
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql);
+ Assert.assertTrue(explainString.contains("rollup: r1"));
+
+ // set use_v2_rollup = true;
+ connectContext.getSessionVariable().setUseV2Rollup(true);
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql);
+ Assert.assertTrue(explainString.contains("rollup: __v2_segmentv2"));
+
+ // 3. process alter segment v2
+ alterStmtStr = "alter table test.segmentv2 set ('storage_format' = 'v2');";
+ alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
+ Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
+ // 4. check alter job
+ alterJobs = Catalog.getCurrentCatalog().getSchemaChangeHandler().getAlterJobsV2();
+ for (AlterJobV2 alterJobV2 : alterJobs.values()) {
+ while (!alterJobV2.getJobState().isFinalState()) {
+ System.out.println("alter job " + alterJobV2.getDbId() + " is running. state: " + alterJobV2.getJobState());
+ Thread.sleep(1000);
+ }
+ System.out.println("alter job " + alterJobV2.getDbId() + " is done. state: " + alterJobV2.getJobState());
+ Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState());
+ }
+ // 5. check storage format of table
+ Assert.assertEquals(TStorageFormat.V2, tbl.getTableProperty().getStorageFormat());
+
+ // 6. alter again, that no job will be created
+ try {
+ Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
+ Assert.fail();
+ } catch (DdlException e) {
+ Assert.assertTrue(e.getMessage().contains("Nothing is changed"));
+ }
+ }
}
diff --git a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
index 8d7fe95..a45cdd9 100755
--- a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
+++ b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
@@ -190,8 +190,8 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
public String frontendHostPort = "127.0.0.1:9030";
public String user = "root";
public String password = "";
- public String database = "__doris_audit_db";
- public String table = "__doris_audit_tbl";
+ public String database = "doris_audit_db__";
+ public String table = "doris_audit_tbl__";
public void init(Map<String, String> properties) throws PluginException {
try {
diff --git a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
index 8dbac8f..d7fd87d 100644
--- a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
+++ b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
@@ -94,6 +94,7 @@ public class DorisStreamLoader {
conn.addRequestProperty("label", label);
conn.addRequestProperty("max_fiter_ratio", "1.0");
+ conn.addRequestProperty("columns", "query_id, time, client_ip, user, db, state, query_time, scan_bytes, scan_rows, return_rows, stmt_id, is_query, frontend_ip, stmt");
conn.setDoOutput(true);
conn.setDoInput(true);
@@ -148,4 +149,4 @@ public class DorisStreamLoader {
return sb.toString();
}
}
-}
\ No newline at end of file
+}
diff --git a/tools/show_segment_status/README.md b/tools/show_segment_status/README.md
index 5eacd87..ba00e7c 100644
--- a/tools/show_segment_status/README.md
+++ b/tools/show_segment_status/README.md
@@ -50,8 +50,9 @@ $ python setup.py install
# Output Example Format
-```
+The first number is about segment v2, the latter one is the total count.
+```
==========SUMMARY()===========
rowset_count: 289845 / 289845
rowset_disk_size: 84627551189 / 84627551189
@@ -67,5 +68,4 @@ rowset_count: 79650 / 79650
rowset_disk_size: 24473921575 / 24473921575
rowset_row_count: 331449328 / 331449328
===========================================================
-
```
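
The RollupSelector hunk above moves the `use_v2_rollup` check ahead of the normal rollup selection. The following is only a minimal sketch of that short-circuit, not the Doris implementation: the class `V2RollupChoiceSketch`, the method `chooseIndex`, and the plain `Map` standing in for the table's index lookup are all hypothetical, and the `__v2_` prefix is inferred from the `rollup: __v2_segmentv2` assertion in `testAlterSegmentV2`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, self-contained sketch of the selection order shown in the diff.
public class V2RollupChoiceSketch {
    // Assumed prefix, inferred from the test's expected plan text.
    static final String V2_PREFIX = "__v2_";

    // Returns the v2 rollup id when the session flag is set and such a rollup
    // exists; otherwise returns the id the normal selection would have picked.
    static long chooseIndex(boolean useV2Rollup, String tableName,
                            Map<String, Long> indexIdsByName, long normallySelectedId) {
        if (useV2Rollup) {
            Long v2Id = indexIdsByName.get(V2_PREFIX + tableName);
            if (v2Id != null) {
                return v2Id;
            }
        }
        return normallySelectedId;
    }

    public static void main(String[] args) {
        Map<String, Long> ids = new HashMap<>();
        ids.put("r1", 101L);
        ids.put("__v2_segmentv2", 202L);
        // Flag off: the normally selected rollup is kept.
        System.out.println(chooseIndex(false, "segmentv2", ids, 101L));
        // Flag on: the v2 rollup is returned without further selection.
        System.out.println(chooseIndex(true, "segmentv2", ids, 101L));
    }
}
```

With the flag set and no v2 rollup present, the sketch falls through to the normal choice, which mirrors the `v2RollupIndexId != null` guard in the diff.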