This is an automated email from the ASF dual-hosted git repository.
frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 41f6e512 CASSSIDECAR-386: File descriptor leak after file streamed in
Sidecar Client (#291)
41f6e512 is described below
commit 41f6e51299859419bfb4cc7d2c37ee27587c42d0
Author: Shruti Sekaran <[email protected]>
AuthorDate: Tue Dec 2 13:33:45 2025 -0800
CASSSIDECAR-386: File descriptor leak after file streamed in Sidecar Client
(#291)
Patch by Shruti Sekaran; reviewed by Francisco Guerrero, Yifan Cai for
CASSSIDECAR-386
---
.../cassandra/sidecar/client/VertxHttpClient.java | 33 +++-
.../sidecar/client/VertxHttpClientTest.java | 199 +++++++++++++++++++++
2 files changed, 227 insertions(+), 5 deletions(-)
diff --git
a/vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java
b/vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java
index f5c3d591..d38a93d8 100644
---
a/vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java
+++
b/vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java
@@ -60,6 +60,7 @@ import
org.apache.cassandra.sidecar.common.request.DownloadableRequest;
import org.apache.cassandra.sidecar.common.request.Request;
import org.apache.cassandra.sidecar.common.request.UploadableRequest;
+import static java.lang.String.valueOf;
import static
org.apache.cassandra.sidecar.common.http.SidecarHttpHeaderNames.AUTH_ROLE;
import static
org.apache.cassandra.sidecar.common.utils.StringUtils.isNullOrEmpty;
@@ -218,11 +219,7 @@ public class VertxHttpClient implements HttpClient
Promise<HttpResponse> promise = Promise.promise();
// open the local file
openFileForRead(vertx.fileSystem(), filename)
- .compose(pair -> vertxRequest.ssl(config.ssl())
-
.putHeader(HttpHeaderNames.CONTENT_LENGTH.toString(),
- String.valueOf(pair.getKey()))
- .sendStream(pair.getValue()
-
.setReadBufferSize(config.sendReadBufferSize())))
+ .compose(pair -> sendFileStream(vertxRequest, pair, filename))
.onFailure(promise::fail)
.onSuccess(response -> {
byte[] raw = response.body() != null ? response.body().getBytes()
: null;
@@ -237,6 +234,32 @@ public class VertxHttpClient implements HttpClient
return promise.future().toCompletionStage().toCompletableFuture();
}
+ /**
+ * Sends the file stream via HTTP request.
+ *
+ * @param vertxRequest the HTTP request to send the file stream with
+ * @param pair a pair containing file size and the AsyncFile handle
+ * @param filename the name of the file being uploaded (for logging
purposes)
+ * @return a Future that completes when the file has been sent
+ */
+ protected Future<io.vertx.ext.web.client.HttpResponse<Buffer>>
sendFileStream(
+ HttpRequest<Buffer> vertxRequest,
+ AbstractMap.SimpleEntry<Long, AsyncFile> pair,
+ String filename)
+ {
+ AsyncFile asyncFile = pair.getValue();
+ return vertxRequest.ssl(config.ssl())
+
.putHeader(HttpHeaderNames.CONTENT_LENGTH.toString(),
+ valueOf(pair.getKey()))
+ .sendStream(pair.getValue()
+
.setReadBufferSize(config.sendReadBufferSize()))
+ .onComplete(ar -> {
+ asyncFile.close().onFailure(err ->
+ LOGGER.warn("Failed to close file after
upload: filename='{}'", filename, err)
+ );
+ });
+ }
+
/**
* {@inheritDoc}
*/
diff --git
a/vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxHttpClientTest.java
b/vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxHttpClientTest.java
new file mode 100644
index 00000000..cf0e1ad0
--- /dev/null
+++
b/vertx-client/src/test/java/org/apache/cassandra/sidecar/client/VertxHttpClientTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.file.AsyncFile;
+import io.vertx.ext.web.client.HttpRequest;
+import io.vertx.ext.web.client.HttpResponse;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import org.apache.cassandra.sidecar.client.request.RequestExecutorTest;
+
+import static java.nio.file.Files.copy;
+import static
io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Unit tests for {@link VertxHttpClient}
+ */
+class VertxHttpClientTest
+{
+ private Vertx vertx;
+ private MockWebServer mockServer;
+ private HttpClientConfig config;
+ private SidecarInstanceImpl sidecarInstance;
+
+ @BeforeEach
+ void setup() throws IOException
+ {
+ vertx = Vertx.vertx();
+ mockServer = new MockWebServer();
+ mockServer.start();
+
+ config = new HttpClientConfig.Builder<>()
+ .ssl(false)
+ .timeoutMillis(30000)
+ .build();
+ sidecarInstance = RequestExecutorTest.newSidecarInstance(mockServer);
+ }
+
+ @AfterEach
+ void tearDown() throws IOException
+ {
+ if (mockServer != null)
+ {
+ mockServer.shutdown();
+ }
+ if (vertx != null)
+ {
+ vertx.close();
+ }
+ }
+
+ @Test
+ void testUploadSSTableClosesFile(@TempDir Path tempDirectory) throws
Exception
+ {
+ runTestScenario(tempDirectory,
+ new MockResponse().setResponseCode(OK.code()),
+ new ExposeAsyncFileVertxHttpClient(vertx, config));
+ }
+
+ @Test
+ void testUploadClosesFileOnUploadFailure(@TempDir Path tempDirectory)
throws Exception
+ {
+ runTestScenario(tempDirectory,
+ new
MockResponse().setResponseCode(INTERNAL_SERVER_ERROR.code()),
+ new ExposeAsyncFileVertxHttpClient(vertx, config));
+ }
+
+ @Test
+ void testMultipleUploadClosesAllFiles(@TempDir Path tempDirectory) throws
Exception
+ {
+ mockServer.enqueue(new MockResponse().setResponseCode(OK.code()));
+ mockServer.enqueue(new MockResponse().setResponseCode(OK.code()));
+ mockServer.enqueue(new MockResponse().setResponseCode(OK.code()));
+
+ Path fileToUpload = prepareFile(tempDirectory);
+
+ ExposeAsyncFileVertxHttpClient httpClient = new
ExposeAsyncFileVertxHttpClient(vertx, config);
+
+ // Upload the same file 3 times (simulating multiple file uploads)
+ for (int i = 0; i < 3; i++)
+ {
+ HttpRequest<Buffer> vertxRequest =
httpClient.webClient.put(mockServer.getPort(),
+
mockServer.getHostName(),
+
"/upload/test" + i);
+ httpClient.executeUploadFileInternal(sidecarInstance,
vertxRequest, fileToUpload.toString())
+ .get(30, TimeUnit.SECONDS);
+ }
+
+ assertThat(mockServer.getRequestCount()).isEqualTo(3);
+ assertThat(httpClient.capturedFiles).hasSize(3);
+
+ // Give async file close operations time to complete
+ Thread.sleep(100);
+
+ // Verify all the files are closed by attempting to call .end() which
should throw IllegalStateException
+ for (AsyncFile file : httpClient.capturedFiles)
+ {
+ assertThatThrownBy(file::end)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("File handle is closed");
+ }
+ }
+
+ private void runTestScenario(Path tempDirectory,
+ MockResponse mockResponse,
+ ExposeAsyncFileVertxHttpClient httpClient)
throws Exception
+ {
+ mockServer.enqueue(mockResponse);
+
+ Path fileToUpload = prepareFile(tempDirectory);
+ HttpRequest<Buffer> vertxRequest =
httpClient.webClient.put(mockServer.getPort(),
+
mockServer.getHostName(),
+
"/upload/test");
+
+ httpClient.executeUploadFileInternal(sidecarInstance, vertxRequest,
fileToUpload.toString())
+ .get(30, TimeUnit.SECONDS);
+
+ assertThat(mockServer.getRequestCount()).isEqualTo(1);
+
+ // Give async file close operation time to complete
+ Thread.sleep(100);
+
+ // Verify file is closed by attempting to call .end() which should
throw IllegalStateException
+ assertThat(httpClient.capturedFiles).hasSize(1);
+ assertThatThrownBy(() -> httpClient.capturedFiles.get(0).end())
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("File handle is closed");
+ }
+
+ /**
+ * Class that extends from {@link VertxHttpClient} for testing purposes
and holds a reference to the
+ * {@link AsyncFile} to ensure that the file has been closed.
+ */
+ static class ExposeAsyncFileVertxHttpClient extends VertxHttpClient
+ {
+ List<AsyncFile> capturedFiles = new ArrayList<>();
+
+ ExposeAsyncFileVertxHttpClient(Vertx vertx, HttpClientConfig config)
+ {
+ super(vertx, config);
+ }
+
+ @Override
+ protected Future<HttpResponse<Buffer>>
sendFileStream(HttpRequest<Buffer> vertxRequest,
+
SimpleEntry<Long, AsyncFile> pair,
+ String filename)
+ {
+ capturedFiles.add(pair.getValue());
+ return super.sendFileStream(vertxRequest, pair, filename);
+ }
+ }
+
+ private Path prepareFile(Path tempDirectory) throws IOException
+ {
+ Path fileToUpload = tempDirectory.resolve("nb-1-big-TOC.txt");
+ try (InputStream inputStream =
getClass().getClassLoader().getResourceAsStream("sstables/nb-1-big-TOC.txt"))
+ {
+ assertThat(inputStream).isNotNull();
+ copy(inputStream, fileToUpload,
StandardCopyOption.REPLACE_EXISTING);
+ }
+ return fileToUpload;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]