This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 41f6e512 CASSSIDECAR-386: File descriptor leak after file streamed in 
Sidecar Client (#291)
41f6e512 is described below

commit 41f6e51299859419bfb4cc7d2c37ee27587c42d0
Author: Shruti Sekaran <[email protected]>
AuthorDate: Tue Dec 2 13:33:45 2025 -0800

    CASSSIDECAR-386: File descriptor leak after file streamed in Sidecar Client 
(#291)
    
    Patch by Shruti Sekaran; reviewed by Francisco Guerrero, Yifan Cai for 
CASSSIDECAR-386
---
 .../cassandra/sidecar/client/VertxHttpClient.java  |  33 +++-
 .../sidecar/client/VertxHttpClientTest.java        | 199 +++++++++++++++++++++
 2 files changed, 227 insertions(+), 5 deletions(-)

diff --git 
a/vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java
 
b/vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java
index f5c3d591..d38a93d8 100644
--- 
a/vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java
+++ 
b/vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java
@@ -60,6 +60,7 @@ import 
org.apache.cassandra.sidecar.common.request.DownloadableRequest;
 import org.apache.cassandra.sidecar.common.request.Request;
 import org.apache.cassandra.sidecar.common.request.UploadableRequest;
 
+import static java.lang.String.valueOf;
 import static 
org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.AUTH_ROLE;
 import static 
org.apache.cassandra.sidecar.common.utils.StringUtils.isNullOrEmpty;
 
@@ -218,11 +219,7 @@ public class VertxHttpClient implements HttpClient
         Promise<HttpResponse> promise = Promise.promise();
         // open the local file
         openFileForRead(vertx.fileSystem(), filename)
-        .compose(pair -> vertxRequest.ssl(config.ssl())
-                                     
.putHeader(HttpHeaderNames.CONTENT_LENGTH.toString(),
-                                                String.valueOf(pair.getKey()))
-                                     .sendStream(pair.getValue()
-                                                     
.setReadBufferSize(config.sendReadBufferSize())))
+        .compose(pair -> sendFileStream(vertxRequest, pair, filename))
         .onFailure(promise::fail)
         .onSuccess(response -> {
             byte[] raw = response.body() != null ? response.body().getBytes() 
: null;
@@ -237,6 +234,32 @@ public class VertxHttpClient implements HttpClient
         return promise.future().toCompletionStage().toCompletableFuture();
     }
 
+    /**
+     * Sends the file stream via HTTP request.
+     *
+     * @param vertxRequest the HTTP request to send the file stream with
+     * @param pair a pair containing file size and the AsyncFile handle
+     * @param filename the name of the file being uploaded (for logging 
purposes)
+     * @return a Future that completes when the file has been sent
+     */
+    protected Future<io.vertx.ext.web.client.HttpResponse<Buffer>> 
sendFileStream(
+        HttpRequest<Buffer> vertxRequest,
+        AbstractMap.SimpleEntry<Long, AsyncFile> pair,
+        String filename)
+    {
+        AsyncFile asyncFile = pair.getValue();
+        return vertxRequest.ssl(config.ssl())
+                           
.putHeader(HttpHeaderNames.CONTENT_LENGTH.toString(),
+                                      valueOf(pair.getKey()))
+                           .sendStream(pair.getValue()
+                                           
.setReadBufferSize(config.sendReadBufferSize()))
+                           .onComplete(ar -> {
+                               asyncFile.close().onFailure(err ->
+                                   LOGGER.warn("Failed to close file after 
upload: filename='{}'", filename, err)
+                               );
+                           });
+    }
+
     /**
      * {@inheritDoc}
      */
diff --git 
a/vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxHttpClientTest.java
 
b/vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxHttpClientTest.java
new file mode 100644
index 00000000..cf0e1ad0
--- /dev/null
+++ 
b/vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxHttpClientTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.ext.web.client.HttpRequest;
+import io.vertx.ext.web.client.HttpResponse;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import org.apache.cassandra.sidecar.client.request.RequestExecutorTest;
+
+import static java.nio.file.Files.copy;
+import static 
io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Unit tests for {@link VertxHttpClient}
+ */
+class VertxHttpClientTest
+{
+    private Vertx vertx;
+    private MockWebServer mockServer;
+    private HttpClientConfig config;
+    private SidecarInstanceImpl sidecarInstance;
+
+    @BeforeEach
+    void setup() throws IOException
+    {
+        vertx = Vertx.vertx();
+        mockServer = new MockWebServer();
+        mockServer.start();
+
+        config = new HttpClientConfig.Builder<>()
+                 .ssl(false)
+                 .timeoutMillis(30000)
+                 .build();
+        sidecarInstance = RequestExecutorTest.newSidecarInstance(mockServer);
+    }
+
+    @AfterEach
+    void tearDown() throws IOException
+    {
+        if (mockServer != null)
+        {
+            mockServer.shutdown();
+        }
+        if (vertx != null)
+        {
+            vertx.close();
+        }
+    }
+
+    @Test
+    void testUploadSSTableClosesFile(@TempDir Path tempDirectory) throws 
Exception
+    {
+        runTestScenario(tempDirectory,
+                        new MockResponse().setResponseCode(OK.code()),
+                        new ExposeAsyncFileVertxHttpClient(vertx, config));
+    }
+
+    @Test
+    void testUploadClosesFileOnUploadFailure(@TempDir Path tempDirectory) 
throws Exception
+    {
+        runTestScenario(tempDirectory,
+                        new 
MockResponse().setResponseCode(INTERNAL_SERVER_ERROR.code()),
+                        new ExposeAsyncFileVertxHttpClient(vertx, config));
+    }
+
+    @Test
+    void testMultipleUploadClosesAllFiles(@TempDir Path tempDirectory) throws 
Exception
+    {
+        mockServer.enqueue(new MockResponse().setResponseCode(OK.code()));
+        mockServer.enqueue(new MockResponse().setResponseCode(OK.code()));
+        mockServer.enqueue(new MockResponse().setResponseCode(OK.code()));
+
+        Path fileToUpload = prepareFile(tempDirectory);
+
+        ExposeAsyncFileVertxHttpClient httpClient = new 
ExposeAsyncFileVertxHttpClient(vertx, config);
+
+        // Upload the same file 3 times (simulating multiple file uploads)
+        for (int i = 0; i < 3; i++)
+        {
+            HttpRequest<Buffer> vertxRequest = 
httpClient.webClient.put(mockServer.getPort(),
+                                                                        
mockServer.getHostName(),
+                                                                        
"/upload/test" + i);
+            httpClient.executeUploadFileInternal(sidecarInstance, 
vertxRequest, fileToUpload.toString())
+                      .get(30, TimeUnit.SECONDS);
+        }
+
+        assertThat(mockServer.getRequestCount()).isEqualTo(3);
+        assertThat(httpClient.capturedFiles).hasSize(3);
+
+        // Give async file close operations time to complete
+        Thread.sleep(100);
+
+        // Verify all the files are closed by attempting to call .end() which 
should throw IllegalStateException
+        for (AsyncFile file : httpClient.capturedFiles)
+        {
+            assertThatThrownBy(file::end)
+            .isInstanceOf(IllegalStateException.class)
+            .hasMessageContaining("File handle is closed");
+        }
+    }
+
+    private void runTestScenario(Path tempDirectory,
+                                 MockResponse mockResponse,
+                                 ExposeAsyncFileVertxHttpClient httpClient) 
throws Exception
+    {
+        mockServer.enqueue(mockResponse);
+
+        Path fileToUpload = prepareFile(tempDirectory);
+        HttpRequest<Buffer> vertxRequest = 
httpClient.webClient.put(mockServer.getPort(),
+                                                                    
mockServer.getHostName(),
+                                                                    
"/upload/test");
+
+        httpClient.executeUploadFileInternal(sidecarInstance, vertxRequest, 
fileToUpload.toString())
+                  .get(30, TimeUnit.SECONDS);
+
+        assertThat(mockServer.getRequestCount()).isEqualTo(1);
+
+        // Give async file close operation time to complete
+        Thread.sleep(100);
+
+        // Verify file is closed by attempting to call .end() which should 
throw IllegalStateException
+        assertThat(httpClient.capturedFiles).hasSize(1);
+        assertThatThrownBy(() -> httpClient.capturedFiles.get(0).end())
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageContaining("File handle is closed");
+    }
+
+    /**
+     * Class that extends from {@link VertxHttpClient} for testing purposes 
and holds a reference to the
+     * {@link AsyncFile} to ensure that the file has been closed.
+     */
+    static class ExposeAsyncFileVertxHttpClient extends VertxHttpClient
+    {
+        List<AsyncFile> capturedFiles = new ArrayList<>();
+
+        ExposeAsyncFileVertxHttpClient(Vertx vertx, HttpClientConfig config)
+        {
+            super(vertx, config);
+        }
+
+        @Override
+        protected Future<HttpResponse<Buffer>> 
sendFileStream(HttpRequest<Buffer> vertxRequest,
+                                                              
SimpleEntry<Long, AsyncFile> pair,
+                                                              String filename)
+        {
+            capturedFiles.add(pair.getValue());
+            return super.sendFileStream(vertxRequest, pair, filename);
+        }
+    }
+
+    private Path prepareFile(Path tempDirectory) throws IOException
+    {
+        Path fileToUpload = tempDirectory.resolve("nb-1-big-TOC.txt");
+        try (InputStream inputStream = 
getClass().getClassLoader().getResourceAsStream("sstables/nb-1-big-TOC.txt"))
+        {
+            assertThat(inputStream).isNotNull();
+            copy(inputStream, fileToUpload, 
StandardCopyOption.REPLACE_EXISTING);
+        }
+        return fileToUpload;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to