This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e1efc1414e 10688: Handling the Join route parsing (#10692)
e1efc1414e is described below

commit e1efc1414e0a9e9b80fef4c79fcc5fde1a590c34
Author: Abhishek Sharma <[email protected]>
AuthorDate: Sun May 7 14:04:29 2023 -0400

    10688: Handling the Join route parsing (#10692)
    
    * Changes for handling join route parsing.
    * Added functionality for determining the common broker hosting all the tables.
---
 .../java/org/apache/pinot/client/BrokerCache.java  | 16 ++++-
 .../pinot/client/BrokerCacheUpdaterPeriodic.java   |  2 +-
 .../org/apache/pinot/client/BrokerSelector.java    |  4 +-
 .../java/org/apache/pinot/client/Connection.java   | 27 ++++----
 .../client/ControllerBasedBrokerSelector.java      |  4 +-
 .../apache/pinot/client/DynamicBrokerSelector.java | 22 +++----
 .../apache/pinot/client/SimpleBrokerSelector.java  |  2 +-
 .../apache/pinot/client/UpdatableBrokerCache.java  |  2 +-
 .../pinot/client/utils/BrokerSelectorUtils.java    | 72 ++++++++++++++++++++++
 .../apache/pinot/client/PreparedStatementTest.java |  3 +-
 10 files changed, 118 insertions(+), 36 deletions(-)

diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
index 7bc2bc9e6f..530f59958d 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
@@ -24,6 +24,7 @@ import io.netty.handler.ssl.ClientAuth;
 import io.netty.handler.ssl.JdkSslContext;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -34,6 +35,7 @@ import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import javax.net.ssl.SSLContext;
+import org.apache.pinot.client.utils.BrokerSelectorUtils;
 import org.apache.pinot.client.utils.ConnectionUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -188,9 +190,17 @@ public class BrokerCache {
     _brokerData = getBrokerData(responses);
   }
 
-  public String getBroker(String tableName) {
-    List<String> brokers =
-        (tableName == null) ? _brokerData.getBrokers() : 
_brokerData.getTableToBrokerMap().get(tableName);
+  public String getBroker(String... tableNames) {
+    List<String> brokers = null;
+    if (tableNames != null) {
+       // returning list of common brokers hosting all the tables.
+       brokers = 
BrokerSelectorUtils.getTablesCommonBrokers(Arrays.asList(tableNames),
+           _brokerData.getTableToBrokerMap());
+    }
+
+    if (brokers == null || brokers.isEmpty()) {
+      brokers = _brokerData.getBrokers();
+    }
     return brokers.get(_random.nextInt(brokers.size()));
   }
 
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCacheUpdaterPeriodic.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCacheUpdaterPeriodic.java
index 54d88ba8cd..3c4080bac2 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCacheUpdaterPeriodic.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCacheUpdaterPeriodic.java
@@ -65,7 +65,7 @@ public class BrokerCacheUpdaterPeriodic implements 
UpdatableBrokerCache {
     }
   }
 
-  public String getBroker(String tableName) {
+  public String getBroker(String... tableName) {
     return _brokerCache.getBroker(tableName);
   }
 
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerSelector.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerSelector.java
index 7da5948ea0..6b43da27cd 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerSelector.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerSelector.java
@@ -23,10 +23,10 @@ import java.util.List;
 public interface BrokerSelector {
   /**
    * Returns the broker address in the form host:port
-   * @param table
+   * @param tableNames
    * @return
    */
-  String selectBroker(String table);
+  String selectBroker(String... tableNames);
 
   /**
    * Returns list of all brokers.
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
index c236446d3d..612754b1cd 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
@@ -25,7 +25,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
-import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -116,8 +117,8 @@ public class Connection {
    */
   public ResultSetGroup execute(@Nullable String tableName, String query)
       throws PinotClientException {
-    tableName = tableName == null ? resolveTableName(query) : tableName;
-    String brokerHostPort = _brokerSelector.selectBroker(tableName);
+    String[] tableNames = (tableName == null) ? resolveTableName(query) : new 
String[]{tableName};
+    String brokerHostPort = _brokerSelector.selectBroker(tableNames);
     if (brokerHostPort == null) {
       throw new PinotClientException("Could not find broker to query for 
table: " + tableName);
     }
@@ -175,22 +176,26 @@ public class Connection {
    */
   public Future<ResultSetGroup> executeAsync(@Nullable String tableName, 
String query)
       throws PinotClientException {
-    tableName = tableName == null ? resolveTableName(query) : tableName;
-    String brokerHostPort = _brokerSelector.selectBroker(tableName);
-    if (brokerHostPort == null) {
-      throw new PinotClientException("Could not find broker to query for 
statement: " + query);
-    }
+    String[] tableNames = (tableName == null) ? resolveTableName(query) : new 
String[]{tableName};
+    String brokerHostPort = _brokerSelector.selectBroker(tableNames);
     return new 
ResultSetGroupFuture(_transport.executeQueryAsync(brokerHostPort, query));
   }
 
+  /**
+   * Returns the name of all the tables used in a sql query.
+   *
+   * @return name of all the tables used in a sql query.
+   */
   @Nullable
-  private static String resolveTableName(String query) {
+  private static String[] resolveTableName(String query) {
     try {
-      return 
CalciteSqlCompiler.compileToBrokerRequest(query).querySource.tableName;
+      SqlNodeAndOptions sqlNodeAndOptions = 
CalciteSqlParser.compileToSqlNodeAndOptions(query);
+      List<String> tableNames = 
CalciteSqlParser.extractTableNamesFromNode(sqlNodeAndOptions.getSqlNode());
+      return tableNames.toArray(new String[0]);
     } catch (Exception e) {
       LOGGER.error("Cannot parse table name from query: {}", query, e);
-      return null;
     }
+    return null;
   }
 
   /**
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ControllerBasedBrokerSelector.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ControllerBasedBrokerSelector.java
index d49ed48dec..cdc2190454 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ControllerBasedBrokerSelector.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ControllerBasedBrokerSelector.java
@@ -55,8 +55,8 @@ public class ControllerBasedBrokerSelector implements 
BrokerSelector {
 
 
   @Override
-  public String selectBroker(String table) {
-    return _brokerCache.getBroker(table);
+  public String selectBroker(String... tableNames) {
+    return _brokerCache.getBroker(tableNames);
   }
 
   @Override
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
index 7c7fcfc8b6..729372ee33 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
@@ -21,6 +21,7 @@ package org.apache.pinot.client;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -32,6 +33,7 @@ import javax.annotation.Nullable;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
+import org.apache.pinot.client.utils.BrokerSelectorUtils;
 
 
 /**
@@ -86,24 +88,16 @@ public class DynamicBrokerSelector implements 
BrokerSelector, IZkDataListener {
 
   @Nullable
   @Override
-  public String selectBroker(String table) {
-    if (table != null) {
-      String tableName =
-          table.replace(ExternalViewReader.OFFLINE_SUFFIX, 
"").replace(ExternalViewReader.REALTIME_SUFFIX, "");
-      List<String> list = _tableToBrokerListMapRef.get().get(tableName);
-      if (list != null && !list.isEmpty()) {
-        return list.get(RANDOM.nextInt(list.size()));
-      }
-      // In case tableName is formatted as <db>.<table>
-      int idx = tableName.indexOf('.');
-      if (idx > 0) {
-        tableName = tableName.substring(idx + 1);
-      }
-      list = _tableToBrokerListMapRef.get().get(tableName);
+  public String selectBroker(String... tableNames) {
+    if (tableNames != null) {
+      // getting list of brokers hosting all the tables.
+      List<String> list = 
BrokerSelectorUtils.getTablesCommonBrokers(Arrays.asList(tableNames),
+          _tableToBrokerListMapRef.get());
       if (list != null && !list.isEmpty()) {
         return list.get(RANDOM.nextInt(list.size()));
       }
     }
+
     // Return a broker randomly if table is null or no broker is found for the 
specified table.
     List<String> list = _allBrokerListRef.get();
     if (list != null && !list.isEmpty()) {
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/SimpleBrokerSelector.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/SimpleBrokerSelector.java
index 51f55cfd1f..547ba94ed3 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/SimpleBrokerSelector.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/SimpleBrokerSelector.java
@@ -37,7 +37,7 @@ public class SimpleBrokerSelector implements BrokerSelector {
   }
 
   @Override
-  public String selectBroker(String table) {
+  public String selectBroker(String... tableNames) {
     return _brokerList.get(_random.nextInt(_brokerList.size()));
   }
 
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/UpdatableBrokerCache.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/UpdatableBrokerCache.java
index 1e19dff789..b3db7fff4d 100644
--- 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/UpdatableBrokerCache.java
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/UpdatableBrokerCache.java
@@ -38,7 +38,7 @@ public interface UpdatableBrokerCache {
    * @param tableName
    * @return Broker address corresponding to the table
    */
-  String getBroker(String tableName);
+  String getBroker(String... tableName);
 
   /**
    * Returns all the brokers currently in the cache
diff --git 
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java
 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java
new file mode 100644
index 0000000000..1ca15255cd
--- /dev/null
+++ 
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pinot.client.ExternalViewReader;
+
+
+/** Utility methods for working out which brokers can serve a given set of tables. */
+public class BrokerSelectorUtils {
+
+  private BrokerSelectorUtils() {
+  }
+
+  /**
+   * Returns a new list of the brokers hosting all the given tables; {@code brokerData} is left unmodified.
+   * A name not in the map is retried without its {@code <db>.} prefix; if it has no such prefix it is skipped.
+   *
+   * @param tableNames table names; the offline/realtime suffix constants are removed before the lookup.
+   * @param brokerData map from table name to the list of brokers hosting that table.
+   * @return the common brokers (possibly empty), or {@code null} if no table was found or a lookup gave null.
+   */
+  public static List<String> getTablesCommonBrokers(List<String> tableNames, Map<String, List<String>> brokerData) {
+    List<List<String>> tablesBrokersList = new ArrayList<>();
+    for (String name : tableNames) {
+      String tableName = getTableNameWithoutSuffix(name);
+      int idx = tableName.indexOf('.');
+      if (brokerData.containsKey(tableName)) {
+        tablesBrokersList.add(brokerData.get(tableName));
+      } else if (idx > 0) {
+        // In case tableName is formatted as <db>.<table>
+        tablesBrokersList.add(brokerData.get(tableName.substring(idx + 1)));
+      }
+    }
+
+    // return null if tablesBrokersList is empty or contains null
+    if (tablesBrokersList.isEmpty() || tablesBrokersList.stream().anyMatch(Objects::isNull)) {
+      return null;
+    }
+
+    // Intersect into a copy: calling retainAll() on the first list would mutate the entry held in brokerData.
+    List<String> commonBrokers = new ArrayList<>(tablesBrokersList.get(0));
+    for (int i = 1; i < tablesBrokersList.size(); i++) {
+      commonBrokers.retainAll(tablesBrokersList.get(i));
+    }
+    return commonBrokers;
+  }
+
+  /** Removes every occurrence of the offline/realtime suffix constants from the given table name. */
+  private static String getTableNameWithoutSuffix(String tableName) {
+    return tableName.replace(ExternalViewReader.OFFLINE_SUFFIX, "").replace(ExternalViewReader.REALTIME_SUFFIX, "");
+  }
+}
diff --git 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java
 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java
index 404b1b279a..f2ba5f5843 100644
--- 
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java
+++ 
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java
@@ -36,7 +36,8 @@ public class PreparedStatementTest {
   public void testPreparedStatementWithDynamicBroker() {
     // Create a connection with dynamic broker selector.
     BrokerSelector mockBrokerSelector = Mockito.mock(BrokerSelector.class);
-    
Mockito.when(mockBrokerSelector.selectBroker(Mockito.anyString())).thenAnswer(i 
-> i.getArgument(0));
+    Mockito.when(mockBrokerSelector.selectBroker(Mockito.anyString()))
+        .thenAnswer(i -> i.getArgument(0));
     Connection connection = new Connection(mockBrokerSelector, 
_dummyPinotClientTransport);
 
     PreparedStatement preparedStatement = connection.prepareStatement("SELECT 
foo FROM bar WHERE baz = ?");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to