This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 936196c9f99 [fs] Optimize cloud file listing with per-page filtering
and early termination in PinotFS (#17847)
936196c9f99 is described below
commit 936196c9f9950a1b7df5210e351fcf3777fe66cc
Author: Anshul Singh <[email protected]>
AuthorDate: Fri Mar 13 18:19:02 2026 +0530
[fs] Optimize cloud file listing with per-page filtering and early
termination in PinotFS (#17847)
* [fs] Optimize cloud file listing with per-page filtering and early
termination in PinotFS
* address feedback
---
.../pinot/plugin/filesystem/ADLSGen2PinotFS.java | 40 +++
.../test/ADLSGen2PinotFSPaginatedListTest.java | 273 ++++++++++++++++
.../apache/pinot/plugin/filesystem/GcsPinotFS.java | 55 ++++
.../filesystem/GcsPinotFSPaginatedListTest.java | 259 +++++++++++++++
.../apache/pinot/plugin/filesystem/S3PinotFS.java | 65 ++++
.../filesystem/S3PinotFSPaginatedListTest.java | 357 +++++++++++++++++++++
.../pinot/spi/filesystem/NoClosePinotFS.java | 8 +
.../org/apache/pinot/spi/filesystem/PinotFS.java | 33 ++
.../spi/filesystem/PinotFSPaginatedListTest.java | 129 ++++++++
9 files changed, 1219 insertions(+)
diff --git
a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
index e3b3cc01baa..3c4606d492a 100644
---
a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
+++
b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
@@ -56,9 +56,11 @@ import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.OffsetDateTime;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -474,6 +476,44 @@ public class ADLSGen2PinotFS extends BasePinotFS {
}
}
+  /**
+   * Lists files under {@code fileUri}, keeping only those whose converted path passes
+   * {@code pathFilter}, and stops iterating as soon as {@code maxResults} files were collected.
+   * Directory entries are skipped and never handed to the filter.
+   *
+   * @param fileUri location to list
+   * @param recursive passed through unchanged to {@code listPathItems}
+   * @param pathFilter predicate evaluated on the URI-style path of every non-directory entry
+   * @param maxResults maximum number of entries to return; values {@code <= 0} yield an empty list
+   * @return metadata of the accepted files in listing order, at most {@code maxResults} entries
+   * @throws IOException if the listing fails; a {@code DataLakeStorageException} is wrapped
+   */
+  @Override
+  public List<FileMetadata> listFilesWithMetadata(final URI fileUri, final boolean recursive,
+      final Predicate<String> pathFilter, final int maxResults)
+      throws IOException {
+    // Nothing can be returned for a non-positive limit, so do not touch the storage service at all.
+    if (maxResults <= 0) {
+      LOGGER.warn("listFilesWithMetadata called with maxResults={}, returning empty list", maxResults);
+      return new ArrayList<>();
+    }
+    LOGGER.debug("listFilesWithMetadata (paginated) is called with fileUri='{}', recursive='{}', maxResults={}",
+        fileUri, recursive, maxResults);
+    final List<FileMetadata> matches = new ArrayList<>();
+    try {
+      // PagedIterable fetches pages lazily; breaking out stops further API calls
+      for (final PathItem pathItem : listPathItems(fileUri, recursive)) {
+        if (!pathItem.isDirectory()) {
+          final String uriStylePath = AzurePinotFSUtil.convertAzureStylePathToUriStylePath(pathItem.getName());
+          if (!pathFilter.test(uriStylePath)) {
+            continue;
+          }
+          final FileMetadata.Builder metadata = new FileMetadata.Builder();
+          metadata.setFilePath(uriStylePath);
+          metadata.setLastModifiedTime(pathItem.getLastModified().toInstant().toEpochMilli());
+          metadata.setLength(pathItem.getContentLength());
+          metadata.setIsDirectory(false);
+          matches.add(metadata.build());
+          if (matches.size() >= maxResults) {
+            break;
+          }
+        }
+      }
+    } catch (DataLakeStorageException e) {
+      throw new IOException(e);
+    }
+    LOGGER.info("Listed {} files (max: {}) from URI: {}, is recursive: {}",
+        matches.size(), maxResults, fileUri, recursive);
+    return matches;
+  }
+
private PagedIterable<PathItem> listPathItems(URI fileUri, boolean recursive)
throws IOException {
// Unlike other Azure SDK APIs that takes url encoded path,
ListPathsOptions takes decoded url
diff --git
a/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSPaginatedListTest.java
b/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSPaginatedListTest.java
new file mode 100644
index 00000000000..518c31617f1
--- /dev/null
+++
b/pinot-plugins/pinot-file-system/pinot-adls/src/test/java/org/apache/pinot/plugin/filesystem/test/ADLSGen2PinotFSPaginatedListTest.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.filesystem.test;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.models.DataLakeStorageException;
+import com.azure.storage.file.datalake.models.PathItem;
+import java.io.IOException;
+import java.net.URI;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Predicate;
+import org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests the {@code ADLSGen2PinotFS.listFilesWithMetadata(URI, boolean, Predicate, int)} overload
+ * against a mocked {@link DataLakeFileSystemClient}: path filtering, directory skipping, the
+ * {@code maxResults} cap, metadata mapping and exception wrapping.
+ */
+public class ADLSGen2PinotFSPaginatedListTest {
+
+  private static final Predicate<String> ACCEPT_ALL = path -> true;
+  private static final Predicate<String> PARQUET_ONLY = path -> path.endsWith(".parquet");
+  private static final OffsetDateTime MTIME = OffsetDateTime.of(2026, 1, 15, 10, 30, 0, 0, ZoneOffset.UTC);
+  // NOTE(review): the authority looks redacted by the mail archive; kept verbatim - confirm against the repository.
+  private static final String DATA_DIR = "abfss://[email protected]/data/";
+
+  @Mock
+  private DataLakeFileSystemClient _mockFileSystemClient;
+
+  @SuppressWarnings("rawtypes")
+  @Mock
+  private PagedIterable _mockPagedIterable;
+
+  private ADLSGen2PinotFS _adlsPinotFS;
+  private AutoCloseable _mocks;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+    _adlsPinotFS = new ADLSGen2PinotFS(_mockFileSystemClient);
+  }
+
+  @AfterMethod
+  public void tearDown() throws Exception {
+    _mocks.close();
+  }
+
+  /** Builds a mocked listing entry; every entry reports {@link #MTIME} as its modification time. */
+  private static PathItem entry(final String name, final boolean directory, final long length) {
+    final PathItem pathItem = mock(PathItem.class);
+    when(pathItem.getName()).thenReturn(name);
+    when(pathItem.isDirectory()).thenReturn(directory);
+    when(pathItem.getContentLength()).thenReturn(length);
+    when(pathItem.getLastModified()).thenReturn(MTIME);
+    return pathItem;
+  }
+
+  /** Mocked regular file entry of the given length. */
+  private static PathItem file(final String name, final long length) {
+    return entry(name, false, length);
+  }
+
+  /** Mocked directory entry (length 0). */
+  private static PathItem dir(final String name) {
+    return entry(name, true, 0);
+  }
+
+  /** Makes the mocked client answer {@code listPaths} with an iterable over {@code items}. */
+  @SuppressWarnings("unchecked")
+  private void givenListing(final PathItem... items) {
+    when(_mockFileSystemClient.listPaths(any(), any())).thenReturn(_mockPagedIterable);
+    final Iterator<PathItem> entries = Arrays.asList(items).iterator();
+    when(_mockPagedIterable.iterator()).thenReturn(entries);
+  }
+
+  /** Runs the recursive listing under test; the URI is parsed on every call, as each test did originally. */
+  private List<FileMetadata> list(final Predicate<String> filter, final int maxResults)
+      throws IOException {
+    return _adlsPinotFS.listFilesWithMetadata(URI.create(DATA_DIR), true, filter, maxResults);
+  }
+
+  @Test
+  public void testAllMatch() throws IOException {
+    givenListing(
+        file("data/file1.parquet", 100),
+        file("data/file2.parquet", 200),
+        file("data/file3.parquet", 300));
+
+    final List<FileMetadata> listed = list(ACCEPT_ALL, 10);
+
+    assertEquals(listed.size(), 3);
+    assertEquals(listed.get(0).getFilePath(), "/data/file1.parquet");
+    assertEquals(listed.get(1).getFilePath(), "/data/file2.parquet");
+    assertEquals(listed.get(2).getFilePath(), "/data/file3.parquet");
+  }
+
+  @Test
+  public void testEarlyTermination() throws IOException {
+    // Five entries are available but only three are requested.
+    givenListing(
+        file("data/f1.parquet", 100),
+        file("data/f2.parquet", 200),
+        file("data/f3.parquet", 300),
+        file("data/f4.parquet", 400),
+        file("data/f5.parquet", 500));
+
+    final List<FileMetadata> listed = list(ACCEPT_ALL, 3);
+
+    assertEquals(listed.size(), 3);
+    assertEquals(listed.get(2).getFilePath(), "/data/f3.parquet");
+  }
+
+  @Test
+  public void testMaxResultsOne() throws IOException {
+    givenListing(
+        file("data/f1.parquet", 100),
+        file("data/f2.parquet", 200));
+
+    assertEquals(list(ACCEPT_ALL, 1).size(), 1);
+  }
+
+  @Test
+  public void testFilterPredicate() throws IOException {
+    givenListing(
+        file("data/file1.parquet", 100),
+        file("data/file2.csv", 200),
+        file("data/file3.parquet", 300),
+        file("data/file4.json", 400));
+
+    final List<FileMetadata> listed = list(PARQUET_ONLY, 10);
+
+    assertEquals(listed.size(), 2);
+    for (final FileMetadata metadata : listed) {
+      assertTrue(metadata.getFilePath().endsWith(".parquet"));
+    }
+  }
+
+  @Test
+  public void testDirectoriesSkipped() throws IOException {
+    givenListing(
+        dir("data/subdir"),
+        file("data/file1.parquet", 100),
+        dir("data/anotherdir"),
+        file("data/file2.parquet", 200));
+
+    final List<FileMetadata> listed = list(ACCEPT_ALL, 10);
+
+    assertEquals(listed.size(), 2);
+    for (final FileMetadata metadata : listed) {
+      assertFalse(metadata.isDirectory());
+    }
+  }
+
+  @Test
+  public void testFilterRejectsAll() throws IOException {
+    givenListing(
+        file("data/file1.parquet", 100),
+        file("data/file2.parquet", 200));
+
+    assertEquals(list(path -> false, 10).size(), 0);
+  }
+
+  @Test
+  public void testEmptyListing() throws IOException {
+    givenListing();
+
+    assertEquals(list(ACCEPT_ALL, 10).size(), 0);
+  }
+
+  @Test
+  public void testFileMetadataAttributes() throws IOException {
+    givenListing(file("data/file.parquet", 4096));
+
+    final List<FileMetadata> listed = list(ACCEPT_ALL, 10);
+
+    assertEquals(listed.size(), 1);
+    final FileMetadata metadata = listed.get(0);
+    assertEquals(metadata.getFilePath(), "/data/file.parquet");
+    assertEquals(metadata.getLength(), 4096L);
+    assertEquals(metadata.getLastModifiedTime(), MTIME.toInstant().toEpochMilli());
+    assertFalse(metadata.isDirectory());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(expectedExceptions = IOException.class)
+  public void testDataLakeExceptionWrappedAsIOException() throws IOException {
+    when(_mockFileSystemClient.listPaths(any(), any()))
+        .thenThrow(mock(DataLakeStorageException.class));
+
+    list(ACCEPT_ALL, 10);
+  }
+
+  @Test
+  public void testMaxResultsZeroReturnsEmpty() throws IOException {
+    assertEquals(list(ACCEPT_ALL, 0).size(), 0);
+  }
+
+  @Test
+  public void testMaxResultsNegativeReturnsEmpty() throws IOException {
+    assertEquals(list(ACCEPT_ALL, -3).size(), 0);
+  }
+
+  @Test
+  public void testEarlyTerminationWithFilterAndDirectories() throws IOException {
+    // Directories, matching and non-matching files interleaved; only two matches are requested.
+    givenListing(
+        dir("data/dir1"),
+        file("data/file1.csv", 100),
+        file("data/file2.parquet", 200),
+        dir("data/dir2"),
+        file("data/file3.parquet", 300),
+        file("data/file4.parquet", 400));
+
+    final List<FileMetadata> listed = list(PARQUET_ONLY, 2);
+
+    assertEquals(listed.size(), 2);
+    assertEquals(listed.get(0).getFilePath(), "/data/file2.parquet");
+    assertEquals(listed.get(1).getFilePath(), "/data/file3.parquet");
+  }
+}
diff --git
a/pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java
b/pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java
index 8742e090ab9..7e21689744c 100644
---
a/pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java
+++
b/pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java
@@ -48,6 +48,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
+import java.util.function.Predicate;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -258,6 +259,60 @@ public class GcsPinotFS extends BasePinotFS {
return listedFiles;
}
+  /**
+   * Lists files under {@code fileUri} page by page, keeping only blobs whose {@code gs://} path
+   * passes {@code pathFilter}, and stops as soon as {@code maxResults} files were collected.
+   * The blob equal to the listing prefix and blobs whose name ends with the delimiter are skipped.
+   *
+   * @param fileUri location to list
+   * @param recursive when {@code false}, the listing is restricted with
+   *     {@code BlobListOption.currentDirectory()}
+   * @param pathFilter predicate evaluated on the full path of every candidate file
+   * @param maxResults maximum number of entries to return; values {@code <= 0} yield an empty list
+   * @return metadata of the accepted files in listing order, at most {@code maxResults} entries
+   * @throws IOException wrapping any exception raised while listing
+   */
+  @Override
+  public List<FileMetadata> listFilesWithMetadata(final URI fileUri, final boolean recursive,
+      final Predicate<String> pathFilter, final int maxResults)
+      throws IOException {
+    if (maxResults <= 0) {
+      LOGGER.warn("listFilesWithMetadata called with maxResults={}, returning empty list", maxResults);
+      return new ArrayList<>();
+    }
+    final List<FileMetadata> result = new ArrayList<>();
+    final GcsUri gcsFileUri = new GcsUri(fileUri);
+    final String prefix = gcsFileUri.getPrefix();
+    final String bucketName = gcsFileUri.getBucketName();
+    try {
+      Page<Blob> page;
+      if (recursive) {
+        page = _storage.list(bucketName, Storage.BlobListOption.prefix(prefix));
+      } else {
+        page = _storage.list(bucketName, Storage.BlobListOption.prefix(prefix),
+            Storage.BlobListOption.currentDirectory());
+      }
+      while (page != null) {
+        for (final Blob blob : page.getValues()) {
+          final String blobName = blob.getName();
+          // Skip the marker object of the listed directory itself and any nested directory marker.
+          if (blobName.equals(prefix) || blobName.endsWith(GcsUri.DELIMITER)) {
+            continue;
+          }
+          final String filePath = GcsUri.createGcsUri(bucketName, blobName).toString();
+          if (pathFilter.test(filePath)) {
+            final FileMetadata.Builder fileBuilder = new FileMetadata.Builder()
+                .setFilePath(filePath)
+                .setLength(blob.getSize())
+                .setIsDirectory(false);
+            // The update time may be absent; the builder's default is kept in that case.
+            if (blob.getUpdateTime() != null) {
+              fileBuilder.setLastModifiedTime(blob.getUpdateTime());
+            }
+            result.add(fileBuilder.build());
+            if (result.size() >= maxResults) {
+              break;
+            }
+          }
+        }
+        // Check the limit BEFORE asking for the next page: requesting it would trigger another
+        // list call whose results would be thrown away.
+        if (result.size() >= maxResults || !page.hasNextPage()) {
+          break;
+        }
+        page = page.getNextPage();
+      }
+    } catch (Exception t) {
+      throw new IOException(t);
+    }
+    LOGGER.info("Listed {} files (max: {}) from URI: {}, is recursive: {}",
+        result.size(), maxResults, gcsFileUri, recursive);
+    return result;
+  }
+
@Override
public void copyToLocalFile(URI srcUri, File dstFile)
throws Exception {
diff --git
a/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsPinotFSPaginatedListTest.java
b/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsPinotFSPaginatedListTest.java
new file mode 100644
index 00000000000..3d316536a92
--- /dev/null
+++
b/pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/GcsPinotFSPaginatedListTest.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.filesystem;
+
+import com.google.api.gax.paging.Page;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Predicate;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests the {@code GcsPinotFS.listFilesWithMetadata(URI, boolean, Predicate, int)} overload
+ * against a mocked {@link Storage}: paging, path filtering, directory-marker skipping, the
+ * {@code maxResults} cap and metadata mapping.
+ */
+public class GcsPinotFSPaginatedListTest {
+
+  private static final Predicate<String> ACCEPT_ALL = path -> true;
+  private static final String DATA_DIR = "gs://bucket/data/";
+
+  private Storage _mockStorage;
+  private GcsPinotFS _gcsPinotFS;
+
+  @BeforeMethod
+  public void setUp() throws Exception {
+    _mockStorage = mock(Storage.class);
+    _gcsPinotFS = new GcsPinotFS();
+    // Inject the mocked client straight into the private field via reflection.
+    final Field storageField = GcsPinotFS.class.getDeclaredField("_storage");
+    storageField.setAccessible(true);
+    storageField.set(_gcsPinotFS, _mockStorage);
+  }
+
+  /** Builds a mocked page holding {@code blobs}; {@code next} is the following page or {@code null}. */
+  @SuppressWarnings("unchecked")
+  private static Page<Blob> pageOf(final List<Blob> blobs, final Page<Blob> next) {
+    final Page<Blob> page = mock(Page.class);
+    when(page.getValues()).thenReturn(blobs);
+    when(page.hasNextPage()).thenReturn(next != null);
+    when(page.getNextPage()).thenReturn(next);
+    return page;
+  }
+
+  /** Builds a mocked blob with the given name, size and (possibly {@code null}) update time. */
+  private static Blob blob(final String name, final long size, final Long updateTime) {
+    final Blob mocked = mock(Blob.class);
+    when(mocked.getName()).thenReturn(name);
+    when(mocked.getSize()).thenReturn(size);
+    when(mocked.getUpdateTime()).thenReturn(updateTime);
+    return mocked;
+  }
+
+  /** Makes every {@code Storage.list} call answer with {@code firstPage}. */
+  @SuppressWarnings("unchecked")
+  private void givenFirstPage(final Page<Blob> firstPage) {
+    when(_mockStorage.list(anyString(), (Storage.BlobListOption[]) any()))
+        .thenReturn(firstPage);
+  }
+
+  /** Runs the recursive listing under test against the fixed data directory. */
+  private List<FileMetadata> list(final Predicate<String> filter, final int maxResults)
+      throws IOException {
+    return _gcsPinotFS.listFilesWithMetadata(URI.create(DATA_DIR), true, filter, maxResults);
+  }
+
+  @Test
+  public void testSinglePageAllMatch() throws IOException {
+    final List<Blob> blobs = Arrays.asList(
+        blob("data/file1.parquet", 100, 1000L),
+        blob("data/file2.parquet", 200, 2000L));
+    givenFirstPage(pageOf(blobs, null));
+
+    final List<FileMetadata> listed = list(ACCEPT_ALL, 10);
+
+    assertEquals(listed.size(), 2);
+    assertEquals(listed.get(0).getFilePath(), "gs://bucket/data/file1.parquet");
+    assertEquals(listed.get(1).getFilePath(), "gs://bucket/data/file2.parquet");
+  }
+
+  @Test
+  public void testEarlyTerminationSkipsProcessingNextPage() throws IOException {
+    // Page 1 already supplies the three requested files, so the contents of page 2 must never be read.
+    final Page<Blob> secondPage = pageOf(
+        Arrays.asList(blob("data/f4.parquet", 400, 4000L)), null);
+    final Page<Blob> firstPage = pageOf(Arrays.asList(
+        blob("data/f1.parquet", 100, 1000L),
+        blob("data/f2.parquet", 200, 2000L),
+        blob("data/f3.parquet", 300, 3000L)), secondPage);
+    givenFirstPage(firstPage);
+
+    final List<FileMetadata> listed = list(ACCEPT_ALL, 3);
+
+    assertEquals(listed.size(), 3);
+    verify(secondPage, never()).getValues();
+  }
+
+  @Test
+  public void testMultiplePagesNeeded() throws IOException {
+    final Page<Blob> secondPage = pageOf(
+        Arrays.asList(blob("data/f3.parquet", 300, 3000L)), null);
+    final Page<Blob> firstPage = pageOf(Arrays.asList(
+        blob("data/f1.parquet", 100, 1000L),
+        blob("data/f2.parquet", 200, 2000L)), secondPage);
+    givenFirstPage(firstPage);
+
+    assertEquals(list(ACCEPT_ALL, 10).size(), 3);
+  }
+
+  @Test
+  public void testFilterPredicate() throws IOException {
+    final List<Blob> blobs = Arrays.asList(
+        blob("data/file1.parquet", 100, 1000L),
+        blob("data/file2.csv", 200, 2000L),
+        blob("data/file3.parquet", 300, 3000L));
+    givenFirstPage(pageOf(blobs, null));
+
+    final List<FileMetadata> listed = list(path -> path.endsWith(".parquet"), 10);
+
+    assertEquals(listed.size(), 2);
+    for (final FileMetadata metadata : listed) {
+      assertTrue(metadata.getFilePath().endsWith(".parquet"));
+    }
+  }
+
+  @Test
+  public void testDirectoriesSkipped() throws IOException {
+    final List<Blob> blobs = Arrays.asList(
+        blob("data/subdir/", 0, 1000L),
+        blob("data/file1.parquet", 100, 2000L),
+        blob("data/another/", 0, 3000L),
+        blob("data/file2.parquet", 200, 4000L));
+    givenFirstPage(pageOf(blobs, null));
+
+    final List<FileMetadata> listed = list(ACCEPT_ALL, 10);
+
+    assertEquals(listed.size(), 2);
+    assertTrue(listed.stream().noneMatch(metadata -> metadata.isDirectory()));
+  }
+
+  @Test
+  public void testPrefixDirectoryMarkerSkipped() throws IOException {
+    final List<Blob> blobs = Arrays.asList(
+        blob("data/", 0, 1000L),
+        blob("data/file1.parquet", 100, 2000L));
+    givenFirstPage(pageOf(blobs, null));
+
+    final List<FileMetadata> listed = list(ACCEPT_ALL, 10);
+
+    assertEquals(listed.size(), 1);
+    assertEquals(listed.get(0).getFilePath(), "gs://bucket/data/file1.parquet");
+  }
+
+  @Test
+  public void testNullUpdateTimeHandled() throws IOException {
+    final List<Blob> blobs = Arrays.asList(
+        blob("data/file1.parquet", 100, null));
+    givenFirstPage(pageOf(blobs, null));
+
+    final List<FileMetadata> listed = list(ACCEPT_ALL, 10);
+
+    assertEquals(listed.size(), 1);
+    // lastModifiedTime defaults to 0 when updateTime is null
+    assertEquals(listed.get(0).getLastModifiedTime(), 0L);
+  }
+
+  @Test
+  public void testEmptyBucket() throws IOException {
+    givenFirstPage(pageOf(Collections.emptyList(), null));
+
+    assertEquals(list(ACCEPT_ALL, 10).size(), 0);
+  }
+
+  @Test
+  public void testMaxResultsZeroReturnsEmpty() throws IOException {
+    final List<FileMetadata> listed = list(ACCEPT_ALL, 0);
+
+    assertEquals(listed.size(), 0);
+    verify(_mockStorage, never()).list(anyString(), (Storage.BlobListOption[]) any());
+  }
+
+  @Test
+  public void testMaxResultsNegativeReturnsEmpty() throws IOException {
+    final List<FileMetadata> listed = list(ACCEPT_ALL, -1);
+
+    assertEquals(listed.size(), 0);
+    verify(_mockStorage, never()).list(anyString(), (Storage.BlobListOption[]) any());
+  }
+
+  @Test
+  public void testFileMetadataAttributes() throws IOException {
+    final List<Blob> blobs = Arrays.asList(
+        blob("data/file.parquet", 4096, 1705312200000L));
+    givenFirstPage(pageOf(blobs, null));
+
+    final List<FileMetadata> listed = list(ACCEPT_ALL, 10);
+
+    assertEquals(listed.size(), 1);
+    final FileMetadata metadata = listed.get(0);
+    assertEquals(metadata.getFilePath(), "gs://bucket/data/file.parquet");
+    assertEquals(metadata.getLength(), 4096L);
+    assertEquals(metadata.getLastModifiedTime(), 1705312200000L);
+    assertEquals(metadata.isDirectory(), false);
+  }
+}
diff --git
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index 2700ea46e3f..494f9d6c5d4 100644
---
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -33,6 +33,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
@@ -748,6 +749,70 @@ public class S3PinotFS extends BasePinotFS {
return listedFiles;
}
+  /**
+   * Lists objects under {@code fileUri} one {@code ListObjectsV2} response at a time, keeping only
+   * those whose full path passes {@code pathFilter}, and issues no further request once
+   * {@code maxResults} files were collected. Keys ending with the delimiter are skipped.
+   *
+   * @param fileUri location to list; its scheme must be {@code S3_SCHEME} or {@code S3A_SCHEME}
+   * @param recursive when {@code false}, the request carries the delimiter
+   * @param pathFilter predicate evaluated on the full path of every candidate object
+   * @param maxResults maximum number of entries to return; values {@code <= 0} yield an empty list
+   * @return metadata of the accepted objects in listing order, at most {@code maxResults} entries
+   * @throws IOException wrapping any throwable raised while listing
+   */
+  @Override
+  public List<FileMetadata> listFilesWithMetadata(final URI fileUri, final boolean recursive,
+      final Predicate<String> pathFilter, final int maxResults)
+      throws IOException {
+    if (maxResults <= 0) {
+      LOGGER.warn("listFilesWithMetadata called with maxResults={}, returning empty list", maxResults);
+      return new ArrayList<>();
+    }
+    final List<FileMetadata> collected = new ArrayList<>();
+    final String scheme = fileUri.getScheme();
+    Preconditions.checkArgument(scheme.equals(S3_SCHEME) || scheme.equals(S3A_SCHEME));
+    try {
+      final String prefix = normalizeToDirectoryPrefix(fileUri);
+      String nextToken = null;
+      boolean lastResponse;
+      // maxResults is positive and nothing is collected yet, so the first request is always sent.
+      do {
+        final ListObjectsV2Request.Builder requestBuilder =
+            ListObjectsV2Request.builder().bucket(fileUri.getHost());
+        if (!prefix.equals(DELIMITER)) {
+          requestBuilder.prefix(prefix);
+        }
+        if (!recursive) {
+          requestBuilder.delimiter(DELIMITER);
+        }
+        if (nextToken != null) {
+          requestBuilder.continuationToken(nextToken);
+        }
+        final ListObjectsV2Request request = requestBuilder.build();
+        LOGGER.debug("Trying to send ListObjectsV2Request {}", request);
+        final ListObjectsV2Response response =
+            retryWithS3CredentialRefresh(() -> _s3Client.listObjectsV2(request));
+        for (final S3Object s3Object : response.contents()) {
+          final String key = s3Object.key();
+          // Skip the listed location itself and anything that is a directory marker.
+          if (key.equals(fileUri.getPath()) || key.endsWith(DELIMITER)) {
+            continue;
+          }
+          final String filePath =
+              scheme + SCHEME_SEPARATOR + fileUri.getHost() + DELIMITER + getNormalizedFileKey(s3Object);
+          if (!pathFilter.test(filePath)) {
+            continue;
+          }
+          final FileMetadata.Builder metadata = new FileMetadata.Builder();
+          metadata.setFilePath(filePath);
+          metadata.setLastModifiedTime(s3Object.lastModified().toEpochMilli());
+          metadata.setLength(s3Object.size());
+          metadata.setIsDirectory(false);
+          collected.add(metadata.build());
+          if (collected.size() >= maxResults) {
+            break;
+          }
+        }
+        lastResponse = !response.isTruncated();
+        nextToken = response.nextContinuationToken();
+      } while (!lastResponse && collected.size() < maxResults);
+    } catch (Throwable t) {
+      throw new IOException(t);
+    }
+    LOGGER.info("Listed {} files (max: {}) from URI: {}, is recursive: {}",
+        collected.size(), maxResults, fileUri, recursive);
+    return collected;
+  }
+
private static String getNormalizedFileKey(S3Object s3Object) {
String fileKey = s3Object.key();
if (fileKey.startsWith(DELIMITER)) {
diff --git
a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSPaginatedListTest.java
b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSPaginatedListTest.java
new file mode 100644
index 00000000000..a071440d3f0
--- /dev/null
+++
b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSPaginatedListTest.java
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.filesystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Predicate;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.mockito.ArgumentCaptor;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for S3PinotFS.listFilesWithMetadata(URI, boolean, Predicate, int)
+ * — the paginated listing with early termination.
+ */
+public class S3PinotFSPaginatedListTest {
+
+ private static final Instant NOW = Instant.now();
+ private static final Predicate<String> ACCEPT_ALL = path -> true;
+
+ private S3Client _mockS3Client;
+ private S3PinotFS _s3PinotFS;
+
+ @BeforeMethod
+ public void setUp() {
+ _mockS3Client = mock(S3Client.class);
+ _s3PinotFS = new S3PinotFS();
+ _s3PinotFS.init(_mockS3Client);
+ }
+
+ private static S3Object s3obj(final String key, final long size) {
+ return S3Object.builder()
+ .key(key)
+ .size(size)
+ .lastModified(NOW)
+ .build();
+ }
+
+ private static ListObjectsV2Response page(final List<S3Object> objects,
+ final boolean truncated, final String nextToken) {
+ final ListObjectsV2Response.Builder builder =
ListObjectsV2Response.builder()
+ .contents(objects)
+ .isTruncated(truncated);
+ if (nextToken != null) {
+ builder.nextContinuationToken(nextToken);
+ }
+ return builder.build();
+ }
+
+ @Test
+ public void testSinglePageAllMatch() throws IOException {
+ final List<S3Object> objects = Arrays.asList(
+ s3obj("data/file1.parquet", 100),
+ s3obj("data/file2.parquet", 200),
+ s3obj("data/file3.parquet", 300)
+ );
+ when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+ .thenReturn(page(objects, false, null));
+
+ final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+ URI.create("s3://bucket/data/"), true, ACCEPT_ALL, 10);
+
+ assertEquals(result.size(), 3);
+ assertEquals(result.get(0).getFilePath(),
"s3://bucket/data/file1.parquet");
+ assertEquals(result.get(1).getFilePath(),
"s3://bucket/data/file2.parquet");
+ assertEquals(result.get(2).getFilePath(),
"s3://bucket/data/file3.parquet");
+ verify(_mockS3Client,
times(1)).listObjectsV2(any(ListObjectsV2Request.class));
+ }
+
+ @Test
+ public void testSinglePageWithFilter() throws IOException {
+ final List<S3Object> objects = Arrays.asList(
+ s3obj("data/file1.parquet", 100),
+ s3obj("data/file2.csv", 200),
+ s3obj("data/file3.parquet", 300),
+ s3obj("data/file4.json", 400)
+ );
+ when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+ .thenReturn(page(objects, false, null));
+
+ final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+ URI.create("s3://bucket/data/"), true,
+ path -> path.endsWith(".parquet"), 10);
+
+ assertEquals(result.size(), 2);
+ assertTrue(result.stream().allMatch(f ->
f.getFilePath().endsWith(".parquet")));
+ }
+
+ @Test
+ public void testEarlyTerminationAcrossPages() throws IOException {
+ // 3 pages with 3 files each; maxResults=5 should stop after page 2
+ final List<S3Object> page1 = Arrays.asList(
+ s3obj("data/file1.parquet", 100),
+ s3obj("data/file2.parquet", 200),
+ s3obj("data/file3.parquet", 300)
+ );
+ final List<S3Object> page2 = Arrays.asList(
+ s3obj("data/file4.parquet", 400),
+ s3obj("data/file5.parquet", 500),
+ s3obj("data/file6.parquet", 600)
+ );
+ // page 3 should never be fetched
+ when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+ .thenReturn(page(page1, true, "token1"))
+ .thenReturn(page(page2, true, "token2"));
+
+ final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+ URI.create("s3://bucket/data/"), true, ACCEPT_ALL, 5);
+
+ assertEquals(result.size(), 5);
+ // Only 2 S3 API calls, NOT 3
+ verify(_mockS3Client,
times(2)).listObjectsV2(any(ListObjectsV2Request.class));
+ }
+
+ @Test
+ public void testEarlyTerminationWithinPage() throws IOException {
+ // 5 files on a single page; maxResults=3 should stop mid-page
+ final List<S3Object> objects = Arrays.asList(
+ s3obj("data/f1.parquet", 100),
+ s3obj("data/f2.parquet", 200),
+ s3obj("data/f3.parquet", 300),
+ s3obj("data/f4.parquet", 400),
+ s3obj("data/f5.parquet", 500)
+ );
+ when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+ .thenReturn(page(objects, false, null));
+
+ final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+ URI.create("s3://bucket/data/"), true, ACCEPT_ALL, 3);
+
+ assertEquals(result.size(), 3);
+ assertEquals(result.get(0).getFilePath(), "s3://bucket/data/f1.parquet");
+ assertEquals(result.get(2).getFilePath(), "s3://bucket/data/f3.parquet");
+ }
+
+ @Test
+ public void testMaxResultsOneStopsImmediately() throws IOException {
+ final List<S3Object> objects = Arrays.asList(
+ s3obj("data/file1.parquet", 100),
+ s3obj("data/file2.parquet", 200)
+ );
+ when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+ .thenReturn(page(objects, true, "token1"));
+
+ final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+ URI.create("s3://bucket/data/"), true, ACCEPT_ALL, 1);
+
+ assertEquals(result.size(), 1);
+ // Only 1 API call despite truncated=true
+ verify(_mockS3Client,
times(1)).listObjectsV2(any(ListObjectsV2Request.class));
+ }
+
+ @Test
+ public void testContinuationTokenPassedCorrectly() throws IOException {
+ when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+ .thenReturn(page(Arrays.asList(s3obj("data/f1.parquet", 100)), true,
"abc123"))
+ .thenReturn(page(Arrays.asList(s3obj("data/f2.parquet", 200)), false,
null));
+
+ _s3PinotFS.listFilesWithMetadata(
+ URI.create("s3://bucket/data/"), true, ACCEPT_ALL, 10);
+
+ final ArgumentCaptor<ListObjectsV2Request> captor =
+ ArgumentCaptor.forClass(ListObjectsV2Request.class);
+ verify(_mockS3Client, times(2)).listObjectsV2(captor.capture());
+ // First call: no continuation token
+ assertTrue(captor.getAllValues().get(0).continuationToken() == null);
+ // Second call: has the token from page 1
+ assertEquals(captor.getAllValues().get(1).continuationToken(), "abc123");
+ }
+
+ @Test
+ public void testDirectoriesSkipped() throws IOException {
+ final List<S3Object> objects = Arrays.asList(
+ s3obj("data/subdir/", 0),
+ s3obj("data/file1.parquet", 100),
+ s3obj("data/another-dir/", 0),
+ s3obj("data/file2.parquet", 200)
+ );
+ when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+ .thenReturn(page(objects, false, null));
+
+ final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+ URI.create("s3://bucket/data/"), true, ACCEPT_ALL, 10);
+
+ assertEquals(result.size(), 2);
+ assertTrue(result.stream().noneMatch(FileMetadata::isDirectory));
+ }
+
+ @Test
+ public void testFilterRejectsAll() throws IOException {
+ final List<S3Object> objects = Arrays.asList(
+ s3obj("data/file1.parquet", 100),
+ s3obj("data/file2.parquet", 200)
+ );
+ when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+ .thenReturn(page(objects, false, null));
+
+ final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+ URI.create("s3://bucket/data/"), true, path -> false, 10);
+
+ assertEquals(result.size(), 0);
+ }
+
+ @Test
+ public void testEmptyBucket() throws IOException {
+ when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+ .thenReturn(page(Collections.emptyList(), false, null));
+
+ final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+ URI.create("s3://bucket/data/"), true, ACCEPT_ALL, 10);
+
+ assertEquals(result.size(), 0);
+ }
+
+ @Test
+ public void testFileMetadataAttributes() throws IOException {
+ final Instant mtime = Instant.parse("2026-01-15T10:30:00Z");
+ final S3Object obj = S3Object.builder()
+ .key("data/file.parquet")
+ .size(4096L)
+ .lastModified(mtime)
+ .build();
+ when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+ .thenReturn(page(Arrays.asList(obj), false, null));
+
+ final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+ URI.create("s3://bucket/data/"), true, ACCEPT_ALL, 10);
+
+ assertEquals(result.size(), 1);
+ final FileMetadata fm = result.get(0);
+ assertEquals(fm.getFilePath(), "s3://bucket/data/file.parquet");
+ assertEquals(fm.getLength(), 4096L);
+ assertEquals(fm.getLastModifiedTime(), mtime.toEpochMilli());
+ assertEquals(fm.isDirectory(), false);
+ }
+
+ @Test
+ public void testS3aScheme() throws IOException {
+ final List<S3Object> objects = Arrays.asList(s3obj("data/file.parquet",
100));
+ when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+ .thenReturn(page(objects, false, null));
+
+ final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+ URI.create("s3a://bucket/data/"), true, ACCEPT_ALL, 10);
+
+ assertEquals(result.size(), 1);
+ assertTrue(result.get(0).getFilePath().startsWith("s3a://"));
+ }
+
+ @Test
+ public void testAllPagesNeededWhenMaxResultsHigh() throws IOException {
+ when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+ .thenReturn(page(Arrays.asList(s3obj("data/f1.parquet", 100)), true,
"t1"))
+ .thenReturn(page(Arrays.asList(s3obj("data/f2.parquet", 200)), true,
"t2"))
+ .thenReturn(page(Arrays.asList(s3obj("data/f3.parquet", 300)), false,
null));
+
+ final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+ URI.create("s3://bucket/data/"), true, ACCEPT_ALL, 100);
+
+ assertEquals(result.size(), 3);
+ verify(_mockS3Client,
times(3)).listObjectsV2(any(ListObjectsV2Request.class));
+ }
+
+ @Test
+ public void testFilterWithEarlyTerminationAcrossPages() throws IOException {
+ // Page 1: 2 parquet + 1 csv; Page 2: 1 parquet + 2 csv; maxResults=3 for
parquet only
+ final List<S3Object> page1 = Arrays.asList(
+ s3obj("data/a.parquet", 100),
+ s3obj("data/b.csv", 200),
+ s3obj("data/c.parquet", 300)
+ );
+ final List<S3Object> page2 = Arrays.asList(
+ s3obj("data/d.csv", 400),
+ s3obj("data/e.parquet", 500),
+ s3obj("data/f.csv", 600)
+ );
+ when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+ .thenReturn(page(page1, true, "t1"))
+ .thenReturn(page(page2, true, "t2"));
+
+ final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+ URI.create("s3://bucket/data/"), true,
+ path -> path.endsWith(".parquet"), 3);
+
+ assertEquals(result.size(), 3);
+ assertTrue(result.stream().allMatch(f ->
f.getFilePath().endsWith(".parquet")));
+ // 2 pages fetched (2 from page1 + 1 from page2 = 3 matches)
+ verify(_mockS3Client,
times(2)).listObjectsV2(any(ListObjectsV2Request.class));
+ }
+
+ @Test
+ public void testMaxResultsZeroReturnsEmpty() throws IOException {
+ final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+ URI.create("s3://bucket/data/"), true, ACCEPT_ALL, 0);
+
+ assertEquals(result.size(), 0);
+ verify(_mockS3Client,
times(0)).listObjectsV2(any(ListObjectsV2Request.class));
+ }
+
+ @Test
+ public void testMaxResultsNegativeReturnsEmpty() throws IOException {
+ final List<FileMetadata> result = _s3PinotFS.listFilesWithMetadata(
+ URI.create("s3://bucket/data/"), true, ACCEPT_ALL, -5);
+
+ assertEquals(result.size(), 0);
+ verify(_mockS3Client,
times(0)).listObjectsV2(any(ListObjectsV2Request.class));
+ }
+
+ @Test
+ public void testPrefixSentToS3() throws IOException {
+ when(_mockS3Client.listObjectsV2(any(ListObjectsV2Request.class)))
+ .thenReturn(page(Collections.emptyList(), false, null));
+
+ _s3PinotFS.listFilesWithMetadata(
+ URI.create("s3://bucket/data/2026/01/"), true, ACCEPT_ALL, 10);
+
+ final ArgumentCaptor<ListObjectsV2Request> captor =
+ ArgumentCaptor.forClass(ListObjectsV2Request.class);
+ verify(_mockS3Client).listObjectsV2(captor.capture());
+ assertEquals(captor.getValue().bucket(), "bucket");
+ assertEquals(captor.getValue().prefix(), "data/2026/01/");
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/NoClosePinotFS.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/NoClosePinotFS.java
index de29fe4cb80..860806059cf 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/NoClosePinotFS.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/NoClosePinotFS.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.List;
+import java.util.function.Predicate;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -98,6 +99,13 @@ public class NoClosePinotFS implements PinotFS {
return _delegate.listFilesWithMetadata(fileUri, recursive);
}
+ @Override
+ public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean
recursive,
+ Predicate<String> pathFilter, int maxResults)
+ throws IOException {
+ return _delegate.listFilesWithMetadata(fileUri, recursive, pathFilter,
maxResults);
+ }
+
@Override
public void copyToLocalFile(URI srcUri, File dstFile)
throws Exception {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
index 863df527f9c..2d26e5d2035 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
@@ -24,7 +24,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URI;
+import java.util.ArrayList;
import java.util.List;
+import java.util.function.Predicate;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -180,6 +182,37 @@ public interface PinotFS extends Closeable, Serializable {
throw new UnsupportedOperationException();
}
+ /**
+ * Lists files with metadata, applying a path filter and stopping after
{@code maxResults} matches.
+ * Cloud storage implementations (S3, GCS, ADLS) should override this to
fetch pages lazily
+ * and terminate early once enough matching files are found — avoiding
loading all objects into memory.
+ *
+ * @param fileUri location to list files from
+ * @param recursive if we want to list files recursively
+ * @param pathFilter predicate applied to each file path; only files passing
this filter are included
+ * @param maxResults maximum number of matching (non-directory) files to
return; a value less than or equal to zero yields an empty list
+ * @return a list of FileMetadata capped at maxResults
+ * @throws IOException on IO failure
+ */
+ default List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean
recursive,
+ Predicate<String> pathFilter, int maxResults)
+ throws IOException {
+ if (maxResults <= 0) {
+ return new ArrayList<>();
+ }
+ final List<FileMetadata> all = listFilesWithMetadata(fileUri, recursive);
+ final List<FileMetadata> result = new ArrayList<>();
+ for (final FileMetadata fm : all) {
+ if (!fm.isDirectory() && pathFilter.test(fm.getFilePath())) {
+ result.add(fm);
+ if (result.size() >= maxResults) {
+ break;
+ }
+ }
+ }
+ return result;
+ }
+
/**
* Copies a file from a remote filesystem to the local one. Keeps the
original file.
* @param srcUri location of current file on remote filesystem (must not be
a directory)
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/PinotFSPaginatedListTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/PinotFSPaginatedListTest.java
new file mode 100644
index 00000000000..017c3487153
--- /dev/null
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/PinotFSPaginatedListTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.filesystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Predicate;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Tests for the 4-arg listFilesWithMetadata: default method guard clause and
NoClosePinotFS delegation.
+ */
+public class PinotFSPaginatedListTest {
+
+ private static final URI TEST_URI = URI.create("s3://bucket/data/");
+ private static final Predicate<String> ACCEPT_ALL = path -> true;
+
+ // --- PinotFS default method: maxResults <= 0 guard ---
+
+ @Test
+ public void testDefaultMethodMaxResultsZeroReturnsEmpty() throws IOException
{
+ final PinotFS mockFS = mock(PinotFS.class);
+ when(mockFS.listFilesWithMetadata(any(URI.class), anyBoolean(), any(),
anyInt()))
+ .thenCallRealMethod();
+
+ final List<FileMetadata> result = mockFS.listFilesWithMetadata(TEST_URI,
true, ACCEPT_ALL, 0);
+
+ assertEquals(result.size(), 0);
+ // 2-arg method should never be called when maxResults <= 0
+ verify(mockFS, never()).listFilesWithMetadata(any(URI.class),
anyBoolean());
+ }
+
+ @Test
+ public void testDefaultMethodMaxResultsNegativeReturnsEmpty() throws
IOException {
+ final PinotFS mockFS = mock(PinotFS.class);
+ when(mockFS.listFilesWithMetadata(any(URI.class), anyBoolean(), any(),
anyInt()))
+ .thenCallRealMethod();
+
+ final List<FileMetadata> result = mockFS.listFilesWithMetadata(TEST_URI,
true, ACCEPT_ALL, -10);
+
+ assertEquals(result.size(), 0);
+ verify(mockFS, never()).listFilesWithMetadata(any(URI.class),
anyBoolean());
+ }
+
+ @Test
+ public void testDefaultMethodPositiveMaxResultsDelegatesToTwoArg() throws
IOException {
+ final PinotFS mockFS = mock(PinotFS.class);
+ when(mockFS.listFilesWithMetadata(any(URI.class), anyBoolean(), any(),
anyInt()))
+ .thenCallRealMethod();
+ when(mockFS.listFilesWithMetadata(any(URI.class), anyBoolean()))
+ .thenReturn(Arrays.asList(
+ new
FileMetadata.Builder().setFilePath("s3://bucket/data/f1.parquet")
+ .setLength(100).setIsDirectory(false).build(),
+ new
FileMetadata.Builder().setFilePath("s3://bucket/data/f2.parquet")
+ .setLength(200).setIsDirectory(false).build()
+ ));
+
+ final List<FileMetadata> result = mockFS.listFilesWithMetadata(TEST_URI,
true, ACCEPT_ALL, 10);
+
+ assertEquals(result.size(), 2);
+ verify(mockFS).listFilesWithMetadata(any(URI.class), anyBoolean());
+ }
+
+ // --- NoClosePinotFS delegation ---
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testNoClosePinotFSDelegatesToUnderlyingFS() throws IOException {
+ final PinotFS mockDelegate = mock(PinotFS.class);
+ final List<FileMetadata> expected = Arrays.asList(
+ new FileMetadata.Builder().setFilePath("s3://bucket/data/f1.parquet")
+ .setLength(100).setIsDirectory(false).build()
+ );
+ when(mockDelegate.listFilesWithMetadata(any(URI.class), anyBoolean(),
any(), anyInt()))
+ .thenReturn(expected);
+
+ final NoClosePinotFS noCloseFS = new NoClosePinotFS(mockDelegate);
+ final Predicate<String> filter = path -> path.endsWith(".parquet");
+ final List<FileMetadata> result =
noCloseFS.listFilesWithMetadata(TEST_URI, true, filter, 5);
+
+ assertEquals(result, expected);
+ verify(mockDelegate).listFilesWithMetadata(TEST_URI, true, filter, 5);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testNoClosePinotFSPassesExactArguments() throws IOException {
+ final PinotFS mockDelegate = mock(PinotFS.class);
+ when(mockDelegate.listFilesWithMetadata(any(URI.class), anyBoolean(),
any(), anyInt()))
+ .thenReturn(Arrays.asList());
+
+ final NoClosePinotFS noCloseFS = new NoClosePinotFS(mockDelegate);
+ final URI uri = URI.create("gs://my-bucket/prefix/");
+ final Predicate<String> filter = path -> false;
+ noCloseFS.listFilesWithMetadata(uri, false, filter, 42);
+
+ verify(mockDelegate).listFilesWithMetadata(uri, false, filter, 42);
+ // 2-arg should NOT be called — delegation should go directly to the
optimized 4-arg on delegate
+ verify(mockDelegate, never()).listFilesWithMetadata(any(URI.class),
anyBoolean());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]