This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new c7b170c CASSANDRASC-74: Stream sstable components API fails on
secondary index files
c7b170c is described below
commit c7b170cf2e7764159a3d9cf4ca0abc6db1659e51
Author: Francisco Guerrero <[email protected]>
AuthorDate: Thu Sep 21 14:16:56 2023 -0700
CASSANDRASC-74: Stream sstable components API fails on secondary index files
In this commit, we fix streaming secondary index SSTable component files.
We add
tests to validate that the index files can be streamed. We also add
compatibility
for older clients that don't have the fix.
Patch by Francisco Guerrero; Reviewed by Dinesh Joshi, Yifan Cai for
CASSANDRASC-74
---
CHANGES.txt | 1 +
.../cassandra/sidecar/common/ApiEndpointsV1.java | 5 +
.../sidecar/common/data/SSTableComponent.java | 19 ++-
.../cassandra/sidecar/CassandraSidecarDaemon.java | 2 +-
.../org/apache/cassandra/sidecar/MainModule.java | 5 +
.../data/StreamSSTableComponentRequest.java | 37 ++++-
.../routes/StreamSSTableComponentHandler.java | 58 +++----
.../sidecar/snapshots/SnapshotPathBuilder.java | 34 ++++-
.../sidecar/utils/CassandraInputValidator.java | 2 +-
...reamSSTableComponentHandlerIntegrationTest.java | 170 +++++++++++++++++++++
.../data/StreamSSTableComponentRequestTest.java | 25 ++-
.../routes/StreamSSTableComponentHandlerTest.java | 3 +-
.../snapshots/AbstractSnapshotPathBuilderTest.java | 52 ++++++-
13 files changed, 359 insertions(+), 54 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 4dff0fd..255f614 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
1.0.0
-----
+ * Fix unable to stream secondary index files (CASSANDRASC-74)
* Updates token-ranges endpoint to return additional instance metadata
(CASSANDRASC-73)
* Shade Jackson completely to prevent incompatibility issues (CASSANDRASC-75)
* Adds endpoint to serve read/write replica-sets by token-ranges
(CASSANDRASC-60)
diff --git
a/common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
b/common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
index 77ad601..5fe2d5b 100644
---
a/common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
+++
b/common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
@@ -32,12 +32,15 @@ public final class ApiEndpointsV1
public static final String TABLE_PATH_PARAM = ":table";
public static final String SNAPSHOT_PATH_PARAM = ":snapshot";
public static final String COMPONENT_PATH_PARAM = ":component";
+ public static final String INDEX_PATH_PARAM = ":index";
public static final String UPLOAD_ID_PATH_PARAM = ":uploadId";
public static final String PER_KEYSPACE = "/keyspaces/" +
KEYSPACE_PATH_PARAM;
public static final String PER_TABLE = "/tables/" + TABLE_PATH_PARAM;
public static final String PER_SNAPSHOT = "/snapshots/" +
SNAPSHOT_PATH_PARAM;
public static final String PER_COMPONENT = "/components/" +
COMPONENT_PATH_PARAM;
+ public static final String PER_SECONDARY_INDEX_COMPONENT = "/components/"
+ INDEX_PATH_PARAM
+ + "/" +
COMPONENT_PATH_PARAM;
public static final String PER_UPLOAD = "/uploads/" + UPLOAD_ID_PATH_PARAM;
public static final String HEALTH_ROUTE = API_V1 + HEALTH;
@@ -55,6 +58,8 @@ public final class ApiEndpointsV1
public static final String SNAPSHOTS_ROUTE = API_V1 + PER_KEYSPACE +
PER_TABLE + PER_SNAPSHOT;
// Replaces DEPRECATED_COMPONENTS_ROUTE
public static final String COMPONENTS_ROUTE = SNAPSHOTS_ROUTE +
PER_COMPONENT;
+ public static final String COMPONENTS_WITH_SECONDARY_INDEX_ROUTE_SUPPORT =
SNAPSHOTS_ROUTE
+
+ PER_SECONDARY_INDEX_COMPONENT;
@Deprecated
public static final String DEPRECATED_ALL_KEYSPACES_SCHEMA_ROUTE = API_V1
+ "/schema/keyspaces";
diff --git
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableComponent.java
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableComponent.java
index 00acd0f..5f28565 100644
---
a/common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableComponent.java
+++
b/common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableComponent.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.sidecar.common.data;
import java.util.Objects;
+import org.jetbrains.annotations.Nullable;
+
/**
* Represents an SSTable component that includes a keyspace, table name and
component name
*/
@@ -27,16 +29,22 @@ public class SSTableComponent
{
private final QualifiedTableName qualifiedTableName;
private final String componentName;
+ @Nullable
+ private final String secondaryIndexName;
/**
* Constructor for the holder class
*
* @param qualifiedTableName the qualified table name in Cassandra
+ * @param secondaryIndexName the name of the secondary index for the
SSTable component
* @param componentName the name of the SSTable component
*/
- public SSTableComponent(QualifiedTableName qualifiedTableName, String
componentName)
+ public SSTableComponent(QualifiedTableName qualifiedTableName,
+ @Nullable String secondaryIndexName,
+ String componentName)
{
this.qualifiedTableName = Objects.requireNonNull(qualifiedTableName,
"qualifiedTableName must not be null");
+ this.secondaryIndexName = secondaryIndexName;
this.componentName = Objects.requireNonNull(componentName,
"componentName must not be null");
}
@@ -64,6 +72,15 @@ public class SSTableComponent
return qualifiedTableName.tableName();
}
+ /**
+ * @return the secondary index name when the SSTable component is an index
component, {@code null} otherwise
+ */
+ @Nullable
+ public String secondaryIndexName()
+ {
+ return secondaryIndexName;
+ }
+
/**
* @return the name of the SSTable component
*/
diff --git
a/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
b/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
index 883675b..9ebf3ac 100644
--- a/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
+++ b/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
@@ -111,7 +111,7 @@ public class CassandraSidecarDaemon
try
{
// Some closing action is executed on the executorPool (which is
closed when closing vertx).
- // Reflecting the dependncy below.
+ // Reflecting the dependency below.
CompositeFuture.all(closingFutures)
.onComplete(v -> vertx.close()).toCompletionStage()
diff --git a/src/main/java/org/apache/cassandra/sidecar/MainModule.java
b/src/main/java/org/apache/cassandra/sidecar/MainModule.java
index f530898..924ac83 100644
--- a/src/main/java/org/apache/cassandra/sidecar/MainModule.java
+++ b/src/main/java/org/apache/cassandra/sidecar/MainModule.java
@@ -181,6 +181,11 @@ public class MainModule extends AbstractModule
.handler(streamSSTableComponentHandler)
.handler(fileStreamHandler);
+ // Support for routes that want to stream SStable index components
+
router.get(ApiEndpointsV1.COMPONENTS_WITH_SECONDARY_INDEX_ROUTE_SUPPORT)
+ .handler(streamSSTableComponentHandler)
+ .handler(fileStreamHandler);
+
//noinspection deprecation
router.get(ApiEndpointsV1.DEPRECATED_SNAPSHOTS_ROUTE)
.handler(snapshotsHandler);
diff --git
a/src/main/java/org/apache/cassandra/sidecar/data/StreamSSTableComponentRequest.java
b/src/main/java/org/apache/cassandra/sidecar/data/StreamSSTableComponentRequest.java
index 9fa8bbd..637ca55 100644
---
a/src/main/java/org/apache/cassandra/sidecar/data/StreamSSTableComponentRequest.java
+++
b/src/main/java/org/apache/cassandra/sidecar/data/StreamSSTableComponentRequest.java
@@ -20,8 +20,10 @@ package org.apache.cassandra.sidecar.data;
import java.util.Objects;
+import io.vertx.ext.web.RoutingContext;
import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
import org.apache.cassandra.sidecar.common.data.SSTableComponent;
+import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.VisibleForTesting;
/**
@@ -43,7 +45,23 @@ public class StreamSSTableComponentRequest extends
SSTableComponent
@VisibleForTesting
public StreamSSTableComponentRequest(String keyspace, String tableName,
String snapshotName, String componentName)
{
- this(new QualifiedTableName(keyspace, tableName, true), snapshotName,
componentName);
+ this(new QualifiedTableName(keyspace, tableName, true), snapshotName,
null, componentName);
+ }
+
+ /**
+ * Constructor for the holder class
+ *
+ * @param keyspace the keyspace in Cassandra
+ * @param tableName the table name in Cassandra
+ * @param snapshotName the name of the snapshot
+ * @param secondaryIndexName the name of the secondary index for the
SSTable component
+ * @param componentName the name of the SSTable component
+ */
+ @VisibleForTesting
+ public StreamSSTableComponentRequest(String keyspace, String tableName,
String snapshotName,
+ String secondaryIndexName, String
componentName)
+ {
+ this(new QualifiedTableName(keyspace, tableName, true), snapshotName,
secondaryIndexName, componentName);
}
/**
@@ -51,16 +69,30 @@ public class StreamSSTableComponentRequest extends
SSTableComponent
*
* @param qualifiedTableName the qualified table name in Cassandra
* @param snapshotName the name of the snapshot
+ * @param secondaryIndexName the name of the secondary index for the
SSTable component
* @param componentName the name of the SSTable component
*/
public StreamSSTableComponentRequest(QualifiedTableName qualifiedTableName,
String snapshotName,
+ @Nullable String secondaryIndexName,
String componentName)
{
- super(qualifiedTableName, componentName);
+ super(qualifiedTableName, secondaryIndexName, componentName);
this.snapshotName = Objects.requireNonNull(snapshotName, "snapshotName
must not be null");
}
+ public static StreamSSTableComponentRequest from(QualifiedTableName
qualifiedTableName, RoutingContext context)
+ {
+ String snapshotName = context.pathParam("snapshot");
+ String secondaryIndexName = context.pathParam("index");
+ String componentName = context.pathParam("component");
+
+ return new StreamSSTableComponentRequest(qualifiedTableName,
+ snapshotName,
+ secondaryIndexName,
+ componentName);
+ }
+
/**
* @return the name of the snapshot
*/
@@ -78,6 +110,7 @@ public class StreamSSTableComponentRequest extends
SSTableComponent
"keyspace='" + keyspace() + '\'' +
", tableName='" + tableName() + '\'' +
", snapshot='" + snapshotName + '\'' +
+ ", secondaryIndexName='" + secondaryIndexName() + '\'' +
", componentName='" + componentName() + '\'' +
'}';
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandler.java
b/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandler.java
index 6d3abdf..93dae6e 100644
---
a/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandler.java
+++
b/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandler.java
@@ -24,7 +24,6 @@ import java.nio.file.NoSuchFileException;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.netty.handler.codec.http.HttpResponseStatus;
-import io.vertx.core.Future;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.web.RoutingContext;
@@ -44,7 +43,6 @@ import static
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpExceptio
public class StreamSSTableComponentHandler extends
AbstractHandler<StreamSSTableComponentRequest>
{
private final SnapshotPathBuilder snapshotPathBuilder;
- private final CassandraInputValidator validator;
@Inject
public StreamSSTableComponentHandler(InstanceMetadataFetcher
metadataFetcher,
@@ -54,7 +52,6 @@ public class StreamSSTableComponentHandler extends
AbstractHandler<StreamSSTable
{
super(metadataFetcher, executorPools, validator);
this.snapshotPathBuilder = snapshotPathBuilder;
- this.validator = validator;
}
@Override
@@ -64,43 +61,32 @@ public class StreamSSTableComponentHandler extends
AbstractHandler<StreamSSTable
SocketAddress remoteAddress,
StreamSSTableComponentRequest request)
{
- validate(request)
- .compose(validParams ->
- snapshotPathBuilder.build(host, validParams)
- .onSuccess(path -> {
-
logger.debug("StreamSSTableComponentHandler handled {} for client {}. "
- + "Instance: {}", path,
remoteAddress, host);
-
context.put(FileStreamHandler.FILE_PATH_CONTEXT_KEY, path)
- .next();
- }))
- .onFailure(cause -> {
- String errMsg =
- "StreamSSTableComponentHandler failed for request: {} from: {}.
Instance: {}";
- logger.error(errMsg, request, remoteAddress, host, cause);
- if (cause instanceof NoSuchFileException)
- {
- context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND,
cause.getMessage()));
- }
- else
- {
- context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST,
"Invalid request for " + request));
- }
- });
+ snapshotPathBuilder.build(host, request)
+ .onSuccess(path -> {
+ logger.debug("StreamSSTableComponentHandler
handled {} for client {}. "
+ + "Instance: {}", path,
remoteAddress, host);
+
context.put(FileStreamHandler.FILE_PATH_CONTEXT_KEY, path)
+ .next();
+ })
+ .onFailure(cause -> {
+ String errMsg =
+ "StreamSSTableComponentHandler failed for
request: {} from: {}. Instance: {}";
+ logger.error(errMsg, request, remoteAddress,
host, cause);
+ if (cause instanceof NoSuchFileException)
+ {
+
context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND,
cause.getMessage()));
+ }
+ else
+ {
+
context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST,
+ "Invalid
request for " + request));
+ }
+ });
}
@Override
protected StreamSSTableComponentRequest
extractParamsOrThrow(RoutingContext context)
{
- return new StreamSSTableComponentRequest(qualifiedTableName(context),
- context.pathParam("snapshot"),
- context.pathParam("component")
- );
- }
-
- private Future<StreamSSTableComponentRequest>
validate(StreamSSTableComponentRequest request)
- {
- validator.validateComponentName(request.componentName());
- validator.validateSnapshotName(request.snapshotName());
- return Future.succeededFuture(request);
+ return StreamSSTableComponentRequest.from(qualifiedTableName(context),
context);
}
}
diff --git
a/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java
b/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java
index 9f5d9d3..2077f0f 100644
---
a/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java
+++
b/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java
@@ -44,11 +44,13 @@ import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.file.FileProps;
import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.common.utils.Preconditions;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.data.SnapshotRequest;
import org.apache.cassandra.sidecar.data.StreamSSTableComponentRequest;
import org.apache.cassandra.sidecar.utils.BaseFileSystem;
import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.jetbrains.annotations.Nullable;
/**
* This class builds the snapshot path on a given host validating that it
exists
@@ -94,7 +96,9 @@ public class SnapshotPathBuilder extends BaseFileSystem
return dataDirectories(host)
.compose(dataDirs -> findKeyspaceDirectory(dataDirs,
request.keyspace()))
.compose(keyspaceDirectory ->
findTableDirectory(keyspaceDirectory, request.tableName()))
- .compose(tableDirectory -> findComponent(tableDirectory,
request.snapshotName(),
+ .compose(tableDirectory -> findComponent(tableDirectory,
+ request.snapshotName(),
+
request.secondaryIndexName(),
request.componentName()));
}
@@ -283,6 +287,13 @@ public class SnapshotPathBuilder extends BaseFileSystem
validator.validateTableName(request.tableName());
validator.validateSnapshotName(request.snapshotName());
// Only allow .db and TOC.txt components here
+ String secondaryIndexName = request.secondaryIndexName();
+ if (secondaryIndexName != null)
+ {
+ Preconditions.checkArgument(!secondaryIndexName.isEmpty(),
"secondaryIndexName cannot be empty");
+ Preconditions.checkArgument(secondaryIndexName.charAt(0) == '.',
"Invalid secondary index name");
+ validator.validatePattern(secondaryIndexName.substring(1),
"secondary index");
+ }
validator.validateRestrictedComponentName(request.componentName());
}
@@ -375,16 +386,23 @@ public class SnapshotPathBuilder extends BaseFileSystem
* Constructs the path to the component using the {@code baseDirectory},
{@code snapshotName}, and
* {@code componentName} and returns if it is a valid path to the
component, or a failure otherwise.
*
- * @param baseDirectory the base directory where we search the table
directory
- * @param snapshotName the name of the snapshot
- * @param componentName the name of the component
+ * @param baseDirectory the base directory where we search the table
directory
+ * @param snapshotName the name of the snapshot
+ * @param secondaryIndexName the name of the secondary index (if provided)
+ * @param componentName the name of the component
* @return the path to the component if it's valid, a failure otherwise
*/
- protected Future<String> findComponent(String baseDirectory, String
snapshotName, String componentName)
+ protected Future<String> findComponent(String baseDirectory, String
snapshotName,
+ @Nullable String
secondaryIndexName, String componentName)
{
- String componentFilename = StringUtils.removeEnd(baseDirectory,
File.separator) +
- File.separator + SNAPSHOTS_DIR_NAME +
File.separator + snapshotName +
- File.separator + componentName;
+ StringBuilder sb = new
StringBuilder(StringUtils.removeEnd(baseDirectory, File.separator))
+ .append(File.separator).append(SNAPSHOTS_DIR_NAME)
+ .append(File.separator).append(snapshotName);
+ if (secondaryIndexName != null)
+ {
+ sb.append(File.separator).append(secondaryIndexName);
+ }
+ String componentFilename =
sb.append(File.separator).append(componentName).toString();
return isValidFilename(componentFilename)
.recover(t -> {
diff --git
a/src/main/java/org/apache/cassandra/sidecar/utils/CassandraInputValidator.java
b/src/main/java/org/apache/cassandra/sidecar/utils/CassandraInputValidator.java
index dc4bf8c..838388e 100644
---
a/src/main/java/org/apache/cassandra/sidecar/utils/CassandraInputValidator.java
+++
b/src/main/java/org/apache/cassandra/sidecar/utils/CassandraInputValidator.java
@@ -166,7 +166,7 @@ public class CassandraInputValidator
* @param name a name for the exception
* @throws HttpException when the {@code input} does not match the pattern
*/
- private void validatePattern(String input, String name)
+ public void validatePattern(String input, String name)
{
if
(!input.matches(validationConfiguration.allowedPatternForDirectory()))
throw new HttpException(HttpResponseStatus.BAD_REQUEST.code(),
diff --git
a/src/test/integration/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandlerIntegrationTest.java
b/src/test/integration/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandlerIntegrationTest.java
new file mode 100644
index 0000000..a53329e
--- /dev/null
+++
b/src/test/integration/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandlerIntegrationTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.datastax.driver.core.Session;
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.ext.web.codec.BodyCodec;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.IntegrationTestBase;
+import org.apache.cassandra.sidecar.common.data.ListSnapshotFilesResponse;
+import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@ExtendWith(VertxExtension.class)
+class StreamSSTableComponentHandlerIntegrationTest extends IntegrationTestBase
+{
+ @CassandraIntegrationTest
+ void testStreamIncludingIndexFiles(VertxTestContext context) throws
InterruptedException
+ {
+ runTestScenario(context);
+ }
+
+ private void runTestScenario(VertxTestContext context) throws
InterruptedException
+ {
+ createTestKeyspace();
+ QualifiedTableName table =
createTestTableAndPopulate(sidecarTestContext);
+
+ List<String> expectedFileList =
Arrays.asList(".ryear/[a-z]{2}-1-big-Data.db",
+
".ryear/[a-z]{2}-1-big-TOC.txt",
+ "[a-z]{2}-1-big-Data.db",
+
"[a-z]{2}-1-big-TOC.txt");
+
+ WebClient client = WebClient.create(vertx);
+ String testRoute =
String.format("/api/v1/keyspaces/%s/tables/%s/snapshots/my-snapshot",
+ table.keyspace(), table.tableName());
+
+ createSnapshot(client, testRoute)
+ .compose(route -> listSnapshot(client, route))
+ .onComplete(context.succeeding(response -> {
+
+ List<ListSnapshotFilesResponse.FileInfo> filesToStream =
+ response.snapshotFilesInfo()
+ .stream()
+ .filter(info -> info.fileName.endsWith("-Data.db") ||
info.fileName.endsWith("-TOC.txt"))
+ .sorted(Comparator.comparing(o -> o.fileName))
+ .collect(Collectors.toList());
+
+ assertThat(filesToStream).hasSize(4);
+ for (int i = 0; i < filesToStream.size(); i++)
+ {
+ ListSnapshotFilesResponse.FileInfo fileInfo =
filesToStream.get(i);
+ assertThat(fileInfo.fileName).matches(expectedFileList.get(i));
+ }
+
+ List<Future> futures = new ArrayList<>();
+ // Stream all the files including index files
+ for (ListSnapshotFilesResponse.FileInfo fileInfo : filesToStream)
+ {
+ futures.add(streamSSTableComponent(client, fileInfo));
+ }
+
+ CompositeFuture.all(futures)
+ .onSuccess(s -> context.completeNow())
+ .onFailure(context::failNow);
+ }));
+
+ // wait until test completes
+ assertThat(context.awaitCompletion(30, TimeUnit.SECONDS)).isTrue();
+ }
+
+ Future<String> createSnapshot(WebClient client, String route)
+ {
+ Promise<String> promise = Promise.promise();
+ client.put(server.actualPort(), "127.0.0.1", route)
+ .expect(ResponsePredicate.SC_OK)
+ .send()
+ .onSuccess(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+ promise.complete(route);
+ })
+ .onFailure(promise::fail);
+ return promise.future();
+ }
+
+ Future<ListSnapshotFilesResponse> listSnapshot(WebClient client, String
route)
+ {
+ Promise<ListSnapshotFilesResponse> promise = Promise.promise();
+ client.get(server.actualPort(), "127.0.0.1", route +
"?includeSecondaryIndexFiles=true")
+ .expect(ResponsePredicate.SC_OK)
+ .send()
+ .onSuccess(response -> {
+ assertThat(response.statusCode()).isEqualTo(OK.code());
+
promise.complete(response.bodyAsJson(ListSnapshotFilesResponse.class));
+ })
+ .onFailure(promise::fail);
+ return promise.future();
+ }
+
+ Future<HttpResponse<Buffer>> streamSSTableComponent(WebClient client,
+
ListSnapshotFilesResponse.FileInfo fileInfo)
+ {
+ String route = "/keyspaces/" + fileInfo.keySpaceName +
+ "/tables/" + fileInfo.tableName +
+ "/snapshots/" + fileInfo.snapshotName +
+ "/components/" + fileInfo.fileName;
+
+ return client.get(server.actualPort(), "localhost", "/api/v1" + route)
+ .expect(ResponsePredicate.SC_OK)
+ .as(BodyCodec.buffer())
+ .send();
+ }
+
+ QualifiedTableName createTestTableAndPopulate(CassandraSidecarTestContext
cassandraTestContext)
+ {
+ QualifiedTableName tableName = createTestTable(
+ "CREATE TABLE %s ( \n" +
+ " race_year int, \n" +
+ " race_name text, \n" +
+ " cyclist_name text, \n" +
+ " rank int, \n" +
+ " PRIMARY KEY ((race_year, race_name), rank) \n" +
+ ");");
+ Session session = maybeGetSession();
+
+ session.execute("CREATE INDEX ryear ON " + tableName + "
(race_year);");
+ session.execute("INSERT INTO " + tableName + " (race_year, race_name,
rank, cyclist_name) " +
+ "VALUES (2015, 'Tour of Japan - Stage 4 - Minami >
Shinshu', 1, 'Benjamin PRADES');");
+ session.execute("INSERT INTO " + tableName + " (race_year, race_name,
rank, cyclist_name) " +
+ "VALUES (2015, 'Tour of Japan - Stage 4 - Minami >
Shinshu', 2, 'Adam PHELAN');");
+ session.execute("INSERT INTO " + tableName + " (race_year, race_name,
rank, cyclist_name) " +
+ "VALUES (2015, 'Tour of Japan - Stage 4 - Minami >
Shinshu', 3, 'Thomas LEBAS');");
+ return tableName;
+ }
+}
diff --git
a/src/test/java/org/apache/cassandra/sidecar/data/StreamSSTableComponentRequestTest.java
b/src/test/java/org/apache/cassandra/sidecar/data/StreamSSTableComponentRequestTest.java
index 4222535..50da1e1 100644
---
a/src/test/java/org/apache/cassandra/sidecar/data/StreamSSTableComponentRequestTest.java
+++
b/src/test/java/org/apache/cassandra/sidecar/data/StreamSSTableComponentRequestTest.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.sidecar.data;
import org.junit.jupiter.api.Test;
+import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
+
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -28,7 +30,8 @@ class StreamSSTableComponentRequestTest
@Test
void failsWhenKeyspaceIsNull()
{
- assertThatThrownBy(() -> new StreamSSTableComponentRequest(null,
"table", "snapshot", "component"))
+ String keyspace = null;
+ assertThatThrownBy(() -> new StreamSSTableComponentRequest(keyspace,
"table", "snapshot", "component"))
.isInstanceOf(NullPointerException.class)
.hasMessageContaining("keyspace must not be null");
}
@@ -67,7 +70,25 @@ class StreamSSTableComponentRequestTest
assertThat(req.tableName()).isEqualTo("table");
assertThat(req.snapshotName()).isEqualTo("snapshot");
assertThat(req.componentName()).isEqualTo("data.db");
+ assertThat(req.secondaryIndexName()).isNull();
+
assertThat(req.toString()).isEqualTo("StreamSSTableComponentRequest{keyspace='ks',
tableName='table', " +
+ "snapshot='snapshot',
secondaryIndexName='null', " +
+ "componentName='data.db'}");
+ }
+
+ @Test
+ void testValidRequestWithIndexName()
+ {
+ StreamSSTableComponentRequest req =
+ new StreamSSTableComponentRequest(new QualifiedTableName("ks",
"table"), "snapshot", ".index", "data.db");
+
+ assertThat(req.keyspace()).isEqualTo("ks");
+ assertThat(req.tableName()).isEqualTo("table");
+ assertThat(req.snapshotName()).isEqualTo("snapshot");
+ assertThat(req.secondaryIndexName()).isEqualTo(".index");
+ assertThat(req.componentName()).isEqualTo("data.db");
assertThat(req.toString()).isEqualTo("StreamSSTableComponentRequest{keyspace='ks',
tableName='table', " +
- "snapshot='snapshot',
componentName='data.db'}");
+ "snapshot='snapshot',
secondaryIndexName='.index', " +
+ "componentName='data.db'}");
}
}
diff --git
a/src/test/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandlerTest.java
b/src/test/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandlerTest.java
index b769280..0c41b40 100644
---
a/src/test/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandlerTest.java
+++
b/src/test/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandlerTest.java
@@ -246,7 +246,8 @@ public class StreamSSTableComponentHandlerTest
}
@ParameterizedTest
- @ValueSource(strings = { "i_❤_u.db", "this-is-not-allowed.jar",
"../../../etc/passwd.db" })
+ @ValueSource(strings = { "i_❤_u.db", "this-is-not-allowed.jar",
"../../../etc/passwd.db",
+ "../not-an-index-file-Data.db" })
void failsWhenComponentNameContainsInvalidCharacters(String
invalidComponentName)
{
VertxTestContext context = new VertxTestContext();
diff --git
a/src/testFixtures/java/org/apache/cassandra/sidecar/snapshots/AbstractSnapshotPathBuilderTest.java
b/src/testFixtures/java/org/apache/cassandra/sidecar/snapshots/AbstractSnapshotPathBuilderTest.java
index e2dbd9b..a6727fc 100644
---
a/src/testFixtures/java/org/apache/cassandra/sidecar/snapshots/AbstractSnapshotPathBuilderTest.java
+++
b/src/testFixtures/java/org/apache/cassandra/sidecar/snapshots/AbstractSnapshotPathBuilderTest.java
@@ -44,7 +44,9 @@ import
org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
import org.apache.cassandra.sidecar.data.SnapshotRequest;
import org.apache.cassandra.sidecar.data.StreamSSTableComponentRequest;
import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.assertj.core.api.InstanceOfAssertFactories;
+import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.from;
@@ -150,8 +152,7 @@ public abstract class AbstractSnapshotPathBuilderTest
@ParameterizedTest
@ValueSource(strings = { "i_❤_u.db", "this-is-not-allowed.jar",
"cql-is-not-allowed-here.cql",
- "json-is-not-allowed-here.json",
"crc32-is-not-allowed-here.crc32",
- "../../../etc/passwd.db" })
+ "json-is-not-allowed-here.json",
"crc32-is-not-allowed-here.crc32" })
void failsWhenComponentNameContainsInvalidCharacters(String
invalidComponentName)
{
assertThatThrownBy(() -> instance.build("localhost",
@@ -166,6 +167,53 @@ public abstract class AbstractSnapshotPathBuilderTest
.getPayload()));
}
+ @ParameterizedTest
+ @ValueSource(strings = { "[email protected]", ".s./../../etc/passwd.db",
"../../../bad-Data.db" })
+ void failsWhenIndexComponentNameContainsInvalidCharacters(String
invalidComponentName)
+ {
+ assertThatThrownBy(() -> instance.build("localhost",
+ new
StreamSSTableComponentRequest("ks",
+
"table",
+
"snapshot",
+
".index",
+
invalidComponentName)))
+ .isInstanceOf(HttpException.class)
+ .hasMessageContaining("Bad Request")
+ .returns(HttpResponseStatus.BAD_REQUEST.code(), from(t ->
((HttpException) t).getStatusCode()))
+ .returns("Invalid component name: " + invalidComponentName, from(t ->
((HttpException) t)
+
.getPayload()));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = { "", "does_not_start_with_dot" })
+ void failsWhenIndexNameIsInvalid(String invalidIndexName)
+ {
+ assertThatThrownBy(() -> instance.build("localhost",
+ new
StreamSSTableComponentRequest("ks",
+
"table",
+
"snapshot",
+
invalidIndexName,
+
"component.db")))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = { ".", "../bad-Data.db", ".f@o/bad-Data.db",
".bl@h/bad-TOC.txt" })
+ void failsWhenIndexNameContainsInvalidCharacters(String invalidIndexName)
+ {
+ assertThatThrownBy(() -> instance.build("localhost",
+ new
StreamSSTableComponentRequest("ks",
+
"table",
+
"snapshot",
+
invalidIndexName,
+
"component.db")))
+ .isInstanceOf(HttpException.class)
+ .hasMessageContaining("Bad Request")
+ .returns(HttpResponseStatus.BAD_REQUEST.code(), from(t ->
((HttpException) t).getStatusCode()))
+ .extracting(from(t -> ((HttpException) t).getPayload()),
as(InstanceOfAssertFactories.STRING))
+ .contains("Invalid characters in secondary index: ");
+ }
+
@Test
void failsWhenDataDirsAreEmpty()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]