This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7eed5a292c [feature-wip](multi-catalog) Support hive partition cache
(#14134)
7eed5a292c is described below
commit 7eed5a292c71a5ff67dce8333e58e4aa49ddb276
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon Nov 14 14:12:40 2022 +0800
[feature-wip](multi-catalog) Support hive partition cache (#14134)
---
be/src/exec/text_converter.hpp | 2 +-
docs/en/docs/admin-manual/config/fe-config.md | 52 +++
docs/zh-CN/docs/admin-manual/config/fe-config.md | 43 ++-
.../maint-monitor/monitor-metrics/metrics.md | 5 +
.../java/org/apache/doris/analysis/ColumnDef.java | 4 +-
.../main/java/org/apache/doris/catalog/Env.java | 8 +
.../org/apache/doris/catalog/PartitionKey.java | 21 +-
.../org/apache/doris/catalog/RefreshManager.java | 3 +-
.../doris/catalog/external/EsExternalDatabase.java | 24 +-
.../doris/catalog/external/EsExternalTable.java | 82 +---
.../doris/catalog/external/ExternalDatabase.java | 27 +-
.../doris/catalog/external/ExternalTable.java | 43 +--
.../catalog/external/HMSExternalDatabase.java | 24 +-
.../doris/catalog/external/HMSExternalTable.java | 136 ++-----
.../main/java/org/apache/doris/common/Config.java | 28 ++
.../apache/doris/datasource/CacheException.java | 24 ++
.../org/apache/doris/datasource/CatalogMgr.java | 25 +-
.../apache/doris/datasource/EsExternalCatalog.java | 7 +
.../apache/doris/datasource/ExternalCatalog.java | 7 +-
.../doris/datasource/ExternalMetaCacheMgr.java | 95 +++++
.../doris/datasource/ExternalSchemaCache.java | 131 +++++++
.../doris/datasource/HMSClientException.java | 24 ++
.../doris/datasource/HMSExternalCatalog.java | 15 +
.../datasource/PooledHiveMetaStoreClient.java | 42 +-
.../doris/datasource/hive/HiveMetaStoreCache.java | 423 +++++++++++++++++++++
.../doris/datasource/hive/HivePartition.java | 46 +++
.../java/org/apache/doris/persist/EditLog.java | 8 +-
.../org/apache/doris/persist/OperationType.java | 1 +
.../java/org/apache/doris/planner/ScanNode.java | 1 -
.../planner/external/ExternalFileScanNode.java | 12 +-
.../doris/planner/external/HiveScanProvider.java | 151 ++++----
.../doris/planner/external/HudiScanProvider.java | 7 +-
.../planner/external/IcebergScanProvider.java | 9 +-
.../doris/planner/external/QueryScanProvider.java | 14 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 2 +-
35 files changed, 1155 insertions(+), 391 deletions(-)
diff --git a/be/src/exec/text_converter.hpp b/be/src/exec/text_converter.hpp
index 4eaa065947..45a7a4e570 100644
--- a/be/src/exec/text_converter.hpp
+++ b/be/src/exec/text_converter.hpp
@@ -206,7 +206,6 @@ inline bool TextConverter::write_vec_column(const
SlotDescriptor* slot_desc,
}
StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
-
// Parse the raw-text data. Translate the text string to internal format.
switch (slot_desc->type().type) {
case TYPE_HLL: {
@@ -314,6 +313,7 @@ inline bool TextConverter::write_vec_column(const
SlotDescriptor* slot_desc,
size_t size = nullable_column->get_null_map_data().size();
doris::vectorized::NullMap& null_map_data =
nullable_column->get_null_map_data();
null_map_data[size - 1] = 1;
+ nullable_column->get_nested_column().insert_default();
}
return false;
}
diff --git a/docs/en/docs/admin-manual/config/fe-config.md
b/docs/en/docs/admin-manual/config/fe-config.md
index 66ae4c75d7..71fc393f72 100644
--- a/docs/en/docs/admin-manual/config/fe-config.md
+++ b/docs/en/docs/admin-manual/config/fe-config.md
@@ -2258,3 +2258,55 @@ Default: 1
Is it possible to configure dynamically: true
Whether it is a configuration item unique to the Master FE node: true
+
+### `max_replica_count_when_schema_change`
+
+The maximum number of replicas allowed when OlapTable is doing schema changes.
Too many replicas will lead to FE OOM.
+
+Default: 100000
+
+Is it possible to configure dynamically: true
+
+Whether it is a configuration item unique to the Master FE node: true
+
+### `max_hive_partition_cache_num`
+
+The maximum number of hive partitions that can be cached.
+
+Default: 100000
+
+Is it possible to dynamically configure: false
+
+Is it a configuration item unique to the Master FE node: false
+
+### `max_external_file_cache_num`
+
+The maximum number of file caches to use for external tables.
+
+Default: 100000
+
+Is it possible to dynamically configure: false
+
+Is it a configuration item unique to the Master FE node: false
+
+### `max_external_schema_cache_num`
+
+The maximum number of schema caches to use for external tables.
+
+Default: 10000
+
+Is it possible to dynamically configure: false
+
+Is it a configuration item unique to the Master FE node: false
+
+### `external_cache_expire_time_minutes_after_access`
+
+Sets how long after the last access the data in the cache expires. The unit is
minutes.
+Applies to External Schema Cache as well as Hive Partition Cache.
+
+Default: 1440
+
+Is it possible to dynamically configure: false
+
+Is it a configuration item unique to the Master FE node: false
+
diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md
b/docs/zh-CN/docs/admin-manual/config/fe-config.md
index 957f87bcd6..dd2f238032 100644
--- a/docs/zh-CN/docs/admin-manual/config/fe-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md
@@ -2315,7 +2315,7 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清
是否为 Master FE 节点独有的配置项:true
-### max_replica_count_when_schema_change
+### `max_replica_count_when_schema_change`
OlapTable在做schema change时,允许的最大副本数,副本数过大会导致FE OOM。
@@ -2324,3 +2324,44 @@ OlapTable在做schema change时,允许的最大副本数,副本数过大会
是否可以动态配置:true
是否为 Master FE 节点独有的配置项:true
+
+### `max_hive_partition_cache_num`
+
+hive partition 的最大缓存数量。
+
+默认值:100000
+
+是否可以动态配置:false
+
+是否为 Master FE 节点独有的配置项:false
+
+### `max_external_file_cache_num`
+
+用于 external 外部表的最大文件缓存数量。
+
+默认值:100000
+
+是否可以动态配置:false
+
+是否为 Master FE 节点独有的配置项:false
+
+### `max_external_schema_cache_num`
+
+用于 external 外部表的最大 schema 缓存数量。
+
+默认值:10000
+
+是否可以动态配置:false
+
+是否为 Master FE 节点独有的配置项:false
+
+### `external_cache_expire_time_minutes_after_access`
+
+设置缓存中的数据,在最后一次访问后多久失效。单位为分钟。
+适用于 External Schema Cache 以及 Hive Partition Cache.
+
+默认值:1440
+
+是否可以动态配置:false
+
+是否为 Master FE 节点独有的配置项:false
diff --git
a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
index 206f021e2e..96ede4e7d5 100644
--- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
+++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
@@ -143,6 +143,11 @@ curl http://be_host:webserver_port/metrics?type=json
|`doris_fe_txn_replica_num`|| Num| 指定DB正在执行的事务打开的副本数。如 {db="test"} 表示DB test
当前正在执行的事务打开的副本数 |该数值可以观测某个DB是否打开了过多的副本,可能会影响其他事务执行| P0 |
|`doris_fe_thrift_rpc_total`|| Num| FE thrift接口各个方法接收的RPC请求次数。如
{method="report"} 表示 report 方法接收的RPC请求次数 |该数值可以观测某个thrift rpc方法的负载| |
|`doris_fe_thrift_rpc_latency_ms`|| 毫秒| FE thrift接口各个方法接收的RPC请求耗时。如
{method="report"} 表示 report 方法接收的RPC请求耗时 |该数值可以观测某个thrift rpc方法的负载| |
+|`doris_fe_external_schema_cache` | {catalog="hive"} | Num | 指定 External
Catalog 对应的 schema cache 的数量 |||
+|`doris_fe_hive_meta_cache` | {catalog="hive"} | Num | |||
+| | `{type="partition_value"}` | Num | 指定 External Hive Metastore Catalog 对应的
partition value cache 的数量 |||
+| | `{type="partition"}` | Num | 指定 External Hive Metastore Catalog 对应的
partition cache 的数量 |||
+| | `{type="file"}` | Num | 指定 External Hive Metastore Catalog 对应的 file cache
的数量 |||
### JVM 监控
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
index 87267e69d2..36b556c623 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
@@ -199,14 +199,14 @@ public class ColumnDef {
}
FeNameFormat.checkColumnName(name);
- // When string type length is not assigned, it need to be assigned to
1.
+ // When string type length is not assigned, it needs to be assigned to
1.
if (typeDef.getType().isScalarType()) {
final ScalarType targetType = (ScalarType) typeDef.getType();
if (targetType.getPrimitiveType().isStringType() &&
!targetType.isLengthSet()) {
if (targetType.getPrimitiveType() != PrimitiveType.STRING) {
targetType.setLength(1);
} else {
- // alway set text length MAX_STRING_LENGTH
+ // always set text length MAX_STRING_LENGTH
targetType.setLength(ScalarType.MAX_STRING_LENGTH);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 022f86cbf0..8ce6d7a346 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -124,6 +124,7 @@ import org.apache.doris.consistency.ConsistencyChecker;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.datasource.EsExternalCatalog;
+import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.deploy.DeployManager;
import org.apache.doris.deploy.impl.AmbariDeployManager;
@@ -450,6 +451,8 @@ public class Env {
private final StatisticsCache statisticsCache;
+ private ExternalMetaCacheMgr extMetaCacheMgr;
+
public List<Frontend> getFrontends(FrontendNodeType nodeType) {
if (nodeType == null) {
// get all
@@ -515,6 +518,10 @@ public class Env {
return mtmvJobManager;
}
+ public ExternalMetaCacheMgr getExtMetaCacheMgr() {
+ return extMetaCacheMgr;
+ }
+
public CatalogIf getCurrentCatalog() {
ConnectContext ctx = ConnectContext.get();
if (ctx == null) {
@@ -646,6 +653,7 @@ public class Env {
this.mtmvJobManager = new MTMVJobManager();
this.analysisJobScheduler = new AnalysisJobScheduler();
this.statisticsCache = new StatisticsCache();
+ this.extMetaCacheMgr = new ExternalMetaCacheMgr();
}
public static void destroyCheckpoint() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
index d8bcb961e2..f773209cd1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
@@ -40,6 +40,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.stream.Collectors;
import java.util.zip.CRC32;
public class PartitionKey implements Comparable<PartitionKey>, Writable {
@@ -86,7 +87,7 @@ public class PartitionKey implements
Comparable<PartitionKey>, Writable {
return partitionKey;
}
- public static PartitionKey createListPartitionKey(List<PartitionValue>
values, List<Column> columns)
+ public static PartitionKey
createListPartitionKeyWithTypes(List<PartitionValue> values, List<Type> types)
throws AnalysisException {
// for multi list partition:
//
@@ -108,18 +109,24 @@ public class PartitionKey implements
Comparable<PartitionKey>, Writable {
// PARTITION p6 VALUES IN ("26")
// )
//
- Preconditions.checkArgument(values.size() == columns.size(),
- "in value size[" + values.size() + "] is not equal to
partition column size[" + columns.size() + "].");
+ Preconditions.checkArgument(values.size() == types.size(),
+ "in value size[" + values.size() + "] is not equal to
partition column size[" + types.size() + "].");
PartitionKey partitionKey = new PartitionKey();
for (int i = 0; i < values.size(); i++) {
-
partitionKey.keys.add(values.get(i).getValue(Type.fromPrimitiveType(columns.get(i).getDataType())));
- partitionKey.types.add(columns.get(i).getDataType());
+ partitionKey.keys.add(values.get(i).getValue(types.get(i)));
+ partitionKey.types.add(types.get(i).getPrimitiveType());
}
return partitionKey;
}
+ public static PartitionKey createListPartitionKey(List<PartitionValue>
values, List<Column> columns)
+ throws AnalysisException {
+ List<Type> types = columns.stream().map(c ->
c.getType()).collect(Collectors.toList());
+ return createListPartitionKeyWithTypes(values, types);
+ }
+
public void pushColumn(LiteralExpr keyValue, PrimitiveType keyType) {
keys.add(keyValue);
types.add(keyType);
@@ -163,6 +170,10 @@ public class PartitionKey implements
Comparable<PartitionKey>, Writable {
return true;
}
+ public List<String> getPartitionValuesAsStringList() {
+ return keys.stream().map(k ->
k.getStringValue()).collect(Collectors.toList());
+ }
+
public static int compareLiteralExpr(LiteralExpr key1, LiteralExpr key2) {
int ret = 0;
if (key1 instanceof MaxLiteral || key2 instanceof MaxLiteral) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
index bb37a9b1b9..439c8c73c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
@@ -23,7 +23,6 @@ import org.apache.doris.analysis.RefreshDbStmt;
import org.apache.doris.analysis.RefreshTableStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.external.ExternalDatabase;
-import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogIf;
@@ -160,7 +159,7 @@ public class RefreshManager {
if (table == null) {
throw new DdlException("Table " + tableName + " does not exist in
db " + dbName);
}
- ((ExternalTable) table).setUnInitialized();
+ Env.getCurrentEnv().getExtMetaCacheMgr().removeCache(catalog.getId(),
dbName, tableName);
ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(db.getId());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
index 784c0b8bdc..e9b3ce354b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
@@ -18,12 +18,10 @@
package org.apache.doris.catalog.external;
import org.apache.doris.catalog.Env;
-import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.EsExternalCatalog;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.persist.gson.GsonPostProcessable;
-import org.apache.doris.qe.MasterCatalogExecutor;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
@@ -65,7 +63,6 @@ public class EsExternalDatabase extends
ExternalDatabase<EsExternalTable> implem
Map<Long, EsExternalTable> tmpIdToTbl = Maps.newConcurrentMap();
for (int i = 0; i < log.getRefreshCount(); i++) {
EsExternalTable table =
getTableForReplay(log.getRefreshTableIds().get(i));
- table.setUnInitialized();
tmpTableNameToId.put(table.getName(), table.getId());
tmpIdToTbl.put(table.getId(), table);
}
@@ -86,24 +83,8 @@ public class EsExternalDatabase extends
ExternalDatabase<EsExternalTable> implem
}
}
- public synchronized void makeSureInitialized() {
- if (!initialized) {
- if (!Env.getCurrentEnv().isMaster()) {
- // Forward to master and wait the journal to replay.
- MasterCatalogExecutor remoteExecutor = new
MasterCatalogExecutor();
- try {
- remoteExecutor.forward(extCatalog.getId(), id, -1);
- } catch (Exception e) {
- Util.logAndThrowRuntimeException(LOG,
- String.format("failed to forward init external db
%s operation to master", name), e);
- }
- return;
- }
- init();
- }
- }
-
- private void init() {
+ @Override
+ protected void init() {
InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
initDatabaseLog.setType(InitDatabaseLog.Type.ES);
initDatabaseLog.setCatalogId(extCatalog.getId());
@@ -118,7 +99,6 @@ public class EsExternalDatabase extends
ExternalDatabase<EsExternalTable> implem
tblId = tableNameToId.get(tableName);
tmpTableNameToId.put(tableName, tblId);
EsExternalTable table = idToTbl.get(tblId);
- table.setUnInitialized();
tmpIdToTbl.put(tblId, table);
initDatabaseLog.addRefreshTable(tblId);
} else {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
index 5cbafdcd15..eb8c8972b1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
@@ -18,13 +18,8 @@
package org.apache.doris.catalog.external;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EsTable;
-import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.EsExternalCatalog;
-import org.apache.doris.datasource.InitTableLog;
-import org.apache.doris.external.elasticsearch.EsUtil;
-import org.apache.doris.qe.MasterCatalogExecutor;
import org.apache.doris.thrift.TEsTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
@@ -54,84 +49,13 @@ public class EsExternalTable extends ExternalTable {
super(id, name, catalog, dbName, TableType.ES_EXTERNAL_TABLE);
}
-
public synchronized void makeSureInitialized() {
- if (!initialized) {
- if (!Env.getCurrentEnv().isMaster()) {
- fullSchema = null;
- // Forward to master and wait the journal to replay.
- MasterCatalogExecutor remoteExecutor = new
MasterCatalogExecutor();
- try {
- remoteExecutor.forward(catalog.getId(),
catalog.getDbNullable(dbName).getId(), id);
- } catch (Exception e) {
- Util.logAndThrowRuntimeException(LOG,
- String.format("failed to forward init external
table %s operation to master", name), e);
- }
- } else {
- init();
- }
- }
if (!objectCreated) {
esTable = toEsTable();
objectCreated = true;
}
}
- private void init() {
- boolean schemaChanged = false;
- List<Column> tmpSchema = EsUtil.genColumnsFromEs(
- ((EsExternalCatalog) catalog).getEsRestClient(), name, null);
- if (fullSchema == null || fullSchema.size() != tmpSchema.size()) {
- schemaChanged = true;
- } else {
- for (int i = 0; i < fullSchema.size(); i++) {
- if (!fullSchema.get(i).equals(tmpSchema.get(i))) {
- schemaChanged = true;
- break;
- }
- }
- }
- if (schemaChanged) {
- timestamp = System.currentTimeMillis();
- fullSchema = tmpSchema;
- esTable = toEsTable();
- }
- initialized = true;
- InitTableLog initTableLog = new InitTableLog();
- initTableLog.setCatalogId(catalog.getId());
- initTableLog.setDbId(catalog.getDbNameToId().get(dbName));
- initTableLog.setTableId(id);
- initTableLog.setSchema(fullSchema);
- Env.getCurrentEnv().getEditLog().logInitExternalTable(initTableLog);
- }
-
- @Override
- public List<Column> getFullSchema() {
- makeSureInitialized();
- return fullSchema;
- }
-
- @Override
- public List<Column> getBaseSchema() {
- return getFullSchema();
- }
-
- @Override
- public List<Column> getBaseSchema(boolean full) {
- return getFullSchema();
- }
-
- @Override
- public Column getColumn(String name) {
- makeSureInitialized();
- for (Column column : fullSchema) {
- if (name.equals(column.getName())) {
- return column;
- }
- }
- return null;
- }
-
public EsTable getEsTable() {
makeSureInitialized();
return esTable;
@@ -151,16 +75,18 @@ public class EsExternalTable extends ExternalTable {
@Override
public TTableDescriptor toThrift() {
+ List<Column> schema = getFullSchema();
TEsTable tEsTable = new TEsTable();
- TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(),
TTableType.ES_TABLE, fullSchema.size(), 0,
+ TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(),
TTableType.ES_TABLE, schema.size(), 0,
getName(), "");
tTableDescriptor.setEsTable(tEsTable);
return tTableDescriptor;
}
private EsTable toEsTable() {
+ List<Column> schema = getFullSchema();
EsExternalCatalog esCatalog = (EsExternalCatalog) catalog;
- EsTable esTable = new EsTable(this.id, this.name, this.fullSchema,
TableType.ES_EXTERNAL_TABLE);
+ EsTable esTable = new EsTable(this.id, this.name, schema,
TableType.ES_EXTERNAL_TABLE);
esTable.setIndexName(name);
esTable.setClient(esCatalog.getEsRestClient());
esTable.setUserName(esCatalog.getUsername());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
index c5fa7b9875..87d1cccf5d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
@@ -19,15 +19,18 @@ package org.apache.doris.catalog.external;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.DatabaseProperty;
+import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.MasterCatalogExecutor;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang.NotImplementedException;
@@ -87,7 +90,8 @@ public class ExternalDatabase<T extends ExternalTable>
implements DatabaseIf<T>,
this.extCatalog = extCatalog;
}
- public void setTableExtCatalog(ExternalCatalog extCatalog) {}
+ public void setTableExtCatalog(ExternalCatalog extCatalog) {
+ }
public void setUnInitialized() {
this.initialized = false;
@@ -97,7 +101,26 @@ public class ExternalDatabase<T extends ExternalTable>
implements DatabaseIf<T>,
return initialized;
}
- public void makeSureInitialized() {}
+ public final synchronized void makeSureInitialized() {
+ if (!initialized) {
+ if (!Env.getCurrentEnv().isMaster()) {
+ // Forward to master and wait the journal to replay.
+ MasterCatalogExecutor remoteExecutor = new
MasterCatalogExecutor();
+ try {
+ remoteExecutor.forward(extCatalog.getId(), id, -1);
+ } catch (Exception e) {
+ Util.logAndThrowRuntimeException(LOG,
+ String.format("failed to forward init external db
%s operation to master", name), e);
+ }
+ return;
+ }
+ init();
+ }
+ }
+
+ protected void init() {
+ throw new NotImplementedException();
+ }
public T getTableForReplay(long tableId) {
throw new NotImplementedException();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
index f713de7419..36189c9a00 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
@@ -19,12 +19,14 @@ package org.apache.doris.catalog.external;
import org.apache.doris.alter.AlterCancelException;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.ExternalSchemaCache;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TTableDescriptor;
@@ -56,26 +58,20 @@ public class ExternalTable implements TableIf, Writable,
GsonPostProcessable {
protected String name;
@SerializedName(value = "type")
protected TableType type = null;
- @SerializedName(value = "fullSchema")
- protected volatile List<Column> fullSchema = null;
- @SerializedName(value = "initialized")
- protected boolean initialized = false;
@SerializedName(value = "timestamp")
protected long timestamp;
-
- protected ExternalCatalog catalog;
@SerializedName(value = "dbName")
protected String dbName;
+
protected boolean objectCreated = false;
+ protected ExternalCatalog catalog;
protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
/**
* No args constructor for persist.
*/
public ExternalTable() {
- this.initialized = false;
this.objectCreated = false;
- this.fullSchema = null;
}
/**
@@ -93,9 +89,7 @@ public class ExternalTable implements TableIf, Writable,
GsonPostProcessable {
this.catalog = catalog;
this.dbName = dbName;
this.type = type;
- this.initialized = false;
this.objectCreated = false;
- this.fullSchema = null;
}
public void setCatalog(ExternalCatalog catalog) {
@@ -106,17 +100,10 @@ public class ExternalTable implements TableIf, Writable,
GsonPostProcessable {
return false;
}
- public void setUnInitialized() {
- this.initialized = false;
- }
-
- public void replayInitTable(List<Column> schema) {
- fullSchema = schema;
- initialized = true;
+ public void makeSureInitialized() {
+ throw new NotImplementedException();
}
- public void makeSureInitialized() {}
-
@Override
public void readLock() {
this.rwLock.readLock().lock();
@@ -226,27 +213,35 @@ public class ExternalTable implements TableIf, Writable,
GsonPostProcessable {
@Override
public List<Column> getFullSchema() {
- throw new NotImplementedException();
+ makeSureInitialized();
+ ExternalSchemaCache cache =
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
+ return cache.getSchema(dbName, name);
}
@Override
public List<Column> getBaseSchema() {
- throw new NotImplementedException();
+ return getFullSchema();
}
@Override
public List<Column> getBaseSchema(boolean full) {
- throw new NotImplementedException();
+ return getFullSchema();
}
+
@Override
public void setNewFullSchema(List<Column> newSchema) {
- this.fullSchema = newSchema;
}
@Override
public Column getColumn(String name) {
- throw new NotImplementedException();
+ List<Column> schema = getFullSchema();
+ for (Column column : schema) {
+ if (name.equals(column.getName())) {
+ return column;
+ }
+ }
+ return null;
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
index ee7c589019..decef86caa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
@@ -19,12 +19,10 @@ package org.apache.doris.catalog.external;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.persist.gson.GsonPostProcessable;
-import org.apache.doris.qe.MasterCatalogExecutor;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -68,7 +66,6 @@ public class HMSExternalDatabase extends
ExternalDatabase<HMSExternalTable> impl
Map<Long, HMSExternalTable> tmpIdToTbl = Maps.newConcurrentMap();
for (int i = 0; i < log.getRefreshCount(); i++) {
HMSExternalTable table =
getTableForReplay(log.getRefreshTableIds().get(i));
- table.setUnInitialized();
tmpTableNameToId.put(table.getName(), table.getId());
tmpIdToTbl.put(table.getId(), table);
}
@@ -89,24 +86,8 @@ public class HMSExternalDatabase extends
ExternalDatabase<HMSExternalTable> impl
}
}
- public synchronized void makeSureInitialized() {
- if (!initialized) {
- if (!Env.getCurrentEnv().isMaster()) {
- // Forward to master and wait the journal to replay.
- MasterCatalogExecutor remoteExecutor = new
MasterCatalogExecutor();
- try {
- remoteExecutor.forward(extCatalog.getId(), id, -1);
- } catch (Exception e) {
- Util.logAndThrowRuntimeException(LOG,
- String.format("failed to forward init external db
%s operation to master", name), e);
- }
- return;
- }
- init();
- }
- }
-
- private void init() {
+ @Override
+ protected void init() {
InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
initDatabaseLog.setType(InitDatabaseLog.Type.HMS);
initDatabaseLog.setCatalogId(extCatalog.getId());
@@ -121,7 +102,6 @@ public class HMSExternalDatabase extends
ExternalDatabase<HMSExternalTable> impl
tblId = tableNameToId.get(tableName);
tmpTableNameToId.put(tableName, tblId);
HMSExternalTable table = idToTbl.get(tblId);
- table.setUnInitialized();
tmpIdToTbl.put(tblId, table);
initDatabaseLog.addRefreshTable(tblId);
} else {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 3d8b2e5778..6e12f7e30a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -18,15 +18,9 @@
package org.apache.doris.catalog.external;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.HiveMetaStoreClientHelper;
-import org.apache.doris.common.DdlException;
+import org.apache.doris.catalog.Type;
import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.HMSExternalCatalog;
-import org.apache.doris.datasource.InitTableLog;
-import org.apache.doris.datasource.PooledHiveMetaStoreClient;
-import org.apache.doris.qe.MasterCatalogExecutor;
import org.apache.doris.thrift.THiveTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
@@ -34,9 +28,6 @@ import org.apache.doris.thrift.TTableType;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -44,6 +35,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
/**
* Hive metastore external table.
@@ -61,6 +53,8 @@ public class HMSExternalTable extends ExternalTable {
}
private volatile org.apache.hadoop.hive.metastore.api.Table remoteTable =
null;
+ private List<Column> partitionColumns;
+
private DLAType dlaType = DLAType.UNKNOWN;
public enum DLAType {
@@ -106,21 +100,6 @@ public class HMSExternalTable extends ExternalTable {
}
objectCreated = true;
}
- if (!initialized) {
- if (!Env.getCurrentEnv().isMaster()) {
- fullSchema = null;
- // Forward to master and wait the journal to replay.
- MasterCatalogExecutor remoteExecutor = new
MasterCatalogExecutor();
- try {
- remoteExecutor.forward(catalog.getId(),
catalog.getDbNullable(dbName).getId(), id);
- } catch (Exception e) {
- Util.logAndThrowRuntimeException(LOG,
- String.format("failed to forward init external
table %s operation to master", name), e);
- }
- return;
- }
- init();
- }
}
/**
@@ -160,50 +139,10 @@ public class HMSExternalTable extends ExternalTable {
* Now we only support three file input format hive tables:
parquet/orc/text. And they must be managed_table.
*/
private boolean supportedHiveTable() {
- // boolean isManagedTable =
remoteTable.getTableType().equalsIgnoreCase("MANAGED_TABLE");
- // TODO: try to support EXTERNAL_TABLE
- boolean isManagedTable = true;
String inputFileFormat = remoteTable.getSd().getInputFormat();
boolean supportedFileFormat = inputFileFormat != null &&
SUPPORTED_HIVE_FILE_FORMATS.contains(inputFileFormat);
LOG.debug("hms table {} is {} with file format: {}", name,
remoteTable.getTableType(), inputFileFormat);
- return isManagedTable && supportedFileFormat;
- }
-
- private void init() {
- boolean schemaChanged = false;
- List<Column> tmpSchema = Lists.newArrayList();
- if (dlaType.equals(DLAType.UNKNOWN)) {
- schemaChanged = true;
- } else {
- List<FieldSchema> schema = ((HMSExternalCatalog)
catalog).getClient().getSchema(dbName, name);
- for (FieldSchema field : schema) {
- int columnId = (int) Env.getCurrentEnv().getNextId();
- tmpSchema.add(new Column(field.getName(),
-
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
- true, null, field.getComment(), true, null, columnId));
- }
- if (fullSchema == null || fullSchema.size() != tmpSchema.size()) {
- schemaChanged = true;
- } else {
- for (int i = 0; i < fullSchema.size(); i++) {
- if (!fullSchema.get(i).equals(tmpSchema.get(i))) {
- schemaChanged = true;
- break;
- }
- }
- }
- }
- if (schemaChanged) {
- timestamp = System.currentTimeMillis();
- fullSchema = tmpSchema;
- }
- initialized = true;
- InitTableLog initTableLog = new InitTableLog();
- initTableLog.setCatalogId(catalog.getId());
- initTableLog.setDbId(catalog.getDbNameToId().get(dbName));
- initTableLog.setTableId(id);
- initTableLog.setSchema(fullSchema);
- Env.getCurrentEnv().getEditLog().logInitExternalTable(initTableLog);
+ return supportedFileFormat;
}
/**
@@ -220,36 +159,25 @@ public class HMSExternalTable extends ExternalTable {
return remoteTable;
}
- @Override
- public boolean isView() {
- return remoteTable.isSetViewOriginalText() ||
remoteTable.isSetViewExpandedText();
- }
-
- @Override
- public List<Column> getFullSchema() {
+ public List<Type> getPartitionColumnTypes() {
makeSureInitialized();
- return fullSchema;
+ initPartitionColumns();
+ return partitionColumns.stream().map(c ->
c.getType()).collect(Collectors.toList());
}
- @Override
- public List<Column> getBaseSchema() {
- return getFullSchema();
+ public List<Column> getPartitionColumns() {
+ makeSureInitialized();
+ initPartitionColumns();
+ return partitionColumns;
}
- @Override
- public List<Column> getBaseSchema(boolean full) {
- return getFullSchema();
+ public List<String> getPartitionColumnNames() {
+ return getPartitionColumns().stream().map(c ->
c.getName()).collect(Collectors.toList());
}
@Override
- public Column getColumn(String name) {
- makeSureInitialized();
- for (Column column : fullSchema) {
- if (name.equals(column.getName())) {
- return column;
- }
- }
- return null;
+ public boolean isView() {
+ return remoteTable.isSetViewOriginalText() ||
remoteTable.isSetViewExpandedText();
}
@Override
@@ -321,8 +249,9 @@ public class HMSExternalTable extends ExternalTable {
@Override
public TTableDescriptor toThrift() {
+ List<Column> schema = getFullSchema();
THiveTable tHiveTable = new THiveTable(dbName, name, new HashMap<>());
- TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(),
TTableType.HIVE_TABLE, fullSchema.size(), 0,
+ TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(),
TTableType.HIVE_TABLE, schema.size(), 0,
getName(), dbName);
tTableDescriptor.setHiveTable(tHiveTable);
return tTableDescriptor;
@@ -340,12 +269,29 @@ public class HMSExternalTable extends ExternalTable {
return catalog.getCatalogProperty().getS3Properties();
}
- public List<Partition> getHivePartitions(ExprNodeGenericFuncDesc
hivePartitionPredicate) throws DdlException {
- List<Partition> hivePartitions = Lists.newArrayList();
- PooledHiveMetaStoreClient client = ((HMSExternalCatalog)
catalog).getClient();
- client.listPartitionsByExpr(remoteTable.getDbName(),
remoteTable.getTableName(),
-
SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate),
hivePartitions);
- return hivePartitions;
+ /**
+  * Lazily resolve {@code partitionColumns} from the partition keys of the remote hive table.
+  * Each partition key name is matched against the columns returned by getFullSchema();
+  * keys without a matching column are silently left out.
+  * The result is computed once under the object lock and then reused.
+  */
+ private void initPartitionColumns() {
+     if (partitionColumns != null) {
+         return;
+     }
+     synchronized (this) {
+         if (partitionColumns != null) {
+             return;
+         }
+         // Collect into a List, not a Set: a Set gives no iteration order guarantee, while
+         // getPartitionColumnTypes()/getPartitionColumnNames() expose the result positionally,
+         // so the columns must follow the order in which the metastore declares the keys.
+         List<String> partitionKeys = remoteTable.getPartitionKeys().stream().map(FieldSchema::getName)
+                 .collect(Collectors.toList());
+         // Build into a local list and publish it only when complete. Assigning the field first
+         // would let the unlocked check above hand a partially filled list to another thread.
+         List<Column> resolvedColumns = Lists.newArrayListWithCapacity(partitionKeys.size());
+         for (String partitionKey : partitionKeys) {
+             // Do not use "getColumn()", which will cause dead loop
+             List<Column> schema = getFullSchema();
+             for (Column column : schema) {
+                 if (partitionKey.equals(column.getName())) {
+                     resolvedColumns.add(column);
+                     break;
+                 }
+             }
+         }
+         LOG.debug("get {} partition columns for table: {}", resolvedColumns.size(), name);
+         // NOTE(review): the field is not declared volatile; consider making it volatile so the
+         // unlocked fast-path read is safely published under the Java memory model.
+         partitionColumns = resolvedColumns;
+     }
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index e22913fe52..f0c02c879c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1888,5 +1888,33 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true)
public static long max_replica_count_when_schema_change = 100000;
+
+ /**
+ * Max cache num of hive partition.
+ * Decrease this value if FE's memory is small
+ */
+ @ConfField(mutable = false, masterOnly = false)
+ public static long max_hive_partition_cache_num = 100000;
+
+ /**
+ * Max cache num of external catalog's file
+ * Decrease this value if FE's memory is small
+ */
+ @ConfField(mutable = false, masterOnly = false)
+ public static long max_external_file_cache_num = 100000;
+
+ /**
+ * Max cache num of external table's schema
+ * Decrease this value if FE's memory is small
+ */
+ @ConfField(mutable = false, masterOnly = false)
+ public static long max_external_schema_cache_num = 10000;
+
+ /**
+ * The expiration time of a cache object after last access of it.
+ * For external schema cache and hive meta cache.
+ */
+ @ConfField(mutable = false, masterOnly = false)
+ public static long external_cache_expire_time_minutes_after_access = 24 *
60; // 1 day
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CacheException.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CacheException.java
new file mode 100644
index 0000000000..bbb5ec80dc
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CacheException.java
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+/**
+ * Unchecked exception whose message is produced by {@code String.format(format, msg)}.
+ * In this patch it is thrown by ExternalSchemaCache#getSchema when loading a schema fails.
+ */
+public class CacheException extends RuntimeException {
+ /**
+  * @param format format string handed to String.format
+  * @param cause the underlying failure, kept as the exception cause
+  * @param msg arguments substituted into {@code format}
+  */
+ public CacheException(String format, Throwable cause, Object... msg) {
+ super(String.format(format, msg), cause);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index e1687b0f7f..39ec822f6a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -95,6 +95,7 @@ public class CatalogMgr implements Writable,
GsonPostProcessable {
CatalogIf catalog = idToCatalog.remove(catalogId);
if (catalog != null) {
nameToCatalog.remove(catalog.getName());
+
Env.getCurrentEnv().getExtMetaCacheMgr().removeCache(catalog.getName());
}
return catalog;
}
@@ -423,16 +424,6 @@ public class CatalogMgr implements Writable,
GsonPostProcessable {
db.replayInitDb(log, catalog);
}
- public void replayInitExternalTable(InitTableLog log) {
- ExternalCatalog catalog = (ExternalCatalog)
idToCatalog.get(log.getCatalogId());
- Preconditions.checkArgument(catalog != null);
- ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
- Preconditions.checkArgument(db != null);
- ExternalTable table = db.getTableForReplay(log.getTableId());
- Preconditions.checkArgument(table != null);
- table.replayInitTable(log.getSchema());
- }
-
public void replayRefreshExternalDb(ExternalObjectLog log) {
writeLock();
try {
@@ -445,15 +436,11 @@ public class CatalogMgr implements Writable,
GsonPostProcessable {
}
public void replayRefreshExternalTable(ExternalObjectLog log) {
- writeLock();
- try {
- ExternalCatalog catalog = (ExternalCatalog)
idToCatalog.get(log.getCatalogId());
- ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
- ExternalTable table = db.getTableForReplay(log.getTableId());
- table.setUnInitialized();
- } finally {
- writeUnlock();
- }
+ ExternalCatalog catalog = (ExternalCatalog)
idToCatalog.get(log.getCatalogId());
+ ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+ ExternalTable table = db.getTableForReplay(log.getTableId());
+ Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog)
+ .invalidateCache(db.getFullName(), table.getName());
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
index 29123395ed..c5a410eb3b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
@@ -18,6 +18,7 @@
package org.apache.doris.datasource;
+import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.external.EsExternalDatabase;
import org.apache.doris.catalog.external.ExternalDatabase;
@@ -190,4 +191,10 @@ public class EsExternalCatalog extends ExternalCatalog {
super.gsonPostProcess();
setProperties(this.catalogProperty.getProperties());
}
+
+
+ /**
+  * Build the column list of an ES table by delegating to EsUtil.genColumnsFromEs
+  * with this catalog's rest client. Only tblName is used; dbName is ignored here.
+  */
+ @Override
+ public List<Column> getSchema(String dbName, String tblName) {
+ // Make sure the catalog is initialized before its rest client is used.
+ makeSureInitialized();
+ return EsUtil.genColumnsFromEs(getEsRestClient(), tblName, null);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index ff82a4dda0..0663fed2e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -17,6 +17,7 @@
package org.apache.doris.datasource;
+import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.external.EsExternalDatabase;
import org.apache.doris.catalog.external.ExternalDatabase;
@@ -62,14 +63,14 @@ public abstract class ExternalCatalog implements
CatalogIf<ExternalDatabase>, Wr
protected CatalogProperty catalogProperty = new CatalogProperty();
@SerializedName(value = "initialized")
private boolean initialized = false;
-
- // Cache of db name to db id
@SerializedName(value = "idToDb")
protected Map<Long, ExternalDatabase> idToDb = Maps.newConcurrentMap();
// db name does not contains "default_cluster"
protected Map<String, Long> dbNameToId = Maps.newConcurrentMap();
private boolean objectCreated = false;
+ private ExternalSchemaCache schemaCache;
+
/**
* @return names of database in this catalog.
*/
@@ -135,6 +136,8 @@ public abstract class ExternalCatalog implements
CatalogIf<ExternalDatabase>, Wr
throw new NotImplementedException();
}
+ public abstract List<Column> getSchema(String dbName, String tblName);
+
@Override
public long getId() {
return id;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
new file mode 100644
index 0000000000..eb9123f814
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Cache meta of external catalog
+ * 1. Meta for hive meta store, mainly for partition.
+ * 2. Table Schema cache.
+ *
+ * Both kinds of cache are created lazily, one instance per catalog id.
+ */
+public class ExternalMetaCacheMgr {
+    private static final Logger LOG = LogManager.getLogger(ExternalMetaCacheMgr.class);
+
+    // catalog id -> HiveMetaStoreCache
+    private final Map<Long, HiveMetaStoreCache> cacheMap = Maps.newConcurrentMap();
+    // catalog id -> table schema cache.
+    // Must be a concurrent map as well: it is read without holding any lock
+    // in getSchemaCache() and removeCache().
+    private final Map<Long, ExternalSchemaCache> schemaCacheMap = Maps.newConcurrentMap();
+    // Executor handed to every cache created by this manager.
+    private final Executor executor;
+
+    public ExternalMetaCacheMgr() {
+        executor = ThreadPoolManager.newDaemonCacheThreadPool(10, "ExternalMetaCacheMgr", false);
+    }
+
+    /**
+     * Get the hive metastore cache of the given catalog, creating it on first use.
+     *
+     * @param catalog the hive metastore catalog; its id is the cache key
+     * @return the cache instance bound to this catalog
+     */
+    public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
+        HiveMetaStoreCache cache = cacheMap.get(catalog.getId());
+        if (cache == null) {
+            // Lock so that only one cache instance is ever created per catalog.
+            synchronized (cacheMap) {
+                if (!cacheMap.containsKey(catalog.getId())) {
+                    cacheMap.put(catalog.getId(), new HiveMetaStoreCache(catalog, executor));
+                }
+                cache = cacheMap.get(catalog.getId());
+            }
+        }
+        return cache;
+    }
+
+    /**
+     * Get the table schema cache of the given catalog, creating it on first use.
+     *
+     * @param catalog the external catalog; its id is the cache key
+     * @return the schema cache instance bound to this catalog
+     */
+    public ExternalSchemaCache getSchemaCache(ExternalCatalog catalog) {
+        ExternalSchemaCache cache = schemaCacheMap.get(catalog.getId());
+        if (cache == null) {
+            // Lock so that only one cache instance is ever created per catalog.
+            synchronized (schemaCacheMap) {
+                if (!schemaCacheMap.containsKey(catalog.getId())) {
+                    schemaCacheMap.put(catalog.getId(), new ExternalSchemaCache(catalog, executor));
+                }
+                cache = schemaCacheMap.get(catalog.getId());
+            }
+        }
+        return cache;
+    }
+
+    /**
+     * Drop the caches registered under the given key.
+     *
+     * NOTE(review): both maps are keyed by the Long catalog id, so a String key can never
+     * match an entry and this method currently removes nothing. The caller in CatalogMgr
+     * passes catalog.getName(). A real fix needs a coordinated change of this signature
+     * (e.g. to a long catalog id) and of that caller.
+     */
+    public void removeCache(String catalogId) {
+        if (cacheMap.remove(catalogId) != null) {
+            LOG.info("remove hive metastore cache for catalog {}", catalogId);
+        }
+        if (schemaCacheMap.remove(catalogId) != null) {
+            LOG.info("remove schema cache for catalog {}", catalogId);
+        }
+    }
+
+    /**
+     * Invalidate the cached schema and the cached hive meta of one table.
+     * Caches that were never created for this catalog are skipped.
+     *
+     * @param catalogId id of the catalog owning the table
+     * @param dbName database name of the table
+     * @param tblName table name
+     */
+    public void removeCache(long catalogId, String dbName, String tblName) {
+        ExternalSchemaCache schemaCache = schemaCacheMap.get(catalogId);
+        if (schemaCache != null) {
+            schemaCache.invalidateCache(dbName, tblName);
+            LOG.debug("invalid schema cache for {}.{} in catalog {}", dbName, tblName, catalogId);
+        }
+        HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
+        if (metaCache != null) {
+            metaCache.invalidateCache(dbName, tblName);
+            LOG.debug("invalid meta cache for {}.{} in catalog {}", dbName, tblName, catalogId);
+        }
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
new file mode 100644
index 0000000000..718525ae6a
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.Config;
+import org.apache.doris.metric.GaugeMetric;
+import org.apache.doris.metric.Metric;
+import org.apache.doris.metric.MetricLabel;
+import org.apache.doris.metric.MetricRepo;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import lombok.Data;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The schema cache for external table.
+ * One instance serves one catalog. Schemas are loaded on demand through
+ * ExternalCatalog#getSchema and kept as immutable lists keyed by (dbName, tblName).
+ */
+public class ExternalSchemaCache {
+    private static final Logger LOG = LogManager.getLogger(ExternalSchemaCache.class);
+    private ExternalCatalog catalog;
+
+    private LoadingCache<SchemaCacheKey, ImmutableList<Column>> schemaCache;
+
+    /**
+     * @param catalog the catalog whose table schemas are cached
+     * @param executor executor passed to CacheLoader.asyncReloading
+     */
+    public ExternalSchemaCache(ExternalCatalog catalog, Executor executor) {
+        this.catalog = catalog;
+        init(executor);
+        initMetrics();
+    }
+
+    // Build the loading cache: bounded by Config.max_external_schema_cache_num entries, and each
+    // entry expires Config.external_cache_expire_time_minutes_after_access minutes after last access.
+    private void init(Executor executor) {
+        schemaCache = CacheBuilder.newBuilder().maximumSize(Config.max_external_schema_cache_num)
+                .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES)
+                .build(CacheLoader.asyncReloading(new CacheLoader<SchemaCacheKey, ImmutableList<Column>>() {
+                    @Override
+                    public ImmutableList<Column> load(SchemaCacheKey key) throws Exception {
+                        return loadSchema(key);
+                    }
+                }, executor));
+    }
+
+    // Register a gauge reporting the current number of cached schemas, labeled with the catalog name.
+    private void initMetrics() {
+        // schema cache
+        GaugeMetric<Long> schemaCacheGauge = new GaugeMetric<Long>("external_schema_cache",
+                Metric.MetricUnit.NOUNIT, "external schema cache number") {
+            @Override
+            public Long getValue() {
+                return schemaCache.size();
+            }
+        };
+        schemaCacheGauge.addLabel(new MetricLabel("catalog", catalog.getName()));
+        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(schemaCacheGauge);
+    }
+
+    // Cache loader body: fetch the schema from the catalog and freeze it into an immutable list.
+    private ImmutableList<Column> loadSchema(SchemaCacheKey key) {
+        ImmutableList<Column> schema = ImmutableList.copyOf(catalog.getSchema(key.dbName, key.tblName));
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("load schema for {} in catalog {}", key, catalog.getName());
+        }
+        return schema;
+    }
+
+    /**
+     * Get the schema of a table, loading it through the catalog on a cache miss.
+     *
+     * @param dbName database name
+     * @param tblName table name
+     * @return the cached column list
+     * @throws CacheException if the cache reports an ExecutionException while loading
+     */
+    public List<Column> getSchema(String dbName, String tblName) {
+        SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
+        try {
+            return schemaCache.get(key);
+        } catch (ExecutionException e) {
+            throw new CacheException("failed to get schema for %s in catalog %s", e, key, catalog.getName());
+        }
+    }
+
+    /**
+     * Drop the cached schema of one table, if any.
+     *
+     * @param dbName database name
+     * @param tblName table name
+     */
+    public void invalidateCache(String dbName, String tblName) {
+        SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
+        schemaCache.invalidate(key);
+    }
+
+    // Cache key: a (database name, table name) pair.
+    @Data
+    public static class SchemaCacheKey {
+        private String dbName;
+        private String tblName;
+
+        public SchemaCacheKey(String dbName, String tblName) {
+            this.dbName = dbName;
+            this.tblName = tblName;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof SchemaCacheKey)) {
+                return false;
+            }
+            // Null-safe comparison, in line with hashCode() below which already tolerates nulls.
+            SchemaCacheKey other = (SchemaCacheKey) obj;
+            return Objects.equals(dbName, other.dbName) && Objects.equals(tblName, other.tblName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(dbName, tblName);
+        }
+
+        @Override
+        public String toString() {
+            return "SchemaCacheKey{" + "dbName='" + dbName + '\'' + ", tblName='" + tblName + '\'' + '}';
+        }
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSClientException.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSClientException.java
new file mode 100644
index 0000000000..fa2652b867
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSClientException.java
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+/**
+ * Unchecked exception whose message is produced by {@code String.format(format, msg)}.
+ * In this patch PooledHiveMetaStoreClient throws it in place of a bare RuntimeException
+ * when a metastore client call fails.
+ */
+public class HMSClientException extends RuntimeException {
+ /**
+  * @param format format string handed to String.format
+  * @param cause the underlying failure, kept as the exception cause
+  * @param msg arguments substituted into {@code format}
+  */
+ public HMSClientException(String format, Throwable cause, Object... msg) {
+ super(String.format(format, msg), cause);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index f348e93787..fdb2ef2981 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -17,6 +17,7 @@
package org.apache.doris.datasource;
+import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.external.ExternalDatabase;
@@ -26,6 +27,7 @@ import org.apache.doris.cluster.ClusterNamespace;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
@@ -159,4 +161,17 @@ public class HMSExternalCatalog extends ExternalCatalog {
makeSureInitialized();
return client;
}
+
+ /**
+  * Fetch the field schemas of a hive table through the metastore client and convert each
+  * one to a Doris Column, mapping the hive type with HiveMetaStoreClientHelper.hiveTypeToDorisType.
+  *
+  * @param dbName database name in the metastore
+  * @param tblName table name in the metastore
+  * @return one Column per FieldSchema, in the order returned by the client
+  */
+ @Override
+ public List<Column> getSchema(String dbName, String tblName) {
+ makeSureInitialized();
+ List<FieldSchema> schema = getClient().getSchema(dbName, tblName);
+ List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
+ for (FieldSchema field : schema) {
+ // The last argument is a constant -1 here; the removed HMSExternalTable.init()
+ // passed a freshly allocated id (Env.getNextId()) in the same position.
+ tmpSchema.add(new Column(field.getName(),
+
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
+ true, null, field.getComment(), true, null, -1));
+ }
+ return tmpSchema;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
index c2f3567f56..be7e54eba1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
@@ -40,13 +40,15 @@ import java.util.List;
import java.util.Queue;
/**
- * A hive metastore client pool for a specific hive conf.
+ * A hive metastore client pool for a specific catalog with hive configuration.
*/
public class PooledHiveMetaStoreClient {
private static final Logger LOG =
LogManager.getLogger(PooledHiveMetaStoreClient.class);
- private Queue<CachedClient> clientPool = new LinkedList<>();
private static final HiveMetaHookLoader DUMMY_HOOK_LOADER = t -> null;
+ private static final short MAX_LIST_PARTITION_NUM = 10000;
+
+ private Queue<CachedClient> clientPool = new LinkedList<>();
private final int poolSize;
private final HiveConf hiveConf;
@@ -62,7 +64,7 @@ public class PooledHiveMetaStoreClient {
try (CachedClient client = getClient()) {
return client.client.getAllDatabases();
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new HMSClientException("failed to get all database from hms
client", e);
}
}
@@ -70,7 +72,7 @@ public class PooledHiveMetaStoreClient {
try (CachedClient client = getClient()) {
return client.client.getAllTables(dbName);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new HMSClientException("failed to get all tables for db %s",
e, dbName);
}
}
@@ -78,17 +80,33 @@ public class PooledHiveMetaStoreClient {
try (CachedClient client = getClient()) {
return client.client.tableExists(dbName, tblName);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new HMSClientException("failed to check if table %s in db %s
exists", e, tblName, dbName);
+ }
+ }
+
+ /**
+  * List the partition names of a hive table using a client taken from the pool.
+  * MAX_LIST_PARTITION_NUM (10000) is passed as the last argument of the client call;
+  * presumably this caps the number of returned names -- TODO confirm against the
+  * metastore client API, since larger tables would then be truncated.
+  *
+  * @throws HMSClientException wrapping any exception raised while calling the client
+  */
+ public List<String> listPartitionNames(String dbName, String tblName) {
+ try (CachedClient client = getClient()) {
+ return client.client.listPartitionNames(dbName, tblName,
MAX_LIST_PARTITION_NUM);
+ } catch (Exception e) {
+ throw new HMSClientException("failed to list partition names for
table %s in db %s", e, tblName, dbName);
+ }
+ }
+
+ public Partition getPartition(String dbName, String tblName, List<String>
partitionValues) {
+ try (CachedClient client = getClient()) {
+ return client.client.getPartition(dbName, tblName,
partitionValues);
+ } catch (Exception e) {
+ throw new HMSClientException("failed to get partition for table %s
in db %s with value %s", e, tblName,
+ dbName, partitionValues);
}
}
- public boolean listPartitionsByExpr(String dbName, String tblName,
- byte[] partitionPredicatesInBytes, List<Partition> hivePartitions)
{
+ public List<Partition> getPartitionsByFilter(String dbName, String
tblName, String filter) {
try (CachedClient client = getClient()) {
- return client.client.listPartitionsByExpr(dbName, tblName,
partitionPredicatesInBytes,
- null, (short) -1, hivePartitions);
+ return client.client.listPartitionsByFilter(dbName, tblName,
filter, MAX_LIST_PARTITION_NUM);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new HMSClientException("failed to get partition by filter
for table %s in db %s", e, tblName,
+ dbName);
}
}
@@ -96,7 +114,7 @@ public class PooledHiveMetaStoreClient {
try (CachedClient client = getClient()) {
return client.client.getTable(dbName, tblName);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new HMSClientException("failed to get table %s in db %s from
hms client", e, tblName, dbName);
}
}
@@ -104,7 +122,7 @@ public class PooledHiveMetaStoreClient {
try (CachedClient client = getClient()) {
return client.client.getSchema(dbName, tblName);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new HMSClientException("failed to get schema for table %s in
db %s", e, tblName, dbName);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
new file mode 100644
index 0000000000..388c847559
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -0,0 +1,423 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.analysis.PartitionValue;
+import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.datasource.CacheException;
+import org.apache.doris.datasource.HMSExternalCatalog;
+import org.apache.doris.external.hive.util.HiveUtil;
+import org.apache.doris.metric.GaugeMetric;
+import org.apache.doris.metric.Metric;
+import org.apache.doris.metric.MetricLabel;
+import org.apache.doris.metric.MetricRepo;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import lombok.Data;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+// The metadata cache of an HMS catalog. It maintains three kinds of caches:
+// 1. partitionValuesCache: caches the partition values of a table, used for partition pruning.
+// 2. partitionCache: caches the partition info (location, input format, etc.) of a table's partitions.
+// 3. fileCache: caches the files (input splits) of a location.
+public class HiveMetaStoreCache {
+ private static final Logger LOG =
LogManager.getLogger(HiveMetaStoreCache.class);
+ private static final int MIN_BATCH_FETCH_PARTITION_NUM = 50;
+
+ private HMSExternalCatalog catalog;
+
+ // cache from <dbname-tblname> -> <values of partitions>
+ private LoadingCache<PartitionValueCacheKey,
ImmutableList<ListPartitionItem>> partitionValuesCache;
+ // cache from <dbname-tblname-partition_values> -> <partition info>
+ private LoadingCache<PartitionCacheKey, HivePartition> partitionCache;
+ // cache from <location> -> <file list>
+ private LoadingCache<FileCacheKey, ImmutableList<InputSplit>> fileCache;
+
+    /**
+     * Creates the metadata cache of one HMS catalog: builds the three loading caches
+     * and then registers their size gauges.
+     *
+     * @param catalog the catalog whose metastore client and properties are used by the loaders
+     * @param executor executor handed to the cache loaders
+     */
+    public HiveMetaStoreCache(HMSExternalCatalog catalog, Executor executor) {
+        this.catalog = catalog;
+        init(executor);
+        initMetrics();
+    }
+
+    /**
+     * Builds the partition-value, partition and file caches.
+     * Each cache is bounded by its configured maximum size, expires entries a configured number of
+     * minutes after their last access, and loads missing entries through the matching
+     * {@code load*} method. Every loader is wrapped with {@code CacheLoader.asyncReloading}.
+     *
+     * @param executor executor passed to {@code CacheLoader.asyncReloading}
+     */
+    private void init(Executor executor) {
+        long expireMinutes = Config.external_cache_expire_time_minutes_after_access;
+
+        // <dbName, tblName> -> partition values of the table
+        CacheLoader<PartitionValueCacheKey, ImmutableList<ListPartitionItem>> valuesLoader =
+                CacheLoader.from(this::loadPartitionValues);
+        partitionValuesCache = CacheBuilder.newBuilder()
+                .maximumSize(Config.max_hive_partition_cache_num)
+                .expireAfterAccess(expireMinutes, TimeUnit.MINUTES)
+                .build(CacheLoader.asyncReloading(valuesLoader, executor));
+
+        // <dbName, tblName, partition values> -> partition info
+        CacheLoader<PartitionCacheKey, HivePartition> partitionLoader = CacheLoader.from(this::loadPartitions);
+        partitionCache = CacheBuilder.newBuilder()
+                .maximumSize(Config.max_hive_partition_cache_num)
+                .expireAfterAccess(expireMinutes, TimeUnit.MINUTES)
+                .build(CacheLoader.asyncReloading(partitionLoader, executor));
+
+        // <location> -> file list
+        CacheLoader<FileCacheKey, ImmutableList<InputSplit>> fileLoader = CacheLoader.from(this::loadFiles);
+        fileCache = CacheBuilder.newBuilder()
+                .maximumSize(Config.max_external_file_cache_num)
+                .expireAfterAccess(expireMinutes, TimeUnit.MINUTES)
+                .build(CacheLoader.asyncReloading(fileLoader, executor));
+    }
+
+    /**
+     * Registers one "hive_meta_cache" gauge per cache. Each gauge reports the current entry count
+     * of its cache and is labelled with the cache type and the catalog name.
+     */
+    private void initMetrics() {
+        registerCacheSizeGauge("partition_value", "hive partition value cache number", partitionValuesCache);
+        registerCacheSizeGauge("partition", "hive partition cache number", partitionCache);
+        registerCacheSizeGauge("file", "hive file cache number", fileCache);
+    }
+
+    // Creates and registers a gauge that reports the size of the given cache.
+    private void registerCacheSizeGauge(String type, String description, LoadingCache<?, ?> cache) {
+        GaugeMetric<Long> sizeGauge = new GaugeMetric<Long>("hive_meta_cache",
+                Metric.MetricUnit.NOUNIT, description) {
+            @Override
+            public Long getValue() {
+                return cache.size();
+            }
+        };
+        sizeGauge.addLabel(new MetricLabel("type", type));
+        sizeGauge.addLabel(new MetricLabel("catalog", catalog.getName()));
+        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(sizeGauge);
+    }
+
+    /**
+     * Loader of {@code partitionValuesCache}: lists the partition names of the table through the
+     * catalog's metastore client and converts each name into a {@link ListPartitionItem}.
+     *
+     * @param key identifies the table and carries the partition column types used for conversion
+     * @return the converted items, in the order the names were returned
+     */
+    private ImmutableList<ListPartitionItem> loadPartitionValues(PartitionValueCacheKey key) {
+        // Names have the form: nation=cn/city=beijing
+        List<String> names = catalog.getClient().listPartitionNames(key.dbName, key.tblName);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("load #{} partitions for {} in catalog {}", names.size(), key, catalog.getName());
+        }
+        List<ListPartitionItem> items = names.stream()
+                .map(name -> toListPartitionItem(name, key.types))
+                .collect(Collectors.toList());
+        return ImmutableList.copyOf(items);
+    }
+
+    /**
+     * Converts a hive partition name such as "nation=cn/city=beijing" into a list partition item
+     * holding the values ("cn", "beijing") typed by {@code types}.
+     *
+     * @param partitionName partition name made of '/'-separated "column=value" segments
+     * @param types partition column types; there must be exactly one per segment
+     * @return a list partition item containing a single partition key
+     * @throws IllegalStateException if the segment count differs from {@code types.size()} or a
+     *         segment does not split into exactly two parts around '='
+     * @throws CacheException if the values cannot be converted to a partition key
+     */
+    private ListPartitionItem toListPartitionItem(String partitionName, List<Type> types) {
+        String[] segments = partitionName.split("/");
+        Preconditions.checkState(segments.length == types.size(), partitionName + " vs. " + types);
+        List<PartitionValue> partitionValues = Lists.newArrayListWithExpectedSize(types.size());
+        for (int i = 0; i < segments.length; i++) {
+            String[] nameAndValue = segments[i].split("=");
+            Preconditions.checkState(nameAndValue.length == 2, partitionName);
+            // only the value part is kept; the column name is implied by the position
+            partitionValues.add(new PartitionValue(nameAndValue[1]));
+        }
+        try {
+            PartitionKey partitionKey = PartitionKey.createListPartitionKeyWithTypes(partitionValues, types);
+            return new ListPartitionItem(Lists.newArrayList(partitionKey));
+        } catch (AnalysisException e) {
+            throw new CacheException("failed to convert hive partition %s to list partition in catalog %s",
+                    e, partitionName, catalog.getName());
+        }
+    }
+
+    /**
+     * Loader of {@code partitionCache}: fetches the partition from the metastore and keeps its
+     * input format and location together with the partition values of the key.
+     *
+     * @param key database, table and partition values identifying the partition
+     * @return the partition info built from the partition's storage descriptor
+     */
+    private HivePartition loadPartitions(PartitionCacheKey key) {
+        Partition hmsPartition = catalog.getClient().getPartition(key.dbName, key.tblName, key.values);
+        StorageDescriptor descriptor = hmsPartition.getSd();
+        String inputFormat = descriptor.getInputFormat();
+        String location = descriptor.getLocation();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("load partition format: {}, location: {} for {} in catalog {}",
+                    inputFormat, location, key, catalog.getName());
+        }
+        // TODO: more info?
+        return new HivePartition(inputFormat, location, key.values);
+    }
+
+    /**
+     * Loader of {@code fileCache}: computes the input splits found under the location of the key,
+     * using the input format named by the key.
+     *
+     * @param key location to list and the input format class name to list it with
+     * @return the splits returned by the input format
+     * @throws CacheException if the input format cannot be created or listing the splits fails
+     */
+    private ImmutableList<InputSplit> loadFiles(FileCacheKey key) {
+        String inputPath = convertToS3IfNecessary(key.location);
+        Configuration hadoopConf = getConfiguration();
+        JobConf job = new JobConf(hadoopConf);
+        // The Tez engine may generate subdirectories for "union" queries, so a table directory
+        // can contain files and directories at the same time, eg:
+        //      /user/hive/warehouse/region_tmp_union_all2/000000_0
+        //      /user/hive/warehouse/region_tmp_union_all2/1
+        //      /user/hive/warehouse/region_tmp_union_all2/2
+        // Directories therefore have to be visited recursively,
+        // otherwise getSplits() may throw exception: "Not a file xxx"
+        // https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
+        job.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
+        FileInputFormat.setInputPaths(job, inputPath);
+        try {
+            InputFormat<?, ?> format = HiveUtil.getInputFormat(hadoopConf, key.inputFormat, false);
+            InputSplit[] found = format.getSplits(job, 0);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("load #{} files for {} in catalog {}", found.length, key, catalog.getName());
+            }
+            return ImmutableList.copyOf(found);
+        } catch (Exception e) {
+            throw new CacheException("failed to get input splits for %s in catalog %s", e, key, catalog.getName());
+        }
+    }
+
+    /**
+     * Rewrites the scheme of an object storage location to "s3", eg: oss://bucket/dir -> s3://bucket/dir.
+     * A location is rewritten when it starts with one of the COS, BOS, OSS, S3A or S3N prefixes
+     * defined in {@link FeConstants}; any other location is returned unchanged.
+     *
+     * @param location the original location
+     * @return the location with its scheme replaced by "s3", or {@code location} itself
+     * @throws RuntimeException if a matching location contains no "://"
+     */
+    private String convertToS3IfNecessary(String location) {
+        // parameterized so that the message is only built when debug logging is enabled
+        LOG.debug("try convert location to s3 prefix: {}", location);
+        boolean needConvert = Stream.of(
+                FeConstants.FS_PREFIX_COS,
+                FeConstants.FS_PREFIX_BOS,
+                FeConstants.FS_PREFIX_OSS,
+                FeConstants.FS_PREFIX_S3A,
+                FeConstants.FS_PREFIX_S3N).anyMatch(location::startsWith);
+        if (!needConvert) {
+            return location;
+        }
+        int pos = location.indexOf("://");
+        if (pos == -1) {
+            throw new RuntimeException("No '://' found in location: " + location);
+        }
+        return "s3" + location.substring(pos);
+    }
+
+    /**
+     * Builds a hadoop configuration from the catalog properties.
+     * The S3 properties are applied after the general properties, so for a key present in both
+     * the S3 value is the one that remains.
+     *
+     * @return a new {@link HdfsConfiguration} carrying the catalog's properties
+     */
+    private Configuration getConfiguration() {
+        Configuration hadoopConf = new HdfsConfiguration();
+        catalog.getCatalogProperty().getProperties().forEach(hadoopConf::set);
+        catalog.getCatalogProperty().getS3Properties().forEach(hadoopConf::set);
+        return hadoopConf;
+    }
+
+ public ImmutableList<ListPartitionItem> getPartitionValues(String dbName,
String tblName, List<Type> types) {
+ PartitionValueCacheKey key = new PartitionValueCacheKey(dbName,
tblName, types);
+ try {
+ return partitionValuesCache.get(key);
+ } catch (ExecutionException e) {
+ throw new CacheException("failed to get partition values for %s in
catalog %s", e, key, catalog.getName());
+ }
+ }
+
+ public List<InputSplit> getFilesByPartitions(List<HivePartition>
partitions) {
+ long start = System.currentTimeMillis();
+ List<FileCacheKey> keys =
Lists.newArrayListWithExpectedSize(partitions.size());
+ partitions.stream().forEach(p -> keys.add(new
FileCacheKey(p.getPath(), p.getInputFormat())));
+
+ Stream<FileCacheKey> stream;
+ if (partitions.size() < MIN_BATCH_FETCH_PARTITION_NUM) {
+ stream = keys.stream();
+ } else {
+ stream = keys.parallelStream();
+ }
+ List<ImmutableList<InputSplit>> fileLists = stream.map(k -> {
+ try {
+ return fileCache.get(k);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList());
+ List<InputSplit> retFiles = Lists.newArrayListWithExpectedSize(
+ fileLists.stream().mapToInt(l -> l.size()).sum());
+ fileLists.stream().forEach(l -> retFiles.addAll(l));
+ LOG.debug("get #{} files from #{} partitions in catalog {} cost: {}
ms",
+ retFiles.size(), partitions.size(), catalog.getName(),
(System.currentTimeMillis() - start));
+ return retFiles;
+ }
+
+ public List<HivePartition> getAllPartitions(String dbName, String name,
List<List<String>> partitionValuesList) {
+ long start = System.currentTimeMillis();
+ List<PartitionCacheKey> keys =
Lists.newArrayListWithExpectedSize(partitionValuesList.size());
+ partitionValuesList.stream().forEach(p -> keys.add(new
PartitionCacheKey(dbName, name, p)));
+
+ Stream<PartitionCacheKey> stream;
+ if (partitionValuesList.size() < MIN_BATCH_FETCH_PARTITION_NUM) {
+ stream = keys.stream();
+ } else {
+ stream = keys.parallelStream();
+ }
+ List<HivePartition> partitions = stream.map(k -> {
+ try {
+ return partitionCache.get(k);
+ } catch (ExecutionException e) {
+ throw new CacheException("failed to get partition for %s in
catalog %s", e, k, catalog.getName());
+ }
+ }).collect(Collectors.toList());
+ LOG.debug("get #{} partitions in catalog {} cost: {} ms",
partitions.size(), catalog.getName(),
+ (System.currentTimeMillis() - start));
+ return partitions;
+ }
+
+    /**
+     * Drops the cached partition values of a table.
+     * Entries of the partition cache and the file cache are not touched.
+     *
+     * @param dbName database name
+     * @param tblName table name
+     */
+    public void invalidateCache(String dbName, String tblName) {
+        // the column types take no part in key equality, so they can be left out of the lookup key
+        partitionValuesCache.invalidate(new PartitionValueCacheKey(dbName, tblName, null));
+        // TODO: find a way to invalidate partitionCache and fileCache
+    }
+
+ /**
+ * The Key of hive partition value cache
+ */
+ @Data
+ public static class PartitionValueCacheKey {
+ private String dbName;
+ private String tblName;
+ // not in key
+ private List<Type> types;
+
+ public PartitionValueCacheKey(String dbName, String tblName,
List<Type> types) {
+ this.dbName = dbName;
+ this.tblName = tblName;
+ this.types = types;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof PartitionValueCacheKey)) {
+ return false;
+ }
+ return dbName.equals(((PartitionValueCacheKey) obj).dbName)
+ && tblName.equals(((PartitionValueCacheKey) obj).tblName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dbName, tblName);
+ }
+
+ @Override
+ public String toString() {
+ return "PartitionValueCacheKey{" + "dbName='" + dbName + '\'' + ",
tblName='" + tblName + '\'' + '}';
+ }
+ }
+
+ @Data
+ public static class PartitionCacheKey {
+ private String dbName;
+ private String tblName;
+ private List<String> values;
+
+ public PartitionCacheKey(String dbName, String tblName, List<String>
values) {
+ this.dbName = dbName;
+ this.tblName = tblName;
+ this.values = values;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof PartitionCacheKey)) {
+ return false;
+ }
+ return dbName.equals(((PartitionCacheKey) obj).dbName)
+ && tblName.equals(((PartitionCacheKey) obj).tblName)
+ && Objects.equals(values, ((PartitionCacheKey)
obj).values);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dbName, tblName, values);
+ }
+
+ @Override
+ public String toString() {
+ return "PartitionCacheKey{" + "dbName='" + dbName + '\'' + ",
tblName='" + tblName + '\'' + ", values="
+ + values + '}';
+ }
+ }
+
+ @Data
+ public static class FileCacheKey {
+ private String location;
+ // not in key
+ private String inputFormat;
+
+ public FileCacheKey(String location, String inputFormat) {
+ this.location = location;
+ this.inputFormat = inputFormat;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof FileCacheKey)) {
+ return false;
+ }
+ return location.equals(((FileCacheKey) obj).location);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(location);
+ }
+
+ @Override
+ public String toString() {
+ return "FileCacheKey{" + "location='" + location + '\'' + ",
inputFormat='" + inputFormat + '\'' + '}';
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java
new file mode 100644
index 0000000000..e5b5178e75
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * The info of a hive partition kept by the partition cache:
+ * its input format, its location and the partition values identifying it.
+ */
+@Data
+public class HivePartition {
+    // eg: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+    private String inputFormat;
+    // eg: hdfs://hk-dev01:8121/user/doris/parquet/partition_table/nation=cn/city=beijing
+    private String path;
+    // eg: cn, beijing
+    private List<String> partitionValues;
+
+    public HivePartition(String inputFormat, String path, List<String> partitionValues) {
+        this.inputFormat = inputFormat;
+        this.path = path;
+        this.partitionValues = partitionValues;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("HivePartition{");
+        text.append("inputFormat='").append(inputFormat).append('\'');
+        text.append(", path='").append(path).append('\'');
+        text.append(", partitionValues=").append(partitionValues).append('}');
+        return text.toString();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 39bd5e7dfd..c93ea8d14a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -45,7 +45,6 @@ import org.apache.doris.datasource.CatalogLog;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.InitDatabaseLog;
-import org.apache.doris.datasource.InitTableLog;
import org.apache.doris.ha.MasterInfo;
import org.apache.doris.journal.Journal;
import org.apache.doris.journal.JournalCursor;
@@ -951,8 +950,7 @@ public class EditLog {
break;
}
case OperationType.OP_INIT_EXTERNAL_TABLE: {
- final InitTableLog log = (InitTableLog) journal.getData();
- env.getCatalogMgr().replayInitExternalTable(log);
+ // Do nothing.
break;
}
default: {
@@ -1626,10 +1624,6 @@ public class EditLog {
logEdit(OperationType.OP_REFRESH_EXTERNAL_TABLE, log);
}
- public void logInitExternalTable(InitTableLog log) {
- logEdit(OperationType.OP_INIT_EXTERNAL_TABLE, log);
- }
-
public Journal getJournal() {
return this.journal;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 82304acb23..6204fc1836 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -243,6 +243,7 @@ public class OperationType {
public static final short OP_REFRESH_EXTERNAL_DB = 326;
public static final short OP_INIT_EXTERNAL_DB = 327;
public static final short OP_REFRESH_EXTERNAL_TABLE = 328;
+ @Deprecated
public static final short OP_INIT_EXTERNAL_TABLE = 329;
// scheduler job and task 330-350
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index c522823622..8523830c66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -141,7 +141,6 @@ public abstract class ScanNode extends PlanNode {
columnNameToRange.put(column.getName(), columnRange);
}
}
-
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 45a7b21747..0cf6661846 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -164,6 +164,8 @@ public class ExternalFileScanNode extends ExternalScanNode {
switch (type) {
case QUERY:
+ // prepare for partition prune
+ computeColumnFilter();
if (this.desc.getTable() instanceof HMSExternalTable) {
HMSExternalTable hmsTable = (HMSExternalTable)
this.desc.getTable();
initHMSExternalTable(hmsTable);
@@ -199,13 +201,13 @@ public class ExternalFileScanNode extends
ExternalScanNode {
FileScanProviderIf scanProvider;
switch (hmsTable.getDlaType()) {
case HUDI:
- scanProvider = new HudiScanProvider(hmsTable, desc);
+ scanProvider = new HudiScanProvider(hmsTable, desc,
columnNameToRange);
break;
case ICEBERG:
- scanProvider = new IcebergScanProvider(hmsTable, desc);
+ scanProvider = new IcebergScanProvider(hmsTable, desc,
columnNameToRange);
break;
case HIVE:
- scanProvider = new HiveScanProvider(hmsTable, desc);
+ scanProvider = new HiveScanProvider(hmsTable, desc,
columnNameToRange);
break;
default:
throw new UserException("Unknown table type: " +
hmsTable.getDlaType());
@@ -511,9 +513,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
@Override
public String getNodeExplainString(String prefix, TExplainLevel
detailLevel) {
- StringBuilder output = new StringBuilder(prefix);
- // output.append(fileTable.getExplainString(prefix));
-
+ StringBuilder output = new StringBuilder();
if (!conjuncts.isEmpty()) {
output.append(prefix).append("predicates:
").append(getExplainString(conjuncts)).append("\n");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
index a5e8a7aabc..17e6d3417f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
@@ -22,16 +22,23 @@ import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.HiveBucketUtil;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
-import org.apache.doris.external.hive.util.HiveUtil;
+import org.apache.doris.datasource.HMSExternalCatalog;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.planner.ListPartitionPrunerV2;
import
org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
@@ -40,23 +47,20 @@ import org.apache.doris.thrift.TFileScanSlotInfo;
import org.apache.doris.thrift.TFileTextScanRangeParams;
import org.apache.doris.thrift.TFileType;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -75,9 +79,13 @@ public class HiveScanProvider extends HMSTableScanProvider {
protected final TupleDescriptor desc;
- public HiveScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc) {
+ protected Map<String, ColumnRange> columnNameToRange;
+
+ public HiveScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc,
+ Map<String, ColumnRange> columnNameToRange) {
this.hmsTable = hmsTable;
this.desc = desc;
+ this.columnNameToRange = columnNameToRange;
}
@Override
@@ -127,80 +135,69 @@ public class HiveScanProvider extends
HMSTableScanProvider {
}
@Override
- public List<InputSplit> getSplits(List<Expr> exprs) throws IOException,
UserException {
- // eg:
- // oss://buckts/data_dir
- // hdfs://hosts/data_dir
- String location = getRemoteHiveTable().getSd().getLocation();
- List<String> partitionKeys =
getRemoteHiveTable().getPartitionKeys().stream().map(FieldSchema::getName)
- .collect(Collectors.toList());
- List<Partition> hivePartitions = new ArrayList<>();
-
- if (partitionKeys.size() > 0) {
- ExprNodeGenericFuncDesc hivePartitionPredicate =
HiveMetaStoreClientHelper.convertToHivePartitionExpr(exprs,
- partitionKeys, hmsTable.getName());
-
hivePartitions.addAll(hmsTable.getHivePartitions(hivePartitionPredicate));
- }
+ public List<InputSplit> getSplits(List<Expr> exprs) throws UserException {
+ long start = System.currentTimeMillis();
+ try {
+ HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getMetaStoreCache((HMSExternalCatalog)
hmsTable.getCatalog());
+ // 1. get ListPartitionItems from cache
+ ImmutableList<ListPartitionItem> partitionItems;
+ List<Type> partitionColumnTypes =
hmsTable.getPartitionColumnTypes();
+ if (!partitionColumnTypes.isEmpty()) {
+ partitionItems =
cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(),
+ partitionColumnTypes);
+ } else {
+ partitionItems = ImmutableList.of();
+ }
- String inputFormatName = getRemoteHiveTable().getSd().getInputFormat();
+ List<InputSplit> allFiles = Lists.newArrayList();
+ if (!partitionItems.isEmpty()) {
+ // 2. prune partitions by expr
+ Map<Long, PartitionItem> keyItemMap = Maps.newHashMap();
+ long pid = 0;
+ for (ListPartitionItem partitionItem : partitionItems) {
+ keyItemMap.put(pid++, partitionItem);
+ }
+ ListPartitionPrunerV2 pruner = new
ListPartitionPrunerV2(keyItemMap,
+ hmsTable.getPartitionColumns(), columnNameToRange);
+ Collection<Long> filteredPartitionIds = pruner.prune();
- Configuration configuration = setConfiguration();
- InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration,
inputFormatName, false);
- List<InputSplit> splits;
- if (!hivePartitions.isEmpty()) {
- try {
- splits = hivePartitions.stream().flatMap(x -> {
- try {
- return getSplitsByPath(inputFormat, configuration,
x.getSd().getLocation()).stream();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }).collect(Collectors.toList());
- } catch (RuntimeException e) {
- throw new IOException(e);
+ // 3. get partitions from cache
+ List<List<String>> partitionValuesList =
Lists.newArrayListWithCapacity(filteredPartitionIds.size());
+ for (Long id : filteredPartitionIds) {
+ ListPartitionItem listPartitionItem = (ListPartitionItem)
keyItemMap.get(id);
+
partitionValuesList.add(listPartitionItem.getItems().get(0).getPartitionValuesAsStringList());
+ }
+ List<HivePartition> partitions =
cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(),
+ partitionValuesList);
+ // 4. get all files of partitions
+ getFileSplitByPartitions(cache, partitions, allFiles);
+ } else {
+ // unpartitioned table, create a dummy partition to save
location and inputformat,
+ // so that we can unify the interface.
+ HivePartition dummyPartition = new
HivePartition(hmsTable.getRemoteTable().getSd().getInputFormat(),
+ hmsTable.getRemoteTable().getSd().getLocation(), null);
+ getFileSplitByPartitions(cache,
Lists.newArrayList(dummyPartition), allFiles);
}
- } else {
- splits = getSplitsByPath(inputFormat, configuration, location);
+ LOG.debug("get #{} files for table: {}.{}, cost: {} ms",
+ allFiles.size(), hmsTable.getDbName(), hmsTable.getName(),
(System.currentTimeMillis() - start));
+ return allFiles;
+ } catch (Throwable t) {
+ LOG.warn("get file split failed for table: {}",
hmsTable.getName(), t);
+ throw new UserException("get file split failed for table: " +
hmsTable.getName(), t);
}
- return HiveBucketUtil.getPrunedSplitsByBuckets(splits,
hmsTable.getName(), exprs,
- getRemoteHiveTable().getSd().getBucketCols(),
getRemoteHiveTable().getSd().getNumBuckets(),
- getRemoteHiveTable().getParameters());
- }
-
- private List<InputSplit> getSplitsByPath(InputFormat<?, ?> inputFormat,
Configuration configuration,
- String location) throws IOException {
- String finalLocation = convertToS3IfNecessary(location);
- JobConf jobConf = new JobConf(configuration);
- // For Tez engine, it may generate subdirectoies for "union" query.
- // So there may be files and directories in the table directory at the
same time. eg:
- // /user/hive/warehouse/region_tmp_union_all2/000000_0
- // /user/hive/warehouse/region_tmp_union_all2/1
- // /user/hive/warehouse/region_tmp_union_all2/2
- // So we need to set this config to support visit dir recursively.
- // Otherwise, getSplits() may throw exception: "Not a file xxx"
- //
https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
- jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive",
"true");
- FileInputFormat.setInputPaths(jobConf, finalLocation);
- InputSplit[] splits = inputFormat.getSplits(jobConf, 0);
- return Lists.newArrayList(splits);
}
- // convert oss:// to s3://
- private String convertToS3IfNecessary(String location) throws IOException {
- LOG.debug("try convert location to s3 prefix: " + location);
- if (location.startsWith(FeConstants.FS_PREFIX_COS)
- || location.startsWith(FeConstants.FS_PREFIX_BOS)
- || location.startsWith(FeConstants.FS_PREFIX_BOS)
- || location.startsWith(FeConstants.FS_PREFIX_OSS)
- || location.startsWith(FeConstants.FS_PREFIX_S3A)
- || location.startsWith(FeConstants.FS_PREFIX_S3N)) {
- int pos = location.indexOf("://");
- if (pos == -1) {
- throw new IOException("No '://' found in location: " +
location);
- }
- return "s3" + location.substring(pos);
+    /**
+     * Fetches the file splits of the given partitions from the cache and appends them to {@code allFiles}.
+     *
+     * @param cache the metastore cache to read the files from
+     * @param partitions the partitions whose files are wanted
+     * @param allFiles output list that receives the splits
+     */
+    private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
+            List<InputSplit> allFiles) {
+        List<InputSplit> files = cache.getFilesByPartitions(partitions);
+        if (LOG.isDebugEnabled()) {
+            // one placeholder per argument; only the paths of the first 10 splits are printed
+            LOG.debug("get #{} files from #{} partitions: {}", files.size(), partitions.size(),
+                    Joiner.on(",")
+                            .join(files.stream().limit(10).map(f -> ((FileSplit) f).getPath())
+                                    .collect(Collectors.toList())));
         }
-        return location;
+        allFiles.addAll(files);
     }
protected Configuration setConfiguration() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java
index d4c95b2549..59274ec521 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java
@@ -21,10 +21,12 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.planner.ColumnRange;
import org.apache.doris.thrift.TFileFormatType;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
/**
* A file scan provider for hudi.
@@ -32,8 +34,9 @@ import java.util.List;
*/
public class HudiScanProvider extends HiveScanProvider {
- public HudiScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc) {
- super(hmsTable, desc);
+ public HudiScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc,
+ Map<String, ColumnRange> columnNameToRange) {
+ super(hmsTable, desc, columnNameToRange);
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
index f82e277f41..37a5266559 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.external.iceberg.util.IcebergUtils;
+import org.apache.doris.planner.ColumnRange;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.hadoop.conf.Configuration;
@@ -36,7 +37,6 @@ import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expression;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -48,8 +48,9 @@ import java.util.Map;
*/
public class IcebergScanProvider extends HiveScanProvider {
- public IcebergScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc) {
- super(hmsTable, desc);
+ public IcebergScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc,
+ Map<String, ColumnRange> columnNameToRange) {
+ super(hmsTable, desc, columnNameToRange);
}
@Override
@@ -69,7 +70,7 @@ public class IcebergScanProvider extends HiveScanProvider {
}
@Override
- public List<InputSplit> getSplits(List<Expr> exprs) throws IOException, UserException {
+ public List<InputSplit> getSplits(List<Expr> exprs) throws UserException {
List<Expression> expressions = new ArrayList<>();
for (Expr conjunct : exprs) {
Expression expression = IcebergUtils.convertToIcebergExpr(conjunct);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
index eae1829603..561b2de316 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
@@ -39,8 +39,8 @@ import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.base.Joiner;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.List;
@@ -56,6 +56,7 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
@Override
public void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy,
List<TScanRangeLocations> scanRangeLocations) throws UserException {
+ long start = System.currentTimeMillis();
try {
List<InputSplit> inputSplits = getSplits(context.conjuncts);
this.inputSplitNum = inputSplits.size();
@@ -100,10 +101,9 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit,
partitionValuesFromPath, pathPartitionKeys);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
- LOG.info(
- "Assign to backend " + curLocations.getLocations().get(0).getBackendId() + " with table split: "
- + fileSplit.getPath() + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")"
- + " loaction: " + Joiner.on("|").join(split.getLocations()));
+ LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}",
+ curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(),
+ fileSplit.getLength(), Joiner.on("|").join(split.getLocations()));
fileSplitStrategy.update(fileSplit);
// Add a new location when it's can be split
@@ -117,6 +117,8 @@ public abstract class QueryScanProvider implements
FileScanProviderIf {
if (curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize() > 0) {
scanRangeLocations.add(curLocations);
}
+ LOG.debug("create #{} ScanRangeLocations cost: {} ms",
+ scanRangeLocations.size(), (System.currentTimeMillis() - start));
} catch (IOException e) {
throw new UserException(e);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 58a8e10478..c22cb0c2d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -583,7 +583,7 @@ public class StmtExecutor implements ProfileWriter {
throw e;
} catch (UserException e) {
// analysis exception only print message, not print the stack
- LOG.warn("execute Exception. {}, {}", context.getQueryIdentifier(), e.getMessage(), e);
+ LOG.warn("execute Exception. {}, {}", context.getQueryIdentifier(), e.getMessage());
context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
} catch (Exception e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]