Jackie-Jiang commented on a change in pull request #7916:
URL: https://github.com/apache/pinot/pull/7916#discussion_r788376924
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java
##########
@@ -41,6 +41,7 @@ public StarTreeProjectionPlanNode(StarTreeV2 starTreeV2, Set<String> projectionC
for (String projectionColumn : projectionColumns) {
_dataSourceMap.put(projectionColumn, starTreeV2.getDataSource(projectionColumn));
}
+
Review comment:
Let's revert this file since it is not relevant
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
##########
@@ -62,57 +68,15 @@ public AggregationPlanNode(IndexSegment indexSegment, QueryContext queryContext)
public Operator<IntermediateResultsBlock> run() {
assert _queryContext.getAggregationFunctions() != null;
- int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
- AggregationFunction[] aggregationFunctions = _queryContext.getAggregationFunctions();
-
- FilterPlanNode filterPlanNode = new FilterPlanNode(_indexSegment, _queryContext);
- BaseFilterOperator filterOperator = filterPlanNode.run();
-
- // Use metadata/dictionary to solve the query if possible
- // TODO: Use the same operator for both of them so that COUNT(*), MAX(col) can be optimized
- if (filterOperator.isResultMatchingAll()) {
- if (isFitForMetadataBasedPlan(aggregationFunctions)) {
- return new MetadataBasedAggregationOperator(aggregationFunctions, _indexSegment.getSegmentMetadata(),
- Collections.emptyMap());
- } else if (isFitForDictionaryBasedPlan(aggregationFunctions, _indexSegment)) {
- Map<String, Dictionary> dictionaryMap = new HashMap<>();
- for (AggregationFunction aggregationFunction : aggregationFunctions) {
- String column = ((ExpressionContext) aggregationFunction.getInputExpressions().get(0)).getIdentifier();
- dictionaryMap.computeIfAbsent(column, k -> _indexSegment.getDataSource(k).getDictionary());
- }
- return new DictionaryBasedAggregationOperator(aggregationFunctions, dictionaryMap, numTotalDocs);
- }
- }
-
- // Use star-tree to solve the query if possible
- List<StarTreeV2> starTrees = _indexSegment.getStarTrees();
- if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(_queryContext)) {
- AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
- StarTreeUtils.extractAggregationFunctionPairs(aggregationFunctions);
- if (aggregationFunctionColumnPairs != null) {
- Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap =
- StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment, _queryContext.getFilter(),
- filterPlanNode.getPredicateEvaluatorMap());
- if (predicateEvaluatorsMap != null) {
- for (StarTreeV2 starTreeV2 : starTrees) {
- if (StarTreeUtils.isFitForStarTree(starTreeV2.getMetadata(), aggregationFunctionColumnPairs, null,
- predicateEvaluatorsMap.keySet())) {
- TransformOperator transformOperator =
- new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, null,
- predicateEvaluatorsMap, _queryContext.getDebugOptions()).run();
- return new AggregationOperator(aggregationFunctions, transformOperator, numTotalDocs, true);
- }
- }
- }
- }
+ boolean hasFilteredPredicates = _queryContext.isHasFilteredAggregations();
+ BaseOperator<IntermediateResultsBlock> aggOperator;
+ if (hasFilteredPredicates) {
+ aggOperator = buildFilteredAggOperator();
Review comment:
(minor) This part can be more concise by returning directly instead of
putting the operator in a local variable.
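A minimal sketch of the suggested shape, assuming each path is built by its own helper: every branch returns its operator directly, with no intermediate local variable. `Operator`, `AggregationPlanNodeSketch` and the returned names are hypothetical stand-ins for `BaseOperator<IntermediateResultsBlock>` and the real plan node, not Pinot's API.

```java
// Hypothetical stand-in for BaseOperator<IntermediateResultsBlock>
interface Operator {
  String name();
}

class AggregationPlanNodeSketch {
  private final boolean _hasFilteredAggregations;

  AggregationPlanNodeSketch(boolean hasFilteredAggregations) {
    _hasFilteredAggregations = hasFilteredAggregations;
  }

  Operator run() {
    // Return directly from each branch instead of assigning to a local variable first
    if (_hasFilteredAggregations) {
      return buildFilteredAggOperator();
    }
    return buildNonFilteredAggOperator();
  }

  // Hypothetical builders; the real ones construct the full operator trees
  private Operator buildFilteredAggOperator() {
    return () -> "FilteredAggregationOperator";
  }

  private Operator buildNonFilteredAggOperator() {
    return () -> "AggregationOperator";
  }
}
```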
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -119,11 +122,13 @@ private QueryContext(String tableName, List<ExpressionContext> selectExpressions
@Nullable FilterContext filter, @Nullable List<ExpressionContext> groupByExpressions,
@Nullable FilterContext havingFilter, @Nullable List<OrderByExpressionContext> orderByExpressions, int limit,
int offset, Map<String, String> queryOptions, @Nullable Map<String, String> debugOptions,
- BrokerRequest brokerRequest) {
+ BrokerRequest brokerRequest, boolean hasFilteredAggregations,
Review comment:
`hasFilteredAggregations` should not be set through the constructor. It
is updated in `generateAggregationFunctions()`.
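A minimal sketch of that pattern, under simplifying assumptions: the flag is a private, non-final field that never appears in the constructor and is set only by `generateAggregationFunctions()`, in the same pass that builds the aggregation functions, so the two cannot drift apart. `QueryContextSketch`, `SelectAggregation` and the string-typed fields are hypothetical simplifications of `QueryContext`, its aggregations and `QueryContext.Builder`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical: a SELECT aggregation with an optional FILTER(WHERE ...) clause (null if absent)
record SelectAggregation(String function, String filter) {
}

class QueryContextSketch {
  private final List<SelectAggregation> _selectAggregations;
  // Derived state: deliberately NOT a constructor argument
  private List<String> _aggregationFunctions;
  private boolean _hasFilteredAggregations;

  private QueryContextSketch(List<SelectAggregation> selectAggregations) {
    _selectAggregations = selectAggregations;
  }

  List<String> getAggregationFunctions() {
    return _aggregationFunctions;
  }

  boolean isHasFilteredAggregations() {
    return _hasFilteredAggregations;
  }

  static class Builder {
    private List<SelectAggregation> _selectAggregations = new ArrayList<>();

    Builder setSelectAggregations(List<SelectAggregation> selectAggregations) {
      _selectAggregations = selectAggregations;
      return this;
    }

    QueryContextSketch build() {
      QueryContextSketch queryContext = new QueryContextSketch(_selectAggregations);
      generateAggregationFunctions(queryContext);
      return queryContext;
    }

    // The flag is computed in the same pass that builds the aggregation functions
    private static void generateAggregationFunctions(QueryContextSketch queryContext) {
      List<String> aggregationFunctions = new ArrayList<>();
      boolean hasFilteredAggregations = false;
      for (SelectAggregation aggregation : queryContext._selectAggregations) {
        aggregationFunctions.add(aggregation.function());
        if (aggregation.filter() != null) {
          hasFilteredAggregations = true;
        }
      }
      queryContext._aggregationFunctions = aggregationFunctions;
      queryContext._hasFilteredAggregations = hasFilteredAggregations;
    }
  }
}
```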
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -350,6 +381,7 @@ public String toString() {
private List<ExpressionContext> _selectExpressions;
private List<String> _aliasList;
private FilterContext _filter;
+ private ExpressionContext _filterExpression;
Review comment:
The change in the `Builder` is not necessary
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
##########
@@ -144,6 +146,7 @@ private static QueryContext convertSQL(BrokerRequest brokerRequest) {
return new QueryContext.Builder().setTableName(pinotQuery.getDataSource().getTableName())
.setSelectExpressions(selectExpressions).setAliasList(aliasList).setFilter(filter)
+ .setFilterExpression(filterExpressionContext)
Review comment:
This doesn't seem to be used in `QueryContext`, so this file can be reverted.
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -74,6 +74,7 @@
private final List<ExpressionContext> _selectExpressions;
private final List<String> _aliasList;
private final FilterContext _filter;
+ private final ExpressionContext _filterExpression;
Review comment:
This field is not used; let's remove it. The filter shouldn't be stored as an
`ExpressionContext`.
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
##########
@@ -154,4 +118,181 @@ private static boolean isFitForDictionaryBasedPlan(AggregationFunction[] aggrega
}
return true;
}
+
+ /**
+ * Build a FilteredAggregationOperator given the parameters.
+ * @param mainPredicateFilterOperator Filter operator corresponding to the main predicate
+ * @param mainTransformOperator Transform operator corresponding to the main predicate
+ * @param numTotalDocs Number of total docs
+ */
+ private BaseOperator<IntermediateResultsBlock> buildOperatorForFilteredAggregations(
+ BaseFilterOperator mainPredicateFilterOperator,
+ TransformOperator mainTransformOperator, int numTotalDocs) {
+ Map<FilterContext, Pair<List<AggregationFunction>, TransformOperator>> filterContextToAggFuncsMap =
+ new HashMap<>();
+ List<AggregationFunction> nonFilteredAggregationFunctions = new ArrayList<>();
+ List<Pair<FilterContext, AggregationFunction>> aggregationFunctions = _queryContext
+ .getAggregationsWithFilters();
+
+ // For each aggregation function, check if the aggregation function is a filtered agg.
+ // If it is, populate the corresponding filter operator and corresponding transform operator
+ for (Pair<FilterContext, AggregationFunction> inputPair : aggregationFunctions) {
+ if (inputPair.getLeft() != null) {
+ FilterContext currentFilterExpression = inputPair.getLeft();
+ if (filterContextToAggFuncsMap.get(currentFilterExpression) != null) {
+ filterContextToAggFuncsMap.get(currentFilterExpression).getLeft().add(inputPair.getRight());
+ continue;
+ }
+ Pair<FilterPlanNode, BaseFilterOperator> pair =
+ buildFilterOperator(currentFilterExpression);
+ BaseFilterOperator wrappedFilterOperator = new CombinedFilterOperator(mainPredicateFilterOperator,
+ pair.getRight());
+ Pair<TransformOperator,
+ BaseOperator<IntermediateResultsBlock>> innerPair =
+ buildOperators(wrappedFilterOperator, pair.getLeft());
+ // For each transform operator, associate it with the underlying expression. This allows
+ // fetching the relevant TransformOperator when resolving blocks during aggregation
+ // execution
+ List aggFunctionList = new ArrayList<>();
+ aggFunctionList.add(inputPair.getRight());
+ filterContextToAggFuncsMap.put(currentFilterExpression,
+ Pair.of(aggFunctionList, innerPair.getLeft()));
+ } else {
+ nonFilteredAggregationFunctions.add(inputPair.getRight());
+ }
+ }
+ List<Pair<AggregationFunction[], TransformOperator>> aggToTransformOpList =
+ new ArrayList<>();
+ // Convert to array since FilteredAggregationOperator expects it
+ for (Pair<List<AggregationFunction>, TransformOperator> pair
+ : filterContextToAggFuncsMap.values()) {
+ List<AggregationFunction> aggregationFunctionList = pair.getLeft();
+ if (aggregationFunctionList == null) {
+ throw new IllegalStateException("Null aggregation list seen");
+ }
+ aggToTransformOpList.add(Pair.of(aggregationFunctionList.toArray(new AggregationFunction[0]),
+ pair.getRight()));
+ }
+ aggToTransformOpList.add(Pair.of(nonFilteredAggregationFunctions.toArray(new AggregationFunction[0]),
+ mainTransformOperator));
+
+ return new FilteredAggregationOperator(_queryContext.getAggregationFunctions(), aggToTransformOpList,
+ numTotalDocs);
+ }
+
+ /**
+ * Build a filter operator from the given FilterContext.
+ *
+ * It returns the FilterPlanNode to allow reusing plan level components such as predicate
+ * evaluator map
+ */
+ private Pair<FilterPlanNode, BaseFilterOperator> buildFilterOperator(FilterContext filterContext) {
+ FilterPlanNode filterPlanNode = new FilterPlanNode(_indexSegment, _queryContext, filterContext);
+
+ return Pair.of(filterPlanNode, filterPlanNode.run());
+ }
+
+ /**
+ * Build transform and aggregation operators for the given bottom level plan
+ * @param filterOperator Filter operator to be used in the corresponding chain
+ * @param filterPlanNode Plan node associated with the filter operator
+ * @return Pair, consisting of the built TransformOperator and Aggregation operator for chain
+ */
+ private Pair<TransformOperator,
+ BaseOperator<IntermediateResultsBlock>> buildOperators(BaseFilterOperator filterOperator,
+ FilterPlanNode filterPlanNode) {
+ assert _queryContext.getAggregationFunctions() != null;
+
+ int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
+ AggregationFunction[] aggregationFunctions = _queryContext.getAggregationFunctions();
+
+ Set<ExpressionContext> expressionsToTransform =
+ AggregationFunctionUtils.collectExpressionsToTransform(aggregationFunctions, null);
+ boolean hasFilteredPredicates = _queryContext.isHasFilteredAggregations();
+
+ List<StarTreeV2> starTrees = _indexSegment.getStarTrees();
+
+ // Use metadata/dictionary to solve the query if possible
+ // TODO: Use the same operator for both of them so that COUNT(*), MAX(col) can be optimized
+ if (filterOperator.isResultMatchingAll() && !hasFilteredPredicates) {
+ if (isFitForMetadataBasedPlan(aggregationFunctions)) {
+ return Pair.of(null, new MetadataBasedAggregationOperator(aggregationFunctions,
+ _indexSegment.getSegmentMetadata(), Collections.emptyMap()));
+ } else if (isFitForDictionaryBasedPlan(aggregationFunctions, _indexSegment)) {
+ Map<String, Dictionary> dictionaryMap = new HashMap<>();
+ for (AggregationFunction aggregationFunction : aggregationFunctions) {
+ String column = ((ExpressionContext) aggregationFunction.getInputExpressions().get(0)).getIdentifier();
+ dictionaryMap.computeIfAbsent(column, k -> _indexSegment.getDataSource(k).getDictionary());
+ }
+ return Pair.of(null,
+ new DictionaryBasedAggregationOperator(aggregationFunctions, dictionaryMap,
+ numTotalDocs));
+ }
+ }
+
+ if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(_queryContext)) {
+ // Use star-tree to solve the query if possible
+
+ AggregationFunctionColumnPair[] aggregationFunctionColumnPairs =
+ StarTreeUtils.extractAggregationFunctionPairs(aggregationFunctions);
+ if (aggregationFunctionColumnPairs != null) {
+ Map<String, List<CompositePredicateEvaluator>> predicateEvaluatorsMap =
+ StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment, _queryContext.getFilter(),
+ filterPlanNode.getPredicateEvaluatorMap());
+ if (predicateEvaluatorsMap != null) {
+ for (StarTreeV2 starTreeV2 : starTrees) {
+ if (StarTreeUtils.isFitForStarTree(starTreeV2.getMetadata(), aggregationFunctionColumnPairs, null,
+ predicateEvaluatorsMap.keySet())) {
+
+ TransformOperator transformOperator = new StarTreeTransformPlanNode(starTreeV2,
+ aggregationFunctionColumnPairs, null,
+ predicateEvaluatorsMap, _queryContext.getDebugOptions()).run();
+ AggregationOperator aggregationOperator = new AggregationOperator(aggregationFunctions,
+ transformOperator, numTotalDocs, true);
+
+ return Pair.of(transformOperator, aggregationOperator);
+ }
+ }
+ }
+ }
+ }
+
+ TransformOperator transformOperator = new TransformPlanNode(_indexSegment, _queryContext,
+ expressionsToTransform, DocIdSetPlanNode.MAX_DOC_PER_CALL, filterOperator).run();
+ AggregationOperator aggregationOperator = new AggregationOperator(aggregationFunctions,
+ transformOperator, numTotalDocs, false);
+
+ return Pair.of(transformOperator, aggregationOperator);
+ }
+
+ /**
+ * Builds the operator to be used for non filtered aggregations
+ */
+ private BaseOperator<IntermediateResultsBlock> buildNonFilteredAggOperator() {
Review comment:
What I meant is to move the current code into this method and implement
`buildFilteredAggOperator()` separately. The reasons are:
1. The metadata/dictionary-based operators and star-tree do not apply to
the filtered aggregation
2. Sharing the `buildOperators()` method can bring extra overhead to
non-filtered aggregations
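A rough sketch of the split described above, assuming hypothetical boolean inputs in place of the real checks (`isResultMatchingAll()`, `isFitForMetadataBasedPlan()`, `isFitForDictionaryBasedPlan()`, `StarTreeUtils.isFitForStarTree()`); operators are represented only by their class names. It illustrates that the metadata, dictionary and star-tree shortcuts live solely in the non-filtered path, which therefore carries no filtered-aggregation logic.

```java
class AggregationPlanNodeSplitSketch {
  // Hypothetical inputs standing in for the segment/query checks the real plan node performs
  private final boolean _hasFilteredAggregations;
  private final boolean _filterMatchesAll;
  private final boolean _fitsMetadata;
  private final boolean _fitsDictionary;
  private final boolean _fitsStarTree;

  AggregationPlanNodeSplitSketch(boolean hasFilteredAggregations, boolean filterMatchesAll, boolean fitsMetadata,
      boolean fitsDictionary, boolean fitsStarTree) {
    _hasFilteredAggregations = hasFilteredAggregations;
    _filterMatchesAll = filterMatchesAll;
    _fitsMetadata = fitsMetadata;
    _fitsDictionary = fitsDictionary;
    _fitsStarTree = fitsStarTree;
  }

  String run() {
    if (_hasFilteredAggregations) {
      return buildFilteredAggOperator();
    }
    return buildNonFilteredAggOperator();
  }

  // The existing run() body moves here unchanged: metadata, dictionary and star-tree shortcuts
  private String buildNonFilteredAggOperator() {
    if (_filterMatchesAll) {
      if (_fitsMetadata) {
        return "MetadataBasedAggregationOperator";
      }
      if (_fitsDictionary) {
        return "DictionaryBasedAggregationOperator";
      }
    }
    if (_fitsStarTree) {
      return "AggregationOperator (star-tree)";
    }
    return "AggregationOperator";
  }

  // Implemented separately: none of the shortcuts above apply to filtered aggregations
  private String buildFilteredAggOperator() {
    return "FilteredAggregationOperator";
  }
}
```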
##########
File path:
pinot-core/src/test/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverterTest.java
##########
@@ -663,4 +649,18 @@ public void testPqlAndSqlCompatible()
assertNull(sqlReader.readLine());
}
}
+
Review comment:
It seems the only change is moving the test; can we revert it?
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java
##########
@@ -57,6 +57,7 @@ public StarTreeTransformPlanNode(StarTreeV2 starTreeV2,
_groupByExpressions = Collections.emptyList();
groupByColumns = null;
}
+
Review comment:
Let's revert this file since it is not relevant
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -90,9 +91,11 @@
// Pre-calculate the aggregation functions and columns for the query so that it can be shared across all the segments
private AggregationFunction[] _aggregationFunctions;
- private List<Pair<AggregationFunction, FilterContext>> _filteredAggregationFunctions;
- // TODO: Use Pair<FunctionContext, FilterContext> as key to support filtered aggregations in order-by and post
- // aggregation
+
+ private List<Pair<FilterContext, AggregationFunction>> _aggregationFunctionsWithMetadata;
Review comment:
Shall we rename it to `_filteredAggregations`? It has nothing to do with
metadata. I also suggest putting `AggregationFunction` as the first element of
the pair, to be consistent with the other fields.
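To illustrate the suggested ordering, a sketch that groups `(aggregation function, filter)` pairs by filter, similar to what `buildOperatorForFilteredAggregations()` does with `filterContextToAggFuncsMap`. `FunctionFilterPair` is a hypothetical stand-in for commons-lang3 `Pair<AggregationFunction, FilterContext>`, with strings in place of the real types. Here a `null` filter key collects the non-filtered aggregations, whereas the diff keeps those in a separate list. Using `computeIfAbsent` also keeps each per-filter list generically typed, unlike the raw `List` created in the diff.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Pair<AggregationFunction, FilterContext>: the function comes first
record FunctionFilterPair(String aggregationFunction, String filter) {
}

class FilteredAggregationGrouping {
  private FilteredAggregationGrouping() {
  }

  // Groups aggregation functions that share a FILTER clause so each filter is planned only once.
  // LinkedHashMap permits a null key, which here collects the non-filtered aggregations.
  static Map<String, List<String>> groupByFilter(List<FunctionFilterPair> filteredAggregations) {
    Map<String, List<String>> functionsByFilter = new LinkedHashMap<>();
    for (FunctionFilterPair pair : filteredAggregations) {
      // computeIfAbsent replaces the get-then-put sequence and keeps the list type-safe
      functionsByFilter.computeIfAbsent(pair.filter(), k -> new ArrayList<>()).add(pair.aggregationFunction());
    }
    return functionsByFilter;
  }
}
```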
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -441,76 +480,106 @@ public QueryContext build() {
*/
private void generateAggregationFunctions(QueryContext queryContext) {
List<AggregationFunction> aggregationFunctions = new ArrayList<>();
- List<Pair<AggregationFunction, FilterContext>> filteredAggregationFunctions = new ArrayList<>();
+ List<Pair<FilterContext, AggregationFunction>> aggregationFunctionsWithMetadata = new ArrayList<>();
Map<FunctionContext, Integer> aggregationFunctionIndexMap = new HashMap<>();
+ Map<Pair<FunctionContext, FilterContext>, Integer> filterExpressionIndexMap = new HashMap<>();
// Add aggregation functions in the SELECT clause
// NOTE: DO NOT deduplicate the aggregation functions in the SELECT clause because that involves protocol change.
- List<FunctionContext> aggregationsInSelect = new ArrayList<>();
- List<Pair<FunctionContext, FilterContext>> filteredAggregations = new ArrayList<>();
+ List<Pair<Pair<FilterContext, ExpressionContext>, FunctionContext>> aggregationsInSelect = new ArrayList<>();
Review comment:
I don't think we need to keep `ExpressionContext` here.
`List<Pair<FunctionContext, FilterContext>>` should be enough for the following
computations.
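A sketch of why `(FunctionContext, FilterContext)` pairs suffice: the pair alone can serve as the lookup key of an index map such as `filterExpressionIndexMap`. `FunctionWithFilter` is a hypothetical record standing in for `Pair<FunctionContext, FilterContext>`, with strings in place of the real types; a `null` filter marks an unfiltered aggregation. Following the NOTE in the diff, the SELECT aggregations are kept without deduplication, and only the index map records the first position of each distinct pair.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Pair<FunctionContext, FilterContext>; records provide equals/hashCode
record FunctionWithFilter(String function, String filter) {
}

class AggregationIndexSketch {
  final List<FunctionWithFilter> _aggregations = new ArrayList<>();
  final Map<FunctionWithFilter, Integer> _indexMap = new HashMap<>();

  AggregationIndexSketch(List<FunctionWithFilter> aggregationsInSelect) {
    for (FunctionWithFilter aggregation : aggregationsInSelect) {
      // Keep every SELECT aggregation, but remember only the first index of each (function, filter) pair;
      // the same function with a different filter is a different key
      _indexMap.putIfAbsent(aggregation, _aggregations.size());
      _aggregations.add(aggregation);
    }
  }
}
```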
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -441,76 +480,106 @@ public QueryContext build() {
*/
private void generateAggregationFunctions(QueryContext queryContext) {
List<AggregationFunction> aggregationFunctions = new ArrayList<>();
- List<Pair<AggregationFunction, FilterContext>> filteredAggregationFunctions = new ArrayList<>();
+ List<Pair<FilterContext, AggregationFunction>> aggregationFunctionsWithMetadata = new ArrayList<>();
Review comment:
Please rename the variable
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java
##########
@@ -154,4 +129,168 @@ private static boolean isFitForDictionaryBasedPlan(AggregationFunction[] aggrega
}
return true;
}
+
+ /**
+ * Build a FilteredAggregationOperator given the parameters.
+ * @param mainPredicateFilterOperator Filter operator corresponding to the main predicate
+ * @param mainTransformOperator Transform operator corresponding to the main predicate
+ * @param aggregationFunctions Aggregation functions in the query
+ * @param numTotalDocs Number of total docs
+ */
+ private BaseOperator<IntermediateResultsBlock> buildOperatorForFilteredAggregations(
+ BaseFilterOperator mainPredicateFilterOperator,
Review comment:
The format still doesn't align with the Pinot code style.