This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f618cf3cc4 Improve database handling in multi-stage engine (#14040)
f618cf3cc4 is described below
commit f618cf3cc412e91df19d95c09faf2fcd58c1e84e
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Sep 23 08:51:09 2024 -0700
Improve database handling in multi-stage engine (#14040)
---
.../MultiStageBrokerRequestHandler.java | 3 --
.../apache/pinot/common/utils/DatabaseUtils.java | 14 +++++---
.../rel/rules/PinotRelDistributionTraitRule.java | 23 +++++++------
.../org/apache/pinot/query/QueryEnvironment.java | 2 +-
.../apache/pinot/query/catalog/PinotCatalog.java | 7 +---
.../planner/logical/RelToPlanNodeConverter.java | 38 ++++++++++------------
.../query/runtime/queries/QueryRunnerTest.java | 3 +-
7 files changed, 44 insertions(+), 46 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index d5791d1661..b683bbef0a 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -54,7 +54,6 @@ import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.query.QueryEnvironment;
-import org.apache.pinot.query.catalog.PinotCatalog;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
@@ -83,7 +82,6 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private final WorkerManager _workerManager;
private final QueryDispatcher _queryDispatcher;
- private final PinotCatalog _catalog;
public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache) {
@@ -92,7 +90,6 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
_workerManager = new WorkerManager(hostname, port, _routingManager);
_queryDispatcher = new QueryDispatcher(new MailboxService(hostname, port, config));
- _catalog = new PinotCatalog(tableCache);
LOGGER.info("Initialized MultiStageBrokerRequestHandler on host: {}, port: {} with broker id: {}, timeout: {}ms, "
+ "query log max length: {}, query log max rate: {}", hostname, port, _brokerId, _brokerTimeoutMs,
_queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit());
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java
index 3691e0063c..e61bbbe4d2 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DatabaseUtils.java
@@ -32,6 +32,13 @@ public class DatabaseUtils {
private DatabaseUtils() {
}
+ /**
+ * Returns the fully qualified table name. Do not prefix the database name if it is the default database.
+ */
+ public static String constructFullyQualifiedTableName(String databaseName, String tableName) {
+ return databaseName.equalsIgnoreCase(CommonConstants.DEFAULT_DATABASE) ? tableName : databaseName + "." + tableName;
+ }
+
/**
* Construct the fully qualified table name i.e. {databaseName}.{tableName} from given table name and database name
* @param tableName table/schema name
@@ -48,11 +55,8 @@ public class DatabaseUtils {
String[] tableSplit = StringUtils.split(tableName, '.');
switch (tableSplit.length) {
case 1:
- // do not concat the database name prefix if it's a 'default' database
- if (StringUtils.isNotEmpty(databaseName) && !databaseName.equalsIgnoreCase(CommonConstants.DEFAULT_DATABASE)) {
- return databaseName + "." + tableName;
- }
- return tableName;
+ return StringUtils.isEmpty(databaseName) ? tableName
+ : constructFullyQualifiedTableName(databaseName, tableName);
case 2:
Preconditions.checkArgument(!tableSplit[1].isEmpty(), "Invalid table name '%s'", tableName);
String databasePrefix = tableSplit[0];
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java
index 8fbd8da202..f2aa72da84 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.calcite.rel.rules;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
@@ -30,10 +31,12 @@ import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Exchange;
import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.calcite.util.mapping.IntPair;
import org.apache.calcite.util.mapping.Mapping;
@@ -43,6 +46,7 @@ import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.pinot.calcite.rel.logical.PinotLogicalAggregate;
import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;
+import org.apache.pinot.query.planner.logical.RelToPlanNodeConverter;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +65,7 @@ public class PinotRelDistributionTraitRule extends RelOptRule {
private static final Logger LOGGER = LoggerFactory.getLogger(PinotRelDistributionTraitRule.class);
public PinotRelDistributionTraitRule(RelBuilderFactory factory) {
- super(operand(RelNode.class, any()));
+ super(operand(RelNode.class, any()), factory, null);
}
@Override
@@ -74,8 +78,7 @@ public class PinotRelDistributionTraitRule extends RelOptRule {
RelNode current = call.rel(0);
List<RelNode> inputs = current.getInputs();
RelDistribution relDistribution;
-
- if (inputs == null || inputs.size() == 0) {
+ if (inputs == null || inputs.isEmpty()) {
relDistribution = computeCurrentDistribution(current);
} else {
// if there's input to the current node, attempt to derive the RelDistribution.
@@ -167,15 +170,17 @@ public class PinotRelDistributionTraitRule extends RelOptRule {
private static RelDistribution computeCurrentDistribution(RelNode node) {
if (node instanceof Exchange) {
return ((Exchange) node).getDistribution();
- } else if (node instanceof LogicalTableScan) {
- LogicalTableScan tableScan = (LogicalTableScan) node;
+ } else if (node instanceof TableScan) {
+ TableScan tableScan = (TableScan) node;
// convert table scan hints into rel trait
String partitionKey =
PinotHintStrategyTable.getHintOption(tableScan.getHints(),
PinotHintOptions.TABLE_HINT_OPTIONS,
PinotHintOptions.TableHintOptions.PARTITION_KEY);
if (partitionKey != null) {
- int partitionIndex = tableScan.getRowType().getField(partitionKey, true, true).getIndex();
- return RelDistributions.hash(ImmutableList.of(partitionIndex));
+ RelDataTypeField field = tableScan.getRowType().getField(partitionKey, true, true);
+ Preconditions.checkState(field != null, "Failed to find partition key: %s in table: %s", partitionKey,
+ RelToPlanNodeConverter.getTableNameFromTableScan(tableScan));
+ return RelDistributions.hash(List.of(field.getIndex()));
} else {
return RelDistributions.of(RelDistribution.Type.RANDOM_DISTRIBUTED, RelDistributions.EMPTY);
}
@@ -183,9 +188,7 @@ public class PinotRelDistributionTraitRule extends RelOptRule {
PinotLogicalAggregate agg = (PinotLogicalAggregate) node;
AggregateNode.AggType aggType = agg.getAggType();
if (aggType == AggregateNode.AggType.FINAL || aggType == AggregateNode.AggType.DIRECT) {
- List<Integer> groupSetIndices = new ArrayList<>();
- agg.getGroupSet().forEach(groupSetIndices::add);
- return RelDistributions.hash(groupSetIndices);
+ return RelDistributions.hash(agg.getGroupSet().asList());
} else {
return RelDistributions.of(RelDistribution.Type.RANDOM_DISTRIBUTED, RelDistributions.EMPTY);
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index cdcfc2f173..50daaa74e7 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -101,7 +101,7 @@ public class QueryEnvironment {
private final WorkerManager _workerManager;
public QueryEnvironment(String database, TableCache tableCache, @Nullable WorkerManager workerManager) {
- PinotCatalog catalog = new PinotCatalog(database, tableCache);
+ PinotCatalog catalog = new PinotCatalog(tableCache, database);
CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false, database, catalog);
_config = Frameworks.newConfigBuilder().traitDefs().operatorTable(PinotOperatorTable.instance())
.defaultSchema(rootSchema.plus()).sqlToRelConverterConfig(PinotRuleUtils.PINOT_SQL_TO_REL_CONFIG).build();
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java
index 7a364b56af..47db3df30a 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java
@@ -53,12 +53,7 @@ public class PinotCatalog implements Schema {
* PinotCatalog needs have access to the actual {@link TableCache} object because TableCache hosts the actual
* table available for query and processes table/segment metadata updates when cluster status changes.
*/
- public PinotCatalog(TableCache tableCache) {
- _tableCache = tableCache;
- _databaseName = null;
- }
-
- public PinotCatalog(String databaseName, TableCache tableCache) {
+ public PinotCatalog(TableCache tableCache, String databaseName) {
_tableCache = tableCache;
_databaseName = databaseName;
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
index 5bd2ed3705..f161b3a253 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
@@ -19,10 +19,11 @@
package org.apache.pinot.query.planner.logical;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelFieldCollation;
@@ -31,6 +32,7 @@ import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Exchange;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
@@ -239,13 +241,7 @@ public final class RelToPlanNodeConverter {
}
private TableScanNode convertLogicalTableScan(LogicalTableScan node) {
- String tableName;
- List<String> qualifiedName = node.getTable().getQualifiedName();
- if (qualifiedName.size() == 1) {
- tableName = qualifiedName.get(0);
- } else {
- tableName = DatabaseUtils.translateTableName(qualifiedName.get(1), qualifiedName.get(0));
- }
+ String tableName = getTableNameFromTableScan(node);
List<RelDataTypeField> fields = node.getRowType().getFieldList();
List<String> columns = new ArrayList<>(fields.size());
for (RelDataTypeField field : fields) {
@@ -369,20 +365,22 @@ public final class RelToPlanNodeConverter {
}
}
+ public static String getTableNameFromTableScan(TableScan tableScan) {
+ return getTableNameFromRelTable(tableScan.getTable());
+ }
+
public static Set<String> getTableNamesFromRelRoot(RelNode relRoot) {
- Set<String> tableNames = new HashSet<>();
- List<String> qualifiedTableNames = RelOptUtil.findAllTableQualifiedNames(relRoot);
- for (String qualifiedTableName : qualifiedTableNames) {
- // Calcite encloses table and schema names in square brackets to properly quote and delimit them in SQL
- // statements, particularly to handle cases when they contain special characters or reserved keywords.
- String tableName = qualifiedTableName.replaceAll("^\\[(.*)\\]$", "$1");
- String[] split = tableName.split(", ");
- if (split.length == 1) {
- tableNames.add(tableName);
- } else {
- tableNames.add(DatabaseUtils.translateTableName(split[1], split[0]));
- }
+ List<RelOptTable> tables = RelOptUtil.findAllTables(relRoot);
+ Set<String> tableNames = Sets.newHashSetWithExpectedSize(tables.size());
+ for (RelOptTable table : tables) {
+ tableNames.add(getTableNameFromRelTable(table));
}
return tableNames;
}
+
+ public static String getTableNameFromRelTable(RelOptTable table) {
+ List<String> qualifiedName = table.getQualifiedName();
+ return qualifiedName.size() == 1 ? qualifiedName.get(0)
+ : DatabaseUtils.constructFullyQualifiedTableName(qualifiedName.get(0), qualifiedName.get(1));
+ }
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
index 271c5c6297..76802c7b77 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTest.java
@@ -282,7 +282,8 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
+ "GROUP BY a.col2, b.col2",
1
},
- new Object[]{"SELECT * FROM \"default.tbl-escape-naming\"", 5}
+ new Object[]{"SELECT * FROM default.\"tbl-escape-naming\"", 5},
+ new Object[]{"SELECT * FROM \"default\".\"tbl-escape-naming\"", 5}
};
//@formatter:on
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]