This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new 4601e285 CASSANDRASC-142: Improve S3 download throttling with range-GetObject 4601e285 is described below commit 4601e28529996a3447e74093cc6cc35879143031 Author: Yifan Cai <y...@apache.org> AuthorDate: Tue Aug 13 23:29:44 2024 -0700 CASSANDRASC-142: Improve S3 download throttling with range-GetObject The patch adds a few changes according to the best practice for AWS SDK S3 client, including: - download throttling with range-GetObject - configure apiCallTimeout - use OpenSSL for S3 client Patch by Yifan Cai; Reviewed by Doug Rohrer, Saranya Krishnakumar for CASSANDRASC-142 --- CHANGES.txt | 1 + build.gradle | 16 +- gradle.properties | 2 + spotbugs-exclude.xml | 1 + src/main/dist/conf/sidecar.yaml | 2 + .../sidecar/config/S3ClientConfiguration.java | 13 ++ .../config/yaml/S3ClientConfigurationImpl.java | 51 ++++- .../apache/cassandra/sidecar/db/RestoreRange.java | 28 ++- .../sidecar/restore/HttpRangesIterator.java | 62 ++++++ .../sidecar/restore/RestoreProcessor.java | 2 - .../sidecar/restore/RestoreRangeTask.java | 11 +- .../cassandra/sidecar/restore/StorageClient.java | 221 ++++++++++----------- .../sidecar/restore/StorageClientPool.java | 27 ++- .../cassandra/sidecar/server/ServerVerticle.java | 3 +- .../sidecar/restore/StorageClientTest.java | 127 ++++++++---- .../sidecar/restore/HttpRangesIteratorTest.java | 63 ++++++ .../sidecar/restore/RestoreRangeTaskTest.java | 37 ++-- .../cassandra/sidecar/server/ServerSSLTest.java | 2 +- .../{logback-sidecar.xml => logback-test.xml} | 0 vertx-client/build.gradle | 10 +- 20 files changed, 475 insertions(+), 204 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 9b62fa86..6079f284 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.0.0 ----- + * Improve S3 download throttling with range-GetObject (CASSANDRASC-142) * Updating traffic shaping options throws IllegalStateException (CASSANDRASC-140) * Add restore job progress endpoint and 
consistency check on restore ranges (CASSANDRASC-132) * Upgrade asciidoctor plugin to version 3.3.2 (CASSANDRASC-139) diff --git a/build.gradle b/build.gradle index 04ead46f..db600a07 100644 --- a/build.gradle +++ b/build.gradle @@ -218,6 +218,21 @@ dependencies { } } + implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: "${project.boringSslVersion}") // for openSSL + + // The newer versions (2.0.48.Final+) of tcnative require explicit dependency declarations, + // including the classifiers. See https://netty.io/wiki/forked-tomcat-native.html#gradle-and-bazel + // for details. + + // openSSL native libraries for linux x86-64 architectures + implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: "${project.boringSslVersion}", classifier: 'linux-x86_64') + // openSSL native libraries for macOS aarch-64 architectures + implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: "${project.boringSslVersion}", classifier: 'osx-aarch_64') + // openSSL native libraries for linux aarch-64 architectures + implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: "${project.boringSslVersion}", classifier: 'linux-aarch_64') + // openSSL native libraries for macOS x86-64 architectures + implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: "${project.boringSslVersion}", classifier: 'osx-x86_64') + jolokia 'org.jolokia:jolokia-jvm:1.6.0:agent' testImplementation "org.junit.jupiter:junit-jupiter-api:${project.junitVersion}" @@ -319,7 +334,6 @@ clean.doLast { test { systemProperty "vertxweb.environment", "dev" - systemProperty "logback.configurationFile", "logback-sidecar.xml" systemProperty "vertx.logger-delegate-factory-class-name", "io.vertx.core.logging.SLF4JLogDelegateFactory" // ordinarily we don't need integration tests // see the integrationTest task diff --git a/gradle.properties b/gradle.properties index 4ad89e04..09afe79f 100644 
--- a/gradle.properties +++ b/gradle.properties @@ -29,6 +29,8 @@ dtestDependencyName=cassandra-dtest-local-all awsSdkVersion=2.26.12 # The dep is to introduce xxhash impl commonsCodecVersion=1.16.1 +# openSSL +boringSslVersion=2.0.61.Final # If running MacOS then you need to increase the max # open FD limit org.gradle.jvmargs=-XX:-MaxFDLimit diff --git a/spotbugs-exclude.xml b/spotbugs-exclude.xml index 223f2a1c..8df098d2 100644 --- a/spotbugs-exclude.xml +++ b/spotbugs-exclude.xml @@ -82,6 +82,7 @@ <Class name="org.apache.cassandra.sidecar.snapshots.SnapshotPathBuilder$SnapshotFile" /> <Class name="org.apache.cassandra.sidecar.routes.StreamSSTableComponentHandlerTest" /> <Class name="org.apache.cassandra.sidecar.db.RestoreSliceDatabaseAccessor" /> + <Class name="org.apache.cassandra.sidecar.restore.StorageClient" /> </Or> </Match> diff --git a/src/main/dist/conf/sidecar.yaml b/src/main/dist/conf/sidecar.yaml index 2595689b..76b8d0bb 100644 --- a/src/main/dist/conf/sidecar.yaml +++ b/src/main/dist/conf/sidecar.yaml @@ -185,6 +185,8 @@ s3_client: concurrency: 4 thread_name_prefix: s3-client thread_keep_alive_seconds: 60 + api_call_timeout_millis: 60000 # 1 minute + range_get_object_bytes_size: 5242880 # 5 MiB # proxy_config: # uri: # username: diff --git a/src/main/java/org/apache/cassandra/sidecar/config/S3ClientConfiguration.java b/src/main/java/org/apache/cassandra/sidecar/config/S3ClientConfiguration.java index 803f4d8a..41cd3dff 100644 --- a/src/main/java/org/apache/cassandra/sidecar/config/S3ClientConfiguration.java +++ b/src/main/java/org/apache/cassandra/sidecar/config/S3ClientConfiguration.java @@ -35,6 +35,19 @@ public interface S3ClientConfiguration */ long threadKeepAliveSeconds(); + /** + * Returns range bytes size to produce <a href="https://www.rfc-editor.org/rfc/rfc9110.html#name-range">Range header</a> for range-get object. + * The size should not be too large (long request) or too small (too many requests). 5 to 10 MiB would be ideal to start with. 
+ * @return range bytes size. + */ + int rangeGetObjectBytesSize(); + + /** + * API call timeout in milliseconds for S3 API calls. + * @return duration of timeout in milliseconds + */ + long apiCallTimeoutMillis(); + /** * Route traffic through a proxy in the environment that requires doing so, when a proxy is specified * diff --git a/src/main/java/org/apache/cassandra/sidecar/config/yaml/S3ClientConfigurationImpl.java b/src/main/java/org/apache/cassandra/sidecar/config/yaml/S3ClientConfigurationImpl.java index d5bf5767..505f13d7 100644 --- a/src/main/java/org/apache/cassandra/sidecar/config/yaml/S3ClientConfigurationImpl.java +++ b/src/main/java/org/apache/cassandra/sidecar/config/yaml/S3ClientConfigurationImpl.java @@ -18,7 +18,10 @@ package org.apache.cassandra.sidecar.config.yaml; +import java.util.concurrent.TimeUnit; + import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.cassandra.sidecar.common.utils.Preconditions; import org.apache.cassandra.sidecar.config.S3ClientConfiguration; import org.apache.cassandra.sidecar.config.S3ProxyConfiguration; import org.jetbrains.annotations.NotNull; @@ -31,18 +34,32 @@ public class S3ClientConfigurationImpl implements S3ClientConfiguration public static final String DEFAULT_THREAD_NAME_PREFIX = "s3-client"; public static final String PROXY_PROPERTY = "proxy_config"; + public static final String API_CALL_TIMEOUT_MILLIS = "api_call_timeout_millis"; + public static final String RANGE_GET_OBJECT_BYTES_SIZE = "range_get_object_bytes_size"; + public static final String THREAD_KEEP_ALIVE_SECONDS = "thread_keep_alive_seconds"; + public static final String CONCURRENCY = "concurrency"; + public static final String THREAD_NAME_PREFIX = "thread_name_prefix"; + public static final long DEFAULT_THREAD_KEEP_ALIVE_SECONDS = 60; public static final int DEFAULT_S3_CLIENT_CONCURRENCY = 4; + public static final long DEFAULT_API_CALL_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60); + public static final int 
DEFAULT_RANGE_GET_OBJECT_BYTES_SIZE = 5 * 1024 * 1024; // 5 MiB - @JsonProperty(value = "thread_name_prefix") + @JsonProperty(value = THREAD_NAME_PREFIX) protected final String threadNamePrefix; - @JsonProperty(value = "concurrency") + @JsonProperty(value = CONCURRENCY) protected final int concurrency; - @JsonProperty(value = "thread_keep_alive_seconds") + @JsonProperty(value = THREAD_KEEP_ALIVE_SECONDS) protected final long threadKeepAliveSeconds; + @JsonProperty(value = RANGE_GET_OBJECT_BYTES_SIZE) + protected final int rangeGetObjectBytesSize; + + @JsonProperty(value = API_CALL_TIMEOUT_MILLIS) + protected final long apiCallTimeoutMillis; + @JsonProperty(value = PROXY_PROPERTY) protected final S3ProxyConfiguration proxyConfig; @@ -51,18 +68,26 @@ public class S3ClientConfigurationImpl implements S3ClientConfiguration this(DEFAULT_THREAD_NAME_PREFIX, DEFAULT_S3_CLIENT_CONCURRENCY, DEFAULT_THREAD_KEEP_ALIVE_SECONDS, + DEFAULT_RANGE_GET_OBJECT_BYTES_SIZE, + DEFAULT_API_CALL_TIMEOUT_MILLIS, new S3ProxyConfigurationImpl()); } public S3ClientConfigurationImpl(String threadNamePrefix, int concurrency, long threadKeepAliveSeconds, + int rangeGetObjectBytesSize, + long apiCallTimeoutMillis, S3ProxyConfiguration proxyConfig) { + Preconditions.checkArgument(apiCallTimeoutMillis > TimeUnit.SECONDS.toMillis(10), + "apiCallTimeout cannot be smaller than 10 seconds. 
Configured: " + apiCallTimeoutMillis + " ms"); this.threadNamePrefix = threadNamePrefix; this.concurrency = concurrency; this.threadKeepAliveSeconds = threadKeepAliveSeconds; + this.rangeGetObjectBytesSize = rangeGetObjectBytesSize; this.proxyConfig = proxyConfig; + this.apiCallTimeoutMillis = apiCallTimeoutMillis; } /** @@ -70,7 +95,7 @@ public class S3ClientConfigurationImpl implements S3ClientConfiguration */ @NotNull @Override - @JsonProperty(value = "thread_name_prefix") + @JsonProperty(value = THREAD_NAME_PREFIX) public String threadNamePrefix() { return threadNamePrefix; @@ -80,7 +105,7 @@ public class S3ClientConfigurationImpl implements S3ClientConfiguration * {@inheritDoc} */ @Override - @JsonProperty(value = "concurrency") + @JsonProperty(value = CONCURRENCY) public int concurrency() { return concurrency; @@ -90,12 +115,26 @@ public class S3ClientConfigurationImpl implements S3ClientConfiguration * {@inheritDoc} */ @Override - @JsonProperty(value = "thread_keep_alive_seconds") + @JsonProperty(value = THREAD_KEEP_ALIVE_SECONDS) public long threadKeepAliveSeconds() { return threadKeepAliveSeconds; } + @Override + @JsonProperty(value = RANGE_GET_OBJECT_BYTES_SIZE) + public int rangeGetObjectBytesSize() + { + return rangeGetObjectBytesSize; + } + + @Override + @JsonProperty(value = API_CALL_TIMEOUT_MILLIS) + public long apiCallTimeoutMillis() + { + return apiCallTimeoutMillis; + } + /** * {@inheritDoc} */ diff --git a/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java b/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java index ed20d872..d0b71730 100644 --- a/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java +++ b/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java @@ -33,7 +33,6 @@ import org.apache.cassandra.sidecar.cluster.locator.LocalTokenRangesProvider; import org.apache.cassandra.sidecar.common.DataObjectBuilder; import org.apache.cassandra.sidecar.common.response.data.RestoreRangeJson; import 
org.apache.cassandra.sidecar.common.server.data.RestoreRangeStatus; -import org.apache.cassandra.sidecar.common.utils.Preconditions; import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; import org.apache.cassandra.sidecar.exceptions.RestoreJobExceptions; import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException; @@ -108,6 +107,7 @@ public class RestoreRange private final RestoreJobProgressTracker tracker; // mutable states + private Long sliceObjectLength; // content length value from the HeadObjectResponse; it should be the same as the slice#compressedSize private boolean existsOnS3 = false; private boolean hasStaged = false; private boolean hasImported = false; @@ -234,9 +234,10 @@ public class RestoreRange tracker.requestOutOfRangeDataCleanup(); } - public void setExistsOnS3() + public void setExistsOnS3(Long objectLength) { this.existsOnS3 = true; + this.sliceObjectLength = objectLength; } public void incrementDownloadAttempt() @@ -329,19 +330,22 @@ public class RestoreRange return readSliceProperty(RestoreSlice::checksum); } - public Long sliceCreationTimeNanos() + public long sliceCreationTimeNanos() { - return readSliceProperty(RestoreSlice::creationTimeNanos); + return Objects.requireNonNull(source, "Source slice does not exist") + .creationTimeNanos(); } - public Long sliceCompressedSize() + public long sliceCompressedSize() { - return readSliceProperty(RestoreSlice::compressedSize); + return Objects.requireNonNull(source, "Source slice does not exist") + .compressedSize(); } - public Long sliceUncompressedSize() + public long sliceUncompressedSize() { - return readSliceProperty(RestoreSlice::uncompressedSize); + return Objects.requireNonNull(source, "Source slice does not exist") + .uncompressedSize(); } public String keyspace() @@ -413,6 +417,11 @@ public class RestoreRange return existsOnS3; } + public long sliceObjectLength() + { + return sliceObjectLength == null ? 
0 : sliceObjectLength; + } + public boolean hasStaged() { return hasStaged; @@ -447,8 +456,7 @@ public class RestoreRange public long estimatedSpaceRequiredInBytes() { - Preconditions.checkState(source != null, "Cannot estimate space requirement without source slice"); - return source.compressedSize() + source.uncompressedSize(); + return sliceCompressedSize() + sliceUncompressedSize(); } // ------------- diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/HttpRangesIterator.java b/src/main/java/org/apache/cassandra/sidecar/restore/HttpRangesIterator.java new file mode 100644 index 00000000..2ad42697 --- /dev/null +++ b/src/main/java/org/apache/cassandra/sidecar/restore/HttpRangesIterator.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.restore; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.apache.cassandra.sidecar.common.utils.HttpRange; +import org.apache.cassandra.sidecar.common.utils.Preconditions; + +/** + * Iterator over the produced http ranges + */ +public class HttpRangesIterator implements Iterator<HttpRange> +{ + private final long totalBytes; + private final int rangeSize; + private long offset = 0; + + public HttpRangesIterator(long totalBytes, int rangeSize) + { + Preconditions.checkArgument(totalBytes > 0, "totalBytes must be positive"); + Preconditions.checkArgument(rangeSize > 0, "rangeSize must be positive"); + this.totalBytes = totalBytes; + this.rangeSize = rangeSize; + } + + @Override + public boolean hasNext() + { + return offset < totalBytes; + } + + @Override + public HttpRange next() + { + if (!hasNext()) + { + throw new NoSuchElementException("No more HttpRanges"); + } + long end = Math.min(offset + rangeSize, totalBytes) - 1; + HttpRange range = HttpRange.of(offset, end); + offset = end + 1; + return range; + } +} diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java index 036f4eb6..a68eb09e 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java @@ -274,8 +274,6 @@ public class RestoreProcessor implements PeriodicTask else { LOGGER.error("Slice failed with unrecoverable failure. sliceKey={}", range.sliceKey(), cause); - // fail the slice and mark the slice has failed on its owning instance. 
- // In the phase 1 implementation, all slices of the job get aborted range.fail(RestoreJobExceptions.toFatal(cause)); if (range.job().isManagedBySidecar()) { diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreRangeTask.java b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreRangeTask.java index a57867cb..763b9c12 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/RestoreRangeTask.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/RestoreRangeTask.java @@ -199,9 +199,10 @@ public class RestoreRangeTask implements RestoreRangeHandler return failOnCancelled(range, null) // 1. check object existence and validate eTag / checksum .compose(value -> checkObjectExistence()) - // 2. download slice/object when the remote object exists .compose(value -> failOnCancelled(range, value)) - .compose(headObject -> downloadSlice()) + // 2. download slice/object (with HTTP range requests) when the remote object exists + .compose(value -> downloadSlice()) + .compose(value -> failOnCancelled(range, value)) // 3. unzip the file and import/commit .compose(this::unzipAndImport); } @@ -218,10 +219,10 @@ public class RestoreRangeTask implements RestoreRangeHandler // even if the file already exists on disk, we should still check the object existence return fromCompletionStage(s3Client.objectExists(range)) - .compose(exists -> { // on success + .compose(headObjectResponse -> { // on success long durationNanos = currentTimeInNanos() - range.sliceCreationTimeNanos(); metrics.sliceReplicationTime.metric.update(durationNanos, TimeUnit.NANOSECONDS); - range.setExistsOnS3(); + range.setExistsOnS3(headObjectResponse.contentLength()); LOGGER.debug("Slice is now available on S3. jobId={} sliceKey={} replicationTimeNanos={}", range.jobId(), range.sliceKey(), durationNanos); return Future.succeededFuture(); @@ -285,7 +286,7 @@ public class RestoreRangeTask implements RestoreRangeHandler LOGGER.info("Begin downloading restore slice. 
sliceKey={}", range.sliceKey()); Future<File> future = - fromCompletionStage(s3Client.downloadObjectIfAbsent(range)) + s3Client.downloadObjectIfAbsent(range, executorPool) .recover(cause -> { // converts to restore job exception LOGGER.warn("Failed to download restore slice. sliceKey={}", range.sliceKey(), cause); diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java index cf3b12d7..e0fc259c 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java @@ -21,8 +21,7 @@ package org.apache.cassandra.sidecar.restore; import java.io.File; import java.io.IOException; import java.nio.channels.Channel; -import java.nio.channels.WritableByteChannel; -import java.nio.file.FileAlreadyExistsException; +import java.nio.channels.SeekableByteChannel; import java.nio.file.Files; import java.nio.file.Path; import java.util.EnumSet; @@ -38,8 +37,13 @@ import com.google.common.util.concurrent.SidecarRateLimiter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.vertx.core.Future; import org.apache.cassandra.sidecar.common.data.StorageCredentials; import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils; +import org.apache.cassandra.sidecar.common.utils.HttpRange; +import org.apache.cassandra.sidecar.common.utils.Preconditions; +import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; +import org.apache.cassandra.sidecar.config.yaml.S3ClientConfigurationImpl; import org.apache.cassandra.sidecar.db.RestoreJob; import org.apache.cassandra.sidecar.db.RestoreRange; import org.apache.cassandra.sidecar.exceptions.RestoreJobFatalException; @@ -47,8 +51,8 @@ import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import 
software.amazon.awssdk.auth.credentials.AwsSessionCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.async.AsyncResponseTransformer; -import software.amazon.awssdk.core.async.ResponsePublisher; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; @@ -66,6 +70,7 @@ public class StorageClient private static final Logger LOGGER = LoggerFactory.getLogger(StorageClient.class); private final S3AsyncClient client; + private final int rangeHeaderSize; private final SidecarRateLimiter downloadRateLimiter; private final Map<UUID, Credentials> credentialsProviders = new ConcurrentHashMap<>(); @@ -73,12 +78,15 @@ public class StorageClient StorageClient(S3AsyncClient client) { // no rate-limiting - this(client, SidecarRateLimiter.create(-1)); + this(client, + S3ClientConfigurationImpl.DEFAULT_RANGE_GET_OBJECT_BYTES_SIZE, + SidecarRateLimiter.create(-1)); } - StorageClient(S3AsyncClient client, SidecarRateLimiter downloadRateLimiter) + StorageClient(S3AsyncClient client, int rangeHeaderSize, SidecarRateLimiter downloadRateLimiter) { this.client = client; + this.rangeHeaderSize = rangeHeaderSize; this.downloadRateLimiter = downloadRateLimiter; } @@ -117,14 +125,18 @@ public class StorageClient credentialsProviders.remove(jobId); } + /** + * Check object existence with matching checksum + * @param range restore range + * @return future of HeadObjectResponse + */ public CompletableFuture<HeadObjectResponse> objectExists(RestoreRange range) { Credentials credentials = credentialsProviders.get(range.jobId()); if (credentials == null) { - CompletableFuture<HeadObjectResponse> failedFuture = new CompletableFuture<>(); - failedFuture.completeExceptionally(credentialsNotFound(range)); - return failedFuture; + 
LOGGER.warn("Credentials not found. jobId={}", range.jobId()); + return failedFuture(credentialsNotFound(range)); } // https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html @@ -140,15 +152,13 @@ public class StorageClient .whenComplete(logCredentialOnRequestFailure(range, credentials)); } - public CompletableFuture<File> downloadObjectIfAbsent(RestoreRange range) + public Future<File> downloadObjectIfAbsent(RestoreRange range, TaskExecutorPool taskExecutorPool) { Credentials credentials = credentialsProviders.get(range.jobId()); if (credentials == null) { LOGGER.warn("Credentials to download object not found. jobId={}", range.jobId()); - CompletableFuture<File> failedFuture = new CompletableFuture<>(); - failedFuture.completeExceptionally(credentialsNotFound(range)); - return failedFuture; + return Future.failedFuture(credentialsNotFound(range)); } Path objectPath = range.stagedObjectPath(); @@ -163,27 +173,22 @@ public class StorageClient // TODO 2: extend restore_job table to define the multi-part upload chunk size, in order to perform local // verification of the etag/checksum // For now, we just skip download, assuming the scenario is rare and no maliciousness - return CompletableFuture.completedFuture(object); + return Future.succeededFuture(object); } - if (!object.getParentFile().mkdirs()) + try { - LOGGER.warn("Error occurred while creating directory. jobId={} sliceKey={}", - range.jobId(), range.sliceKey()); - + Files.createDirectories(objectPath.getParent()); + } + catch (Exception ex) + { + LOGGER.error("Error occurred while creating directory. jobId={} sliceKey={}", + range.jobId(), range.sliceKey(), ex); + return Future.failedFuture(ex); } LOGGER.info("Downloading object. 
jobId={} sliceKey={}", range.jobId(), range.sliceKey()); - // https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html - GetObjectRequest request = - GetObjectRequest.builder() - .overrideConfiguration(b -> b.credentialsProvider(credentials.awsCredentialsProvider())) - .bucket(range.sliceBucket()) - .key(range.sliceKey()) - .build(); - return rateLimitedGetObject(range, client, request, objectPath) - .whenComplete(logCredentialOnRequestFailure(range, credentials)) - .thenApply(res -> object); + return rangeGetObject(range, credentials, objectPath, taskExecutorPool); } public void close() @@ -198,6 +203,71 @@ public class StorageClient } } + // Range-GetObject with http range header + private Future<File> rangeGetObject(RestoreRange range, Credentials credentials, Path destinationPath, TaskExecutorPool taskExecutorPool) + { + HttpRangesIterator iterator = new HttpRangesIterator(range.sliceObjectLength(), rangeHeaderSize); + Preconditions.checkState(iterator.hasNext(), "SliceObject is empty. sliceKey=" + range.sliceKey()); + + SeekableByteChannel seekableByteChannel; + try + { + seekableByteChannel = Files.newByteChannel(destinationPath, EnumSet.of(CREATE_NEW, WRITE)); + } + catch (IOException e) + { + LOGGER.error("Failed to create file channel for downloading. jobId={} sliceKey={}", + range.jobId(), range.sliceKey(), e); + return Future.failedFuture(e); + } + Future<SeekableByteChannel> channelFuture = Future.succeededFuture(seekableByteChannel); + while (iterator.hasNext()) + { + HttpRange httpRange = iterator.next(); + // Schedule each part download in the taskExecutorPool with ordered == true. + // Parts are downloaded one by one in sequence. 
+ channelFuture = channelFuture.compose(channel -> taskExecutorPool.executeBlocking(() -> { + // the length is guaranteed to be no greater than rangeHeaderSize (int) + int actualRangeSize = (int) httpRange.length(); + // throttle the download throughput + downloadRateLimiter.acquire(actualRangeSize); + // https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html + GetObjectRequest request = + GetObjectRequest.builder() + .overrideConfiguration(b -> b.credentialsProvider(credentials.awsCredentialsProvider())) + .bucket(range.sliceBucket()) + .key(range.sliceKey()) + .range(httpRange.toString()) + .build(); + // note: it is a blocking get; No parallelism in getting the ranges of the same object + ResponseBytes<GetObjectResponse> bytes = client.getObject(request, AsyncResponseTransformer.toBytes()).get(); + channel.write(bytes.asByteBuffer()); + return channel; + }, true)); + } + return channelFuture + // eventually is evaluated in both success and failure cases + .eventually(() -> taskExecutorPool.runBlocking(() -> { + ThrowableUtils.propagate(() -> closeChannel(seekableByteChannel)); + }, true)) + .compose(channel -> Future.succeededFuture(destinationPath.toFile()), + failure -> { // failure mapper; log the credential on failure + LOGGER.error("Request is not successful. jobId={} credentials={}", + range.jobId(), credentials.readCredentials, failure); + try + { + Files.deleteIfExists(destinationPath); + } + catch (IOException e) + { + LOGGER.warn("Failed to clean up the failed download. jobId={} sliceKey={}", + range.jobId(), range.sliceKey(), e); + failure.addSuppressed(e); + } + return Future.failedFuture(failure); + }); + } + private boolean matches(Credentials c1, Credentials c2) { if (c1 == c2) @@ -225,107 +295,30 @@ public class StorageClient return (ignored, cause) -> { if (cause != null) { - LOGGER.error("GetObjectRequest is not successful. jobId={} credentials={}", + LOGGER.error("Request is not successful. 
jobId={} credentials={}", range.jobId(), credentials.readCredentials, cause); } }; } /** - * Returns a {@link CompletableFuture} to the {@link GetObjectResponse}. It writes the object from S3 to a file - * applying rate limiting on the download throughput. - * - * @param range the range to be restored - * @param client the S3 client - * @param request the {@link GetObjectRequest request} - * @param destinationPath the path where the object will be persisted - * @return a {@link CompletableFuture} of the {@link GetObjectResponse} - */ - private CompletableFuture<GetObjectResponse> rateLimitedGetObject(RestoreRange range, - S3AsyncClient client, - GetObjectRequest request, - Path destinationPath) - { - return client.getObject(request, AsyncResponseTransformer.toPublisher()) - .thenCompose(responsePublisher -> subscribeRateLimitedWrite(range, - destinationPath, - responsePublisher)); - } - - /** - * Returns a {@link CompletableFuture} to the {@link GetObjectResponse} and consuming the GetObjectResponse - * by subscribing to the {@code publisher}. Applying backpressure on the received bytes by rate limiting - * the download throughput using the {@code downloadRateLimiter} object. 
+ * Closes the channel if not-null and open * - * @param range the range to be restored - * @param destinationPath the path where the object will be persisted - * @param publisher the {@link ResponsePublisher} - * @return a {@link CompletableFuture} to the {@link GetObjectResponse} + * @param channel the channel to be closed */ - CompletableFuture<GetObjectResponse> subscribeRateLimitedWrite(RestoreRange range, - Path destinationPath, - ResponsePublisher<GetObjectResponse> publisher) + private void closeChannel(Channel channel) throws IOException { - // closed at the completion of subscribeFuture - WritableByteChannel channel; - try - { - // always create new file, and fails if it already exists - // this is consistent with the expectation that we won't - // re-download a file that already exists - // The channel is closed on completion of streaming asynchronously - channel = Files.newByteChannel(destinationPath, EnumSet.of(CREATE_NEW, WRITE)); - } - catch (FileAlreadyExistsException fileAlreadyExistsException) - { - LOGGER.info("Skipping download. File already exists. jobId={} sliceKey={}", - range.jobId(), range.sliceKey()); - return CompletableFuture.completedFuture(publisher.response()); - } - catch (IOException e) + if (channel != null && channel.isOpen()) { - LOGGER.error("Error occurred while creating channel. destinationPath={} jobId={} sliceKey={}", - destinationPath, range.jobId(), range.sliceKey(), e); - throw new RuntimeException(e); + channel.close(); } - // CompletableFuture that will be notified when all events have been consumed or if an error occurs. 
- return publisher - .subscribe(buffer -> { - downloadRateLimiter.acquire(buffer.remaining()); // apply backpressure on the received bytes - ThrowableUtils.propagate(() -> channel.write(buffer)); - }) - .whenComplete((v, subscribeThrowable) -> { - // finally close the channel and log error if failed - closeChannel(channel); - - if (subscribeThrowable != null) - { - LOGGER.error("Error occurred while downloading. jobId={} sliceKey={}", - range.jobId(), range.sliceKey(), subscribeThrowable); - } - }) - .thenApply(v -> publisher.response()); } - /** - * Closes the channel if not-null. Wraps any {@link IOException} in a {@link RuntimeException} - * - * @param channel the channel to be closed - * @throws RuntimeException wrapping any {@link IOException} - */ - private void closeChannel(Channel channel) + private static <T> CompletableFuture<T> failedFuture(Throwable cause) { - if (channel != null && channel.isOpen()) - { - try - { - channel.close(); - } - catch (IOException e) - { - LOGGER.error("Failed to close channel", e); - } - } + CompletableFuture<T> failure = new CompletableFuture<>(); + failure.completeExceptionally(cause); + return failure; } private static class Credentials diff --git a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClientPool.java b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClientPool.java index eafe600e..a14a54d4 100644 --- a/src/main/java/org/apache/cassandra/sidecar/restore/StorageClientPool.java +++ b/src/main/java/org/apache/cassandra/sidecar/restore/StorageClientPool.java @@ -19,6 +19,7 @@ package org.apache.cassandra.sidecar.restore; import java.net.URI; +import java.time.Duration; import java.util.Collections; import java.util.Map; import java.util.UUID; @@ -28,10 +29,13 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.SidecarRateLimiter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.inject.Inject; 
import com.google.inject.Singleton; import com.google.inject.name.Named; +import io.netty.handler.ssl.OpenSsl; import org.apache.cassandra.sidecar.config.S3ClientConfiguration; import org.apache.cassandra.sidecar.config.S3ProxyConfiguration; import org.apache.cassandra.sidecar.config.SidecarConfiguration; @@ -52,6 +56,8 @@ import software.amazon.awssdk.utils.ThreadFactoryBuilder; @Singleton public class StorageClientPool implements SdkAutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(StorageClientPool.class); + private final Map<String, StorageClient> clientPool = new ConcurrentHashMap<>(); private final Map<UUID, StorageClient> clientByJobId = new ConcurrentHashMap<>(); private final ThreadPoolExecutor sharedExecutor; @@ -100,12 +106,18 @@ public class StorageClientPool implements SdkAutoCloseable private StorageClient storageClient(String region) { return clientPool.computeIfAbsent(region, k -> { + logIfOpenSslUnavailable(); + Map<SdkAdvancedAsyncClientOption<?>, ?> advancedOptions = Collections.singletonMap( SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, sharedExecutor ); + Duration apiCallTimeout = Duration.ofSeconds(clientConfig.apiCallTimeoutMillis()); S3AsyncClientBuilder clientBuilder = S3AsyncClient.builder() .region(Region.of(region)) + // Setting the same timeout for apiCall and apiCallAttempt; There is 1 attempt effectively, as we do retry in the application + .overrideConfiguration(b -> b.apiCallAttemptTimeout(apiCallTimeout) + .apiCallTimeout(apiCallTimeout)) .asyncConfiguration(b -> b.advancedOptions(advancedOptions)); S3ProxyConfiguration s3ProxyConfiguration = clientConfig.proxyConfig(); URI endpointOverride = s3ProxyConfiguration.endpointOverride(); @@ -113,6 +125,7 @@ public class StorageClientPool implements SdkAutoCloseable clientBuilder.endpointOverride(endpointOverride) .forcePathStyle(true); + NettyNioAsyncHttpClient.Builder nettyClientBuilder = NettyNioAsyncHttpClient.builder(); S3ProxyConfiguration 
config = clientConfig.proxyConfig(); if (config.isPresent()) { @@ -123,11 +136,11 @@ public class StorageClientPool implements SdkAutoCloseable .username(config.username()) .password(config.password()) .build(); - NettyNioAsyncHttpClient.Builder httpClientBuilder = NettyNioAsyncHttpClient.builder(); - clientBuilder.httpClientBuilder(httpClientBuilder.proxyConfiguration(proxyConfig)); + nettyClientBuilder.proxyConfiguration(proxyConfig); } + clientBuilder.httpClientBuilder(nettyClientBuilder); - return new StorageClient(clientBuilder.build(), ingressFileRateLimiter); + return new StorageClient(clientBuilder.build(), clientConfig.rangeGetObjectBytesSize(), ingressFileRateLimiter); }); } @@ -138,4 +151,12 @@ public class StorageClientPool implements SdkAutoCloseable clientPool.clear(); clientByJobId.clear(); } + + private void logIfOpenSslUnavailable() + { + if (!OpenSsl.isAvailable()) + { + LOGGER.info("OpenSSL is not available for S3AsyncClient"); + } + } } diff --git a/src/main/java/org/apache/cassandra/sidecar/server/ServerVerticle.java b/src/main/java/org/apache/cassandra/sidecar/server/ServerVerticle.java index 53221149..a6d8edd1 100644 --- a/src/main/java/org/apache/cassandra/sidecar/server/ServerVerticle.java +++ b/src/main/java/org/apache/cassandra/sidecar/server/ServerVerticle.java @@ -138,7 +138,8 @@ public class ServerVerticle extends AbstractVerticle // to a vert.x bug. // See following comment for details if (ex.getMessage() != null - && ex.getMessage().contains("Unable to update traffic shaping options because the server was not configured to use traffic shaping during startup")) + && ex.getMessage().contains("Unable to update traffic shaping options " + + "because the server was not configured to use traffic shaping during startup")) { // TODO: we need to rollback this change once vert.x fixes this problem // Swallowing the exception here is okay for now until we get a proper fix in vert.x. 
diff --git a/src/test/containerTest/org/apache/cassandra/sidecar/restore/StorageClientTest.java b/src/test/containerTest/org/apache/cassandra/sidecar/restore/StorageClientTest.java index 79e5cfd6..e6d2e25e 100644 --- a/src/test/containerTest/org/apache/cassandra/sidecar/restore/StorageClientTest.java +++ b/src/test/containerTest/org/apache/cassandra/sidecar/restore/StorageClientTest.java @@ -25,29 +25,36 @@ import java.io.OutputStream; import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; +import java.util.Arrays; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.SidecarRateLimiter; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import com.adobe.testing.s3mock.testcontainers.S3MockContainer; import com.datastax.driver.core.utils.UUIDs; +import io.vertx.core.Vertx; import org.apache.cassandra.sidecar.common.data.RestoreJobSecrets; import org.apache.cassandra.sidecar.common.data.RestoreJobStatus; import org.apache.cassandra.sidecar.common.data.SSTableImportOptions; import org.apache.cassandra.sidecar.common.data.StorageCredentials; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool; +import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl; import org.apache.cassandra.sidecar.db.RestoreJob; import org.apache.cassandra.sidecar.db.RestoreRange; import org.assertj.core.data.Percentage; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.async.AsyncRequestBody; +import 
software.amazon.awssdk.core.exception.ApiCallTimeoutException; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -72,29 +79,28 @@ class StorageClientTest private static final String testBucket = "bucket"; private static final String testData = "testData"; private static final String checksum = BinaryUtils.toHex(Md5Utils.computeMD5Hash(testData.getBytes())); - private static final String testFileName = "testFile"; private static final String largeTestFileName = "largeTestFile"; private static final String testEncKeyRef = "arn:aws:kms:us-east-1:1234567890:key/valid-test-key-ref"; - private S3MockContainer s3Mock; - private S3AsyncClient s3AsyncClient; - private StorageClient client; - private RestoreJob restoreJob; - - private RestoreRange testRange; - private RestoreRange largeTestRange; + private static S3MockContainer s3Mock; + private static S3AsyncClient s3AsyncClient; + private static StorageClient client; + private static RestoreJob restoreJob; + private static RestoreRange testRange; + private static RestoreRange largeTestRange; + private static Path largeFilePath; + private static TaskExecutorPool taskExecutorPool; @TempDir - Path testFolder; + private static Path testFolder; - @BeforeEach - void setup() throws Exception + @BeforeAll + static void setup() throws Exception { s3Mock = new S3MockContainer("3.5.1") .withValidKmsKeys(testEncKeyRef) .withInitialBuckets(testBucket); s3Mock.start(); - String httpsEndpoint = s3Mock.getHttpsEndpoint(); // test credential defined in s3mock StorageCredentials credentials = StorageCredentials.builder() .accessKeyId("foo") @@ -107,36 +113,44 @@ class StorageClientTest .jobSecrets(new RestoreJobSecrets(credentials, credentials)) .sstableImportOptions(SSTableImportOptions.defaults()) .build(); - s3AsyncClient = S3AsyncClient.builder() - .region(Region.US_WEST_1) - // provide a dummy credential to 
prevent client from identifying credentials - .credentialsProvider(StaticCredentialsProvider.create( - AwsBasicCredentials.create("foo", "bar"))) - .endpointOverride(new URI(httpsEndpoint)) - // required to prevent client from "manipulating" the object path - .forcePathStyle(true) - .httpClient(NettyNioAsyncHttpClient.builder().buildWithDefaults( - AttributeMap.builder() - .put(TRUST_ALL_CERTIFICATES, Boolean.TRUE) - .build())) - .build(); + s3AsyncClient = buildS3AsyncClient(Duration.ofSeconds(60)); client = new StorageClient(s3AsyncClient); client.authenticate(restoreJob); - Path testPath = testFolder.resolve(testFileName); - Files.deleteIfExists(testPath); - testRange = getMockRange(restoreJob.jobId, testBucket, "key", checksum, testPath); + Path stageDirPath = testFolder.resolve("stage"); + testRange = getMockRange(restoreJob.jobId, testBucket, "key", checksum, stageDirPath, testData.length()); putObject(testRange, testData); - Path largeFilePath = prepareTestFile(testFolder, largeTestFileName, LARGE_FILE_IN_BYTES); // 1MB + largeFilePath = prepareTestFile(testFolder, largeTestFileName, LARGE_FILE_IN_BYTES); // 1MB largeTestRange = getMockRange(restoreJob.jobId, testBucket, "largeKey", - computeChecksum(largeFilePath), largeFilePath); + computeChecksum(largeFilePath), stageDirPath, + LARGE_FILE_IN_BYTES); putObject(largeTestRange, largeFilePath); - // delete file after putting it in S3 - Files.deleteIfExists(largeFilePath); + + taskExecutorPool = new ExecutorPools(Vertx.vertx(), new ServiceConfigurationImpl()).internal(); + } + + static S3AsyncClient buildS3AsyncClient(Duration apiCallTimeout) throws Exception + { + String httpsEndpoint = s3Mock.getHttpsEndpoint(); + return S3AsyncClient.builder() + .region(Region.US_WEST_1) + .overrideConfiguration(b -> b.apiCallTimeout(apiCallTimeout) + .apiCallAttemptTimeout(apiCallTimeout)) + // provide a dummy credential to prevent client from identifying credentials + .credentialsProvider(StaticCredentialsProvider.create( 
+ AwsBasicCredentials.create("foo", "bar"))) + .endpointOverride(new URI(httpsEndpoint)) + // required to prevent client from "manipulating" the object path + .forcePathStyle(true) + .httpClient(NettyNioAsyncHttpClient.builder().buildWithDefaults( + AttributeMap.builder() + .put(TRUST_ALL_CERTIFICATES, Boolean.TRUE) + .build())) + .build(); } - @AfterEach - void cleanup() + @AfterAll + static void cleanup() { s3Mock.stop(); client.close(); @@ -183,7 +197,8 @@ class StorageClientTest @Test void testGetObject() throws Exception { - File downloaded = client.downloadObjectIfAbsent(testRange).get(); + File downloaded = client.downloadObjectIfAbsent(testRange, taskExecutorPool) + .toCompletionStage().toCompletableFuture().get(); assertThat(downloaded.exists()).isTrue(); assertThat(new String(Files.readAllBytes(downloaded.toPath()))).isEqualTo(testData); } @@ -192,27 +207,52 @@ class StorageClientTest void testGetObjectHasExistingFileOnDisk() throws Exception { Path existingPath = testFolder.resolve(UUID.randomUUID().toString()); + Files.createDirectories(existingPath); + Files.createFile(existingPath.resolve("key")); RestoreRange sliceHasFileOnDisk = getMockRange(restoreJob.jobId, testBucket, "key", checksum, existingPath); - File downloaded = client.downloadObjectIfAbsent(sliceHasFileOnDisk).get(); + File downloaded = client.downloadObjectIfAbsent(sliceHasFileOnDisk, taskExecutorPool) + .toCompletionStage().toCompletableFuture().get(); assertThat(downloaded.getAbsolutePath()).isEqualTo(existingPath.resolve("key").toString()); } @Test void testGetObjectThroughputRateLimited() throws Exception { - // only allow 1/4 the speed of transfer - client = new StorageClient(s3AsyncClient, SidecarRateLimiter.create(LARGE_FILE_IN_BYTES >> 2)); + // only allow 1/4 the speed of transfer; each request downloads 128 KiB + StorageClient client = new StorageClient(s3AsyncClient, 128 * 1024, SidecarRateLimiter.create(LARGE_FILE_IN_BYTES >> 2)); client.authenticate(restoreJob); // Download 
should take around 4 seconds (256 KB/s for a 1MB file) long startNanos = System.nanoTime(); - File downloaded = client.downloadObjectIfAbsent(largeTestRange).get(); + File downloaded = client.downloadObjectIfAbsent(largeTestRange, taskExecutorPool) + .toCompletionStage().toCompletableFuture().get(); assertThat(downloaded.exists()).isTrue(); long elapsedNanos = System.nanoTime() - startNanos; assertThat(TimeUnit.NANOSECONDS.toMillis(elapsedNanos)).isCloseTo(TimeUnit.SECONDS.toMillis(4), Percentage.withPercentage(95)); + byte[] downloadedBytes = Files.readAllBytes(downloaded.toPath()); + byte[] originalBytes = Files.readAllBytes(largeFilePath); + assertThat(Arrays.equals(downloadedBytes, originalBytes)).isTrue(); + } + + @Test + void testApiCallTimeout() throws Exception + { + try (S3AsyncClient s3Client = buildS3AsyncClient(Duration.ofMillis(1))) + { + StorageClient client = new StorageClient(s3Client); + client.authenticate(restoreJob); + assertThatThrownBy(() -> client.objectExists(testRange).get()) + .hasMessageContaining(" Client execution did not complete before the specified timeout configuration: 1 millis") + .hasRootCauseInstanceOf(ApiCallTimeoutException.class); + } } private RestoreRange getMockRange(UUID jobId, String bucket, String key, String checksum, Path localPath) + { + return getMockRange(jobId, bucket, key, checksum, localPath, 0); + } + + private static RestoreRange getMockRange(UUID jobId, String bucket, String key, String checksum, Path localPath, long length) { RestoreRange mock = mock(RestoreRange.class, RETURNS_DEEP_STUBS); when(mock.jobId()).thenReturn(jobId); @@ -220,6 +260,7 @@ class StorageClientTest when(mock.sliceKey()).thenReturn(key); when(mock.sliceChecksum()).thenReturn(checksum); when(mock.stageDirectory()).thenReturn(localPath); + when(mock.sliceObjectLength()).thenReturn(length); if (localPath != null) { when(mock.stagedObjectPath()).thenReturn(localPath.resolve(key)); @@ -227,7 +268,7 @@ class StorageClientTest return mock; } - 
private void putObject(RestoreRange range, String stringData) throws Exception + private static void putObject(RestoreRange range, String stringData) throws Exception { PutObjectRequest request = PutObjectRequest.builder() .bucket(range.sliceBucket()) @@ -237,7 +278,7 @@ class StorageClientTest s3AsyncClient.putObject(request, AsyncRequestBody.fromString(stringData)).get(); } - private void putObject(RestoreRange range, Path path) throws Exception + private static void putObject(RestoreRange range, Path path) throws Exception { PutObjectRequest request = PutObjectRequest.builder() .bucket(range.sliceBucket()) diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/HttpRangesIteratorTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/HttpRangesIteratorTest.java new file mode 100644 index 00000000..1eee3018 --- /dev/null +++ b/src/test/java/org/apache/cassandra/sidecar/restore/HttpRangesIteratorTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.restore; + +import java.util.List; + +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.sidecar.common.utils.HttpRange; +import org.assertj.core.util.Lists; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class HttpRangesIteratorTest +{ + @Test + void testIteration() + { + HttpRangesIterator iterator = new HttpRangesIterator(19, 10); + List<HttpRange> ranges = Lists.newArrayList(iterator); + assertThat(ranges).hasSize(2); + assertThat(ranges.get(0)).hasToString("bytes=0-9"); + assertThat(ranges.get(1)).hasToString("bytes=10-18"); + } + + @Test + void testSingleRange() + { + HttpRangesIterator iterator = new HttpRangesIterator(19, 20); + List<HttpRange> ranges = Lists.newArrayList(iterator); + assertThat(ranges).hasSize(1); + assertThat(ranges.get(0)).hasToString("bytes=0-18"); + } + + @Test + void testInvalidArguments() + { + assertThatThrownBy(() -> new HttpRangesIterator(0, 1)) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("totalBytes must be positive"); + + assertThatThrownBy(() -> new HttpRangesIterator(1, 0)) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("rangeSize must be positive"); + } +} diff --git a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTaskTest.java b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTaskTest.java index 2203a1f0..dc6a7a70 100644 --- a/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTaskTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/restore/RestoreRangeTaskTest.java @@ -80,6 +80,7 @@ import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static 
org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -138,9 +139,11 @@ class RestoreRangeTaskTest void testRestoreSucceeds() { RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.CREATED); - when(mockStorageClient.objectExists(mockRange)).thenReturn(CompletableFuture.completedFuture(null)); - when(mockStorageClient.downloadObjectIfAbsent(mockRange)) - .thenReturn(CompletableFuture.completedFuture(new File("."))); + HeadObjectResponse headObjectResponse = mock(HeadObjectResponse.class); + when(headObjectResponse.contentLength()).thenReturn(1L); + when(mockStorageClient.objectExists(mockRange)).thenReturn(CompletableFuture.completedFuture(headObjectResponse)); + when(mockStorageClient.downloadObjectIfAbsent(eq(mockRange), any(TaskExecutorPool.class))) + .thenReturn(Future.succeededFuture(new File("."))); RestoreRangeTask task = createTask(mockRange, job); Promise<RestoreRange> promise = Promise.promise(); @@ -167,8 +170,8 @@ class RestoreRangeTaskTest RestoreJob job = RestoreJobTest.createTestingJob(UUIDs.timeBased(), RestoreJobStatus.CREATED); // the existence of the slice is already confirmed by the s3 client when(mockRange.existsOnS3()).thenReturn(true); - when(mockStorageClient.downloadObjectIfAbsent(mockRange)) - .thenReturn(CompletableFuture.completedFuture(new File("."))); + when(mockStorageClient.downloadObjectIfAbsent(eq(mockRange), any(TaskExecutorPool.class))) + .thenReturn(Future.succeededFuture(new File("."))); RestoreRangeTask task = createTask(mockRange, job); Promise<RestoreRange> promise = Promise.promise(); @@ -220,9 +223,11 @@ class RestoreRangeTaskTest doReturn(true).when(job).isManagedBySidecar(); when(mockRange.job()).thenReturn(job); when(mockRange.stagedObjectPath()).thenReturn(Paths.get("nonexist")); - when(mockStorageClient.objectExists(mockRange)).thenReturn(CompletableFuture.completedFuture(null)); - 
when(mockStorageClient.downloadObjectIfAbsent(mockRange)) - .thenReturn(CompletableFuture.completedFuture(new File("."))); + HeadObjectResponse headObjectResponse = mock(HeadObjectResponse.class); + when(headObjectResponse.contentLength()).thenReturn(1L); + when(mockStorageClient.objectExists(mockRange)).thenReturn(CompletableFuture.completedFuture(headObjectResponse)); + when(mockStorageClient.downloadObjectIfAbsent(eq(mockRange), any(TaskExecutorPool.class))) + .thenReturn(Future.succeededFuture(new File("."))); RestoreRangeTask task = createTask(mockRange, job); Promise<RestoreRange> promise = Promise.promise(); @@ -246,7 +251,7 @@ class RestoreRangeTaskTest when(mockRange.stagedObjectPath()).thenReturn(stagedPath); when(mockStorageClient.objectExists(mockRange)) .thenThrow(new RuntimeException("Should not call this method")); - when(mockStorageClient.downloadObjectIfAbsent(mockRange)) + when(mockStorageClient.downloadObjectIfAbsent(eq(mockRange), any(TaskExecutorPool.class))) .thenThrow(new RuntimeException("Should not call this method")); RestoreRangeTask task = createTask(mockRange, job); @@ -319,8 +324,11 @@ class RestoreRangeTaskTest Path stagedPath = testFolder.resolve("slice.zip"); when(mockRange.stagedObjectPath()).thenReturn(stagedPath); when(mockRange.isCancelled()).thenReturn(false); - when(mockStorageClient.objectExists(mockRange)).thenReturn(CompletableFuture.completedFuture(null)); - when(mockStorageClient.downloadObjectIfAbsent(mockRange)).thenThrow(new RuntimeException("Random exception")); + HeadObjectResponse headObjectResponse = mock(HeadObjectResponse.class); + when(headObjectResponse.contentLength()).thenReturn(1L); + when(mockStorageClient.objectExists(mockRange)).thenReturn(CompletableFuture.completedFuture(headObjectResponse)); + when(mockStorageClient.downloadObjectIfAbsent(eq(mockRange), any(TaskExecutorPool.class))) + .thenThrow(new RuntimeException("Random exception")); Promise<RestoreRange> promise = Promise.promise(); @@ -356,8 
+364,11 @@ class RestoreRangeTaskTest Path stagedPath = testFolder.resolve("slice.zip"); when(mockRange.stagedObjectPath()).thenReturn(stagedPath); when(mockRange.isCancelled()).thenReturn(false); - when(mockStorageClient.objectExists(mockRange)).thenReturn(CompletableFuture.completedFuture(null)); - when(mockStorageClient.downloadObjectIfAbsent(mockRange)).thenReturn(CompletableFuture.completedFuture(null)); + HeadObjectResponse headObjectResponse = mock(HeadObjectResponse.class); + when(headObjectResponse.contentLength()).thenReturn(1L); + when(mockStorageClient.objectExists(mockRange)).thenReturn(CompletableFuture.completedFuture(headObjectResponse)); + when(mockStorageClient.downloadObjectIfAbsent(eq(mockRange), any(TaskExecutorPool.class))) + .thenReturn(Future.succeededFuture(null)); Promise<RestoreRange> promise = Promise.promise(); diff --git a/src/test/java/org/apache/cassandra/sidecar/server/ServerSSLTest.java b/src/test/java/org/apache/cassandra/sidecar/server/ServerSSLTest.java index e19d8237..0da75a32 100644 --- a/src/test/java/org/apache/cassandra/sidecar/server/ServerSSLTest.java +++ b/src/test/java/org/apache/cassandra/sidecar/server/ServerSSLTest.java @@ -277,7 +277,7 @@ class ServerSSLTest .compose(s -> validateHealthEndpoint(clientWithP12Keystore(true, false))) .onComplete(context.failing(throwable -> { assertThat(throwable).isNotNull() - .hasMessageContaining("Received fatal alert: bad_certificate"); + .hasMessageContaining("Received fatal alert: certificate_required"); context.completeNow(); })); } diff --git a/src/test/resources/logback-sidecar.xml b/src/test/resources/logback-test.xml similarity index 100% rename from src/test/resources/logback-sidecar.xml rename to src/test/resources/logback-test.xml diff --git a/vertx-client/build.gradle b/vertx-client/build.gradle index d9744bbc..1de27609 100644 --- a/vertx-client/build.gradle +++ b/vertx-client/build.gradle @@ -56,20 +56,20 @@ dependencies { api(group: 'io.vertx', name: 
'vertx-web-client', version: "${project.vertxVersion}") { exclude group: 'com.fasterxml.jackson.core', module: 'jackson-core' } - implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: '2.0.61.Final') // for openSSL + implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: "${project.boringSslVersion}") // for openSSL // The newer versions (2.0.48.Final+) of tcnative require explicit dependency declarations, // including the classifiers. See https://netty.io/wiki/forked-tomcat-native.html#gradle-and-bazel // for details. // openSSL native libraries for linux x86-64 architectures - implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: '2.0.61.Final', classifier: 'linux-x86_64') + implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: "${project.boringSslVersion}", classifier: 'linux-x86_64') // openSSL native libraries for macOS aarch-64 architectures - implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: '2.0.61.Final', classifier: 'osx-aarch_64') + implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: "${project.boringSslVersion}", classifier: 'osx-aarch_64') // openSSL native libraries for linux aarch-64 architectures - implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: '2.0.61.Final', classifier: 'linux-aarch_64') + implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: "${project.boringSslVersion}", classifier: 'linux-aarch_64') // openSSL native libraries for macOS x86-64 architectures - implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: '2.0.61.Final', classifier: 'osx-x86_64') + implementation(group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: "${project.boringSslVersion}", classifier: 'osx-x86_64') implementation("org.slf4j:slf4j-api:${project.slf4jVersion}") 
compileOnly('org.jetbrains:annotations:23.0.0') --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org