This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1d7b63102ebc8974e8133c964917ea8052148088
Author: hexianqing <[email protected]>
AuthorDate: Mon Sep 26 19:18:24 2022 +0800

    IMPALA-11558: Ensure one Kudu client created (FE) for the specified Kudu master addresses
    
    Creating Kudu clients is very expensive as each will fetch
    metadata from the Kudu master, so we have to ensure only one
    Kudu client created for a given Kudu master address.
    
    The solution is to ensure KuduUtil.getKuduClient created only
    one KuduClient for the specified Kudu master addresses by using
    'computeIfAbsent' of the ConcurrentHashMap.
    
    Testing:
    - Manually ran a stress test: scan of a Kudu table, 1000
      concurrent queries and verified the untracked memory
    - Added concurrent tests for KuduUtil.getKuduClient
    - Ran the full set of verifications in Impala Public Jenkins
    
    Change-Id: I1003556d3afc8e8216142cac4007a4c99046caeb
    Reviewed-on: http://gerrit.cloudera.org:8080/19046
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-on: http://gerrit.cloudera.org:8080/19129
    Reviewed-by: Wenzhe Zhou <[email protected]>
---
 .../main/java/org/apache/impala/util/KuduUtil.java | 13 ++--
 .../java/org/apache/impala/util/KuduUtilTest.java  | 80 ++++++++++++++++++++++
 2 files changed, 88 insertions(+), 5 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
index 7037c921a..dcaa4da09 100644
--- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
@@ -87,16 +87,14 @@ public class KuduUtil {
    * fetching tablet metadata.
    */
   public static KuduClient getKuduClient(String kuduMasters) {
-    KuduClient client = kuduClients_.get(kuduMasters);
-    if (client == null) {
+    KuduClient client = kuduClients_.computeIfAbsent(kuduMasters, k -> {
       KuduClientBuilder b = new KuduClient.KuduClientBuilder(kuduMasters);
       b.defaultAdminOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs());
       b.defaultOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs());
       b.workerCount(KUDU_CLIENT_WORKER_THREAD_COUNT);
       b.saslProtocolName(BackendConfig.INSTANCE.getKuduSaslProtocolName());
-      client = b.build();
-      kuduClients_.put(kuduMasters, client);
-    }
+      return b.build();
+    });
     return client;
   }
 
@@ -479,4 +477,9 @@ public class KuduUtil {
     kuduPartitionExpr.analyze(analyzer);
     return kuduPartitionExpr;
   }
+
+  // Used for test assertions
+  public static int getkuduClientsSize() {
+    return kuduClients_.size();
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/util/KuduUtilTest.java b/fe/src/test/java/org/apache/impala/util/KuduUtilTest.java
new file mode 100644
index 000000000..192d0db3b
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/util/KuduUtilTest.java
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+import static org.junit.Assert.*;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.List;
+
+import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TBackendGflags;
+import org.apache.kudu.client.KuduClient;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Unit tests for KuduUtil functions.
+ */
+public class KuduUtilTest {
+  private static TBackendGflags origFlags;
+
+  @BeforeClass
+  public static void setup() {
+    // The original BackendConfig need to be mocked, we are saving the values here, so
+    // they can be restored and not break other tests
+    if (BackendConfig.INSTANCE == null) {
+      BackendConfig.create(new TBackendGflags());
+    }
+    origFlags = BackendConfig.INSTANCE.getBackendCfg();
+  }
+
+  @AfterClass
+  public static void teardown() {
+    BackendConfig.create(origFlags);
+  }
+
+  @Test
+  public void testGetKuduClient() {
+    int size = KuduUtil.getkuduClientsSize();
+    int concurrent = 5;
+    CountDownLatch latch = new CountDownLatch(concurrent);
+    List<KuduClient> clients = new CopyOnWriteArrayList<>();
+    for (int i = 0; i < concurrent; i++) {
+      new Thread() {
+        public void run() {
+          KuduClient client = KuduUtil.getKuduClient("master0");
+          clients.add(client);
+          latch.countDown();
+        }
+      }.start();
+    }
+    try {
+      latch.await();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    for (int i = 1; i < concurrent; i++) {
+      assertSame(clients.get(0), clients.get(i));
+    }
+    KuduUtil.getKuduClient("master1");
+    assertEquals(size + 2, KuduUtil.getkuduClientsSize());
+  }
+}
\ No newline at end of file

Reply via email to