This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 936196c9f99 [fs] Optimize cloud file listing with per-page filtering 
and early termination in PinotFS (#17847)
936196c9f99 is described below

commit 936196c9f9950a1b7df5210e351fcf3777fe66cc
Author: Anshul Singh <[email protected]>
AuthorDate: Fri Mar 13 18:19:02 2026 +0530

    [fs] Optimize cloud file listing with per-page filtering and early 
termination in PinotFS (#17847)
    
    * [fs] Optimize cloud file listing with per-page filtering and early 
termination in PinotFS
    
    * address feedback
---
 .../pinot/plugin/filesystem/ADLSGen2PinotFS.java   |  40 +++
 .../test/ADLSGen2PinotFSPaginatedListTest.java     | 273 ++++++++++++++++
 .../apache/pinot/plugin/filesystem/GcsPinotFS.java |  55 ++++
 .../filesystem/GcsPinotFSPaginatedListTest.java    | 259 +++++++++++++++
 .../apache/pinot/plugin/filesystem/S3PinotFS.java  |  65 ++++
 .../filesystem/S3PinotFSPaginatedListTest.java     | 357 +++++++++++++++++++++
 .../pinot/spi/filesystem/NoClosePinotFS.java       |   8 +
 .../org/apache/pinot/spi/filesystem/PinotFS.java   |  33 ++
 .../spi/filesystem/PinotFSPaginatedListTest.java   | 129 ++++++++
 9 files changed, 1219 insertions(+)

diff --git 
a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
 
b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
index e3b3cc01baa..3c4606d492a 100644
--- 
a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
+++ 
b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
@@ -56,9 +56,11 @@ import java.nio.file.Paths;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.time.OffsetDateTime;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -474,6 +476,44 @@ public class ADLSGen2PinotFS extends BasePinotFS {
     }
   }
 
+  @Override
+  public List<FileMetadata> listFilesWithMetadata(final URI fileUri, final 
boolean recursive,
+      final Predicate<String> pathFilter, final int maxResults)
+      throws IOException {
+    if (maxResults <= 0) {
+      LOGGER.warn("listFilesWithMetadata called with maxResults={}, returning 
empty list", maxResults);
+      return new ArrayList<>();
+    }
+    LOGGER.debug("listFilesWithMetadata (paginated) is called with 
fileUri='{}', recursive='{}', maxResults={}",
+        fileUri, recursive, maxResults);
+    final List<FileMetadata> result = new ArrayList<>();
+    try {
+      // PagedIterable fetches pages lazily; breaking out stops further API 
calls
+      for (final PathItem item : listPathItems(fileUri, recursive)) {
+        if (item.isDirectory()) {
+          continue;
+        }
+        final String filePath = 
AzurePinotFSUtil.convertAzureStylePathToUriStylePath(item.getName());
+        if (pathFilter.test(filePath)) {
+          result.add(new FileMetadata.Builder()
+              .setFilePath(filePath)
+              
.setLastModifiedTime(item.getLastModified().toInstant().toEpochMilli())
+              .setLength(item.getContentLength())
+              .setIsDirectory(false)
+              .build());
+          if (result.size() >= maxResults) {
+            break;
+          }
+        }
+      }
+    } catch (DataLakeStorageException e) {
+      throw new IOException(e);
+    }
+    LOGGER.info("Listed {} files (max: {}) from URI: {}, is recursive: {}",
+        result.size(), maxResults, fileUri, recursive);
+    return result;
+  }
+
   private PagedIterable<PathItem> listPathItems(URI fileUri, boolean recursive)
       throws IOException {
     // Unlike other Azure SDK APIs that takes url encoded path, 
ListPathsOptions takes decoded url
diff --git 
a/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSPaginatedListTest.java
 
b/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSPaginatedListTest.java
new file mode 100644
index 00000000000..518c31617f1
--- /dev/null
+++ 
b/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSPaginatedListTest.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.filesystem.test;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.models.DataLakeStorageException;
+import com.azure.storage.file.datalake.models.PathItem;
+import java.io.IOException;
+import java.net.URI;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Predicate;
+import org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for ADLSGen2PinotFS.listFilesWithMetadata(URI, boolean, 
Predicate, int)
+ * — the paginated listing with lazy PagedIterable iteration and early 
termination.
+ */
+public class ADLSGen2PinotFSPaginatedListTest {
+
+  private static final Predicate<String> ACCEPT_ALL = path -> true;
+  private static final OffsetDateTime MTIME = OffsetDateTime.of(2026, 1, 15, 
10, 30, 0, 0, ZoneOffset.UTC);
+
+  @Mock
+  private DataLakeFileSystemClient _mockFileSystemClient;
+
+  @SuppressWarnings("rawtypes")
+  @Mock
+  private PagedIterable _mockPagedIterable;
+
+  private ADLSGen2PinotFS _adlsPinotFS;
+  private AutoCloseable _mocks;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+    _adlsPinotFS = new ADLSGen2PinotFS(_mockFileSystemClient);
+  }
+
+  @AfterMethod
+  public void tearDown() throws Exception {
+    _mocks.close();
+  }
+
+  private static PathItem mockPathItem(final String name, final boolean 
isDirectory,
+      final long contentLength) {
+    final PathItem item = mock(PathItem.class);
+    when(item.getName()).thenReturn(name);
+    when(item.isDirectory()).thenReturn(isDirectory);
+    when(item.getContentLength()).thenReturn(contentLength);
+    when(item.getLastModified()).thenReturn(MTIME);
+    return item;
+  }
+
+  @SuppressWarnings("unchecked")
+  private void setupIterator(final PathItem... items) {
+    when(_mockFileSystemClient.listPaths(any(), 
any())).thenReturn(_mockPagedIterable);
+    final Iterator<PathItem> iterator = Arrays.asList(items).iterator();
+    when(_mockPagedIterable.iterator()).thenReturn(iterator);
+  }
+
+  @Test
+  public void testAllMatch() throws IOException {
+    setupIterator(
+        mockPathItem("data/file1.parquet", false, 100),
+        mockPathItem("data/file2.parquet", false, 200),
+        mockPathItem("data/file3.parquet", false, 300)
+    );
+
+    final List<FileMetadata> result = _adlsPinotFS.listFilesWithMetadata(
+        URI.create("abfss://[email protected]/data/"),
+        true, ACCEPT_ALL, 10);
+
+    assertEquals(result.size(), 3);
+    assertEquals(result.get(0).getFilePath(), "/data/file1.parquet");
+    assertEquals(result.get(1).getFilePath(), "/data/file2.parquet");
+    assertEquals(result.get(2).getFilePath(), "/data/file3.parquet");
+  }
+
+  @Test
+  public void testEarlyTermination() throws IOException {
+    // 5 items, maxResults=3
+    setupIterator(
+        mockPathItem("data/f1.parquet", false, 100),
+        mockPathItem("data/f2.parquet", false, 200),
+        mockPathItem("data/f3.parquet", false, 300),
+        mockPathItem("data/f4.parquet", false, 400),
+        mockPathItem("data/f5.parquet", false, 500)
+    );
+
+    final List<FileMetadata> result = _adlsPinotFS.listFilesWithMetadata(
+        URI.create("abfss://[email protected]/data/"),
+        true, ACCEPT_ALL, 3);
+
+    assertEquals(result.size(), 3);
+    assertEquals(result.get(2).getFilePath(), "/data/f3.parquet");
+  }
+
+  @Test
+  public void testMaxResultsOne() throws IOException {
+    setupIterator(
+        mockPathItem("data/f1.parquet", false, 100),
+        mockPathItem("data/f2.parquet", false, 200)
+    );
+
+    final List<FileMetadata> result = _adlsPinotFS.listFilesWithMetadata(
+        URI.create("abfss://[email protected]/data/"),
+        true, ACCEPT_ALL, 1);
+
+    assertEquals(result.size(), 1);
+  }
+
+  @Test
+  public void testFilterPredicate() throws IOException {
+    setupIterator(
+        mockPathItem("data/file1.parquet", false, 100),
+        mockPathItem("data/file2.csv", false, 200),
+        mockPathItem("data/file3.parquet", false, 300),
+        mockPathItem("data/file4.json", false, 400)
+    );
+
+    final List<FileMetadata> result = _adlsPinotFS.listFilesWithMetadata(
+        URI.create("abfss://[email protected]/data/"),
+        true, path -> path.endsWith(".parquet"), 10);
+
+    assertEquals(result.size(), 2);
+    assertTrue(result.stream().allMatch(f -> 
f.getFilePath().endsWith(".parquet")));
+  }
+
+  @Test
+  public void testDirectoriesSkipped() throws IOException {
+    setupIterator(
+        mockPathItem("data/subdir", true, 0),
+        mockPathItem("data/file1.parquet", false, 100),
+        mockPathItem("data/anotherdir", true, 0),
+        mockPathItem("data/file2.parquet", false, 200)
+    );
+
+    final List<FileMetadata> result = _adlsPinotFS.listFilesWithMetadata(
+        URI.create("abfss://[email protected]/data/"),
+        true, ACCEPT_ALL, 10);
+
+    assertEquals(result.size(), 2);
+    assertTrue(result.stream().noneMatch(FileMetadata::isDirectory));
+  }
+
+  @Test
+  public void testFilterRejectsAll() throws IOException {
+    setupIterator(
+        mockPathItem("data/file1.parquet", false, 100),
+        mockPathItem("data/file2.parquet", false, 200)
+    );
+
+    final List<FileMetadata> result = _adlsPinotFS.listFilesWithMetadata(
+        URI.create("abfss://[email protected]/data/"),
+        true, path -> false, 10);
+
+    assertEquals(result.size(), 0);
+  }
+
+  @Test
+  public void testEmptyListing() throws IOException {
+    setupIterator();
+
+    final List<FileMetadata> result = _adlsPinotFS.listFilesWithMetadata(
+        URI.create("abfss://[email protected]/data/"),
+        true, ACCEPT_ALL, 10);
+
+    assertEquals(result.size(), 0);
+  }
+
+  @Test
+  public void testFileMetadataAttributes() throws IOException {
+    setupIterator(
+        mockPathItem("data/file.parquet", false, 4096)
+    );
+
+    final List<FileMetadata> result = _adlsPinotFS.listFilesWithMetadata(
+        URI.create("abfss://[email protected]/data/"),
+        true, ACCEPT_ALL, 10);
+
+    assertEquals(result.size(), 1);
+    final FileMetadata fm = result.get(0);
+    assertEquals(fm.getFilePath(), "/data/file.parquet");
+    assertEquals(fm.getLength(), 4096L);
+    assertEquals(fm.getLastModifiedTime(), MTIME.toInstant().toEpochMilli());
+    assertFalse(fm.isDirectory());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(expectedExceptions = IOException.class)
+  public void testDataLakeExceptionWrappedAsIOException() throws IOException {
+    when(_mockFileSystemClient.listPaths(any(), any()))
+        .thenThrow(mock(DataLakeStorageException.class));
+
+    _adlsPinotFS.listFilesWithMetadata(
+        URI.create("abfss://[email protected]/data/"),
+        true, ACCEPT_ALL, 10);
+  }
+
+  @Test
+  public void testMaxResultsZeroReturnsEmpty() throws IOException {
+    final List<FileMetadata> result = _adlsPinotFS.listFilesWithMetadata(
+        URI.create("abfss://[email protected]/data/"),
+        true, ACCEPT_ALL, 0);
+
+    assertEquals(result.size(), 0);
+  }
+
+  @Test
+  public void testMaxResultsNegativeReturnsEmpty() throws IOException {
+    final List<FileMetadata> result = _adlsPinotFS.listFilesWithMetadata(
+        URI.create("abfss://[email protected]/data/"),
+        true, ACCEPT_ALL, -3);
+
+    assertEquals(result.size(), 0);
+  }
+
+  @Test
+  public void testEarlyTerminationWithFilterAndDirectories() throws 
IOException {
+    // Mix of dirs, matching files, and non-matching files; maxResults=2
+    setupIterator(
+        mockPathItem("data/dir1", true, 0),
+        mockPathItem("data/file1.csv", false, 100),
+        mockPathItem("data/file2.parquet", false, 200),
+        mockPathItem("data/dir2", true, 0),
+        mockPathItem("data/file3.parquet", false, 300),
+        mockPathItem("data/file4.parquet", false, 400)
+    );
+
+    final List<FileMetadata> result = _adlsPinotFS.listFilesWithMetadata(
+        URI.create("abfss://[email protected]/data/"),
+        true, path -> path.endsWith(".parquet"), 2);
+
+    assertEquals(result.size(), 2);
+    assertEquals(result.get(0).getFilePath(), "/data/file2.parquet");
+    assertEquals(result.get(1).getFilePath(), "/data/file3.parquet");
+  }
+}
diff --git 
a/pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java
 
b/pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java
index 8742e090ab9..7e21689744c 100644
--- 
a/pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java
+++ 
b/pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java
@@ -48,6 +48,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -258,6 +259,60 @@ public class GcsPinotFS extends BasePinotFS {
     return listedFiles;
   }
 
+  @Override
+  public List<FileMetadata> listFilesWithMetadata(final URI fileUri, final 
boolean recursive,
+      final Predicate<String> pathFilter, final int maxResults)
+      throws IOException {
+    if (maxResults <= 0) {
+      LOGGER.warn("listFilesWithMetadata called with maxResults={}, returning 
empty list", maxResults);
+      return new ArrayList<>();
+    }
+    final List<FileMetadata> result = new ArrayList<>();
+    final GcsUri gcsFileUri = new GcsUri(fileUri);
+    final String prefix = gcsFileUri.getPrefix();
+    final String bucketName = gcsFileUri.getBucketName();
+    try {
+      Page<Blob> page;
+      if (recursive) {
+        page = _storage.list(bucketName, 
Storage.BlobListOption.prefix(prefix));
+      } else {
+        page = _storage.list(bucketName, Storage.BlobListOption.prefix(prefix),
+            Storage.BlobListOption.currentDirectory());
+      }
+      while (page != null && result.size() < maxResults) {
+        for (final Blob blob : page.getValues()) {
+          if (blob.getName().equals(prefix)) {
+            continue;
+          }
+          final boolean isDirectory = 
blob.getName().endsWith(GcsUri.DELIMITER);
+          if (isDirectory) {
+            continue;
+          }
+          final String filePath = GcsUri.createGcsUri(bucketName, 
blob.getName()).toString();
+          if (pathFilter.test(filePath)) {
+            final FileMetadata.Builder fileBuilder = new FileMetadata.Builder()
+                .setFilePath(filePath)
+                .setLength(blob.getSize())
+                .setIsDirectory(false);
+            if (blob.getUpdateTime() != null) {
+              fileBuilder.setLastModifiedTime(blob.getUpdateTime());
+            }
+            result.add(fileBuilder.build());
+            if (result.size() >= maxResults) {
+              break;
+            }
+          }
+        }
+        page = page.hasNextPage() ? page.getNextPage() : null;
+      }
+    } catch (Exception t) {
+      throw new IOException(t);
+    }
+    LOGGER.info("Listed {} files (max: {}) from URI: {}, is recursive: {}",
+        result.size(), maxResults, gcsFileUri, recursive);
+    return result;
+  }
+
   @Override
   public void copyToLocalFile(URI srcUri, File dstFile)
       throws Exception {
diff --git 
a/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsPinotFSPaginatedListTest.java
 
b/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsPinotFSPaginatedListTest.java
new file mode 100644
index 00000000000..3d316536a92
--- /dev/null
+++ 
b/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsPinotFSPaginatedListTest.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.filesystem;
+
+import com.google.api.gax.paging.Page;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Predicate;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for GcsPinotFS.listFilesWithMetadata(URI, boolean, Predicate, 
int)
+ * — the paginated listing with early termination using Page&lt;Blob&gt;.
+ */
+public class GcsPinotFSPaginatedListTest {
+
+  private static final Predicate<String> ACCEPT_ALL = path -> true;
+
+  private Storage _mockStorage;
+  private GcsPinotFS _gcsPinotFS;
+
+  @BeforeMethod
+  public void setUp() throws Exception {
+    _mockStorage = mock(Storage.class);
+    _gcsPinotFS = new GcsPinotFS();
+    final Field storageField = GcsPinotFS.class.getDeclaredField("_storage");
+    storageField.setAccessible(true);
+    storageField.set(_gcsPinotFS, _mockStorage);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static Page<Blob> mockPage(final List<Blob> blobs, final Page<Blob> 
nextPage) {
+    final Page<Blob> page = mock(Page.class);
+    when(page.getValues()).thenReturn(blobs);
+    when(page.hasNextPage()).thenReturn(nextPage != null);
+    when(page.getNextPage()).thenReturn(nextPage);
+    return page;
+  }
+
+  private static Blob mockBlob(final String name, final long size, final Long 
updateTime) {
+    final Blob blob = mock(Blob.class);
+    when(blob.getName()).thenReturn(name);
+    when(blob.getSize()).thenReturn(size);
+    when(blob.getUpdateTime()).thenReturn(updateTime);
+    return blob;
+  }
+
+  @SuppressWarnings("unchecked")
+  private void stubStorageList(final Page<Blob> page) {
+    when(_mockStorage.list(anyString(), (Storage.BlobListOption[]) any()))
+        .thenReturn(page);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testSinglePageAllMatch() throws IOException {
+    final List<Blob> blobs = Arrays.asList(
+        mockBlob("data/file1.parquet", 100, 1000L),
+        mockBlob("data/file2.parquet", 200, 2000L)
+    );
+    stubStorageList(mockPage(blobs, null));
+
+    final List<FileMetadata> result = _gcsPinotFS.listFilesWithMetadata(
+        URI.create("gs://bucket/data/"), true, ACCEPT_ALL, 10);
+
+    assertEquals(result.size(), 2);
+    assertEquals(result.get(0).getFilePath(), 
"gs://bucket/data/file1.parquet");
+    assertEquals(result.get(1).getFilePath(), 
"gs://bucket/data/file2.parquet");
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testEarlyTerminationSkipsProcessingNextPage() throws IOException 
{
+    // Page 1 has 3 blobs, Page 2 has 1 blob; maxResults=3
+    // Page 2's getValues() should never be called — while-loop exits because 
result.size() == maxResults
+    final Page<Blob> page2 = mockPage(
+        Arrays.asList(mockBlob("data/f4.parquet", 400, 4000L)), null);
+    final Page<Blob> page1 = mockPage(Arrays.asList(
+        mockBlob("data/f1.parquet", 100, 1000L),
+        mockBlob("data/f2.parquet", 200, 2000L),
+        mockBlob("data/f3.parquet", 300, 3000L)
+    ), page2);
+    stubStorageList(page1);
+
+    final List<FileMetadata> result = _gcsPinotFS.listFilesWithMetadata(
+        URI.create("gs://bucket/data/"), true, ACCEPT_ALL, 3);
+
+    assertEquals(result.size(), 3);
+    // page2.getValues() should never be invoked: the while-loop exits before 
iterating page 2
+    verify(page2, never()).getValues();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testMultiplePagesNeeded() throws IOException {
+    final Page<Blob> page2 = mockPage(
+        Arrays.asList(mockBlob("data/f3.parquet", 300, 3000L)), null);
+    final Page<Blob> page1 = mockPage(Arrays.asList(
+        mockBlob("data/f1.parquet", 100, 1000L),
+        mockBlob("data/f2.parquet", 200, 2000L)
+    ), page2);
+    stubStorageList(page1);
+
+    final List<FileMetadata> result = _gcsPinotFS.listFilesWithMetadata(
+        URI.create("gs://bucket/data/"), true, ACCEPT_ALL, 10);
+
+    assertEquals(result.size(), 3);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testFilterPredicate() throws IOException {
+    final List<Blob> blobs = Arrays.asList(
+        mockBlob("data/file1.parquet", 100, 1000L),
+        mockBlob("data/file2.csv", 200, 2000L),
+        mockBlob("data/file3.parquet", 300, 3000L)
+    );
+    stubStorageList(mockPage(blobs, null));
+
+    final List<FileMetadata> result = _gcsPinotFS.listFilesWithMetadata(
+        URI.create("gs://bucket/data/"), true,
+        path -> path.endsWith(".parquet"), 10);
+
+    assertEquals(result.size(), 2);
+    assertTrue(result.stream().allMatch(f -> 
f.getFilePath().endsWith(".parquet")));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testDirectoriesSkipped() throws IOException {
+    final List<Blob> blobs = Arrays.asList(
+        mockBlob("data/subdir/", 0, 1000L),
+        mockBlob("data/file1.parquet", 100, 2000L),
+        mockBlob("data/another/", 0, 3000L),
+        mockBlob("data/file2.parquet", 200, 4000L)
+    );
+    stubStorageList(mockPage(blobs, null));
+
+    final List<FileMetadata> result = _gcsPinotFS.listFilesWithMetadata(
+        URI.create("gs://bucket/data/"), true, ACCEPT_ALL, 10);
+
+    assertEquals(result.size(), 2);
+    assertTrue(result.stream().noneMatch(FileMetadata::isDirectory));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testPrefixDirectoryMarkerSkipped() throws IOException {
+    final List<Blob> blobs = Arrays.asList(
+        mockBlob("data/", 0, 1000L),
+        mockBlob("data/file1.parquet", 100, 2000L)
+    );
+    stubStorageList(mockPage(blobs, null));
+
+    final List<FileMetadata> result = _gcsPinotFS.listFilesWithMetadata(
+        URI.create("gs://bucket/data/"), true, ACCEPT_ALL, 10);
+
+    assertEquals(result.size(), 1);
+    assertEquals(result.get(0).getFilePath(), 
"gs://bucket/data/file1.parquet");
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testNullUpdateTimeHandled() throws IOException {
+    final List<Blob> blobs = Arrays.asList(
+        mockBlob("data/file1.parquet", 100, null)
+    );
+    stubStorageList(mockPage(blobs, null));
+
+    final List<FileMetadata> result = _gcsPinotFS.listFilesWithMetadata(
+        URI.create("gs://bucket/data/"), true, ACCEPT_ALL, 10);
+
+    assertEquals(result.size(), 1);
+    // lastModifiedTime defaults to 0 when updateTime is null
+    assertEquals(result.get(0).getLastModifiedTime(), 0L);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testEmptyBucket() throws IOException {
+    stubStorageList(mockPage(Collections.emptyList(), null));
+
+    final List<FileMetadata> result = _gcsPinotFS.listFilesWithMetadata(
+        URI.create("gs://bucket/data/"), true, ACCEPT_ALL, 10);
+
+    assertEquals(result.size(), 0);
+  }
+
+  @Test
+  public void testMaxResultsZeroReturnsEmpty() throws IOException {
+    final List<FileMetadata> result = _gcsPinotFS.listFilesWithMetadata(
+        URI.create("gs://bucket/data/"), true, ACCEPT_ALL, 0);
+
+    assertEquals(result.size(), 0);
+    verify(_mockStorage, never()).list(anyString(), (Storage.BlobListOption[]) 
any());
+  }
+
+  @Test
+  public void testMaxResultsNegativeReturnsEmpty() throws IOException {
+    final List<FileMetadata> result = _gcsPinotFS.listFilesWithMetadata(
+        URI.create("gs://bucket/data/"), true, ACCEPT_ALL, -1);
+
+    assertEquals(result.size(), 0);
+    verify(_mockStorage, never()).list(anyString(), (Storage.BlobListOption[]) 
any());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testFileMetadataAttributes() throws IOException {
+    final List<Blob> blobs = Arrays.asList(
+        mockBlob("data/file.parquet", 4096, 1705312200000L)
+    );
+    stubStorageList(mockPage(blobs, null));
+
+    final List<FileMetadata> result = _gcsPinotFS.listFilesWithMetadata(
+        URI.create("gs://bucket/data/"), true, ACCEPT_ALL, 10);
+
+    assertEquals(result.size(), 1);
+    final FileMetadata fm = result.get(0);
+    assertEquals(fm.getFilePath(), "gs://bucket/data/file.parquet");
+    assertEquals(fm.getLength(), 4096L);
+    assertEquals(fm.getLastModifiedTime(), 1705312200000L);
+    assertEquals(fm.isDirectory(), false);
+  }
+}
diff --git 
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index 2700ea46e3f..494f9d6c5d4 100644
--- 
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++ 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -33,6 +33,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
@@ -748,6 +749,70 @@ public class S3PinotFS extends BasePinotFS {
     return listedFiles;
   }
 
+  @Override
+  public List<FileMetadata> listFilesWithMetadata(final URI fileUri, final 
boolean recursive,
+      final Predicate<String> pathFilter, final int maxResults)
+      throws IOException {
+    if (maxResults <= 0) {
+      LOGGER.warn("listFilesWithMetadata called with maxResults={}, returning 
empty list", maxResults);
+      return new ArrayList<>();
+    }
+    final List<FileMetadata> result = new ArrayList<>();
+    final String scheme = fileUri.getScheme();
+    Preconditions.checkArgument(scheme.equals(S3_SCHEME) || 
scheme.equals(S3A_SCHEME));
+    try {
+      String continuationToken = null;
+      boolean isDone = false;
+      final String prefix = normalizeToDirectoryPrefix(fileUri);
+      while (!isDone && result.size() < maxResults) {
+        ListObjectsV2Request.Builder listObjectsV2RequestBuilder =
+            ListObjectsV2Request.builder().bucket(fileUri.getHost());
+        if (!prefix.equals(DELIMITER)) {
+          listObjectsV2RequestBuilder = 
listObjectsV2RequestBuilder.prefix(prefix);
+        }
+        if (!recursive) {
+          listObjectsV2RequestBuilder = 
listObjectsV2RequestBuilder.delimiter(DELIMITER);
+        }
+        if (continuationToken != null) {
+          listObjectsV2RequestBuilder.continuationToken(continuationToken);
+        }
+        final ListObjectsV2Request listObjectsV2Request = 
listObjectsV2RequestBuilder.build();
+        LOGGER.debug("Trying to send ListObjectsV2Request {}", 
listObjectsV2Request);
+        final ListObjectsV2Response listObjectsV2Response = 
retryWithS3CredentialRefresh(() ->
+            _s3Client.listObjectsV2(listObjectsV2Request));
+        for (final S3Object s3Object : listObjectsV2Response.contents()) {
+          if (s3Object.key().equals(fileUri.getPath())) {
+            continue;
+          }
+          final boolean isDirectory = s3Object.key().endsWith(DELIMITER);
+          if (isDirectory) {
+            continue;
+          }
+          final String filePath =
+              scheme + SCHEME_SEPARATOR + fileUri.getHost() + DELIMITER + 
getNormalizedFileKey(s3Object);
+          if (pathFilter.test(filePath)) {
+            result.add(new FileMetadata.Builder()
+                .setFilePath(filePath)
+                .setLastModifiedTime(s3Object.lastModified().toEpochMilli())
+                .setLength(s3Object.size())
+                .setIsDirectory(false)
+                .build());
+            if (result.size() >= maxResults) {
+              break;
+            }
+          }
+        }
+        isDone = !listObjectsV2Response.isTruncated();
+        continuationToken = listObjectsV2Response.nextContinuationToken();
+      }
+    } catch (Throwable t) {
+      throw new IOException(t);
+    }
+    LOGGER.info("Listed {} files (max: {}) from URI: {}, is recursive: {}",
+        result.size(), maxResults, fileUri, recursive);
+    return result;
+  }
+
   private static String getNormalizedFileKey(S3Object s3Object) {
     String fileKey = s3Object.key();
     if (fileKey.startsWith(DELIMITER)) {
diff --git 
a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSPaginatedListTest.java
 
b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSPaginatedListTest.java
new file mode 100644
index 00000000000..a071440d3f0
--- /dev/null
+++ 
b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSPaginatedListTest.java
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.filesystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Predicate;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.mockito.ArgumentCaptor;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for S3PinotFS.listFilesWithMetadata(URI, boolean, Predicate, int)
+ * — the paginated listing with early termination.
+ */
+public class S3PinotFSPaginatedListTest {
+
+  private static final Instant NOW = Instant.now();
+  private static final Predicate<String> ACCEPT_ALL = path -> true;
+
+  private S3Client _mockS3Client;
+  private S3PinotFS _s3PinotFS;
+
+  @BeforeMethod
+  public void setUp() {
+    _mockS3Client = mock(S3Client.class);
+    _s3PinotFS = new S3PinotFS();
+    _s3PinotFS.init(_mockS3Client);
+  }
+
+  private static S3Object s3obj(final String key, final long size) {
+    return S3Object.builder()
+        .key(key)
+        .size(size)
+        .lastModified(NOW)
+        .build();
+  }
+
+  private static ListObjectsV2Response page(final List<S3Object> objects,
+      final boolean truncated, final String nextToken) {
+    final ListObjectsV2Response.Builder builder = 
ListObjectsV2Response.builder()
+        .contents(objects)
+        .isTruncated(truncated);
+    if (nextToken != null) {
+      builder.nextContinuationToken(nextToken);
+    }
+    return builder.build();
+  }
+
+  @Test
+  public void testSinglePageAllMatch() throws IOException {
+    final List<S3Object> objects = Arrays.asList(
+        s3obj("data/file1.parquet", 100),
+        s3obj("data/file2.parquet", 200),
+        s3obj("data/file3.parquet", 300)
+    );
+    when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+        .thenReturn(page(objects, false, null));
+
+    final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+        URI.create("s3://bucket/data/"), true, ACCEPT_ALL, 10);
+
+    assertEquals(result.size(), 3);
+    assertEquals(result.get(0).getFilePath(), 
"s3://bucket/data/file1.parquet");
+    assertEquals(result.get(1).getFilePath(), 
"s3://bucket/data/file2.parquet");
+    assertEquals(result.get(2).getFilePath(), 
"s3://bucket/data/file3.parquet");
+    verify(_mockS3Client, 
times(1)).listObjectsV2(any(ListObjectsV2Request.class));
+  }
+
+  @Test
+  public void testSinglePageWithFilter() throws IOException {
+    final List<S3Object> objects = Arrays.asList(
+        s3obj("data/file1.parquet", 100),
+        s3obj("data/file2.csv", 200),
+        s3obj("data/file3.parquet", 300),
+        s3obj("data/file4.json", 400)
+    );
+    when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+        .thenReturn(page(objects, false, null));
+
+    final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+        URI.create("s3://bucket/data/"), true,
+        path -> path.endsWith(".parquet"), 10);
+
+    assertEquals(result.size(), 2);
+    assertTrue(result.stream().allMatch(f -> 
f.getFilePath().endsWith(".parquet")));
+  }
+
+  @Test
+  public void testEarlyTerminationAcrossPages() throws IOException {
+    // 3 pages with 3 files each; maxResults=5 should stop after page 2
+    final List<S3Object> page1 = Arrays.asList(
+        s3obj("data/file1.parquet", 100),
+        s3obj("data/file2.parquet", 200),
+        s3obj("data/file3.parquet", 300)
+    );
+    final List<S3Object> page2 = Arrays.asList(
+        s3obj("data/file4.parquet", 400),
+        s3obj("data/file5.parquet", 500),
+        s3obj("data/file6.parquet", 600)
+    );
+    // page 3 should never be fetched
+    when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+        .thenReturn(page(page1, true, "token1"))
+        .thenReturn(page(page2, true, "token2"));
+
+    final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+        URI.create("s3://bucket/data/"), true, ACCEPT_ALL, 5);
+
+    assertEquals(result.size(), 5);
+    // Only 2 S3 API calls, NOT 3
+    verify(_mockS3Client, 
times(2)).listObjectsV2(any(ListObjectsV2Request.class));
+  }
+
+  @Test
+  public void testEarlyTerminationWithinPage() throws IOException {
+    // 10 files on a single page; maxResults=3 should stop mid-page
+    final List<S3Object> objects = Arrays.asList(
+        s3obj("data/f1.parquet", 100),
+        s3obj("data/f2.parquet", 200),
+        s3obj("data/f3.parquet", 300),
+        s3obj("data/f4.parquet", 400),
+        s3obj("data/f5.parquet", 500)
+    );
+    when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+        .thenReturn(page(objects, false, null));
+
+    final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+        URI.create("s3://bucket/data/"), true, ACCEPT_ALL, 3);
+
+    assertEquals(result.size(), 3);
+    assertEquals(result.get(0).getFilePath(), "s3://bucket/data/f1.parquet");
+    assertEquals(result.get(2).getFilePath(), "s3://bucket/data/f3.parquet");
+  }
+
+  @Test
+  public void testMaxResultsOneStopsImmediately() throws IOException {
+    final List<S3Object> objects = Arrays.asList(
+        s3obj("data/file1.parquet", 100),
+        s3obj("data/file2.parquet", 200)
+    );
+    when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+        .thenReturn(page(objects, true, "token1"));
+
+    final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+        URI.create("s3://bucket/data/"), true, ACCEPT_ALL, 1);
+
+    assertEquals(result.size(), 1);
+    // Only 1 API call despite truncated=true
+    verify(_mockS3Client, 
times(1)).listObjectsV2(any(ListObjectsV2Request.class));
+  }
+
+  @Test
+  public void testContinuationTokenPassedCorrectly() throws IOException {
+    when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+        .thenReturn(page(Arrays.asList(s3obj("data/f1.parquet", 100)), true, 
"abc123"))
+        .thenReturn(page(Arrays.asList(s3obj("data/f2.parquet", 200)), false, 
null));
+
+    _s3PinotFS.listFilesWithMetadata(
+        URI.create("s3://bucket/data/"), true, ACCEPT_ALL, 10);
+
+    final ArgumentCaptor<ListObjectsV2Request> captor =
+        ArgumentCaptor.forClass(ListObjectsV2Request.class);
+    verify(_mockS3Client, times(2)).listObjectsV2(captor.capture());
+    // First call: no continuation token
+    assertTrue(captor.getAllValues().get(0).continuationToken() == null);
+    // Second call: has the token from page 1
+    assertEquals(captor.getAllValues().get(1).continuationToken(), "abc123");
+  }
+
+  @Test
+  public void testDirectoriesSkipped() throws IOException {
+    final List<S3Object> objects = Arrays.asList(
+        s3obj("data/subdir/", 0),
+        s3obj("data/file1.parquet", 100),
+        s3obj("data/another-dir/", 0),
+        s3obj("data/file2.parquet", 200)
+    );
+    when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+        .thenReturn(page(objects, false, null));
+
+    final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+        URI.create("s3://bucket/data/"), true, ACCEPT_ALL, 10);
+
+    assertEquals(result.size(), 2);
+    assertTrue(result.stream().noneMatch(FileMetadata::isDirectory));
+  }
+
+  @Test
+  public void testFilterRejectsAll() throws IOException {
+    final List<S3Object> objects = Arrays.asList(
+        s3obj("data/file1.parquet", 100),
+        s3obj("data/file2.parquet", 200)
+    );
+    when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+        .thenReturn(page(objects, false, null));
+
+    final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+        URI.create("s3://bucket/data/"), true, path -> false, 10);
+
+    assertEquals(result.size(), 0);
+  }
+
+  @Test
+  public void testEmptyBucket() throws IOException {
+    when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+        .thenReturn(page(Collections.emptyList(), false, null));
+
+    final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+        URI.create("s3://bucket/data/"), true, ACCEPT_ALL, 10);
+
+    assertEquals(result.size(), 0);
+  }
+
+  @Test
+  public void testFileMetadataAttributes() throws IOException {
+    final Instant mtime = Instant.parse("2026-01-15T10:30:00Z");
+    final S3Object obj = S3Object.builder()
+        .key("data/file.parquet")
+        .size(4096L)
+        .lastModified(mtime)
+        .build();
+    when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+        .thenReturn(page(Arrays.asList(obj), false, null));
+
+    final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+        URI.create("s3://bucket/data/"), true, ACCEPT_ALL, 10);
+
+    assertEquals(result.size(), 1);
+    final FileMetadata fm = result.get(0);
+    assertEquals(fm.getFilePath(), "s3://bucket/data/file.parquet");
+    assertEquals(fm.getLength(), 4096L);
+    assertEquals(fm.getLastModifiedTime(), mtime.toEpochMilli());
+    assertEquals(fm.isDirectory(), false);
+  }
+
+  @Test
+  public void testS3aScheme() throws IOException {
+    final List<S3Object> objects = Arrays.asList(s3obj("data/file.parquet", 
100));
+    when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+        .thenReturn(page(objects, false, null));
+
+    final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+        URI.create("s3a://bucket/data/"), true, ACCEPT_ALL, 10);
+
+    assertEquals(result.size(), 1);
+    assertTrue(result.get(0).getFilePath().startsWith("s3a://"));
+  }
+
+  @Test
+  public void testAllPagesNeededWhenMaxResultsHigh() throws IOException {
+    when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+        .thenReturn(page(Arrays.asList(s3obj("data/f1.parquet", 100)), true, 
"t1"))
+        .thenReturn(page(Arrays.asList(s3obj("data/f2.parquet", 200)), true, 
"t2"))
+        .thenReturn(page(Arrays.asList(s3obj("data/f3.parquet", 300)), false, 
null));
+
+    final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+        URI.create("s3://bucket/data/"), true, ACCEPT_ALL, 100);
+
+    assertEquals(result.size(), 3);
+    verify(_mockS3Client, 
times(3)).listObjectsV2(any(ListObjectsV2Request.class));
+  }
+
+  @Test
+  public void testFilterWithEarlyTerminationAcrossPages() throws IOException {
+    // Page 1: 2 parquet + 1 csv; Page 2: 2 parquet + 1 csv; maxResults=3 for 
parquet only
+    final List<S3Object> page1 = Arrays.asList(
+        s3obj("data/a.parquet", 100),
+        s3obj("data/b.csv", 200),
+        s3obj("data/c.parquet", 300)
+    );
+    final List<S3Object> page2 = Arrays.asList(
+        s3obj("data/d.csv", 400),
+        s3obj("data/e.parquet", 500),
+        s3obj("data/f.csv", 600)
+    );
+    when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+        .thenReturn(page(page1, true, "t1"))
+        .thenReturn(page(page2, true, "t2"));
+
+    final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+        URI.create("s3://bucket/data/"), true,
+        path -> path.endsWith(".parquet"), 3);
+
+    assertEquals(result.size(), 3);
+    assertTrue(result.stream().allMatch(f -> 
f.getFilePath().endsWith(".parquet")));
+    // 2 pages fetched (2 from page1 + 1 from page2 = 3 matches)
+    verify(_mockS3Client, 
times(2)).listObjectsV2(any(ListObjectsV2Request.class));
+  }
+
+  @Test
+  public void testMaxResultsZeroReturnsEmpty() throws IOException {
+    final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+        URI.create("s3://bucket/data/"), true, ACCEPT_ALL, 0);
+
+    assertEquals(result.size(), 0);
+    verify(_mockS3Client, 
times(0)).listObjectsV2(any(ListObjectsV2Request.class));
+  }
+
+  @Test
+  public void testMaxResultsNegativeReturnsEmpty() throws IOException {
+    final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+        URI.create("s3://bucket/data/"), true, ACCEPT_ALL, -5);
+
+    assertEquals(result.size(), 0);
+    verify(_mockS3Client, 
times(0)).listObjectsV2(any(ListObjectsV2Request.class));
+  }
+
+  @Test
+  public void testPrefixSentToS3() throws IOException {
+    when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+        .thenReturn(page(Collections.emptyList(), false, null));
+
+    _s3PinotFS.listFilesWithMetadata(
+        URI.create("s3://bucket/data/2026/01/"), true, ACCEPT_ALL, 10);
+
+    final ArgumentCaptor<ListObjectsV2Request> captor =
+        ArgumentCaptor.forClass(ListObjectsV2Request.class);
+    verify(_mockS3Client).listObjectsV2(captor.capture());
+    assertEquals(captor.getValue().bucket(), "bucket");
+    assertEquals(captor.getValue().prefix(), "data/2026/01/");
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/NoClosePinotFS.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/NoClosePinotFS.java
index de29fe4cb80..860806059cf 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/NoClosePinotFS.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/NoClosePinotFS.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.util.List;
+import java.util.function.Predicate;
 import org.apache.pinot.spi.env.PinotConfiguration;
 
 
@@ -98,6 +99,13 @@ public class NoClosePinotFS implements PinotFS {
     return _delegate.listFilesWithMetadata(fileUri, recursive);
   }
 
+  @Override
+  public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean 
recursive,
+      Predicate<String> pathFilter, int maxResults)
+      throws IOException {
+    return _delegate.listFilesWithMetadata(fileUri, recursive, pathFilter, 
maxResults);
+  }
+
   @Override
   public void copyToLocalFile(URI srcUri, File dstFile)
       throws Exception {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
index 863df527f9c..2d26e5d2035 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
@@ -24,7 +24,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Predicate;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -180,6 +182,37 @@ public interface PinotFS extends Closeable, Serializable {
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * Lists files with metadata, applying a path filter and stopping after 
{@code maxResults} matches.
+   * Cloud storage implementations (S3, GCS, ADLS) should override this to 
fetch pages lazily
+   * and terminate early once enough matching files are found — avoiding 
loading all objects into memory.
+   *
+   * @param fileUri location to list files from
+   * @param recursive if we want to list files recursively
+   * @param pathFilter predicate applied to each file path; only files passing 
this filter are included
+   * @param maxResults maximum number of matching (non-directory) files to 
return
+   * @return a list of FileMetadata capped at maxResults
+   * @throws IOException on IO failure
+   */
+  default List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean 
recursive,
+      Predicate<String> pathFilter, int maxResults)
+      throws IOException {
+    if (maxResults <= 0) {
+      return new ArrayList<>();
+    }
+    final List<FileMetadata> all = listFilesWithMetadata(fileUri, recursive);
+    final List<FileMetadata> result = new ArrayList<>();
+    for (final FileMetadata fm : all) {
+      if (!fm.isDirectory() && pathFilter.test(fm.getFilePath())) {
+        result.add(fm);
+        if (result.size() >= maxResults) {
+          break;
+        }
+      }
+    }
+    return result;
+  }
+
   /**
    * Copies a file from a remote filesystem to the local one. Keeps the 
original file.
    * @param srcUri location of current file on remote filesystem (must not be 
a directory)
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/PinotFSPaginatedListTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/PinotFSPaginatedListTest.java
new file mode 100644
index 00000000000..017c3487153
--- /dev/null
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/PinotFSPaginatedListTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.filesystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Predicate;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Tests for the 4-arg listFilesWithMetadata: default method guard clause and 
NoClosePinotFS delegation.
+ */
+public class PinotFSPaginatedListTest {
+
+  private static final URI TEST_URI = URI.create("s3://bucket/data/");
+  private static final Predicate<String> ACCEPT_ALL = path -> true;
+
+  // --- PinotFS default method: maxResults <= 0 guard ---
+
+  @Test
+  public void testDefaultMethodMaxResultsZeroReturnsEmpty() throws IOException 
{
+    final PinotFS mockFS = mock(PinotFS.class);
+    when(mockFS.listFilesWithMetadata(any(URI.class), anyBoolean(), any(), 
anyInt()))
+        .thenCallRealMethod();
+
+    final List<FileMetadata> result = mockFS.listFilesWithMetadata(TEST_URI, 
true, ACCEPT_ALL, 0);
+
+    assertEquals(result.size(), 0);
+    // 2-arg method should never be called when maxResults <= 0
+    verify(mockFS, never()).listFilesWithMetadata(any(URI.class), 
anyBoolean());
+  }
+
+  @Test
+  public void testDefaultMethodMaxResultsNegativeReturnsEmpty() throws 
IOException {
+    final PinotFS mockFS = mock(PinotFS.class);
+    when(mockFS.listFilesWithMetadata(any(URI.class), anyBoolean(), any(), 
anyInt()))
+        .thenCallRealMethod();
+
+    final List<FileMetadata> result = mockFS.listFilesWithMetadata(TEST_URI, 
true, ACCEPT_ALL, -10);
+
+    assertEquals(result.size(), 0);
+    verify(mockFS, never()).listFilesWithMetadata(any(URI.class), 
anyBoolean());
+  }
+
+  @Test
+  public void testDefaultMethodPositiveMaxResultsDelegatesToTwoArg() throws 
IOException {
+    final PinotFS mockFS = mock(PinotFS.class);
+    when(mockFS.listFilesWithMetadata(any(URI.class), anyBoolean(), any(), 
anyInt()))
+        .thenCallRealMethod();
+    when(mockFS.listFilesWithMetadata(any(URI.class), anyBoolean()))
+        .thenReturn(Arrays.asList(
+            new 
FileMetadata.Builder().setFilePath("s3://bucket/data/f1.parquet")
+                .setLength(100).setIsDirectory(false).build(),
+            new 
FileMetadata.Builder().setFilePath("s3://bucket/data/f2.parquet")
+                .setLength(200).setIsDirectory(false).build()
+        ));
+
+    final List<FileMetadata> result = mockFS.listFilesWithMetadata(TEST_URI, 
true, ACCEPT_ALL, 10);
+
+    assertEquals(result.size(), 2);
+    verify(mockFS).listFilesWithMetadata(any(URI.class), anyBoolean());
+  }
+
+  // --- NoClosePinotFS delegation ---
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testNoClosePinotFSDelegatesToUnderlyingFS() throws IOException {
+    final PinotFS mockDelegate = mock(PinotFS.class);
+    final List<FileMetadata> expected = Arrays.asList(
+        new FileMetadata.Builder().setFilePath("s3://bucket/data/f1.parquet")
+            .setLength(100).setIsDirectory(false).build()
+    );
+    when(mockDelegate.listFilesWithMetadata(any(URI.class), anyBoolean(), 
any(), anyInt()))
+        .thenReturn(expected);
+
+    final NoClosePinotFS noCloseFS = new NoClosePinotFS(mockDelegate);
+    final Predicate<String> filter = path -> path.endsWith(".parquet");
+    final List<FileMetadata> result = 
noCloseFS.listFilesWithMetadata(TEST_URI, true, filter, 5);
+
+    assertEquals(result, expected);
+    verify(mockDelegate).listFilesWithMetadata(TEST_URI, true, filter, 5);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testNoClosePinotFSPassesExactArguments() throws IOException {
+    final PinotFS mockDelegate = mock(PinotFS.class);
+    when(mockDelegate.listFilesWithMetadata(any(URI.class), anyBoolean(), 
any(), anyInt()))
+        .thenReturn(Arrays.asList());
+
+    final NoClosePinotFS noCloseFS = new NoClosePinotFS(mockDelegate);
+    final URI uri = URI.create("gs://my-bucket/prefix/");
+    final Predicate<String> filter = path -> false;
+    noCloseFS.listFilesWithMetadata(uri, false, filter, 42);
+
+    verify(mockDelegate).listFilesWithMetadata(uri, false, filter, 42);
+    // 2-arg should NOT be called — delegation should go directly to the 
optimized 4-arg on delegate
+    verify(mockDelegate, never()).listFilesWithMetadata(any(URI.class), 
anyBoolean());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to