This is an automated email from the ASF dual-hosted git repository. caiconghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new a2a13da [Optimize] Make light schema change complete more faster under concurrent conditions (#6292) a2a13da is described below commit a2a13dadba57dac8d3960334501acc8722f6c11c Author: caiconghui <55968745+caicong...@users.noreply.github.com> AuthorDate: Sun Aug 29 09:41:56 2021 +0800 [Optimize] Make light schema change complete more faster under concurrent conditions (#6292) * [Optimize] Make schema change complete more faster under concurrent conditions Co-authored-by: caiconghui <caicong...@xiaomi.com> --- .../java/org/apache/doris/alter/AlterHandler.java | 6 +- .../apache/doris/alter/SchemaChangeHandler.java | 77 +++++++++++++++++++++- .../java/org/apache/doris/common/FeConstants.java | 3 + 3 files changed, 82 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java index 4cf206a..61fdcca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java @@ -86,7 +86,11 @@ public abstract class AlterHandler extends MasterDaemon { } public AlterHandler(String name) { - super(name, FeConstants.default_scheduler_interval_millisecond); + this(name, FeConstants.default_scheduler_interval_millisecond); + } + + public AlterHandler(String name, int scheduler_interval_millisecond) { + super(name, scheduler_interval_millisecond); } protected void addAlterJobV2(AlterJobV2 alterJob) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index a109192..e25a27f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -61,12 +61,14 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; +import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.common.util.ListComparator; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.persist.RemoveAlterJobV2OperationLog; import org.apache.doris.qe.ConnectContext; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; @@ -97,6 +99,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class SchemaChangeHandler extends AlterHandler { @@ -105,8 +108,20 @@ public class SchemaChangeHandler extends AlterHandler { // all shadow indexes should have this prefix in name public static final String SHADOW_NAME_PRFIX = "__doris_shadow_"; + public static final int MAX_ACTIVE_SCHEMA_CHANGE_JOB_V2_SIZE = 10; + + public static final int CYCLE_COUNT_TO_CHECK_EXPIRE_SCHEMA_CHANGE_JOB = 20; + + public final ThreadPoolExecutor schemaChangeThreadPool = ThreadPoolManager.newDaemonCacheThreadPool(MAX_ACTIVE_SCHEMA_CHANGE_JOB_V2_SIZE, "schema-change-pool", true); + + public final Map<Long, AlterJobV2> activeSchemaChangeJobsV2 = Maps.newConcurrentMap(); + + public final Map<Long, AlterJobV2> runnableSchemaChangeJobV2 = Maps.newConcurrentMap(); + + public int cycle_count = 0; + public SchemaChangeHandler() { - super("schema change"); + super("schema change", FeConstants.default_schema_change_scheduler_interval_millisecond); } private void processAddColumn(AddColumnClause alterClause, OlapTable olapTable, @@ -1387,13 +1402,36 @@ public class SchemaChangeHandler extends AlterHandler { @Override protected void runAfterCatalogReady() { - super.runAfterCatalogReady(); + if (cycle_count >= CYCLE_COUNT_TO_CHECK_EXPIRE_SCHEMA_CHANGE_JOB) { + clearFinishedOrCancelledSchemaChangeJobV2(); + super.runAfterCatalogReady(); + cycle_count = 0; + } runOldAlterJob(); runAlterJobV2(); + cycle_count++; } private void runAlterJobV2() { - alterJobsV2.values().forEach(AlterJobV2::run); + runnableSchemaChangeJobV2.values().forEach( + alterJobsV2 -> { + if (!alterJobsV2.isDone() && !activeSchemaChangeJobsV2.containsKey(alterJobsV2.getJobId()) && + activeSchemaChangeJobsV2.size() < MAX_ACTIVE_SCHEMA_CHANGE_JOB_V2_SIZE) { + if (FeConstants.runningUnitTest) { + alterJobsV2.run(); + } else { + schemaChangeThreadPool.submit(() -> { + if (activeSchemaChangeJobsV2.putIfAbsent(alterJobsV2.getJobId(), alterJobsV2) == null) { + try { + alterJobsV2.run(); + } finally { + activeSchemaChangeJobsV2.remove(alterJobsV2.getJobId()); + } + } + }); + } + } + }); } @Deprecated @@ -1999,4 +2037,37 @@ public class SchemaChangeHandler extends AlterHandler { } } } + + @Override + protected void addAlterJobV2(AlterJobV2 alterJob) { + super.addAlterJobV2(alterJob); + runnableSchemaChangeJobV2.put(alterJob.getJobId(), alterJob); + } + + + private void clearFinishedOrCancelledSchemaChangeJobV2() { + Iterator<Map.Entry<Long, AlterJobV2>> iterator = runnableSchemaChangeJobV2.entrySet().iterator(); + while (iterator.hasNext()) { + AlterJobV2 alterJobV2 = iterator.next().getValue(); + if (alterJobV2.isDone()) { + iterator.remove(); + } + } + } + + @Override + public void replayRemoveAlterJobV2(RemoveAlterJobV2OperationLog log) { + if (runnableSchemaChangeJobV2.containsKey(log.getJobId())) { + runnableSchemaChangeJobV2.remove(log.getJobId()); + } + super.replayRemoveAlterJobV2(log); + } + + @Override + public void replayAlterJobV2(AlterJobV2 alterJob) { + if (!alterJob.isDone() && !runnableSchemaChangeJobV2.containsKey(alterJob.getJobId())) { + runnableSchemaChangeJobV2.put(alterJob.getJobId(), alterJob); + } + super.replayAlterJobV2(alterJob); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java index fc5b18d..b997939 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java @@ -47,6 +47,9 @@ public class FeConstants { // default scheduler interval is 10 seconds public static int default_scheduler_interval_millisecond = 10000; + // default schema change scheduler interval is 500 millisecond + public static int default_schema_change_scheduler_interval_millisecond = 500; + // general model // Current meta data version. Use this version to write journals and image public static int meta_version = FeMetaVersion.VERSION_CURRENT; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org