This is an automated email from the ASF dual-hosted git repository.

lijibing pushed a commit to branch high-priority-column
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 8631bcafdc3676a4c6ec36a6b81dee81b179bd86
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Thu Mar 7 15:02:02 2024 +0800

    Support show auto analyze pending jobs. (#31926)
---
 fe/fe-core/src/main/cup/sql_parser.cup             |   4 +
 .../doris/analysis/ShowAutoAnalyzeJobsStmt.java    | 210 +++++++++++++++++++++
 .../java/org/apache/doris/qe/ShowExecutor.java     |  34 ++++
 .../apache/doris/statistics/AnalysisManager.java   |  40 +++-
 .../doris/statistics/AutoAnalysisPendingJob.java   |  50 +++++
 .../org/apache/doris/statistics/JobPriority.java   |  24 +++
 .../doris/statistics/StatisticsAutoCollector.java  |  16 +-
 .../doris/statistics/StatisticsJobAppender.java    |  25 ++-
 8 files changed, 383 insertions(+), 20 deletions(-)

diff --git a/fe/fe-core/src/main/cup/sql_parser.cup 
b/fe/fe-core/src/main/cup/sql_parser.cup
index 22dd017b68a..149d03999f0 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -4361,6 +4361,10 @@ show_param ::=
     {:
         RESULT = new ShowAnalyzeStmt(tbl, parser.where, true);
     :}
+    | KW_AUTO KW_JOBS opt_table_name:tbl opt_wild_where
+    {:
+        RESULT = new ShowAutoAnalyzeJobsStmt(tbl, parser.where);
+    :}
     | KW_ANALYZE KW_TASK KW_STATUS INTEGER_LITERAL:jobId
     {:
         RESULT = new ShowAnalyzeTaskStatus(jobId);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAutoAnalyzeJobsStmt.java
 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAutoAnalyzeJobsStmt.java
new file mode 100644
index 00000000000..560387fa5bc
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAutoAnalyzeJobsStmt.java
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ShowResultSetMetaData;
+import org.apache.doris.statistics.JobPriority;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+
+import java.util.Locale;
+
+/**
+ * ShowAutoAnalyzeJobsStmt is used to show pending auto analysis jobs.
+ * syntax:
+ *    SHOW AUTO JOBS
+ *        [table_name]
+ *        [
+ *            WHERE
+ *            PRIORITY = "HIGH"|"MID"|"LOW"
+ *        ]
+ *
+ * <p>The only predicate accepted in the where clause is an equality between the
+ * {@code priority} column (case-insensitive) and a string literal naming one of the
+ * {@link JobPriority} constants (case-insensitive).
+ */
+public class ShowAutoAnalyzeJobsStmt extends ShowStmt {
+    private static final String PRIORITY = "priority";
+    private static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
+            .add("catalog_name")
+            .add("db_name")
+            .add("tbl_name")
+            .add("col_list")
+            .add("priority")
+            .build();
+
+    // Optional table filter; null when the statement names no table.
+    private final TableName tableName;
+    // Optional where clause; null when the statement has none.
+    private final Expr whereClause;
+
+    // Upper-cased priority extracted from the where clause; stays null when there is no where clause.
+    private String jobPriority;
+
+    public ShowAutoAnalyzeJobsStmt(TableName tableName, Expr whereClause) {
+        this.tableName = tableName;
+        this.whereClause = whereClause;
+    }
+
+    /**
+     * Returns the priority requested in the where clause.
+     *
+     * @return the upper-cased {@link JobPriority} name, or null when no where clause was given
+     * @throws IllegalArgumentException if the statement has not been analyzed yet
+     */
+    public String getPriority() {
+        Preconditions.checkArgument(isAnalyzed(),
+                "The priority must be obtained after the parsing is complete");
+        return jobPriority;
+    }
+
+    /**
+     * Returns the raw where clause expression.
+     *
+     * @return the where clause, or null when the statement has none
+     * @throws IllegalArgumentException if the statement has not been analyzed yet
+     */
+    public Expr getWhereClause() {
+        Preconditions.checkArgument(isAnalyzed(),
+                "The whereClause must be obtained after the parsing is complete");
+        return whereClause;
+    }
+
+    /**
+     * Rejects the statement when stats are disabled for the session, then resolves and
+     * privilege-checks the optional table name and validates the optional where clause.
+     *
+     * @throws UserException if stats are disabled, the user lacks SHOW privilege on the table,
+     *                       or the where clause is not a supported priority predicate
+     */
+    @Override
+    public void analyze(Analyzer analyzer) throws UserException {
+        if (!ConnectContext.get().getSessionVariable().enableStats) {
+            // The two halves are concatenated, so the separating space must live in the literal.
+            throw new UserException("Analyze function is forbidden, you should add `enable_stats=true` "
+                    + "in your FE conf file");
+        }
+        super.analyze(analyzer);
+        if (tableName != null) {
+            tableName.analyze(analyzer);
+            String catalogName = tableName.getCtl();
+            String dbName = tableName.getDb();
+            String tblName = tableName.getTbl();
+            checkShowAnalyzePriv(catalogName, dbName, tblName);
+        }
+
+        // analyze where clause if not null
+        if (whereClause != null) {
+            analyzeSubPredicate(whereClause);
+        }
+    }
+
+    /**
+     * Builds the result schema: one VARCHAR(128) column per entry of {@code TITLE_NAMES}.
+     */
+    @Override
+    public ShowResultSetMetaData getMetaData() {
+        ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
+        for (String title : TITLE_NAMES) {
+            builder.addColumn(new Column(title, ScalarType.createVarchar(128)));
+        }
+        return builder.build();
+    }
+
+    @Override
+    public RedirectStatus getRedirectStatus() {
+        return RedirectStatus.FORWARD_NO_SYNC;
+    }
+
+    /**
+     * Reports a table-access-denied analysis error unless the current user holds SHOW
+     * privilege on the given table.
+     */
+    private void checkShowAnalyzePriv(String catalogName, String dbName, String tblName) throws AnalysisException {
+        if (!Env.getCurrentEnv().getAccessManager()
+                .checkTblPriv(ConnectContext.get(), catalogName, dbName, tblName, PrivPredicate.SHOW)) {
+            ErrorReport.reportAnalysisException(
+                    ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
+                    "SHOW ANALYZE",
+                    ConnectContext.get().getQualifiedUser(),
+                    ConnectContext.get().getRemoteIP(),
+                    dbName + ": " + tblName);
+        }
+    }
+
+    /**
+     * Validates the where clause and records the requested priority.
+     * {@code jobPriority} is only assigned once the predicate has been fully validated.
+     *
+     * @throws AnalysisException if the predicate is not {@code PRIORITY = "HIGH|MID|LOW"}
+     */
+    private void analyzeSubPredicate(Expr subExpr) throws AnalysisException {
+        if (subExpr == null) {
+            return;
+        }
+
+        String parsedPriority = extractPriority(subExpr);
+        if (parsedPriority == null) {
+            throw new AnalysisException("Where clause should looks like: "
+                    + "PRIORITY = \"HIGH|MID|LOW\"");
+        }
+        jobPriority = parsedPriority;
+    }
+
+    /**
+     * Extracts the priority from a predicate of the form {@code priority = "<name>"}.
+     *
+     * @return the upper-cased {@link JobPriority} name, or null when the predicate has any other shape
+     */
+    private static String extractPriority(Expr subExpr) {
+        if (!(subExpr instanceof BinaryPredicate)
+                || ((BinaryPredicate) subExpr).getOp() != BinaryPredicate.Operator.EQ) {
+            return null;
+        }
+
+        // left child must reference the priority column
+        Expr left = subExpr.getChild(0);
+        if (!(left instanceof SlotRef) || !PRIORITY.equalsIgnoreCase(((SlotRef) left).getColumnName())) {
+            return null;
+        }
+
+        // right child must be a non-empty string literal
+        Expr right = subExpr.getChild(1);
+        if (!(right instanceof StringLiteral)) {
+            return null;
+        }
+        String value = right.getStringValue();
+        if (Strings.isNullOrEmpty(value)) {
+            return null;
+        }
+
+        // Locale.ROOT keeps the mapping stable; a Turkish default locale would turn 'i' into a dotted 'İ'.
+        String upperCased = value.toUpperCase(Locale.ROOT);
+        try {
+            JobPriority.valueOf(upperCased);
+        } catch (IllegalArgumentException e) {
+            // Not a JobPriority constant name.
+            return null;
+        }
+        return upperCased;
+    }
+
+    /**
+     * Renders the statement using the keywords the parser accepts for it (SHOW AUTO JOBS).
+     */
+    @Override
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("SHOW AUTO JOBS");
+
+        if (tableName != null) {
+            sb.append(" ");
+            sb.append(tableName.toSql());
+        }
+
+        if (whereClause != null) {
+            sb.append(" ");
+            sb.append("WHERE");
+            sb.append(" ");
+            sb.append(whereClause.toSql());
+        }
+
+        return sb.toString();
+    }
+
+    @Override
+    public String toString() {
+        return toSql();
+    }
+
+    /**
+     * Returns the optional table filter, or null when the statement names no table.
+     */
+    public TableName getTableName() {
+        return tableName;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 53ff62edb68..670ec85f6f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -30,6 +30,7 @@ import org.apache.doris.analysis.ShowAlterStmt;
 import org.apache.doris.analysis.ShowAnalyzeStmt;
 import org.apache.doris.analysis.ShowAnalyzeTaskStatus;
 import org.apache.doris.analysis.ShowAuthorStmt;
+import org.apache.doris.analysis.ShowAutoAnalyzeJobsStmt;
 import org.apache.doris.analysis.ShowBackendsStmt;
 import org.apache.doris.analysis.ShowBackupStmt;
 import org.apache.doris.analysis.ShowBrokerStmt;
@@ -203,6 +204,7 @@ import org.apache.doris.mysql.privilege.PrivBitSet;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.mysql.privilege.Privilege;
 import org.apache.doris.statistics.AnalysisInfo;
+import org.apache.doris.statistics.AutoAnalysisPendingJob;
 import org.apache.doris.statistics.ColumnStatistic;
 import org.apache.doris.statistics.Histogram;
 import org.apache.doris.statistics.ResultRow;
@@ -440,6 +442,8 @@ public class ShowExecutor {
             handleShowCreateCatalog();
         } else if (stmt instanceof ShowAnalyzeStmt) {
             handleShowAnalyze();
+        } else if (stmt instanceof ShowAutoAnalyzeJobsStmt) {
+            handleShowAutoAnalyzePendingJobs();
         } else if (stmt instanceof ShowTabletsBelongStmt) {
             handleShowTabletsBelong();
         } else if (stmt instanceof AdminCopyTabletStmt) {
@@ -2873,6 +2877,36 @@ public class ShowExecutor {
         resultSet = new ShowResultSet(showStmt.getMetaData(), resultRows);
     }
 
+    /**
+     * Builds the result set for a ShowAutoAnalyzeJobsStmt: one row per pending auto analyze
+     * job with catalog name, database name, table name, column list and priority.
+     * A database or table that can no longer be resolved is shown as a placeholder text;
+     * a job whose lookup throws is logged and left out of the result.
+     */
+    private void handleShowAutoAnalyzePendingJobs() {
+        ShowAutoAnalyzeJobsStmt showStmt = (ShowAutoAnalyzeJobsStmt) stmt;
+        List<AutoAnalysisPendingJob> jobs
+                = Env.getCurrentEnv().getAnalysisManager().showAutoPendingJobs(showStmt);
+        List<List<String>> resultRows = Lists.newArrayList();
+        for (AutoAnalysisPendingJob job : jobs) {
+            try {
+                List<String> row = new ArrayList<>();
+                CatalogIf<? extends DatabaseIf<? extends TableIf>> c
+                        = StatisticsUtil.findCatalog(job.catalogName);
+                row.add(c.getName());
+                Optional<? extends DatabaseIf<? extends TableIf>> databaseIf = c.getDb(job.dbName);
+                if (databaseIf.isPresent()) {
+                    row.add(databaseIf.get().getFullName());
+                    Optional<? extends TableIf> table = databaseIf.get().getTable(job.tableName);
+                    row.add(table.isPresent() ? table.get().getName() : "Table may get deleted");
+                } else {
+                    // Without the database neither the db nor the table column can be resolved.
+                    row.add("DB may get deleted");
+                    row.add("DB may get deleted");
+                }
+                row.add(job.getColumnNames());
+                row.add(String.valueOf(job.priority));
+                resultRows.add(row);
+            } catch (Exception e) {
+                // Pass the exception itself so the stack trace is kept; e.getMessage() may be null.
+                LOG.warn("Failed to get pending jobs for table {}.{}.{}",
+                        job.catalogName, job.dbName, job.tableName, e);
+            }
+        }
+        resultSet = new ShowResultSet(showStmt.getMetaData(), resultRows);
+    }
+
     private void handleShowTabletsBelong() {
         ShowTabletsBelongStmt showStmt = (ShowTabletsBelongStmt) stmt;
         List<List<String>> rows = new ArrayList<>();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index e2bd85a0f43..c6713b3a7c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -25,6 +25,7 @@ import org.apache.doris.analysis.DropAnalyzeJobStmt;
 import org.apache.doris.analysis.DropStatsStmt;
 import org.apache.doris.analysis.KillAnalysisJobStmt;
 import org.apache.doris.analysis.ShowAnalyzeStmt;
+import org.apache.doris.analysis.ShowAutoAnalyzeJobsStmt;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DatabaseIf;
@@ -113,9 +114,9 @@ public class AnalysisManager implements Writable {
     private static final int COLUMN_QUEUE_SIZE = 1000;
     public final Queue<HighPriorityColumn> highPriorityColumns = new 
ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE);
     public final Queue<HighPriorityColumn> midPriorityColumns = new 
ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE);
-    public final Map<TableIf, Set<String>> highPriorityJobs = new 
LinkedHashMap<>();
-    public final Map<TableIf, Set<String>> midPriorityJobs = new 
LinkedHashMap<>();
-    public final Map<TableIf, Set<String>> lowPriorityJobs = new 
LinkedHashMap<>();
+    public final Map<TableName, Set<String>> highPriorityJobs = new 
LinkedHashMap<>();
+    public final Map<TableName, Set<String>> midPriorityJobs = new 
LinkedHashMap<>();
+    public final Map<TableName, Set<String>> lowPriorityJobs = new 
LinkedHashMap<>();
 
     // Tracking running manually submitted async tasks, keep in mem only
     protected final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> 
analysisJobIdToTaskMap = new ConcurrentHashMap<>();
@@ -598,6 +599,39 @@ public class AnalysisManager implements Writable {
         }
     }
 
+    /**
+     * Lists the pending auto analyze jobs selected by the given statement.
+     * With no priority in the statement, jobs of every priority are returned in
+     * HIGH, MID, LOW order; otherwise only jobs of the named priority are returned.
+     *
+     * @param stmt the analyzed show statement supplying the optional table and priority filters
+     * @return the matching pending jobs; empty when nothing matches
+     */
+    public List<AutoAnalysisPendingJob> showAutoPendingJobs(ShowAutoAnalyzeJobsStmt stmt) {
+        final TableName tableFilter = stmt.getTableName();
+        final String wanted = stmt.getPriority();
+        final boolean everyPriority = wanted == null || wanted.isEmpty();
+
+        List<AutoAnalysisPendingJob> pending = Lists.newArrayList();
+        if (everyPriority || JobPriority.HIGH.name().equals(wanted)) {
+            pending.addAll(getPendingJobs(highPriorityJobs, JobPriority.HIGH, tableFilter));
+        }
+        if (everyPriority || JobPriority.MID.name().equals(wanted)) {
+            pending.addAll(getPendingJobs(midPriorityJobs, JobPriority.MID, tableFilter));
+        }
+        if (everyPriority || JobPriority.LOW.name().equals(wanted)) {
+            pending.addAll(getPendingJobs(lowPriorityJobs, JobPriority.LOW, tableFilter));
+        }
+        return pending;
+    }
+
+    /**
+     * Snapshots the jobs of one priority map, optionally restricted to a single table.
+     *
+     * @param jobMap   the job map to read; iterated while holding its monitor
+     * @param priority the priority label attached to every returned job
+     * @param tblName  table to match, or null to return every entry of the map
+     * @return the matching jobs, each carrying its own copy of the column name set
+     */
+    protected List<AutoAnalysisPendingJob> getPendingJobs(Map<TableName, Set<String>> jobMap,
+            JobPriority priority, TableName tblName) {
+        List<AutoAnalysisPendingJob> result = Lists.newArrayList();
+        synchronized (jobMap) {
+            for (Entry<TableName, Set<String>> entry : jobMap.entrySet()) {
+                TableName table = entry.getKey();
+                if (tblName == null || tblName.equals(table)) {
+                    // Copy the columns while the lock is held: the set stored in the map keeps
+                    // being modified under this lock, and the caller reads the result without it.
+                    Set<String> columns = new java.util.LinkedHashSet<>(entry.getValue());
+                    result.add(new AutoAnalysisPendingJob(table.getCtl(),
+                            table.getDb(), table.getTbl(), columns, priority));
+                }
+            }
+        }
+        return result;
+    }
+
     public List<AnalysisInfo> showAnalysisJob(ShowAnalyzeStmt stmt) {
         return findShowAnalyzeResult(analysisJobInfoMap.values(), stmt);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AutoAnalysisPendingJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AutoAnalysisPendingJob.java
new file mode 100644
index 00000000000..ddd06d17c81
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AutoAnalysisPendingJob.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import java.util.Set;
+import java.util.StringJoiner;
+
+/**
+ * Read-only description of one pending auto analyze job: the table it targets,
+ * the columns to analyze and the priority it is tagged with.
+ */
+public class AutoAnalysisPendingJob {
+
+    public final String catalogName;
+    public final String dbName;
+    public final String tableName;
+    public final Set<String> columnNames;
+    public final JobPriority priority;
+
+    /**
+     * Creates a job description; the given column set is stored as-is, not copied.
+     */
+    public AutoAnalysisPendingJob(String catalogName, String dbName, String tableName,
+            Set<String> columnNames, JobPriority priority) {
+        this.catalogName = catalogName;
+        this.dbName = dbName;
+        this.tableName = tableName;
+        this.columnNames = columnNames;
+        this.priority = priority;
+    }
+
+    /**
+     * Renders the column names as a single comma separated string.
+     *
+     * @return the joined names in the set's iteration order, or "" when the set is null
+     */
+    public String getColumnNames() {
+        StringJoiner joined = new StringJoiner(",");
+        if (columnNames != null) {
+            columnNames.forEach(joined::add);
+        }
+        return joined.toString();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java
new file mode 100644
index 00000000000..2786b063563
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/JobPriority.java
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+/**
+ * Priority label of a pending auto analyze job.
+ * The constant names are what ShowAutoAnalyzeJobsStmt accepts (case-insensitively) in its
+ * PRIORITY predicate and what the priority column of the result shows.
+ */
+public enum JobPriority {
+    // Label for jobs kept in AnalysisManager.highPriorityJobs.
+    HIGH,
+    // Label for jobs kept in AnalysisManager.midPriorityJobs.
+    MID,
+    // Label for jobs kept in AnalysisManager.lowPriorityJobs.
+    LOW;
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
index e0df94b5cb0..227074dbb5c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.statistics;
 
+import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.TableIf;
@@ -57,13 +58,14 @@ public class StatisticsAutoCollector extends 
StatisticsCollector {
     @Override
     protected void collect() {
         while (canCollect()) {
-            Map.Entry<TableIf, Set<String>> job = getJob();
+            Map.Entry<TableName, Set<String>> job = getJob();
             if (job == null) {
                 // No more job to process, break and sleep.
                 break;
             }
             try {
-                TableIf table = job.getKey();
+                TableName tblName = job.getKey();
+                TableIf table = StatisticsUtil.findTable(tblName.getCtl(), 
tblName.getDb(), tblName.getTbl());
                 if (!supportAutoAnalyze(table)) {
                     continue;
                 }
@@ -78,7 +80,7 @@ public class StatisticsAutoCollector extends 
StatisticsCollector {
                 processOneJob(table, columns);
             } catch (Exception e) {
                 LOG.warn("Failed to analyze table {} with columns [{}]",
-                        job.getKey().getName(), 
job.getValue().stream().collect(Collectors.joining(",")), e);
+                        job.getKey().getTbl(), 
job.getValue().stream().collect(Collectors.joining(",")), e);
             }
         }
     }
@@ -88,9 +90,9 @@ public class StatisticsAutoCollector extends 
StatisticsCollector {
             && 
StatisticsUtil.inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()));
     }
 
-    protected Map.Entry<TableIf, Set<String>> getJob() {
+    protected Map.Entry<TableName, Set<String>> getJob() {
         AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
-        Optional<Map.Entry<TableIf, Set<String>>> job = 
fetchJobFromMap(manager.highPriorityJobs);
+        Optional<Map.Entry<TableName, Set<String>>> job = 
fetchJobFromMap(manager.highPriorityJobs);
         if (job.isPresent()) {
             return job.get();
         }
@@ -102,9 +104,9 @@ public class StatisticsAutoCollector extends 
StatisticsCollector {
         return job.isPresent() ? job.get() : null;
     }
 
-    protected Optional<Map.Entry<TableIf, Set<String>>> 
fetchJobFromMap(Map<TableIf, Set<String>> jobMap) {
+    protected Optional<Map.Entry<TableName, Set<String>>> 
fetchJobFromMap(Map<TableName, Set<String>> jobMap) {
         synchronized (jobMap) {
-            Optional<Map.Entry<TableIf, Set<String>>> first = 
jobMap.entrySet().stream().findFirst();
+            Optional<Map.Entry<TableName, Set<String>>> first = 
jobMap.entrySet().stream().findFirst();
             first.ifPresent(entry -> jobMap.remove(entry.getKey()));
             return first;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
index 71bb71d3cda..93d03a3fdb8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.statistics;
 
+import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
@@ -81,31 +82,33 @@ public class StatisticsJobAppender extends MasterDaemon {
         }
     }
 
-    protected void appendColumnsToJobs(Queue<HighPriorityColumn> columnQueue, 
Map<TableIf, Set<String>> jobsMap) {
+    protected void appendColumnsToJobs(Queue<HighPriorityColumn> columnQueue, 
Map<TableName, Set<String>> jobsMap) {
         int size = columnQueue.size();
         for (int i = 0; i < size; i++) {
             HighPriorityColumn column = columnQueue.poll();
             LOG.info("Process column " + column.tblId + "." + column.colName);
             TableIf table = StatisticsUtil.findTable(column.catalogId, 
column.dbId, column.tblId);
+            TableName tableName = new 
TableName(table.getDatabase().getCatalog().getName(),
+                    table.getDatabase().getFullName(), table.getName());
             synchronized (jobsMap) {
                 // If job map reach the upper limit, stop putting new jobs.
-                if (!jobsMap.containsKey(table) && jobsMap.size() >= 
JOB_MAP_SIZE) {
+                if (!jobsMap.containsKey(tableName) && jobsMap.size() >= 
JOB_MAP_SIZE) {
                     LOG.info("Job map full.");
                     break;
                 }
-                if (jobsMap.containsKey(table)) {
-                    jobsMap.get(table).add(column.colName);
+                if (jobsMap.containsKey(tableName)) {
+                    jobsMap.get(tableName).add(column.colName);
                 } else {
                     HashSet<String> columns = new HashSet<>();
                     columns.add(column.colName);
-                    jobsMap.put(table, columns);
+                    jobsMap.put(tableName, columns);
                 }
                 LOG.info("Column " + column.tblId + "." + column.colName + " 
added");
             }
         }
     }
 
-    protected void appendToLowQueue(Map<TableIf, Set<String>> jobsMap) {
+    protected void appendToLowQueue(Map<TableName, Set<String>> jobsMap) {
         InternalCatalog catalog = Env.getCurrentInternalCatalog();
         List<Long> sortedDbs = 
catalog.getDbIds().stream().sorted().collect(Collectors.toList());
         int batchSize = 100;
@@ -122,18 +125,20 @@ public class StatisticsJobAppender extends MasterDaemon {
                 if (!(t instanceof OlapTable) || t.getId() <= currentTableId) {
                     continue;
                 }
+                TableName tableName = new 
TableName(t.getDatabase().getCatalog().getName(),
+                        t.getDatabase().getFullName(), t.getName());
                 synchronized (jobsMap) {
                     // If job map reach the upper limit, stop adding new jobs.
-                    if (!jobsMap.containsKey(t) && jobsMap.size() >= 
JOB_MAP_SIZE) {
+                    if (!jobsMap.containsKey(tableName) && jobsMap.size() >= 
JOB_MAP_SIZE) {
                         return;
                     }
                     Set<String> columns
                             = t.getColumns().stream().filter(c -> 
!StatisticsUtil.isUnsupportedType(c.getType()))
                             .map(c -> c.getName()).collect(Collectors.toSet());
-                    if (jobsMap.containsKey(t)) {
-                        jobsMap.get(t).addAll(columns);
+                    if (jobsMap.containsKey(tableName)) {
+                        jobsMap.get(tableName).addAll(columns);
                     } else {
-                        jobsMap.put(t, columns);
+                        jobsMap.put(tableName, columns);
                     }
                 }
                 currentTableId = t.getId();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to