FrankChen021 commented on code in PR #19592:
URL: https://github.com/apache/druid/pull/19592#discussion_r3435736072


##########
extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java:
##########
@@ -111,20 +118,20 @@ public DeltaInputSourceIterator(
     public boolean hasNext()
     {
       while (currentBatch == null || !currentBatch.hasNext()) {
-        if (!filteredColumnarBatchIterators.hasNext()) {
-          return false; // No more batches or records to read!
-        }
-
-        final io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch> filteredBatchIterator =
-            filteredColumnarBatchIterators.next();
-
-        while (filteredBatchIterator.hasNext()) {
-          final FilteredColumnarBatch nextBatch = filteredBatchIterator.next();
+        // Drain remaining batches from the current file before moving to the next.
+        while (currentFileIterator != null && currentFileIterator.hasNext()) {
+          final FilteredColumnarBatch nextBatch = currentFileIterator.next();
           currentBatch = nextBatch.getRows();
           if (currentBatch.hasNext()) {
             return true;
           }
         }
+
+        // Advance to the next file.
+        if (!filteredColumnarBatchIterators.hasNext()) {
+          return false;
+        }
+        currentFileIterator = filteredColumnarBatchIterators.next();

Review Comment:
   [P1] Close exhausted file iterators before advancing
   
   The latest commit only adds a newline, so this issue from the full PR diff
   still stands: after draining currentFileIterator, hasNext() overwrites it
   with the next scan-file iterator without closing the exhausted one. These
   iterators come from Scan.transformPhysicalData(), whose close() closes the
   underlying Parquet physicalDataIter. For a fully read multi-file table, the
   reader of every completed file therefore leaks, except the last one, which
   DeltaInputSourceIterator.close() does close. Large Delta ingestions can thus
   accumulate open Parquet readers and file handles. Close the exhausted
   currentFileIterator before assigning the next one, or otherwise ensure that
   each per-file iterator is closed once it is drained.
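   A minimal sketch of the suggested fix, under simplifying assumptions: the
   hypothetical CloseableIter interface stands in for
   io.delta.kernel.utils.CloseableIterator, each per-file iterator yields plain
   Iterator<T> batches in place of FilteredColumnarBatch.getRows(), and
   ClosingFlatIterator is an illustrative name, not the class in the PR. The
   loop mirrors the hasNext() in the diff, with one added step that closes the
   drained file iterator before advancing.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Illustrative stand-in for DeltaInputSourceIterator; all names are hypothetical.
final class ClosingFlatIterator<T> implements Iterator<T>, Closeable
{
  /** Hypothetical stand-in for io.delta.kernel.utils.CloseableIterator. */
  public interface CloseableIter<E> extends Iterator<E>, Closeable
  {
  }

  private final Iterator<CloseableIter<Iterator<T>>> fileIterators; // one entry per scan file
  private CloseableIter<Iterator<T>> currentFileIterator;           // batches of the open file
  private Iterator<T> currentBatch;                                  // rows of the current batch

  ClosingFlatIterator(Iterator<CloseableIter<Iterator<T>>> fileIterators)
  {
    this.fileIterators = fileIterators;
  }

  @Override
  public boolean hasNext()
  {
    while (currentBatch == null || !currentBatch.hasNext()) {
      // Drain remaining batches from the current file before moving to the next.
      while (currentFileIterator != null && currentFileIterator.hasNext()) {
        currentBatch = currentFileIterator.next();
        if (currentBatch.hasNext()) {
          return true;
        }
      }
      // The current file is exhausted: close it before dropping the reference.
      closeCurrentFile();

      // Advance to the next file.
      if (!fileIterators.hasNext()) {
        return false;
      }
      currentFileIterator = fileIterators.next();
    }
    return true;
  }

  @Override
  public T next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return currentBatch.next();
  }

  @Override
  public void close()
  {
    // Only a file that is still open needs closing; drained ones were closed in hasNext().
    closeCurrentFile();
  }

  private void closeCurrentFile()
  {
    if (currentFileIterator == null) {
      return;
    }
    try {
      currentFileIterator.close();
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    finally {
      currentFileIterator = null; // never close the same iterator twice
    }
  }
}
```

   Nulling the field after closing means a later close() on the outer
   iterator does not close an already-closed per-file iterator a second time.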



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.