This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4601e285 CASSANDRASC-142: Improve S3 download throttling with range-GetObject
4601e285 is described below

commit 4601e28529996a3447e74093cc6cc35879143031
Author: Yifan Cai <y...@apache.org>
AuthorDate: Tue Aug 13 23:29:44 2024 -0700

    CASSANDRASC-142: Improve S3 download throttling with range-GetObject
    
    The patch adds a few changes following the best practices for the AWS SDK S3 client, including:
    - download throttling with range-GetObject
    - configure apiCallTimeout
    - use OpenSSL for S3 client
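
    For illustration (not part of the patch): with the default 5 MiB range size, a
    12 MiB object is downloaded with three GetObject calls carrying the Range headers
    bytes=0-5242879, bytes=5242880-10485759 and bytes=10485760-12582911; each call
    first acquires its byte count from the download rate limiter, which caps the
    effective download throughput.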
    
    Patch by Yifan Cai; Reviewed by Doug Rohrer, Saranya Krishnakumar for CASSANDRASC-142
---
 CHANGES.txt                                        |   1 +
 build.gradle                                       |  16 +-
 gradle.properties                                  |   2 +
 spotbugs-exclude.xml                               |   1 +
 src/main/dist/conf/sidecar.yaml                    |   2 +
 .../sidecar/config/S3ClientConfiguration.java      |  13 ++
 .../config/yaml/S3ClientConfigurationImpl.java     |  51 ++++-
 .../apache/cassandra/sidecar/db/RestoreRange.java  |  28 ++-
 .../sidecar/restore/HttpRangesIterator.java        |  62 ++++++
 .../sidecar/restore/RestoreProcessor.java          |   2 -
 .../sidecar/restore/RestoreRangeTask.java          |  11 +-
 .../cassandra/sidecar/restore/StorageClient.java   | 221 ++++++++++-----------
 .../sidecar/restore/StorageClientPool.java         |  27 ++-
 .../cassandra/sidecar/server/ServerVerticle.java   |   3 +-
 .../sidecar/restore/StorageClientTest.java         | 127 ++++++++----
 .../sidecar/restore/HttpRangesIteratorTest.java    |  63 ++++++
 .../sidecar/restore/RestoreRangeTaskTest.java      |  37 ++--
 .../cassandra/sidecar/server/ServerSSLTest.java    |   2 +-
 .../{logback-sidecar.xml => logback-test.xml}      |   0
 vertx-client/build.gradle                          |  10 +-
 20 files changed, 475 insertions(+), 204 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 9b62fa86..6079f284 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Improve S3 download throttling with range-GetObject (CASSANDRASC-142)
  * Updating traffic shaping options throws IllegalStateException (CASSANDRASC-140)
  * Add restore job progress endpoint and consistency check on restore ranges (CASSANDRASC-132)
  * Upgrade asciidoctor plugin to version 3.3.2 (CASSANDRASC-139)
diff --git a/build.gradle b/build.gradle
index 04ead46f..db600a07 100644
--- a/build.gradle
+++ b/build.gradle
@@ -218,6 +218,21 @@ dependencies {
         }
     }
 
+    implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: "${project.boringSslVersion}")  // for openSSL
+
+    // The newer versions (2.0.48.Final+) of tcnative require explicit dependency declarations,
+    // including the classifiers. See https://netty.io/wiki/forked-tomcat-native.html#gradle-and-bazel
+    // for details.
+
+    // openSSL native libraries for linux x86-64 architectures
+    implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: "${project.boringSslVersion}", classifier: 'linux-x86_64')
+    // openSSL native libraries for macOS aarch-64 architectures
+    implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: "${project.boringSslVersion}", classifier: 'osx-aarch_64')
+    // openSSL native libraries for linux aarch-64 architectures
+    implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: "${project.boringSslVersion}", classifier: 'linux-aarch_64')
+    // openSSL native libraries for macOS x86-64 architectures
+    implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: "${project.boringSslVersion}", classifier: 'osx-x86_64')
+
     jolokia 'org.jolokia:jolokia-jvm:1.6.0:agent'
 
     testImplementation "org.junit.jupiter:junit-jupiter-api:${project.junitVersion}"
@@ -319,7 +334,6 @@ clean.doLast {
 
 test {
     systemProperty "vertxweb.environment", "dev"
-    systemProperty "logback.configurationFile", "logback-sidecar.xml"
     systemProperty "vertx.logger-delegate-factory-class-name", "io.vertx.core.logging.SLF4JLogDelegateFactory"
     // ordinarily we don't need integration tests
     // see the integrationTest task
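Illustration (not part of the patch): with the platform-specific tcnative artifacts
above on the classpath, Netty can load OpenSSL/BoringSSL natively; the patch probes
this through io.netty.handler.ssl.OpenSsl in StorageClientPool. A minimal sketch of
such a check, with the class and message text chosen only for illustration:

    import io.netty.handler.ssl.OpenSsl;

    public final class OpenSslCheck
    {
        public static void main(String[] args)
        {
            if (OpenSsl.isAvailable())
            {
                // the native tcnative library resolved for this platform
                System.out.println("OpenSSL (tcnative) loaded");
            }
            else
            {
                // the S3 client falls back to the JDK SSL provider; the cause explains why tcnative did not load
                System.out.println("OpenSSL unavailable: " + OpenSsl.unavailabilityCause());
            }
        }
    }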
diff --git a/gradle.properties b/gradle.properties
index 4ad89e04..09afe79f 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -29,6 +29,8 @@ dtestDependencyName=cassandra-dtest-local-all
 awsSdkVersion=2.26.12
 # The dep is to introduce xxhash impl
 commonsCodecVersion=1.16.1
+# openSSL
+boringSslVersion=2.0.61.Final
 # If running MacOS then you need to increase the max
 # open FD limit
 org.gradle.jvmargs=-XX:-MaxFDLimit
diff --git a/spotbugs-exclude.xml b/spotbugs-exclude.xml
index 223f2a1c..8df098d2 100644
--- a/spotbugs-exclude.xml
+++ b/spotbugs-exclude.xml
@@ -82,6 +82,7 @@
             <Class name="org.apache.cassandra.sidecar.snapshots.SnapshotPathBuilder$SnapshotFile" />
             <Class name="org.apache.cassandra.sidecar.routes.StreamSSTableComponentHandlerTest" />
             <Class name="org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor" />
+            <Class name="org.apache.cassandra.sidecar.restore.StorageClient" />
         </Or>
     </Match>
 
diff --git a/src/main/dist/conf/sidecar.yaml b/src/main/dist/conf/sidecar.yaml
index 2595689b..76b8d0bb 100644
--- a/src/main/dist/conf/sidecar.yaml
+++ b/src/main/dist/conf/sidecar.yaml
@@ -185,6 +185,8 @@ s3_client:
   concurrency: 4
   thread_name_prefix: s3-client
   thread_keep_alive_seconds: 60
+  api_call_timeout_millis: 60000 # 1 minute
+  range_get_object_bytes_size: 5242880  # 5 MiB
 #  proxy_config:
 #    uri:
 #    username:
diff --git a/src/main/java/org/apache/cassandra/sidecar/config/S3ClientConfiguration.java b/src/main/java/org/apache/cassandra/sidecar/config/S3ClientConfiguration.java
index 803f4d8a..41cd3dff 100644
--- a/src/main/java/org/apache/cassandra/sidecar/config/S3ClientConfiguration.java
+++ b/src/main/java/org/apache/cassandra/sidecar/config/S3ClientConfiguration.java
@@ -35,6 +35,19 @@ public interface S3ClientConfiguration
      */
     long threadKeepAliveSeconds();
 
+    /**
+     * Returns the range size in bytes used to produce the <a href="https://www.rfc-editor.org/rfc/rfc9110.html#name-range">Range header</a> for range-GetObject.
+     * The size should not be too large (long requests) or too small (too many requests). 5 to 10 MiB would be ideal to start with.
+     * @return range bytes size.
+     */
+    int rangeGetObjectBytesSize();
+
+    /**
+     * API call timeout in milliseconds for S3 API calls.
+     * @return duration of timeout in milliseconds
+     */
+    long apiCallTimeoutMillis();
+
     /**
      * Route traffic through a proxy in the environment that requires doing so, when a proxy is specified
      *
diff --git a/src/main/java/org/apache/cassandra/sidecar/config/yaml/S3ClientConfigurationImpl.java b/src/main/java/org/apache/cassandra/sidecar/config/yaml/S3ClientConfigurationImpl.java
index d5bf5767..505f13d7 100644
--- a/src/main/java/org/apache/cassandra/sidecar/config/yaml/S3ClientConfigurationImpl.java
+++ b/src/main/java/org/apache/cassandra/sidecar/config/yaml/S3ClientConfigurationImpl.java
@@ -18,7 +18,10 @@
 
 package org.apache.cassandra.sidecar.config.yaml;
 
+import java.util.concurrent.TimeUnit;
+
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.common.utils.Preconditions;
 import org.apache.cassandra.sidecar.config.S3ClientConfiguration;
 import org.apache.cassandra.sidecar.config.S3ProxyConfiguration;
 import org.jetbrains.annotations.NotNull;
@@ -31,18 +34,32 @@ public class S3ClientConfigurationImpl implements S3ClientConfiguration
     public static final String DEFAULT_THREAD_NAME_PREFIX = "s3-client";
 
     public static final String PROXY_PROPERTY = "proxy_config";
+    public static final String API_CALL_TIMEOUT_MILLIS = "api_call_timeout_millis";
+    public static final String RANGE_GET_OBJECT_BYTES_SIZE = "range_get_object_bytes_size";
+    public static final String THREAD_KEEP_ALIVE_SECONDS = "thread_keep_alive_seconds";
+    public static final String CONCURRENCY = "concurrency";
+    public static final String THREAD_NAME_PREFIX = "thread_name_prefix";
+
     public static final long DEFAULT_THREAD_KEEP_ALIVE_SECONDS = 60;
     public static final int DEFAULT_S3_CLIENT_CONCURRENCY = 4;
+    public static final long DEFAULT_API_CALL_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60);
+    public static final int DEFAULT_RANGE_GET_OBJECT_BYTES_SIZE = 5 * 1024 * 1024; // 5 MiB
 
-    @JsonProperty(value = "thread_name_prefix")
+    @JsonProperty(value = THREAD_NAME_PREFIX)
     protected final String threadNamePrefix;
 
-    @JsonProperty(value = "concurrency")
+    @JsonProperty(value = CONCURRENCY)
     protected final int concurrency;
 
-    @JsonProperty(value = "thread_keep_alive_seconds")
+    @JsonProperty(value = THREAD_KEEP_ALIVE_SECONDS)
     protected final long threadKeepAliveSeconds;
 
+    @JsonProperty(value = RANGE_GET_OBJECT_BYTES_SIZE)
+    protected final int rangeGetObjectBytesSize;
+
+    @JsonProperty(value = API_CALL_TIMEOUT_MILLIS)
+    protected final long apiCallTimeoutMillis;
+
     @JsonProperty(value = PROXY_PROPERTY)
     protected final S3ProxyConfiguration proxyConfig;
 
@@ -51,18 +68,26 @@ public class S3ClientConfigurationImpl implements S3ClientConfiguration
         this(DEFAULT_THREAD_NAME_PREFIX,
              DEFAULT_S3_CLIENT_CONCURRENCY,
              DEFAULT_THREAD_KEEP_ALIVE_SECONDS,
+             DEFAULT_RANGE_GET_OBJECT_BYTES_SIZE,
+             DEFAULT_API_CALL_TIMEOUT_MILLIS,
              new S3ProxyConfigurationImpl());
     }
 
     public S3ClientConfigurationImpl(String threadNamePrefix,
                                      int concurrency,
                                      long threadKeepAliveSeconds,
+                                     int rangeGetObjectBytesSize,
+                                     long apiCallTimeoutMillis,
                                      S3ProxyConfiguration proxyConfig)
     {
+        Preconditions.checkArgument(apiCallTimeoutMillis > TimeUnit.SECONDS.toMillis(10),
+                                    "apiCallTimeout cannot be smaller than 10 seconds. Configured: " + apiCallTimeoutMillis + " ms");
         this.threadNamePrefix = threadNamePrefix;
         this.concurrency = concurrency;
         this.threadKeepAliveSeconds = threadKeepAliveSeconds;
+        this.rangeGetObjectBytesSize = rangeGetObjectBytesSize;
         this.proxyConfig = proxyConfig;
+        this.apiCallTimeoutMillis = apiCallTimeoutMillis;
     }
 
     /**
@@ -70,7 +95,7 @@ public class S3ClientConfigurationImpl implements S3ClientConfiguration
      */
     @NotNull
     @Override
-    @JsonProperty(value = "thread_name_prefix")
+    @JsonProperty(value = THREAD_NAME_PREFIX)
     public String threadNamePrefix()
     {
         return threadNamePrefix;
@@ -80,7 +105,7 @@ public class S3ClientConfigurationImpl implements S3ClientConfiguration
      * {@inheritDoc}
      */
     @Override
-    @JsonProperty(value = "concurrency")
+    @JsonProperty(value = CONCURRENCY)
     public int concurrency()
     {
         return concurrency;
@@ -90,12 +115,26 @@ public class S3ClientConfigurationImpl implements S3ClientConfiguration
      * {@inheritDoc}
      */
     @Override
-    @JsonProperty(value = "thread_keep_alive_seconds")
+    @JsonProperty(value = THREAD_KEEP_ALIVE_SECONDS)
     public long threadKeepAliveSeconds()
     {
         return threadKeepAliveSeconds;
     }
 
+    @Override
+    @JsonProperty(value = RANGE_GET_OBJECT_BYTES_SIZE)
+    public int rangeGetObjectBytesSize()
+    {
+        return rangeGetObjectBytesSize;
+    }
+
+    @Override
+    @JsonProperty(value = API_CALL_TIMEOUT_MILLIS)
+    public long apiCallTimeoutMillis()
+    {
+        return apiCallTimeoutMillis;
+    }
+
     /**
      * {@inheritDoc}
      */
diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java b/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java
index ed20d872..d0b71730 100644
--- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java
+++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.sidecar.cluster.locator.LocalTokenRangesProvider;
 import org.apache.cassandra.sidecar.common.DataObjectBuilder;
 import org.apache.cassandra.sidecar.common.response.data.RestoreRangeJson;
 import org.apache.cassandra.sidecar.common.server.data.RestoreRangeStatus;
-import org.apache.cassandra.sidecar.common.utils.Preconditions;
 import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
@@ -108,6 +107,7 @@ public class RestoreRange
     private final RestoreJobProgressTracker tracker;
 
     // mutable states
+    private Long sliceObjectLength; // content length value from the HeadObjectResponse; it should be the same as the slice#compressedSize
     private boolean existsOnS3 = false;
     private boolean hasStaged = false;
     private boolean hasImported = false;
@@ -234,9 +234,10 @@ public class RestoreRange
         tracker.requestOutOfRangeDataCleanup();
     }
 
-    public void setExistsOnS3()
+    public void setExistsOnS3(Long objectLength)
     {
         this.existsOnS3 = true;
+        this.sliceObjectLength = objectLength;
     }
 
     public void incrementDownloadAttempt()
@@ -329,19 +330,22 @@ public class RestoreRange
         return readSliceProperty(RestoreSlice::checksum);
     }
 
-    public Long sliceCreationTimeNanos()
+    public long sliceCreationTimeNanos()
     {
-        return readSliceProperty(RestoreSlice::creationTimeNanos);
+        return Objects.requireNonNull(source, "Source slice does not exist")
+                      .creationTimeNanos();
     }
 
-    public Long sliceCompressedSize()
+    public long sliceCompressedSize()
     {
-        return readSliceProperty(RestoreSlice::compressedSize);
+        return Objects.requireNonNull(source, "Source slice does not exist")
+                      .compressedSize();
     }
 
-    public Long sliceUncompressedSize()
+    public long sliceUncompressedSize()
     {
-        return readSliceProperty(RestoreSlice::uncompressedSize);
+        return Objects.requireNonNull(source, "Source slice does not exist")
+                      .uncompressedSize();
     }
 
     public String keyspace()
@@ -413,6 +417,11 @@ public class RestoreRange
         return existsOnS3;
     }
 
+    public long sliceObjectLength()
+    {
+        return sliceObjectLength == null ? 0 : sliceObjectLength;
+    }
+
     public boolean hasStaged()
     {
         return hasStaged;
@@ -447,8 +456,7 @@ public class RestoreRange
 
     public long estimatedSpaceRequiredInBytes()
     {
-        Preconditions.checkState(source != null, "Cannot estimate space requirement without source slice");
-        return source.compressedSize() + source.uncompressedSize();
+        return sliceCompressedSize() + sliceUncompressedSize();
     }
 
     // -------------
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/HttpRangesIterator.java b/src/main/java/org/apache/cassandra/sidecar/restore/HttpRangesIterator.java
new file mode 100644
index 00000000..2ad42697
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/HttpRangesIterator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.restore;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.cassandra.sidecar.common.utils.HttpRange;
+import org.apache.cassandra.sidecar.common.utils.Preconditions;
+
+/**
+ * Iterator over the produced http ranges
+ */
+public class HttpRangesIterator implements Iterator<HttpRange>
+{
+    private final long totalBytes;
+    private final int rangeSize;
+    private long offset = 0;
+
+    public HttpRangesIterator(long totalBytes, int rangeSize)
+    {
+        Preconditions.checkArgument(totalBytes > 0, "totalBytes must be positive");
+        Preconditions.checkArgument(rangeSize > 0, "rangeSize must be positive");
+        this.totalBytes = totalBytes;
+        this.rangeSize = rangeSize;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+        return offset < totalBytes;
+    }
+
+    @Override
+    public HttpRange next()
+    {
+        if (!hasNext())
+        {
+            throw new NoSuchElementException("No more HttpRanges");
+        }
+        long end = Math.min(offset + rangeSize, totalBytes) - 1;
+        HttpRange range = HttpRange.of(offset, end);
+        offset = end + 1;
+        return range;
+    }
+}
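Illustration (not part of the patch): the iterator partitions an object length into
consecutive, non-overlapping ranges that render as HTTP Range header values. A
minimal usage sketch, with sizes chosen only for illustration:

    // 19 bytes split into 10-byte ranges
    HttpRangesIterator iterator = new HttpRangesIterator(19, 10);
    while (iterator.hasNext())
    {
        System.out.println(iterator.next()); // prints "bytes=0-9", then "bytes=10-18"
    }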
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
index 036f4eb6..a68eb09e 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java
@@ -274,8 +274,6 @@ public class RestoreProcessor implements PeriodicTask
             else
             {
                 LOGGER.error("Slice failed with unrecoverable failure. sliceKey={}", range.sliceKey(), cause);
-                // fail the slice and mark the slice has failed on its owning instance.
-                // In the phase 1 implementation, all slices of the job get aborted
                 range.fail(RestoreJobExceptions.toFatal(cause));
                 if (range.job().isManagedBySidecar())
                 {
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreRangeTask.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreRangeTask.java
index a57867cb..763b9c12 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreRangeTask.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreRangeTask.java
@@ -199,9 +199,10 @@ public class RestoreRangeTask implements RestoreRangeHandler
         return failOnCancelled(range, null)
                // 1. check object existence and validate eTag / checksum
                .compose(value -> checkObjectExistence())
-               // 2. download slice/object when the remote object exists
                .compose(value -> failOnCancelled(range, value))
-               .compose(headObject -> downloadSlice())
+               // 2. download slice/object (with partNumber) when the remote object exists
+               .compose(value -> downloadSlice())
+               .compose(value -> failOnCancelled(range, value))
                // 3. unzip the file and import/commit
                .compose(this::unzipAndImport);
     }
@@ -218,10 +219,10 @@ public class RestoreRangeTask implements RestoreRangeHandler
         // even if the file already exists on disk, we should still check the object existence
         return
         fromCompletionStage(s3Client.objectExists(range))
-        .compose(exists -> { // on success
+        .compose(headObjectResponse -> { // on success
             long durationNanos = currentTimeInNanos() - range.sliceCreationTimeNanos();
             metrics.sliceReplicationTime.metric.update(durationNanos, TimeUnit.NANOSECONDS);
-            range.setExistsOnS3();
+            range.setExistsOnS3(headObjectResponse.contentLength());
             LOGGER.debug("Slice is now available on S3. jobId={} sliceKey={} replicationTimeNanos={}",
                          range.jobId(), range.sliceKey(), durationNanos);
             return Future.succeededFuture();
@@ -285,7 +286,7 @@ public class RestoreRangeTask implements RestoreRangeHandler
 
         LOGGER.info("Begin downloading restore slice. sliceKey={}", range.sliceKey());
         Future<File> future =
-        fromCompletionStage(s3Client.downloadObjectIfAbsent(range))
+        s3Client.downloadObjectIfAbsent(range, executorPool)
         .recover(cause -> { // converts to restore job exception
             LOGGER.warn("Failed to download restore slice. sliceKey={}", range.sliceKey(), cause);
 
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
index cf3b12d7..e0fc259c 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java
@@ -21,8 +21,7 @@ package org.apache.cassandra.sidecar.restore;
 import java.io.File;
 import java.io.IOException;
 import java.nio.channels.Channel;
-import java.nio.channels.WritableByteChannel;
-import java.nio.file.FileAlreadyExistsException;
+import java.nio.channels.SeekableByteChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.EnumSet;
@@ -38,8 +37,13 @@ import com.google.common.util.concurrent.SidecarRateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.vertx.core.Future;
 import org.apache.cassandra.sidecar.common.data.StorageCredentials;
 import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils;
+import org.apache.cassandra.sidecar.common.utils.HttpRange;
+import org.apache.cassandra.sidecar.common.utils.Preconditions;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl;
 import org.apache.cassandra.sidecar.db.RestoreJob;
 import org.apache.cassandra.sidecar.db.RestoreRange;
 import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException;
@@ -47,8 +51,8 @@ import software.amazon.awssdk.auth.credentials.AwsCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.ResponseBytes;
 import software.amazon.awssdk.core.async.AsyncResponseTransformer;
-import software.amazon.awssdk.core.async.ResponsePublisher;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
@@ -66,6 +70,7 @@ public class StorageClient
     private static final Logger LOGGER = LoggerFactory.getLogger(StorageClient.class);
 
     private final S3AsyncClient client;
+    private final int rangeHeaderSize;
     private final SidecarRateLimiter downloadRateLimiter;
     private final Map<UUID, Credentials> credentialsProviders = new ConcurrentHashMap<>();
 
@@ -73,12 +78,15 @@ public class StorageClient
     StorageClient(S3AsyncClient client)
     {
         // no rate-limiting
-        this(client, SidecarRateLimiter.create(-1));
+        this(client,
+             S3ClientConfigurationImpl.DEFAULT_RANGE_GET_OBJECT_BYTES_SIZE,
+             SidecarRateLimiter.create(-1));
     }
 
-    StorageClient(S3AsyncClient client, SidecarRateLimiter downloadRateLimiter)
+    StorageClient(S3AsyncClient client, int rangeHeaderSize, SidecarRateLimiter downloadRateLimiter)
     {
         this.client = client;
+        this.rangeHeaderSize = rangeHeaderSize;
         this.downloadRateLimiter = downloadRateLimiter;
     }
 
@@ -117,14 +125,18 @@ public class StorageClient
         credentialsProviders.remove(jobId);
     }
 
+    /**
+     * Check object existence with matching checksum
+     * @param range restore range
+     * @return future of HeadObjectResponse
+     */
     public CompletableFuture<HeadObjectResponse> objectExists(RestoreRange range)
     {
         Credentials credentials = credentialsProviders.get(range.jobId());
         if (credentials == null)
         {
-            CompletableFuture<HeadObjectResponse> failedFuture = new CompletableFuture<>();
-            failedFuture.completeExceptionally(credentialsNotFound(range));
-            return failedFuture;
+            LOGGER.warn("Credentials not found. jobId={}", range.jobId());
+            return failedFuture(credentialsNotFound(range));
         }
 
         // https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html
@@ -140,15 +152,13 @@ public class StorageClient
                      .whenComplete(logCredentialOnRequestFailure(range, credentials));
     }
 
-    public CompletableFuture<File> downloadObjectIfAbsent(RestoreRange range)
+    public Future<File> downloadObjectIfAbsent(RestoreRange range, TaskExecutorPool taskExecutorPool)
     {
         Credentials credentials = credentialsProviders.get(range.jobId());
         if (credentials == null)
         {
             LOGGER.warn("Credentials to download object not found. jobId={}", range.jobId());
-            CompletableFuture<File> failedFuture = new CompletableFuture<>();
-            failedFuture.completeExceptionally(credentialsNotFound(range));
-            return failedFuture;
+            return Future.failedFuture(credentialsNotFound(range));
         }
 
         Path objectPath = range.stagedObjectPath();
@@ -163,27 +173,22 @@ public class StorageClient
             // TODO 2: extend restore_job table to define the multi-part upload chunk size, in order to perform local
             //         verification of the etag/checksum
             // For now, we just skip download, assuming the scenario is rare and no maliciousness
-            return CompletableFuture.completedFuture(object);
+            return Future.succeededFuture(object);
         }
 
-        if (!object.getParentFile().mkdirs())
+        try
         {
-            LOGGER.warn("Error occurred while creating directory. jobId={} sliceKey={}",
-                        range.jobId(), range.sliceKey());
-
+            Files.createDirectories(objectPath.getParent());
+        }
+        catch (Exception ex)
+        {
+            LOGGER.error("Error occurred while creating directory. jobId={} sliceKey={}",
+                         range.jobId(), range.sliceKey(), ex);
+            return Future.failedFuture(ex);
         }
 
         LOGGER.info("Downloading object. jobId={} sliceKey={}", range.jobId(), range.sliceKey());
-        // https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
-        GetObjectRequest request =
-        GetObjectRequest.builder()
-                        .overrideConfiguration(b -> b.credentialsProvider(credentials.awsCredentialsProvider()))
-                        .bucket(range.sliceBucket())
-                        .key(range.sliceKey())
-                        .build();
-        return rateLimitedGetObject(range, client, request, objectPath)
-               .whenComplete(logCredentialOnRequestFailure(range, credentials))
-               .thenApply(res -> object);
+        return rangeGetObject(range, credentials, objectPath, taskExecutorPool);
     }
 
     public void close()
@@ -198,6 +203,71 @@ public class StorageClient
         }
     }
 
+    // Range-GetObject with http range header
+    private Future<File> rangeGetObject(RestoreRange range, Credentials credentials, Path destinationPath, TaskExecutorPool taskExecutorPool)
+    {
+        HttpRangesIterator iterator = new HttpRangesIterator(range.sliceObjectLength(), rangeHeaderSize);
+        Preconditions.checkState(iterator.hasNext(), "SliceObject is empty. sliceKey=" + range.sliceKey());
+
+        SeekableByteChannel seekableByteChannel;
+        try
+        {
+            seekableByteChannel = Files.newByteChannel(destinationPath, EnumSet.of(CREATE_NEW, WRITE));
+        }
+        catch (IOException e)
+        {
+            LOGGER.error("Failed to create file channel for downloading. jobId={} sliceKey={}",
+                         range.jobId(), range.sliceKey(), e);
+            return Future.failedFuture(e);
+        }
+        Future<SeekableByteChannel> channelFuture = Future.succeededFuture(seekableByteChannel);
+        while (iterator.hasNext())
+        {
+            HttpRange httpRange = iterator.next();
+            // Schedule each part download in the taskExecutorPool with ordered == true.
+            // Parts are downloaded one by one in sequence.
+            channelFuture = channelFuture.compose(channel -> taskExecutorPool.executeBlocking(() -> {
+                // the length is guaranteed to be no greater than rangeHeaderSize (int)
+                int actualRangeSize = (int) httpRange.length();
+                // throttle the download throughput
+                downloadRateLimiter.acquire(actualRangeSize);
+                // https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
+                GetObjectRequest request =
+                GetObjectRequest.builder()
+                                .overrideConfiguration(b -> b.credentialsProvider(credentials.awsCredentialsProvider()))
+                                .bucket(range.sliceBucket())
+                                .key(range.sliceKey())
+                                .range(httpRange.toString())
+                                .build();
+                // note: it is a blocking get; No parallelism in getting the ranges of the same object
+                ResponseBytes<GetObjectResponse> bytes = client.getObject(request, AsyncResponseTransformer.toBytes()).get();
+                channel.write(bytes.asByteBuffer());
+                return channel;
+            }, true));
+        }
+        return channelFuture
+               // eventually is evaluated in both success and failure cases
+               .eventually(() -> taskExecutorPool.runBlocking(() -> {
+                   ThrowableUtils.propagate(() -> closeChannel(seekableByteChannel));
+               }, true))
+               .compose(channel -> Future.succeededFuture(destinationPath.toFile()),
+                        failure -> { // failure mapper; log the credential on failure
+                            LOGGER.error("Request is not successful. jobId={} credentials={}",
+                                         range.jobId(), credentials.readCredentials, failure);
+                            try
+                            {
+                                Files.deleteIfExists(destinationPath);
+                            }
+                            catch (IOException e)
+                            {
+                                LOGGER.warn("Failed to clean up the failed download. jobId={} sliceKey={}",
+                                            range.jobId(), range.sliceKey(), e);
+                                failure.addSuppressed(e);
+                            }
+                            return Future.failedFuture(failure);
+                        });
+    }
+
     private boolean matches(Credentials c1, Credentials c2)
     {
         if (c1 == c2)
@@ -225,107 +295,30 @@ public class StorageClient
         return (ignored, cause) -> {
             if (cause != null)
             {
-                LOGGER.error("GetObjectRequest is not successful. jobId={} credentials={}",
+                LOGGER.error("Request is not successful. jobId={} credentials={}",
                              range.jobId(), credentials.readCredentials, cause);
             }
         };
     }
 
     /**
-     * Returns a {@link CompletableFuture} to the {@link GetObjectResponse}. It writes the object from S3 to a file
-     * applying rate limiting on the download throughput.
-     *
-     * @param range           the range to be restored
-     * @param client          the S3 client
-     * @param request         the {@link GetObjectRequest request}
-     * @param destinationPath the path where the object will be persisted
-     * @return a {@link CompletableFuture} of the {@link GetObjectResponse}
-     */
-    private CompletableFuture<GetObjectResponse> rateLimitedGetObject(RestoreRange range,
-                                                                      S3AsyncClient client,
-                                                                      GetObjectRequest request,
-                                                                      Path destinationPath)
-    {
-        return client.getObject(request, AsyncResponseTransformer.toPublisher())
-                     .thenCompose(responsePublisher -> subscribeRateLimitedWrite(range,
-                                                                                 destinationPath,
-                                                                                 responsePublisher));
-    }
-
-    /**
-     * Returns a {@link CompletableFuture} to the {@link GetObjectResponse} and consuming the GetObjectResponse
-     * by subscribing to the {@code publisher}. Applying backpressure on the received bytes by rate limiting
-     * the download throughput using the {@code downloadRateLimiter} object.
+     * Closes the channel if not-null and open
      *
-     * @param range           the range to be restored
-     * @param destinationPath the path where the object will be persisted
-     * @param publisher       the {@link ResponsePublisher}
-     * @return a {@link CompletableFuture} to the {@link GetObjectResponse}
+     * @param channel the channel to be closed
      */
-    CompletableFuture<GetObjectResponse> subscribeRateLimitedWrite(RestoreRange range,
-                                                                   Path destinationPath,
-                                                                   ResponsePublisher<GetObjectResponse> publisher)
+    private void closeChannel(Channel channel) throws IOException
     {
-        // closed at the completion of subscribeFuture
-        WritableByteChannel channel;
-        try
-        {
-            // always create new file, and fails if it already exists
-            // this is consistent with the expectation that we won't
-            // re-download a file that already exists
-            // The channel is closed on completion of streaming asynchronously
-            channel = Files.newByteChannel(destinationPath, 
EnumSet.of(CREATE_NEW, WRITE));
-        }
-        catch (FileAlreadyExistsException fileAlreadyExistsException)
-        {
-            LOGGER.info("Skipping download. File already exists. jobId={} 
sliceKey={}",
-                        range.jobId(), range.sliceKey());
-            return CompletableFuture.completedFuture(publisher.response());
-        }
-        catch (IOException e)
+        if (channel != null && channel.isOpen())
         {
-            LOGGER.error("Error occurred while creating channel. 
destinationPath={} jobId={} sliceKey={}",
-                         destinationPath, range.jobId(), range.sliceKey(), e);
-            throw new RuntimeException(e);
+            channel.close();
         }
-        // CompletableFuture that will be notified when all events have been 
consumed or if an error occurs.
-        return publisher
-               .subscribe(buffer -> {
-                   downloadRateLimiter.acquire(buffer.remaining()); // apply 
backpressure on the received bytes
-                   ThrowableUtils.propagate(() -> channel.write(buffer));
-               })
-               .whenComplete((v, subscribeThrowable) -> {
-                   // finally close the channel and log error if failed
-                   closeChannel(channel);
-
-                   if (subscribeThrowable != null)
-                   {
-                       LOGGER.error("Error occurred while downloading. 
jobId={} sliceKey={}",
-                                    range.jobId(), range.sliceKey(), 
subscribeThrowable);
-                   }
-               })
-               .thenApply(v -> publisher.response());
     }
 
-    /**
-     * Closes the channel if not-null. Wraps any {@link IOException} in a {@link RuntimeException}
-     *
-     * @param channel the channel to be closed
-     * @throws RuntimeException wrapping any {@link IOException}
-     */
-    private void closeChannel(Channel channel)
+    private static <T> CompletableFuture<T> failedFuture(Throwable cause)
     {
-        if (channel != null && channel.isOpen())
-        {
-            try
-            {
-                channel.close();
-            }
-            catch (IOException e)
-            {
-                LOGGER.error("Failed to close channel", e);
-            }
-        }
+        CompletableFuture<T> failure = new CompletableFuture<>();
+        failure.completeExceptionally(cause);
+        return failure;
     }
 
     private static class Credentials
diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClientPool.java b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClientPool.java
index eafe600e..a14a54d4 100644
--- a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClientPool.java
+++ b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClientPool.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.sidecar.restore;
 
 import java.net.URI;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
@@ -28,10 +29,13 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.util.concurrent.SidecarRateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.google.inject.name.Named;
+import io.netty.handler.ssl.OpenSsl;
 import org.apache.cassandra.sidecar.config.S3ClientConfiguration;
 import org.apache.cassandra.sidecar.config.S3ProxyConfiguration;
 import org.apache.cassandra.sidecar.config.SidecarConfiguration;
@@ -52,6 +56,8 @@ import software.amazon.awssdk.utils.ThreadFactoryBuilder;
 @Singleton
 public class StorageClientPool implements SdkAutoCloseable
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(StorageClientPool.class);
+
     private final Map<String, StorageClient> clientPool = new ConcurrentHashMap<>();
     private final Map<UUID, StorageClient> clientByJobId = new ConcurrentHashMap<>();
     private final ThreadPoolExecutor sharedExecutor;
@@ -100,12 +106,18 @@ public class StorageClientPool implements SdkAutoCloseable
     private StorageClient storageClient(String region)
     {
         return clientPool.computeIfAbsent(region, k -> {
+            logIfOpenSslUnavailable();
+
             Map<SdkAdvancedAsyncClientOption<?>, ?> advancedOptions = Collections.singletonMap(
             SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, sharedExecutor
             );
+            Duration apiCallTimeout = Duration.ofSeconds(clientConfig.apiCallTimeoutMillis());
             S3AsyncClientBuilder clientBuilder =
             S3AsyncClient.builder()
                          .region(Region.of(region))
+                         // Setting the same timeout for apiCall and apiCallAttempt; There is 1 attempt effectively, as we do retry in the application
+                         .overrideConfiguration(b -> b.apiCallAttemptTimeout(apiCallTimeout)
+                                                      .apiCallTimeout(apiCallTimeout))
                          .asyncConfiguration(b -> b.advancedOptions(advancedOptions));
             S3ProxyConfiguration s3ProxyConfiguration = clientConfig.proxyConfig();
             URI endpointOverride = s3ProxyConfiguration.endpointOverride();
@@ -113,6 +125,7 @@ public class StorageClientPool implements SdkAutoCloseable
                 clientBuilder.endpointOverride(endpointOverride)
                              .forcePathStyle(true);
 
+            NettyNioAsyncHttpClient.Builder nettyClientBuilder = NettyNioAsyncHttpClient.builder();
             S3ProxyConfiguration config = clientConfig.proxyConfig();
             if (config.isPresent())
             {
@@ -123,11 +136,11 @@ public class StorageClientPool implements SdkAutoCloseable
                                                                    .username(config.username())
                                                                    .password(config.password())
                                                                    .build();
-                NettyNioAsyncHttpClient.Builder httpClientBuilder = NettyNioAsyncHttpClient.builder();
-                clientBuilder.httpClientBuilder(httpClientBuilder.proxyConfiguration(proxyConfig));
+                nettyClientBuilder.proxyConfiguration(proxyConfig);
             }
+            clientBuilder.httpClientBuilder(nettyClientBuilder);
 
-            return new StorageClient(clientBuilder.build(), ingressFileRateLimiter);
+            return new StorageClient(clientBuilder.build(), clientConfig.rangeGetObjectBytesSize(), ingressFileRateLimiter);
         });
     }
 
@@ -138,4 +151,12 @@ public class StorageClientPool implements SdkAutoCloseable
         clientPool.clear();
         clientByJobId.clear();
     }
+
+    private void logIfOpenSslUnavailable()
+    {
+        if (!OpenSsl.isAvailable())
+        {
+            LOGGER.info("OpenSSL is not available for S3AsyncClient");
+        }
+    }
 }
diff --git a/src/main/java/org/apache/cassandra/sidecar/server/ServerVerticle.java b/src/main/java/org/apache/cassandra/sidecar/server/ServerVerticle.java
index 53221149..a6d8edd1 100644
--- a/src/main/java/org/apache/cassandra/sidecar/server/ServerVerticle.java
+++ b/src/main/java/org/apache/cassandra/sidecar/server/ServerVerticle.java
@@ -138,7 +138,8 @@ public class ServerVerticle extends AbstractVerticle
                 // to a vert.x bug.
                 // See following comment for details
                 if (ex.getMessage() != null
-                    && ex.getMessage().contains("Unable to update traffic shaping options because the server was not configured to use traffic shaping during startup"))
+                    && ex.getMessage().contains("Unable to update traffic shaping options " +
+                                                "because the server was not configured to use traffic shaping during startup"))
                 {
                     // TODO: we need to rollback this change once vert.x fixes this problem
                     // Swallowing the exception here is okay for now until we get a proper fix in vert.x.
diff --git a/src/test/containerTest/org/apache/cassandra/sidecar/restore/StorageClientTest.java b/src/test/containerTest/org/apache/cassandra/sidecar/restore/StorageClientTest.java
index 79e5cfd6..e6d2e25e 100644
--- a/src/test/containerTest/org/apache/cassandra/sidecar/restore/StorageClientTest.java
+++ b/src/test/containerTest/org/apache/cassandra/sidecar/restore/StorageClientTest.java
@@ -25,29 +25,36 @@ import java.io.OutputStream;
 import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.util.concurrent.SidecarRateLimiter;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import com.adobe.testing.s3mock.testcontainers.S3MockContainer;
 import com.datastax.driver.core.utils.UUIDs;
+import io.vertx.core.Vertx;
 import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets;
 import org.apache.cassandra.sidecar.common.data.RestoreJobStatus;
 import org.apache.cassandra.sidecar.common.data.SSTableImportOptions;
 import org.apache.cassandra.sidecar.common.data.StorageCredentials;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
 import org.apache.cassandra.sidecar.db.RestoreJob;
 import org.apache.cassandra.sidecar.db.RestoreRange;
 import org.assertj.core.data.Percentage;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.exception.ApiCallTimeoutException;
 import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
@@ -72,29 +79,28 @@ class StorageClientTest
     private static final String testBucket = "bucket";
     private static final String testData = "testData";
     private static final String checksum = BinaryUtils.toHex(Md5Utils.computeMD5Hash(testData.getBytes()));
-    private static final String testFileName = "testFile";
     private static final String largeTestFileName = "largeTestFile";
     private static final String testEncKeyRef =
     "arn:aws:kms:us-east-1:1234567890:key/valid-test-key-ref";
-    private S3MockContainer s3Mock;
-    private S3AsyncClient s3AsyncClient;
-    private StorageClient client;
-    private RestoreJob restoreJob;
-
-    private RestoreRange testRange;
-    private RestoreRange largeTestRange;
+    private static S3MockContainer s3Mock;
+    private static S3AsyncClient s3AsyncClient;
+    private static StorageClient client;
+    private static RestoreJob restoreJob;
+    private static RestoreRange testRange;
+    private static RestoreRange largeTestRange;
+    private static Path largeFilePath;
+    private static TaskExecutorPool taskExecutorPool;
 
     @TempDir
-    Path testFolder;
+    private static Path testFolder;
 
-    @BeforeEach
-    void setup() throws Exception
+    @BeforeAll
+    static void setup() throws Exception
     {
         s3Mock = new S3MockContainer("3.5.1")
                  .withValidKmsKeys(testEncKeyRef)
                  .withInitialBuckets(testBucket);
         s3Mock.start();
-        String httpsEndpoint = s3Mock.getHttpsEndpoint();
         // test credential defined in s3mock
         StorageCredentials credentials = StorageCredentials.builder()
                                                            .accessKeyId("foo")
@@ -107,36 +113,44 @@ class StorageClientTest
                                .jobSecrets(new RestoreJobSecrets(credentials, credentials))
                                .sstableImportOptions(SSTableImportOptions.defaults())
                                .build();
-        s3AsyncClient = S3AsyncClient.builder()
-                                     .region(Region.US_WEST_1)
-                                     // provide a dummy credential to prevent client from identifying credentials
-                                     .credentialsProvider(StaticCredentialsProvider.create(
-                                     AwsBasicCredentials.create("foo", "bar")))
-                                     .endpointOverride(new URI(httpsEndpoint))
-                                     // required to prevent client from "manipulating" the object path
-                                     .forcePathStyle(true)
-                                     .httpClient(NettyNioAsyncHttpClient.builder().buildWithDefaults(
-                                         AttributeMap.builder()
-                                                     .put(TRUST_ALL_CERTIFICATES, Boolean.TRUE)
-                                                     .build()))
-                                     .build();
+        s3AsyncClient = buildS3AsyncClient(Duration.ofSeconds(60));
         client = new StorageClient(s3AsyncClient);
         client.authenticate(restoreJob);
-        Path testPath = testFolder.resolve(testFileName);
-        Files.deleteIfExists(testPath);
-        testRange = getMockRange(restoreJob.jobId, testBucket, "key", checksum, testPath);
+        Path stageDirPath = testFolder.resolve("stage");
+        testRange = getMockRange(restoreJob.jobId, testBucket, "key", checksum, stageDirPath, testData.length());
         putObject(testRange, testData);
 
-        Path largeFilePath = prepareTestFile(testFolder, largeTestFileName, LARGE_FILE_IN_BYTES); // 1MB
+        largeFilePath = prepareTestFile(testFolder, largeTestFileName, LARGE_FILE_IN_BYTES); // 1MB
         largeTestRange = getMockRange(restoreJob.jobId, testBucket, "largeKey",
-                                      computeChecksum(largeFilePath), largeFilePath);
+                                      computeChecksum(largeFilePath), stageDirPath,
+                                      LARGE_FILE_IN_BYTES);
         putObject(largeTestRange, largeFilePath);
-        // delete file after putting it in S3
-        Files.deleteIfExists(largeFilePath);
+
+        taskExecutorPool = new ExecutorPools(Vertx.vertx(), new ServiceConfigurationImpl()).internal();
+    }
+
+    static S3AsyncClient buildS3AsyncClient(Duration apiCallTimeout) throws Exception
+    {
+        String httpsEndpoint = s3Mock.getHttpsEndpoint();
+        return S3AsyncClient.builder()
+                            .region(Region.US_WEST_1)
+                            .overrideConfiguration(b -> b.apiCallTimeout(apiCallTimeout)
+                                                         .apiCallAttemptTimeout(apiCallTimeout))
+                            // provide a dummy credential to prevent client from identifying credentials
+                            .credentialsProvider(StaticCredentialsProvider.create(
+                            AwsBasicCredentials.create("foo", "bar")))
+                            .endpointOverride(new URI(httpsEndpoint))
+                            // required to prevent client from "manipulating" the object path
+                            .forcePathStyle(true)
+                            .httpClient(NettyNioAsyncHttpClient.builder().buildWithDefaults(
+                            AttributeMap.builder()
+                                        .put(TRUST_ALL_CERTIFICATES, Boolean.TRUE)
+                                        .build()))
+                            .build();
     }
 
-    @AfterEach
-    void cleanup()
+    @AfterAll
+    static void cleanup()
     {
         s3Mock.stop();
         client.close();
@@ -183,7 +197,8 @@ class StorageClientTest
     @Test
     void testGetObject() throws Exception
     {
-        File downloaded = client.downloadObjectIfAbsent(testRange).get();
+        File downloaded = client.downloadObjectIfAbsent(testRange, taskExecutorPool)
+                                .toCompletionStage().toCompletableFuture().get();
         assertThat(downloaded.exists()).isTrue();
         assertThat(new String(Files.readAllBytes(downloaded.toPath()))).isEqualTo(testData);
     }
@@ -192,27 +207,52 @@ class StorageClientTest
     void testGetObjectHasExistingFileOnDisk() throws Exception
     {
         Path existingPath = testFolder.resolve(UUID.randomUUID().toString());
+        Files.createDirectories(existingPath);
+        Files.createFile(existingPath.resolve("key"));
         RestoreRange sliceHasFileOnDisk = getMockRange(restoreJob.jobId, testBucket, "key", checksum, existingPath);
-        File downloaded = client.downloadObjectIfAbsent(sliceHasFileOnDisk).get();
+        File downloaded = client.downloadObjectIfAbsent(sliceHasFileOnDisk, taskExecutorPool)
+                                .toCompletionStage().toCompletableFuture().get();
         assertThat(downloaded.getAbsolutePath()).isEqualTo(existingPath.resolve("key").toString());
     }
 
     @Test
     void testGetObjectThroughputRateLimited() throws Exception
     {
-        // only allow 1/4 the speed of transfer
-        client = new StorageClient(s3AsyncClient, SidecarRateLimiter.create(LARGE_FILE_IN_BYTES >> 2));
+        // only allow 1/4 the speed of transfer; each request downloads 128 KiB
+        StorageClient client = new StorageClient(s3AsyncClient, 128 * 1024, SidecarRateLimiter.create(LARGE_FILE_IN_BYTES >> 2));
         client.authenticate(restoreJob);
         // Download should take around 4 seconds (256 KB/s for a 1MB file)
         long startNanos = System.nanoTime();
-        File downloaded = client.downloadObjectIfAbsent(largeTestRange).get();
+        File downloaded = client.downloadObjectIfAbsent(largeTestRange, taskExecutorPool)
+                                .toCompletionStage().toCompletableFuture().get();
         assertThat(downloaded.exists()).isTrue();
         long elapsedNanos = System.nanoTime() - startNanos;
         assertThat(TimeUnit.NANOSECONDS.toMillis(elapsedNanos)).isCloseTo(TimeUnit.SECONDS.toMillis(4),
                                                                           Percentage.withPercentage(95));
+        byte[] downloadedBytes = Files.readAllBytes(downloaded.toPath());
+        byte[] originalBytes = Files.readAllBytes(largeFilePath);
+        assertThat(Arrays.equals(downloadedBytes, originalBytes)).isTrue();
+    }
+
+    @Test
+    void testApiCallTimeout() throws Exception
+    {
+        try (S3AsyncClient s3Client = buildS3AsyncClient(Duration.ofMillis(1)))
+        {
+            StorageClient client = new StorageClient(s3Client);
+            client.authenticate(restoreJob);
+            assertThatThrownBy(() -> client.objectExists(testRange).get())
+            .hasMessageContaining(" Client execution did not complete before the specified timeout configuration: 1 millis")
+            .hasRootCauseInstanceOf(ApiCallTimeoutException.class);
+        }
     }
 
     private RestoreRange getMockRange(UUID jobId, String bucket, String key, String checksum, Path localPath)
+    {
+        return getMockRange(jobId, bucket, key, checksum, localPath, 0);
+    }
+
+    private static RestoreRange getMockRange(UUID jobId, String bucket, String key, String checksum, Path localPath, long length)
     {
         RestoreRange mock = mock(RestoreRange.class, RETURNS_DEEP_STUBS);
         when(mock.jobId()).thenReturn(jobId);
@@ -220,6 +260,7 @@ class StorageClientTest
         when(mock.sliceKey()).thenReturn(key);
         when(mock.sliceChecksum()).thenReturn(checksum);
         when(mock.stageDirectory()).thenReturn(localPath);
+        when(mock.sliceObjectLength()).thenReturn(length);
         if (localPath != null)
         {
             when(mock.stagedObjectPath()).thenReturn(localPath.resolve(key));
@@ -227,7 +268,7 @@ class StorageClientTest
         return mock;
     }
 
-    private void putObject(RestoreRange range, String stringData) throws Exception
+    private static void putObject(RestoreRange range, String stringData) throws Exception
     {
         PutObjectRequest request = PutObjectRequest.builder()
                                                    .bucket(range.sliceBucket())
@@ -237,7 +278,7 @@ class StorageClientTest
         s3AsyncClient.putObject(request, AsyncRequestBody.fromString(stringData)).get();
     }
 
-    private void putObject(RestoreRange range, Path path) throws Exception
+    private static void putObject(RestoreRange range, Path path) throws Exception
     {
         PutObjectRequest request = PutObjectRequest.builder()
                                                    .bucket(range.sliceBucket())
diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/HttpRangesIteratorTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/HttpRangesIteratorTest.java
new file mode 100644
index 00000000..1eee3018
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/sidecar/restore/HttpRangesIteratorTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.restore;
+
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.sidecar.common.utils.HttpRange;
+import org.assertj.core.util.Lists;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class HttpRangesIteratorTest
+{
+    @Test
+    void testIteration()
+    {
+        HttpRangesIterator iterator = new HttpRangesIterator(19, 10);
+        List<HttpRange> ranges = Lists.newArrayList(iterator);
+        assertThat(ranges).hasSize(2);
+        assertThat(ranges.get(0)).hasToString("bytes=0-9");
+        assertThat(ranges.get(1)).hasToString("bytes=10-18");
+    }
+
+    @Test
+    void testSingleRange()
+    {
+        HttpRangesIterator iterator = new HttpRangesIterator(19, 20);
+        List<HttpRange> ranges = Lists.newArrayList(iterator);
+        assertThat(ranges).hasSize(1);
+        assertThat(ranges.get(0)).hasToString("bytes=0-18");
+    }
+
+    @Test
+    void testInvalidArguments()
+    {
+        assertThatThrownBy(() -> new HttpRangesIterator(0, 1))
+        .isExactlyInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("totalBytes must be positive");
+
+        assertThatThrownBy(() -> new HttpRangesIterator(1, 0))
+        .isExactlyInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("rangeSize must be positive");
+    }
+}
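
The HttpRangesIterator implementation hunk is not reproduced in this part of the diff, so the following is only a sketch of an equivalent iterator, inferred from the behaviour the test above pins down. HttpRange.of(start, end) with inclusive bounds is an assumption here, suggested by the asserted "bytes=0-9"-style toString output.

    import java.util.Iterator;
    import java.util.NoSuchElementException;

    import org.apache.cassandra.sidecar.common.utils.HttpRange;

    // Sketch only, not the committed implementation: splits totalBytes into
    // consecutive inclusive byte ranges of at most rangeSize bytes each.
    class HttpRangesIteratorSketch implements Iterator<HttpRange>
    {
        private final long totalBytes;
        private final int rangeSize;
        private long offset = 0;

        HttpRangesIteratorSketch(long totalBytes, int rangeSize)
        {
            if (totalBytes <= 0)
                throw new IllegalArgumentException("totalBytes must be positive");
            if (rangeSize <= 0)
                throw new IllegalArgumentException("rangeSize must be positive");
            this.totalBytes = totalBytes;
            this.rangeSize = rangeSize;
        }

        @Override
        public boolean hasNext()
        {
            return offset < totalBytes;
        }

        @Override
        public HttpRange next()
        {
            if (!hasNext())
                throw new NoSuchElementException();
            long end = Math.min(offset + rangeSize, totalBytes) - 1; // inclusive end
            HttpRange range = HttpRange.of(offset, end);             // assumed factory method
            offset = end + 1;
            return range;
        }
    }

For the test's inputs this yields exactly the asserted ranges: (19, 10) produces bytes=0-9 and bytes=10-18, while (19, 20) produces the single range bytes=0-18.
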
diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTaskTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTaskTest.java
index 2203a1f0..dc6a7a70 100644
--- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTaskTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTaskTest.java
@@ -80,6 +80,7 @@ import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -138,9 +139,11 @@ class RestoreRangeTaskTest
     void testRestoreSucceeds()
     {
         RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.CREATED);
-        when(mockStorageClient.objectExists(mockRange)).thenReturn(CompletableFuture.completedFuture(null));
-        when(mockStorageClient.downloadObjectIfAbsent(mockRange))
-        .thenReturn(CompletableFuture.completedFuture(new File(".")));
+        HeadObjectResponse headObjectResponse = mock(HeadObjectResponse.class);
+        when(headObjectResponse.contentLength()).thenReturn(1L);
+        when(mockStorageClient.objectExists(mockRange)).thenReturn(CompletableFuture.completedFuture(headObjectResponse));
+        when(mockStorageClient.downloadObjectIfAbsent(eq(mockRange), any(TaskExecutorPool.class)))
+        .thenReturn(Future.succeededFuture(new File(".")));
         RestoreRangeTask task = createTask(mockRange, job);
 
         Promise<RestoreRange> promise = Promise.promise();
@@ -167,8 +170,8 @@ class RestoreRangeTaskTest
         RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.CREATED);
         // the existence of the slice is already confirmed by the s3 client
         when(mockRange.existsOnS3()).thenReturn(true);
-        when(mockStorageClient.downloadObjectIfAbsent(mockRange))
-        .thenReturn(CompletableFuture.completedFuture(new File(".")));
+        when(mockStorageClient.downloadObjectIfAbsent(eq(mockRange), any(TaskExecutorPool.class)))
+        .thenReturn(Future.succeededFuture(new File(".")));
         RestoreRangeTask task = createTask(mockRange, job);
 
         Promise<RestoreRange> promise = Promise.promise();
@@ -220,9 +223,11 @@ class RestoreRangeTaskTest
         doReturn(true).when(job).isManagedBySidecar();
         when(mockRange.job()).thenReturn(job);
         when(mockRange.stagedObjectPath()).thenReturn(Paths.get("nonexist"));
-        when(mockStorageClient.objectExists(mockRange)).thenReturn(CompletableFuture.completedFuture(null));
-        when(mockStorageClient.downloadObjectIfAbsent(mockRange))
-        .thenReturn(CompletableFuture.completedFuture(new File(".")));
+        HeadObjectResponse headObjectResponse = mock(HeadObjectResponse.class);
+        when(headObjectResponse.contentLength()).thenReturn(1L);
+        when(mockStorageClient.objectExists(mockRange)).thenReturn(CompletableFuture.completedFuture(headObjectResponse));
+        when(mockStorageClient.downloadObjectIfAbsent(eq(mockRange), any(TaskExecutorPool.class)))
+        .thenReturn(Future.succeededFuture(new File(".")));
         RestoreRangeTask task = createTask(mockRange, job);
 
         Promise<RestoreRange> promise = Promise.promise();
@@ -246,7 +251,7 @@ class RestoreRangeTaskTest
         when(mockRange.stagedObjectPath()).thenReturn(stagedPath);
         when(mockStorageClient.objectExists(mockRange))
         .thenThrow(new RuntimeException("Should not call this method"));
-        when(mockStorageClient.downloadObjectIfAbsent(mockRange))
+        when(mockStorageClient.downloadObjectIfAbsent(eq(mockRange), any(TaskExecutorPool.class)))
         .thenThrow(new RuntimeException("Should not call this method"));
         RestoreRangeTask task = createTask(mockRange, job);
 
@@ -319,8 +324,11 @@ class RestoreRangeTaskTest
         Path stagedPath = testFolder.resolve("slice.zip");
         when(mockRange.stagedObjectPath()).thenReturn(stagedPath);
         when(mockRange.isCancelled()).thenReturn(false);
-        when(mockStorageClient.objectExists(mockRange)).thenReturn(CompletableFuture.completedFuture(null));
-        when(mockStorageClient.downloadObjectIfAbsent(mockRange)).thenThrow(new RuntimeException("Random exception"));
+        HeadObjectResponse headObjectResponse = mock(HeadObjectResponse.class);
+        when(headObjectResponse.contentLength()).thenReturn(1L);
+        when(mockStorageClient.objectExists(mockRange)).thenReturn(CompletableFuture.completedFuture(headObjectResponse));
+        when(mockStorageClient.downloadObjectIfAbsent(eq(mockRange), any(TaskExecutorPool.class)))
+        .thenThrow(new RuntimeException("Random exception"));
 
         Promise<RestoreRange> promise = Promise.promise();
 
@@ -356,8 +364,11 @@ class RestoreRangeTaskTest
         Path stagedPath = testFolder.resolve("slice.zip");
         when(mockRange.stagedObjectPath()).thenReturn(stagedPath);
         when(mockRange.isCancelled()).thenReturn(false);
-        when(mockStorageClient.objectExists(mockRange)).thenReturn(CompletableFuture.completedFuture(null));
-        when(mockStorageClient.downloadObjectIfAbsent(mockRange)).thenReturn(CompletableFuture.completedFuture(null));
+        HeadObjectResponse headObjectResponse = mock(HeadObjectResponse.class);
+        when(headObjectResponse.contentLength()).thenReturn(1L);
+        when(mockStorageClient.objectExists(mockRange)).thenReturn(CompletableFuture.completedFuture(headObjectResponse));
+        when(mockStorageClient.downloadObjectIfAbsent(eq(mockRange), any(TaskExecutorPool.class)))
+        .thenReturn(Future.succeededFuture(null));
 
         Promise<RestoreRange> promise = Promise.promise();
 
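
The RestoreRangeTaskTest changes above follow the new flow: objectExists now resolves to a HeadObjectResponse whose contentLength() drives the range computation, and downloadObjectIfAbsent now runs on a TaskExecutorPool and returns a Vert.x Future. Below is a rough, non-authoritative sketch of how a reported object length can be turned into ranged GetObject requests; the committed StorageClient code is in hunks not shown here. The bucket, key and rangeSize parameters are illustrative, the HttpRangesIterator constructor arguments mirror the test's (totalBytes, rangeSize) usage and are assumed, and GetObjectRequest.range(String) is the standard SDK v2 setter for range GETs.

    package org.apache.cassandra.sidecar.restore;

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.cassandra.sidecar.common.utils.HttpRange;
    import software.amazon.awssdk.services.s3.model.GetObjectRequest;

    final class RangedGetSketch
    {
        // Builds one range-GetObject request per chunk; issuing (and rate-limiting)
        // them individually is what enables finer-grained download throttling than
        // a single whole-object GET. Assumes HttpRangesIterator implements
        // Iterator<HttpRange>, as suggested by HttpRangesIteratorTest.
        static List<GetObjectRequest> rangedRequests(String bucket, String key, long objectLength, int rangeSize)
        {
            List<GetObjectRequest> requests = new ArrayList<>();
            Iterator<HttpRange> ranges = new HttpRangesIterator(objectLength, rangeSize);
            while (ranges.hasNext())
            {
                requests.add(GetObjectRequest.builder()
                                             .bucket(bucket)
                                             .key(key)
                                             .range(ranges.next().toString()) // e.g. "bytes=0-9"
                                             .build());
            }
            return requests;
        }
    }
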
diff --git a/src/test/java/org/apache/cassandra/sidecar/server/ServerSSLTest.java b/src/test/java/org/apache/cassandra/sidecar/server/ServerSSLTest.java
index e19d8237..0da75a32 100644
--- a/src/test/java/org/apache/cassandra/sidecar/server/ServerSSLTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/server/ServerSSLTest.java
@@ -277,7 +277,7 @@ class ServerSSLTest
               .compose(s -> validateHealthEndpoint(clientWithP12Keystore(true, false)))
               .onComplete(context.failing(throwable -> {
                   assertThat(throwable).isNotNull()
-                                       .hasMessageContaining("Received fatal alert: bad_certificate");
+                                       .hasMessageContaining("Received fatal alert: certificate_required");
                   context.completeNow();
               }));
     }
diff --git a/src/test/resources/logback-sidecar.xml b/src/test/resources/logback-test.xml
similarity index 100%
rename from src/test/resources/logback-sidecar.xml
rename to src/test/resources/logback-test.xml
diff --git a/vertx-client/build.gradle b/vertx-client/build.gradle
index d9744bbc..1de27609 100644
--- a/vertx-client/build.gradle
+++ b/vertx-client/build.gradle
@@ -56,20 +56,20 @@ dependencies {
     api(group: 'io.vertx', name: 'vertx-web-client', version: "${project.vertxVersion}") {
         exclude group: 'com.fasterxml.jackson.core', module: 'jackson-core'
     }
-    implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: '2.0.61.Final')  // for openSSL
+    implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: "${project.boringSslVersion}")  // for openSSL
 
     // The newer versions (2.0.48.Final+) of tcnative require explicit dependency declarations,
     // including the classifiers. See https://netty.io/wiki/forked-tomcat-native.html#gradle-and-bazel
     // for details.
 
     // openSSL native libraries for linux x86-64 architectures
-    implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: '2.0.61.Final', classifier: 'linux-x86_64')
+    implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: "${project.boringSslVersion}", classifier: 'linux-x86_64')
     // openSSL native libraries for macOS aarch-64 architectures
-    implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: '2.0.61.Final', classifier: 'osx-aarch_64')
+    implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: "${project.boringSslVersion}", classifier: 'osx-aarch_64')
     // openSSL native libraries for linux aarch-64 architectures
-    implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: '2.0.61.Final', classifier: 'linux-aarch_64')
+    implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: "${project.boringSslVersion}", classifier: 'linux-aarch_64')
     // openSSL native libraries for macOS x86-64 architectures
-    implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: '2.0.61.Final', classifier: 'osx-x86_64')
+    implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: "${project.boringSslVersion}", classifier: 'osx-x86_64')
     implementation("org.slf4j:slf4j-api:${project.slf4jVersion}")
 
     compileOnly('org.jetbrains:annotations:23.0.0')

