github-advanced-security[bot] commented on code in PR #19357:
URL: https://github.com/apache/druid/pull/19357#discussion_r3114503272


##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/SpillingGrouper.java:
##########
@@ -345,12 +380,122 @@
 
   private void spill() throws IOException
   {
+    // Serialize the buffer flush to heap memory rather than directly to disk. When the pre-allocated
+    // in-memory size per row (e.g. ~131KB for ThetaSketch) is much larger than the compact serialized
+    // size (e.g. ~24 bytes when each key has been seen only a few times), each flush produces a tiny
+    // byte array. Batching these in memory until MIN_SPILL_FILE_BYTES is reached avoids creating
+    // thousands of tiny disk files, which would each require a live file handle during the merge phase.
+    final byte[] runBytes;
     try (CloseableIterator<Entry<KeyType>> iterator = grouper.iterator(true)) {
-      files.add(spill(iterator));
-      dictionaryFiles.add(spill(keySerde.getDictionary().iterator()));
+      runBytes = serializeToBytes(iterator);
+    }
+    pendingSpillRuns.add(runBytes);
+    pendingSpillBytes += runBytes.length;
+    pendingDictionaryEntries.addAll(keySerde.getDictionary());
+    grouper.reset();
 
-      grouper.reset();
+    if (pendingSpillBytes >= MIN_SPILL_FILE_BYTES) {
+      flushPendingRunsToDisk(true);
+    }
+  }
+
+  /**
+   * Merge-sorts all pending in-memory spill runs and writes them as a single sorted file to disk.
+   * Reading from heap memory (ByteArrayInputStream) is orders of magnitude faster than reading
+   * from disk, so the merge cost here is negligible even for many accumulated runs.
+   */
+  private void flushPendingRunsToDisk(final boolean sorted) throws IOException
+  {
+    if (pendingSpillRuns.isEmpty()) {
+      return;
+    }
+
+    final List<MappingIterator<Entry<KeyType>>> readers = new ArrayList<>(pendingSpillRuns.size());
+    try {
+      for (final byte[] runBytes : pendingSpillRuns) {
+        readers.add(spillMapper.readValues(
+            spillMapper.getFactory().createParser(new LZ4BlockInputStream(new ByteArrayInputStream(runBytes))),

Review Comment:
   ## CodeQL / Deprecated method or constructor invocation
   
   Invoking [LZ4BlockInputStream.LZ4BlockInputStream](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/11101)
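The batching scheme described in the `spill()` comment and the `flushPendingRunsToDisk` Javadoc can be illustrated with a minimal, stdlib-only sketch. Sorted runs are buffered as compressed byte arrays; once their combined size reaches a threshold, they are k-way merged through a `PriorityQueue` into a single sorted output. This is not Druid's implementation. `SpillBatchSketch`, `minSpillFileBytes`, and the long-keyed record format are hypothetical, `java.util.zip` Deflate stands in for LZ4 (so the deprecated `LZ4BlockInputStream` constructor does not appear), and each `byte[]` in `files` stands in for a file on disk.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

// Hypothetical sketch, not Druid code: long keys stand in for Entry<KeyType>, Deflate stands in
// for LZ4, and each byte[] in `files` stands in for one spill file on disk.
final class SpillBatchSketch {
  private final long minSpillFileBytes; // plays the role of MIN_SPILL_FILE_BYTES
  private final List<byte[]> pendingRuns = new ArrayList<>();
  private long pendingBytes = 0;
  final List<byte[]> files = new ArrayList<>();

  SpillBatchSketch(long minSpillFileBytes) { this.minSpillFileBytes = minSpillFileBytes; }

  // Serializes one already-sorted run into a compressed in-memory byte array.
  static byte[] serialize(long[] sortedKeys) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(new DeflaterOutputStream(bytes))) {
      for (long key : sortedKeys) {
        out.writeLong(key);
      }
    }
    return bytes.toByteArray();
  }

  // Analogue of spill(): buffer the run in memory; write a "file" only once the threshold is hit.
  void spill(long[] sortedKeys) throws IOException {
    byte[] run = serialize(sortedKeys);
    pendingRuns.add(run);
    pendingBytes += run.length;
    if (pendingBytes >= minSpillFileBytes) {
      flushPendingRuns();
    }
  }

  // Analogue of flushPendingRunsToDisk(): k-way merge of every pending run, read back from heap.
  void flushPendingRuns() throws IOException {
    if (pendingRuns.isEmpty()) {
      return;
    }
    List<DataInputStream> readers = new ArrayList<>(pendingRuns.size());
    for (byte[] run : pendingRuns) {
      readers.add(new DataInputStream(new InflaterInputStream(new ByteArrayInputStream(run))));
    }
    // Heap entries are {key, readerIndex}; poll() returns the smallest remaining key.
    PriorityQueue<long[]> heap = new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));
    for (int i = 0; i < readers.size(); i++) {
      pushNext(heap, readers.get(i), i);
    }
    ByteArrayOutputStream merged = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(new DeflaterOutputStream(merged))) {
      while (!heap.isEmpty()) {
        long[] head = heap.poll();
        out.writeLong(head[0]);
        pushNext(heap, readers.get((int) head[1]), (int) head[1]);
      }
    }
    files.add(merged.toByteArray());
    pendingRuns.clear();
    pendingBytes = 0;
  }

  // Pushes the next key from reader `index`, closing the reader once its run is exhausted.
  private static void pushNext(PriorityQueue<long[]> heap, DataInputStream in, int index) throws IOException {
    try {
      heap.add(new long[]{in.readLong(), index});
    } catch (EOFException e) {
      in.close();
    }
  }

  // Decodes every key of one output "file", for inspection.
  static List<Long> readAll(byte[] file) throws IOException {
    List<Long> keys = new ArrayList<>();
    try (DataInputStream in = new DataInputStream(new InflaterInputStream(new ByteArrayInputStream(file)))) {
      while (true) {
        try {
          keys.add(in.readLong());
        } catch (EOFException e) {
          return keys;
        }
      }
    }
  }
}
```

Because the runs are decoded from heap memory, the merge costs CPU but no file handles. The number of files that must be open during the final merge then scales roughly with total spilled bytes divided by the threshold, rather than with the number of `spill()` calls. The sketch assumes the caller issues a final `flushPendingRuns()` before reading results; the diff excerpt does not show where that step happens in Druid.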



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

