This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c5116c5d31b [fix](schema change) reduce memory usage of alter multi-column statement (#33073)
c5116c5d31b is described below
commit c5116c5d31be440bba343084f3599f0b029bb293
Author: Luwei <[email protected]>
AuthorDate: Sat Apr 27 13:59:10 2024 +0800
[fix](schema change) reduce memory usage of alter multi-column statement (#33073)
---
.../java/org/apache/doris/alter/RollupJobV2.java | 9 ++--
.../org/apache/doris/alter/SchemaChangeJobV2.java | 7 +--
.../org/apache/doris/task/AlterReplicaTask.java | 52 +++++++++++++++-------
3 files changed, 44 insertions(+), 24 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index ec0868637e7..93aceb0936b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -62,7 +62,6 @@ import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.AlterReplicaTask;
import org.apache.doris.task.CreateReplicaTask;
-import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
@@ -86,6 +85,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -388,7 +388,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
}
tbl.readLock();
- Map<Object, List<TColumn>> tcloumnsPool = Maps.newHashMap();
+ Map<Object, Object> objectPool = new ConcurrentHashMap<Object, Object>();
String vaultId = tbl.getStorageVaultId();
try {
long expiration = (createTimeMs + timeoutMs) / 1000;
@@ -401,14 +401,13 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
// the rollup task will transform the data before visible version(included).
long visibleVersion = partition.getVisibleVersion();
+ Map<String, Expr> defineExprs = Maps.newHashMap();
MaterializedIndex rollupIndex = entry.getValue();
Map<Long, Long> tabletIdMap =
this.partitionIdToBaseRollupTabletIdMap.get(partitionId);
for (Tablet rollupTablet : rollupIndex.getTablets()) {
long rollupTabletId = rollupTablet.getId();
long baseTabletId = tabletIdMap.get(rollupTabletId);
- Map<String, Expr> defineExprs = Maps.newHashMap();
-
DescriptorTable descTable = new DescriptorTable();
TupleDescriptor destTupleDesc =
descTable.createTupleDescriptor();
Map<String, SlotDescriptor> descMap = new
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
@@ -470,7 +469,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
partitionId, rollupIndexId, baseIndexId,
rollupTabletId, baseTabletId,
rollupReplica.getId(), rollupSchemaHash,
baseSchemaHash, visibleVersion, jobId,
JobType.ROLLUP, defineExprs, descTable,
tbl.getSchemaByIndexId(baseIndexId, true),
- tcloumnsPool, whereClause, expiration, vaultId);
+ objectPool, whereClause, expiration, vaultId);
rollupBatchTask.addTask(rollupTask);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index f00177ec537..277a3411541 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -54,7 +54,6 @@ import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.AlterReplicaTask;
import org.apache.doris.task.CreateReplicaTask;
-import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
@@ -76,6 +75,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -418,7 +418,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
}
tbl.readLock();
- Map<Object, List<TColumn>> tcloumnsPool = Maps.newHashMap();
+ Map<Object, Object> objectPool = new ConcurrentHashMap<Object, Object>();
String vaultId = tbl.getStorageVaultId();
try {
long expiration = (createTimeMs + timeoutMs) / 1000;
@@ -478,12 +478,13 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
AlterReplicaTask rollupTask = new AlterReplicaTask(shadowReplica.getBackendId(), dbId,
tableId, partitionId, shadowIdxId,
originIdxId, shadowTabletId, originTabletId,
shadowReplica.getId(), shadowSchemaHash,
originSchemaHash, visibleVersion, jobId,
- JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, tcloumnsPool,
+ JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, objectPool,
null, expiration, vaultId);
schemaChangeBatchTask.addTask(rollupTask);
}
}
}
+
} // end for partitions
} finally {
tbl.readUnlock();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java
index ebf505e454a..c95cc267076 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java
@@ -42,6 +42,7 @@ import java.util.Map;
* The new replica can be a rollup replica, or a shadow replica of schema change.
*/
public class AlterReplicaTask extends AgentTask {
+
private long baseTabletId;
private long newReplicaId;
private int baseSchemaHash;
@@ -53,7 +54,7 @@ public class AlterReplicaTask extends AgentTask {
private Map<String, Expr> defineExprs;
private Expr whereClause;
private DescriptorTable descTable;
- private Map<Object, List<TColumn>> tcloumnsPool;
+ private Map<Object, Object> objectPool;
private List<Column> baseSchemaColumns;
private long expiration;
@@ -67,7 +68,7 @@ public class AlterReplicaTask extends AgentTask {
public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionId, long rollupIndexId,
long baseIndexId, long rollupTabletId, long baseTabletId, long newReplicaId, int newSchemaHash,
int baseSchemaHash, long version, long jobId, AlterJobV2.JobType jobType, Map<String, Expr> defineExprs,
- DescriptorTable descTable, List<Column> baseSchemaColumns, Map<Object, List<TColumn>> tcloumnsPool,
+ DescriptorTable descTable, List<Column> baseSchemaColumns, Map<Object, Object> objectPool,
Expr whereClause, long expiration, String vaultId) {
super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId,
rollupIndexId, rollupTabletId);
@@ -85,7 +86,7 @@ public class AlterReplicaTask extends AgentTask {
this.whereClause = whereClause;
this.descTable = descTable;
this.baseSchemaColumns = baseSchemaColumns;
- this.tcloumnsPool = tcloumnsPool;
+ this.objectPool = objectPool;
this.expiration = expiration;
this.vaultId = vaultId;
}
@@ -134,32 +135,51 @@ public class AlterReplicaTask extends AgentTask {
default:
break;
}
+
if (defineExprs != null) {
for (Map.Entry<String, Expr> entry : defineExprs.entrySet()) {
- List<SlotRef> slots = Lists.newArrayList();
- entry.getValue().collect(SlotRef.class, slots);
- TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey());
- mvParam.setMvExpr(entry.getValue().treeToThrift());
- req.addToMaterializedViewParams(mvParam);
+ Object value = objectPool.get(entry.getKey());
+ if (value == null) {
+ List<SlotRef> slots = Lists.newArrayList();
+ entry.getValue().collect(SlotRef.class, slots);
+ TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey());
+ mvParam.setMvExpr(entry.getValue().treeToThrift());
+ req.addToMaterializedViewParams(mvParam);
+ objectPool.put(entry.getKey(), mvParam);
+ } else {
+ TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value;
+ req.addToMaterializedViewParams(mvParam);
+ }
}
}
+
if (whereClause != null) {
- TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN);
- mvParam.setMvExpr(whereClause.treeToThrift());
- req.addToMaterializedViewParams(mvParam);
+ Object value = objectPool.get(Column.WHERE_SIGN);
+ if (value == null) {
+ TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN);
+ mvParam.setMvExpr(whereClause.treeToThrift());
+ req.addToMaterializedViewParams(mvParam);
+ objectPool.put(Column.WHERE_SIGN, mvParam);
+ } else {
+ TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value;
+ req.addToMaterializedViewParams(mvParam);
+ }
}
req.setDescTbl(descTable.toThrift());
if (baseSchemaColumns != null) {
- List<TColumn> columns = tcloumnsPool.get(baseSchemaColumns);
- if (columns == null) {
- columns = new ArrayList<TColumn>();
+ Object value = objectPool.get(baseSchemaColumns);
+ if (value == null) {
+ List<TColumn> columns = new ArrayList<TColumn>();
for (Column column : baseSchemaColumns) {
columns.add(column.toThrift());
}
- tcloumnsPool.put(baseSchemaColumns, columns);
+ objectPool.put(baseSchemaColumns, columns);
+ req.setColumns(columns);
+ } else {
+ List<TColumn> columns = (List<TColumn>) value;
+ req.setColumns(columns);
}
- req.setColumns(columns);
}
req.setStorageVaultId(this.vaultId);
return req;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]