This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e1efc1414e 10688: Handling the Join route parsing (#10692)
e1efc1414e is described below
commit e1efc1414e0a9e9b80fef4c79fcc5fde1a590c34
Author: Abhishek Sharma <[email protected]>
AuthorDate: Sun May 7 14:04:29 2023 -0400
10688: Handling the Join route parsing (#10692)
* Changes for handling join route parsing.
* Added functionality for determining the common broker hosting all the
tables.
---
.../java/org/apache/pinot/client/BrokerCache.java | 16 ++++-
.../pinot/client/BrokerCacheUpdaterPeriodic.java | 2 +-
.../org/apache/pinot/client/BrokerSelector.java | 4 +-
.../java/org/apache/pinot/client/Connection.java | 27 ++++----
.../client/ControllerBasedBrokerSelector.java | 4 +-
.../apache/pinot/client/DynamicBrokerSelector.java | 22 +++----
.../apache/pinot/client/SimpleBrokerSelector.java | 2 +-
.../apache/pinot/client/UpdatableBrokerCache.java | 2 +-
.../pinot/client/utils/BrokerSelectorUtils.java | 72 ++++++++++++++++++++++
.../apache/pinot/client/PreparedStatementTest.java | 3 +-
10 files changed, 118 insertions(+), 36 deletions(-)
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
index 7bc2bc9e6f..530f59958d 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCache.java
@@ -24,6 +24,7 @@ import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.JdkSslContext;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -34,6 +35,7 @@ import java.util.Set;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
+import org.apache.pinot.client.utils.BrokerSelectorUtils;
import org.apache.pinot.client.utils.ConnectionUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -188,9 +190,17 @@ public class BrokerCache {
_brokerData = getBrokerData(responses);
}
- public String getBroker(String tableName) {
- List<String> brokers =
- (tableName == null) ? _brokerData.getBrokers() :
_brokerData.getTableToBrokerMap().get(tableName);
+ public String getBroker(String... tableNames) {
+ List<String> brokers = null;
+ if (tableNames != null) {
+ // returning list of common brokers hosting all the tables.
+ brokers =
BrokerSelectorUtils.getTablesCommonBrokers(Arrays.asList(tableNames),
+ _brokerData.getTableToBrokerMap());
+ }
+
+ if (brokers == null || brokers.isEmpty()) {
+ brokers = _brokerData.getBrokers();
+ }
return brokers.get(_random.nextInt(brokers.size()));
}
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCacheUpdaterPeriodic.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCacheUpdaterPeriodic.java
index 54d88ba8cd..3c4080bac2 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCacheUpdaterPeriodic.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerCacheUpdaterPeriodic.java
@@ -65,7 +65,7 @@ public class BrokerCacheUpdaterPeriodic implements
UpdatableBrokerCache {
}
}
- public String getBroker(String tableName) {
+ public String getBroker(String... tableName) {
return _brokerCache.getBroker(tableName);
}
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerSelector.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerSelector.java
index 7da5948ea0..6b43da27cd 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerSelector.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerSelector.java
@@ -23,10 +23,10 @@ import java.util.List;
public interface BrokerSelector {
/**
* Returns the broker address in the form host:port
- * @param table
+ * @param tableNames
* @return
*/
- String selectBroker(String table);
+ String selectBroker(String... tableNames);
/**
* Returns list of all brokers.
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
index c236446d3d..612754b1cd 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java
@@ -25,7 +25,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
-import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -116,8 +117,8 @@ public class Connection {
*/
public ResultSetGroup execute(@Nullable String tableName, String query)
throws PinotClientException {
- tableName = tableName == null ? resolveTableName(query) : tableName;
- String brokerHostPort = _brokerSelector.selectBroker(tableName);
+ String[] tableNames = (tableName == null) ? resolveTableName(query) : new
String[]{tableName};
+ String brokerHostPort = _brokerSelector.selectBroker(tableNames);
if (brokerHostPort == null) {
throw new PinotClientException("Could not find broker to query for
table: " + tableName);
}
@@ -175,22 +176,26 @@ public class Connection {
*/
public Future<ResultSetGroup> executeAsync(@Nullable String tableName,
String query)
throws PinotClientException {
- tableName = tableName == null ? resolveTableName(query) : tableName;
- String brokerHostPort = _brokerSelector.selectBroker(tableName);
- if (brokerHostPort == null) {
- throw new PinotClientException("Could not find broker to query for
statement: " + query);
- }
+ String[] tableNames = (tableName == null) ? resolveTableName(query) : new
String[]{tableName};
+ String brokerHostPort = _brokerSelector.selectBroker(tableNames);
return new
ResultSetGroupFuture(_transport.executeQueryAsync(brokerHostPort, query));
}
+ /**
+ * Returns the name of all the tables used in a sql query.
+ *
+ * @return name of all the tables used in a sql query.
+ */
@Nullable
- private static String resolveTableName(String query) {
+ private static String[] resolveTableName(String query) {
try {
- return
CalciteSqlCompiler.compileToBrokerRequest(query).querySource.tableName;
+ SqlNodeAndOptions sqlNodeAndOptions =
CalciteSqlParser.compileToSqlNodeAndOptions(query);
+ List<String> tableNames =
CalciteSqlParser.extractTableNamesFromNode(sqlNodeAndOptions.getSqlNode());
+ return tableNames.toArray(new String[0]);
} catch (Exception e) {
LOGGER.error("Cannot parse table name from query: {}", query, e);
- return null;
}
+ return null;
}
/**
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ControllerBasedBrokerSelector.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ControllerBasedBrokerSelector.java
index d49ed48dec..cdc2190454 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ControllerBasedBrokerSelector.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ControllerBasedBrokerSelector.java
@@ -55,8 +55,8 @@ public class ControllerBasedBrokerSelector implements
BrokerSelector {
@Override
- public String selectBroker(String table) {
- return _brokerCache.getBroker(table);
+ public String selectBroker(String... tableNames) {
+ return _brokerCache.getBroker(tableNames);
}
@Override
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
index 7c7fcfc8b6..729372ee33 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java
@@ -21,6 +21,7 @@ package org.apache.pinot.client;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -32,6 +33,7 @@ import javax.annotation.Nullable;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
+import org.apache.pinot.client.utils.BrokerSelectorUtils;
/**
@@ -86,24 +88,16 @@ public class DynamicBrokerSelector implements
BrokerSelector, IZkDataListener {
@Nullable
@Override
- public String selectBroker(String table) {
- if (table != null) {
- String tableName =
- table.replace(ExternalViewReader.OFFLINE_SUFFIX,
"").replace(ExternalViewReader.REALTIME_SUFFIX, "");
- List<String> list = _tableToBrokerListMapRef.get().get(tableName);
- if (list != null && !list.isEmpty()) {
- return list.get(RANDOM.nextInt(list.size()));
- }
- // In case tableName is formatted as <db>.<table>
- int idx = tableName.indexOf('.');
- if (idx > 0) {
- tableName = tableName.substring(idx + 1);
- }
- list = _tableToBrokerListMapRef.get().get(tableName);
+ public String selectBroker(String... tableNames) {
+ if (tableNames != null) {
+ // getting list of brokers hosting all the tables.
+ List<String> list =
BrokerSelectorUtils.getTablesCommonBrokers(Arrays.asList(tableNames),
+ _tableToBrokerListMapRef.get());
if (list != null && !list.isEmpty()) {
return list.get(RANDOM.nextInt(list.size()));
}
}
+
// Return a broker randomly if table is null or no broker is found for the
specified table.
List<String> list = _allBrokerListRef.get();
if (list != null && !list.isEmpty()) {
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/SimpleBrokerSelector.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/SimpleBrokerSelector.java
index 51f55cfd1f..547ba94ed3 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/SimpleBrokerSelector.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/SimpleBrokerSelector.java
@@ -37,7 +37,7 @@ public class SimpleBrokerSelector implements BrokerSelector {
}
@Override
- public String selectBroker(String table) {
+ public String selectBroker(String... tableNames) {
return _brokerList.get(_random.nextInt(_brokerList.size()));
}
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/UpdatableBrokerCache.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/UpdatableBrokerCache.java
index 1e19dff789..b3db7fff4d 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/UpdatableBrokerCache.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/UpdatableBrokerCache.java
@@ -38,7 +38,7 @@ public interface UpdatableBrokerCache {
* @param tableName
* @return Broker address corresponding to the table
*/
- String getBroker(String tableName);
+ String getBroker(String... tableName);
/**
* Returns all the brokers currently in the cache
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java
new file mode 100644
index 0000000000..1ca15255cd
--- /dev/null
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/utils/BrokerSelectorUtils.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.client.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pinot.client.ExternalViewReader;
+
+
+public class BrokerSelectorUtils {
+
+ private BrokerSelectorUtils() {
+ }
+
+ /**
+ *
+ * @param tableNames: List of table names.
+ * @param brokerData: map holding data for table hosting on brokers.
+ * @return list of common brokers hosting all the tables.
+ */
+ public static List<String> getTablesCommonBrokers(List<String> tableNames,
Map<String, List<String>> brokerData) {
+ List<List<String>> tablesBrokersList = new ArrayList<>();
+ for (String name: tableNames) {
+ String tableName = getTableNameWithoutSuffix(name);
+ int idx = tableName.indexOf('.');
+
+ if (brokerData.containsKey(tableName)) {
+ tablesBrokersList.add(brokerData.get(tableName));
+ } else if (idx > 0) {
+ // In case tableName is formatted as <db>.<table>
+ tableName = tableName.substring(idx + 1);
+ tablesBrokersList.add(brokerData.get(tableName));
+ }
+ }
+
+ // return null if tablesBrokersList is empty or contains null
+ if (tablesBrokersList.isEmpty()
+ || tablesBrokersList.stream().anyMatch(Objects::isNull)) {
+ return null;
+ }
+
+ List<String> commonBrokers = tablesBrokersList.get(0);
+ for (int i = 1; i < tablesBrokersList.size(); i++) {
+ commonBrokers.retainAll(tablesBrokersList.get(i));
+ }
+ return commonBrokers;
+ }
+
+ private static String getTableNameWithoutSuffix(String tableName) {
+ return
+ tableName.replace(ExternalViewReader.OFFLINE_SUFFIX, "").
+ replace(ExternalViewReader.REALTIME_SUFFIX, "");
+ }
+}
diff --git
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java
index 404b1b279a..f2ba5f5843 100644
---
a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java
+++
b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/PreparedStatementTest.java
@@ -36,7 +36,8 @@ public class PreparedStatementTest {
public void testPreparedStatementWithDynamicBroker() {
// Create a connection with dynamic broker selector.
BrokerSelector mockBrokerSelector = Mockito.mock(BrokerSelector.class);
-
Mockito.when(mockBrokerSelector.selectBroker(Mockito.anyString())).thenAnswer(i
-> i.getArgument(0));
+ Mockito.when(mockBrokerSelector.selectBroker(Mockito.anyString()))
+ .thenAnswer(i -> i.getArgument(0));
Connection connection = new Connection(mockBrokerSelector,
_dummyPinotClientTransport);
PreparedStatement preparedStatement = connection.prepareStatement("SELECT
foo FROM bar WHERE baz = ?");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]