This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4c8c502f75a [fix](iceberg)Different catalogs should use different client pools (#46694) 4c8c502f75a is described below commit 4c8c502f75ada9789225bbf85181e8e7ca8ccc28 Author: wuwenchi <wuwen...@selectdb.com> AuthorDate: Fri Jan 10 14:09:55 2025 +0800 [fix](iceberg)Different catalogs should use different client pools (#46694) ### What problem does this PR solve? Problem Summary: 1. The cached client pool should be scoped to the catalog level, so that each catalog has its own pool. Otherwise, catalogs with different DLF configurations will all connect to the same DLF. 2. The catalog name, not the DLF uid, should be used to initialize the catalog. --- .../datasource/iceberg/HiveCompatibleCatalog.java | 4 +-- .../iceberg/IcebergDLFExternalCatalog.java | 6 ++-- .../iceberg/IcebergGlueExternalCatalog.java | 6 ---- .../doris/datasource/iceberg/dlf/DLFCatalog.java | 2 +- .../iceberg/dlf/client/DLFCachedClientPool.java | 19 ++++------ .../dlf/client/IcebergDLFExternalCatalogTest.java | 41 ++++++++++++++++++++++ 6 files changed, 53 insertions(+), 25 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java index 6431b02308b..49123d2b8f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/HiveCompatibleCatalog.java @@ -45,11 +45,11 @@ public abstract class HiveCompatibleCatalog extends BaseMetastoreCatalog impleme protected Configuration conf; protected ClientPool<IMetaStoreClient, TException> clients; protected FileIO fileIO; - protected String uid; + protected String catalogName; public void initialize(String name, FileIO fileIO, ClientPool<IMetaStoreClient, TException> clients) { - this.uid = name; + this.catalogName = name; this.fileIO = fileIO; this.clients 
= clients; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergDLFExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergDLFExternalCatalog.java index e4d8b2f55c4..d3f192754ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergDLFExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergDLFExternalCatalog.java @@ -22,8 +22,6 @@ import org.apache.doris.datasource.iceberg.dlf.DLFCatalog; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.property.constants.HMSProperties; -import com.aliyun.datalake.metastore.common.DataLakeConfig; - import java.util.Map; public class IcebergDLFExternalCatalog extends IcebergExternalCatalog { @@ -43,8 +41,8 @@ public class IcebergDLFExternalCatalog extends IcebergExternalCatalog { dlfCatalog.setConf(getConfiguration()); // initialize catalog Map<String, String> catalogProperties = catalogProperty.getHadoopProperties(); - String dlfUid = catalogProperties.get(DataLakeConfig.CATALOG_USER_ID); - dlfCatalog.initialize(dlfUid, catalogProperties); + String catalogName = getName(); + dlfCatalog.initialize(catalogName, catalogProperties); catalog = dlfCatalog; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergGlueExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergGlueExternalCatalog.java index ffe48e68a49..f9f602033c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergGlueExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergGlueExternalCatalog.java @@ -27,7 +27,6 @@ import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.aws.glue.GlueCatalog; import org.apache.iceberg.aws.s3.S3FileIOProperties; -import java.util.List; import java.util.Map; public class IcebergGlueExternalCatalog extends 
IcebergExternalCatalog { @@ -61,9 +60,4 @@ public class IcebergGlueExternalCatalog extends IcebergExternalCatalog { glueCatalog.initialize(getName(), catalogProperties); catalog = glueCatalog; } - - @Override - protected List<String> listDatabaseNames() { - return metadataOps.listDatabaseNames(); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java index b9ffc006c61..c47ff7248d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/DLFCatalog.java @@ -46,7 +46,7 @@ public class DLFCatalog extends HiveCompatibleCatalog { protected TableOperations newTableOps(TableIdentifier tableIdentifier) { String dbName = tableIdentifier.namespace().level(0); String tableName = tableIdentifier.name(); - return new DLFTableOperations(this.conf, this.clients, this.fileIO, this.uid, dbName, tableName); + return new DLFTableOperations(this.conf, this.clients, this.fileIO, this.catalogName, dbName, tableName); } protected FileIO initializeFileIO(Map<String, String> properties, Configuration hadoopConf) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/client/DLFCachedClientPool.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/client/DLFCachedClientPool.java index 23b814c13b8..9de0981e980 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/client/DLFCachedClientPool.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/client/DLFCachedClientPool.java @@ -31,13 +31,14 @@ import java.util.concurrent.TimeUnit; public class DLFCachedClientPool implements ClientPool<IMetaStoreClient, TException> { - private static volatile Cache<String, DLFClientPool> clientPoolCache; - private static final Object clientPoolCacheLock = new Object(); + private 
Cache<String, DLFClientPool> clientPoolCache; private final Configuration conf; private final String endpoint; private final int clientPoolSize; private final long evictionInterval; + // This cached client pool should belong to the catalog level, + // each catalog has its own pool public DLFCachedClientPool(Configuration conf, Map<String, String> properties) { this.conf = conf; this.endpoint = conf.get("", ""); @@ -63,16 +64,10 @@ public class DLFCachedClientPool implements ClientPool<IMetaStoreClient, TExcept } private void initializeClientPoolCache() { - if (clientPoolCache == null) { - synchronized (clientPoolCacheLock) { - if (clientPoolCache == null) { - clientPoolCache = Caffeine.newBuilder() - .expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS) - .removalListener((key, value, cause) -> ((DLFClientPool) value).close()) - .build(); - } - } - } + clientPoolCache = Caffeine.newBuilder() + .expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS) + .removalListener((key, value, cause) -> ((DLFClientPool) value).close()) + .build(); } protected DLFClientPool clientPool() { diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/dlf/client/IcebergDLFExternalCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/dlf/client/IcebergDLFExternalCatalogTest.java new file mode 100644 index 00000000000..bbd39b7b71b --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/dlf/client/IcebergDLFExternalCatalogTest.java @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.dlf.client; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; + +public class IcebergDLFExternalCatalogTest { + @Test + public void testDatabaseList() { + HashMap<String, String> props = new HashMap<>(); + Configuration conf = new Configuration(); + + DLFCachedClientPool cachedClientPool1 = new DLFCachedClientPool(conf, props); + DLFCachedClientPool cachedClientPool2 = new DLFCachedClientPool(conf, props); + DLFClientPool dlfClientPool1 = cachedClientPool1.clientPool(); + DLFClientPool dlfClientPool2 = cachedClientPool2.clientPool(); + // This cache should belong to the catalog level, + // so the object addresses of clients in different pools must be different + Assert.assertNotSame(dlfClientPool1, dlfClientPool2); + + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org