Jackie-Jiang commented on code in PR #17247:
URL: https://github.com/apache/pinot/pull/17247#discussion_r2785216063


##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java:
##########
@@ -18,18 +18,131 @@
  */
 package org.apache.pinot.core.operator.combine.merger;
 
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
 
 
 public class DistinctResultsBlockMerger implements 
ResultsBlockMerger<DistinctResultsBlock> {
+  private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
+  private static final long UNLIMITED_TIME_NANOS = Long.MAX_VALUE;
+
+  private final int _maxRowsAcrossSegments;
+  private final int _numRowsWithoutChangeLimit;
+  private long _rowsWithoutChange;
+  private boolean _rowBudgetReached;
+  private boolean _noChangeLimitReached;
+  private boolean _timeLimitReached;
+  private final long _maxExecutionTimeNs;
+  private final long _startTimeNs;
+  private BaseResultsBlock.EarlyTerminationReason _earlyTerminationReason =
+      BaseResultsBlock.EarlyTerminationReason.NONE;
+
+  public DistinctResultsBlockMerger(QueryContext queryContext) {
+    Integer maxRows = 
QueryOptionsUtils.getMaxRowsInDistinct(queryContext.getQueryOptions());
+    _maxRowsAcrossSegments = maxRows != null ? maxRows : UNLIMITED_ROWS;
+    Integer numRowsWithoutChange = 
QueryOptionsUtils.getNumRowsWithoutChangeInDistinct(queryContext.getQueryOptions());
+    _numRowsWithoutChangeLimit = numRowsWithoutChange != null ? 
numRowsWithoutChange : UNLIMITED_ROWS;
+    Long maxExecutionTimeMs = 
QueryOptionsUtils.getMaxExecutionTimeMsInDistinct(queryContext.getQueryOptions());
+    _maxExecutionTimeNs = maxExecutionTimeMs != null ? 
TimeUnit.MILLISECONDS.toNanos(maxExecutionTimeMs)
+        : UNLIMITED_TIME_NANOS;
+    _startTimeNs = System.nanoTime();
+  }
 
   @Override
   public boolean isQuerySatisfied(DistinctResultsBlock resultsBlock) {
-    return resultsBlock.getDistinctTable().isSatisfied();
+    if (_rowBudgetReached || _noChangeLimitReached) {
+      applyEarlyTerminationReasonIfNeeded(resultsBlock);
+      return true;
+    }
+    if (resultsBlock.getDistinctTable().isSatisfied()) {
+      return true;
+    }
+    BaseResultsBlock.EarlyTerminationReason blockReason = 
resultsBlock.getEarlyTerminationReason();
+    if (blockReason == 
BaseResultsBlock.EarlyTerminationReason.DISTINCT_TIME_LIMIT) {
+      applyBlockEarlyTerminationReason(blockReason);
+      applyEarlyTerminationReasonIfNeeded(resultsBlock);
+      return true;
+    }
+    if (_timeLimitReached || hasExceededTimeLimit()) {
+      _timeLimitReached = true;
+      
updateEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_TIME_LIMIT);
+      applyEarlyTerminationReasonIfNeeded(resultsBlock);
+      return true;
+    }
+    return false;
   }
 
   @Override
   public void mergeResultsBlocks(DistinctResultsBlock mergedBlock, 
DistinctResultsBlock blockToMerge) {
+    int sizeBefore = mergedBlock.getDistinctTable().size();
     
mergedBlock.getDistinctTable().mergeDistinctTable(blockToMerge.getDistinctTable());
+    int sizeAfter = mergedBlock.getDistinctTable().size();
+    mergedBlock.setNumDocsScanned(mergedBlock.getNumDocsScanned() + 
blockToMerge.getNumDocsScanned());
+    if (_numRowsWithoutChangeLimit != UNLIMITED_ROWS) {
+      if (sizeBefore == sizeAfter) {
+        _rowsWithoutChange += blockToMerge.getNumDocsScanned();
+        if (_rowsWithoutChange >= _numRowsWithoutChangeLimit) {
+          _noChangeLimitReached = true;
+          
updateEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_NO_NEW_VALUES);
+        }
+      } else {
+        _rowsWithoutChange = 0;
+      }
+    }
+    if (_maxRowsAcrossSegments != UNLIMITED_ROWS
+        && mergedBlock.getNumDocsScanned() >= _maxRowsAcrossSegments) {
+      _rowBudgetReached = true;
+      
updateEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS);
+    }
+    BaseResultsBlock.EarlyTerminationReason blockReason = 
blockToMerge.getEarlyTerminationReason();
+    if (blockReason == 
BaseResultsBlock.EarlyTerminationReason.DISTINCT_TIME_LIMIT) {
+      applyBlockEarlyTerminationReason(blockReason);
+    }
+    if (!_rowBudgetReached && !_noChangeLimitReached && (_timeLimitReached || 
hasExceededTimeLimit())) {
+      _timeLimitReached = true;
+      
updateEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_TIME_LIMIT);
+    }
+    applyEarlyTerminationReasonIfNeeded(mergedBlock);
+  }
+
+  private boolean hasExceededTimeLimit() {
+    return _maxExecutionTimeNs != UNLIMITED_TIME_NANOS && System.nanoTime() - 
_startTimeNs >= _maxExecutionTimeNs;
+  }
+
+  private void 
applyBlockEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason 
reason) {
+    switch (reason) {
+      case DISTINCT_TIME_LIMIT:
+        _timeLimitReached = true;
+        updateEarlyTerminationReason(reason);
+        break;
+      default:
+        break;
+    }
+  }
+
+  private void 
updateEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason reason) {
+    if (reason == BaseResultsBlock.EarlyTerminationReason.NONE || 
_earlyTerminationReason == reason) {
+      return;
+    }

Review Comment:
   (minor) This check is redundant



##########
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctEarlyTerminationContext.java:
##########
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.function.LongSupplier;
+
+/**
+ * Tracks per-block early-termination budgets for distinct executors (row 
limits and no-change limits).
+ * <p>All distinct executors should delegate to this class so that query 
options such as
+ * {@code maxRowsInDistinct} and {@code numRowsWithoutChangeInDistinct} are 
enforced consistently
+ * while processing each {@link 
org.apache.pinot.core.operator.blocks.ValueBlock}.</p>
+ */
+public class DistinctEarlyTerminationContext {
+  private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
+  private static final long UNLIMITED_TIME_NANOS = Long.MAX_VALUE;
+
+  private int _rowsRemaining = UNLIMITED_ROWS;
+  private int _numRowsProcessed = 0;
+  private int _numRowsWithoutChangeLimit = UNLIMITED_ROWS;
+  private int _numRowsWithoutChange = 0;
+  private boolean _numRowsWithoutChangeLimitReached = false;
+  private boolean _maxRowsLimitReached = false;
+  private boolean _trackingEnabled = false;
+  // Absolute deadline (in nanos from the configured time supplier). A 
deadline stays consistent with the time source
+  // and enables budget checks.
+  private long _deadlineTimeNanos = UNLIMITED_TIME_NANOS;
+  private boolean _timeLimitReached = false;
+  private LongSupplier _timeSupplier = System::nanoTime;
+
+  @VisibleForTesting
+  public void setTimeSupplier(LongSupplier timeSupplier) {
+    if (timeSupplier == null || timeSupplier == _timeSupplier) {
+      return;
+    }
+    _timeSupplier = timeSupplier;
+  }
+
+  public void setMaxRowsToProcess(int maxRows) {

Review Comment:
   After pushing the logic into `DistinctExecutor`, all the setters can be 
moved into the constructor.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctExecutor.java:
##########
@@ -29,6 +30,70 @@ public interface DistinctExecutor {
   // TODO: Tune the initial capacity
   int MAX_INITIAL_CAPACITY = 10000;
 
+  /**
+   * Sets the maximum number of rows to process across all blocks. 
Implementations should respect this limit and avoid
+   * reading more rows once exhausted. Default implementation is a no-op for 
executors that do not support it.
+   */
+  default void setMaxRowsToProcess(int maxRows) {
+  }
+
+  /**
+   * Returns the remaining number of rows that can be processed. 
Implementations that do not support early termination
+   * should return {@link Integer#MAX_VALUE}.
+   */
+  default int getRemainingRowsToProcess() {
+    return Integer.MAX_VALUE;
+  }
+
+  /**
+   * Sets the maximum number of rows to scan without adding any new distinct 
value before early-terminating.
+   */
+  default void setNumRowsWithoutChangeInDistinct(int 
numRowsWithoutChangeInDistinct) {
+  }
+
+  /**
+   * Sets the time supplier used by the executor to evaluate time-based early 
termination. Implementations should call
+   * {@link LongSupplier#getAsLong()} in nanoseconds and treat it as a 
monotonic clock (e.g. {@link System#nanoTime()}).
+   */
+  default void setTimeSupplier(LongSupplier timeSupplier) {

Review Comment:
   Seems not needed



##########
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/BaseSingleColumnDistinctExecutor.java:
##########
@@ -106,4 +175,78 @@ private boolean processWithoutNull(BlockValSet 
blockValueSet, int numDocs) {
   public DistinctTable getResult() {
     return _distinctTable;
   }
+
+  @Override
+  public int getNumDistinctRowsCollected() {
+    return _distinctTable.size();
+  }
+
+  @Override
+  public int getRemainingRowsToProcess() {
+    return _earlyTerminationContext.getRemainingRowsToProcess();
+  }
+
+  private int clampToRemaining(int numDocs) {
+    return _earlyTerminationContext.clampToRemaining(numDocs);
+  }
+
+  private void recordRowProcessed(boolean distinctChanged) {
+    _earlyTerminationContext.recordRowProcessed(distinctChanged);
+  }
+
+  private boolean shouldStopProcessing() {
+    return _earlyTerminationContext.shouldStopProcessing();
+  }
+
+  private boolean shouldStopProcessingWithoutTime() {
+    return _earlyTerminationContext.shouldStopProcessingWithoutTime();
+  }
+
+  private boolean processSVWithTracking(S values, int from, int to, 
RoaringBitmap nullBitmap,
+      boolean trackDistinctChange) {
+    boolean limitReached = false;
+    boolean nullAdded = _distinctTable.hasNull();
+    for (int docId = from; docId < to; docId++) {
+      boolean distinctChanged = false;
+      if (nullBitmap != null && nullBitmap.contains(docId)) {
+        if (!nullAdded) {
+          _distinctTable.addNull();
+          nullAdded = true;
+          distinctChanged = true;
+        }
+      } else {
+        if (trackDistinctChange) {
+          int sizeBefore = _distinctTable.size();
+          limitReached = processSV(values, docId, docId + 1);

Review Comment:
   Avoid calling the batch API here: it always processes a single row inside a loop. 
I'm surprised this overhead does not show up in the benchmark.
   
   To reduce the overhead, consider processing the whole block first, then checking 
whether the distinct table changed. We don't need such fine-grained tracking here. 
The same applies to the other executors.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java:
##########
@@ -18,18 +18,131 @@
  */
 package org.apache.pinot.core.operator.combine.merger;
 
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
 
 
 public class DistinctResultsBlockMerger implements 
ResultsBlockMerger<DistinctResultsBlock> {
+  private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
+  private static final long UNLIMITED_TIME_NANOS = Long.MAX_VALUE;
+
+  private final int _maxRowsAcrossSegments;
+  private final int _numRowsWithoutChangeLimit;
+  private long _rowsWithoutChange;
+  private boolean _rowBudgetReached;
+  private boolean _noChangeLimitReached;
+  private boolean _timeLimitReached;
+  private final long _maxExecutionTimeNs;
+  private final long _startTimeNs;
+  private BaseResultsBlock.EarlyTerminationReason _earlyTerminationReason =
+      BaseResultsBlock.EarlyTerminationReason.NONE;
+
+  public DistinctResultsBlockMerger(QueryContext queryContext) {
+    Integer maxRows = 
QueryOptionsUtils.getMaxRowsInDistinct(queryContext.getQueryOptions());
+    _maxRowsAcrossSegments = maxRows != null ? maxRows : UNLIMITED_ROWS;
+    Integer numRowsWithoutChange = 
QueryOptionsUtils.getNumRowsWithoutChangeInDistinct(queryContext.getQueryOptions());
+    _numRowsWithoutChangeLimit = numRowsWithoutChange != null ? 
numRowsWithoutChange : UNLIMITED_ROWS;
+    Long maxExecutionTimeMs = 
QueryOptionsUtils.getMaxExecutionTimeMsInDistinct(queryContext.getQueryOptions());
+    _maxExecutionTimeNs = maxExecutionTimeMs != null ? 
TimeUnit.MILLISECONDS.toNanos(maxExecutionTimeMs)
+        : UNLIMITED_TIME_NANOS;
+    _startTimeNs = System.nanoTime();
+  }
 
   @Override
   public boolean isQuerySatisfied(DistinctResultsBlock resultsBlock) {
-    return resultsBlock.getDistinctTable().isSatisfied();
+    if (_rowBudgetReached || _noChangeLimitReached) {
+      applyEarlyTerminationReasonIfNeeded(resultsBlock);
+      return true;
+    }
+    if (resultsBlock.getDistinctTable().isSatisfied()) {
+      return true;
+    }
+    BaseResultsBlock.EarlyTerminationReason blockReason = 
resultsBlock.getEarlyTerminationReason();
+    if (blockReason == 
BaseResultsBlock.EarlyTerminationReason.DISTINCT_TIME_LIMIT) {
+      applyBlockEarlyTerminationReason(blockReason);
+      applyEarlyTerminationReasonIfNeeded(resultsBlock);
+      return true;
+    }
+    if (_timeLimitReached || hasExceededTimeLimit()) {
+      _timeLimitReached = true;
+      
updateEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_TIME_LIMIT);
+      applyEarlyTerminationReasonIfNeeded(resultsBlock);
+      return true;
+    }
+    return false;
   }
 
   @Override
   public void mergeResultsBlocks(DistinctResultsBlock mergedBlock, 
DistinctResultsBlock blockToMerge) {
+    int sizeBefore = mergedBlock.getDistinctTable().size();
     
mergedBlock.getDistinctTable().mergeDistinctTable(blockToMerge.getDistinctTable());
+    int sizeAfter = mergedBlock.getDistinctTable().size();
+    mergedBlock.setNumDocsScanned(mergedBlock.getNumDocsScanned() + 
blockToMerge.getNumDocsScanned());
+    if (_numRowsWithoutChangeLimit != UNLIMITED_ROWS) {
+      if (sizeBefore == sizeAfter) {
+        _rowsWithoutChange += blockToMerge.getNumDocsScanned();
+        if (_rowsWithoutChange >= _numRowsWithoutChangeLimit) {
+          _noChangeLimitReached = true;
+          
updateEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_NO_NEW_VALUES);
+        }
+      } else {
+        _rowsWithoutChange = 0;
+      }
+    }
+    if (_maxRowsAcrossSegments != UNLIMITED_ROWS
+        && mergedBlock.getNumDocsScanned() >= _maxRowsAcrossSegments) {
+      _rowBudgetReached = true;
+      
updateEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS);
+    }
+    BaseResultsBlock.EarlyTerminationReason blockReason = 
blockToMerge.getEarlyTerminationReason();
+    if (blockReason == 
BaseResultsBlock.EarlyTerminationReason.DISTINCT_TIME_LIMIT) {
+      applyBlockEarlyTerminationReason(blockReason);
+    }
+    if (!_rowBudgetReached && !_noChangeLimitReached && (_timeLimitReached || 
hasExceededTimeLimit())) {
+      _timeLimitReached = true;
+      
updateEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason.DISTINCT_TIME_LIMIT);
+    }
+    applyEarlyTerminationReasonIfNeeded(mergedBlock);
+  }
+
+  private boolean hasExceededTimeLimit() {
+    return _maxExecutionTimeNs != UNLIMITED_TIME_NANOS && System.nanoTime() - 
_startTimeNs >= _maxExecutionTimeNs;
+  }
+
+  private void 
applyBlockEarlyTerminationReason(BaseResultsBlock.EarlyTerminationReason 
reason) {

Review Comment:
   Rename this method, since it only handles the time limit.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/DistinctResultsBlockMerger.java:
##########
@@ -18,18 +18,131 @@
  */
 package org.apache.pinot.core.operator.combine.merger;
 
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
 
 
 public class DistinctResultsBlockMerger implements 
ResultsBlockMerger<DistinctResultsBlock> {
+  private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
+  private static final long UNLIMITED_TIME_NANOS = Long.MAX_VALUE;
+
+  private final int _maxRowsAcrossSegments;
+  private final int _numRowsWithoutChangeLimit;
+  private long _rowsWithoutChange;

Review Comment:
   (nit) Move the non-final variables below the final ones



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java:
##########
@@ -117,6 +117,12 @@ public static Long getExtraPassiveTimeoutMs(Map<String, 
String> queryOptions) {
     return checkedParseLong(QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS, 
extraPassiveTimeoutMsString, 0);
   }
 
+  @Nullable
+  public static Long getMaxExecutionTimeMsInDistinct(Map<String, String> 
queryOptions) {
+    String maxExecutionTimeMs = 
queryOptions.get(QueryOptionKey.MAX_EXECUTION_TIME_MS_IN_DISTINCT);
+    return checkedParseLong(QueryOptionKey.MAX_EXECUTION_TIME_MS_IN_DISTINCT, 
maxExecutionTimeMs, 0);

Review Comment:
   Use `checkedParseLongPositive()` instead; the value should not be 0.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/BaseSingleColumnDistinctExecutor.java:
##########
@@ -106,4 +175,78 @@ private boolean processWithoutNull(BlockValSet 
blockValueSet, int numDocs) {
   public DistinctTable getResult() {
     return _distinctTable;
   }
+
+  @Override
+  public int getNumDistinctRowsCollected() {
+    return _distinctTable.size();
+  }
+
+  @Override
+  public int getRemainingRowsToProcess() {
+    return _earlyTerminationContext.getRemainingRowsToProcess();
+  }
+
+  private int clampToRemaining(int numDocs) {
+    return _earlyTerminationContext.clampToRemaining(numDocs);
+  }
+
+  private void recordRowProcessed(boolean distinctChanged) {
+    _earlyTerminationContext.recordRowProcessed(distinctChanged);
+  }
+
+  private boolean shouldStopProcessing() {
+    return _earlyTerminationContext.shouldStopProcessing();
+  }
+
+  private boolean shouldStopProcessingWithoutTime() {
+    return _earlyTerminationContext.shouldStopProcessingWithoutTime();
+  }
+
+  private boolean processSVWithTracking(S values, int from, int to, 
RoaringBitmap nullBitmap,

Review Comment:
   (minor) `from` is always 0



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedDistinctOperator.java:
##########
@@ -108,35 +110,37 @@ private IntDistinctTable 
createIntDistinctTable(DataSchema dataSchema, Dictionar
     int numValuesToKeep = Math.min(limit, dictLength);
     IntDistinctTable distinctTable =
         new IntDistinctTable(dataSchema, limit, 
_queryContext.isNullHandlingEnabled(), orderByExpression);
+    int rowsProcessed = 0;

Review Comment:
   Most changes in this file no longer apply



##########
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/BaseSingleColumnDistinctExecutor.java:
##########
@@ -32,26 +33,94 @@
 public abstract class BaseSingleColumnDistinctExecutor<T extends 
DistinctTable, S, M> implements DistinctExecutor {
   protected final ExpressionContext _expression;
   protected final T _distinctTable;
+  private final DistinctEarlyTerminationContext _earlyTerminationContext = new 
DistinctEarlyTerminationContext();
 
   public BaseSingleColumnDistinctExecutor(ExpressionContext expression, T 
distinctTable) {
     _expression = expression;
     _distinctTable = distinctTable;
   }
 
+  @Override
+  public void setMaxRowsToProcess(int maxRows) {
+    _earlyTerminationContext.setMaxRowsToProcess(maxRows);
+  }
+
+  @Override
+  public void setNumRowsWithoutChangeInDistinct(int 
numRowsWithoutChangeInDistinct) {
+    
_earlyTerminationContext.setNumRowsWithoutChangeInDistinct(numRowsWithoutChangeInDistinct);
+  }
+
+  @Override
+  public void setTimeSupplier(LongSupplier timeSupplier) {
+    _earlyTerminationContext.setTimeSupplier(timeSupplier);
+  }
+
+  @Override
+  public void setRemainingTimeNanos(long remainingTimeNanos) {
+    _earlyTerminationContext.setRemainingTimeNanos(remainingTimeNanos);
+  }
+
+  @Override
+  public boolean isNumRowsWithoutChangeLimitReached() {
+    return _earlyTerminationContext.isNumRowsWithoutChangeLimitReached();
+  }
+
+  @Override
+  public boolean isMaxRowsLimitReached() {
+    return _earlyTerminationContext.isMaxRowsLimitReached();
+  }
+
+  @Override
+  public boolean isTimeLimitReached() {
+    return _earlyTerminationContext.isTimeLimitReached();
+  }
+
+  @Override
+  public int getNumRowsProcessed() {
+    return _earlyTerminationContext.getNumRowsProcessed();
+  }
+
   @Override
   public boolean process(ValueBlock valueBlock) {
+    if (!_earlyTerminationContext.isTrackingEnabled()) {
+      BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
+      return processWithoutTracking(blockValueSet, valueBlock.getNumDocs());
+    }
+    if (shouldStopProcessing()) {
+      return true;
+    }

Review Comment:
   Given that this check involves fetching the current time, consider calling it only once, 
after the block has been processed. The same applies to the other executors.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctExecutor.java:
##########
@@ -29,6 +30,70 @@ public interface DistinctExecutor {
   // TODO: Tune the initial capacity
   int MAX_INITIAL_CAPACITY = 10000;
 
+  /**
+   * Sets the maximum number of rows to process across all blocks. 
Implementations should respect this limit and avoid
+   * reading more rows once exhausted. Default implementation is a no-op for 
executors that do not support it.
+   */
+  default void setMaxRowsToProcess(int maxRows) {
+  }
+
+  /**
+   * Returns the remaining number of rows that can be processed. 
Implementations that do not support early termination
+   * should return {@link Integer#MAX_VALUE}.
+   */
+  default int getRemainingRowsToProcess() {
+    return Integer.MAX_VALUE;

Review Comment:
   This is an internal-only interface, so there is no need to provide a default implementation.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java:
##########
@@ -40,31 +43,91 @@
  */
 public class DistinctOperator extends BaseOperator<DistinctResultsBlock> {
   private static final String EXPLAIN_NAME = "DISTINCT";
+  private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
+  private static final long UNLIMITED_TIME_NANOS = Long.MAX_VALUE;
 
   private final IndexSegment _indexSegment;
   private final QueryContext _queryContext;
   private final BaseProjectOperator<?> _projectOperator;
 
   private int _numDocsScanned = 0;
+  private final int _maxRowsInDistinct;
+  private final int _numRowsWithoutChangeInDistinct;
+  private final long _maxExecutionTimeNs;
+  private boolean _hitNoChangeLimit = false;
+  private boolean _hitMaxRowsLimit = false;
+  private boolean _hitTimeLimit = false;
 
   public DistinctOperator(IndexSegment indexSegment, QueryContext queryContext,
       BaseProjectOperator<?> projectOperator) {
     _indexSegment = indexSegment;
     _queryContext = queryContext;
     _projectOperator = projectOperator;
+    Map<String, String> queryOptions = queryContext.getQueryOptions();

Review Comment:
   All the processing logic can be pushed into `DistinctExecutor`. I don't see 
why we need to handle the query option parsing here and all the execution 
logic in a separate place.
   The logic can be better encapsulated and much simpler if we change 
`DistinctExecutor.getResult()` to return a `DistinctResultsBlock`. No other 
change is needed to the `DistinctExecutor` class.
   If you need an accurate count of docs processed, keep `getNumRowsProcessed()`.



##########
pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctEarlyTerminationContext.java:
##########
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.function.LongSupplier;
+
+/**
+ * Tracks per-block early-termination budgets for distinct executors (row 
limits and no-change limits).
+ * <p>All distinct executors should delegate to this class so that query 
options such as
+ * {@code maxRowsInDistinct} and {@code numRowsWithoutChangeInDistinct} are 
enforced consistently
+ * while processing each {@link 
org.apache.pinot.core.operator.blocks.ValueBlock}.</p>
+ */
+public class DistinctEarlyTerminationContext {
+  private static final int UNLIMITED_ROWS = Integer.MAX_VALUE;
+  private static final long UNLIMITED_TIME_NANOS = Long.MAX_VALUE;
+
+  private int _rowsRemaining = UNLIMITED_ROWS;
+  private int _numRowsProcessed = 0;
+  private int _numRowsWithoutChangeLimit = UNLIMITED_ROWS;
+  private int _numRowsWithoutChange = 0;
+  private boolean _numRowsWithoutChangeLimitReached = false;
+  private boolean _maxRowsLimitReached = false;
+  private boolean _trackingEnabled = false;
+  // Absolute deadline (in nanos from the configured time supplier). A 
deadline stays consistent with the time source
+  // and enables budget checks.
+  private long _deadlineTimeNanos = UNLIMITED_TIME_NANOS;
+  private boolean _timeLimitReached = false;
+  private LongSupplier _timeSupplier = System::nanoTime;
+
+  @VisibleForTesting
+  public void setTimeSupplier(LongSupplier timeSupplier) {
+    if (timeSupplier == null || timeSupplier == _timeSupplier) {
+      return;
+    }
+    _timeSupplier = timeSupplier;

Review Comment:
   ```suggestion
       _timeSupplier = timeSupplier;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to