This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new fe5380ec9 IMPALA-11558: Ensure one Kudu client created (FE) for the
specified Kudu master addresses
fe5380ec9 is described below
commit fe5380ec925ba0aa88f5754f460e30d2f68999b9
Author: hexianqing <[email protected]>
AuthorDate: Mon Sep 26 19:18:24 2022 +0800
IMPALA-11558: Ensure one Kudu client created (FE) for the specified Kudu
master addresses
Creating Kudu clients is very expensive as each will fetch
metadata from the Kudu master, so we have to ensure only one
Kudu client created for a given Kudu master address.
The solution is to ensure KuduUtil.getKuduClient created only
one KuduClient for the specified Kudu master addresses by using
'computeIfAbsent' of the ConcurrentHashMap.
Testing:
- Manually ran a stress test: scan of a Kudu table, 1000
concurrent queries and verified the untracked memory
- Added concurrent tests for KuduUtil.getKuduClient
- Ran the full set of verifications in Impala Public Jenkins
Change-Id: I1003556d3afc8e8216142cac4007a4c99046caeb
Reviewed-on: http://gerrit.cloudera.org:8080/19046
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../main/java/org/apache/impala/util/KuduUtil.java | 13 ++--
.../java/org/apache/impala/util/KuduUtilTest.java | 80 ++++++++++++++++++++++
2 files changed, 88 insertions(+), 5 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java
b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
index 7037c921a..dcaa4da09 100644
--- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
@@ -87,16 +87,14 @@ public class KuduUtil {
* fetching tablet metadata.
*/
public static KuduClient getKuduClient(String kuduMasters) {
- KuduClient client = kuduClients_.get(kuduMasters);
- if (client == null) {
+ KuduClient client = kuduClients_.computeIfAbsent(kuduMasters, k -> {
KuduClientBuilder b = new KuduClient.KuduClientBuilder(kuduMasters);
b.defaultAdminOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs());
b.defaultOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs());
b.workerCount(KUDU_CLIENT_WORKER_THREAD_COUNT);
b.saslProtocolName(BackendConfig.INSTANCE.getKuduSaslProtocolName());
- client = b.build();
- kuduClients_.put(kuduMasters, client);
- }
+ return b.build();
+ });
return client;
}
@@ -479,4 +477,9 @@ public class KuduUtil {
kuduPartitionExpr.analyze(analyzer);
return kuduPartitionExpr;
}
+
+ // Used for test assertions
+ public static int getkuduClientsSize() {
+ return kuduClients_.size();
+ }
}
diff --git a/fe/src/test/java/org/apache/impala/util/KuduUtilTest.java
b/fe/src/test/java/org/apache/impala/util/KuduUtilTest.java
new file mode 100644
index 000000000..192d0db3b
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/util/KuduUtilTest.java
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+import static org.junit.Assert.*;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.List;
+
+import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TBackendGflags;
+import org.apache.kudu.client.KuduClient;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Unit tests for KuduUtil functions.
+ */
+public class KuduUtilTest {
+ private static TBackendGflags origFlags;
+
+ @BeforeClass
+ public static void setup() {
+ // The original BackendConfig need to be mocked, we are saving the values
here, so
+ // they can be restored and not break other tests
+ if (BackendConfig.INSTANCE == null) {
+ BackendConfig.create(new TBackendGflags());
+ }
+ origFlags = BackendConfig.INSTANCE.getBackendCfg();
+ }
+
+ @AfterClass
+ public static void teardown() {
+ BackendConfig.create(origFlags);
+ }
+
+ @Test
+ public void testGetKuduClient() {
+ int size = KuduUtil.getkuduClientsSize();
+ int concurrent = 5;
+ CountDownLatch latch = new CountDownLatch(concurrent);
+ List<KuduClient> clients = new CopyOnWriteArrayList<>();
+ for (int i = 0; i < concurrent; i++) {
+ new Thread() {
+ public void run() {
+ KuduClient client = KuduUtil.getKuduClient("master0");
+ clients.add(client);
+ latch.countDown();
+ }
+ }.start();
+ }
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ for (int i = 1; i < concurrent; i++) {
+ assertSame(clients.get(0), clients.get(i));
+ }
+ KuduUtil.getKuduClient("master1");
+ assertEquals(size + 2, KuduUtil.getkuduClientsSize());
+ }
+}
\ No newline at end of file