FrankChen021 commented on code in PR #19390:
URL: https://github.com/apache/druid/pull/19390#discussion_r3176469217


##########
processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java:
##########
@@ -259,7 +262,10 @@ public int[] build()
       int numBytesPerRecord = 0;
 
       for (int i = 0; i < query.getAggregatorSpecs().size(); ++i) {
-        aggregatorSizes[i] = 
query.getAggregatorSpecs().get(i).getMaxIntermediateSizeWithNulls();
+        aggregatorSizes[i] = getMaxIntermediateSizeWithNullsForPooledTopN(
+            cursor.getColumnSelectorFactory(),
+            query.getAggregatorSpecs().get(i)
+        );

Review Comment:
   Addressed in a7c2e5e: TopNQueryEngine now uses the same 
PooledTopNAlgorithm.getMaxIntermediateSizeWithNullsForPooledTopN calculation 
as build() above, for both algorithm selection and the pooled buffer-fit 
heuristics.



##########
processing/src/test/java/org/apache/druid/query/aggregation/DoubleSumAggregatorTest.java:
##########
@@ -73,4 +78,103 @@ public void testComparator()
     Assertions.assertEquals(0, comp.compare(agg.get(), agg.get()));
     Assertions.assertEquals(1, comp.compare(agg.get(), first));
   }
+
+  @Test
+  public void testUsesNullableBufferAggregatorWhenInputHasNoNulls()
+  {
+    ColumnSelectorFactory selectorFactory = 
EasyMock.createMock(ColumnSelectorFactory.class);
+    TestDoubleColumnSelectorImpl selector = new 
TestDoubleColumnSelectorImpl(new double[]{1.0d});
+    ColumnCapabilitiesImpl capabilities =
+        
ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.DOUBLE).setHasNulls(false);
+    
EasyMock.expect(selectorFactory.makeColumnValueSelector("metric")).andReturn(selector);
+    
EasyMock.expect(selectorFactory.getColumnCapabilities("metric")).andReturn(capabilities).times(2);
+    EasyMock.replay(selectorFactory);
+
+    BufferAggregator aggregator = new DoubleSumAggregatorFactory("sum", 
"metric").factorizeBuffered(selectorFactory);
+
+    Assertions.assertTrue(aggregator instanceof 
NullableNumericBufferAggregator);
+    ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES + Byte.BYTES);
+    aggregator.init(buffer, 0);
+    Assertions.assertNull(aggregator.get(buffer, 0));
+    aggregator.aggregate(buffer, 0);
+    Assertions.assertEquals(1.0d, aggregator.getDouble(buffer, 0), 0.0d);
+    EasyMock.verify(selectorFactory);
+  }
+
+  @Test
+  public void testUsesNullableAggregatorWhenInputHasNoNulls()
+  {
+    ColumnSelectorFactory selectorFactory = 
EasyMock.createMock(ColumnSelectorFactory.class);
+    TestDoubleColumnSelectorImpl selector = new 
TestDoubleColumnSelectorImpl(new double[]{1.0d});
+    ColumnCapabilitiesImpl capabilities =
+        
ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.DOUBLE).setHasNulls(false);
+    
EasyMock.expect(selectorFactory.makeColumnValueSelector("metric")).andReturn(selector);
+    
EasyMock.expect(selectorFactory.getColumnCapabilities("metric")).andReturn(capabilities).times(2);
+    EasyMock.replay(selectorFactory);
+
+    Aggregator aggregator = new DoubleSumAggregatorFactory("sum", 
"metric").factorize(selectorFactory);
+
+    Assertions.assertTrue(aggregator instanceof NullableNumericAggregator);
+    Assertions.assertNull(aggregator.get());
+    aggregator.aggregate();
+    Assertions.assertEquals(1.0d, aggregator.getDouble(), 0.0d);
+    EasyMock.verify(selectorFactory);
+  }
+
+  @Test
+  public void testPooledTopNSkipsNullableBufferAggregatorWhenInputHasNoNulls()
+  {
+    ColumnSelectorFactory selectorFactory = 
EasyMock.createMock(ColumnSelectorFactory.class);
+    TestDoubleColumnSelectorImpl selector = new 
TestDoubleColumnSelectorImpl(new double[]{1.0d});
+    ColumnCapabilitiesImpl capabilities =
+        
ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.DOUBLE).setHasNulls(false);
+    DoubleSumAggregatorFactory factory = new DoubleSumAggregatorFactory("sum", 
"metric");
+    
EasyMock.expect(selectorFactory.makeColumnValueSelector("metric")).andReturn(selector);
+    
EasyMock.expect(selectorFactory.getColumnCapabilities("metric")).andReturn(capabilities).times(3);
+    EasyMock.replay(selectorFactory);
+
+    BufferAggregator aggregator = 
factory.factorizeBufferedForPooledTopN(selectorFactory);
+
+    Assertions.assertTrue(aggregator instanceof DoubleSumBufferAggregator);
+    Assertions.assertEquals(Double.BYTES, 
factory.getMaxIntermediateSizeWithNullsForPooledTopN(selectorFactory));
+    ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES);
+    aggregator.init(buffer, 0);
+    aggregator.aggregate(buffer, 0);
+    Assertions.assertEquals(1.0d, aggregator.getDouble(buffer, 0), 0.0d);
+    EasyMock.verify(selectorFactory);
+  }
+
+  @Test
+  public void testUsesNullableBufferAggregatorWhenInputNullsAreUnknown()
+  {
+    ColumnSelectorFactory selectorFactory = 
EasyMock.createMock(ColumnSelectorFactory.class);
+    TestDoubleColumnSelectorImpl selector = new 
TestDoubleColumnSelectorImpl(new double[]{1.0d});
+    ColumnCapabilitiesImpl capabilities =
+        
ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.DOUBLE);
+    
EasyMock.expect(selectorFactory.makeColumnValueSelector("metric")).andReturn(selector);
+    
EasyMock.expect(selectorFactory.getColumnCapabilities("metric")).andReturn(capabilities).times(2);
+    EasyMock.replay(selectorFactory);
+
+    BufferAggregator aggregator = new DoubleSumAggregatorFactory("sum", 
"metric").factorizeBuffered(selectorFactory);
+
+    Assertions.assertTrue(aggregator instanceof 
NullableNumericBufferAggregator);
+    EasyMock.verify(selectorFactory);
+  }
+
+  @Test
+  public void testUsesNullableAggregatorWhenInputNullsAreUnknown()
+  {
+    ColumnSelectorFactory selectorFactory = 
EasyMock.createMock(ColumnSelectorFactory.class);
+    TestDoubleColumnSelectorImpl selector = new 
TestDoubleColumnSelectorImpl(new double[]{1.0d});
+    ColumnCapabilitiesImpl capabilities =
+        
ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.DOUBLE);
+    
EasyMock.expect(selectorFactory.makeColumnValueSelector("metric")).andReturn(selector);
+    
EasyMock.expect(selectorFactory.getColumnCapabilities("metric")).andReturn(capabilities).times(2);
+    EasyMock.replay(selectorFactory);
+
+    Aggregator aggregator = new DoubleSumAggregatorFactory("sum", 
"metric").factorize(selectorFactory);

