This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 835949b645 Automatically detect whether a v1 query could have run on the v2 query engine (#13628)
835949b645 is described below
commit 835949b64529b8e5c0bb49474cdec2e17c9050f7
Author: Yash Mayya <[email protected]>
AuthorDate: Tue Jul 30 15:20:22 2024 +0530
Automatically detect whether a v1 query could have run on the v2 query engine (#13628)
---
.../BaseSingleStageBrokerRequestHandler.java | 59 +++++++++
.../requesthandler/GrpcBrokerRequestHandler.java | 2 +
.../SingleConnectionBrokerRequestHandler.java | 2 +
.../apache/pinot/common/metrics/BrokerMeter.java | 12 +-
.../tests/JmxMetricsIntegrationTest.java | 145 +++++++++++++++++++++
.../org/apache/pinot/query/QueryEnvironment.java | 21 +++
.../pinot/query/parser/utils/ParserUtils.java | 16 +--
.../apache/pinot/spi/utils/CommonConstants.java | 8 ++
8 files changed, 254 insertions(+), 11 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index 27e5c19cac..6a839d5e36 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -29,9 +29,13 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -138,6 +142,9 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
protected final boolean _enableDistinctCountBitmapOverride;
protected final int _queryResponseLimit;
protected final Map<Long, QueryServers> _queriesById;
+ protected final boolean _enableMultistageMigrationMetric;
+ protected ExecutorService _multistageCompileExecutor;
+ protected BlockingQueue<Pair<String, String>> _multistageCompileQueryQueue;
public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String
brokerId,
BrokerRoutingManager routingManager, AccessControlFactory
accessControlFactory,
@@ -155,12 +162,53 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
boolean enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
_queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null;
+
+ _enableMultistageMigrationMetric =
_config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC,
+ Broker.DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC);
+ if (_enableMultistageMigrationMetric) {
+ _multistageCompileExecutor = Executors.newSingleThreadExecutor();
+ _multistageCompileQueryQueue = new LinkedBlockingQueue<>(1000);
+ }
+
LOGGER.info("Initialized {} with broker id: {}, timeout: {}ms, query
response limit: {}, query log max length: {}, "
+ "query log max rate: {}, query cancellation enabled: {}",
getClass().getSimpleName(), _brokerId,
_brokerTimeoutMs, _queryResponseLimit,
_queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(),
enableQueryCancellation);
}
+ @Override
+ public void start() {
+ if (_enableMultistageMigrationMetric) {
+ _multistageCompileExecutor.submit(() -> {
+ while (!Thread.currentThread().isInterrupted()) {
+ Pair<String, String> query;
+ try {
+ query = _multistageCompileQueryQueue.take();
+ } catch (InterruptedException e) {
+ // Exit gracefully when the thread is interrupted, presumably when
this single thread executor is shutdown.
+ // Since this task is all that this single thread is doing,
there's no need to preserve the thread's
+ // interrupt status flag.
+ return;
+ }
+ String queryString = query.getLeft();
+ String database = query.getRight();
+
+ // Check if the query is a v2 supported query
+ if (!ParserUtils.canCompileWithMultiStageEngine(queryString,
database, _tableCache)) {
+
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.SINGLE_STAGE_QUERIES_INVALID_MULTI_STAGE,
1);
+ }
+ }
+ });
+ }
+ }
+
+ @Override
+ public void shutDown() {
+ if (_enableMultistageMigrationMetric) {
+ _multistageCompileExecutor.shutdownNow();
+ }
+ }
+
@Override
public Map<Long, String> getRunningQueries() {
Preconditions.checkState(_queriesById != null, "Query cancellation is not
enabled on broker");
@@ -478,8 +526,19 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
}
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERIES,
1);
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERIES_GLOBAL, 1);
_brokerMetrics.addValueToTableGauge(rawTableName,
BrokerGauge.REQUEST_SIZE, query.length());
+ if (!pinotQuery.isExplain() && _enableMultistageMigrationMetric) {
+ // Check if the query is a v2 supported query
+ String database =
DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(),
httpHeaders);
+ // Attempt to add the query to the compile queue; drop if queue is full
+ if (!_multistageCompileQueryQueue.offer(Pair.of(query, database))) {
+ LOGGER.trace("Not compiling query `{}` using the multi-stage query
engine because the query queue is full",
+ query);
+ }
+ }
+
// Prepare OFFLINE and REALTIME requests
BrokerRequest offlineBrokerRequest = null;
BrokerRequest realtimeBrokerRequest = null;
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index 96e06ffec5..0484476a41 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -63,10 +63,12 @@ public class GrpcBrokerRequestHandler extends
BaseSingleStageBrokerRequestHandle
@Override
public void start() {
+ super.start();
}
@Override
public void shutDown() {
+ super.shutDown();
_streamingQueryClient.shutdown();
_streamingReduceService.shutDown();
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index a51634c983..349affe6c8 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -82,12 +82,14 @@ public class SingleConnectionBrokerRequestHandler extends
BaseSingleStageBrokerR
@Override
public void start() {
+ super.start();
_failureDetector.register(this);
_failureDetector.start();
}
@Override
public void shutDown() {
+ super.shutDown();
_failureDetector.stop();
_queryRouter.shutDown();
_brokerReduceService.shutDown();
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index d6b7af9840..086d5ffd9e 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -35,6 +35,13 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
* At this moment this counter does not include queries executed in
multi-stage mode.
*/
QUERIES("queries", false),
+ /**
+ * Number of single-stage queries that have been started.
+ * <p>
+ * Unlike {@link #QUERIES}, this metric is global and not attached to a
particular table.
+ * That means it can be used to know how many single-stage queries have been
started in total.
+ */
+ QUERIES_GLOBAL("queries", true),
/**
* Number of multi-stage queries that have been started.
* <p>
@@ -49,7 +56,10 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
* sum of this metric across all tables should be greater or equal than
{@link #MULTI_STAGE_QUERIES_GLOBAL}.
*/
MULTI_STAGE_QUERIES("queries", false),
-
+ /**
+ * Number of single-stage queries executed that would not have successfully
run on the multi-stage query engine as is.
+ */
+ SINGLE_STAGE_QUERIES_INVALID_MULTI_STAGE("queries", true),
// These metrics track the exceptions caught during query execution in
broker side.
// Query rejected by Jersey thread pool executor
QUERY_REJECTED_EXCEPTIONS("exceptions", true),
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JmxMetricsIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JmxMetricsIntegrationTest.java
new file mode 100644
index 0000000000..2e1873d3f8
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JmxMetricsIntegrationTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests that verify JMX metrics emitted by various Pinot components.
+ */
+public class JmxMetricsIntegrationTest extends BaseClusterIntegrationTestSet {
+
+ private static final int NUM_BROKERS = 1;
+ private static final int NUM_SERVERS = 1;
+
+ private static final MBeanServer MBEAN_SERVER =
ManagementFactory.getPlatformMBeanServer();
+ private static final String PINOT_JMX_METRICS_DOMAIN =
"\"org.apache.pinot.common.metrics\"";
+ private static final String BROKER_METRICS_TYPE = "\"BrokerMetrics\"";
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ // Start the Pinot cluster
+ startZk();
+ startController();
+ startBrokers(NUM_BROKERS);
+ startServers(NUM_SERVERS);
+
+ // Create and upload the schema and table config
+ Schema schema = createSchema();
+ addSchema(schema);
+ TableConfig tableConfig = createOfflineTableConfig();
+ addTableConfig(tableConfig);
+
+ // Unpack the Avro files
+ List<File> avroFiles = unpackAvroData(_tempDir);
+
+ // Create and upload segments
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig,
schema, 0, _segmentDir, _tarDir);
+ uploadSegments(getTableName(), _tarDir);
+
+ // Wait for all documents loaded
+ waitForAllDocsLoaded(600_000L);
+ }
+
+ @Test
+ public void testMultiStageMigrationMetric() throws Exception {
+ ObjectName multiStageMigrationMetric = new
ObjectName(PINOT_JMX_METRICS_DOMAIN,
+ new Hashtable<>(Map.of("type", BROKER_METRICS_TYPE,
+ "name", "\"pinot.broker.singleStageQueriesInvalidMultiStage\"")));
+
+ ObjectName queriesGlobalMetric = new ObjectName(PINOT_JMX_METRICS_DOMAIN,
+ new Hashtable<>(Map.of("type", BROKER_METRICS_TYPE,
+ "name", "\"pinot.broker.queriesGlobal\"")));
+
+ // Some queries are run during setup to ensure that all the docs are loaded
+ long initialQueryCount = (Long)
MBEAN_SERVER.getAttribute(queriesGlobalMetric, "Count");
+ assertTrue(initialQueryCount > 0L);
+ assertEquals((Long) MBEAN_SERVER.getAttribute(multiStageMigrationMetric,
"Count"), 0L);
+
+ postQuery("SELECT COUNT(*) FROM mytable");
+
+ // Run some queries that are known to not work as is with the multi-stage
query engine
+
+ // Type differences
+ // STRING is VARCHAR in v2
+ JsonNode response = postQuery("SELECT CAST(ArrTime AS STRING) FROM
mytable");
+ assertFalse(response.get("resultTable").get("rows").isEmpty());
+ // LONG is BIGINT in v2
+ response = postQuery("SELECT CAST(ArrTime AS LONG) FROM mytable");
+ assertFalse(response.get("resultTable").get("rows").isEmpty());
+ // FLOAT_ARRAY is FLOAT ARRAY in v2
+ response = postQuery("SELECT CAST(DivAirportIDs AS FLOAT_ARRAY) FROM
mytable");
+ assertFalse(response.get("resultTable").get("rows").isEmpty());
+
+ // MV column requires ARRAY_TO_MV wrapper to be used in filter predicates
+ response = postQuery("SELECT COUNT(*) FROM mytable WHERE DivAirports =
'JFK'");
+ assertFalse(response.get("resultTable").get("rows").isEmpty());
+
+ // Unsupported function
+ response = postQuery("SELECT AirlineID, count(*) FROM mytable WHERE
IN_SUBQUERY(airlineID, 'SELECT "
+ + "ID_SET(AirlineID) FROM mytable WHERE Carrier = ''AA''') = 1 GROUP
BY AirlineID;");
+ assertFalse(response.get("resultTable").get("rows").isEmpty());
+
+ // Repeated columns in an ORDER BY query
+ response = postQuery("SELECT AirTime, AirTime FROM mytable ORDER BY
AirTime");
+ assertFalse(response.get("resultTable").get("rows").isEmpty());
+
+ assertEquals((Long) MBEAN_SERVER.getAttribute(queriesGlobalMetric,
"Count"), initialQueryCount + 8L);
+
+ AtomicLong multiStageMigrationMetricValue = new AtomicLong();
+ TestUtils.waitForCondition((aVoid) -> {
+ try {
+ multiStageMigrationMetricValue.set((Long)
MBEAN_SERVER.getAttribute(multiStageMigrationMetric, "Count"));
+ return multiStageMigrationMetricValue.get() == 6L;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, 5000, "Expected value of MBean
'pinot.broker.singleStageQueriesInvalidMultiStage' to be: "
+ + 6L + "; actual value: " + multiStageMigrationMetricValue.get());
+
+ assertEquals((Long) MBEAN_SERVER.getAttribute(multiStageMigrationMetric,
"Count"), 6L);
+ }
+
+ @Override
+ protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+
brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC,
"true");
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 1a54eec610..cdcfc2f173 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -71,6 +71,8 @@ import org.apache.pinot.query.validate.BytesCastVisitor;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.apache.pinot.sql.parsers.parser.SqlPhysicalExplain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -79,6 +81,7 @@ import org.apache.pinot.sql.parsers.parser.SqlPhysicalExplain;
* <p>It provide the higher level entry interface to convert a SQL string into
a {@link DispatchableSubPlan}.
*/
public class QueryEnvironment {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(QueryEnvironment.class);
private static final CalciteConnectionConfig CONNECTION_CONFIG;
static {
@@ -200,6 +203,24 @@ public class QueryEnvironment {
}
}
+ /**
+ * Returns whether the query can be successfully compiled in this query
environment
+ */
+ public boolean canCompileQuery(String query) {
+ try (PlannerContext plannerContext = getPlannerContext()) {
+ SqlNode sqlNode =
CalciteSqlParser.compileToSqlNodeAndOptions(query).getSqlNode();
+ if (sqlNode.getKind().equals(SqlKind.EXPLAIN)) {
+ sqlNode = ((SqlExplain) sqlNode).getExplicandum();
+ }
+ compileQuery(sqlNode, plannerContext);
+ LOGGER.debug("Successfully compiled query using the multi-stage query
engine: `{}`", query);
+ return true;
+ } catch (Exception e) {
+ LOGGER.warn("Encountered an error while compiling query `{}` using the
multi-stage query engine", query, e);
+ return false;
+ }
+ }
+
/**
* Results of planning a query
*/
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java
index f914e6ba8d..cbee4e7117 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java
@@ -35,15 +35,11 @@ public class ParserUtils {
*/
public static boolean canCompileWithMultiStageEngine(String query, String
database, TableCache tableCache) {
// try to parse and compile the query with the Calcite planner used by the
multi-stage query engine
- try {
- LOGGER.info("Trying to compile query `{}` using the multi-stage query
engine", query);
- QueryEnvironment queryEnvironment = new QueryEnvironment(database,
tableCache, null);
- queryEnvironment.getTableNamesForQuery(query);
- LOGGER.info("Successfully compiled query using the multi-stage query
engine: `{}`", query);
- return true;
- } catch (Exception e) {
- LOGGER.error("Encountered an error while compiling query `{}` using the
multi-stage query engine", query, e);
- return false;
- }
+ long compileStartTime = System.currentTimeMillis();
+ LOGGER.debug("Trying to compile query `{}` using the multi-stage query
engine", query);
+ QueryEnvironment queryEnvironment = new QueryEnvironment(database,
tableCache, null);
+ boolean canCompile = queryEnvironment.canCompileQuery(query);
+ LOGGER.debug("Multi-stage query compilation time = {}ms",
System.currentTimeMillis() - compileStartTime);
+ return canCompile;
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 6abdec73e0..a069031b69 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -348,6 +348,14 @@ public class CommonConstants {
public static final String CONFIG_OF_NEW_SEGMENT_EXPIRATION_SECONDS =
"pinot.broker.new.segment.expiration.seconds";
public static final long DEFAULT_VALUE_OF_NEW_SEGMENT_EXPIRATION_SECONDS =
TimeUnit.MINUTES.toSeconds(5);
+ // If this config is set to true, the broker will check every query
executed using the v1 query engine and attempt
+ // to determine whether the query could have successfully been run on the
v2 / multi-stage query engine. If not,
+ // a counter metric will be incremented - if this counter remains 0 during
regular query workload execution, it
+ // signals that users can potentially migrate their query workload to the
multistage query engine.
+ public static final String
CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC
+ = "pinot.broker.enable.multistage.migration.metric";
+ public static final boolean DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC =
false;
+
public static class Request {
public static final String SQL = "sql";
public static final String TRACE = "trace";
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org