This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2cd024c0 CASSSIDECAR-359: Avoid resuming stream early during SSTable upload (#271)
2cd024c0 is described below

commit 2cd024c0727c18671ed7c3cfcce6b0a78207dd4b
Author: Francisco Guerrero <[email protected]>
AuthorDate: Fri Oct 24 12:59:52 2025 -0700

    CASSSIDECAR-359: Avoid resuming stream early during SSTable upload (#271)
    
    Patch by Francisco Guerrero; reviewed by Yifan Cai for CASSSIDECAR-359
---
 CHANGES.txt                                        |  1 +
 .../cassandra/sidecar/utils/SSTableUploader.java   | 12 ++++++---
 .../sstableuploads/BaseUploadsHandlerTest.java     | 29 ++++++++++++++++++++++
 .../sstableuploads/SSTableUploadHandlerTest.java   | 12 +++++++++
 4 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 14e7fcb0..4decb80f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.3.0
 -----
+ * Avoid resuming stream early during SSTable upload (CASSSIDECAR-359)
  * Add cache for Authorization layer (CASSSIDECAR-357)
  * Avoid creating objects in the CassandraAdapter implementation (CASSSIDECAR-355)
  * Code refactoring for some configuration classes and migrate RoleBasedAuthorizationIntegrationTest to integration-tests (CASSSIDECAR-351)
diff --git a/server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java b/server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
index 8adfd5ce..954e6076 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableUploader.java
@@ -40,6 +40,7 @@ import io.vertx.core.file.OpenOptions;
 import io.vertx.core.streams.ReadStream;
 import io.vertx.core.streams.WriteStream;
 import org.apache.cassandra.sidecar.common.server.utils.ThrowableUtils;
+import org.jetbrains.annotations.VisibleForTesting;
 
 /**
  * A class that handles SSTable Uploads
@@ -107,10 +108,13 @@ public class SSTableUploader
         LOGGER.debug("Uploading data to={}", tempFilename);
         return fs.open(tempFilename, new OpenOptions()) // open the temp file
                  .map(file -> new RateLimitedWriteStream(rateLimiter, file))
-                 .compose(file -> {
-                     readStream.resume();
-                     return readStream.pipeTo(file);
-                 }); // stream to file
+                 .compose(file -> pipeStreamToFile(readStream, file)); // stream to file
+    }
+
+    @VisibleForTesting
+    protected Future<Void> pipeStreamToFile(ReadStream<Buffer> readStream, RateLimitedWriteStream file)
+    {
+        return readStream.pipeTo(file);
     }
 
     private Future<String> createTempFile(String uploadDirectory, String componentFileName, String permissions)
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/BaseUploadsHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/BaseUploadsHandlerTest.java
index 9a3231ba..64b4ebc0 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/BaseUploadsHandlerTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/BaseUploadsHandlerTest.java
@@ -44,9 +44,13 @@ import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
+import com.google.inject.name.Named;
 import com.google.inject.name.Names;
 import com.google.inject.util.Modules;
+import io.vertx.core.Future;
 import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.ReadStream;
 import io.vertx.ext.web.client.WebClient;
 import io.vertx.junit5.VertxTestContext;
 import org.apache.cassandra.sidecar.TestCassandraAdapterDelegate;
@@ -64,6 +68,7 @@ import org.apache.cassandra.sidecar.config.yaml.TestServiceConfiguration;
 import org.apache.cassandra.sidecar.modules.SidecarModules;
 import org.apache.cassandra.sidecar.server.Server;
 import org.apache.cassandra.sidecar.snapshots.SnapshotUtils;
+import org.apache.cassandra.sidecar.utils.SSTableUploader;
 
 import static org.apache.cassandra.sidecar.config.yaml.TrafficShapingConfigurationImpl.DEFAULT_CHECK_INTERVAL;
 import static org.apache.cassandra.sidecar.config.yaml.TrafficShapingConfigurationImpl.DEFAULT_INBOUND_FILE_GLOBAL_BANDWIDTH_LIMIT;
@@ -94,11 +99,13 @@ class BaseUploadsHandlerTest
     protected TrafficShapingConfiguration trafficShapingConfiguration;
     protected SidecarRateLimiter ingressFileRateLimiter;
     protected CassandraTableOperations mockCFOperations;
+    protected long artificialDelayInMillisBeforeStreamingToFile;
 
 
     @BeforeEach
     void setup() throws InterruptedException, IOException
     {
+        artificialDelayInMillisBeforeStreamingToFile = 0;
         canonicalTemporaryPath = temporaryPath.toFile().getCanonicalPath();
         mockSSTableUploadConfiguration = mock(SSTableUploadConfiguration.class);
         when(mockSSTableUploadConfiguration.concurrentUploadsLimit()).thenReturn(3);
@@ -218,5 +225,27 @@ class BaseUploadsHandlerTest
         {
             return sidecarConfiguration;
         }
+
+        @Singleton
+        @Provides
+        public SSTableUploader ssTableUploader(Vertx vertx, @Named("IngressFileRateLimiter") SidecarRateLimiter rateLimiter)
+        {
+            return new SSTableUploader(vertx, rateLimiter)
+            {
+                @Override
+                protected Future<Void> pipeStreamToFile(ReadStream<Buffer> readStream, RateLimitedWriteStream file)
+                {
+                    if (artificialDelayInMillisBeforeStreamingToFile > 0)
+                    {
+                        return vertx.timer(artificialDelayInMillisBeforeStreamingToFile, TimeUnit.MILLISECONDS)
+                                    .compose(t -> super.pipeStreamToFile(readStream, file));
+                    }
+                    else
+                    {
+                        return super.pipeStreamToFile(readStream, file);
+                    }
+                }
+            };
+        }
     }
 }
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableUploadHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableUploadHandlerTest.java
index 53065db4..090bbe32 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableUploadHandlerTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableUploadHandlerTest.java
@@ -115,6 +115,18 @@ class SSTableUploadHandlerTest extends BaseUploadsHandlerTest
                                    false);
     }
 
+    @Test
+    void testUploadWithDelayBeforePipingStreamToFile_expectSuccessfulUpload(VertxTestContext context) throws IOException
+    {
+        artificialDelayInMillisBeforeStreamingToFile = 250;
+        UUID uploadId = UUID.randomUUID();
+        sendUploadRequestAndVerify(context, uploadId, "ks", "tbl", "with-correct-xxhash-Data.db",
+                                   new XXHash32Digest("b9510d6b", "55555555"),
+                                   Files.size(Paths.get(FILE_TO_BE_UPLOADED)),
+                                   HttpResponseStatus.OK.code(),
+                                   false);
+    }
+
     @Test
     void testUploadWithIncorrectMd5_expectErrorCode(VertxTestContext context) throws IOException
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to