Jackie-Jiang commented on code in PR #16308:
URL: https://github.com/apache/pinot/pull/16308#discussion_r2274783175


##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecords.java:
##########
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+public class SortedRecords {

Review Comment:
   Do we need this helper class? It seems `size` is always equal to 
`records.length`, so we can directly pass around `Record[]` instead.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java:
##########
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
+
+
+/**
+ * Table used for merging of sorted group-by aggregation
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class SortedRecordTable extends BaseTable {
+  private final ExecutorService _executorService;
+  private final int _resultSize;
+  private final int _numKeyColumns;
+  private final AggregationFunction[] _aggregationFunctions;
+
+  protected Record[] _topRecords;
+
+  private Record[] _records;
+  private int _nextIdx;
+  private final Comparator<Record> _comparator;
+  private final int _numThreadsExtractFinalResult;
+  private final int _chunkSizeExtractFinalResult;
+
+  public SortedRecordTable(DataSchema dataSchema, QueryContext queryContext, 
int resultSize,
+      ExecutorService executorService, Comparator<Record> comparator) {
+    super(dataSchema);
+    assert queryContext.getGroupByExpressions() != null;
+    _numKeyColumns = queryContext.getGroupByExpressions().size();
+    _aggregationFunctions = queryContext.getAggregationFunctions();
+    _executorService = executorService;
+    _comparator = comparator;
+    _resultSize = resultSize;
+    _numThreadsExtractFinalResult = 
Math.min(queryContext.getNumThreadsExtractFinalResult(),
+        Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS));
+    _chunkSizeExtractFinalResult = 
queryContext.getChunkSizeExtractFinalResult();
+    _records = new Record[_resultSize];
+    _nextIdx = 0;
+  }
+
+  public SortedRecordTable(SortedRecords sortedRecords, DataSchema dataSchema, 
QueryContext queryContext,
+      int resultSize, ExecutorService executorService, Comparator<Record> 
comparator) {
+    super(dataSchema);

Review Comment:
   (nit) You might want to add `assert queryContext.getGroupByExpressions() != 
null;` here as well, same as in the other constructor.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java:
##########
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SortedRecordTable;
+import org.apache.pinot.core.data.table.SortedRecords;
+import org.apache.pinot.core.data.table.SortedRecordsMerger;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.query.utils.OrderByComparatorFactory;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryErrorMessage;
+import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.spi.trace.Tracing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * <p>Pair-wise Combine operator for sort-aggregation</p>
+ *
+ * <p>In this algorithm, an {@link AtomicReference} is used as a "pit" to store
+ * the processed {@link SortedRecordTable} to be merged.</p>
+ *
+ * <p>Each worker thread first processes a segment to a {@link 
SortedRecordTable},
+ * then greedily take waiting tables from the pit and merge them in, until 
there
+ * is no table waiting, then the merged table is placed in the pit. The worker
+ * thread then proceed to process the next segment.</p>
+ *
+ * <p>When there is a table that merged together {@code _numOperators} tables, 
it
+ * is put into {@code _satisfiedTable} as the combine result.</p>
+ *
+ * <p>This pair-wise approach allows higher level of parallelism for the first 
rounds
+ * of combine, while keeping the processing in a streaming fashion without
+ * having to wait for all segments to be ready.</p>
+ */
+@SuppressWarnings("rawtypes")
+public class SortedGroupByCombineOperator extends 
BaseSingleBlockCombineOperator<GroupByResultsBlock> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SortedGroupByCombineOperator.class);
+  private static final String EXPLAIN_NAME = "COMBINE_GROUP_BY";
+
+  // We use a CountDownLatch to track if all Futures are finished by the query 
timeout, and cancel the unfinished
+  // _futures (try to interrupt the execution if it already started).
+  private final CountDownLatch _operatorLatch;
+
+  private volatile boolean _groupsTrimmed;
+  private volatile boolean _numGroupsLimitReached;
+  private volatile boolean _numGroupsWarningLimitReached;
+  private volatile DataSchema _dataSchema;
+
+  private final AtomicReference<SortedRecords> _waitingRecords;
+  private final Comparator<Record> _recordKeyComparator;
+  private final SortedRecordsMerger _sortedRecordsMerger;
+
+  public SortedGroupByCombineOperator(List<Operator> operators, QueryContext 
queryContext,
+      ExecutorService executorService) {
+    super(null, operators, overrideMaxExecutionThreads(queryContext, 
operators.size()), executorService);
+
+    assert (queryContext.shouldSortAggregateUnderSafeTrim());
+    _operatorLatch = new CountDownLatch(_numTasks);
+    _waitingRecords = new AtomicReference<>();
+    _recordKeyComparator = 
OrderByComparatorFactory.getRecordKeyComparator(queryContext.getOrderByExpressions(),
+        queryContext.getGroupByExpressions(), 
queryContext.isNullHandlingEnabled());
+    _sortedRecordsMerger =
+        GroupByUtils.getSortedReduceMerger(queryContext, 
queryContext.getLimit(), _recordKeyComparator);
+  }
+
+  /**
+   * For group-by queries, when maxExecutionThreads is not explicitly 
configured, override it to create as many tasks as
+   * the default number of query worker threads (or the number of operators / 
segments if that's lower).
+   */
+  private static QueryContext overrideMaxExecutionThreads(QueryContext 
queryContext, int numOperators) {
+    int maxExecutionThreads = queryContext.getMaxExecutionThreads();
+    if (maxExecutionThreads <= 0) {
+      queryContext.setMaxExecutionThreads(Math.min(numOperators, 
ResourceManager.DEFAULT_QUERY_WORKER_THREADS));
+    }
+    return queryContext;
+  }
+
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  /**
+   * Executes query on one sorted segment in a worker thread and merges the 
results into the sorted record table.
+   */
+  @Override
+  protected void processSegments() {
+    int operatorId;
+    while (_processingException.get() == null && (operatorId = 
_nextOperatorId.getAndIncrement()) < _numOperators) {
+      Operator operator = _operators.get(operatorId);
+      try {
+        if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+          ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
+        }
+        GroupByResultsBlock resultsBlock = (GroupByResultsBlock) 
operator.nextBlock();
+        if (resultsBlock.isGroupsTrimmed()) {
+          _groupsTrimmed = true;
+        }
+        // Set groups limit reached flag.
+        if (resultsBlock.isNumGroupsLimitReached()) {
+          _numGroupsLimitReached = true;
+        }
+        if (resultsBlock.isNumGroupsWarningLimitReached()) {
+          _numGroupsWarningLimitReached = true;
+        }
+        if (_dataSchema == null) {
+          _dataSchema = resultsBlock.getDataSchema();
+        }
+        // short-circuit one segment case
+        if (_numOperators == 1) {
+          
_waitingRecords.set(GroupByUtils.getAndPopulateSortedRecords(resultsBlock));
+          break;
+        }
+
+        SortedRecords records =

Review Comment:
   This involves extra memory overhead. We can directly put `resultsBlock` in, 
and when getting the reference out, do an `instanceof` check and only extract 
the records when needed.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java:
##########
@@ -324,8 +325,28 @@ private Collection<Record> getUnsortedTopRecords(Map<Key, 
Record> recordsMap, in
    * Trims the aggregation results using a heap and returns the top records.
    * This method is to be called from individual segment if the intermediate 
results need to be trimmed.
    */
-  public List<IntermediateRecord> trimInSegmentResults(GroupKeyGenerator 
groupKeyGenerator,
+  public List<IntermediateRecord> sortInSegmentResults(GroupKeyGenerator 
groupKeyGenerator,
       GroupByResultHolder[] groupByResultHolders, int size) {
+    // getNumKeys() does not count nulls
+    assert groupKeyGenerator.getNumKeys() <= size;
+    Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = 
groupKeyGenerator.getGroupKeys();
+
+    // Initialize a heap with the first 'size' groups
+    List<IntermediateRecord> arr = new ArrayList<>();
+    while (groupKeyIterator.hasNext()) {
+      arr.add(getIntermediateRecord(groupKeyIterator.next(), 
groupByResultHolders));
+    }
+
+    arr.sort(_intermediateRecordComparator);
+    return arr;
+  }
+
+  /**
+   * Trims the aggregation results using a heap and returns the top records.
+   * This method is to be called from individual segment if the intermediate 
results need to be trimmed.
+   */
+  public List<IntermediateRecord> trimInSegmentResults(GroupKeyGenerator 
groupKeyGenerator,
+      GroupByResultHolder[] groupByResultHolders, int size, boolean 
sortedOutput) {

Review Comment:
   (minor)
   ```suggestion
         GroupByResultHolder[] groupByResultHolders, int size, boolean 
sortOutput) {
   ```



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java:
##########
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
+
+
+/**
+ * Table used for merging of sorted group-by aggregation
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class SortedRecordTable extends BaseTable {
+  private final ExecutorService _executorService;
+  private final int _resultSize;
+  private final int _numKeyColumns;
+  private final AggregationFunction[] _aggregationFunctions;
+
+  protected Record[] _topRecords;

Review Comment:
   Is there a specific reason for keeping this? It seems this is always set to 
`_records` in `finish()`.
   Was it made `protected` so that tests can access it? I don't see any usage 
of it from the tests.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java:
##########
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
+
+
+/**
+ * Table used for merging of sorted group-by aggregation
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class SortedRecordTable extends BaseTable {
+  private final ExecutorService _executorService;
+  private final int _resultSize;
+  private final int _numKeyColumns;
+  private final AggregationFunction[] _aggregationFunctions;
+
+  protected Record[] _topRecords;
+
+  private Record[] _records;
+  private int _nextIdx;
+  private final Comparator<Record> _comparator;
+  private final int _numThreadsExtractFinalResult;
+  private final int _chunkSizeExtractFinalResult;
+
+  public SortedRecordTable(DataSchema dataSchema, QueryContext queryContext, 
int resultSize,
+      ExecutorService executorService, Comparator<Record> comparator) {
+    super(dataSchema);
+    assert queryContext.getGroupByExpressions() != null;
+    _numKeyColumns = queryContext.getGroupByExpressions().size();
+    _aggregationFunctions = queryContext.getAggregationFunctions();
+    _executorService = executorService;
+    _comparator = comparator;
+    _resultSize = resultSize;
+    _numThreadsExtractFinalResult = 
Math.min(queryContext.getNumThreadsExtractFinalResult(),
+        Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS));
+    _chunkSizeExtractFinalResult = 
queryContext.getChunkSizeExtractFinalResult();
+    _records = new Record[_resultSize];
+    _nextIdx = 0;
+  }
+
+  public SortedRecordTable(SortedRecords sortedRecords, DataSchema dataSchema, 
QueryContext queryContext,
+      int resultSize, ExecutorService executorService, Comparator<Record> 
comparator) {
+    super(dataSchema);
+    _numKeyColumns = queryContext.getGroupByExpressions().size();
+    _aggregationFunctions = queryContext.getAggregationFunctions();
+    _executorService = executorService;
+    _comparator = comparator;
+    _resultSize = resultSize;
+    _numThreadsExtractFinalResult = 
Math.min(queryContext.getNumThreadsExtractFinalResult(),
+        Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS));
+    _chunkSizeExtractFinalResult = 
queryContext.getChunkSizeExtractFinalResult();
+    _records = sortedRecords._records;
+    _nextIdx = sortedRecords._size;
+  }
+
+  /// Only used when creating SortedRecordTable from unique, sorted segment 
groupby results
+  @Override
+  public boolean upsert(Record record) {
+    if (_nextIdx == _resultSize) {
+      // enough records
+      return false;
+    }
+    _records[_nextIdx++] = record;
+    return true;
+  }
+
+  @Override
+  public boolean upsert(Key key, Record record) {
+    throw new UnsupportedOperationException("method unused for 
SortedRecordTable");
+  }
+
+  /// Merge a segment result into self, saving an allocation of 
SortedRecordTable
+  public SortedRecordTable mergeSortedGroupByResultBlock(GroupByResultsBlock 
block) {
+    List<IntermediateRecord> segmentRecords = block.getIntermediateRecords();
+    if (segmentRecords.isEmpty() || size() == 0) {
+      segmentRecords.forEach(x -> upsert(x._record));
+      return this;
+    }
+    mergeSegmentRecords(segmentRecords, segmentRecords.size());
+    return this;
+  }
+
+  private void finalizeRecordMerge(Record[] records, int newIdx) {
+    _records = records;
+    _nextIdx = newIdx;
+  }
+
+  /// Merge in that._records, update _records _curIdx
+  private void mergeSegmentRecords(List<IntermediateRecord> records2, int mj) {

Review Comment:
   It seems `mj` is always `records2.size()`, so we don't need to pass it as a 
separate parameter.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java:
##########
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SortedRecordTable;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.query.utils.OrderByComparatorFactory;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryErrorMessage;
+import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.spi.trace.Tracing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Combine operator for group-by queries.
+ */
+@SuppressWarnings("rawtypes")
+public class SortedGroupByCombineOperator extends 
BaseSingleBlockCombineOperator<GroupByResultsBlock> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SortedGroupByCombineOperator.class);
+  private static final String EXPLAIN_NAME = "COMBINE_GROUP_BY";
+
+  // We use a CountDownLatch to track if all Futures are finished by the query 
timeout, and cancel the unfinished
+  // _futures (try to interrupt the execution if it already started).
+  private final CountDownLatch _operatorLatch;
+
+  private volatile boolean _groupsTrimmed;
+  private volatile boolean _numGroupsLimitReached;
+  private volatile boolean _numGroupsWarningLimitReached;
+
+  private final AtomicReference<SortedRecordTable> _waitingTable;
+  private final AtomicReference<SortedRecordTable> _satisfiedTable;
+  private final Comparator<Record> _recordKeyComparator;
+
+  public SortedGroupByCombineOperator(List<Operator> operators, QueryContext 
queryContext,
+      ExecutorService executorService) {
+    super(null, operators, overrideMaxExecutionThreads(queryContext, 
operators.size()), executorService);
+
+    assert (GroupByUtils.shouldSortAggregateUnderSafeTrim(queryContext));
+    _operatorLatch = new CountDownLatch(_numTasks);
+    _waitingTable = new AtomicReference<>();
+    _satisfiedTable = new AtomicReference<>();
+    _recordKeyComparator = 
OrderByComparatorFactory.getRecordKeyComparator(queryContext.getOrderByExpressions(),
+        queryContext.getGroupByExpressions(), 
queryContext.isNullHandlingEnabled());
+  }
+
+  /**
+   * For group-by queries, when maxExecutionThreads is not explicitly 
configured, override it to create as many tasks as
+   * the default number of query worker threads (or the number of operators / 
segments if that's lower).
+   */
+  private static QueryContext overrideMaxExecutionThreads(QueryContext 
queryContext, int numOperators) {
+    int maxExecutionThreads = queryContext.getMaxExecutionThreads();
+    if (maxExecutionThreads <= 0) {
+      queryContext.setMaxExecutionThreads(Math.min(numOperators, 
ResourceManager.DEFAULT_QUERY_WORKER_THREADS));
+    }
+    return queryContext;
+  }
+
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  /**
+   * Executes query on one sorted segment in a worker thread and merges the 
results into the sorted record table.
+   */
+  @Override
+  protected void processSegments() {
+    int operatorId;
+    while (_processingException.get() == null && (operatorId = 
_nextOperatorId.getAndIncrement()) < _numOperators) {
+      Operator operator = _operators.get(operatorId);
+      try {
+        if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+          ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
+        }
+        GroupByResultsBlock resultsBlock = (GroupByResultsBlock) 
operator.nextBlock();
+        if (resultsBlock.isGroupsTrimmed()) {
+          _groupsTrimmed = true;
+        }
+        // Set groups limit reached flag.
+        if (resultsBlock.isNumGroupsLimitReached()) {
+          _numGroupsLimitReached = true;
+        }
+        if (resultsBlock.isNumGroupsWarningLimitReached()) {
+          _numGroupsWarningLimitReached = true;
+        }
+        // short-circuit one segment case
+        if (_numOperators == 1) {

Review Comment:
   Consider adding the above explanation as comments in the code



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java:
##########
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
+
+
+/**
+ * Table used for merging of sorted group-by aggregation
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class SortedRecordTable extends BaseTable {
+  private final ExecutorService _executorService;
+  private final int _resultSize;
+  private final int _numKeyColumns;
+  private final AggregationFunction[] _aggregationFunctions;
+
+  protected Record[] _topRecords;
+
+  private Record[] _records;
+  private int _nextIdx;
+  private final Comparator<Record> _comparator;
+  private final int _numThreadsExtractFinalResult;
+  private final int _chunkSizeExtractFinalResult;
+
+  public SortedRecordTable(DataSchema dataSchema, QueryContext queryContext, 
int resultSize,
+      ExecutorService executorService, Comparator<Record> comparator) {
+    super(dataSchema);
+    assert queryContext.getGroupByExpressions() != null;
+    _numKeyColumns = queryContext.getGroupByExpressions().size();
+    _aggregationFunctions = queryContext.getAggregationFunctions();
+    _executorService = executorService;
+    _comparator = comparator;
+    _resultSize = resultSize;
+    _numThreadsExtractFinalResult = 
Math.min(queryContext.getNumThreadsExtractFinalResult(),
+        Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS));
+    _chunkSizeExtractFinalResult = 
queryContext.getChunkSizeExtractFinalResult();
+    _records = new Record[_resultSize];
+    _nextIdx = 0;
+  }
+
+  public SortedRecordTable(SortedRecords sortedRecords, DataSchema dataSchema, 
QueryContext queryContext,
+      int resultSize, ExecutorService executorService, Comparator<Record> 
comparator) {
+    super(dataSchema);
+    _numKeyColumns = queryContext.getGroupByExpressions().size();
+    _aggregationFunctions = queryContext.getAggregationFunctions();
+    _executorService = executorService;
+    _comparator = comparator;
+    _resultSize = resultSize;
+    _numThreadsExtractFinalResult = 
Math.min(queryContext.getNumThreadsExtractFinalResult(),
+        Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS));
+    _chunkSizeExtractFinalResult = 
queryContext.getChunkSizeExtractFinalResult();
+    _records = sortedRecords._records;
+    _nextIdx = sortedRecords._size;
+  }
+
+  /// Only used when creating SortedRecordTable from unique, sorted segment 
groupby results
+  @Override
+  public boolean upsert(Record record) {
+    if (_nextIdx == _resultSize) {
+      // enough records
+      return false;
+    }
+    _records[_nextIdx++] = record;
+    return true;
+  }
+
+  @Override
+  public boolean upsert(Key key, Record record) {
+    throw new UnsupportedOperationException("method unused for 
SortedRecordTable");
+  }
+
+  /// Merge a segment result into self, saving an allocation of 
SortedRecordTable
+  public SortedRecordTable mergeSortedGroupByResultBlock(GroupByResultsBlock 
block) {
+    List<IntermediateRecord> segmentRecords = block.getIntermediateRecords();
+    if (segmentRecords.isEmpty() || size() == 0) {
+      segmentRecords.forEach(x -> upsert(x._record));

Review Comment:
   We can directly set up `_records` and `_nextIdx` to reduce the checks in 
`upsert()`.



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java:
##########
@@ -139,24 +138,47 @@ protected GroupByResultsBlock getNextBlock() {
     // TODO: Currently the groups are not trimmed if there is no ordering 
specified. Consider ordering on group-by
     //       columns if no ordering is specified.
     int trimSize = _queryContext.getEffectiveSegmentGroupTrimSize();
-    if (trimSize > 0) {
-      if (groupByExecutor.getNumGroups() > trimSize) {
-        TableResizer tableResizer = new TableResizer(_dataSchema, 
_queryContext);
-        Collection<IntermediateRecord> intermediateRecords = 
groupByExecutor.trimGroupByResult(trimSize, tableResizer);
-        // trim groupKeyGenerator after getting intermediateRecords
-        groupByExecutor.getGroupKeyGenerator().close();
-
-        
ServerMetrics.get().addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_GROUPS_TRIMMED,
 1);
-        boolean unsafeTrim = _queryContext.isUnsafeTrim(); // set trim flag 
only if it's not safe
-        GroupByResultsBlock resultsBlock = new 
GroupByResultsBlock(_dataSchema, intermediateRecords, _queryContext);
-        resultsBlock.setGroupsTrimmed(unsafeTrim);
-        resultsBlock.setNumGroupsLimitReached(numGroupsLimitReached);
-        
resultsBlock.setNumGroupsWarningLimitReached(numGroupsWarningLimitReached);
-        return resultsBlock;
-      }
+    boolean unsafeTrim = _queryContext.isUnsafeTrim();
+
+    if (trimSize == 0) {

Review Comment:
   Same here. Why do we need this extra check?



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java:
##########
@@ -324,8 +325,28 @@ private Collection<Record> getUnsortedTopRecords(Map<Key, 
Record> recordsMap, in
    * Trims the aggregation results using a heap and returns the top records.
    * This method is to be called from individual segment if the intermediate 
results need to be trimmed.
    */
-  public List<IntermediateRecord> trimInSegmentResults(GroupKeyGenerator 
groupKeyGenerator,
+  public List<IntermediateRecord> sortInSegmentResults(GroupKeyGenerator 
groupKeyGenerator,
       GroupByResultHolder[] groupByResultHolders, int size) {
+    // getNumKeys() does not count nulls
+    assert groupKeyGenerator.getNumKeys() <= size;
+    Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = 
groupKeyGenerator.getGroupKeys();
+
+    // Initialize a heap with the first 'size' groups
+    List<IntermediateRecord> arr = new ArrayList<>();

Review Comment:
   Initialize the array with the expected size



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java:
##########
@@ -187,28 +187,48 @@ protected GroupByResultsBlock getNextBlock() {
     // - There are more groups than the trim size
     // TODO: Currently the groups are not trimmed if there is no ordering 
specified. Consider ordering on group-by
     //       columns if no ordering is specified.
+    // TODO: extract common logic with GroupByOperator
     int trimSize = _queryContext.getEffectiveSegmentGroupTrimSize();
-    if (trimSize > 0) {
-      if (groupKeyGenerator.getNumKeys() > trimSize) {
-        TableResizer tableResizer = new TableResizer(_dataSchema, 
_queryContext);
-        Collection<IntermediateRecord> intermediateRecords =
-            tableResizer.trimInSegmentResults(groupKeyGenerator, 
groupByResultHolders, trimSize);
-        // Release the resources used by the group key generator
-        groupKeyGenerator.close();
+    boolean unsafeTrim = _queryContext.isUnsafeTrim();
 
-        
ServerMetrics.get().addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_GROUPS_TRIMMED,
 1);
-        boolean unsafeTrim = _queryContext.isUnsafeTrim(); // set trim flag 
only if it's not safe
-        GroupByResultsBlock resultsBlock = new 
GroupByResultsBlock(_dataSchema, intermediateRecords, _queryContext);
-        resultsBlock.setGroupsTrimmed(unsafeTrim);
-        resultsBlock.setNumGroupsLimitReached(numGroupsLimitReached);
-        
resultsBlock.setNumGroupsWarningLimitReached(numGroupsWarningLimitReached);
-        return resultsBlock;
-      }
+    if (trimSize == 0) {

Review Comment:
   Is this even possible? Is this a bugfix?



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java:
##########
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
+
+
+/**
+ * Table used for merging of sorted group-by aggregation
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class SortedRecordTable extends BaseTable {
+  private final ExecutorService _executorService;
+  private final int _resultSize;
+  private final int _numKeyColumns;
+  private final AggregationFunction[] _aggregationFunctions;
+
+  protected Record[] _topRecords;
+
+  private Record[] _records;
+  private int _nextIdx;
+  private final Comparator<Record> _comparator;
+  private final int _numThreadsExtractFinalResult;
+  private final int _chunkSizeExtractFinalResult;
+
+  public SortedRecordTable(DataSchema dataSchema, QueryContext queryContext, 
int resultSize,
+      ExecutorService executorService, Comparator<Record> comparator) {
+    super(dataSchema);
+    assert queryContext.getGroupByExpressions() != null;
+    _numKeyColumns = queryContext.getGroupByExpressions().size();
+    _aggregationFunctions = queryContext.getAggregationFunctions();
+    _executorService = executorService;
+    _comparator = comparator;
+    _resultSize = resultSize;
+    _numThreadsExtractFinalResult = 
Math.min(queryContext.getNumThreadsExtractFinalResult(),
+        Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS));
+    _chunkSizeExtractFinalResult = 
queryContext.getChunkSizeExtractFinalResult();
+    _records = new Record[_resultSize];
+    _nextIdx = 0;
+  }
+
+  public SortedRecordTable(SortedRecords sortedRecords, DataSchema dataSchema, 
QueryContext queryContext,
+      int resultSize, ExecutorService executorService, Comparator<Record> 
comparator) {
+    super(dataSchema);
+    _numKeyColumns = queryContext.getGroupByExpressions().size();
+    _aggregationFunctions = queryContext.getAggregationFunctions();
+    _executorService = executorService;
+    _comparator = comparator;
+    _resultSize = resultSize;
+    _numThreadsExtractFinalResult = 
Math.min(queryContext.getNumThreadsExtractFinalResult(),
+        Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS));
+    _chunkSizeExtractFinalResult = 
queryContext.getChunkSizeExtractFinalResult();
+    _records = sortedRecords._records;
+    _nextIdx = sortedRecords._size;
+  }
+
+  /// Only used when creating SortedRecordTable from unique, sorted segment 
groupby results
+  @Override
+  public boolean upsert(Record record) {
+    if (_nextIdx == _resultSize) {
+      // enough records
+      return false;
+    }
+    _records[_nextIdx++] = record;
+    return true;
+  }
+
+  @Override
+  public boolean upsert(Key key, Record record) {
+    throw new UnsupportedOperationException("method unused for 
SortedRecordTable");
+  }
+
+  /// Merge a segment result into self, saving an allocation of 
SortedRecordTable
+  public SortedRecordTable mergeSortedGroupByResultBlock(GroupByResultsBlock 
block) {
+    List<IntermediateRecord> segmentRecords = block.getIntermediateRecords();
+    if (segmentRecords.isEmpty() || size() == 0) {
+      segmentRecords.forEach(x -> upsert(x._record));
+      return this;
+    }
+    mergeSegmentRecords(segmentRecords, segmentRecords.size());
+    return this;
+  }
+
+  private void finalizeRecordMerge(Record[] records, int newIdx) {
+    _records = records;
+    _nextIdx = newIdx;
+  }
+
+  /// Merge in that._records, update _records _curIdx
+  private void mergeSegmentRecords(List<IntermediateRecord> records2, int mj) {

Review Comment:
   This is almost identical to `SortedRecordsMerger.mergeSortedRecords()`; do
we need to keep both of them? Can we extract some common part?
   If the input type is different, one trick is to pass in a lambda that extracts
the `Record` at a given index.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java:
##########
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
+
+
+/**
+ * Table used for merging of sorted group-by aggregation
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class SortedRecordTable extends BaseTable {
+  private final ExecutorService _executorService;
+  private final int _resultSize;
+  private final int _numKeyColumns;
+  private final AggregationFunction[] _aggregationFunctions;
+
+  protected Record[] _topRecords;
+
+  private Record[] _records;
+  private int _nextIdx;
+  private final Comparator<Record> _comparator;
+  private final int _numThreadsExtractFinalResult;
+  private final int _chunkSizeExtractFinalResult;
+
+  public SortedRecordTable(DataSchema dataSchema, QueryContext queryContext, 
int resultSize,

Review Comment:
   (minor) We can change this to call the other constructor to reduce duplication



##########
pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java:
##########
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
+
+
+/**
+ * Table used for merging of sorted group-by aggregation
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class SortedRecordTable extends BaseTable {
+  private final ExecutorService _executorService;
+  private final int _resultSize;
+  private final int _numKeyColumns;
+  private final AggregationFunction[] _aggregationFunctions;
+
+  protected Record[] _topRecords;
+
+  private Record[] _records;
+  private int _nextIdx;
+  private final Comparator<Record> _comparator;
+  private final int _numThreadsExtractFinalResult;
+  private final int _chunkSizeExtractFinalResult;
+
+  public SortedRecordTable(DataSchema dataSchema, QueryContext queryContext, 
int resultSize,
+      ExecutorService executorService, Comparator<Record> comparator) {
+    super(dataSchema);
+    assert queryContext.getGroupByExpressions() != null;
+    _numKeyColumns = queryContext.getGroupByExpressions().size();
+    _aggregationFunctions = queryContext.getAggregationFunctions();
+    _executorService = executorService;
+    _comparator = comparator;
+    _resultSize = resultSize;
+    _numThreadsExtractFinalResult = 
Math.min(queryContext.getNumThreadsExtractFinalResult(),
+        Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS));
+    _chunkSizeExtractFinalResult = 
queryContext.getChunkSizeExtractFinalResult();
+    _records = new Record[_resultSize];
+    _nextIdx = 0;
+  }
+
+  public SortedRecordTable(SortedRecords sortedRecords, DataSchema dataSchema, 
QueryContext queryContext,
+      int resultSize, ExecutorService executorService, Comparator<Record> 
comparator) {
+    super(dataSchema);
+    _numKeyColumns = queryContext.getGroupByExpressions().size();
+    _aggregationFunctions = queryContext.getAggregationFunctions();
+    _executorService = executorService;
+    _comparator = comparator;
+    _resultSize = resultSize;
+    _numThreadsExtractFinalResult = 
Math.min(queryContext.getNumThreadsExtractFinalResult(),
+        Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS));
+    _chunkSizeExtractFinalResult = 
queryContext.getChunkSizeExtractFinalResult();
+    _records = sortedRecords._records;
+    _nextIdx = sortedRecords._size;
+  }
+
+  /// Only used when creating SortedRecordTable from unique, sorted segment 
groupby results
+  @Override
+  public boolean upsert(Record record) {
+    if (_nextIdx == _resultSize) {
+      // enough records
+      return false;
+    }
+    _records[_nextIdx++] = record;
+    return true;
+  }
+
+  @Override
+  public boolean upsert(Key key, Record record) {
+    throw new UnsupportedOperationException("method unused for 
SortedRecordTable");
+  }
+
+  /// Merge a segment result into self, saving an allocation of 
SortedRecordTable
+  public SortedRecordTable mergeSortedGroupByResultBlock(GroupByResultsBlock 
block) {

Review Comment:
   (minor) This can return `void` given it is always merging into the current 
table



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to