Jackie-Jiang commented on code in PR #17247:
URL: https://github.com/apache/pinot/pull/17247#discussion_r2785216063
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java:
##########
@@ -18,18 +18,131 @@
*/
package org.apache.pinot.core.operator.combine.merger;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
public class DistinctResultsBlockMerger implements
ResultsBlockMerger<DistinctResultsBlock> {
+ private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
+ private static final long UNLIMITED_TIME_NANOS = Long.MAX_VALUE;
+
+ private final int _maxRowsAcrossSegments;
+ private final int _numRowsWithoutChangeLimit;
+ private long _rowsWithoutChange;
+ private boolean _rowBudgetReached;
+ private boolean _noChangeLimitReached;
+ private boolean _timeLimitReached;
+ private final long _maxExecutionTimeNs;
+ private final long _startTimeNs;
+ private BaseResultsBlock.EarlyTerminationReason _earlyTerminationReason =
+ BaseResultsBlock.EarlyTerminationReason.NONE;
+
+ public DistinctResultsBlockMerger(QueryContext queryContext) {
+ Integer maxRows =
QueryOptionsUtils.getMaxRowsInDistinct(queryContext.getQueryOptions());
+ _maxRowsAcrossSegments = maxRows != null ? maxRows : UNLIMITED_ROWS;
+ Integer numRowsWithoutChange =
QueryOptionsUtils.getNumRowsWithoutChangeInDistinct(queryContext.getQueryOptions());
+ _numRowsWithoutChangeLimit = numRowsWithoutChange != null ?
numRowsWithoutChange : UNLIMITED_ROWS;
+ Long maxExecutionTimeMs =
QueryOptionsUtils.getMaxExecutionTimeMsInDistinct(queryContext.getQueryOptions());
+ _maxExecutionTimeNs = maxExecutionTimeMs != null ?
TimeUnit.MILLISECONDS.toNanos(maxExecutionTimeMs)
+ : UNLIMITED_TIME_NANOS;
+ _startTimeNs = System.nanoTime();
+ }
@Override
public boolean isQuerySatisfied(DistinctResultsBlock resultsBlock) {
- return resultsBlock.getDistinctTable().isSatisfied();
+ if (_rowBudgetReached || _noChangeLimitReached) {
+ applyEarlyTerminationReasonIfNeeded(resultsBlock);
+ return true;
+ }
+ if (resultsBlock.getDistinctTable().isSatisfied()) {
+ return true;
+ }
+ BaseResultsBlock.EarlyTerminationReason blockReason =
resultsBlock.getEarlyTerminationReason();
+ if (blockReason ==
BaseResultsBlock.EarlyTerminationReason.DISTINCT_TIME_LIMIT) {
+ applyBlockEarlyTerminationReason(blockReason);
+ applyEarlyTerminationReasonIfNeeded(resultsBlock);
+ return true;
+ }
+ if (_timeLimitReached || hasExceededTimeLimit()) {
+ _timeLimitReached = true;
+
updateEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_TIME_LIMIT);
+ applyEarlyTerminationReasonIfNeeded(resultsBlock);
+ return true;
+ }
+ return false;
}
@Override
public void mergeResultsBlocks(DistinctResultsBlock mergedBlock,
DistinctResultsBlock blockToMerge) {
+ int sizeBefore = mergedBlock.getDistinctTable().size();
mergedBlock.getDistinctTable().mergeDistinctTable(blockToMerge.getDistinctTable());
+ int sizeAfter = mergedBlock.getDistinctTable().size();
+ mergedBlock.setNumDocsScanned(mergedBlock.getNumDocsScanned() +
blockToMerge.getNumDocsScanned());
+ if (_numRowsWithoutChangeLimit != UNLIMITED_ROWS) {
+ if (sizeBefore == sizeAfter) {
+ _rowsWithoutChange += blockToMerge.getNumDocsScanned();
+ if (_rowsWithoutChange >= _numRowsWithoutChangeLimit) {
+ _noChangeLimitReached = true;
+
updateEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_NO_NEW_VALUES);
+ }
+ } else {
+ _rowsWithoutChange = 0;
+ }
+ }
+ if (_maxRowsAcrossSegments != UNLIMITED_ROWS
+ && mergedBlock.getNumDocsScanned() >= _maxRowsAcrossSegments) {
+ _rowBudgetReached = true;
+
updateEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS);
+ }
+ BaseResultsBlock.EarlyTerminationReason blockReason =
blockToMerge.getEarlyTerminationReason();
+ if (blockReason ==
BaseResultsBlock.EarlyTerminationReason.DISTINCT_TIME_LIMIT) {
+ applyBlockEarlyTerminationReason(blockReason);
+ }
+ if (!_rowBudgetReached && !_noChangeLimitReached && (_timeLimitReached ||
hasExceededTimeLimit())) {
+ _timeLimitReached = true;
+
updateEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_TIME_LIMIT);
+ }
+ applyEarlyTerminationReasonIfNeeded(mergedBlock);
+ }
+
+ private boolean hasExceededTimeLimit() {
+ return _maxExecutionTimeNs != UNLIMITED_TIME_NANOS && System.nanoTime() -
_startTimeNs >= _maxExecutionTimeNs;
+ }
+
+ private void
applyBlockEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason
reason) {
+ switch (reason) {
+ case DISTINCT_TIME_LIMIT:
+ _timeLimitReached = true;
+ updateEarlyTerminationReason(reason);
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void
updateEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason reason) {
+ if (reason == BaseResultsBlock.EarlyTerminationReason.NONE ||
_earlyTerminationReason == reason) {
+ return;
+ }
Review Comment:
(minor) This check is redundant
##########
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctEarlyTerminationContext.java:
##########
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.function.LongSupplier;
+
+/**
+ * Tracks per-block early-termination budgets for distinct executors (row
limits and no-change limits).
+ * <p>All distinct executors should delegate to this class so that query
options such as
+ * {@code maxRowsInDistinct} and {@code numRowsWithoutChangeInDistinct} are
enforced consistently
+ * while processing each {@link
org.apache.pinot.core.operator.blocks.ValueBlock}.</p>
+ */
+public class DistinctEarlyTerminationContext {
+ private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
+ private static final long UNLIMITED_TIME_NANOS = Long.MAX_VALUE;
+
+ private int _rowsRemaining = UNLIMITED_ROWS;
+ private int _numRowsProcessed = 0;
+ private int _numRowsWithoutChangeLimit = UNLIMITED_ROWS;
+ private int _numRowsWithoutChange = 0;
+ private boolean _numRowsWithoutChangeLimitReached = false;
+ private boolean _maxRowsLimitReached = false;
+ private boolean _trackingEnabled = false;
+ // Absolute deadline (in nanos from the configured time supplier). A
deadline stays consistent with the time source
+ // and enables budget checks.
+ private long _deadlineTimeNanos = UNLIMITED_TIME_NANOS;
+ private boolean _timeLimitReached = false;
+ private LongSupplier _timeSupplier = System::nanoTime;
+
+ @VisibleForTesting
+ public void setTimeSupplier(LongSupplier timeSupplier) {
+ if (timeSupplier == null || timeSupplier == _timeSupplier) {
+ return;
+ }
+ _timeSupplier = timeSupplier;
+ }
+
+ public void setMaxRowsToProcess(int maxRows) {
Review Comment:
After pushing the logic into `DistinctExecutor`, all the setters can be
moved into a constructor
##########
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctExecutor.java:
##########
@@ -29,6 +30,70 @@ public interface DistinctExecutor {
// TODO: Tune the initial capacity
int MAX_INITIAL_CAPACITY = 10000;
+ /**
+ * Sets the maximum number of rows to process across all blocks.
Implementations should respect this limit and avoid
+ * reading more rows once exhausted. Default implementation is a no-op for
executors that do not support it.
+ */
+ default void setMaxRowsToProcess(int maxRows) {
+ }
+
+ /**
+ * Returns the remaining number of rows that can be processed.
Implementations that do not support early termination
+ * should return {@link Integer#MAX_VALUE}.
+ */
+ default int getRemainingRowsToProcess() {
+ return Integer.MAX_VALUE;
+ }
+
+ /**
+ * Sets the maximum number of rows to scan without adding any new distinct
value before early-terminating.
+ */
+ default void setNumRowsWithoutChangeInDistinct(int
numRowsWithoutChangeInDistinct) {
+ }
+
+ /**
+ * Sets the time supplier used by the executor to evaluate time-based early
termination. Implementations should call
+ * {@link LongSupplier#getAsLong()} in nanoseconds and treat it as a
monotonic clock (e.g. {@link System#nanoTime()}).
+ */
+ default void setTimeSupplier(LongSupplier timeSupplier) {
Review Comment:
Seems not needed
##########
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/BaseSingleColumnDistinctExecutor.java:
##########
@@ -106,4 +175,78 @@ private boolean processWithoutNull(BlockValSet
blockValueSet, int numDocs) {
public DistinctTable getResult() {
return _distinctTable;
}
+
+ @Override
+ public int getNumDistinctRowsCollected() {
+ return _distinctTable.size();
+ }
+
+ @Override
+ public int getRemainingRowsToProcess() {
+ return _earlyTerminationContext.getRemainingRowsToProcess();
+ }
+
+ private int clampToRemaining(int numDocs) {
+ return _earlyTerminationContext.clampToRemaining(numDocs);
+ }
+
+ private void recordRowProcessed(boolean distinctChanged) {
+ _earlyTerminationContext.recordRowProcessed(distinctChanged);
+ }
+
+ private boolean shouldStopProcessing() {
+ return _earlyTerminationContext.shouldStopProcessing();
+ }
+
+ private boolean shouldStopProcessingWithoutTime() {
+ return _earlyTerminationContext.shouldStopProcessingWithoutTime();
+ }
+
+ private boolean processSVWithTracking(S values, int from, int to,
RoaringBitmap nullBitmap,
+ boolean trackDistinctChange) {
+ boolean limitReached = false;
+ boolean nullAdded = _distinctTable.hasNull();
+ for (int docId = from; docId < to; docId++) {
+ boolean distinctChanged = false;
+ if (nullBitmap != null && nullBitmap.contains(docId)) {
+ if (!nullAdded) {
+ _distinctTable.addNull();
+ nullAdded = true;
+ distinctChanged = true;
+ }
+ } else {
+ if (trackDistinctChange) {
+ int sizeBefore = _distinctTable.size();
+ limitReached = processSV(values, docId, docId + 1);
Review Comment:
Avoid using the batch API here: it is always called in a loop to process a
single row. I'm surprised this overhead does not show up in the benchmark.
To reduce the overhead, consider fully processing a block, then tracking
whether the distinct result changed. We don't need super fine-grained tracking
here. Same for the other executors.
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java:
##########
@@ -18,18 +18,131 @@
*/
package org.apache.pinot.core.operator.combine.merger;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
public class DistinctResultsBlockMerger implements
ResultsBlockMerger<DistinctResultsBlock> {
+ private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
+ private static final long UNLIMITED_TIME_NANOS = Long.MAX_VALUE;
+
+ private final int _maxRowsAcrossSegments;
+ private final int _numRowsWithoutChangeLimit;
+ private long _rowsWithoutChange;
+ private boolean _rowBudgetReached;
+ private boolean _noChangeLimitReached;
+ private boolean _timeLimitReached;
+ private final long _maxExecutionTimeNs;
+ private final long _startTimeNs;
+ private BaseResultsBlock.EarlyTerminationReason _earlyTerminationReason =
+ BaseResultsBlock.EarlyTerminationReason.NONE;
+
+ public DistinctResultsBlockMerger(QueryContext queryContext) {
+ Integer maxRows =
QueryOptionsUtils.getMaxRowsInDistinct(queryContext.getQueryOptions());
+ _maxRowsAcrossSegments = maxRows != null ? maxRows : UNLIMITED_ROWS;
+ Integer numRowsWithoutChange =
QueryOptionsUtils.getNumRowsWithoutChangeInDistinct(queryContext.getQueryOptions());
+ _numRowsWithoutChangeLimit = numRowsWithoutChange != null ?
numRowsWithoutChange : UNLIMITED_ROWS;
+ Long maxExecutionTimeMs =
QueryOptionsUtils.getMaxExecutionTimeMsInDistinct(queryContext.getQueryOptions());
+ _maxExecutionTimeNs = maxExecutionTimeMs != null ?
TimeUnit.MILLISECONDS.toNanos(maxExecutionTimeMs)
+ : UNLIMITED_TIME_NANOS;
+ _startTimeNs = System.nanoTime();
+ }
@Override
public boolean isQuerySatisfied(DistinctResultsBlock resultsBlock) {
- return resultsBlock.getDistinctTable().isSatisfied();
+ if (_rowBudgetReached || _noChangeLimitReached) {
+ applyEarlyTerminationReasonIfNeeded(resultsBlock);
+ return true;
+ }
+ if (resultsBlock.getDistinctTable().isSatisfied()) {
+ return true;
+ }
+ BaseResultsBlock.EarlyTerminationReason blockReason =
resultsBlock.getEarlyTerminationReason();
+ if (blockReason ==
BaseResultsBlock.EarlyTerminationReason.DISTINCT_TIME_LIMIT) {
+ applyBlockEarlyTerminationReason(blockReason);
+ applyEarlyTerminationReasonIfNeeded(resultsBlock);
+ return true;
+ }
+ if (_timeLimitReached || hasExceededTimeLimit()) {
+ _timeLimitReached = true;
+
updateEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_TIME_LIMIT);
+ applyEarlyTerminationReasonIfNeeded(resultsBlock);
+ return true;
+ }
+ return false;
}
@Override
public void mergeResultsBlocks(DistinctResultsBlock mergedBlock,
DistinctResultsBlock blockToMerge) {
+ int sizeBefore = mergedBlock.getDistinctTable().size();
mergedBlock.getDistinctTable().mergeDistinctTable(blockToMerge.getDistinctTable());
+ int sizeAfter = mergedBlock.getDistinctTable().size();
+ mergedBlock.setNumDocsScanned(mergedBlock.getNumDocsScanned() +
blockToMerge.getNumDocsScanned());
+ if (_numRowsWithoutChangeLimit != UNLIMITED_ROWS) {
+ if (sizeBefore == sizeAfter) {
+ _rowsWithoutChange += blockToMerge.getNumDocsScanned();
+ if (_rowsWithoutChange >= _numRowsWithoutChangeLimit) {
+ _noChangeLimitReached = true;
+
updateEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_NO_NEW_VALUES);
+ }
+ } else {
+ _rowsWithoutChange = 0;
+ }
+ }
+ if (_maxRowsAcrossSegments != UNLIMITED_ROWS
+ && mergedBlock.getNumDocsScanned() >= _maxRowsAcrossSegments) {
+ _rowBudgetReached = true;
+
updateEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS);
+ }
+ BaseResultsBlock.EarlyTerminationReason blockReason =
blockToMerge.getEarlyTerminationReason();
+ if (blockReason ==
BaseResultsBlock.EarlyTerminationReason.DISTINCT_TIME_LIMIT) {
+ applyBlockEarlyTerminationReason(blockReason);
+ }
+ if (!_rowBudgetReached && !_noChangeLimitReached && (_timeLimitReached ||
hasExceededTimeLimit())) {
+ _timeLimitReached = true;
+
updateEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_TIME_LIMIT);
+ }
+ applyEarlyTerminationReasonIfNeeded(mergedBlock);
+ }
+
+ private boolean hasExceededTimeLimit() {
+ return _maxExecutionTimeNs != UNLIMITED_TIME_NANOS && System.nanoTime() -
_startTimeNs >= _maxExecutionTimeNs;
+ }
+
+ private void
applyBlockEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason
reason) {
Review Comment:
Rename this method since it only handles time limit
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java:
##########
@@ -18,18 +18,131 @@
*/
package org.apache.pinot.core.operator.combine.merger;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
public class DistinctResultsBlockMerger implements
ResultsBlockMerger<DistinctResultsBlock> {
+ private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
+ private static final long UNLIMITED_TIME_NANOS = Long.MAX_VALUE;
+
+ private final int _maxRowsAcrossSegments;
+ private final int _numRowsWithoutChangeLimit;
+ private long _rowsWithoutChange;
Review Comment:
(nit) Move the non-final variables below the final ones
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java:
##########
@@ -117,6 +117,12 @@ public static Long getExtraPassiveTimeoutMs(Map<String,
String> queryOptions) {
return checkedParseLong(QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS,
extraPassiveTimeoutMsString, 0);
}
+ @Nullable
+ public static Long getMaxExecutionTimeMsInDistinct(Map<String, String>
queryOptions) {
+ String maxExecutionTimeMs =
queryOptions.get(QueryOptionKey.MAX_EXECUTION_TIME_MS_IN_DISTINCT);
+ return checkedParseLong(QueryOptionKey.MAX_EXECUTION_TIME_MS_IN_DISTINCT,
maxExecutionTimeMs, 0);
Review Comment:
Use `checkedParseLongPositive()`. It should not be 0
##########
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/BaseSingleColumnDistinctExecutor.java:
##########
@@ -106,4 +175,78 @@ private boolean processWithoutNull(BlockValSet
blockValueSet, int numDocs) {
public DistinctTable getResult() {
return _distinctTable;
}
+
+ @Override
+ public int getNumDistinctRowsCollected() {
+ return _distinctTable.size();
+ }
+
+ @Override
+ public int getRemainingRowsToProcess() {
+ return _earlyTerminationContext.getRemainingRowsToProcess();
+ }
+
+ private int clampToRemaining(int numDocs) {
+ return _earlyTerminationContext.clampToRemaining(numDocs);
+ }
+
+ private void recordRowProcessed(boolean distinctChanged) {
+ _earlyTerminationContext.recordRowProcessed(distinctChanged);
+ }
+
+ private boolean shouldStopProcessing() {
+ return _earlyTerminationContext.shouldStopProcessing();
+ }
+
+ private boolean shouldStopProcessingWithoutTime() {
+ return _earlyTerminationContext.shouldStopProcessingWithoutTime();
+ }
+
+ private boolean processSVWithTracking(S values, int from, int to,
RoaringBitmap nullBitmap,
Review Comment:
(minor) `from` is always 0
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedDistinctOperator.java:
##########
@@ -108,35 +110,37 @@ private IntDistinctTable
createIntDistinctTable(DataSchema dataSchema, Dictionar
int numValuesToKeep = Math.min(limit, dictLength);
IntDistinctTable distinctTable =
new IntDistinctTable(dataSchema, limit,
_queryContext.isNullHandlingEnabled(), orderByExpression);
+ int rowsProcessed = 0;
Review Comment:
Most changes in this file no longer apply
##########
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/BaseSingleColumnDistinctExecutor.java:
##########
@@ -32,26 +33,94 @@
public abstract class BaseSingleColumnDistinctExecutor<T extends
DistinctTable, S, M> implements DistinctExecutor {
protected final ExpressionContext _expression;
protected final T _distinctTable;
+ private final DistinctEarlyTerminationContext _earlyTerminationContext = new
DistinctEarlyTerminationContext();
public BaseSingleColumnDistinctExecutor(ExpressionContext expression, T
distinctTable) {
_expression = expression;
_distinctTable = distinctTable;
}
+ @Override
+ public void setMaxRowsToProcess(int maxRows) {
+ _earlyTerminationContext.setMaxRowsToProcess(maxRows);
+ }
+
+ @Override
+ public void setNumRowsWithoutChangeInDistinct(int
numRowsWithoutChangeInDistinct) {
+
_earlyTerminationContext.setNumRowsWithoutChangeInDistinct(numRowsWithoutChangeInDistinct);
+ }
+
+ @Override
+ public void setTimeSupplier(LongSupplier timeSupplier) {
+ _earlyTerminationContext.setTimeSupplier(timeSupplier);
+ }
+
+ @Override
+ public void setRemainingTimeNanos(long remainingTimeNanos) {
+ _earlyTerminationContext.setRemainingTimeNanos(remainingTimeNanos);
+ }
+
+ @Override
+ public boolean isNumRowsWithoutChangeLimitReached() {
+ return _earlyTerminationContext.isNumRowsWithoutChangeLimitReached();
+ }
+
+ @Override
+ public boolean isMaxRowsLimitReached() {
+ return _earlyTerminationContext.isMaxRowsLimitReached();
+ }
+
+ @Override
+ public boolean isTimeLimitReached() {
+ return _earlyTerminationContext.isTimeLimitReached();
+ }
+
+ @Override
+ public int getNumRowsProcessed() {
+ return _earlyTerminationContext.getNumRowsProcessed();
+ }
+
@Override
public boolean process(ValueBlock valueBlock) {
+ if (!_earlyTerminationContext.isTrackingEnabled()) {
+ BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
+ return processWithoutTracking(blockValueSet, valueBlock.getNumDocs());
+ }
+ if (shouldStopProcessing()) {
+ return true;
+ }
Review Comment:
Given this check involves fetching time, consider only calling it once when
finishing processing the block. Same for other executors
##########
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctExecutor.java:
##########
@@ -29,6 +30,70 @@ public interface DistinctExecutor {
// TODO: Tune the initial capacity
int MAX_INITIAL_CAPACITY = 10000;
+ /**
+ * Sets the maximum number of rows to process across all blocks.
Implementations should respect this limit and avoid
+ * reading more rows once exhausted. Default implementation is a no-op for
executors that do not support it.
+ */
+ default void setMaxRowsToProcess(int maxRows) {
+ }
+
+ /**
+ * Returns the remaining number of rows that can be processed.
Implementations that do not support early termination
+ * should return {@link Integer#MAX_VALUE}.
+ */
+ default int getRemainingRowsToProcess() {
+ return Integer.MAX_VALUE;
Review Comment:
This is an internal-only interface. No need to provide a default impl
##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java:
##########
@@ -40,31 +43,91 @@
*/
public class DistinctOperator extends BaseOperator<DistinctResultsBlock> {
private static final String EXPLAIN_NAME = "DISTINCT";
+ private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
+ private static final long UNLIMITED_TIME_NANOS = Long.MAX_VALUE;
private final IndexSegment _indexSegment;
private final QueryContext _queryContext;
private final BaseProjectOperator<?> _projectOperator;
private int _numDocsScanned = 0;
+ private final int _maxRowsInDistinct;
+ private final int _numRowsWithoutChangeInDistinct;
+ private final long _maxExecutionTimeNs;
+ private boolean _hitNoChangeLimit = false;
+ private boolean _hitMaxRowsLimit = false;
+ private boolean _hitTimeLimit = false;
public DistinctOperator(IndexSegment indexSegment, QueryContext queryContext,
BaseProjectOperator<?> projectOperator) {
_indexSegment = indexSegment;
_queryContext = queryContext;
_projectOperator = projectOperator;
+ Map<String, String> queryOptions = queryContext.getQueryOptions();
Review Comment:
All the processing logic can be pushed into `DistinctExecutor`. I don't see
why we need to handle the query option parsing here, and all the execution
logic in a separate place.
The logic can be well encapsulated and much simpler if we change
`DistinctExecutor.getResult()` to return a `DistinctResultsBlock`. No other
change is needed to `DistinctExecutor` class.
If you want an accurate count of docs processed, keep `getNumRowsProcessed()`
##########
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctEarlyTerminationContext.java:
##########
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.function.LongSupplier;
+
+/**
+ * Tracks per-block early-termination budgets for distinct executors (row
limits and no-change limits).
+ * <p>All distinct executors should delegate to this class so that query
options such as
+ * {@code maxRowsInDistinct} and {@code numRowsWithoutChangeInDistinct} are
enforced consistently
+ * while processing each {@link
org.apache.pinot.core.operator.blocks.ValueBlock}.</p>
+ */
+public class DistinctEarlyTerminationContext {
+ private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
+ private static final long UNLIMITED_TIME_NANOS = Long.MAX_VALUE;
+
+ private int _rowsRemaining = UNLIMITED_ROWS;
+ private int _numRowsProcessed = 0;
+ private int _numRowsWithoutChangeLimit = UNLIMITED_ROWS;
+ private int _numRowsWithoutChange = 0;
+ private boolean _numRowsWithoutChangeLimitReached = false;
+ private boolean _maxRowsLimitReached = false;
+ private boolean _trackingEnabled = false;
+ // Absolute deadline (in nanos from the configured time supplier). A
deadline stays consistent with the time source
+ // and enables budget checks.
+ private long _deadlineTimeNanos = UNLIMITED_TIME_NANOS;
+ private boolean _timeLimitReached = false;
+ private LongSupplier _timeSupplier = System::nanoTime;
+
+ @VisibleForTesting
+ public void setTimeSupplier(LongSupplier timeSupplier) {
+ if (timeSupplier == null || timeSupplier == _timeSupplier) {
+ return;
+ }
+ _timeSupplier = timeSupplier;
Review Comment:
```suggestion
_timeSupplier = timeSupplier;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]