songwdfu commented on code in PR #16308: URL: https://github.com/apache/pinot/pull/16308#discussion_r2264559649
########## pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java: ########## @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.combine; + +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.data.table.Record; +import org.apache.pinot.core.data.table.SortedRecordTable; +import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator; +import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; +import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock; +import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.core.query.utils.OrderByComparatorFactory; +import org.apache.pinot.core.util.GroupByUtils; +import 
org.apache.pinot.spi.exception.QueryErrorCode; +import org.apache.pinot.spi.exception.QueryErrorMessage; +import org.apache.pinot.spi.exception.QueryException; +import org.apache.pinot.spi.trace.Tracing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Combine operator for group-by queries. + */ +@SuppressWarnings("rawtypes") +public class SortedGroupByCombineOperator extends BaseSingleBlockCombineOperator<GroupByResultsBlock> { + + private static final Logger LOGGER = LoggerFactory.getLogger(SortedGroupByCombineOperator.class); + private static final String EXPLAIN_NAME = "COMBINE_GROUP_BY"; + + // We use a CountDownLatch to track if all Futures are finished by the query timeout, and cancel the unfinished + // _futures (try to interrupt the execution if it already started). + private final CountDownLatch _operatorLatch; + + private volatile boolean _groupsTrimmed; + private volatile boolean _numGroupsLimitReached; + private volatile boolean _numGroupsWarningLimitReached; + + private final AtomicReference<SortedRecordTable> _waitingTable; + private final AtomicReference<SortedRecordTable> _satisfiedTable; + private final Comparator<Record> _recordKeyComparator; + + public SortedGroupByCombineOperator(List<Operator> operators, QueryContext queryContext, + ExecutorService executorService) { + super(null, operators, overrideMaxExecutionThreads(queryContext, operators.size()), executorService); + + assert (GroupByUtils.shouldSortAggregateUnderSafeTrim(queryContext)); + _operatorLatch = new CountDownLatch(_numTasks); + _waitingTable = new AtomicReference<>(); + _satisfiedTable = new AtomicReference<>(); + _recordKeyComparator = OrderByComparatorFactory.getRecordKeyComparator(queryContext.getOrderByExpressions(), + queryContext.getGroupByExpressions(), queryContext.isNullHandlingEnabled()); + } + + /** + * For group-by queries, when maxExecutionThreads is not explicitly configured, override it to create as many tasks as + * the default number of 
query worker threads (or the number of operators / segments if that's lower). + */ + private static QueryContext overrideMaxExecutionThreads(QueryContext queryContext, int numOperators) { + int maxExecutionThreads = queryContext.getMaxExecutionThreads(); + if (maxExecutionThreads <= 0) { + queryContext.setMaxExecutionThreads(Math.min(numOperators, ResourceManager.DEFAULT_QUERY_WORKER_THREADS)); + } + return queryContext; + } + + @Override + public String toExplainString() { + return EXPLAIN_NAME; + } + + /** + * Executes query on one sorted segment in a worker thread and merges the results into the sorted record table. + */ + @Override + protected void processSegments() { + int operatorId; + while (_processingException.get() == null && (operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) { + Operator operator = _operators.get(operatorId); + try { + if (operator instanceof AcquireReleaseColumnsSegmentOperator) { + ((AcquireReleaseColumnsSegmentOperator) operator).acquire(); + } + GroupByResultsBlock resultsBlock = (GroupByResultsBlock) operator.nextBlock(); + if (resultsBlock.isGroupsTrimmed()) { + _groupsTrimmed = true; + } + // Set groups limit reached flag. + if (resultsBlock.isNumGroupsLimitReached()) { + _numGroupsLimitReached = true; + } + if (resultsBlock.isNumGroupsWarningLimitReached()) { + _numGroupsWarningLimitReached = true; + } + // short-circuit one segment case + if (_numOperators == 1) { Review Comment: > We probably will never hit this given we have sequential combine. Let's make sure single operator case is handled by sequential combine This should be reachable when we have 1 core only and we have a single operator. IMHO in case that we have 1 core only, it might make more sense to use pair-wise rather than sequential combine, since the producer - consumer model in the latter does not guarantee timely consumption of produced results (it depends on thread scheduling), which might build up memory pressure? 
Our default behavior also accounts for this: pair-wise is used when `numOperators >= numAvailableCores`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
