This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 891e5438de Fix TableDoesNotExistError for hybrid tables in MSE queries
in controller API (#16102)
891e5438de is described below
commit 891e5438de18a53ca59e64aac58cd52de9763460
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Sat Jun 14 02:37:49 2025 +0530
Fix TableDoesNotExistError for hybrid tables in MSE queries in controller
API (#16102)
---
.../api/resources/PinotQueryResource.java | 90 +++++++---------------
.../tests/HybridClusterIntegrationTest.java | 53 +++++++++++++
2 files changed, 81 insertions(+), 62 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
index ceaaa8bf07..af3e2eefb0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
@@ -68,7 +68,6 @@ import org.apache.pinot.core.auth.ManualAuthorization;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.parser.utils.ParserUtils;
-import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.exception.DatabaseConflictException;
import org.apache.pinot.spi.exception.QueryErrorCode;
@@ -240,31 +239,8 @@ public class PinotQueryResource {
private List<String> getInstanceIds(String query, List<String> tableNames,
String database) {
List<String> instanceIds;
if (!tableNames.isEmpty()) {
- List<TableConfig> tableConfigList = getListTableConfigs(tableNames,
database);
- List<LogicalTableConfig> logicalTableConfigList = null;
- // First check for table configs, if not found, check for logical table
configs.
- if (tableConfigList.size() != tableNames.size()) {
- logicalTableConfigList = getListLogicalTableConfigs(tableNames,
database);
- // If config is not found for all tables, then find the tables that
are not found.
- if ((tableConfigList.size() + logicalTableConfigList.size()) !=
tableNames.size()) {
- Set<String> tableNamesFoundSet = new HashSet<>();
- for (TableConfig tableConfig : tableConfigList) {
- tableNamesFoundSet.add(tableConfig.getTableName());
- }
- for (LogicalTableConfig logicalTableConfig : logicalTableConfigList)
{
- tableNamesFoundSet.add(logicalTableConfig.getTableName());
- }
-
- List<String> tablesNotFound = tableNames.stream().filter(name ->
!tableNamesFoundSet.contains(name))
- .collect(Collectors.toList());
-
- throw QueryErrorCode.TABLE_DOES_NOT_EXIST.asException(
- "Unable to find table in cluster, table does not exist for
tables: " + tablesNotFound);
- }
- }
-
// find the unions of all the broker tenant tags of the queried tables.
- Set<String> brokerTenantsUnion = getBrokerTenantsUnion(tableConfigList,
logicalTableConfigList);
+ Set<String> brokerTenantsUnion = getBrokerTenants(tableNames, database);
if (brokerTenantsUnion.isEmpty()) {
throw QueryErrorCode.BROKER_REQUEST_SEND.asException("Unable to find
broker tenant for tables: " + tableNames);
}
@@ -332,38 +308,44 @@ public class PinotQueryResource {
}
// given a list of tables, returns the list of tableConfigs
- private List<TableConfig> getListTableConfigs(List<String> tableNames,
String database) {
- List<TableConfig> allTableConfigList = new ArrayList<>();
+ private Set<String> getBrokerTenants(List<String> tableNames, String
database) {
+ Set<String> brokerTenants = new HashSet<>(tableNames.size());
+ List<String> tablesNotFound = new ArrayList<>(tableNames.size());
for (String tableName : tableNames) {
+ boolean found = false;
String actualTableName =
_pinotHelixResourceManager.getActualTableName(tableName, database);
- List<TableConfig> tableConfigList = new ArrayList<>();
if (_pinotHelixResourceManager.hasRealtimeTable(actualTableName)) {
-
tableConfigList.add(Objects.requireNonNull(_pinotHelixResourceManager.getRealtimeTableConfig(actualTableName)));
+
brokerTenants.add(Objects.requireNonNull(_pinotHelixResourceManager.getRealtimeTableConfig(actualTableName))
+ .getTenantConfig().getBroker());
+ found = true;
}
if (_pinotHelixResourceManager.hasOfflineTable(actualTableName)) {
-
tableConfigList.add(Objects.requireNonNull(_pinotHelixResourceManager.getOfflineTableConfig(actualTableName)));
+
brokerTenants.add(Objects.requireNonNull(_pinotHelixResourceManager.getOfflineTableConfig(actualTableName))
+ .getTenantConfig().getBroker());
+ found = true;
}
- // If no table configs found for the table, skip it.
- if (!tableConfigList.isEmpty()) {
- allTableConfigList.addAll(tableConfigList);
+
+ if (!found) {
+ actualTableName =
_pinotHelixResourceManager.getActualLogicalTableName(tableName, database);
+ LogicalTableConfig logicalTableConfig =
+ _pinotHelixResourceManager.getLogicalTableConfig(actualTableName);
+ if (logicalTableConfig != null) {
+ brokerTenants.add(logicalTableConfig.getBrokerTenant());
+ found = true;
+ }
}
- }
- return allTableConfigList;
- }
- private List<LogicalTableConfig> getListLogicalTableConfigs(List<String>
tableNames, String database) {
- List<LogicalTableConfig> allLogicalTableConfigList = new ArrayList<>();
- for (String tableName : tableNames) {
- String actualTableName =
_pinotHelixResourceManager.getActualLogicalTableName(tableName, database);
- LogicalTableConfig logicalTableConfig =
- _pinotHelixResourceManager.getLogicalTableConfig(actualTableName);
- if (logicalTableConfig != null) {
- allLogicalTableConfigList.add(logicalTableConfig);
+ if (!found) {
+ tablesNotFound.add(tableName);
}
}
- return allLogicalTableConfigList;
- }
+ if (!tablesNotFound.isEmpty()) {
+ throw QueryErrorCode.TABLE_DOES_NOT_EXIST.asException(
+ "Unable to find table in cluster, table does not exist for tables: "
+ tablesNotFound);
+ }
+ return brokerTenants;
+ }
private String selectRandomInstanceId(List<String> instanceIds) {
if (instanceIds.isEmpty()) {
@@ -388,22 +370,6 @@ public class PinotQueryResource {
return
brokerInstanceConfigs.map(InstanceConfig::getInstanceName).collect(Collectors.toList());
}
- // return the union of brokerTenants from the tables list.
- private Set<String> getBrokerTenantsUnion(List<TableConfig> tableConfigList,
- @Nullable List<LogicalTableConfig> logicalTableConfigList) {
- Set<String> tableBrokerTenants = new HashSet<>();
- for (TableConfig tableConfig : tableConfigList) {
- tableBrokerTenants.add(tableConfig.getTenantConfig().getBroker());
- }
-
- if (logicalTableConfigList != null) {
- for (LogicalTableConfig logicalTableConfig : logicalTableConfigList) {
- tableBrokerTenants.add(logicalTableConfig.getBrokerTenant());
- }
- }
- return tableBrokerTenants;
- }
-
private StreamingOutput sendRequestToBroker(String query, String instanceId,
String traceEnabled, String queryOptions,
HttpHeaders httpHeaders) {
InstanceConfig instanceConfig =
_pinotHelixResourceManager.getHelixInstanceConfig(instanceId);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index e72a054ec5..d60ce7b428 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -26,10 +26,12 @@ import org.apache.helix.model.IdealState;
import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
+import org.intellij.lang.annotations.Language;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -308,4 +310,55 @@ public class HybridClusterIntegrationTest extends
BaseHybridClusterIntegrationTe
throws Exception {
super.testVirtualColumnQueries();
}
+
+ @Test(dataProvider = "useBothQueryEngines")
+ void testControllerQuerySubmit(boolean useMultiStageQueryEngine)
+ throws Exception {
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ // Hybrid Table
+ @Language("sql")
+ String query = "SELECT count(*) FROM " + getTableName();
+ JsonNode response = postQueryToController(query);
+ assertNoError(response);
+
+ // Offline table
+ String tableName =
TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+ query = "SELECT count(*) FROM " + tableName;
+ response = postQueryToController(query);
+ assertNoError(response);
+
+ tableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+ query = "SELECT count(*) FROM " + tableName;
+ response = postQueryToController(query);
+ assertNoError(response);
+
+ query = "SELECT count(*) FROM unknown";
+ response = postQueryToController(query);
+ if (useMultiStageQueryEngine) {
+
QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST)
+ .containsMessage("TableDoesNotExistError");
+ } else {
+
QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.BROKER_RESOURCE_MISSING)
+ .containsMessage("BrokerResourceMissingError");
+ }
+ }
+
+ @Test
+ void testControllerJoinQuerySubmit()
+ throws Exception {
+ setUseMultiStageQueryEngine(true);
+ // Hybrid Table
+ @Language("sql")
+ String query = "SELECT count(*) FROM unknown JOIN " + getTableName()
+ + " ON unknown.FlightNum = " + getTableName() + ".FlightNum";
+ JsonNode response = postQueryToController(query);
+
QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST)
+ .containsMessage("TableDoesNotExistError");
+
+ query = "SELECT count(*) FROM unknown_1 JOIN unknown_2 ON "
+ + "unknown_1.FlightNum = unknown_2.FlightNum";
+ response = postQueryToController(query);
+
QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST)
+ .containsMessage("TableDoesNotExistError");
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]