This is an automated email from the ASF dual-hosted git repository. adonisling pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 96a3c60d3b [feature-wip](MTMV) Support alter statement (#16817) 96a3c60d3b is described below commit 96a3c60d3b9c415cad263fcdfc95dc0ca30db848 Author: huangzhaowei <carlmartin...@gmail.com> AuthorDate: Sun Feb 19 12:15:17 2023 +0800 [feature-wip](MTMV) Support alter statement (#16817) Steps: 1. drop the old MTMV jobs 2. clear the old task records and clean the running and pending tasks 3. set the new scheduler info in MTMV and replay it in followers. 4. create a job in the master node. Note that if you change the refresh info of MTMV, the old MTMV tasks will be cleaned. --- .../main/java/org/apache/doris/alter/Alter.java | 43 +++++++++++++-- .../main/java/org/apache/doris/catalog/Env.java | 4 +- .../org/apache/doris/catalog/MaterializedView.java | 4 ++ .../org/apache/doris/journal/JournalEntity.java | 6 +++ .../java/org/apache/doris/mtmv/MTMVJobManager.java | 15 +++++- .../org/apache/doris/mtmv/MTMVTaskManager.java | 3 ++ .../doris/persist/AlterMultiMaterializedView.java | 61 ++++++++++++++++++++++ .../java/org/apache/doris/persist/EditLog.java | 9 ++++ .../org/apache/doris/persist/OperationType.java | 2 + .../suites/mtmv_p0/test_create_mtmv.groovy | 12 +++++ 10 files changed, 152 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index 16298fff41..8b256a0ebb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -19,7 +19,6 @@ package org.apache.doris.alter; import org.apache.doris.analysis.AddPartitionClause; import org.apache.doris.analysis.AlterClause; -import org.apache.doris.analysis.AlterMaterializedViewStmt; import org.apache.doris.analysis.AlterSystemStmt; import org.apache.doris.analysis.AlterTableStmt; import org.apache.doris.analysis.AlterViewStmt; @@ -47,6 +46,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DataProperty; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedView; import org.apache.doris.catalog.MysqlTable; import org.apache.doris.catalog.OdbcTable; import org.apache.doris.catalog.OlapTable; @@ -60,13 +60,17 @@ import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.View; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.mtmv.MTMVJobFactory; import org.apache.doris.mtmv.MTMVUtils.TaskSubmitStatus; +import org.apache.doris.mtmv.metadata.MTMVJob; +import org.apache.doris.persist.AlterMultiMaterializedView; import org.apache.doris.persist.AlterViewInfo; import org.apache.doris.persist.BatchModifyPartitionsInfo; import org.apache.doris.persist.ModifyCommentOperationLog; @@ -507,10 +511,39 @@ public class Alter { } } - public void processAlterMaterializedView(AlterMaterializedViewStmt stmt) throws UserException { - TableName tbl = stmt.getTable(); - Env.getCurrentEnv().getInternalCatalog().getDb(tbl.getDb()); - throw new DdlException("ALTER MATERIALIZED VIEW is not implemented: " + stmt.toSql()); + public void processAlterMaterializedView(AlterMultiMaterializedView alterView, boolean isReplay) + throws UserException { + TableName tbl = alterView.getMvName(); + MaterializedView olapTable = null; + try { + // 1. check mv exist + Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(tbl.getDb()); + olapTable = (MaterializedView) db.getTableOrMetaException(tbl.getTbl(), TableType.MATERIALIZED_VIEW); + + // 2. drop old job and kill the associated tasks + Env.getCurrentEnv().getMTMVJobManager().dropJobByName(tbl.getDb(), tbl.getTbl()); + + // 3. overwrite the refresh info in the memory of fe. + olapTable.writeLock(); + olapTable.setRefreshInfo(alterView.getInfo()); + + // 4. log it and replay it in the follower + if (!isReplay) { + Env.getCurrentEnv().getEditLog().logAlterMTMV(alterView); + // 5. master node generate new jobs + if (Config.enable_mtmv_scheduler_framework && MTMVJobFactory.isGenerateJob(olapTable)) { + List<MTMVJob> jobs = MTMVJobFactory.buildJob(olapTable, db.getFullName()); + for (MTMVJob job : jobs) { + Env.getCurrentEnv().getMTMVJobManager().createJob(job, false); + } + LOG.info("Alter mv success with new mv job created."); + } + } + } finally { + if (olapTable != null) { + olapTable.writeUnlock(); + } + } } // entry of processing replace table diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 4e1f10dd3f..008aa04143 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -171,6 +171,7 @@ import org.apache.doris.mtmv.MTMVJobManager; import org.apache.doris.mysql.privilege.AccessControllerManager; import org.apache.doris.mysql.privilege.Auth; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.persist.AlterMultiMaterializedView; import org.apache.doris.persist.BackendIdsUpdateInfo; import org.apache.doris.persist.BackendReplicasInfo; import org.apache.doris.persist.BackendTabletsInfo; @@ -3666,7 +3667,8 @@ public class Env { } public void alterMaterializedView(AlterMaterializedViewStmt stmt) throws UserException { - this.alter.processAlterMaterializedView(stmt); + AlterMultiMaterializedView alter = new AlterMultiMaterializedView(stmt.getTable(), stmt.getRefreshInfo()); + this.alter.processAlterMaterializedView(alter, false); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedView.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedView.java index b222fbec82..3de69dda8a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedView.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedView.java @@ -88,6 +88,10 @@ public class MaterializedView extends OlapTable { return refreshInfo; } + public void setRefreshInfo(MVRefreshInfo info) { + refreshInfo = info; + } + public String getQuery() { return query; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 9162cd837d..2cadc0efe1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -61,6 +61,7 @@ import org.apache.doris.mtmv.metadata.DropMTMVTask; import org.apache.doris.mtmv.metadata.MTMVJob; import org.apache.doris.mtmv.metadata.MTMVTask; import org.apache.doris.mysql.privilege.UserPropertyInfo; +import org.apache.doris.persist.AlterMultiMaterializedView; import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; import org.apache.doris.persist.AlterUserOperationLog; import org.apache.doris.persist.AlterViewInfo; @@ -776,6 +777,11 @@ public class JournalEntity implements Writable { isRead = true; break; } + case OperationType.OP_ALTER_MTMV_STMT: { + data = AlterMultiMaterializedView.read(in); + isRead = true; + break; + } case OperationType.OP_ALTER_USER: { data = AlterUserOperationLog.read(in); isRead = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java index 607541a7f4..642a2452ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java @@ -44,6 +44,7 @@ import java.io.DataOutputStream; import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.Executors; @@ -283,8 +284,20 @@ public class MTMVJobManager { LOG.info("change job:{}", changeJob.getJobId()); } + public void dropJobByName(String dbName, String mvName) { + for (String jobName : nameToJobMap.keySet()) { + MTMVJob job = nameToJobMap.get(jobName); + if (job.getMVName().equals(mvName) && job.getDBName().equals(dbName)) { + dropJobs(Collections.singletonList(job.getId()), false); + return; + } + } + } + public void dropJobs(List<Long> jobIds, boolean isReplay) { - // keep nameToJobMap and manualTaskMap consist + if (jobIds.isEmpty()) { + return; + } if (!tryLock()) { return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java index 44629a7844..615fb2f37c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java @@ -442,6 +442,9 @@ public class MTMVTaskManager { } public void dropTasks(List<String> taskIds, boolean isReplay) { + if (taskIds.isEmpty()) { + return; + } if (!tryLock()) { return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterMultiMaterializedView.java b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterMultiMaterializedView.java new file mode 100644 index 0000000000..11c675370e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterMultiMaterializedView.java @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.persist; + +import org.apache.doris.analysis.MVRefreshInfo; +import org.apache.doris.analysis.TableName; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class AlterMultiMaterializedView implements Writable { + + @SerializedName(value = "mvName") + private TableName mvName; + + @SerializedName(value = "info") + private MVRefreshInfo info; + + public AlterMultiMaterializedView(TableName mvName, MVRefreshInfo info) { + this.mvName = mvName; + this.info = info; + } + + public MVRefreshInfo getInfo() { + return info; + } + + public TableName getMvName() { + return mvName; + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + public static AlterMultiMaterializedView read(DataInput in) throws IOException { + return GsonUtils.GSON.fromJson(Text.readString(in), AlterMultiMaterializedView.class); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index cfcac88a62..8d4d0212cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -915,6 +915,11 @@ public class EditLog { env.getMTMVJobManager().replayDropJobTasks(dropTask.getTaskIds()); break; } + case OperationType.OP_ALTER_MTMV_STMT: { + final AlterMultiMaterializedView alterView = (AlterMultiMaterializedView) journal.getData(); + env.getAlterInstance().processAlterMaterializedView(alterView, true); + break; + } case OperationType.OP_ALTER_USER: { final AlterUserOperationLog log = (AlterUserOperationLog) journal.getData(); env.getAuth().replayAlterUser(log); @@ -1713,4 +1718,8 @@ public class EditLog { public void logAlterUser(AlterUserOperationLog log) { logEdit(OperationType.OP_ALTER_USER, log); } + + public void logAlterMTMV(AlterMultiMaterializedView log) { + logEdit(OperationType.OP_ALTER_MTMV_STMT, log); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index bc99c80359..a8dfb59078 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -258,6 +258,8 @@ public class OperationType { public static final short OP_DROP_MTMV_TASK = 341; public static final short OP_CHANGE_MTMV_TASK = 342; + public static final short OP_ALTER_MTMV_STMT = 345; + public static final short OP_DROP_EXTERNAL_TABLE = 350; public static final short OP_DROP_EXTERNAL_DB = 351; public static final short OP_CREATE_EXTERNAL_TABLE = 352; diff --git a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy b/regression-test/suites/mtmv_p0/test_create_mtmv.groovy index c506519912..47d151c96f 100644 --- a/regression-test/suites/mtmv_p0/test_create_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_create_mtmv.groovy @@ -125,6 +125,18 @@ suite("test_create_mtmv") { assertEquals 'SUCCESS', state, show_task_result.last().toString() assertEquals 2, show_task_result.size() + // test alter mtmv + sql """ + alter MATERIALIZED VIEW ${mvName} REFRESH COMPLETE start with "2022-11-03 00:00:00" next 2 DAY + """ + show_job_meta = sql_meta "SHOW MTMV JOB ON ${mvName}" + def scheduleIndex = show_job_meta.indexOf(['Schedule', 'CHAR']) + + show_job_result = sql "SHOW MTMV JOB ON ${mvName}" + assertEquals 1, show_job_result.size() + + assertEquals 'START 2022-11-03T00:00 EVERY(2 DAYS)', show_job_result.last().get(scheduleIndex).toString(), show_job_result.last().toString() + sql """ DROP MATERIALIZED VIEW ${mvName} """ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org