Copilot commented on code in PR #19390:
URL: https://github.com/apache/druid/pull/19390#discussion_r3175945532
##########
processing/src/test/java/org/apache/druid/query/aggregation/DoubleSumAggregatorTest.java:
##########
@@ -73,4 +78,103 @@ public void testComparator()
Assertions.assertEquals(0, comp.compare(agg.get(), agg.get()));
Assertions.assertEquals(1, comp.compare(agg.get(), first));
}
+
+ @Test
+ public void testUsesNullableBufferAggregatorWhenInputHasNoNulls()
+ {
+ ColumnSelectorFactory selectorFactory =
EasyMock.createMock(ColumnSelectorFactory.class);
+ TestDoubleColumnSelectorImpl selector = new
TestDoubleColumnSelectorImpl(new double[]{1.0d});
+ ColumnCapabilitiesImpl capabilities =
+
ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.DOUBLE).setHasNulls(false);
+
EasyMock.expect(selectorFactory.makeColumnValueSelector("metric")).andReturn(selector);
+
EasyMock.expect(selectorFactory.getColumnCapabilities("metric")).andReturn(capabilities).times(2);
+ EasyMock.replay(selectorFactory);
+
+ BufferAggregator aggregator = new DoubleSumAggregatorFactory("sum",
"metric").factorizeBuffered(selectorFactory);
+
+ Assertions.assertTrue(aggregator instanceof
NullableNumericBufferAggregator);
+ ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES + Byte.BYTES);
+ aggregator.init(buffer, 0);
+ Assertions.assertNull(aggregator.get(buffer, 0));
+ aggregator.aggregate(buffer, 0);
+ Assertions.assertEquals(1.0d, aggregator.getDouble(buffer, 0), 0.0d);
+ EasyMock.verify(selectorFactory);
+ }
+
+ @Test
+ public void testUsesNullableAggregatorWhenInputHasNoNulls()
+ {
+ ColumnSelectorFactory selectorFactory =
EasyMock.createMock(ColumnSelectorFactory.class);
+ TestDoubleColumnSelectorImpl selector = new
TestDoubleColumnSelectorImpl(new double[]{1.0d});
+ ColumnCapabilitiesImpl capabilities =
+
ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.DOUBLE).setHasNulls(false);
+
EasyMock.expect(selectorFactory.makeColumnValueSelector("metric")).andReturn(selector);
+
EasyMock.expect(selectorFactory.getColumnCapabilities("metric")).andReturn(capabilities).times(2);
+ EasyMock.replay(selectorFactory);
+
+ Aggregator aggregator = new DoubleSumAggregatorFactory("sum",
"metric").factorize(selectorFactory);
+
+ Assertions.assertTrue(aggregator instanceof NullableNumericAggregator);
+ Assertions.assertNull(aggregator.get());
+ aggregator.aggregate();
+ Assertions.assertEquals(1.0d, aggregator.getDouble(), 0.0d);
+ EasyMock.verify(selectorFactory);
+ }
+
+ @Test
+ public void testPooledTopNSkipsNullableBufferAggregatorWhenInputHasNoNulls()
+ {
+ ColumnSelectorFactory selectorFactory =
EasyMock.createMock(ColumnSelectorFactory.class);
+ TestDoubleColumnSelectorImpl selector = new
TestDoubleColumnSelectorImpl(new double[]{1.0d});
+ ColumnCapabilitiesImpl capabilities =
+
ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.DOUBLE).setHasNulls(false);
+ DoubleSumAggregatorFactory factory = new DoubleSumAggregatorFactory("sum",
"metric");
+
EasyMock.expect(selectorFactory.makeColumnValueSelector("metric")).andReturn(selector);
+
EasyMock.expect(selectorFactory.getColumnCapabilities("metric")).andReturn(capabilities).times(3);
+ EasyMock.replay(selectorFactory);
+
+ BufferAggregator aggregator =
factory.factorizeBufferedForPooledTopN(selectorFactory);
+
+ Assertions.assertTrue(aggregator instanceof DoubleSumBufferAggregator);
+ Assertions.assertEquals(Double.BYTES,
factory.getMaxIntermediateSizeWithNullsForPooledTopN(selectorFactory));
+ ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES);
+ aggregator.init(buffer, 0);
+ aggregator.aggregate(buffer, 0);
+ Assertions.assertEquals(1.0d, aggregator.getDouble(buffer, 0), 0.0d);
+ EasyMock.verify(selectorFactory);
+ }
+
+ @Test
+ public void testUsesNullableBufferAggregatorWhenInputNullsAreUnknown()
+ {
+ ColumnSelectorFactory selectorFactory =
EasyMock.createMock(ColumnSelectorFactory.class);
+ TestDoubleColumnSelectorImpl selector = new
TestDoubleColumnSelectorImpl(new double[]{1.0d});
+ ColumnCapabilitiesImpl capabilities =
+
ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.DOUBLE);
+
EasyMock.expect(selectorFactory.makeColumnValueSelector("metric")).andReturn(selector);
+
EasyMock.expect(selectorFactory.getColumnCapabilities("metric")).andReturn(capabilities).times(2);
+ EasyMock.replay(selectorFactory);
+
+ BufferAggregator aggregator = new DoubleSumAggregatorFactory("sum",
"metric").factorizeBuffered(selectorFactory);
+
+ Assertions.assertTrue(aggregator instanceof
NullableNumericBufferAggregator);
+ EasyMock.verify(selectorFactory);
+ }
+
+ @Test
+ public void testUsesNullableAggregatorWhenInputNullsAreUnknown()
+ {
+ ColumnSelectorFactory selectorFactory =
EasyMock.createMock(ColumnSelectorFactory.class);
+ TestDoubleColumnSelectorImpl selector = new
TestDoubleColumnSelectorImpl(new double[]{1.0d});
+ ColumnCapabilitiesImpl capabilities =
+
ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.DOUBLE);
+
EasyMock.expect(selectorFactory.makeColumnValueSelector("metric")).andReturn(selector);
+
EasyMock.expect(selectorFactory.getColumnCapabilities("metric")).andReturn(capabilities).times(2);
+ EasyMock.replay(selectorFactory);
+
+ Aggregator aggregator = new DoubleSumAggregatorFactory("sum",
"metric").factorize(selectorFactory);
Review Comment:
The new branch that keeps nullable state for Pooled TopN when column
nullability is unknown is still untested. The two unknown-nullability tests
(`testUsesNullableBufferAggregatorWhenInputNullsAreUnknown` and
`testUsesNullableAggregatorWhenInputNullsAreUnknown`) only exercise
`factorizeBuffered`/`factorize`, so a regression in
`factorizeBufferedForPooledTopN` or
`getMaxIntermediateSizeWithNullsForPooledTopN` for that case would slip
through, even though those methods are the new behavior added in this PR.
##########
processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java:
##########
@@ -259,7 +262,10 @@ public int[] build()
int numBytesPerRecord = 0;
for (int i = 0; i < query.getAggregatorSpecs().size(); ++i) {
- aggregatorSizes[i] =
query.getAggregatorSpecs().get(i).getMaxIntermediateSizeWithNulls();
+ aggregatorSizes[i] = getMaxIntermediateSizeWithNullsForPooledTopN(
+ cursor.getColumnSelectorFactory(),
+ query.getAggregatorSpecs().get(i)
+ );
Review Comment:
This change only reduces the per-aggregator state size inside
`PooledTopNAlgorithm`. `TopNQueryEngine` still uses
`AggregatorFactory.getMaxIntermediateSizeWithNulls()` to decide whether pooled
TopN is usable and whether to enable the aggregate-metric-first heuristic.
Queries that are near the buffer-capacity cutoff or the
`numBytesPerRecord > 100` heuristic threshold will therefore keep taking the
old path, so the optimization won't apply where it's most needed.
##########
processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregatorFactory.java:
##########
@@ -86,12 +91,35 @@ public final VectorAggregator
factorizeVector(VectorColumnSelectorFactory column
Preconditions.checkState(canVectorize(columnSelectorFactory), "Cannot
vectorize");
VectorValueSelector selector = vectorSelector(columnSelectorFactory);
VectorAggregator aggregator = factorizeVector(columnSelectorFactory,
selector);
- if (this.forceNotNullable()) {
+ if (forceNotNullable()) {
return aggregator;
}
return new NullableNumericVectorAggregator(aggregator, selector);
}
+ /**
+ * Factorizes a buffer aggregator for Pooled TopN. Unlike general
aggregation engines, Pooled TopN creates aggregate
+ * state only after a dimension value is seen. If the input column is known
to be numeric and non-null, the nullable
+ * wrapper cannot affect the result, so TopN may skip it to keep hot-loop
specialization effective.
+ */
+ public final BufferAggregator
factorizeBufferedForPooledTopN(ColumnSelectorFactory columnSelectorFactory)
+ {
+ T selector = selector(columnSelectorFactory);
+ BufferAggregator aggregator = factorizeBuffered(columnSelectorFactory,
selector);
+ if (!useNullableNumericAggregatorsForPooledTopN(columnSelectorFactory)) {
+ return aggregator;
+ }
+ return new NullableNumericBufferAggregator(aggregator,
makeNullSelector(selector, columnSelectorFactory));
+ }
+
+ public final int
getMaxIntermediateSizeWithNullsForPooledTopN(ColumnInspector columnInspector)
+ {
+ if (!useNullableNumericAggregatorsForPooledTopN(columnInspector)) {
+ return getMaxIntermediateSize();
+ }
+ return getMaxIntermediateSizeWithNulls();
Review Comment:
The new no-null optimization is only wired into
`factorizeBufferedForPooledTopN` /
`getMaxIntermediateSizeWithNullsForPooledTopN`. The regular `factorize` and
`factorizeBuffered` paths still always wrap in `NullableNumeric*Aggregator`.
As a result, this change neither implements the general non-vectorized
nullable-aggregator optimization described in the PR summary nor addresses
non-TopN scans that use the standard factorization paths.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]