Review Comment:
   Addressed in a7c2e5e: added 
testPooledTopNUsesNullableBufferAggregatorWhenInputNullsAreUnknown, which 
asserts that when the input column's nullability is unknown, Pooled TopN both 
keeps the nullable wrapper and reports an intermediate size that includes the 
null byte.



##########
processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregatorFactory.java:
##########
@@ -86,12 +91,35 @@ public final VectorAggregator 
factorizeVector(VectorColumnSelectorFactory column
     Preconditions.checkState(canVectorize(columnSelectorFactory), "Cannot 
vectorize");
     VectorValueSelector selector = vectorSelector(columnSelectorFactory);
     VectorAggregator aggregator = factorizeVector(columnSelectorFactory, 
selector);
-    if (this.forceNotNullable()) {
+    if (forceNotNullable()) {
       return aggregator;
     }
     return new NullableNumericVectorAggregator(aggregator, selector);
   }
 
+  /**
+   * Factorizes a buffer aggregator for Pooled TopN. Unlike general 
aggregation engines, Pooled TopN creates aggregate
+   * state only after a dimension value is seen. If the input column is known 
to be numeric and non-null, the nullable
+   * wrapper cannot affect the result, so TopN may skip it to keep hot-loop 
specialization effective.
+   */
+  public final BufferAggregator 
factorizeBufferedForPooledTopN(ColumnSelectorFactory columnSelectorFactory)
+  {
+    T selector = selector(columnSelectorFactory);
+    BufferAggregator aggregator = factorizeBuffered(columnSelectorFactory, 
selector);
+    if (!useNullableNumericAggregatorsForPooledTopN(columnSelectorFactory)) {
+      return aggregator;
+    }
+    return new NullableNumericBufferAggregator(aggregator, 
makeNullSelector(selector, columnSelectorFactory));
+  }
+
+  public final int 
getMaxIntermediateSizeWithNullsForPooledTopN(ColumnInspector columnInspector)
+  {
+    if (!useNullableNumericAggregatorsForPooledTopN(columnInspector)) {
+      return getMaxIntermediateSize();
+    }
+    return getMaxIntermediateSizeWithNulls();

Review Comment:
   Addressed by updating the PR title/body to make the scope explicit: this is 
now a Pooled TopN-specific optimization. The regular 
factorize/factorizeBuffered paths intentionally keep nullable wrappers to 
preserve SQL-compatible empty-group/no-value null semantics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to