This is an automated email from the ASF dual-hosted git repository.

abhishekrb19 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3228e3ec05c fix(delta): drain all batches per scan file in DeltaInputSourceIterator (#19592)
3228e3ec05c is described below

commit 3228e3ec05c7533a9ca6ad5055896b7ff77d6d61
Author: Roman Rinchinov <[email protected]>
AuthorDate: Thu Jun 18 23:20:59 2026 +0200

    fix(delta): drain all batches per scan file in DeltaInputSourceIterator (#19592)
    
    * fix(delta): drain all batches per scan file in DeltaInputSourceIterator
    
    Fixes #18606 — only 1024 rows ingested per Parquet file when using the
    Delta Lake input source.
    
    Root cause: filteredBatchIterator was a local variable inside hasNext().
    When the method returned true after the first non-empty batch of a file,
    the iterator went out of scope. Once that batch's rows were consumed, the
    next hasNext() call advanced to the next file, skipping all remaining
    batches of the current file.
    
    With Delta kernel's default batch size of 1024 rows, this caused at most
    1024 rows per file (exactly 1024 × N files when every file's first batch
    is full) to be ingested regardless of actual file size.
    
    Fix: promote filteredBatchIterator to a field (currentFileIterator) so
    it survives across hasNext() calls and all batches of a file are drained
    before advancing to the next file.
    
    Also fixed close() to close currentFileIterator and to close every
    remaining (never-started) file iterator; previously it closed at most one.
    
    * test(delta): add regression test for GH-18606 batch drain fix
    
    Adds a Delta table with 2 Parquet files of ~2000 rows each (2002 and 1998
    per the Delta log stats; total 4000), so each file exceeds the Delta
    kernel's default batch size of 1024 rows.
    
    Without the fix: DeltaInputSourceIterator returns 1024 × 2 = 2048 rows.
    With the fix:    all 4000 rows are returned correctly.
    
    Test: DeltaInputSourceBatchDrainTest.testAllRowsReturnedWhenFilesExceedOneBatch
    
    * test(delta): add BatchDrainRegressionTests to DeltaInputSourceTest for GH-18606
    
    Adds LargeRowGroupDeltaTable (2 files × 2000 rows = 4000 total) and a
    BatchDrainRegressionTests inner class inside DeltaInputSourceTest following
    the same pattern as existing test classes.
    
    The regression test fails with the bug (returns 1024 × 2 = 2048 rows)
    and passes with the fix (returns all 4000 rows).
    
    * style: add missing newline at end of LargeRowGroupDeltaTable.java
    
    * fix(delta): close drained file iterator before advancing
    
    Each per-file iterator from Scan.transformPhysicalData() owns an
    underlying Parquet reader/file handle. hasNext() overwrote
    currentFileIterator with the next file without closing the exhausted
    one, leaking a handle per completed file on multi-file tables (close()
    only closed the last and the never-started iterators). Now close the
    drained iterator before advancing.
    
    Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
 .../druid/delta/input/DeltaInputSourceReader.java  |  45 ++++++++++++-----
 .../druid/delta/input/DeltaInputSourceTest.java    |  40 +++++++++++++++
 .../druid/delta/input/LargeRowGroupDeltaTable.java |  54 +++++++++++++++++++++
 ...-42a0-a6fe-5397f37d29d8-c000.snappy.parquet.crc | Bin 0 -> 152 bytes
 ...-4094-af60-5eaca2f7ba03-c000.snappy.parquet.crc | Bin 0 -> 152 bytes
 .../_delta_log/.00000000000000000000.json.crc      | Bin 0 -> 20 bytes
 .../_delta_log/00000000000000000000.json           |   5 ++
 ...104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet | Bin 0 -> 18078 bytes
 ...2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet | Bin 0 -> 18185 bytes
 9 files changed, 133 insertions(+), 11 deletions(-)

diff --git 
a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java
 
b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java
index 672a126f7c4..6acbb095f3b 100644
--- 
a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java
+++ 
b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java
@@ -95,6 +95,13 @@ public class DeltaInputSourceReader implements 
InputSourceReader
   {
     private final 
Iterator<io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch>> 
filteredColumnarBatchIterators;
 
+    // Keep a reference to the current file's batch iterator so we drain ALL
+    // its batches before advancing to the next file.
+    // Bug fix for https://github.com/apache/druid/issues/18606:
+    // the original code used a local variable for filteredBatchIterator which
+    // was discarded on return, causing only the first batch (1024 rows) of 
each
+    // file to be read.
+    private io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch> 
currentFileIterator = null;
     private io.delta.kernel.utils.CloseableIterator<Row> currentBatch = null;
     private final InputRowSchema inputRowSchema;
 
@@ -111,20 +118,34 @@ public class DeltaInputSourceReader implements 
InputSourceReader
     public boolean hasNext()
     {
       while (currentBatch == null || !currentBatch.hasNext()) {
-        if (!filteredColumnarBatchIterators.hasNext()) {
-          return false; // No more batches or records to read!
-        }
-
-        final io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch> 
filteredBatchIterator =
-            filteredColumnarBatchIterators.next();
-
-        while (filteredBatchIterator.hasNext()) {
-          final FilteredColumnarBatch nextBatch = filteredBatchIterator.next();
+        // Drain remaining batches from the current file before moving to the 
next.
+        while (currentFileIterator != null && currentFileIterator.hasNext()) {
+          final FilteredColumnarBatch nextBatch = currentFileIterator.next();
           currentBatch = nextBatch.getRows();
           if (currentBatch.hasNext()) {
             return true;
           }
         }
+
+        // Advance to the next file.
+        if (!filteredColumnarBatchIterators.hasNext()) {
+          return false;
+        }
+        // Close the drained file iterator before overwriting it. Each 
iterator from
+        // Scan.transformPhysicalData() owns an underlying Parquet reader/file 
handle;
+        // not closing it here would leak a handle per completed file on 
multi-file
+        // tables (only the last and the never-started iterators are closed in 
close()).
+        // hasNext() cannot throw checked exceptions, so wrap like the rest of 
this
+        // extension (see DeltaInputSource).
+        if (currentFileIterator != null) {
+          try {
+            currentFileIterator.close();
+          }
+          catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+        currentFileIterator = filteredColumnarBatchIterators.next();
       }
       return true;
     }
@@ -146,8 +167,10 @@ public class DeltaInputSourceReader implements 
InputSourceReader
       if (currentBatch != null) {
         currentBatch.close();
       }
-
-      if (filteredColumnarBatchIterators.hasNext()) {
+      if (currentFileIterator != null) {
+        currentFileIterator.close();
+      }
+      while (filteredColumnarBatchIterators.hasNext()) {
         filteredColumnarBatchIterators.next().close();
       }
     }
diff --git 
a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java
 
b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java
index cbbcaefb3ce..6a689f31df5 100644
--- 
a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java
+++ 
b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java
@@ -439,6 +439,46 @@ public class DeltaInputSourceTest
     return rows;
   }
 
+  /**
+   * Regression test for https://github.com/apache/druid/issues/18606.
+   *
+   * {@link DeltaInputSourceReader.DeltaInputSourceIterator} used a local 
variable for the
+   * per-file {@code CloseableIterator<FilteredColumnarBatch>}. When {@code 
hasNext()} returned
+   * after the first non-empty batch of a file, that iterator went out of 
scope. The next
+   * {@code hasNext()} call advanced to the next file, skipping all remaining 
batches of the
+   * current file. With the Delta kernel default batch size of 1024 rows this 
produced exactly
+   * {@code 1024 * numFiles} rows regardless of actual file size.
+   *
+   * Test table: 2 Parquet files x 2000 rows = 4000 rows total.
+   * Without the fix: 1024 x 2 = 2048 rows.
+   * With the fix:    4000 rows.
+   */
+  public static class BatchDrainRegressionTests
+  {
+    @Test
+    public void testAllRowsReturnedWhenFileExceedsOneBatch() throws IOException
+    {
+      final DeltaInputSource deltaInputSource = new DeltaInputSource(
+          LargeRowGroupDeltaTable.DELTA_TABLE_PATH,
+          null,
+          null,
+          null
+      );
+      final InputSourceReader inputSourceReader = deltaInputSource.reader(
+          LargeRowGroupDeltaTable.SCHEMA,
+          null,
+          null
+      );
+      final List<InputRow> rows = readAllRows(inputSourceReader);
+      Assert.assertEquals(
+          "Expected all rows to be read. "
+          + "If this fails with " + (1024 * 2) + " rows, the per-file batch 
drain bug (GH-18606) has regressed.",
+          LargeRowGroupDeltaTable.EXPECTED_ROW_COUNT,
+          rows.size()
+      );
+    }
+  }
+
   private static void validateRows(
       final List<Map<String, Object>> expectedRows,
       final List<InputRow> actualReadRows,
diff --git 
a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/LargeRowGroupDeltaTable.java
 
b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/LargeRowGroupDeltaTable.java
new file mode 100644
index 00000000000..c9a1966af15
--- /dev/null
+++ 
b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/LargeRowGroupDeltaTable.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+
+/**
+ * Descriptor for a Delta table with 2 Parquet files × 2000 rows = 4000 rows 
total.
+ *
+ * Each file has more than 1024 rows, ensuring the Delta kernel reads more 
than one
+ * batch per file. Used as a regression test for GH-18606 where
+ * {@link DeltaInputSourceReader} only returned the first 1024 rows per file.
+ *
+ * Generated by src/test/resources/create_delta_table.py 
(large-row-group-table).
+ */
+public class LargeRowGroupDeltaTable
+{
+  public static final String DELTA_TABLE_PATH =
+      "src/test/resources/large-row-group-table";
+
+  public static final int EXPECTED_ROW_COUNT = 4000;
+
+  public static final InputRowSchema SCHEMA = new InputRowSchema(
+      new TimestampSpec("id", "posix", null),
+      new DimensionsSpec(ImmutableList.of(
+          new LongDimensionSchema("id"),
+          new StringDimensionSchema("name")
+      )),
+      ColumnsFilter.all()
+  );
+}
diff --git 
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/.part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet.crc
 
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/.part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..aa967daecef
Binary files /dev/null and 
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/.part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet.crc
 differ
diff --git 
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/.part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet.crc
 
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/.part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..a46c186eed7
Binary files /dev/null and 
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/.part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet.crc
 differ
diff --git 
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/_delta_log/.00000000000000000000.json.crc
 
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/_delta_log/.00000000000000000000.json.crc
new file mode 100644
index 00000000000..7e765da1cd4
Binary files /dev/null and 
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/_delta_log/.00000000000000000000.json.crc
 differ
diff --git 
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/_delta_log/00000000000000000000.json
 
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/_delta_log/00000000000000000000.json
new file mode 100644
index 00000000000..9e92f4a42fe
--- /dev/null
+++ 
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/_delta_log/00000000000000000000.json
@@ -0,0 +1,5 @@
+{"commitInfo":{"timestamp":1781690365208,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"2","numOutputRows":"4000","numOutputBytes":"36263"},"engineInfo":"Apache-Spark/3.5.0
 Delta-Lake/3.2.0","txnId":"f2a1da56-9880-474d-80cb-520430c4d221"}}
+{"metaData":{"id":"c1c6ec87-61f6-4ca9-8b67-2edd4a2e6acb","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1781690363342}}
+{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
+{"add":{"path":"part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet","partitionValues":{},"size":18078,"modificationTime":1781690364771,"dataChange":true,"stats":"{\"numRecords\":2002,\"minValues\":{\"id\":1,\"name\":\"name_1\"},\"maxValues\":{\"id\":3995,\"name\":\"name_995\"},\"nullCount\":{\"id\":0,\"name\":0}}"}}
+{"add":{"path":"part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet","partitionValues":{},"size":18185,"modificationTime":1781690364771,"dataChange":true,"stats":"{\"numRecords\":1998,\"minValues\":{\"id\":0,\"name\":\"name_0\"},\"maxValues\":{\"id\":3999,\"name\":\"name_999\"},\"nullCount\":{\"id\":0,\"name\":0}}"}}
diff --git 
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet
 
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet
new file mode 100644
index 00000000000..ba8302a2741
Binary files /dev/null and 
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet
 differ
diff --git 
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet
 
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet
new file mode 100644
index 00000000000..c0d53dde407
Binary files /dev/null and 
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet
 differ


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to