yifan-c commented on code in PR #340:
URL: https://github.com/apache/cassandra-sidecar/pull/340#discussion_r3172176371
##########
integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java:
##########
@@ -76,7 +76,7 @@ void testNodeDrainOperationSuccess()
trustedClient().put(serverWrapper.serverPort, "localhost",
ApiEndpointsV1.NODE_DRAIN_ROUTE)
.send());
- assertThat(drainResponse.statusCode()).isEqualTo(OK.code());
+ assertThat(drainResponse.statusCode()).isIn(OK.code(),
ACCEPTED.code());
Review Comment:
Why is it necessary to change the assertion?
##########
server/src/main/java/org/apache/cassandra/sidecar/db/DriverUnsupportedSchemaCache.java:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.server.data.Name;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import org.apache.cassandra.sidecar.common.utils.StringUtils;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException;
+import org.apache.cassandra.sidecar.tasks.PeriodicTask;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * The {@link DriverUnsupportedSchemaCache} class maintains cache of CQL
schema for tables whose definition is not
+ * supported by driver natively. Java driver 3.x does not return {@link
TableMetadata} for tables which schema
+ * could not be parsed. Since it does not currently support vector type, any
table containing vector would not
+ * be visible. Cache is refreshed for the first time as soon as CQL connection
is established or upon first lookup.
+ * Later, it is periodically refreshed according to configured schedule.
+ * TODO: Remove after upgrade to Java driver 4.x (CASSSIDECAR-421).
+ */
+@Singleton
+public class DriverUnsupportedSchemaCache implements PeriodicTask
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DriverUnsupportedSchemaCache.class);
+ private static final String STATEMENT_DELIMITER = "\n\n";
+
+ private final SidecarConfiguration sidecarConfiguration;
+ private final CQLSessionProvider sessionProvider;
+ private final CQLSchemaAccessor schemaAccessor;
+
+ // cache contains only schemas unparseable by Java driver
+ // during every cache refresh, reference is being reassigned to achieve
atomic swap
+ private volatile SortedMap<QualifiedTableName, String> schemaCache;
+ private volatile boolean initialized; // flag indicating whether cache has
been populated at least once
+
+ private PreparedStatement tableListStatement;
+
+ public DriverUnsupportedSchemaCache(SidecarConfiguration
sidecarConfiguration,
+ CQLSessionProvider sessionProvider)
+ {
+ this.sidecarConfiguration = sidecarConfiguration;
+ this.sessionProvider = sessionProvider;
+ this.schemaAccessor = new CQLSchemaAccessor(sessionProvider);
+ this.schemaCache = createCache();
+ this.initialized = false;
+ }
+
+ /**
+ * @return Schema for all tables across all keyspaces (only those not
supported by Java driver).
+ */
+ @NotNull
+ public String getFullSchema()
+ {
+ // we cannot know whether schema was updated after last
+ // periodic refresh, so results may be stale
+ refreshIfUninitialized();
+ return getUnsupportedSchema(table -> true);
+ }
+
+ /**
+ * @return Schema for all tables within given keyspaces (only those not
supported by Java driver).
+ */
+ @NotNull
+ public String getKeyspaceSchema(@NotNull Name keyspace)
+ {
+ // we cannot know whether schema was updated after last
+ // periodic refresh, so results may be stale
+ refreshIfUninitialized();
+ return getUnsupportedSchema(table ->
keyspace.equals(table.getKeyspace()));
+ }
+
+ /**
+ * @return Schema for table not supported by Java driver's metadata,
{@code null} otherwise.
+ */
+ @Nullable
+ public String getTableSchema(@NotNull Name keyspace, @NotNull Name table)
+ {
+ return getTableSchema(keyspace, table, true);
+ }
+
+ /**
+ * @param allowRefresh Flag indicating whether lookup of table schema via
CQL query
+ * is allowed, when data not found in cache.
+ * @return Schema for table not supported by Java driver's metadata,
{@code null} otherwise.
+ */
+ @Nullable
+ public String getTableSchema(@NotNull Name keyspace, @NotNull Name table,
boolean allowRefresh)
+ {
+ refreshIfUninitialized();
Review Comment:
Does this mean that `allowRefresh: false` might not be honored if the cache is
uninitialized?
##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableUploadHandler.java:
##########
@@ -209,13 +214,17 @@ private Future<SSTableUploadRequestParam>
validateKeyspaceAndTable(String host,
logger.error(message);
return
Future.failedFuture(wrapHttpException(HttpResponseStatus.BAD_REQUEST, message));
}
-
if (MetadataUtils.table(keyspaceMetadata,
request.table()) == null)
{
- String message = String.format("Invalid table name
'%s' supplied for keyspace '%s'",
- request.table(),
request.keyspace());
- logger.error(message);
- return
Future.failedFuture(wrapHttpException(HttpResponseStatus.BAD_REQUEST, message));
+ // check for tables that are not parseable by Java
driver's metadata feature
+ boolean tableExists =
driverUnsupportedSchemaCache.getTableSchema(request.keyspace(),
request.table(), false) != null;
Review Comment:
Same here: `getTableSchema` may block unconditionally if the cache is not initialized.
##########
server/src/main/java/org/apache/cassandra/sidecar/db/DriverUnsupportedSchemaCache.java:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.server.data.Name;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import org.apache.cassandra.sidecar.common.utils.StringUtils;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException;
+import org.apache.cassandra.sidecar.tasks.PeriodicTask;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * The {@link DriverUnsupportedSchemaCache} class maintains cache of CQL
schema for tables whose definition is not
+ * supported by driver natively. Java driver 3.x does not return {@link
TableMetadata} for tables which schema
+ * could not be parsed. Since it does not currently support vector type, any
table containing vector would not
+ * be visible. Cache is refreshed for the first time as soon as CQL connection
is established or upon first lookup.
+ * Later, it is periodically refreshed according to configured schedule.
+ * TODO: Remove after upgrade to Java driver 4.x (CASSSIDECAR-421).
+ */
+@Singleton
+public class DriverUnsupportedSchemaCache implements PeriodicTask
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DriverUnsupportedSchemaCache.class);
+ private static final String STATEMENT_DELIMITER = "\n\n";
+
+ private final SidecarConfiguration sidecarConfiguration;
+ private final CQLSessionProvider sessionProvider;
+ private final CQLSchemaAccessor schemaAccessor;
+
+ // cache contains only schemas unparseable by Java driver
+ // during every cache refresh, reference is being reassigned to achieve
atomic swap
+ private volatile SortedMap<QualifiedTableName, String> schemaCache;
+ private volatile boolean initialized; // flag indicating whether cache has
been populated at least once
+
+ private PreparedStatement tableListStatement;
+
+ public DriverUnsupportedSchemaCache(SidecarConfiguration
sidecarConfiguration,
+ CQLSessionProvider sessionProvider)
+ {
+ this.sidecarConfiguration = sidecarConfiguration;
+ this.sessionProvider = sessionProvider;
+ this.schemaAccessor = new CQLSchemaAccessor(sessionProvider);
+ this.schemaCache = createCache();
+ this.initialized = false;
+ }
+
+ /**
+ * @return Schema for all tables across all keyspaces (only those not
supported by Java driver).
+ */
+ @NotNull
+ public String getFullSchema()
+ {
+ // we cannot know whether schema was updated after last
+ // periodic refresh, so results may be stale
+ refreshIfUninitialized();
+ return getUnsupportedSchema(table -> true);
+ }
+
+ /**
+ * @return Schema for all tables within given keyspaces (only those not
supported by Java driver).
+ */
+ @NotNull
+ public String getKeyspaceSchema(@NotNull Name keyspace)
+ {
+ // we cannot know whether schema was updated after last
+ // periodic refresh, so results may be stale
+ refreshIfUninitialized();
+ return getUnsupportedSchema(table ->
keyspace.equals(table.getKeyspace()));
+ }
+
+ /**
+ * @return Schema for table not supported by Java driver's metadata,
{@code null} otherwise.
+ */
+ @Nullable
+ public String getTableSchema(@NotNull Name keyspace, @NotNull Name table)
+ {
+ return getTableSchema(keyspace, table, true);
+ }
+
+ /**
+ * @param allowRefresh Flag indicating whether lookup of table schema via
CQL query
+ * is allowed, when data not found in cache.
+ * @return Schema for table not supported by Java driver's metadata,
{@code null} otherwise.
+ */
+ @Nullable
+ public String getTableSchema(@NotNull Name keyspace, @NotNull Name table,
boolean allowRefresh)
+ {
+ refreshIfUninitialized();
+ QualifiedTableName name = new QualifiedTableName(keyspace, table);
+ String schema = schemaCache.get(name);
+ if (schema == null && allowRefresh)
+ {
+ // proactively fetch table schema, not to provide
+ // false-negative when table is not cached yet
+ // attention, method can block
+ schema = populateSchemaCache(schemaCache, name);
+ }
+ return schema;
+ }
+
+ @Override
+ public DurationSpec delay()
+ {
+ return
sidecarConfiguration.driverConfiguration().unsupportedTableSchemaRefreshTime();
+ }
+
+ @Override
+ public void execute(Promise<Void> promise)
+ {
+ try
+ {
+ refresh(false);
+ promise.tryComplete();
+ }
+ catch (Throwable t)
+ {
+ promise.fail(t);
+ }
+ }
+
+ private String getUnsupportedSchema(Predicate<QualifiedTableName>
condition)
+ {
+ StringBuilder result = new StringBuilder();
+ for (Map.Entry<QualifiedTableName, String> entry :
schemaCache.entrySet())
+ {
+ if (condition.test(entry.getKey()))
+ {
+ result.append(entry.getValue()).append(STATEMENT_DELIMITER);
+ }
+ }
+ return result.toString().trim();
+ }
+
+ private void refreshIfUninitialized()
+ {
+ if (!initialized)
+ {
+ refresh(true);
+ }
+ }
+
+ public synchronized void refresh(boolean initializeOnly)
+ {
+ if (initialized && initializeOnly)
+ {
+ // cache has been already initialized, early exit
+ return;
+ }
+ try
+ {
+ Session session = sessionProvider.get();
+ prepareStatements(session);
+
+ Set<QualifiedTableName> tables = queryAllTables(session);
+ Set<QualifiedTableName> driverKnownTables =
driverKnownTables(session);
+
+ tables.removeAll(driverKnownTables);
+
+ SortedMap<QualifiedTableName, String> newCache = createCache();
+ if (!tables.isEmpty())
+ {
+ LOGGER.debug("Tables not supported by Java driver metadata:
{}", tables);
+ tables.forEach(table -> populateSchemaCache(newCache, table));
+ // replacing cache, because some tables might have been
removed in the meanwhile
+ }
+ schemaCache = newCache;
+
+ initialized = true;
+ }
+ catch (CassandraUnavailableException ignored)
+ {
+ LOGGER.debug("Not yet connected to Cassandra cluster");
+ }
+ }
+
+ private String populateSchemaCache(Map<QualifiedTableName, String> cache,
QualifiedTableName table)
+ {
+ List<String> cqlSchema =
schemaAccessor.getTableSchema(table.getKeyspace(), table.table());
Review Comment:
`getKeyspace()` and `table()` can return null values, but the `getTableSchema`
method expects `@NotNull` values.
##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/KeyspaceSchemaHandler.java:
##########
@@ -130,6 +144,27 @@ private Future<Metadata> metadata(String host)
});
}
+ /**
+ * Get CQL schema not parseable by Java driver (and therefore not present
in {@link Metadata}).
+ *
+ * @param keyspace optional keyspace name, {@code null} includes schema
for all keyspaces
+ * @return {@link Future} containing CQL schema
+ */
+ private Future<String> unsupportedSchema(Name keyspace)
+ {
+ return executorPools.service().executeBlocking(() -> {
+ // schema cache can block if it was not successfully initialized
at least once
Review Comment:
👍
##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/validations/ValidateTableExistenceHandler.java:
##########
@@ -97,7 +103,14 @@ protected void handleInternal(RoutingContext context,
}
TableMetadata tableMetadata = keyspaceMetadata.getTable(table);
- if (tableMetadata == null)
+ boolean tableExists = tableMetadata != null;
+ if (!tableExists)
+ {
+ tableExists = driverUnsupportedSchemaCache.getTableSchema(new
Name(keyspaceMetadata.getName()),
+ new
Name(table),
+
false) != null;
Review Comment:
If the cache is not initialized yet, it may still block, even though `false` is
passed.
##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableVectorUploadHandlerIntegrationTest.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes.sstableuploads;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.datastax.driver.core.Session;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.file.OpenOptions;
+import io.vertx.ext.web.client.HttpRequest;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import
org.apache.cassandra.sidecar.handlers.sstableuploads.SSTableImportHandler;
+import
org.apache.cassandra.sidecar.handlers.sstableuploads.SSTableUploadHandler;
+import org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.IClusterExtension;
+import org.apache.cassandra.testing.utils.AssertionUtils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+/**
+ * Integration tests for {@link SSTableUploadHandler}, {@link
SSTableImportHandler} and vector type
+ * TODO: Consider removing after upgrade to Java driver 4.x (CASSSIDECAR-421).
+ */
+@ExtendWith(VertxExtension.class)
+public class SSTableVectorUploadHandlerIntegrationTest extends
IntegrationTestBase
+{
+ public static final SimpleCassandraVersion MIN_VERSION_WITH_VECTOR =
SimpleCassandraVersion.create("5.0.0");
+
+ @CassandraIntegrationTest
+ void testSSTableImport(VertxTestContext vertxTestContext)
+ throws Exception
+ {
+ assumeThat(sidecarTestContext.version)
+ .as("Vector type is supported since Cassandra 5.0")
+ .isGreaterThanOrEqualTo(MIN_VERSION_WITH_VECTOR);
+
+ // Create a table. Insert some data, create a snapshot that we'll use
for import.
+ // Move snapshot files to a temporary destination.
+ // Truncate the table and insert more data.
+ // Upload the SSTables.
+ // Test the import SSTable endpoint by importing data that was
originally truncated.
+ // Verify by querying the table contains all the results before
truncation and after truncation.
+
+ createTestKeyspace();
+ Session session = maybeGetSession();
+ QualifiedTableName tableName =
createTestTableAndPopulate(sidecarTestContext, Arrays.asList("a", "b"));
+
+ // wait for schema to be present
+ // periodic refresh of Java driver unsupported schemas is configured
to 3 seconds
+ AssertionUtils.loopAssert(15, 1000, () -> {
+ String testSchemaRoute = "/api/v1/cassandra/schema";
+ HttpResponse<Buffer> response = client.get(server.actualPort(),
"127.0.0.1", testSchemaRoute).send()
+
.toCompletionStage().toCompletableFuture().join();
+
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
+
assertThat(response.bodyAsString()).contains(tableName.tableName());
+ });
+
+ // create a snapshot called <tableName>-snapshot for tbl1
+ IClusterExtension<? extends IInstance> cluster =
sidecarTestContext.cluster();
+ final String snapshotStdout = cluster.get(1).nodetoolResult("snapshot",
Review Comment:
nit: drop the `final` to be consistent with the code style.
(That said, I know it is easy to miss)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]