This is an automated email from the ASF dual-hosted git repository.
frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2cd024c0 CASSSIDECAR-359: Avoid resuming stream early during SSTable upload (#271)
2cd024c0 is described below
commit 2cd024c0727c18671ed7c3cfcce6b0a78207dd4b
Author: Francisco Guerrero <[email protected]>
AuthorDate: Fri Oct 24 12:59:52 2025 -0700
CASSSIDECAR-359: Avoid resuming stream early during SSTable upload (#271)
Patch by Francisco Guerrero; reviewed by Yifan Cai for CASSSIDECAR-359
---
CHANGES.txt | 1 +
.../cassandra/sidecar/utils/SSTableUploader.java | 12 ++++++---
.../sstableuploads/BaseUploadsHandlerTest.java | 29 ++++++++++++++++++++++
.../sstableuploads/SSTableUploadHandlerTest.java | 12 +++++++++
4 files changed, 50 insertions(+), 4 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 14e7fcb0..4decb80f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.3.0
-----
+ * Avoid resuming stream early during SSTable upload (CASSSIDECAR-359)
* Add cache for Authorization layer (CASSSIDECAR-357)
* Avoid creating objects in the CassandraAdapter implementation
(CASSSIDECAR-355)
* Code refactoring for some configuration classes and migrate
RoleBasedAuthorizationIntegrationTest to integration-tests (CASSSIDECAR-351)
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java b/server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
index 8adfd5ce..954e6076 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
@@ -40,6 +40,7 @@ import io.vertx.core.file.OpenOptions;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils;
+import org.jetbrains.annotations.VisibleForTesting;
/**
* A class that handles SSTable Uploads
@@ -107,10 +108,13 @@ public class SSTableUploader
LOGGER.debug("Uploading data to={}", tempFilename);
return fs.open(tempFilename, new OpenOptions()) // open the temp file
.map(file -> new RateLimitedWriteStream(rateLimiter, file))
- .compose(file -> {
- readStream.resume();
- return readStream.pipeTo(file);
- }); // stream to file
+ .compose(file -> pipeStreamToFile(readStream, file)); // stream to file
+ }
+
+ @VisibleForTesting
+ protected Future<Void> pipeStreamToFile(ReadStream<Buffer> readStream, RateLimitedWriteStream file)
+ {
+ return readStream.pipeTo(file);
}
private Future<String> createTempFile(String uploadDirectory, String
componentFileName, String permissions)
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/BaseUploadsHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/BaseUploadsHandlerTest.java
index 9a3231ba..64b4ebc0 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/BaseUploadsHandlerTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/BaseUploadsHandlerTest.java
@@ -44,9 +44,13 @@ import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Provides;
import com.google.inject.Singleton;
+import com.google.inject.name.Named;
import com.google.inject.name.Names;
import com.google.inject.util.Modules;
+import io.vertx.core.Future;
import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.ReadStream;
import io.vertx.ext.web.client.WebClient;
import io.vertx.junit5.VertxTestContext;
import org.apache.cassandra.sidecar.TestCassandraAdapterDelegate;
@@ -64,6 +68,7 @@ import
org.apache.cassandra.sidecar.config.yaml.TestServiceConfiguration;
import org.apache.cassandra.sidecar.modules.SidecarModules;
import org.apache.cassandra.sidecar.server.Server;
import org.apache.cassandra.sidecar.snapshots.SnapshotUtils;
+import org.apache.cassandra.sidecar.utils.SSTableUploader;
import static
org.apache.cassandra.sidecar.config.yaml.TrafficShapingConfigurationImpl.DEFAULT_CHECK_INTERVAL;
import static
org.apache.cassandra.sidecar.config.yaml.TrafficShapingConfigurationImpl.DEFAULT_INBOUND_FILE_GLOBAL_BANDWIDTH_LIMIT;
@@ -94,11 +99,13 @@ class BaseUploadsHandlerTest
protected TrafficShapingConfiguration trafficShapingConfiguration;
protected SidecarRateLimiter ingressFileRateLimiter;
protected CassandraTableOperations mockCFOperations;
+ protected long artificialDelayInMillisBeforeStreamingToFile;
@BeforeEach
void setup() throws InterruptedException, IOException
{
+ artificialDelayInMillisBeforeStreamingToFile = 0;
canonicalTemporaryPath = temporaryPath.toFile().getCanonicalPath();
mockSSTableUploadConfiguration =
mock(SSTableUploadConfiguration.class);
when(mockSSTableUploadConfiguration.concurrentUploadsLimit()).thenReturn(3);
@@ -218,5 +225,27 @@ class BaseUploadsHandlerTest
{
return sidecarConfiguration;
}
+
+ @Singleton
+ @Provides
+ public SSTableUploader ssTableUploader(Vertx vertx, @Named("IngressFileRateLimiter") SidecarRateLimiter rateLimiter)
+ {
+ return new SSTableUploader(vertx, rateLimiter)
+ {
+ @Override
+ protected Future<Void> pipeStreamToFile(ReadStream<Buffer> readStream, RateLimitedWriteStream file)
+ {
+ if (artificialDelayInMillisBeforeStreamingToFile > 0)
+ {
+ return vertx.timer(artificialDelayInMillisBeforeStreamingToFile, TimeUnit.MILLISECONDS)
+ .compose(t -> super.pipeStreamToFile(readStream, file));
+ }
+ else
+ {
+ return super.pipeStreamToFile(readStream, file);
+ }
+ }
+ };
+ }
}
}
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableUploadHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableUploadHandlerTest.java
index 53065db4..090bbe32 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableUploadHandlerTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableUploadHandlerTest.java
@@ -115,6 +115,18 @@ class SSTableUploadHandlerTest extends
BaseUploadsHandlerTest
false);
}
+ @Test
+ void testUploadWithDelayBeforePipingStreamToFile_expectSuccessfulUpload(VertxTestContext context) throws IOException
+ {
+ artificialDelayInMillisBeforeStreamingToFile = 250;
+ UUID uploadId = UUID.randomUUID();
+ sendUploadRequestAndVerify(context, uploadId, "ks", "tbl",
"with-correct-xxhash-Data.db",
+ new XXHash32Digest("b9510d6b", "55555555"),
+ Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+ HttpResponseStatus.OK.code(),
+ false);
+ }
+
@Test
void testUploadWithIncorrectMd5_expectErrorCode(VertxTestContext context)
throws IOException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]