This is an automated email from the ASF dual-hosted git repository. lijibing pushed a commit to branch high-priority-column in repository https://gitbox.apache.org/repos/asf/doris.git
commit 8631bcafdc3676a4c6ec36a6b81dee81b179bd86 Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Thu Mar 7 15:02:02 2024 +0800 Support show auto analyze pending jobs. (#31926) --- fe/fe-core/src/main/cup/sql_parser.cup | 4 + .../doris/analysis/ShowAutoAnalyzeJobsStmt.java | 210 +++++++++++++++++++++ .../java/org/apache/doris/qe/ShowExecutor.java | 34 ++++ .../apache/doris/statistics/AnalysisManager.java | 40 +++- .../doris/statistics/AutoAnalysisPendingJob.java | 50 +++++ .../org/apache/doris/statistics/JobPriority.java | 24 +++ .../doris/statistics/StatisticsAutoCollector.java | 16 +- .../doris/statistics/StatisticsJobAppender.java | 25 ++- 8 files changed, 383 insertions(+), 20 deletions(-) diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 22dd017b68a..149d03999f0 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -4361,6 +4361,10 @@ show_param ::= {: RESULT = new ShowAnalyzeStmt(tbl, parser.where, true); :} + | KW_AUTO KW_JOBS opt_table_name:tbl opt_wild_where + {: + RESULT = new ShowAutoAnalyzeJobsStmt(tbl, parser.where); + :} | KW_ANALYZE KW_TASK KW_STATUS INTEGER_LITERAL:jobId {: RESULT = new ShowAnalyzeTaskStatus(jobId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAutoAnalyzeJobsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAutoAnalyzeJobsStmt.java new file mode 100644 index 00000000000..560387fa5bc --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAutoAnalyzeJobsStmt.java @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.ShowResultSetMetaData; +import org.apache.doris.statistics.JobPriority; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; + +/** + * ShowAutoAnalyzeJobsStmt is used to show pending auto analysis jobs. 
+ * syntax: + * SHOW AUTO ANALYZE JOBS + * [TABLE] + * [ + * WHERE + * [PRIORITY = ["HIGH"|"MID"|"LOW"]] + * ] + */ +public class ShowAutoAnalyzeJobsStmt extends ShowStmt { + private static final String PRIORITY = "priority"; + private static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>() + .add("catalog_name") + .add("db_name") + .add("tbl_name") + .add("col_list") + .add("priority") + .build(); + + private final TableName tableName; + private final Expr whereClause; + + public ShowAutoAnalyzeJobsStmt(TableName tableName, Expr whereClause) { + this.tableName = tableName; + this.whereClause = whereClause; + } + + // extract from predicate + private String jobPriority; + + public String getPriority() { + Preconditions.checkArgument(isAnalyzed(), + "The stateValue must be obtained after the parsing is complete"); + return jobPriority; + } + + public Expr getWhereClause() { + Preconditions.checkArgument(isAnalyzed(), + "The whereClause must be obtained after the parsing is complete"); + return whereClause; + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + if (!ConnectContext.get().getSessionVariable().enableStats) { + throw new UserException("Analyze function is forbidden, you should add `enable_stats=true`" + + "in your FE conf file"); + } + super.analyze(analyzer); + if (tableName != null) { + tableName.analyze(analyzer); + String catalogName = tableName.getCtl(); + String dbName = tableName.getDb(); + String tblName = tableName.getTbl(); + checkShowAnalyzePriv(catalogName, dbName, tblName); + } + + // analyze where clause if not null + if (whereClause != null) { + analyzeSubPredicate(whereClause); + } + } + + @Override + public ShowResultSetMetaData getMetaData() { + ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); + for (String title : TITLE_NAMES) { + builder.addColumn(new Column(title, ScalarType.createVarchar(128))); + } + return builder.build(); + } + + @Override + public 
RedirectStatus getRedirectStatus() { + return RedirectStatus.FORWARD_NO_SYNC; + } + + private void checkShowAnalyzePriv(String catalogName, String dbName, String tblName) throws AnalysisException { + if (!Env.getCurrentEnv().getAccessManager() + .checkTblPriv(ConnectContext.get(), catalogName, dbName, tblName, PrivPredicate.SHOW)) { + ErrorReport.reportAnalysisException( + ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, + "SHOW ANALYZE", + ConnectContext.get().getQualifiedUser(), + ConnectContext.get().getRemoteIP(), + dbName + ": " + tblName); + } + } + + private void analyzeSubPredicate(Expr subExpr) throws AnalysisException { + if (subExpr == null) { + return; + } + + boolean valid = true; + + CHECK: { + if (subExpr instanceof BinaryPredicate) { + BinaryPredicate binaryPredicate = (BinaryPredicate) subExpr; + if (binaryPredicate.getOp() != BinaryPredicate.Operator.EQ) { + valid = false; + break CHECK; + } + } else { + valid = false; + break CHECK; + } + + // left child + if (!(subExpr.getChild(0) instanceof SlotRef)) { + valid = false; + break CHECK; + } + String leftKey = ((SlotRef) subExpr.getChild(0)).getColumnName(); + if (!PRIORITY.equalsIgnoreCase(leftKey)) { + valid = false; + break CHECK; + } + + // right child + if (!(subExpr.getChild(1) instanceof StringLiteral)) { + valid = false; + break CHECK; + } + + String value = subExpr.getChild(1).getStringValue(); + if (Strings.isNullOrEmpty(value)) { + valid = false; + break CHECK; + } + + jobPriority = value.toUpperCase(); + try { + JobPriority.valueOf(jobPriority); + } catch (Exception e) { + valid = false; + } + } + + if (!valid) { + throw new AnalysisException("Where clause should looks like: " + + "PRIORITY = \"HIGH|MID|LOW\""); + } + } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append("SHOW AUTO ANALYZE"); + + if (tableName != null) { + sb.append(" "); + sb.append(tableName.toSql()); + } + + if (whereClause != null) { + sb.append(" "); + sb.append("WHERE"); + 
sb.append(" "); + sb.append(whereClause.toSql()); + } + + return sb.toString(); + } + + @Override + public String toString() { + return toSql(); + } + + public TableName getTableName() { + return tableName; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 53ff62edb68..670ec85f6f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -30,6 +30,7 @@ import org.apache.doris.analysis.ShowAlterStmt; import org.apache.doris.analysis.ShowAnalyzeStmt; import org.apache.doris.analysis.ShowAnalyzeTaskStatus; import org.apache.doris.analysis.ShowAuthorStmt; +import org.apache.doris.analysis.ShowAutoAnalyzeJobsStmt; import org.apache.doris.analysis.ShowBackendsStmt; import org.apache.doris.analysis.ShowBackupStmt; import org.apache.doris.analysis.ShowBrokerStmt; @@ -203,6 +204,7 @@ import org.apache.doris.mysql.privilege.PrivBitSet; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.mysql.privilege.Privilege; import org.apache.doris.statistics.AnalysisInfo; +import org.apache.doris.statistics.AutoAnalysisPendingJob; import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.Histogram; import org.apache.doris.statistics.ResultRow; @@ -440,6 +442,8 @@ public class ShowExecutor { handleShowCreateCatalog(); } else if (stmt instanceof ShowAnalyzeStmt) { handleShowAnalyze(); + } else if (stmt instanceof ShowAutoAnalyzeJobsStmt) { + handleShowAutoAnalyzePendingJobs(); } else if (stmt instanceof ShowTabletsBelongStmt) { handleShowTabletsBelong(); } else if (stmt instanceof AdminCopyTabletStmt) { @@ -2873,6 +2877,36 @@ public class ShowExecutor { resultSet = new ShowResultSet(showStmt.getMetaData(), resultRows); } + private void handleShowAutoAnalyzePendingJobs() { + ShowAutoAnalyzeJobsStmt showStmt = (ShowAutoAnalyzeJobsStmt) stmt; + 
List<AutoAnalysisPendingJob> jobs = Env.getCurrentEnv().getAnalysisManager().showAutoPendingJobs(showStmt); + List<List<String>> resultRows = Lists.newArrayList(); + for (AutoAnalysisPendingJob job : jobs) { + try { + List<String> row = new ArrayList<>(); + CatalogIf<? extends DatabaseIf<? extends TableIf>> c + = StatisticsUtil.findCatalog(job.catalogName); + row.add(c.getName()); + Optional<? extends DatabaseIf<? extends TableIf>> databaseIf = c.getDb(job.dbName); + row.add(databaseIf.isPresent() ? databaseIf.get().getFullName() : "DB may get deleted"); + if (databaseIf.isPresent()) { + Optional<? extends TableIf> table = databaseIf.get().getTable(job.tableName); + row.add(table.isPresent() ? table.get().getName() : "Table may get deleted"); + } else { + row.add("DB may get deleted"); + } + row.add(job.getColumnNames()); + row.add(String.valueOf(job.priority)); + resultRows.add(row); + } catch (Exception e) { + LOG.warn("Failed to get pending jobs for table {}.{}.{}, reason: {}", + job.catalogName, job.dbName, job.tableName, e.getMessage()); + continue; + } + } + resultSet = new ShowResultSet(showStmt.getMetaData(), resultRows); + } + private void handleShowTabletsBelong() { ShowTabletsBelongStmt showStmt = (ShowTabletsBelongStmt) stmt; List<List<String>> rows = new ArrayList<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index e2bd85a0f43..c6713b3a7c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -25,6 +25,7 @@ import org.apache.doris.analysis.DropAnalyzeJobStmt; import org.apache.doris.analysis.DropStatsStmt; import org.apache.doris.analysis.KillAnalysisJobStmt; import org.apache.doris.analysis.ShowAnalyzeStmt; +import org.apache.doris.analysis.ShowAutoAnalyzeJobsStmt; import org.apache.doris.analysis.TableName; import 
org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; @@ -113,9 +114,9 @@ public class AnalysisManager implements Writable { private static final int COLUMN_QUEUE_SIZE = 1000; public final Queue<HighPriorityColumn> highPriorityColumns = new ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE); public final Queue<HighPriorityColumn> midPriorityColumns = new ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE); - public final Map<TableIf, Set<String>> highPriorityJobs = new LinkedHashMap<>(); - public final Map<TableIf, Set<String>> midPriorityJobs = new LinkedHashMap<>(); - public final Map<TableIf, Set<String>> lowPriorityJobs = new LinkedHashMap<>(); + public final Map<TableName, Set<String>> highPriorityJobs = new LinkedHashMap<>(); + public final Map<TableName, Set<String>> midPriorityJobs = new LinkedHashMap<>(); + public final Map<TableName, Set<String>> lowPriorityJobs = new LinkedHashMap<>(); // Tracking running manually submitted async tasks, keep in mem only protected final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap = new ConcurrentHashMap<>(); @@ -598,6 +599,39 @@ public class AnalysisManager implements Writable { } } + public List<AutoAnalysisPendingJob> showAutoPendingJobs(ShowAutoAnalyzeJobsStmt stmt) { + TableName tblName = stmt.getTableName(); + String priority = stmt.getPriority(); + List<AutoAnalysisPendingJob> result = Lists.newArrayList(); + if (priority == null || priority.isEmpty()) { + result.addAll(getPendingJobs(highPriorityJobs, JobPriority.HIGH, tblName)); + result.addAll(getPendingJobs(midPriorityJobs, JobPriority.MID, tblName)); + result.addAll(getPendingJobs(lowPriorityJobs, JobPriority.LOW, tblName)); + } else if (priority.equals(JobPriority.HIGH.name())) { + result.addAll(getPendingJobs(highPriorityJobs, JobPriority.HIGH, tblName)); + } else if (priority.equals(JobPriority.MID.name())) { + result.addAll(getPendingJobs(midPriorityJobs, JobPriority.MID, tblName)); + } else if 
(priority.equals(JobPriority.LOW.name())) { + result.addAll(getPendingJobs(lowPriorityJobs, JobPriority.LOW, tblName)); + } + return result; + } + + protected List<AutoAnalysisPendingJob> getPendingJobs(Map<TableName, Set<String>> jobMap, + JobPriority priority, TableName tblName) { + List<AutoAnalysisPendingJob> result = Lists.newArrayList(); + synchronized (jobMap) { + for (Entry<TableName, Set<String>> entry : jobMap.entrySet()) { + TableName table = entry.getKey(); + if (tblName == null || tblName.equals(table)) { + result.add(new AutoAnalysisPendingJob(table.getCtl(), + table.getDb(), table.getTbl(), entry.getValue(), priority)); + } + } + } + return result; + } + public List<AnalysisInfo> showAnalysisJob(ShowAnalyzeStmt stmt) { return findShowAnalyzeResult(analysisJobInfoMap.values(), stmt); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AutoAnalysisPendingJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AutoAnalysisPendingJob.java new file mode 100644 index 00000000000..ddd06d17c81 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AutoAnalysisPendingJob.java @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.statistics; + +import java.util.Set; +import java.util.StringJoiner; + +public class AutoAnalysisPendingJob { + + public final String catalogName; + public final String dbName; + public final String tableName; + public final Set<String> columnNames; + public final JobPriority priority; + + public AutoAnalysisPendingJob(String catalogName, String dbName, String tableName, + Set<String> columnNames, JobPriority priority) { + this.catalogName = catalogName; + this.dbName = dbName; + this.tableName = tableName; + this.columnNames = columnNames; + this.priority = priority; + } + + public String getColumnNames() { + if (columnNames == null) { + return ""; + } + StringJoiner stringJoiner = new StringJoiner(","); + for (String colName : columnNames) { + stringJoiner.add(colName); + } + return stringJoiner.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java new file mode 100644 index 00000000000..2786b063563 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.statistics; + +public enum JobPriority { + HIGH, + MID, + LOW; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index e0df94b5cb0..227074dbb5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -17,6 +17,7 @@ package org.apache.doris.statistics; +import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; @@ -57,13 +58,14 @@ public class StatisticsAutoCollector extends StatisticsCollector { @Override protected void collect() { while (canCollect()) { - Map.Entry<TableIf, Set<String>> job = getJob(); + Map.Entry<TableName, Set<String>> job = getJob(); if (job == null) { // No more job to process, break and sleep. 
break; } try { - TableIf table = job.getKey(); + TableName tblName = job.getKey(); + TableIf table = StatisticsUtil.findTable(tblName.getCtl(), tblName.getDb(), tblName.getTbl()); if (!supportAutoAnalyze(table)) { continue; } @@ -78,7 +80,7 @@ public class StatisticsAutoCollector extends StatisticsCollector { processOneJob(table, columns); } catch (Exception e) { LOG.warn("Failed to analyze table {} with columns [{}]", - job.getKey().getName(), job.getValue().stream().collect(Collectors.joining(",")), e); + job.getKey().getTbl(), job.getValue().stream().collect(Collectors.joining(",")), e); } } } @@ -88,9 +90,9 @@ public class StatisticsAutoCollector extends StatisticsCollector { && StatisticsUtil.inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId())); } - protected Map.Entry<TableIf, Set<String>> getJob() { + protected Map.Entry<TableName, Set<String>> getJob() { AnalysisManager manager = Env.getServingEnv().getAnalysisManager(); - Optional<Map.Entry<TableIf, Set<String>>> job = fetchJobFromMap(manager.highPriorityJobs); + Optional<Map.Entry<TableName, Set<String>>> job = fetchJobFromMap(manager.highPriorityJobs); if (job.isPresent()) { return job.get(); } @@ -102,9 +104,9 @@ public class StatisticsAutoCollector extends StatisticsCollector { return job.isPresent() ? 
job.get() : null; } - protected Optional<Map.Entry<TableIf, Set<String>>> fetchJobFromMap(Map<TableIf, Set<String>> jobMap) { + protected Optional<Map.Entry<TableName, Set<String>>> fetchJobFromMap(Map<TableName, Set<String>> jobMap) { synchronized (jobMap) { - Optional<Map.Entry<TableIf, Set<String>>> first = jobMap.entrySet().stream().findFirst(); + Optional<Map.Entry<TableName, Set<String>>> first = jobMap.entrySet().stream().findFirst(); first.ifPresent(entry -> jobMap.remove(entry.getKey())); return first; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java index 71bb71d3cda..93d03a3fdb8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java @@ -17,6 +17,7 @@ package org.apache.doris.statistics; +import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; @@ -81,31 +82,33 @@ public class StatisticsJobAppender extends MasterDaemon { } } - protected void appendColumnsToJobs(Queue<HighPriorityColumn> columnQueue, Map<TableIf, Set<String>> jobsMap) { + protected void appendColumnsToJobs(Queue<HighPriorityColumn> columnQueue, Map<TableName, Set<String>> jobsMap) { int size = columnQueue.size(); for (int i = 0; i < size; i++) { HighPriorityColumn column = columnQueue.poll(); LOG.info("Process column " + column.tblId + "." + column.colName); TableIf table = StatisticsUtil.findTable(column.catalogId, column.dbId, column.tblId); + TableName tableName = new TableName(table.getDatabase().getCatalog().getName(), + table.getDatabase().getFullName(), table.getName()); synchronized (jobsMap) { // If job map reach the upper limit, stop putting new jobs. 
- if (!jobsMap.containsKey(table) && jobsMap.size() >= JOB_MAP_SIZE) { + if (!jobsMap.containsKey(tableName) && jobsMap.size() >= JOB_MAP_SIZE) { LOG.info("Job map full."); break; } - if (jobsMap.containsKey(table)) { - jobsMap.get(table).add(column.colName); + if (jobsMap.containsKey(tableName)) { + jobsMap.get(tableName).add(column.colName); } else { HashSet<String> columns = new HashSet<>(); columns.add(column.colName); - jobsMap.put(table, columns); + jobsMap.put(tableName, columns); } LOG.info("Column " + column.tblId + "." + column.colName + " added"); } } } - protected void appendToLowQueue(Map<TableIf, Set<String>> jobsMap) { + protected void appendToLowQueue(Map<TableName, Set<String>> jobsMap) { InternalCatalog catalog = Env.getCurrentInternalCatalog(); List<Long> sortedDbs = catalog.getDbIds().stream().sorted().collect(Collectors.toList()); int batchSize = 100; @@ -122,18 +125,20 @@ public class StatisticsJobAppender extends MasterDaemon { if (!(t instanceof OlapTable) || t.getId() <= currentTableId) { continue; } + TableName tableName = new TableName(t.getDatabase().getCatalog().getName(), + t.getDatabase().getFullName(), t.getName()); synchronized (jobsMap) { // If job map reach the upper limit, stop adding new jobs. 
- if (!jobsMap.containsKey(t) && jobsMap.size() >= JOB_MAP_SIZE) { + if (!jobsMap.containsKey(tableName) && jobsMap.size() >= JOB_MAP_SIZE) { return; } Set<String> columns = t.getColumns().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) .map(c -> c.getName()).collect(Collectors.toSet()); - if (jobsMap.containsKey(t)) { - jobsMap.get(t).addAll(columns); + if (jobsMap.containsKey(tableName)) { + jobsMap.get(tableName).addAll(columns); } else { - jobsMap.put(t, columns); + jobsMap.put(tableName, columns); } } currentTableId = t.getId(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org