Jackie-Jiang commented on code in PR #16308: URL: https://github.com/apache/pinot/pull/16308#discussion_r2274783175
########## pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecords.java: ########## @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.table; + +public class SortedRecords { Review Comment: Do we need this helper class? Seems `size` is always equal to `records.length`, and we can directly pass around `Record[]` ########## pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java: ########## @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.table; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.core.util.QueryMultiThreadingUtils; +import org.apache.pinot.core.util.trace.TraceCallable; + + +/** + * Table used for merging of sorted group-by aggregation + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class SortedRecordTable extends BaseTable { + private final ExecutorService _executorService; + private final int _resultSize; + private final int _numKeyColumns; + private final AggregationFunction[] _aggregationFunctions; + + protected Record[] _topRecords; + + private Record[] _records; + private int _nextIdx; + private final Comparator<Record> _comparator; + private final int _numThreadsExtractFinalResult; + private final int _chunkSizeExtractFinalResult; + + public SortedRecordTable(DataSchema dataSchema, QueryContext queryContext, int resultSize, + ExecutorService executorService, Comparator<Record> comparator) { + super(dataSchema); + assert queryContext.getGroupByExpressions() != null; + _numKeyColumns = queryContext.getGroupByExpressions().size(); + _aggregationFunctions = queryContext.getAggregationFunctions(); + _executorService = executorService; + _comparator = comparator; + _resultSize = resultSize; + _numThreadsExtractFinalResult = 
Math.min(queryContext.getNumThreadsExtractFinalResult(), + Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS)); + _chunkSizeExtractFinalResult = queryContext.getChunkSizeExtractFinalResult(); + _records = new Record[_resultSize]; + _nextIdx = 0; + } + + public SortedRecordTable(SortedRecords sortedRecords, DataSchema dataSchema, QueryContext queryContext, + int resultSize, ExecutorService executorService, Comparator<Record> comparator) { + super(dataSchema); Review Comment: (nit) You might want to put `assert queryContext.getGroupByExpressions() != null;` ########## pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java: ########## @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.operator.combine; + +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.data.table.Record; +import org.apache.pinot.core.data.table.SortedRecordTable; +import org.apache.pinot.core.data.table.SortedRecords; +import org.apache.pinot.core.data.table.SortedRecordsMerger; +import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator; +import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; +import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.core.query.utils.OrderByComparatorFactory; +import org.apache.pinot.core.util.GroupByUtils; +import org.apache.pinot.spi.exception.QueryErrorCode; +import org.apache.pinot.spi.exception.QueryErrorMessage; +import org.apache.pinot.spi.exception.QueryException; +import org.apache.pinot.spi.trace.Tracing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * <p>Pair-wise Combine operator for sort-aggregation</p> + * + * <p>In this algorithm, an {@link AtomicReference} is used as a "pit" to store + * the processed {@link SortedRecordTable} to be merged.</p> + * + * <p>Each worker thread first processes a segment to a {@link SortedRecordTable}, + * then greedily take waiting tables from the pit and merge them in, until there + * is no table waiting, then the merged table is placed in the pit. 
The worker + * thread then proceed to process the next segment.</p> + * + * <p>When there is a table that merged together {@code _numOperators} tables, it + * is put into {@code _satisfiedTable} as the combine result.</p> + * + * <p>This pair-wise approach allows higher level of parallelism for the first rounds + * of combine, while keeping the processing in a streaming fashion without + * having to wait for all segments to be ready.</p> + */ +@SuppressWarnings("rawtypes") +public class SortedGroupByCombineOperator extends BaseSingleBlockCombineOperator<GroupByResultsBlock> { + + private static final Logger LOGGER = LoggerFactory.getLogger(SortedGroupByCombineOperator.class); + private static final String EXPLAIN_NAME = "COMBINE_GROUP_BY"; + + // We use a CountDownLatch to track if all Futures are finished by the query timeout, and cancel the unfinished + // _futures (try to interrupt the execution if it already started). + private final CountDownLatch _operatorLatch; + + private volatile boolean _groupsTrimmed; + private volatile boolean _numGroupsLimitReached; + private volatile boolean _numGroupsWarningLimitReached; + private volatile DataSchema _dataSchema; + + private final AtomicReference<SortedRecords> _waitingRecords; + private final Comparator<Record> _recordKeyComparator; + private final SortedRecordsMerger _sortedRecordsMerger; + + public SortedGroupByCombineOperator(List<Operator> operators, QueryContext queryContext, + ExecutorService executorService) { + super(null, operators, overrideMaxExecutionThreads(queryContext, operators.size()), executorService); + + assert (queryContext.shouldSortAggregateUnderSafeTrim()); + _operatorLatch = new CountDownLatch(_numTasks); + _waitingRecords = new AtomicReference<>(); + _recordKeyComparator = OrderByComparatorFactory.getRecordKeyComparator(queryContext.getOrderByExpressions(), + queryContext.getGroupByExpressions(), queryContext.isNullHandlingEnabled()); + _sortedRecordsMerger = + 
GroupByUtils.getSortedReduceMerger(queryContext, queryContext.getLimit(), _recordKeyComparator); + } + + /** + * For group-by queries, when maxExecutionThreads is not explicitly configured, override it to create as many tasks as + * the default number of query worker threads (or the number of operators / segments if that's lower). + */ + private static QueryContext overrideMaxExecutionThreads(QueryContext queryContext, int numOperators) { + int maxExecutionThreads = queryContext.getMaxExecutionThreads(); + if (maxExecutionThreads <= 0) { + queryContext.setMaxExecutionThreads(Math.min(numOperators, ResourceManager.DEFAULT_QUERY_WORKER_THREADS)); + } + return queryContext; + } + + @Override + public String toExplainString() { + return EXPLAIN_NAME; + } + + /** + * Executes query on one sorted segment in a worker thread and merges the results into the sorted record table. + */ + @Override + protected void processSegments() { + int operatorId; + while (_processingException.get() == null && (operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) { + Operator operator = _operators.get(operatorId); + try { + if (operator instanceof AcquireReleaseColumnsSegmentOperator) { + ((AcquireReleaseColumnsSegmentOperator) operator).acquire(); + } + GroupByResultsBlock resultsBlock = (GroupByResultsBlock) operator.nextBlock(); + if (resultsBlock.isGroupsTrimmed()) { + _groupsTrimmed = true; + } + // Set groups limit reached flag. + if (resultsBlock.isNumGroupsLimitReached()) { + _numGroupsLimitReached = true; + } + if (resultsBlock.isNumGroupsWarningLimitReached()) { + _numGroupsWarningLimitReached = true; + } + if (_dataSchema == null) { + _dataSchema = resultsBlock.getDataSchema(); + } + // short-circuit one segment case + if (_numOperators == 1) { + _waitingRecords.set(GroupByUtils.getAndPopulateSortedRecords(resultsBlock)); + break; + } + + SortedRecords records = Review Comment: This involves extra memory overhead. 
We can directly put `resultsBlock` in, and when getting the reference out, do an `instanceof` check and only extract the records when needed ########## pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java: ########## @@ -324,8 +325,28 @@ private Collection<Record> getUnsortedTopRecords(Map<Key, Record> recordsMap, in * Trims the aggregation results using a heap and returns the top records. * This method is to be called from individual segment if the intermediate results need to be trimmed. */ - public List<IntermediateRecord> trimInSegmentResults(GroupKeyGenerator groupKeyGenerator, + public List<IntermediateRecord> sortInSegmentResults(GroupKeyGenerator groupKeyGenerator, GroupByResultHolder[] groupByResultHolders, int size) { + // getNumKeys() does not count nulls + assert groupKeyGenerator.getNumKeys() <= size; + Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = groupKeyGenerator.getGroupKeys(); + + // Initialize a heap with the first 'size' groups + List<IntermediateRecord> arr = new ArrayList<>(); + while (groupKeyIterator.hasNext()) { + arr.add(getIntermediateRecord(groupKeyIterator.next(), groupByResultHolders)); + } + + arr.sort(_intermediateRecordComparator); + return arr; + } + + /** + * Trims the aggregation results using a heap and returns the top records. + * This method is to be called from individual segment if the intermediate results need to be trimmed. + */ + public List<IntermediateRecord> trimInSegmentResults(GroupKeyGenerator groupKeyGenerator, + GroupByResultHolder[] groupByResultHolders, int size, boolean sortedOutput) { Review Comment: (minor) ```suggestion GroupByResultHolder[] groupByResultHolders, int size, boolean sortOutput) { ``` ########## pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java: ########## @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.table; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.core.util.QueryMultiThreadingUtils; +import org.apache.pinot.core.util.trace.TraceCallable; + + +/** + * Table used for merging of sorted group-by aggregation + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class SortedRecordTable extends BaseTable { + private final ExecutorService _executorService; + private final int _resultSize; + private final int _numKeyColumns; + private final AggregationFunction[] _aggregationFunctions; + + protected Record[] _topRecords; Review Comment: Any specific reason keeping this? 
Seems this is always set to `_records` in `finish()`. Was it made `protected` so that tests can access it? I don't see any usage of it from tests ########## pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java: ########## @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.core.data.table; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.core.util.QueryMultiThreadingUtils; +import org.apache.pinot.core.util.trace.TraceCallable; + + +/** + * Table used for merging of sorted group-by aggregation + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class SortedRecordTable extends BaseTable { + private final ExecutorService _executorService; + private final int _resultSize; + private final int _numKeyColumns; + private final AggregationFunction[] _aggregationFunctions; + + protected Record[] _topRecords; + + private Record[] _records; + private int _nextIdx; + private final Comparator<Record> _comparator; + private final int _numThreadsExtractFinalResult; + private final int _chunkSizeExtractFinalResult; + + public SortedRecordTable(DataSchema dataSchema, QueryContext queryContext, int resultSize, + ExecutorService executorService, Comparator<Record> comparator) { + super(dataSchema); + assert queryContext.getGroupByExpressions() != null; + _numKeyColumns = queryContext.getGroupByExpressions().size(); + _aggregationFunctions = queryContext.getAggregationFunctions(); + _executorService = executorService; + _comparator = comparator; + _resultSize = resultSize; + _numThreadsExtractFinalResult = Math.min(queryContext.getNumThreadsExtractFinalResult(), + Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS)); + 
_chunkSizeExtractFinalResult = queryContext.getChunkSizeExtractFinalResult(); + _records = new Record[_resultSize]; + _nextIdx = 0; + } + + public SortedRecordTable(SortedRecords sortedRecords, DataSchema dataSchema, QueryContext queryContext, + int resultSize, ExecutorService executorService, Comparator<Record> comparator) { + super(dataSchema); + _numKeyColumns = queryContext.getGroupByExpressions().size(); + _aggregationFunctions = queryContext.getAggregationFunctions(); + _executorService = executorService; + _comparator = comparator; + _resultSize = resultSize; + _numThreadsExtractFinalResult = Math.min(queryContext.getNumThreadsExtractFinalResult(), + Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS)); + _chunkSizeExtractFinalResult = queryContext.getChunkSizeExtractFinalResult(); + _records = sortedRecords._records; + _nextIdx = sortedRecords._size; + } + + /// Only used when creating SortedRecordTable from unique, sorted segment groupby results + @Override + public boolean upsert(Record record) { + if (_nextIdx == _resultSize) { + // enough records + return false; + } + _records[_nextIdx++] = record; + return true; + } + + @Override + public boolean upsert(Key key, Record record) { + throw new UnsupportedOperationException("method unused for SortedRecordTable"); + } + + /// Merge a segment result into self, saving an allocation of SortedRecordTable + public SortedRecordTable mergeSortedGroupByResultBlock(GroupByResultsBlock block) { + List<IntermediateRecord> segmentRecords = block.getIntermediateRecords(); + if (segmentRecords.isEmpty() || size() == 0) { + segmentRecords.forEach(x -> upsert(x._record)); + return this; + } + mergeSegmentRecords(segmentRecords, segmentRecords.size()); + return this; + } + + private void finalizeRecordMerge(Record[] records, int newIdx) { + _records = records; + _nextIdx = newIdx; + } + + /// Merge in that._records, update _records _curIdx + private void mergeSegmentRecords(List<IntermediateRecord> records2, int mj) { 
Review Comment: Seems `mj` is always `records2.size()`, and we don't need to pass it ########## pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java: ########## @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.operator.combine; + +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.data.table.Record; +import org.apache.pinot.core.data.table.SortedRecordTable; +import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator; +import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; +import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.core.query.utils.OrderByComparatorFactory; +import org.apache.pinot.core.util.GroupByUtils; +import org.apache.pinot.spi.exception.QueryErrorCode; +import org.apache.pinot.spi.exception.QueryErrorMessage; +import org.apache.pinot.spi.exception.QueryException; +import org.apache.pinot.spi.trace.Tracing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Combine operator for group-by queries. + */ +@SuppressWarnings("rawtypes") +public class SortedGroupByCombineOperator extends BaseSingleBlockCombineOperator<GroupByResultsBlock> { + + private static final Logger LOGGER = LoggerFactory.getLogger(SortedGroupByCombineOperator.class); + private static final String EXPLAIN_NAME = "COMBINE_GROUP_BY"; + + // We use a CountDownLatch to track if all Futures are finished by the query timeout, and cancel the unfinished + // _futures (try to interrupt the execution if it already started). 
+ private final CountDownLatch _operatorLatch; + + private volatile boolean _groupsTrimmed; + private volatile boolean _numGroupsLimitReached; + private volatile boolean _numGroupsWarningLimitReached; + + private final AtomicReference<SortedRecordTable> _waitingTable; + private final AtomicReference<SortedRecordTable> _satisfiedTable; + private final Comparator<Record> _recordKeyComparator; + + public SortedGroupByCombineOperator(List<Operator> operators, QueryContext queryContext, + ExecutorService executorService) { + super(null, operators, overrideMaxExecutionThreads(queryContext, operators.size()), executorService); + + assert (GroupByUtils.shouldSortAggregateUnderSafeTrim(queryContext)); + _operatorLatch = new CountDownLatch(_numTasks); + _waitingTable = new AtomicReference<>(); + _satisfiedTable = new AtomicReference<>(); + _recordKeyComparator = OrderByComparatorFactory.getRecordKeyComparator(queryContext.getOrderByExpressions(), + queryContext.getGroupByExpressions(), queryContext.isNullHandlingEnabled()); + } + + /** + * For group-by queries, when maxExecutionThreads is not explicitly configured, override it to create as many tasks as + * the default number of query worker threads (or the number of operators / segments if that's lower). + */ + private static QueryContext overrideMaxExecutionThreads(QueryContext queryContext, int numOperators) { + int maxExecutionThreads = queryContext.getMaxExecutionThreads(); + if (maxExecutionThreads <= 0) { + queryContext.setMaxExecutionThreads(Math.min(numOperators, ResourceManager.DEFAULT_QUERY_WORKER_THREADS)); + } + return queryContext; + } + + @Override + public String toExplainString() { + return EXPLAIN_NAME; + } + + /** + * Executes query on one sorted segment in a worker thread and merges the results into the sorted record table. 
+ */ + @Override + protected void processSegments() { + int operatorId; + while (_processingException.get() == null && (operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) { + Operator operator = _operators.get(operatorId); + try { + if (operator instanceof AcquireReleaseColumnsSegmentOperator) { + ((AcquireReleaseColumnsSegmentOperator) operator).acquire(); + } + GroupByResultsBlock resultsBlock = (GroupByResultsBlock) operator.nextBlock(); + if (resultsBlock.isGroupsTrimmed()) { + _groupsTrimmed = true; + } + // Set groups limit reached flag. + if (resultsBlock.isNumGroupsLimitReached()) { + _numGroupsLimitReached = true; + } + if (resultsBlock.isNumGroupsWarningLimitReached()) { + _numGroupsWarningLimitReached = true; + } + // short-circuit one segment case + if (_numOperators == 1) { Review Comment: Consider adding the above explanation as comments in the code ########## pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java: ########## @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.data.table; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.core.util.QueryMultiThreadingUtils; +import org.apache.pinot.core.util.trace.TraceCallable; + + +/** + * Table used for merging of sorted group-by aggregation + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class SortedRecordTable extends BaseTable { + private final ExecutorService _executorService; + private final int _resultSize; + private final int _numKeyColumns; + private final AggregationFunction[] _aggregationFunctions; + + protected Record[] _topRecords; + + private Record[] _records; + private int _nextIdx; + private final Comparator<Record> _comparator; + private final int _numThreadsExtractFinalResult; + private final int _chunkSizeExtractFinalResult; + + public SortedRecordTable(DataSchema dataSchema, QueryContext queryContext, int resultSize, + ExecutorService executorService, Comparator<Record> comparator) { + super(dataSchema); + assert queryContext.getGroupByExpressions() != null; + _numKeyColumns = queryContext.getGroupByExpressions().size(); + _aggregationFunctions = queryContext.getAggregationFunctions(); + _executorService = executorService; + _comparator = comparator; + _resultSize = resultSize; + _numThreadsExtractFinalResult = Math.min(queryContext.getNumThreadsExtractFinalResult(), + Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS)); + 
_chunkSizeExtractFinalResult = queryContext.getChunkSizeExtractFinalResult(); + _records = new Record[_resultSize]; + _nextIdx = 0; + } + + public SortedRecordTable(SortedRecords sortedRecords, DataSchema dataSchema, QueryContext queryContext, + int resultSize, ExecutorService executorService, Comparator<Record> comparator) { + super(dataSchema); + _numKeyColumns = queryContext.getGroupByExpressions().size(); + _aggregationFunctions = queryContext.getAggregationFunctions(); + _executorService = executorService; + _comparator = comparator; + _resultSize = resultSize; + _numThreadsExtractFinalResult = Math.min(queryContext.getNumThreadsExtractFinalResult(), + Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS)); + _chunkSizeExtractFinalResult = queryContext.getChunkSizeExtractFinalResult(); + _records = sortedRecords._records; + _nextIdx = sortedRecords._size; + } + + /// Only used when creating SortedRecordTable from unique, sorted segment groupby results + @Override + public boolean upsert(Record record) { + if (_nextIdx == _resultSize) { + // enough records + return false; + } + _records[_nextIdx++] = record; + return true; + } + + @Override + public boolean upsert(Key key, Record record) { + throw new UnsupportedOperationException("method unused for SortedRecordTable"); + } + + /// Merge a segment result into self, saving an allocation of SortedRecordTable + public SortedRecordTable mergeSortedGroupByResultBlock(GroupByResultsBlock block) { + List<IntermediateRecord> segmentRecords = block.getIntermediateRecords(); + if (segmentRecords.isEmpty() || size() == 0) { + segmentRecords.forEach(x -> upsert(x._record)); Review Comment: We can directly setup `_records` and `_nextIdx` to reduce checks in `upsert()` ########## pinot-core/src/main/java/org/apache/pinot/core/operator/query/GroupByOperator.java: ########## @@ -139,24 +138,47 @@ protected GroupByResultsBlock getNextBlock() { // TODO: Currently the groups are not trimmed if there is no ordering specified. 
Consider ordering on group-by // columns if no ordering is specified. int trimSize = _queryContext.getEffectiveSegmentGroupTrimSize(); - if (trimSize > 0) { - if (groupByExecutor.getNumGroups() > trimSize) { - TableResizer tableResizer = new TableResizer(_dataSchema, _queryContext); - Collection<IntermediateRecord> intermediateRecords = groupByExecutor.trimGroupByResult(trimSize, tableResizer); - // trim groupKeyGenerator after getting intermediateRecords - groupByExecutor.getGroupKeyGenerator().close(); - - ServerMetrics.get().addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_GROUPS_TRIMMED, 1); - boolean unsafeTrim = _queryContext.isUnsafeTrim(); // set trim flag only if it's not safe - GroupByResultsBlock resultsBlock = new GroupByResultsBlock(_dataSchema, intermediateRecords, _queryContext); - resultsBlock.setGroupsTrimmed(unsafeTrim); - resultsBlock.setNumGroupsLimitReached(numGroupsLimitReached); - resultsBlock.setNumGroupsWarningLimitReached(numGroupsWarningLimitReached); - return resultsBlock; - } + boolean unsafeTrim = _queryContext.isUnsafeTrim(); + + if (trimSize == 0) { Review Comment: Same here. Why do we need this extra check? ########## pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java: ########## @@ -324,8 +325,28 @@ private Collection<Record> getUnsortedTopRecords(Map<Key, Record> recordsMap, in * Trims the aggregation results using a heap and returns the top records. * This method is to be called from individual segment if the intermediate results need to be trimmed. 
*/ - public List<IntermediateRecord> trimInSegmentResults(GroupKeyGenerator groupKeyGenerator, + public List<IntermediateRecord> sortInSegmentResults(GroupKeyGenerator groupKeyGenerator, GroupByResultHolder[] groupByResultHolders, int size) { + // getNumKeys() does not count nulls + assert groupKeyGenerator.getNumKeys() <= size; + Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = groupKeyGenerator.getGroupKeys(); + + // Initialize a heap with the first 'size' groups + List<IntermediateRecord> arr = new ArrayList<>(); Review Comment: Initialize the array with expected size ########## pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java: ########## @@ -187,28 +187,48 @@ protected GroupByResultsBlock getNextBlock() { // - There are more groups than the trim size // TODO: Currently the groups are not trimmed if there is no ordering specified. Consider ordering on group-by // columns if no ordering is specified. + // TODO: extract common logic with GroupByOperator int trimSize = _queryContext.getEffectiveSegmentGroupTrimSize(); - if (trimSize > 0) { - if (groupKeyGenerator.getNumKeys() > trimSize) { - TableResizer tableResizer = new TableResizer(_dataSchema, _queryContext); - Collection<IntermediateRecord> intermediateRecords = - tableResizer.trimInSegmentResults(groupKeyGenerator, groupByResultHolders, trimSize); - // Release the resources used by the group key generator - groupKeyGenerator.close(); + boolean unsafeTrim = _queryContext.isUnsafeTrim(); - ServerMetrics.get().addMeteredGlobalValue(ServerMeter.AGGREGATE_TIMES_GROUPS_TRIMMED, 1); - boolean unsafeTrim = _queryContext.isUnsafeTrim(); // set trim flag only if it's not safe - GroupByResultsBlock resultsBlock = new GroupByResultsBlock(_dataSchema, intermediateRecords, _queryContext); - resultsBlock.setGroupsTrimmed(unsafeTrim); - resultsBlock.setNumGroupsLimitReached(numGroupsLimitReached); - resultsBlock.setNumGroupsWarningLimitReached(numGroupsWarningLimitReached); - 
return resultsBlock; - } + if (trimSize == 0) { Review Comment: Is this even possible? Is this a bugfix? ########## pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java: ########## @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.data.table; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.core.util.QueryMultiThreadingUtils; +import org.apache.pinot.core.util.trace.TraceCallable; + + +/** + * Table used for merging of sorted group-by aggregation + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class SortedRecordTable extends BaseTable { + private final ExecutorService _executorService; + private final int _resultSize; + private final int _numKeyColumns; + private final AggregationFunction[] _aggregationFunctions; + + protected Record[] _topRecords; + + private Record[] _records; + private int _nextIdx; + private final Comparator<Record> _comparator; + private final int _numThreadsExtractFinalResult; + private final int _chunkSizeExtractFinalResult; + + public SortedRecordTable(DataSchema dataSchema, QueryContext queryContext, int resultSize, + ExecutorService executorService, Comparator<Record> comparator) { + super(dataSchema); + assert queryContext.getGroupByExpressions() != null; + _numKeyColumns = queryContext.getGroupByExpressions().size(); + _aggregationFunctions = queryContext.getAggregationFunctions(); + _executorService = executorService; + _comparator = comparator; + _resultSize = resultSize; + _numThreadsExtractFinalResult = Math.min(queryContext.getNumThreadsExtractFinalResult(), + Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS)); + 
_chunkSizeExtractFinalResult = queryContext.getChunkSizeExtractFinalResult(); + _records = new Record[_resultSize]; + _nextIdx = 0; + } + + public SortedRecordTable(SortedRecords sortedRecords, DataSchema dataSchema, QueryContext queryContext, + int resultSize, ExecutorService executorService, Comparator<Record> comparator) { + super(dataSchema); + _numKeyColumns = queryContext.getGroupByExpressions().size(); + _aggregationFunctions = queryContext.getAggregationFunctions(); + _executorService = executorService; + _comparator = comparator; + _resultSize = resultSize; + _numThreadsExtractFinalResult = Math.min(queryContext.getNumThreadsExtractFinalResult(), + Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS)); + _chunkSizeExtractFinalResult = queryContext.getChunkSizeExtractFinalResult(); + _records = sortedRecords._records; + _nextIdx = sortedRecords._size; + } + + /// Only used when creating SortedRecordTable from unique, sorted segment groupby results + @Override + public boolean upsert(Record record) { + if (_nextIdx == _resultSize) { + // enough records + return false; + } + _records[_nextIdx++] = record; + return true; + } + + @Override + public boolean upsert(Key key, Record record) { + throw new UnsupportedOperationException("method unused for SortedRecordTable"); + } + + /// Merge a segment result into self, saving an allocation of SortedRecordTable + public SortedRecordTable mergeSortedGroupByResultBlock(GroupByResultsBlock block) { + List<IntermediateRecord> segmentRecords = block.getIntermediateRecords(); + if (segmentRecords.isEmpty() || size() == 0) { + segmentRecords.forEach(x -> upsert(x._record)); + return this; + } + mergeSegmentRecords(segmentRecords, segmentRecords.size()); + return this; + } + + private void finalizeRecordMerge(Record[] records, int newIdx) { + _records = records; + _nextIdx = newIdx; + } + + /// Merge in that._records, update _records _curIdx + private void mergeSegmentRecords(List<IntermediateRecord> records2, int mj) { 
Review Comment: This is almost identical to `SortedRecordsMerger.mergeSortedRecords()` — do we need to keep both of them? Can we extract some common part? If the input type is different, one trick is to pass in a lambda to extract the `Record` from the given index. ########## pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java: ########## @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.core.data.table; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.core.util.QueryMultiThreadingUtils; +import org.apache.pinot.core.util.trace.TraceCallable; + + +/** + * Table used for merging of sorted group-by aggregation + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class SortedRecordTable extends BaseTable { + private final ExecutorService _executorService; + private final int _resultSize; + private final int _numKeyColumns; + private final AggregationFunction[] _aggregationFunctions; + + protected Record[] _topRecords; + + private Record[] _records; + private int _nextIdx; + private final Comparator<Record> _comparator; + private final int _numThreadsExtractFinalResult; + private final int _chunkSizeExtractFinalResult; + + public SortedRecordTable(DataSchema dataSchema, QueryContext queryContext, int resultSize, Review Comment: (minor) We can change this to call the other constructor to reduce duplicates ########## pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java: ########## @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.table; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.core.util.QueryMultiThreadingUtils; +import org.apache.pinot.core.util.trace.TraceCallable; + + +/** + * Table used for merging of sorted group-by aggregation + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class SortedRecordTable extends BaseTable { + private final ExecutorService _executorService; + private final int _resultSize; + private final int _numKeyColumns; + private final AggregationFunction[] _aggregationFunctions; + + protected Record[] _topRecords; + + private Record[] _records; + private int _nextIdx; + private final Comparator<Record> _comparator; + private final int _numThreadsExtractFinalResult; + private final int _chunkSizeExtractFinalResult; + + public SortedRecordTable(DataSchema 
dataSchema, QueryContext queryContext, int resultSize, + ExecutorService executorService, Comparator<Record> comparator) { + super(dataSchema); + assert queryContext.getGroupByExpressions() != null; + _numKeyColumns = queryContext.getGroupByExpressions().size(); + _aggregationFunctions = queryContext.getAggregationFunctions(); + _executorService = executorService; + _comparator = comparator; + _resultSize = resultSize; + _numThreadsExtractFinalResult = Math.min(queryContext.getNumThreadsExtractFinalResult(), + Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS)); + _chunkSizeExtractFinalResult = queryContext.getChunkSizeExtractFinalResult(); + _records = new Record[_resultSize]; + _nextIdx = 0; + } + + public SortedRecordTable(SortedRecords sortedRecords, DataSchema dataSchema, QueryContext queryContext, + int resultSize, ExecutorService executorService, Comparator<Record> comparator) { + super(dataSchema); + _numKeyColumns = queryContext.getGroupByExpressions().size(); + _aggregationFunctions = queryContext.getAggregationFunctions(); + _executorService = executorService; + _comparator = comparator; + _resultSize = resultSize; + _numThreadsExtractFinalResult = Math.min(queryContext.getNumThreadsExtractFinalResult(), + Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS)); + _chunkSizeExtractFinalResult = queryContext.getChunkSizeExtractFinalResult(); + _records = sortedRecords._records; + _nextIdx = sortedRecords._size; + } + + /// Only used when creating SortedRecordTable from unique, sorted segment groupby results + @Override + public boolean upsert(Record record) { + if (_nextIdx == _resultSize) { + // enough records + return false; + } + _records[_nextIdx++] = record; + return true; + } + + @Override + public boolean upsert(Key key, Record record) { + throw new UnsupportedOperationException("method unused for SortedRecordTable"); + } + + /// Merge a segment result into self, saving an allocation of SortedRecordTable + public SortedRecordTable 
mergeSortedGroupByResultBlock(GroupByResultsBlock block) { Review Comment: (minor) This can return `void` given it is always merging into the current table -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
