This is an automated email from the ASF dual-hosted git repository.
clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2f1f0b770ac feat: S3 SegmentRangeReader for partial segment downloads
(#19560)
2f1f0b770ac is described below
commit 2f1f0b770ac3d289b8c2782bc0fc10865fce9737
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Jun 9 11:06:19 2026 -0700
feat: S3 SegmentRangeReader for partial segment downloads (#19560)
changes:
* Adds a new `S3SegmentRangeReader` that wraps `ServerSideEncryptingAmazonS3` +
bucket + key prefix and issues closed-range `GetObjectRequest`s against
`keyPrefix + filename`. The returned stream is wrapped in a `RetryingInputStream`
with the `S3Utils.S3RETRY` predicate, the same retry policy
`S3DataSegmentPuller` uses for full-segment downloads.
* New `rangeable` boolean on `S3LoadSpec`, stamped by the pusher at write
time. `S3LoadSpec.openRangeReader()` returns a reader iff the flag is true and
the key is not a `.zip` archive.
* `S3DataSegmentPusher.pushNoZip` stamps `rangeable=true` when the binary version
is `IndexIO.V10_VERSION`, and `rangeable=false` otherwise. `pushZip` omits the field.
---
extensions-core/s3-extensions/pom.xml | 10 +
.../druid/storage/s3/S3DataSegmentPusher.java | 29 ++-
.../org/apache/druid/storage/s3/S3LoadSpec.java | 43 +++-
.../druid/storage/s3/S3SegmentRangeReader.java | 151 +++++++++++++
.../druid/storage/s3/S3DataSegmentPusherTest.java | 97 ++++++++-
.../apache/druid/storage/s3/S3LoadSpecTest.java | 79 +++++++
.../druid/storage/s3/S3SegmentRangeReaderTest.java | 240 +++++++++++++++++++++
7 files changed, 638 insertions(+), 11 deletions(-)
diff --git a/extensions-core/s3-extensions/pom.xml
b/extensions-core/s3-extensions/pom.xml
index 15c8fd1773f..4bd06a6c655 100644
--- a/extensions-core/s3-extensions/pom.xml
+++ b/extensions-core/s3-extensions/pom.xml
@@ -216,6 +216,16 @@
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-minio</artifactId>
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java
index bd9176842ab..12d2cde8f65 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java
@@ -24,6 +24,7 @@ import com.google.inject.Inject;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.timeline.DataSegment;
@@ -135,9 +136,13 @@ public class S3DataSegmentPusher implements
DataSegmentPusher
}
}
+ final int binaryVersion = SegmentUtils.getVersionFromDir(indexFilesDir);
+ // V10 unzipped is rangeable: a single druid.segment with a range-readable
header. V9 unzipped is a directory of
+ // separate smoosh files the range-read path can't consume.
+ final boolean rangeable = binaryVersion == IndexIO.V10_VERSION;
return baseSegment.withSize(size)
- .withLoadSpec(makeLoadSpec(config.getBucket(), s3Path))
-
.withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
+ .withLoadSpec(makeLoadSpec(config.getBucket(), s3Path,
rangeable))
+ .withBinaryVersion(binaryVersion);
}
@Override
@@ -165,6 +170,26 @@ public class S3DataSegmentPusher implements
DataSegmentPusher
);
}
+ /**
+ * Variant that stamps {@link S3LoadSpec#RANGEABLE} so {@link
S3LoadSpec#openRangeReader()} can decide range-read
+ * eligibility. Used by the unzipped push path where the binary version is
known at write time.
+ */
+ private Map<String, Object> makeLoadSpec(String bucket, String key, boolean
rangeable)
+ {
+ return ImmutableMap.of(
+ "type",
+ "s3_zip",
+ "bucket",
+ bucket,
+ "key",
+ key,
+ "S3Schema",
+ "s3n",
+ S3LoadSpec.RANGEABLE,
+ rangeable
+ );
+ }
+
private static IOException handlePushServiceException(S3Exception e, long
indexSize)
{
if (S3Utils.ERROR_ENTITY_TOO_LARGE.equals(S3Utils.getS3ErrorCode(e))) {
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3LoadSpec.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3LoadSpec.java
index 5338abfe414..ec8fca91374 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3LoadSpec.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3LoadSpec.java
@@ -21,13 +21,17 @@ package org.apache.druid.storage.s3;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.segment.loading.LoadSpec;
import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.segment.loading.SegmentRangeReader;
+import org.apache.druid.utils.CompressionUtils;
+import javax.annotation.Nullable;
import java.io.File;
/**
@@ -36,22 +40,33 @@ import java.io.File;
@JsonTypeName(S3StorageDruidModule.SCHEME_S3_ZIP)
public class S3LoadSpec implements LoadSpec
{
+ static final String RANGEABLE = "rangeable";
+
private final String bucket;
private final String key;
+ /**
+ * Stamped at push time when {@link S3DataSegmentPusher} writes a segment in
a layout that supports byte-range reads
+ * Only {@code Boolean.TRUE} enables {@link #openRangeReader()}; absence or
{@code false} means full-download.
+ */
+ @Nullable
+ private final Boolean rangeable;
+
private final S3DataSegmentPuller puller;
@JsonCreator
public S3LoadSpec(
@JacksonInject S3DataSegmentPuller puller,
@JsonProperty(S3DataSegmentPuller.BUCKET) String bucket,
- @JsonProperty(S3DataSegmentPuller.KEY) String key
+ @JsonProperty(S3DataSegmentPuller.KEY) String key,
+ @JsonProperty(RANGEABLE) @Nullable Boolean rangeable
)
{
Preconditions.checkNotNull(bucket);
Preconditions.checkNotNull(key);
this.bucket = bucket;
this.key = key;
+ this.rangeable = rangeable;
this.puller = puller;
}
@@ -61,6 +76,20 @@ public class S3LoadSpec implements LoadSpec
return new LoadSpecResult(puller.getSegmentFiles(new
CloudObjectLocation(bucket, key), outDir).size());
}
+ /**
+ * Returns a {@link S3SegmentRangeReader} when the segment was stamped
{@link #rangeable} {@code true} at push time
+ * and isn't a zip; otherwise {@code null}.
+ */
+ @Nullable
+ @Override
+ public SegmentRangeReader openRangeReader()
+ {
+ if (CompressionUtils.isZip(key) || !Boolean.TRUE.equals(rangeable)) {
+ return null;
+ }
+ return new S3SegmentRangeReader(puller.s3Client, bucket, key);
+ }
+
@JsonProperty(S3DataSegmentPuller.BUCKET)
public String getBucket()
{
@@ -72,4 +101,16 @@ public class S3LoadSpec implements LoadSpec
{
return key;
}
+
+ /**
+ * Returns the range-reads-supported flag stamped at push time, or {@code
null} for legacy segments pushed before
+ * this field existed (which will load via the full-download path).
+ */
+ @JsonProperty(RANGEABLE)
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public Boolean getRangeable()
+ {
+ return rangeable;
+ }
}
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3SegmentRangeReader.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3SegmentRangeReader.java
new file mode 100644
index 00000000000..9f88380c575
--- /dev/null
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3SegmentRangeReader.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.s3;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.impl.RetryingInputStream;
+import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.segment.loading.SegmentRangeReader;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * {@link SegmentRangeReader} backed by S3 HTTP {@code Range} requests. The
segment is expected to be stored as raw
+ * (unzipped) files under a common key prefix, i.e. the layout produced by
{@code S3DataSegmentPusher.pushNoZip} where
+ * each segment file is uploaded as {@code keyPrefix + file.getName()}. Each
{@link #readRange} call resolves the
+ * target object key as {@code keyPrefix + filename} and issues a closed
byte-range GET
+ * ({@code bytes=offset-(offset+length-1)}).
+ * <p>
+ * The returned stream is wrapped in a {@link RetryingInputStream} with the
{@link S3Utils#S3RETRY} predicate, the
+ * same retry policy {@link S3DataSegmentPuller} uses for full-segment
downloads. Segment loading from deep storage
+ * needs retry semantics built into the reader so callers don't each reinvent
it. The retrying wrapper reopens at the
+ * byte offset where it failed, so a transient mid-stream error becomes a
fresh range request for the remaining bytes
+ * rather than restarting the whole read.
+ */
+public class S3SegmentRangeReader implements SegmentRangeReader
+{
+ private final ServerSideEncryptingAmazonS3 s3Client;
+ private final String bucket;
+ private final String keyPrefix;
+
+ public S3SegmentRangeReader(ServerSideEncryptingAmazonS3 s3Client, String
bucket, String keyPrefix)
+ {
+ this.s3Client = Preconditions.checkNotNull(s3Client, "s3Client");
+ this.bucket = Preconditions.checkNotNull(bucket, "bucket");
+ this.keyPrefix = Preconditions.checkNotNull(keyPrefix, "keyPrefix");
+ }
+
+ @Override
+ public InputStream readRange(String filename, long offset, long length)
throws IOException
+ {
+ Preconditions.checkNotNull(filename, "filename");
+ Preconditions.checkArgument(offset >= 0, "offset must be non-negative, got
[%s]", offset);
+ Preconditions.checkArgument(length >= 0, "length must be non-negative, got
[%s]", length);
+
+ if (length == 0) {
+ // SegmentFileBuilderV10 allows zero-length internal-file entries,
short-circuit
+ return new ByteArrayInputStream(new byte[0]);
+ }
+ return new RetryingInputStream<>(
+ new RangeRequest(keyPrefix + filename, offset, length),
+ new RangeOpenFunction(s3Client, bucket),
+ S3Utils.S3RETRY,
+ null
+ );
+ }
+
+ /**
+ * Immutable description of a range read. Held as the {@code object} of
{@link RetryingInputStream} so retries can
+ * reopen with knowledge of the original offset and length without
rebuilding the request from scratch.
+ */
+ private static final class RangeRequest
+ {
+ final String objectKey;
+ final long offset;
+ final long length;
+
+ RangeRequest(String objectKey, long offset, long length)
+ {
+ this.objectKey = objectKey;
+ this.offset = offset;
+ this.length = length;
+ }
+ }
+
+ /**
+ * Opens (or reopens, on retry) an S3 range read for a {@link RangeRequest}.
The {@code start} argument is the
+ * number of bytes already successfully consumed from the logical stream, so
the absolute S3 byte range is
+ * {@code [request.offset + start, request.offset + request.length - 1]}.
+ */
+ private static final class RangeOpenFunction implements
ObjectOpenFunction<RangeRequest>
+ {
+ private final ServerSideEncryptingAmazonS3 s3Client;
+ private final String bucket;
+
+ RangeOpenFunction(ServerSideEncryptingAmazonS3 s3Client, String bucket)
+ {
+ this.s3Client = s3Client;
+ this.bucket = bucket;
+ }
+
+ @Override
+ public InputStream open(RangeRequest request) throws IOException
+ {
+ return open(request, 0L);
+ }
+
+ @Override
+ public InputStream open(RangeRequest request, long start) throws
IOException
+ {
+ final long absoluteStart = request.offset + start;
+ final long absoluteEnd = request.offset + request.length - 1;
+ if (absoluteStart > absoluteEnd) {
+ // Logically nothing left to read, only reachable if a retry fires
after the consumer drained the entire
+ // range successfully, which shouldn't happen in practice. Returning
empty keeps us robust either way.
+ return new ByteArrayInputStream(new byte[0]);
+ }
+ final GetObjectRequest.Builder requestBuilder =
GetObjectRequest.builder()
+ .bucket(bucket)
+ .key(request.objectKey)
+ .range(AwsBytesRange.of(absoluteStart, absoluteEnd).getBytesRange());
+ try {
+ final InputStream s3Object = s3Client.getObject(requestBuilder);
+ if (s3Object == null) {
+ throw new ISE(
+ "Failed to get s3 object for bucket[%s], key[%s],
range[bytes=%d-%d]",
+ bucket,
+ request.objectKey,
+ absoluteStart,
+ absoluteEnd
+ );
+ }
+ return s3Object;
+ }
+ catch (S3Exception e) {
+ throw new IOException(e);
+ }
+ }
+ }
+}
diff --git
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherTest.java
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherTest.java
index 6bde494d900..590a36524cf 100644
---
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherTest.java
+++
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherTest.java
@@ -115,7 +115,80 @@ public class S3DataSegmentPusherTest
};
config.setBucket("bucket");
config.setBaseKey("key");
- validate(false,
"key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/", s3Client,
config);
+ DataSegment segment = validate(
+ false,
+ "key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/",
+ s3Client,
+ config,
+ new byte[]{0x0, 0x0, 0x0, 0x1}
+ );
+ // V1 (test fixture) → not V10 → rangeable stamped as false (skips legacy
HEAD probe).
+ Assert.assertEquals(Boolean.FALSE, segment.getLoadSpec().get("rangeable"));
+ }
+
+ @Test
+ public void testPushNoZipV10StampsRangeableTrue() throws Exception
+ {
+ ServerSideEncryptingAmazonS3 s3Client =
EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class);
+
+ Grant grant = Grant.builder()
+
.grantee(Grantee.builder().id("ownerId").type(Type.CANONICAL_USER).build())
+ .permission(Permission.FULL_CONTROL)
+ .build();
+
EasyMock.expect(s3Client.getBucketOwnerGrant(EasyMock.eq("bucket"))).andReturn(grant).once();
+
+ s3Client.upload(EasyMock.anyString(), EasyMock.anyString(),
EasyMock.anyObject(File.class), EasyMock.anyObject(Grant.class));
+ EasyMock.expectLastCall().once();
+
+ EasyMock.replay(s3Client);
+
+ S3DataSegmentPusherConfig config = new S3DataSegmentPusherConfig()
+ {
+ @Override
+ public boolean isZip()
+ {
+ return false;
+ }
+ };
+ config.setBucket("bucket");
+ config.setBaseKey("key");
+
+ // version.bin = [0, 0, 0, 0x0A] → IndexIO.V10_VERSION
+ DataSegment segment = validate(
+ false,
+ "key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/",
+ s3Client,
+ config,
+ new byte[]{0x0, 0x0, 0x0, 0x0A}
+ );
+ Assert.assertEquals(10, (int) segment.getBinaryVersion());
+ Assert.assertEquals(Boolean.TRUE, segment.getLoadSpec().get("rangeable"));
+ }
+
+ @Test
+ public void testPushZipDoesNotStampRangeable() throws Exception
+ {
+ // Zip path uses the no-flag makeLoadSpec overload; openRangeReader
returns null on the zip-key short circuit
+ // regardless, but we keep the loadSpec JSON compact for zipped segments
by omitting the field entirely.
+ ServerSideEncryptingAmazonS3 s3Client =
EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class);
+
+ Grant grant = Grant.builder()
+
.grantee(Grantee.builder().id("ownerId").type(Type.CANONICAL_USER).build())
+ .permission(Permission.FULL_CONTROL)
+ .build();
+
EasyMock.expect(s3Client.getBucketOwnerGrant(EasyMock.eq("bucket"))).andReturn(grant).once();
+
+ s3Client.upload(EasyMock.anyString(), EasyMock.anyString(),
EasyMock.anyObject(File.class), EasyMock.anyObject(Grant.class));
+ EasyMock.expectLastCall().once();
+
+ EasyMock.replay(s3Client);
+
+ DataSegment segment = validate(
+ false,
+
"key/foo/2015-01-01T00:00:00\\.000Z_2016-01-01T00:00:00\\.000Z/0/0/index\\.zip",
+ s3Client
+ );
+ Assert.assertFalse(segment.getLoadSpec().containsKey("rangeable"));
}
private void testPushInternal(boolean useUniquePath, String matcher) throws
Exception
@@ -162,24 +235,32 @@ public class S3DataSegmentPusherTest
validate(useUniquePath, matcher, s3Client);
}
- private void validate(boolean useUniquePath, String matcher,
ServerSideEncryptingAmazonS3 s3Client) throws IOException
+ private DataSegment validate(boolean useUniquePath, String matcher,
ServerSideEncryptingAmazonS3 s3Client) throws IOException
{
S3DataSegmentPusherConfig config = new S3DataSegmentPusherConfig();
config.setBucket("bucket");
config.setBaseKey("key");
- validate(useUniquePath, matcher, s3Client, config);
+ // Default version.bin is V1 for historical reasons.
+ DataSegment segment = validate(useUniquePath, matcher, s3Client, config,
new byte[]{0x0, 0x0, 0x0, 0x1});
+ Assert.assertEquals(1, (int) segment.getBinaryVersion());
+ return segment;
}
- private void validate(boolean useUniquePath, String matcher,
ServerSideEncryptingAmazonS3 s3Client, S3DataSegmentPusherConfig config) throws
IOException
+ private DataSegment validate(
+ boolean useUniquePath,
+ String matcher,
+ ServerSideEncryptingAmazonS3 s3Client,
+ S3DataSegmentPusherConfig config,
+ byte[] versionBytes
+ ) throws IOException
{
S3DataSegmentPusher pusher = new S3DataSegmentPusher(s3Client, config);
// Create a mock segment on disk
File tmp = tempFolder.newFile("version.bin");
- final byte[] data = new byte[]{0x0, 0x0, 0x0, 0x1};
- Files.write(data, tmp);
- final long size = data.length;
+ Files.write(versionBytes, tmp);
+ final long size = versionBytes.length;
DataSegment segmentToPush = new DataSegment(
"foo",
@@ -196,7 +277,6 @@ public class S3DataSegmentPusherTest
DataSegment segment = pusher.push(tempFolder.getRoot(), segmentToPush,
useUniquePath);
Assert.assertEquals(segmentToPush.getSize(), segment.getSize());
- Assert.assertEquals(1, (int) segment.getBinaryVersion());
Assert.assertEquals("bucket", segment.getLoadSpec().get("bucket"));
Assert.assertTrue(
segment.getLoadSpec().get("key").toString(),
@@ -205,5 +285,6 @@ public class S3DataSegmentPusherTest
Assert.assertEquals("s3_zip", segment.getLoadSpec().get("type"));
EasyMock.verify(s3Client);
+ return segment;
}
}
diff --git
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3LoadSpecTest.java
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3LoadSpecTest.java
new file mode 100644
index 00000000000..9280cdf068c
--- /dev/null
+++
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3LoadSpecTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.s3;
+
+import org.apache.druid.segment.loading.SegmentRangeReader;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.verifyNoInteractions;
+
+@ExtendWith(MockitoExtension.class)
+public class S3LoadSpecTest
+{
+ private static final String BUCKET = "test-bucket";
+ private static final String RAW_KEY = "path/to/segment/";
+ private static final String ZIP_KEY = "path/to/index.zip";
+
+ @Mock
+ private ServerSideEncryptingAmazonS3 s3Client;
+
+ @Test
+ public void testOpenRangeReaderReturnsReaderWhenRangeableTrue()
+ {
+ final S3LoadSpec spec = new S3LoadSpec(new S3DataSegmentPuller(s3Client),
BUCKET, RAW_KEY, true);
+ final SegmentRangeReader reader = spec.openRangeReader();
+ assertNotNull(reader);
+ assertInstanceOf(S3SegmentRangeReader.class, reader);
+ verifyNoInteractions(s3Client);
+ }
+
+ @Test
+ public void testOpenRangeReaderReturnsNullForZipKeyEvenWhenRangeableTrue()
+ {
+ // Defensive: a zip key can't be range-read; the zip check wins over the
flag even if hand-crafted input claims
+ // the layout is rangeable.
+ final S3LoadSpec spec = new S3LoadSpec(new S3DataSegmentPuller(s3Client),
BUCKET, ZIP_KEY, true);
+ assertNull(spec.openRangeReader());
+ verifyNoInteractions(s3Client);
+ }
+
+ @Test
+ public void testOpenRangeReaderReturnsNullWhenRangeableFalse()
+ {
+ final S3LoadSpec spec = new S3LoadSpec(new S3DataSegmentPuller(s3Client),
BUCKET, RAW_KEY, false);
+ assertNull(spec.openRangeReader());
+ verifyNoInteractions(s3Client);
+ }
+
+ @Test
+ public void testOpenRangeReaderReturnsNullForLegacySegmentWithoutFlag()
+ {
+ // Legacy segment (pushed before this field existed) → null flag →
full-download path.
+ final S3LoadSpec spec = new S3LoadSpec(new S3DataSegmentPuller(s3Client),
BUCKET, RAW_KEY, null);
+ assertNull(spec.openRangeReader());
+ verifyNoInteractions(s3Client);
+ }
+}
diff --git
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3SegmentRangeReaderTest.java
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3SegmentRangeReaderTest.java
new file mode 100644
index 00000000000..b219fb79392
--- /dev/null
+++
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3SegmentRangeReaderTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.s3;
+
+import com.google.common.io.ByteStreams;
+import org.apache.druid.segment.loading.SegmentRangeReader;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class S3SegmentRangeReaderTest
+{
+ private static final String BUCKET = "test-bucket";
+ private static final String KEY_PREFIX =
"ds/2024-01-01T00:00:00.000Z_2024-01-02T00:00:00.000Z/0/0/";
+
+ @Mock
+ private ServerSideEncryptingAmazonS3 s3Client;
+
+ private S3SegmentRangeReader reader;
+
+ @BeforeEach
+ public void setUp()
+ {
+ reader = new S3SegmentRangeReader(s3Client, BUCKET, KEY_PREFIX);
+ }
+
+ @Test
+ public void testReadRangeIssuesClosedRangeGetWithKeyPrefixPlusFilename()
throws IOException
+ {
+
when(s3Client.getObject(any(GetObjectRequest.Builder.class))).thenReturn(stubResponse(new
byte[0]));
+
+ try (InputStream ignored = reader.readRange("000000.smoosh", 100, 250)) {
+ // open is performed in the RetryingInputStream constructor; the
GetObject call should have already happened.
+ }
+
+ final GetObjectRequest request = captureRequest();
+ assertEquals(BUCKET, request.bucket());
+ assertEquals(KEY_PREFIX + "000000.smoosh", request.key());
+ // closed range: bytes=offset-(offset+length-1)
+ assertEquals("bytes=100-349", request.range());
+ }
+
+ @Test
+ public void testReadRangeBuildsDifferentKeysForDifferentFilenames() throws
IOException
+ {
+
when(s3Client.getObject(any(GetObjectRequest.Builder.class))).thenReturn(stubResponse(new
byte[0]));
+
+ reader.readRange("file-a", 0, 16).close();
+ reader.readRange("file-b", 0, 16).close();
+
+ final ArgumentCaptor<GetObjectRequest.Builder> builderCaptor =
+ ArgumentCaptor.forClass(GetObjectRequest.Builder.class);
+ verify(s3Client, times(2)).getObject(builderCaptor.capture());
+ assertEquals(KEY_PREFIX + "file-a",
builderCaptor.getAllValues().get(0).build().key());
+ assertEquals(KEY_PREFIX + "file-b",
builderCaptor.getAllValues().get(1).build().key());
+ }
+
+ @Test
+ public void testReadRangeWithSingleByteUsesInclusiveRange() throws
IOException
+ {
+
when(s3Client.getObject(any(GetObjectRequest.Builder.class))).thenReturn(stubResponse(new
byte[0]));
+
+ reader.readRange("f", 42, 1).close();
+
+ // bytes=offset-offset (length=1 → end = offset+0)
+ assertEquals("bytes=42-42", captureRequest().range());
+ }
+
+ @Test
+ public void testReadRangeWrapsNonRetryableS3ExceptionAsIOException()
+ {
+ // 403 isn't retryable per AWSClientUtil.isClientExceptionRecoverable →
RetryingInputStream's open fails once,
+ // S3RETRY says no, the IOException(S3Exception) is propagated as-is by
Throwables.propagateIfInstanceOf.
+ when(s3Client.getObject(any(GetObjectRequest.Builder.class)))
+ .thenThrow((S3Exception)
S3Exception.builder().message("denied").statusCode(403).build());
+
+ final IOException thrown = assertThrows(IOException.class, () ->
reader.readRange("f", 0, 1));
+ assertSame(S3Exception.class, thrown.getCause().getClass());
+ }
+
+ @Test
+ public void testReadRangeRetriesMidStreamFromBytesAlreadyConsumed() throws
IOException
+ {
+ // First range read delivers the first 4 bytes then errors mid-stream with
a retryable IOException.
+ // RetryingInputStream should reopen at the next byte (request.offset +
4), exercising the offset math in
+ // RangeOpenFunction.open(request, start). NOTE: this test sleeps ~1s
because RetryingInputStream's first retry
+ // uses RetryUtils.BASE_SLEEP_MILLIS exponential backoff (no
@VisibleForTesting hook is accessible from here).
+ final byte[] firstChunk = {0x01, 0x02, 0x03, 0x04};
+ final byte[] secondChunk = {0x05, 0x06, 0x07, 0x08, 0x09, 0x0A};
+ final byte[] all = new byte[firstChunk.length + secondChunk.length];
+ System.arraycopy(firstChunk, 0, all, 0, firstChunk.length);
+ System.arraycopy(secondChunk, 0, all, firstChunk.length,
secondChunk.length);
+
+ when(s3Client.getObject(any(GetObjectRequest.Builder.class)))
+ .thenReturn(stubResponseFailingAfter(firstChunk))
+ .thenReturn(stubResponse(secondChunk));
+
+ final byte[] read;
+ try (InputStream stream = reader.readRange("f", 100, 10)) {
+ read = ByteStreams.toByteArray(stream);
+ }
+ assertArrayEquals(all, read);
+
+ final ArgumentCaptor<GetObjectRequest.Builder> builderCaptor =
+ ArgumentCaptor.forClass(GetObjectRequest.Builder.class);
+ verify(s3Client, times(2)).getObject(builderCaptor.capture());
+ final List<GetObjectRequest> requests = Arrays.asList(
+ builderCaptor.getAllValues().get(0).build(),
+ builderCaptor.getAllValues().get(1).build()
+ );
+ // First: full requested range
+ assertEquals("bytes=100-109", requests.get(0).range());
+ // Retry: resume at (offset + bytes-already-consumed) through original end
+ assertEquals("bytes=104-109", requests.get(1).range());
+ }
+
+ @Test
+ public void testReadRangeRejectsNegativeOffset()
+ {
+ assertThrows(IllegalArgumentException.class, () -> reader.readRange("f",
-1, 16));
+ }
+
+ @Test
+ public void
testReadRangeReturnsEmptyStreamForZeroLengthWithoutContactingS3() throws
IOException
+ {
+ // SegmentFileBuilderV10 allows zero-length internal-file entries;
readRange must accept length=0 and return an
+ // empty stream without issuing an S3 GET (a closed range of bytes=N-(N-1)
would 416).
+ try (InputStream stream = reader.readRange("f", 100, 0)) {
+ assertEquals(-1, stream.read());
+ }
+ verifyNoInteractions(s3Client);
+ }
+
+ @Test
+ public void testReadRangeRejectsNegativeLength()
+ {
+ assertThrows(IllegalArgumentException.class, () -> reader.readRange("f",
0, -1));
+ }
+
+ @Test
+ public void testImplementsSegmentRangeReader()
+ {
+ // Compile-time and runtime guard: ensure the implements relationship
survives refactors so callers can rely on
+ // returning S3SegmentRangeReader from openRangeReader().
+ final SegmentRangeReader downcast = reader;
+ assertSame(reader, downcast);
+ }
+
+ private GetObjectRequest captureRequest()
+ {
+ final ArgumentCaptor<GetObjectRequest.Builder> builderCaptor =
+ ArgumentCaptor.forClass(GetObjectRequest.Builder.class);
+ verify(s3Client).getObject(builderCaptor.capture());
+ return builderCaptor.getValue().build();
+ }
+
+ private static ResponseInputStream<GetObjectResponse> stubResponse(byte[]
bytes)
+ {
+ return new ResponseInputStream<>(
+ GetObjectResponse.builder().build(),
+ new ByteArrayInputStream(bytes)
+ );
+ }
+
+ /**
+ * Returns a stream that delivers the given {@code head} bytes successfully,
then raises a bare {@link IOException}
+ * (no cause) on the next read — which {@link S3Utils#S3RETRY} treats as
retryable, so {@link
+ * org.apache.druid.data.input.impl.RetryingInputStream} will close this
delegate and call the open function again
+ * with {@code start = head.length}.
+ */
+ private static ResponseInputStream<GetObjectResponse>
stubResponseFailingAfter(byte[] head)
+ {
+ final InputStream delegate = new InputStream()
+ {
+ private final ByteArrayInputStream src = new ByteArrayInputStream(head);
+
+ @Override
+ public int read() throws IOException
+ {
+ final int b = src.read();
+ if (b == -1) {
+ throw new IOException("simulated mid-stream failure");
+ }
+ return b;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException
+ {
+ final int n = src.read(b, off, len);
+ if (n == -1) {
+ throw new IOException("simulated mid-stream failure");
+ }
+ return n;
+ }
+ };
+ return new ResponseInputStream<>(GetObjectResponse.builder().build(),
delegate);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]