This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new aa24c8f973 [Feature](multi-catalog) Support refresh catalog metadata (#11656) aa24c8f973 is described below commit aa24c8f973b544e71ef5f2f4b3fe305e3185ca61 Author: Stalary <stal...@163.com> AuthorDate: Thu Aug 11 15:14:05 2022 +0800 [Feature](multi-catalog) Support refresh catalog metadata (#11656) --- fe/fe-core/src/main/cup/sql_parser.cup | 4 ++ .../apache/doris/analysis/RefreshCatalogStmt.java | 67 ++++++++++++++++++++++ .../apache/doris/datasource/CatalogFactory.java | 3 + .../org/apache/doris/datasource/DataSourceMgr.java | 49 ++++++++++++++-- .../doris/datasource/EsExternalDataSource.java | 2 - .../doris/datasource/ExternalDataSource.java | 1 + .../doris/datasource/HMSExternalDataSource.java | 1 - .../org/apache/doris/journal/JournalEntity.java | 8 +-- .../java/org/apache/doris/persist/EditLog.java | 21 +++++-- .../org/apache/doris/persist/OperationType.java | 20 ++++--- .../main/java/org/apache/doris/qe/DdlExecutor.java | 13 ++++- 11 files changed, 163 insertions(+), 26 deletions(-) diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 9ca18f39c0..96ad0f0805 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -808,6 +808,10 @@ refresh_stmt ::= {: RESULT = new RefreshMaterializedViewStmt(mv, MVRefreshInfo.RefreshMethod.COMPLETE); :} + | KW_REFRESH KW_CATALOG ident:catalogName + {: + RESULT = new RefreshCatalogStmt(catalogName); + :} ; clean_stmt ::= diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshCatalogStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshCatalogStmt.java new file mode 100644 index 0000000000..d2f27fec19 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshCatalogStmt.java @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.InternalDataSource; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; + +/** + * RefreshCatalogStmt + * Manually refresh the catalog metadata. + */ +public class RefreshCatalogStmt extends DdlStmt { + + private final String catalogName; + + public RefreshCatalogStmt(String catalogName) { + this.catalogName = catalogName; + } + + public String getCatalogName() { + return catalogName; + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + Util.checkCatalogAllRules(catalogName); + if (catalogName.equals(InternalDataSource.INTERNAL_DS_NAME)) { + throw new AnalysisException("Internal catalog name can't be refresh."); + } + + if (!Env.getCurrentEnv().getAuth().checkCtlPriv( + ConnectContext.get(), catalogName, PrivPredicate.ALTER)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_CATALOG_ACCESS_DENIED, + analyzer.getQualifiedUser(), catalogName); + } + } + + @Override + public String toSql() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("REFRESH CATALOG ").append("`").append(catalogName).append("`"); + return stringBuilder.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java index f3669d325b..8b110f80b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java @@ -21,6 +21,7 @@ import org.apache.doris.analysis.AlterCatalogNameStmt; import org.apache.doris.analysis.AlterCatalogPropertyStmt; import org.apache.doris.analysis.CreateCatalogStmt; import org.apache.doris.analysis.DropCatalogStmt; +import org.apache.doris.analysis.RefreshCatalogStmt; import org.apache.doris.analysis.StatementBase; import org.apache.doris.common.DdlException; @@ -48,6 +49,8 @@ public class CatalogFactory { } else if (stmt instanceof AlterCatalogNameStmt) { log.setCatalogId(catalogId); log.setNewCatalogName(((AlterCatalogNameStmt) stmt).getNewCatalogName()); + } else if (stmt instanceof RefreshCatalogStmt) { + log.setCatalogId(catalogId); } else { throw new RuntimeException("Unknown stmt for datasource manager " + stmt.getClass().getSimpleName()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java index c9e58be38f..22a9e00efd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DataSourceMgr.java @@ -21,6 +21,7 @@ import org.apache.doris.analysis.AlterCatalogNameStmt; import org.apache.doris.analysis.AlterCatalogPropertyStmt; import org.apache.doris.analysis.CreateCatalogStmt; import org.apache.doris.analysis.DropCatalogStmt; +import org.apache.doris.analysis.RefreshCatalogStmt; import org.apache.doris.analysis.ShowCatalogStmt; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; @@ -95,6 +96,16 @@ public class DataSourceMgr implements Writable, GsonPostProcessable { return catalog; } + private void unprotectedRefreshCatalog(long catalogId) { + DataSourceIf catalog = idToCatalog.get(catalogId); + if (catalog != null) { + String catalogName = catalog.getName(); + if (!catalogName.equals(InternalDataSource.INTERNAL_DS_NAME)) { + ((ExternalDataSource) catalog).setInitialized(false); + } + } + } + public InternalDataSource getInternalDataSource() { return internalDataSource; } @@ -192,7 +203,7 @@ public class DataSourceMgr implements Writable, GsonPostProcessable { long id = Env.getCurrentEnv().getNextId(); CatalogLog log = CatalogFactory.constructorCatalogLog(id, stmt); replayCreateCatalog(log); - Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_CREATE_DS, log); + Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_CREATE_CATALOG, log); } finally { writeUnlock(); } @@ -214,7 +225,7 @@ public class DataSourceMgr implements Writable, GsonPostProcessable { } CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt); replayDropCatalog(log); - Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_DROP_DS, log); + Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_DROP_CATALOG, log); } finally { writeUnlock(); } @@ -232,7 +243,7 @@ public class DataSourceMgr implements Writable, GsonPostProcessable { } CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt); replayAlterCatalogName(log); - Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_ALTER_DS_NAME, log); + Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_ALTER_CATALOG_NAME, log); } finally { writeUnlock(); } @@ -253,7 +264,7 @@ public class DataSourceMgr implements Writable, GsonPostProcessable { } CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt); replayAlterCatalogProps(log); - Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_ALTER_DS_PROPS, log); + Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_ALTER_CATALOG_PROPS, log); } finally { writeUnlock(); } @@ -308,6 +319,24 @@ public class DataSourceMgr implements Writable, GsonPostProcessable { return new ShowResultSet(showStmt.getMetaData(), rows); } + /** + * Refresh the catalog meta and write the meta log. + */ + public void refreshCatalog(RefreshCatalogStmt stmt) throws UserException { + writeLock(); + try { + DataSourceIf catalog = nameToCatalog.get(stmt.getCatalogName()); + if (catalog == null) { + throw new DdlException("No catalog found with name: " + stmt.getCatalogName()); + } + CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt); + replayRefreshCatalog(log); + Env.getCurrentEnv().getEditLog().logDatasourceLog(OperationType.OP_REFRESH_CATALOG, log); + } finally { + writeUnlock(); + } + } + /** * Reply for create catalog event. */ @@ -333,6 +362,18 @@ public class DataSourceMgr implements Writable, GsonPostProcessable { } } + /** + * Reply for refresh catalog event. + */ + public void replayRefreshCatalog(CatalogLog log) throws DdlException { + writeLock(); + try { + unprotectedRefreshCatalog(log.getCatalogId()); + } finally { + writeUnlock(); + } + } + /** * Reply for alter catalog name event. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java index fd5b3fb077..cf557df762 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalDataSource.java @@ -60,8 +60,6 @@ public class EsExternalDataSource extends ExternalDataSource { private EsRestClient esRestClient; - private boolean initialized = false; - private String[] nodes; private String username = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDataSource.java index 79223498f8..a0f376da75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDataSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDataSource.java @@ -49,6 +49,7 @@ public abstract class ExternalDataSource implements DataSourceIf<ExternalDatabas // save properties of this data source, such as hive meta store url. @SerializedName(value = "dsProperty") protected DataSourceProperty dsProperty = new DataSourceProperty(); + protected boolean initialized = false; /** * @return names of database in this data source. diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java index 293266bdb4..fa4087b3f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalDataSource.java @@ -44,7 +44,6 @@ public class HMSExternalDataSource extends ExternalDataSource { // Cache of db name to db id. private Map<String, Long> dbNameToId; private Map<Long, HMSExternalDatabase> idToDb; - private boolean initialized = false; protected HiveMetaStoreClient client; /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 19556ce614..a724518386 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -667,10 +667,10 @@ public class JournalEntity implements Writable { isRead = true; break; } - case OperationType.OP_CREATE_DS: - case OperationType.OP_DROP_DS: - case OperationType.OP_ALTER_DS_NAME: - case OperationType.OP_ALTER_DS_PROPS: { + case OperationType.OP_CREATE_CATALOG: + case OperationType.OP_DROP_CATALOG: + case OperationType.OP_ALTER_CATALOG_NAME: + case OperationType.OP_ALTER_CATALOG_PROPS: { data = CatalogLog.read(in); isRead = true; break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 9da6cdd656..9bc1ec48e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -95,6 +95,9 @@ public class EditLog { private Journal journal; + /** + * The constructor. + **/ public EditLog(String nodeName) { String journalType = Config.edit_log_type; if (journalType.equalsIgnoreCase("bdb")) { @@ -134,6 +137,9 @@ public class EditLog { return journal == null ? 0 : 1; } + /** + * Load journal. + **/ public static void loadJournal(Env env, JournalEntity journal) { short opCode = journal.getOpCode(); if (opCode != OperationType.OP_SAVE_NEXTID && opCode != OperationType.OP_TIMESTAMP) { @@ -838,26 +844,31 @@ public class EditLog { env.getPolicyMgr().replayStoragePolicyAlter(log); break; } - case OperationType.OP_CREATE_DS: { + case OperationType.OP_CREATE_CATALOG: { CatalogLog log = (CatalogLog) journal.getData(); env.getDataSourceMgr().replayCreateCatalog(log); break; } - case OperationType.OP_DROP_DS: { + case OperationType.OP_DROP_CATALOG: { CatalogLog log = (CatalogLog) journal.getData(); env.getDataSourceMgr().replayDropCatalog(log); break; } - case OperationType.OP_ALTER_DS_NAME: { + case OperationType.OP_ALTER_CATALOG_NAME: { CatalogLog log = (CatalogLog) journal.getData(); env.getDataSourceMgr().replayAlterCatalogName(log); break; } - case OperationType.OP_ALTER_DS_PROPS: { + case OperationType.OP_ALTER_CATALOG_PROPS: { CatalogLog log = (CatalogLog) journal.getData(); env.getDataSourceMgr().replayAlterCatalogProps(log); break; } + case OperationType.OP_REFRESH_CATALOG: { + CatalogLog log = (CatalogLog) journal.getData(); + env.getDataSourceMgr().replayRefreshCatalog(log); + break; + } case OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_COLUMNS: { final TableAddOrDropColumnsInfo info = (TableAddOrDropColumnsInfo) journal.getData(); env.getSchemaChangeHandler().replayModifyTableAddOrDropColumns(info); @@ -875,7 +886,7 @@ public class EditLog { } } } catch (MetaNotFoundException e) { - /** + /* * In the following cases, doris may record metadata modification information * for a table that no longer exists. * 1. Thread 1: get TableA object diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 411bf056b4..f87b9c51a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -20,6 +20,9 @@ package org.apache.doris.persist; import java.lang.reflect.Field; import java.lang.reflect.Modifier; +/** + * Operation name and code mapping. + **/ public class OperationType { // OP_LOCAL_EOF is only for local edit log, to indicate the end of a edit log run. public static final short OP_LOCAL_EOF = -1; @@ -227,15 +230,18 @@ public class OperationType { // policy 310-320 public static final short OP_CREATE_POLICY = 310; public static final short OP_DROP_POLICY = 311; + public static final short OP_ALTER_STORAGE_POLICY = 312; - // datasource 312-315 - public static final short OP_CREATE_DS = 312; - public static final short OP_DROP_DS = 313; - public static final short OP_ALTER_DS_NAME = 314; - public static final short OP_ALTER_DS_PROPS = 315; - public static final short OP_ALTER_STORAGE_POLICY = 316; + // catalog 320-330 + public static final short OP_CREATE_CATALOG = 320; + public static final short OP_DROP_CATALOG = 321; + public static final short OP_ALTER_CATALOG_NAME = 322; + public static final short OP_ALTER_CATALOG_PROPS = 323; + public static final short OP_REFRESH_CATALOG = 324; - // get opcode name by op codeStri + /** + * Get opcode name by op code. + **/ public static String getOpName(short opCode) { try { Field[] fields = OperationType.class.getDeclaredFields(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 1f42930ecd..7f6d00846a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -95,6 +95,7 @@ import org.apache.doris.analysis.PauseSyncJobStmt; import org.apache.doris.analysis.RecoverDbStmt; import org.apache.doris.analysis.RecoverPartitionStmt; import org.apache.doris.analysis.RecoverTableStmt; +import org.apache.doris.analysis.RefreshCatalogStmt; import org.apache.doris.analysis.RefreshDbStmt; import org.apache.doris.analysis.RefreshMaterializedViewStmt; import org.apache.doris.analysis.RefreshTableStmt; @@ -116,7 +117,13 @@ import org.apache.doris.common.DdlException; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.sync.SyncJobManager; +/** + * Use for execute ddl. + **/ public class DdlExecutor { + /** + * Execute ddl. + **/ public static void execute(Env env, DdlStmt ddlStmt) throws Exception { if (ddlStmt instanceof CreateClusterStmt) { CreateClusterStmt stmt = (CreateClusterStmt) ddlStmt; @@ -153,8 +160,6 @@ public class DdlExecutor { env.createMaterializedView((CreateMaterializedViewStmt) ddlStmt); } else if (ddlStmt instanceof CreateMultiTableMaterializedViewStmt) { env.createMultiTableMaterializedView((CreateMultiTableMaterializedViewStmt) ddlStmt); - } else if (ddlStmt instanceof DropMaterializedViewStmt) { - env.dropMaterializedView((DropMaterializedViewStmt) ddlStmt); } else if (ddlStmt instanceof AlterTableStmt) { env.alterTable((AlterTableStmt) ddlStmt); } else if (ddlStmt instanceof AlterTableStatsStmt) { @@ -322,13 +327,15 @@ public class DdlExecutor { } else if (ddlStmt instanceof AlterCatalogPropertyStmt) { env.getDataSourceMgr().alterCatalogProps((AlterCatalogPropertyStmt) ddlStmt); } else if (ddlStmt instanceof CleanLabelStmt) { - env.getCurrentEnv().getLoadManager().cleanLabel((CleanLabelStmt) ddlStmt); + env.getLoadManager().cleanLabel((CleanLabelStmt) ddlStmt); } else if (ddlStmt instanceof AlterMaterializedViewStmt) { env.alterMaterializedView((AlterMaterializedViewStmt) ddlStmt); } else if (ddlStmt instanceof DropMaterializedViewStmt) { env.dropMaterializedView((DropMaterializedViewStmt) ddlStmt); } else if (ddlStmt instanceof RefreshMaterializedViewStmt) { env.refreshMaterializedView((RefreshMaterializedViewStmt) ddlStmt); + } else if (ddlStmt instanceof RefreshCatalogStmt) { + env.getDataSourceMgr().refreshCatalog((RefreshCatalogStmt) ddlStmt); } else { throw new DdlException("Unknown statement."); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org