This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 53332eb4ba2 [fix](catalog) refactor the schema cache for external table (#34517) (#34599) 53332eb4ba2 is described below commit 53332eb4ba23cafcc121fa7f47fbb54c85a10279 Author: Mingyu Chen <morning...@163.com> AuthorDate: Thu May 9 18:02:18 2024 +0800 [fix](catalog) refactor the schema cache for external table (#34517) (#34599) bp #34517 --- .../apache/doris/datasource/ExternalCatalog.java | 9 ++-- .../apache/doris/datasource/ExternalDatabase.java | 2 +- .../doris/datasource/ExternalSchemaCache.java | 12 ++--- .../org/apache/doris/datasource/ExternalTable.java | 13 +++-- .../apache/doris/datasource/SchemaCacheValue.java | 40 +++++++++++++++ .../doris/datasource/es/EsExternalTable.java | 8 ++- .../doris/datasource/hive/HMSExternalTable.java | 31 ++++++------ .../doris/datasource/hive/HMSSchemaCacheValue.java | 43 ++++++++++++++++ .../datasource/iceberg/IcebergExternalTable.java | 6 ++- .../infoschema/ExternalInfoSchemaTable.java | 9 ++-- .../datasource/infoschema/ExternalMysqlTable.java | 8 +-- .../doris/datasource/jdbc/JdbcExternalTable.java | 7 ++- .../maxcompute/MaxComputeExternalTable.java | 58 +++++++++++++--------- .../maxcompute/MaxComputeSchemaCacheValue.java | 57 +++++++++++++++++++++ .../doris/datasource/metacache/MetaCache.java | 18 +++++-- .../datasource/paimon/PaimonExternalTable.java | 29 ++++++----- .../datasource/paimon/PaimonSchemaCacheValue.java | 39 +++++++++++++++ .../datasource/paimon/source/PaimonSource.java | 2 +- .../doris/datasource/test/TestExternalTable.java | 8 +-- .../org/apache/doris/mysql/AcceptListener.java | 4 +- 20 files changed, 309 insertions(+), 94 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 72eacbb1de5..435722e06b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -22,7 +22,6 @@ import org.apache.doris.analysis.CreateTableStmt; import org.apache.doris.analysis.DropDbStmt; import org.apache.doris.analysis.DropTableStmt; import org.apache.doris.analysis.TableName; -import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.InfoSchemaDb; @@ -386,7 +385,7 @@ public abstract class ExternalCatalog } } - public final List<Column> getSchema(String dbName, String tblName) { + public final Optional<SchemaCacheValue> getSchema(String dbName, String tblName) { makeSureInitialized(); Optional<ExternalDatabase<? extends ExternalTable>> db = getDb(dbName); if (db.isPresent()) { @@ -395,9 +394,7 @@ public abstract class ExternalCatalog return table.get().initSchemaAndUpdateTime(); } } - // return one column with unsupported type. - // not return empty to avoid some unexpected issue. - return Lists.newArrayList(Column.UNSUPPORTED_COLUMN); + return Optional.empty(); } @Override @@ -507,7 +504,7 @@ public abstract class ExternalCatalog } if (useMetaCache.get()) { - return metaCache.getMetaObjById(dbId).get(); + return metaCache.getMetaObjById(dbId).orElse(null); } else { return idToDb.get(dbId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java index 43c24b5ebd5..6ab3421abc3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java @@ -370,7 +370,7 @@ public abstract class ExternalDatabase<T extends ExternalTable> public T getTableNullable(String tableName) { makeSureInitialized(); if (extCatalog.getUseMetaCache().get()) { - return metaCache.getMetaObj(tableName).get(); + return metaCache.getMetaObj(tableName).orElse(null); } else { if 
(!tableNameToId.containsKey(tableName)) { return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java index 9d0ddcfad2f..ad1c1306e34 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java @@ -31,8 +31,8 @@ import lombok.Data; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -42,7 +42,7 @@ public class ExternalSchemaCache { private static final Logger LOG = LogManager.getLogger(ExternalSchemaCache.class); private final ExternalCatalog catalog; - private LoadingCache<SchemaCacheKey, ImmutableList<Column>> schemaCache; + private LoadingCache<SchemaCacheKey, Optional<SchemaCacheValue>> schemaCache; public ExternalSchemaCache(ExternalCatalog catalog, ExecutorService executor) { this.catalog = catalog; @@ -73,22 +73,22 @@ public class ExternalSchemaCache { MetricRepo.DORIS_METRIC_REGISTER.addMetrics(schemaCacheGauge); } - private ImmutableList<Column> loadSchema(SchemaCacheKey key) { - ImmutableList<Column> schema = ImmutableList.copyOf(catalog.getSchema(key.dbName, key.tblName)); + private Optional<SchemaCacheValue> loadSchema(SchemaCacheKey key) { + Optional<SchemaCacheValue> schema = catalog.getSchema(key.dbName, key.tblName); if (LOG.isDebugEnabled()) { LOG.debug("load schema for {} in catalog {}", key, catalog.getName()); } return schema; } - public List<Column> getSchema(String dbName, String tblName) { + public Optional<SchemaCacheValue> getSchemaValue(String dbName, String tblName) { SchemaCacheKey key = new SchemaCacheKey(dbName, tblName); return schemaCache.get(key); } public void addSchemaForTest(String dbName, 
String tblName, ImmutableList<Column> schema) { SchemaCacheKey key = new SchemaCacheKey(dbName, tblName); - schemaCache.put(key, schema); + schemaCache.put(key, Optional.of(new SchemaCacheValue(schema))); } public void invalidateTableCache(String dbName, String tblName) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index 952b5c64cf8..b394b85054a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -143,7 +143,8 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { @Override public List<Column> getFullSchema() { ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog); - return cache.getSchema(dbName, name); + Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(dbName, name); + return schemaCacheValue.map(SchemaCacheValue::getSchema).orElse(null); } @Override @@ -161,7 +162,6 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { return getFullSchema(); } - @Override public void setNewFullSchema(List<Column> newSchema) { } @@ -301,12 +301,12 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { * * @return */ - public List<Column> initSchemaAndUpdateTime() { + public Optional<SchemaCacheValue> initSchemaAndUpdateTime() { schemaUpdateTime = System.currentTimeMillis(); return initSchema(); } - public List<Column> initSchema() { + public Optional<SchemaCacheValue> initSchema() { throw new NotImplementedException("implement in sub class"); } @@ -365,4 +365,9 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { public List<Long> getChunkSizes() { throw new NotImplementedException("getChunkSized not implemented"); } + + protected Optional<SchemaCacheValue> getSchemaCacheValue() { + 
ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog); + return cache.getSchemaValue(dbName, name); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SchemaCacheValue.java new file mode 100644 index 00000000000..b02b8bda840 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SchemaCacheValue.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.catalog.Column; + +import java.util.List; + +/** + * The cache value of ExternalSchemaCache. + * Different external table type has different schema cache value. + * For example, Hive table has HMSSchemaCacheValue, Paimon table has PaimonSchemaCacheValue. + * All objects that should be refreshed along with schema should be put in this class. 
+ */ +public class SchemaCacheValue { + protected List<Column> schema; + + public SchemaCacheValue(List<Column> schema) { + this.schema = schema; + } + + public List<Column> getSchema() { + return schema; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsExternalTable.java index 6399f89da55..cfde5e794a3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsExternalTable.java @@ -20,11 +20,13 @@ package org.apache.doris.datasource.es; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.EsTable; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.thrift.TEsTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; import java.util.List; +import java.util.Optional; /** * Elasticsearch external table. 
@@ -69,9 +71,11 @@ public class EsExternalTable extends ExternalTable { } @Override - public List<Column> initSchema() { + public Optional<SchemaCacheValue> initSchema() { EsRestClient restClient = ((EsExternalCatalog) catalog).getEsRestClient(); - return EsUtil.genColumnsFromEs(restClient, name, null, ((EsExternalCatalog) catalog).enableMappingEsId()); + return Optional.of(new SchemaCacheValue( + EsUtil.genColumnsFromEs(restClient, name, null, + ((EsExternalCatalog) catalog).enableMappingEsId()))); } private EsTable toEsTable() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index e29bafd5dc7..c2099a1acc8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -29,6 +29,7 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.hudi.HudiUtils; import org.apache.doris.datasource.hudi.source.COWIncrementalRelation; import org.apache.doris.datasource.hudi.source.IncrementalRelation; @@ -159,7 +160,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } private volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null; - private List<Column> partitionColumns; private DLAType dlaType = DLAType.UNKNOWN; @@ -296,15 +296,17 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI public List<Type> getPartitionColumnTypes() { makeSureInitialized(); - getFullSchema(); - return partitionColumns.stream().map(c -> c.getType()).collect(Collectors.toList()); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + return 
schemaCacheValue.map(value -> ((HMSSchemaCacheValue) value).getPartitionColTypes()) + .orElse(Collections.emptyList()); } @Override public List<Column> getPartitionColumns() { makeSureInitialized(); - getFullSchema(); - return partitionColumns; + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue.map(value -> ((HMSSchemaCacheValue) value).getPartitionColumns()) + .orElse(Collections.emptyList()); } public TableScanParams getScanParams() { @@ -532,7 +534,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } @Override - public List<Column> initSchemaAndUpdateTime() { + public Optional<SchemaCacheValue> initSchemaAndUpdateTime() { org.apache.hadoop.hive.metastore.api.Table table = ((HMSExternalCatalog) catalog).getClient() .getTable(dbName, name); // try to use transient_lastDdlTime from hms client @@ -554,7 +556,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } @Override - public List<Column> initSchema() { + public Optional<SchemaCacheValue> initSchema() { makeSureInitialized(); List<Column> columns; if (dlaType.equals(DLAType.ICEBERG)) { @@ -564,8 +566,8 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } else { columns = getHiveSchema(); } - initPartitionColumns(columns); - return columns; + List<Column> partitionColumns = initPartitionColumns(columns); + return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns)); } private List<Column> getIcebergSchema() { @@ -585,18 +587,16 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI private List<Column> getHiveSchema() { HMSCachedClient client = ((HMSExternalCatalog) catalog).getClient(); - List<Column> columns; List<FieldSchema> schema = client.getSchema(dbName, name); Map<String, String> colDefaultValues = client.getDefaultColumnValues(dbName, name); - List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size()); + 
List<Column> columns = Lists.newArrayListWithCapacity(schema.size()); for (FieldSchema field : schema) { String fieldName = field.getName().toLowerCase(Locale.ROOT); String defaultValue = colDefaultValues.getOrDefault(fieldName, null); - tmpSchema.add(new Column(fieldName, + columns.add(new Column(fieldName, HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null, true, defaultValue, field.getComment(), true, -1)); } - columns = tmpSchema; return columns; } @@ -613,10 +613,10 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI return rowCount; } - private void initPartitionColumns(List<Column> schema) { + private List<Column> initPartitionColumns(List<Column> schema) { List<String> partitionKeys = remoteTable.getPartitionKeys().stream().map(FieldSchema::getName) .collect(Collectors.toList()); - partitionColumns = Lists.newArrayListWithCapacity(partitionKeys.size()); + List<Column> partitionColumns = Lists.newArrayListWithCapacity(partitionKeys.size()); for (String partitionKey : partitionKeys) { // Do not use "getColumn()", which will cause dead loop for (Column column : schema) { @@ -636,6 +636,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI if (LOG.isDebugEnabled()) { LOG.debug("get {} partition columns for table: {}", partitionColumns.size(), name); } + return partitionColumns; } public boolean hasColumnStatistics(String colName) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSSchemaCacheValue.java new file mode 100644 index 00000000000..79631e90db0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSSchemaCacheValue.java @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Type; +import org.apache.doris.datasource.SchemaCacheValue; + +import java.util.List; +import java.util.stream.Collectors; + +public class HMSSchemaCacheValue extends SchemaCacheValue { + + private List<Column> partitionColumns; + + public HMSSchemaCacheValue(List<Column> schema, List<Column> partitionColumns) { + super(schema); + this.partitionColumns = partitionColumns; + } + + public List<Column> getPartitionColumns() { + return partitionColumns; + } + + public List<Type> getPartitionColTypes() { + return partitionColumns.stream().map(Column::getType).collect(Collectors.toList()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java index 5266d8745de..de9c3814fd6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java @@ -19,6 +19,7 @@ package org.apache.doris.datasource.iceberg; import org.apache.doris.catalog.Column; import org.apache.doris.datasource.ExternalTable; +import 
org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ExternalAnalysisTask; @@ -29,6 +30,7 @@ import org.apache.doris.thrift.TTableType; import java.util.HashMap; import java.util.List; +import java.util.Optional; public class IcebergExternalTable extends ExternalTable { @@ -48,8 +50,8 @@ public class IcebergExternalTable extends ExternalTable { } @Override - public List<Column> initSchema() { - return IcebergUtils.getSchema(catalog, dbName, name); + public Optional<SchemaCacheValue> initSchema() { + return Optional.of(new SchemaCacheValue(IcebergUtils.getSchema(catalog, dbName, name))); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalInfoSchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalInfoSchemaTable.java index 6faf965752b..9d133639612 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalInfoSchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalInfoSchemaTable.java @@ -18,16 +18,16 @@ package org.apache.doris.datasource.infoschema; import org.apache.doris.analysis.SchemaTableType; -import org.apache.doris.catalog.Column; import org.apache.doris.catalog.InfoSchemaDb; import org.apache.doris.catalog.SchemaTable; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.thrift.TSchemaTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; -import java.util.List; +import java.util.Optional; public class ExternalInfoSchemaTable extends ExternalTable { @@ -36,10 +36,9 @@ public class ExternalInfoSchemaTable extends ExternalTable { } @Override - public List<Column> initSchema() { + public Optional<SchemaCacheValue> 
initSchema() { makeSureInitialized(); - List<Column> columns = SchemaTable.TABLE_MAP.get(name).getFullSchema(); - return columns; + return Optional.of(new SchemaCacheValue(SchemaTable.TABLE_MAP.get(name).getFullSchema())); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalMysqlTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalMysqlTable.java index fc64bc053a4..6f277a56906 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalMysqlTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalMysqlTable.java @@ -18,16 +18,16 @@ package org.apache.doris.datasource.infoschema; import org.apache.doris.analysis.SchemaTableType; -import org.apache.doris.catalog.Column; import org.apache.doris.catalog.MysqlDBTable; import org.apache.doris.catalog.MysqlDb; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.thrift.TSchemaTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; -import java.util.List; +import java.util.Optional; public class ExternalMysqlTable extends ExternalTable { public ExternalMysqlTable(long id, String name, ExternalCatalog catalog) { @@ -35,9 +35,9 @@ public class ExternalMysqlTable extends ExternalTable { } @Override - public List<Column> initSchema() { + public Optional<SchemaCacheValue> initSchema() { makeSureInitialized(); - return MysqlDBTable.TABLE_MAP.get(name).getFullSchema(); + return Optional.of(new SchemaCacheValue(MysqlDBTable.TABLE_MAP.get(name).getFullSchema())); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java index 64fd25525e5..242b973b87e 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java @@ -20,6 +20,7 @@ package org.apache.doris.datasource.jdbc; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.JdbcTable; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.JdbcAnalysisTask; @@ -29,6 +30,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.List; +import java.util.Optional; /** * Elasticsearch external table. @@ -71,8 +73,9 @@ public class JdbcExternalTable extends ExternalTable { } @Override - public List<Column> initSchema() { - return ((JdbcExternalCatalog) catalog).getJdbcClient().getColumnsFromJdbc(dbName, name); + public Optional<SchemaCacheValue> initSchema() { + return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog) catalog).getJdbcClient() + .getColumnsFromJdbc(dbName, name))); } private JdbcTable toJdbcTable() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java index 363b7ce689d..297a4c0fa09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java @@ -26,6 +26,7 @@ import org.apache.doris.catalog.StructField; import org.apache.doris.catalog.StructType; import org.apache.doris.catalog.Type; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.TablePartitionValues; import org.apache.doris.thrift.TMCTable; import 
org.apache.doris.thrift.TTableDescriptor; @@ -43,11 +44,13 @@ import com.aliyun.odps.type.TypeInfo; import com.aliyun.odps.type.VarcharTypeInfo; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import java.util.ArrayList; -import java.util.LinkedHashMap; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -55,12 +58,6 @@ import java.util.stream.Collectors; * MaxCompute external table. */ public class MaxComputeExternalTable extends ExternalTable { - - private Table odpsTable; - private List<String> partitionSpecs; - private Map<String, Column> partitionNameToColumns; - private List<Type> partitionTypes; - public MaxComputeExternalTable(long id, String name, String dbName, MaxComputeExternalCatalog catalog) { super(id, name, catalog, dbName, TableType.MAX_COMPUTE_EXTERNAL_TABLE); } @@ -69,8 +66,6 @@ public class MaxComputeExternalTable extends ExternalTable { protected synchronized void makeSureInitialized() { super.makeSureInitialized(); if (!objectCreated) { - odpsTable = ((MaxComputeExternalCatalog) catalog).getClient().tables().get(name); - initTablePartitions(); objectCreated = true; } } @@ -100,26 +95,37 @@ public class MaxComputeExternalTable extends ExternalTable { @Override public Set<String> getPartitionNames() { makeSureInitialized(); - return partitionNameToColumns.keySet(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue.map(value -> ((MaxComputeSchemaCacheValue) value).getPartitionColNames()) + .orElse(Collections.emptySet()); } public List<Column> getPartitionColumns() { makeSureInitialized(); - return new ArrayList<>(partitionNameToColumns.values()); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue.map(value -> ((MaxComputeSchemaCacheValue) 
value).getPartitionColumns()) + .orElse(Collections.emptyList()); } public TablePartitionValues getPartitionValues() { makeSureInitialized(); - // Make sure to call it after initSchema() completes + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + return new TablePartitionValues(); + } + Table odpsTable = ((MaxComputeSchemaCacheValue) schemaCacheValue.get()).getOdpsTable(); String projectName = odpsTable.getProject(); String tableName = odpsTable.getName(); MaxComputeMetadataCache metadataCache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMaxComputeMetadataCache(catalog.getId()); return metadataCache.getCachedPartitionValues( - new MaxComputeCacheKey(projectName, tableName), key -> loadPartitionValues(key)); + new MaxComputeCacheKey(projectName, tableName), + key -> loadPartitionValues((MaxComputeSchemaCacheValue) schemaCacheValue.get())); } - private TablePartitionValues loadPartitionValues(MaxComputeCacheKey key) { + private TablePartitionValues loadPartitionValues(MaxComputeSchemaCacheValue schemaCacheValue) { + List<String> partitionSpecs = schemaCacheValue.getPartitionSpecs(); + List<Type> partitionTypes = schemaCacheValue.getPartitionTypes(); TablePartitionValues partitionValues = new TablePartitionValues(); partitionValues.addPartitions(partitionSpecs, partitionSpecs.stream() @@ -154,21 +160,19 @@ public class MaxComputeExternalTable extends ExternalTable { } @Override - public List<Column> initSchema() { + public Optional<SchemaCacheValue> initSchema() { // this method will be called at semantic parsing. 
makeSureInitialized(); + Table odpsTable = ((MaxComputeExternalCatalog) catalog).getClient().tables().get(name); List<com.aliyun.odps.Column> columns = odpsTable.getSchema().getColumns(); - List<Column> result = Lists.newArrayListWithCapacity(columns.size()); + List<Column> schema = Lists.newArrayListWithCapacity(columns.size()); for (com.aliyun.odps.Column field : columns) { - result.add(new Column(field.getName(), mcTypeToDorisType(field.getTypeInfo()), true, null, + schema.add(new Column(field.getName(), mcTypeToDorisType(field.getTypeInfo()), true, null, true, field.getComment(), true, -1)); } - result.addAll(partitionNameToColumns.values()); - return result; - } - private void initTablePartitions() { List<com.aliyun.odps.Column> partitionColumns = odpsTable.getSchema().getPartitionColumns(); + List<String> partitionSpecs; if (!partitionColumns.isEmpty()) { partitionSpecs = odpsTable.getPartitions().stream() .map(e -> e.getPartitionSpec().toString(false, true)) @@ -177,17 +181,21 @@ public class MaxComputeExternalTable extends ExternalTable { partitionSpecs = ImmutableList.of(); } // sort partition columns to align partitionTypes and partitionName. 
- partitionNameToColumns = new LinkedHashMap<>(); + Map<String, Column> partitionNameToColumns = Maps.newHashMap(); for (com.aliyun.odps.Column partColumn : partitionColumns) { Column dorisCol = new Column(partColumn.getName(), mcTypeToDorisType(partColumn.getTypeInfo()), true, null, true, partColumn.getComment(), true, -1); partitionNameToColumns.put(dorisCol.getName(), dorisCol); } - partitionTypes = partitionNameToColumns.values() + List<Type> partitionTypes = partitionNameToColumns.values() .stream() .map(Column::getType) .collect(Collectors.toList()); + + schema.addAll(partitionNameToColumns.values()); + return Optional.of(new MaxComputeSchemaCacheValue(schema, odpsTable, partitionSpecs, partitionNameToColumns, + partitionTypes)); } private Type mcTypeToDorisType(TypeInfo typeInfo) { @@ -295,6 +303,8 @@ public class MaxComputeExternalTable extends ExternalTable { public Table getOdpsTable() { makeSureInitialized(); - return odpsTable; + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue.map(value -> ((MaxComputeSchemaCacheValue) value).getOdpsTable()) + .orElse(null); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeSchemaCacheValue.java new file mode 100644 index 00000000000..b8337d96120 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeSchemaCacheValue.java @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.maxcompute; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Type; +import org.apache.doris.datasource.SchemaCacheValue; + +import com.aliyun.odps.Table; +import com.google.common.collect.Lists; +import lombok.Getter; +import lombok.Setter; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Getter +@Setter +public class MaxComputeSchemaCacheValue extends SchemaCacheValue { + private Table odpsTable; + private List<String> partitionSpecs; + private Map<String, Column> partitionNameToColumns; + private List<Type> partitionTypes; + + public MaxComputeSchemaCacheValue(List<Column> schema, Table odpsTable, List<String> partitionSpecs, + Map<String, Column> partitionNameToColumns, List<Type> partitionTypes) { + super(schema); + this.odpsTable = odpsTable; + this.partitionSpecs = partitionSpecs; + this.partitionNameToColumns = partitionNameToColumns; + this.partitionTypes = partitionTypes; + } + + public Set<String> getPartitionColNames() { + return partitionNameToColumns.keySet(); + } + + public List<Column> getPartitionColumns() { + return Lists.newArrayList(partitionNameToColumns.values()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java index 023396670d8..da8f068dfd4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java @@ -49,14 +49,26 @@ public class MetaCache<T> { RemovalListener<String, Optional<T>> removalListener) { this.name = name; - CacheFactory cacheFactory = new CacheFactory( + // ATTN: + // The refreshAfterWriteSec is only used for namesCache, not for metaObjCache. + // Because namesCache needs to be refreshed at interval so that user can get the latest meta list. + // But metaObjCache does not need to be refreshed at interval, because the object is actually not + // from remote datasource, it is just a locally generated object to represent the meta info. + // So it only needs to be expired after specified duration. + CacheFactory namesCacheFactory = new CacheFactory( expireAfterWriteSec, refreshAfterWriteSec, maxSize, true, null); - namesCache = cacheFactory.buildCache(namesCacheLoader, null, executor); - metaObjCache = cacheFactory.buildCache(metaObjCacheLoader, removalListener, executor); + CacheFactory objCacheFactory = new CacheFactory( + expireAfterWriteSec, + OptionalLong.empty(), + maxSize, + true, + null); + namesCache = namesCacheFactory.buildCache(namesCacheLoader, null, executor); + metaObjCache = objCacheFactory.buildCache(metaObjCacheLoader, removalListener, executor); } public List<String> listNames() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 7d870f36059..d9e43bdd6cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; import
org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ExternalAnalysisTask; @@ -44,14 +45,13 @@ import org.apache.paimon.types.RowType; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; public class PaimonExternalTable extends ExternalTable { private static final Logger LOG = LogManager.getLogger(PaimonExternalTable.class); - private Table originTable = null; - public PaimonExternalTable(long id, String name, String dbName, PaimonExternalCatalog catalog) { super(id, name, catalog, dbName, TableType.PAIMON_EXTERNAL_TABLE); } @@ -63,23 +63,20 @@ public class PaimonExternalTable extends ExternalTable { protected synchronized void makeSureInitialized() { super.makeSureInitialized(); if (!objectCreated) { - originTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name); - schemaUpdateTime = System.currentTimeMillis(); objectCreated = true; } } - public Table getOriginTable() { + public Table getPaimonTable() { makeSureInitialized(); - return originTable; + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()).orElse(null); } @Override - public List<Column> initSchema() { - //init schema need update lastUpdateTime and get latest schema - objectCreated = false; - Table table = getOriginTable(); - TableSchema schema = ((FileStoreTable) table).schema(); + public Optional<SchemaCacheValue> initSchema() { + Table paimonTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name); + TableSchema schema = ((FileStoreTable) paimonTable).schema(); List<DataField> columns = schema.fields(); List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size()); for (DataField field : columns) { @@ -87,7 +84,7 @@ public class PaimonExternalTable extends ExternalTable { 
paimonTypeToDorisType(field.type()), true, null, true, field.description(), true, field.id())); } - return tmpSchema; + return Optional.of(new PaimonSchemaCacheValue(tmpSchema, paimonTable)); } private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) { @@ -180,7 +177,13 @@ public class PaimonExternalTable extends ExternalTable { makeSureInitialized(); try { long rowCount = 0; - List<Split> splits = originTable.newReadBuilder().newScan().plan().splits(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + Table paimonTable = schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()) + .orElse(null); + if (paimonTable == null) { + return -1; + } + List<Split> splits = paimonTable.newReadBuilder().newScan().plan().splits(); for (Split split : splits) { rowCount += split.rowCount(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java new file mode 100644 index 00000000000..aaaefe7f32d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.SchemaCacheValue; + +import org.apache.paimon.table.Table; + +import java.util.List; + +public class PaimonSchemaCacheValue extends SchemaCacheValue { + + private Table paimonTable; + + public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) { + super(schema); + this.paimonTable = paimonTable; + } + + public Table getPaimonTable() { + return paimonTable; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java index a405aa92ea6..9ac44537e8a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java @@ -39,7 +39,7 @@ public class PaimonSource { public PaimonSource(PaimonExternalTable table, TupleDescriptor desc, Map<String, ColumnRange> columnNameToRange) { this.paimonExtTable = table; - this.originTable = paimonExtTable.getOriginTable(); + this.originTable = paimonExtTable.getPaimonTable(); this.desc = desc; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/test/TestExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/test/TestExternalTable.java index 99f0238c170..6da0981b97e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/test/TestExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/test/TestExternalTable.java @@ -17,15 +17,15 @@ package org.apache.doris.datasource.test; -import org.apache.doris.catalog.Column; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.thrift.TTableDescriptor; import 
org.apache.doris.thrift.TTableType; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.List; +import java.util.Optional; /** * TestExternalTable is a table for unit test. @@ -53,7 +53,7 @@ public class TestExternalTable extends ExternalTable { } @Override - public List<Column> initSchema() { - return ((TestExternalCatalog) catalog).mockedSchema(dbName, name); + public Optional<SchemaCacheValue> initSchema() { + return Optional.of(new SchemaCacheValue(((TestExternalCatalog) catalog).mockedSchema(dbName, name))); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java index c3cdf8a955a..3d783f28cb3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java @@ -101,8 +101,8 @@ public class AcceptListener implements ChannelListener<AcceptingChannel<StreamCo } context.setStartTime(); int userQueryTimeout = context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser()); - if (userQueryTimeout <= 0) { - LOG.warn("Connection set query timeout to {}", + if (userQueryTimeout <= 0 && LOG.isDebugEnabled()) { + LOG.debug("Connection set query timeout to {}", context.getSessionVariable().getQueryTimeoutS()); } context.setUserQueryTimeout(userQueryTimeout); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org