This is an automated email from the ASF dual-hosted git repository.
vvivekiyer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fecaca97d0 Add config for logging queries before processing on broker instances (#16056)
fecaca97d0 is described below
commit fecaca97d0421c73ac327e519a23d3bcdf595993
Author: Dino Occhialini <[email protected]>
AuthorDate: Wed Jun 11 16:31:17 2025 -0700
Add config for logging queries before processing on broker instances (#16056)
* Add config for duplicate sql query logging
* Fix config naming typo
---
.../java/org/apache/pinot/broker/querylog/QueryLogger.java | 12 ++++++++----
.../org/apache/pinot/broker/querylog/QueryLoggerTest.java | 14 +++++++-------
.../java/org/apache/pinot/spi/utils/CommonConstants.java | 3 +++
3 files changed, 18 insertions(+), 11 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java b/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java
index 97a6bde0fe..20a94d78e8 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java
@@ -50,6 +50,7 @@ public class QueryLogger {
private final int _maxQueryLengthToLog;
private final RateLimiter _logRateLimiter;
private final boolean _enableIpLogging;
+ private final boolean _logBeforeProcessing;
private final Logger _logger;
private final RateLimiter _droppedLogRateLimiter;
private final AtomicLong _numDroppedLogs = new AtomicLong(0L);
@@ -59,23 +60,26 @@ public class QueryLogger {
Broker.DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND)),
config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_LENGTH,
Broker.DEFAULT_BROKER_QUERY_LOG_LENGTH),
config.getProperty(Broker.CONFIG_OF_BROKER_REQUEST_CLIENT_IP_LOGGING,
- Broker.DEFAULT_BROKER_REQUEST_CLIENT_IP_LOGGING), LOGGER,
RateLimiter.create(1)
+ Broker.DEFAULT_BROKER_REQUEST_CLIENT_IP_LOGGING),
+ config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_BEFORE_PROCESSING,
+ Broker.DEFAULT_BROKER_QUERY_LOG_BEFORE_PROCESSING), LOGGER,
RateLimiter.create(1)
// log once a second for dropped log count
);
}
@VisibleForTesting
- QueryLogger(RateLimiter logRateLimiter, int maxQueryLengthToLog, boolean
enableIpLogging, Logger logger,
- RateLimiter droppedLogRateLimiter) {
+ QueryLogger(RateLimiter logRateLimiter, int maxQueryLengthToLog, boolean
enableIpLogging, boolean logBeforeProcessing,
+ Logger logger, RateLimiter droppedLogRateLimiter) {
_logRateLimiter = logRateLimiter;
_maxQueryLengthToLog = maxQueryLengthToLog;
_enableIpLogging = enableIpLogging;
_logger = logger;
_droppedLogRateLimiter = droppedLogRateLimiter;
+ _logBeforeProcessing = logBeforeProcessing;
}
public void log(long requestId, String query) {
- if (!checkRateLimiter(null)) {
+ if (!_logBeforeProcessing || !checkRateLimiter(null)) {
return;
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java
index dcd41e078f..3cb82436e4 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java
@@ -87,7 +87,7 @@ public class QueryLoggerTest {
// Given:
Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(true);
QueryLogger.QueryLogParams params = generateParams(false, false, 0, 456);
- QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true,
_logger, _droppedRateLimiter);
+ QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true,
true, _logger, _droppedRateLimiter);
// When:
queryLogger.log(params);
@@ -125,7 +125,7 @@ public class QueryLoggerTest {
// Given:
Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(true);
QueryLogger.QueryLogParams params = generateParams(false, false, 0, 456);
- QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, false,
_logger, _droppedRateLimiter);
+ QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, false,
true, _logger, _droppedRateLimiter);
// When:
queryLogger.log(params);
@@ -141,7 +141,7 @@ public class QueryLoggerTest {
// Given:
Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(false);
QueryLogger.QueryLogParams params = generateParams(false, false, 0, 456);
- QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true,
_logger, _droppedRateLimiter);
+ QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true,
true, _logger, _droppedRateLimiter);
// When:
queryLogger.log(params);
@@ -155,7 +155,7 @@ public class QueryLoggerTest {
// Given:
Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(false);
QueryLogger.QueryLogParams params = generateParams(true, true, 0, 456);
- QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true,
_logger, _droppedRateLimiter);
+ QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true,
true, _logger, _droppedRateLimiter);
// When:
queryLogger.log(params);
@@ -169,7 +169,7 @@ public class QueryLoggerTest {
// Given:
Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(false);
QueryLogger.QueryLogParams params = generateParams(false, false, 1, 456);
- QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true,
_logger, _droppedRateLimiter);
+ QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true,
true, _logger, _droppedRateLimiter);
// When:
queryLogger.log(params);
@@ -183,7 +183,7 @@ public class QueryLoggerTest {
// Given:
Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(false);
QueryLogger.QueryLogParams params = generateParams(false, false, 0, 1456);
- QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true,
_logger, _droppedRateLimiter);
+ QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true,
true, _logger, _droppedRateLimiter);
// When:
queryLogger.log(params);
@@ -217,7 +217,7 @@ public class QueryLoggerTest {
}).thenReturn(true);
QueryLogger.QueryLogParams params = generateParams(false, false, 0, 456);
- QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true,
_logger, _droppedRateLimiter);
+ QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true,
true, _logger, _droppedRateLimiter);
ExecutorService executorService = Executors.newFixedThreadPool(4);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 137eaa9149..bd76f381c0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -320,6 +320,9 @@ public class CommonConstants {
public static final int DEFAULT_BROKER_QUERY_LOG_LENGTH =
Integer.MAX_VALUE;
public static final String CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND =
"pinot.broker.query.log.maxRatePerSecond";
+ public static final String CONFIG_OF_BROKER_QUERY_LOG_BEFORE_PROCESSING =
+ "pinot.broker.query.log.logBeforeProcessing";
+ public static final boolean DEFAULT_BROKER_QUERY_LOG_BEFORE_PROCESSING =
true;
public static final String CONFIG_OF_BROKER_QUERY_ENABLE_NULL_HANDLING =
"pinot.broker.query.enable.null.handling";
public static final String CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION =
"pinot.broker.enable.query.cancellation";
public static final double DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND =
10_000d;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]