Jackie-Jiang commented on code in PR #16308: URL: https://github.com/apache/pinot/pull/16308#discussion_r2264093177
########## pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java: ########## @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.table; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.core.util.QueryMultiThreadingUtils; +import org.apache.pinot.core.util.trace.TraceCallable; + + +/** + * Util class used for merging of sorted group-by aggregation + */ +public class SortedRecordTable extends BaseTable { + private final ExecutorService _executorService; + protected final int _resultSize; Review Comment: Any specific reason for the `protected` fields? 
Do you plan to extend this class? ########## pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java: ########## @@ -132,6 +132,11 @@ public class QueryContext { private int _numThreadsExtractFinalResult = InstancePlanMakerImplV2.DEFAULT_NUM_THREADS_EXTRACT_FINAL_RESULT; // Parallel chunk size for final reduce private int _chunkSizeExtractFinalResult = InstancePlanMakerImplV2.DEFAULT_CHUNK_SIZE_EXTRACT_FINAL_RESULT; + // Threshold to use sort aggregate for safeTrim case when LIMIT is below this + private int _sortAggregateLimitThreshold = Server.DEFAULT_SORT_AGGREGATE_LIMIT_THRESHOLD; + // Threshold of number of segments to combine to use single-threaded sequential combine instead pair-wise + // This is defaulted to number of available cores + private int _sortAggregateSingleThreadedNumSegmentsThreshold = Runtime.getRuntime().availableProcessors(); Review Comment: Let's cache this value in `CommonConstants` to avoid reading it from the runtime on a per-query basis. ########## pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java: ########## @@ -58,6 +63,109 @@ public static Comparator<Object[]> getComparator(List<OrderByExpressionContext> return getComparator(orderByExpressions, nullHandlingEnabled, 0, orderByExpressions.size()); } + /** + * get orderBy expressions on the groupBy keys when orderBy keys match groupBy keys Review Comment: (minor, convention) We usually capitalize the first word of the javadoc. Same for other places. ########## pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java: ########## @@ -314,7 +317,8 @@ public AggregationFunction[] getAggregationFunctions() { } /** - * Returns the filtered aggregation functions for a query, or {@code null} if the query does not have any aggregation. + * Returns the filtered aggregation functions for a query, or {@code null} if the query does not have any + * aggregation. Review Comment: (format) It seems the format is not set up correctly.
Also, please disable the auto-format for javadoc and comments, and revert the unnecessary reformatting in this PR. ########## pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java: ########## @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.core.data.table; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.core.util.QueryMultiThreadingUtils; +import org.apache.pinot.core.util.trace.TraceCallable; + + +/** + * Util class used for merging of sorted group-by aggregation Review Comment: This is not really a util class ########## pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java: ########## @@ -34,4 +34,8 @@ public class IntermediateRecord { _record = record; _values = values; } + + static IntermediateRecord createForTest(Key key, Record record, Comparable[] values) { Review Comment: Not needed ########## pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java: ########## @@ -58,6 +63,109 @@ public static Comparator<Object[]> getComparator(List<OrderByExpressionContext> return getComparator(orderByExpressions, nullHandlingEnabled, 0, orderByExpressions.size()); } + /** + * get orderBy expressions on the groupBy keys when orderBy keys match groupBy keys + */ + public static Comparator<Record> getRecordKeyComparator(List<OrderByExpressionContext> orderByExpressions, + List<ExpressionContext> groupByExpressions, boolean nullHandlingEnabled) { + List<OrderByExpressionWithIndex> groupKeyOrderByExpressions = + getGroupKeyOrderByExpressionFromRowOrderByExpressions(orderByExpressions, groupByExpressions); + Comparator<Object[]> valueComparator 
= getComparatorWithIndex(groupKeyOrderByExpressions, nullHandlingEnabled); + return (k1, k2) -> valueComparator.compare(k1.getValues(), k2.getValues()); + } + + private static Map<String, Integer> getGroupByExpressionIndexMap(List<ExpressionContext> groupByExpressions) { + Map<String, Integer> groupByExpressionIndexMap = new HashMap<>(); + int numGroupByExpressions = groupByExpressions.size(); + for (int i = 0; i < numGroupByExpressions; i++) { + groupByExpressionIndexMap.put(groupByExpressions.get(i).getIdentifier(), i); + } + return groupByExpressionIndexMap; + } + + /** + * orderby expression with an index with respect to its position in the group keys + */ + public static class OrderByExpressionWithIndex { + OrderByExpressionContext _orderByExpressionContext; + Integer _index; Review Comment: Should they be `final`? ########## pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java: ########## @@ -389,10 +389,11 @@ public void testCovarianceAggregationGroupBy() { GroupByResultsBlock resultsBlock = groupByOperator.nextBlock(); QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0, NUM_RECORDS * 2, NUM_RECORDS); - AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult(); + // TODO: change all aggregationGroupByResult to intermediateRecord + List<IntermediateRecord> aggregationGroupByResult = resultsBlock.getIntermediateRecords(); Review Comment: (minor) Rename the argument to resultRecords, same for other places ########## pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java: ########## @@ -212,4 +217,31 @@ private static IndexedTable getTrimEnabledIndexedTable(DataSchema dataSchema, bo initialCapacity, executorService); } } + + /** + * do sort aggregate when is safeTrim (order by group keys with no having clause) + * and limit is smaller than threshold + * TODO: we also want to do sort aggregate under order by group key with having 
case, + * in this case we can check if the calculated Server trimSize is < sortAggregateLimitThreshold + * if so, we do sort aggregate and trim to trimSize during combine. + * This requires extracting Server trimSize calculation logic into QueryContext as pre-req + */ + public static boolean shouldSortAggregateUnderSafeTrim(QueryContext queryContext) { Review Comment: We can also move this into `QueryContext` ########## pinot-core/src/main/java/org/apache/pinot/core/operator/query/FilteredGroupByOperator.java: ########## @@ -206,6 +210,20 @@ protected GroupByResultsBlock getNextBlock() { resultsBlock.setNumGroupsWarningLimitReached(numGroupsWarningLimitReached); return resultsBlock; } + if (GroupByUtils.shouldSortAggregateUnderSafeTrim(_queryContext)) { Review Comment: (minor) Make the code consistent with `GroupByOperator`. This way it is easier to extract common code in the future. We may also add a TODO to extract common code ########## pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java: ########## @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.operator.combine; + +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.data.table.Record; +import org.apache.pinot.core.data.table.SortedRecordTable; +import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator; +import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; +import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.core.query.utils.OrderByComparatorFactory; +import org.apache.pinot.core.util.GroupByUtils; +import org.apache.pinot.spi.exception.QueryErrorCode; +import org.apache.pinot.spi.exception.QueryErrorMessage; +import org.apache.pinot.spi.exception.QueryException; +import org.apache.pinot.spi.trace.Tracing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Combine operator for group-by queries. + */ +@SuppressWarnings("rawtypes") +public class SortedGroupByCombineOperator extends BaseSingleBlockCombineOperator<GroupByResultsBlock> { + + private static final Logger LOGGER = LoggerFactory.getLogger(SortedGroupByCombineOperator.class); + private static final String EXPLAIN_NAME = "COMBINE_GROUP_BY"; + + // We use a CountDownLatch to track if all Futures are finished by the query timeout, and cancel the unfinished + // _futures (try to interrupt the execution if it already started). 
+ private final CountDownLatch _operatorLatch; + + private volatile boolean _groupsTrimmed; + private volatile boolean _numGroupsLimitReached; + private volatile boolean _numGroupsWarningLimitReached; + + private final AtomicReference<SortedRecordTable> _waitingTable; + private final AtomicReference<SortedRecordTable> _satisfiedTable; + private final Comparator<Record> _recordKeyComparator; + + public SortedGroupByCombineOperator(List<Operator> operators, QueryContext queryContext, + ExecutorService executorService) { + super(null, operators, overrideMaxExecutionThreads(queryContext, operators.size()), executorService); + + assert (GroupByUtils.shouldSortAggregateUnderSafeTrim(queryContext)); + _operatorLatch = new CountDownLatch(_numTasks); + _waitingTable = new AtomicReference<>(); + _satisfiedTable = new AtomicReference<>(); + _recordKeyComparator = OrderByComparatorFactory.getRecordKeyComparator(queryContext.getOrderByExpressions(), + queryContext.getGroupByExpressions(), queryContext.isNullHandlingEnabled()); + } + + /** + * For group-by queries, when maxExecutionThreads is not explicitly configured, override it to create as many tasks as + * the default number of query worker threads (or the number of operators / segments if that's lower). + */ + private static QueryContext overrideMaxExecutionThreads(QueryContext queryContext, int numOperators) { + int maxExecutionThreads = queryContext.getMaxExecutionThreads(); + if (maxExecutionThreads <= 0) { + queryContext.setMaxExecutionThreads(Math.min(numOperators, ResourceManager.DEFAULT_QUERY_WORKER_THREADS)); + } + return queryContext; + } + + @Override + public String toExplainString() { + return EXPLAIN_NAME; + } + + /** + * Executes query on one sorted segment in a worker thread and merges the results into the sorted record table. 
+ */ + @Override + protected void processSegments() { + int operatorId; + while (_processingException.get() == null && (operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) { + Operator operator = _operators.get(operatorId); + try { + if (operator instanceof AcquireReleaseColumnsSegmentOperator) { + ((AcquireReleaseColumnsSegmentOperator) operator).acquire(); + } + GroupByResultsBlock resultsBlock = (GroupByResultsBlock) operator.nextBlock(); + if (resultsBlock.isGroupsTrimmed()) { + _groupsTrimmed = true; + } + // Set groups limit reached flag. + if (resultsBlock.isNumGroupsLimitReached()) { + _numGroupsLimitReached = true; + } + if (resultsBlock.isNumGroupsWarningLimitReached()) { + _numGroupsWarningLimitReached = true; + } + // short-circuit one segment case + if (_numOperators == 1) { + _satisfiedTable.set( + GroupByUtils.getAndPopulateSortedRecordTable(resultsBlock, _queryContext, + _queryContext.getLimit(), _executorService, _numOperators, _recordKeyComparator) + ); + break; + } + // save one call to getAndPopulateLinkedHashMapIndexedTable + // by merging the current block in if there is a waitingTable + SortedRecordTable waitingTable = _waitingTable.getAndUpdate(v -> v == null Review Comment: Alternatively, we can also make a class to help sort records (or intermediate records), then in the end construct a table with the final records ########## pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java: ########## @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.combine; + +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.data.table.Record; +import org.apache.pinot.core.data.table.SortedRecordTable; +import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator; +import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; +import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.core.query.utils.OrderByComparatorFactory; +import org.apache.pinot.core.util.GroupByUtils; +import org.apache.pinot.spi.exception.QueryErrorCode; +import org.apache.pinot.spi.exception.QueryErrorMessage; +import org.apache.pinot.spi.exception.QueryException; +import org.apache.pinot.spi.trace.Tracing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Combine operator for group-by queries. 
+ */ +@SuppressWarnings("rawtypes") +public class SortedGroupByCombineOperator extends BaseSingleBlockCombineOperator<GroupByResultsBlock> { + + private static final Logger LOGGER = LoggerFactory.getLogger(SortedGroupByCombineOperator.class); + private static final String EXPLAIN_NAME = "COMBINE_GROUP_BY"; + + // We use a CountDownLatch to track if all Futures are finished by the query timeout, and cancel the unfinished + // _futures (try to interrupt the execution if it already started). + private final CountDownLatch _operatorLatch; + + private volatile boolean _groupsTrimmed; + private volatile boolean _numGroupsLimitReached; + private volatile boolean _numGroupsWarningLimitReached; + + private final AtomicReference<SortedRecordTable> _waitingTable; + private final AtomicReference<SortedRecordTable> _satisfiedTable; + private final Comparator<Record> _recordKeyComparator; + + public SortedGroupByCombineOperator(List<Operator> operators, QueryContext queryContext, + ExecutorService executorService) { + super(null, operators, overrideMaxExecutionThreads(queryContext, operators.size()), executorService); + + assert (GroupByUtils.shouldSortAggregateUnderSafeTrim(queryContext)); + _operatorLatch = new CountDownLatch(_numTasks); + _waitingTable = new AtomicReference<>(); + _satisfiedTable = new AtomicReference<>(); + _recordKeyComparator = OrderByComparatorFactory.getRecordKeyComparator(queryContext.getOrderByExpressions(), + queryContext.getGroupByExpressions(), queryContext.isNullHandlingEnabled()); + } + + /** + * For group-by queries, when maxExecutionThreads is not explicitly configured, override it to create as many tasks as + * the default number of query worker threads (or the number of operators / segments if that's lower). 
+ */ + private static QueryContext overrideMaxExecutionThreads(QueryContext queryContext, int numOperators) { + int maxExecutionThreads = queryContext.getMaxExecutionThreads(); + if (maxExecutionThreads <= 0) { + queryContext.setMaxExecutionThreads(Math.min(numOperators, ResourceManager.DEFAULT_QUERY_WORKER_THREADS)); + } + return queryContext; + } + + @Override + public String toExplainString() { + return EXPLAIN_NAME; + } + + /** + * Executes query on one sorted segment in a worker thread and merges the results into the sorted record table. + */ + @Override + protected void processSegments() { + int operatorId; + while (_processingException.get() == null && (operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) { + Operator operator = _operators.get(operatorId); + try { + if (operator instanceof AcquireReleaseColumnsSegmentOperator) { + ((AcquireReleaseColumnsSegmentOperator) operator).acquire(); + } + GroupByResultsBlock resultsBlock = (GroupByResultsBlock) operator.nextBlock(); + if (resultsBlock.isGroupsTrimmed()) { + _groupsTrimmed = true; + } + // Set groups limit reached flag. + if (resultsBlock.isNumGroupsLimitReached()) { + _numGroupsLimitReached = true; + } + if (resultsBlock.isNumGroupsWarningLimitReached()) { + _numGroupsWarningLimitReached = true; + } + // short-circuit one segment case + if (_numOperators == 1) { + _satisfiedTable.set( + GroupByUtils.getAndPopulateSortedRecordTable(resultsBlock, _queryContext, + _queryContext.getLimit(), _executorService, _numOperators, _recordKeyComparator) + ); + break; + } + // save one call to getAndPopulateLinkedHashMapIndexedTable + // by merging the current block in if there is a waitingTable + SortedRecordTable waitingTable = _waitingTable.getAndUpdate(v -> v == null Review Comment: (MAJOR) We should not do expensive operations here as the same operation might be applied multiple times. 
The conversion from `GroupByResultsBlock` to `SortedRecordTable` should happen in a single-threaded context. ########## pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java: ########## @@ -123,7 +123,7 @@ protected void updateExistingRecord(Key key, Record newRecord) { _lookupMap.computeIfPresent(key, (k, v) -> updateRecord(v, newRecord)); } - private Record updateRecord(Record existingRecord, Record newRecord) { + protected Record updateRecord(Record existingRecord, Record newRecord) { Review Comment: (minor) This change seems not needed. ########## pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java: ########## @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.core.data.table; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.core.util.QueryMultiThreadingUtils; +import org.apache.pinot.core.util.trace.TraceCallable; + + +/** + * Util class used for merging of sorted group-by aggregation + */ +public class SortedRecordTable extends BaseTable { + private final ExecutorService _executorService; + protected final int _resultSize; + protected final int _numKeyColumns; + protected final AggregationFunction[] _aggregationFunctions; + + protected Record[] _topRecords; + + private Record[] _records; + private int _numMergedBlocks; + private final int _desiredNumMergedBlocks; + private int _nextIdx; + private final Comparator<Record> _comparator; + protected final int _numThreadsExtractFinalResult; + protected final int _chunkSizeExtractFinalResult; + + public SortedRecordTable(DataSchema dataSchema, QueryContext queryContext, int resultSize, Review Comment: (optional, can be a followup) Seems quite some code are duplicated in this class and `IndexedTable`. 
We can consider extracting the common part into `BaseTable` ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByTrimmingIntegrationTest.java: ########## @@ -225,13 +225,16 @@ public void testMSQEGroupsTrimmedAtSegmentLevelWithOrderByOnAggregateIsNotSafe() ResultSetGroup result = conn.execute(options + query); assertTrimFlagSet(result); - assertEquals(toResultStr(result), - "\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"EXPR$2\"[\"LONG\"]\n" - + "77,\t377,\t4\n" - + "66,\t566,\t4\n" - + "39,\t339,\t4\n" - + "96,\t396,\t4\n" - + "25,\t25,\t4"); + String[] lines = toResultStr(result).split("\n"); Review Comment: Why do we need to change this? Is this query affected? ########## pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java: ########## @@ -212,4 +217,31 @@ private static IndexedTable getTrimEnabledIndexedTable(DataSchema dataSchema, bo initialCapacity, executorService); } } + + /** + * do sort aggregate when is safeTrim (order by group keys with no having clause) + * and limit is smaller than threshold + * TODO: we also want to do sort aggregate under order by group key with having case, + * in this case we can check if the calculated Server trimSize is < sortAggregateLimitThreshold + * if so, we do sort aggregate and trim to trimSize during combine. 
+ * This requires extracting Server trimSize calculation logic into QueryContext as pre-req + */ + public static boolean shouldSortAggregateUnderSafeTrim(QueryContext queryContext) { + return !queryContext.isUnsafeTrim() && queryContext.getLimit() < queryContext.getSortAggregateLimitThreshold(); + } + + public static SortedRecordTable getAndPopulateSortedRecordTable(GroupByResultsBlock block, + QueryContext queryContext, int resultSize, + ExecutorService executorService, int desiredNumMergedBlocks, Comparator<Record> recordKeyComaparator) { Review Comment: (typo) ```suggestion ExecutorService executorService, int desiredNumMergedBlocks, Comparator<Record> recordKeyComparator) { ``` ########## pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java: ########## @@ -58,6 +63,109 @@ public static Comparator<Object[]> getComparator(List<OrderByExpressionContext> return getComparator(orderByExpressions, nullHandlingEnabled, 0, orderByExpressions.size()); } + /** + * get orderBy expressions on the groupBy keys when orderBy keys match groupBy keys + */ + public static Comparator<Record> getRecordKeyComparator(List<OrderByExpressionContext> orderByExpressions, + List<ExpressionContext> groupByExpressions, boolean nullHandlingEnabled) { + List<OrderByExpressionWithIndex> groupKeyOrderByExpressions = + getGroupKeyOrderByExpressionFromRowOrderByExpressions(orderByExpressions, groupByExpressions); + Comparator<Object[]> valueComparator = getComparatorWithIndex(groupKeyOrderByExpressions, nullHandlingEnabled); + return (k1, k2) -> valueComparator.compare(k1.getValues(), k2.getValues()); + } + + private static Map<String, Integer> getGroupByExpressionIndexMap(List<ExpressionContext> groupByExpressions) { + Map<String, Integer> groupByExpressionIndexMap = new HashMap<>(); + int numGroupByExpressions = groupByExpressions.size(); + for (int i = 0; i < numGroupByExpressions; i++) { + 
+ groupByExpressionIndexMap.put(groupByExpressions.get(i).getIdentifier(), i); + } + return groupByExpressionIndexMap; + } + + /** + * orderby expression with an index with respect to its position in the group keys + */ + public static class OrderByExpressionWithIndex { Review Comment: Should this be publicly accessible, or just an inner helper class? ########## pinot-core/src/main/java/org/apache/pinot/core/data/table/SortedRecordTable.java: ########## @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.core.data.table; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.core.util.QueryMultiThreadingUtils; +import org.apache.pinot.core.util.trace.TraceCallable; + + +/** + * Util class used for merging of sorted group-by aggregation + */ +public class SortedRecordTable extends BaseTable { Review Comment: Annotate it to `@SuppressWarnings({"rawtypes", "unchecked"})`. You may also follow similar checks (e.g. 
`assert`) in `IndexedTable` to address warnings from IDE ########## pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java: ########## @@ -58,6 +63,109 @@ public static Comparator<Object[]> getComparator(List<OrderByExpressionContext> return getComparator(orderByExpressions, nullHandlingEnabled, 0, orderByExpressions.size()); } + /** + * get orderBy expressions on the groupBy keys when orderBy keys match groupBy keys + */ + public static Comparator<Record> getRecordKeyComparator(List<OrderByExpressionContext> orderByExpressions, + List<ExpressionContext> groupByExpressions, boolean nullHandlingEnabled) { + List<OrderByExpressionWithIndex> groupKeyOrderByExpressions = + getGroupKeyOrderByExpressionFromRowOrderByExpressions(orderByExpressions, groupByExpressions); + Comparator<Object[]> valueComparator = getComparatorWithIndex(groupKeyOrderByExpressions, nullHandlingEnabled); + return (k1, k2) -> valueComparator.compare(k1.getValues(), k2.getValues()); + } + + private static Map<String, Integer> getGroupByExpressionIndexMap(List<ExpressionContext> groupByExpressions) { + Map<String, Integer> groupByExpressionIndexMap = new HashMap<>(); + int numGroupByExpressions = groupByExpressions.size(); + for (int i = 0; i < numGroupByExpressions; i++) { + groupByExpressionIndexMap.put(groupByExpressions.get(i).getIdentifier(), i); + } + return groupByExpressionIndexMap; + } + + /** + * orderby expression with an index with respect to its position in the group keys + */ + public static class OrderByExpressionWithIndex { + OrderByExpressionContext _orderByExpressionContext; + Integer _index; Review Comment: Should `_index` be primitive type? 
I don't think we can tolerate `null` here ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByTrimmingIntegrationTest.java: ########## @@ -225,13 +225,16 @@ public void testMSQEGroupsTrimmedAtSegmentLevelWithOrderByOnAggregateIsNotSafe() ResultSetGroup result = conn.execute(options + query); assertTrimFlagSet(result); - assertEquals(toResultStr(result), - "\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"EXPR$2\"[\"LONG\"]\n" - + "77,\t377,\t4\n" - + "66,\t566,\t4\n" - + "39,\t339,\t4\n" - + "96,\t396,\t4\n" - + "25,\t25,\t4"); + String[] lines = toResultStr(result).split("\n"); + + // Assert the header exactly + assertEquals(lines[0], "\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"EXPR$2\"[\"LONG\"]"); + // Assert col3 of all data rows is 4 + for (int i = 1; i < lines.length; i++) { + String[] cols = lines[i].split("\t"); + assertEquals("4", cols[2]); Review Comment: First argument should be `actual` ########## pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java: ########## @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.operator.combine; + +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.data.table.Record; +import org.apache.pinot.core.data.table.SortedRecordTable; +import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator; +import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; +import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.core.query.utils.OrderByComparatorFactory; +import org.apache.pinot.core.util.GroupByUtils; +import org.apache.pinot.spi.exception.QueryErrorCode; +import org.apache.pinot.spi.exception.QueryErrorMessage; +import org.apache.pinot.spi.exception.QueryException; +import org.apache.pinot.spi.trace.Tracing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Combine operator for group-by queries. Review Comment: Please add some javadoc about the algorithm ########## pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java: ########## @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.combine; + +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.data.table.Record; +import org.apache.pinot.core.data.table.SortedRecordTable; +import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator; +import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; +import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.core.query.utils.OrderByComparatorFactory; +import org.apache.pinot.core.util.GroupByUtils; +import org.apache.pinot.spi.exception.QueryErrorCode; +import org.apache.pinot.spi.exception.QueryErrorMessage; +import org.apache.pinot.spi.exception.QueryException; +import org.apache.pinot.spi.trace.Tracing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Combine operator for group-by queries. 
+ */ +@SuppressWarnings("rawtypes") +public class SortedGroupByCombineOperator extends BaseSingleBlockCombineOperator<GroupByResultsBlock> { + + private static final Logger LOGGER = LoggerFactory.getLogger(SortedGroupByCombineOperator.class); + private static final String EXPLAIN_NAME = "COMBINE_GROUP_BY"; + + // We use a CountDownLatch to track if all Futures are finished by the query timeout, and cancel the unfinished + // _futures (try to interrupt the execution if it already started). + private final CountDownLatch _operatorLatch; + + private volatile boolean _groupsTrimmed; + private volatile boolean _numGroupsLimitReached; + private volatile boolean _numGroupsWarningLimitReached; + + private final AtomicReference<SortedRecordTable> _waitingTable; + private final AtomicReference<SortedRecordTable> _satisfiedTable; + private final Comparator<Record> _recordKeyComparator; + + public SortedGroupByCombineOperator(List<Operator> operators, QueryContext queryContext, + ExecutorService executorService) { + super(null, operators, overrideMaxExecutionThreads(queryContext, operators.size()), executorService); + + assert (GroupByUtils.shouldSortAggregateUnderSafeTrim(queryContext)); + _operatorLatch = new CountDownLatch(_numTasks); + _waitingTable = new AtomicReference<>(); + _satisfiedTable = new AtomicReference<>(); + _recordKeyComparator = OrderByComparatorFactory.getRecordKeyComparator(queryContext.getOrderByExpressions(), + queryContext.getGroupByExpressions(), queryContext.isNullHandlingEnabled()); + } + + /** + * For group-by queries, when maxExecutionThreads is not explicitly configured, override it to create as many tasks as + * the default number of query worker threads (or the number of operators / segments if that's lower). 
+ */ + private static QueryContext overrideMaxExecutionThreads(QueryContext queryContext, int numOperators) { + int maxExecutionThreads = queryContext.getMaxExecutionThreads(); + if (maxExecutionThreads <= 0) { + queryContext.setMaxExecutionThreads(Math.min(numOperators, ResourceManager.DEFAULT_QUERY_WORKER_THREADS)); + } + return queryContext; + } + + @Override + public String toExplainString() { + return EXPLAIN_NAME; + } + + /** + * Executes query on one sorted segment in a worker thread and merges the results into the sorted record table. + */ + @Override + protected void processSegments() { + int operatorId; + while (_processingException.get() == null && (operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) { + Operator operator = _operators.get(operatorId); + try { + if (operator instanceof AcquireReleaseColumnsSegmentOperator) { + ((AcquireReleaseColumnsSegmentOperator) operator).acquire(); + } + GroupByResultsBlock resultsBlock = (GroupByResultsBlock) operator.nextBlock(); + if (resultsBlock.isGroupsTrimmed()) { + _groupsTrimmed = true; + } + // Set groups limit reached flag. + if (resultsBlock.isNumGroupsLimitReached()) { + _numGroupsLimitReached = true; + } + if (resultsBlock.isNumGroupsWarningLimitReached()) { + _numGroupsWarningLimitReached = true; + } + // short-circuit one segment case + if (_numOperators == 1) { Review Comment: We probably will never hit this given we have sequential combine. Let's make sure single operator case is handled by sequential combine ########## pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java: ########## @@ -347,7 +368,16 @@ public List<IntermediateRecord> trimInSegmentResults(GroupKeyGenerator groupKeyG } } - return Arrays.asList(heap); + for (int i = heap.length; i > 0; i--) { Review Comment: (MAJOR) This can cause overhead for other scenarios where sort is not required. 
Let's add a boolean to control whether to sort the result -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org
