yifan-c commented on code in PR #340:
URL: https://github.com/apache/cassandra-sidecar/pull/340#discussion_r3172176371


##########
integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/CassandraNodeOperationsIntegrationTest.java:
##########
@@ -76,7 +76,7 @@ void testNodeDrainOperationSuccess()
         trustedClient().put(serverWrapper.serverPort, "localhost", 
ApiEndpointsV1.NODE_DRAIN_ROUTE)
                        .send());
 
-        assertThat(drainResponse.statusCode()).isEqualTo(OK.code());
+        assertThat(drainResponse.statusCode()).isIn(OK.code(), 
ACCEPTED.code());

Review Comment:
   Why is it necessary to change the assertion?



##########
server/src/main/java/org/apache/cassandra/sidecar/db/DriverUnsupportedSchemaCache.java:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.server.data.Name;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import org.apache.cassandra.sidecar.common.utils.StringUtils;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException;
+import org.apache.cassandra.sidecar.tasks.PeriodicTask;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * The {@link DriverUnsupportedSchemaCache} class maintains cache of CQL 
schema for tables whose definition is not
+ * supported by driver natively. Java driver 3.x does not return {@link 
TableMetadata} for tables which schema
+ * could not be parsed. Since it does not currently support vector type, any 
table containing vector would not
+ * be visible. Cache is refreshed for the first time as soon as CQL connection 
is established or upon first lookup.
+ * Later, it is periodically refreshed according to configured schedule.
+ * TODO: Remove after upgrade to Java driver 4.x (CASSSIDECAR-421).
+ */
+@Singleton
+public class DriverUnsupportedSchemaCache implements PeriodicTask
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DriverUnsupportedSchemaCache.class);
+    private static final String STATEMENT_DELIMITER = "\n\n";
+
+    private final SidecarConfiguration sidecarConfiguration;
+    private final CQLSessionProvider sessionProvider;
+    private final CQLSchemaAccessor schemaAccessor;
+
+    // cache contains only schemas unparseable by Java driver
+    // during every cache refresh, reference is being reassigned to achieve 
atomic swap
+    private volatile SortedMap<QualifiedTableName, String> schemaCache;
+    private volatile boolean initialized; // flag indicating whether cache has 
been populated at least once
+
+    private PreparedStatement tableListStatement;
+
+    public DriverUnsupportedSchemaCache(SidecarConfiguration 
sidecarConfiguration,
+                                        CQLSessionProvider sessionProvider)
+    {
+        this.sidecarConfiguration = sidecarConfiguration;
+        this.sessionProvider = sessionProvider;
+        this.schemaAccessor = new CQLSchemaAccessor(sessionProvider);
+        this.schemaCache = createCache();
+        this.initialized = false;
+    }
+
+    /**
+     * @return Schema for all tables across all keyspaces (only those not 
supported by Java driver).
+     */
+    @NotNull
+    public String getFullSchema()
+    {
+        // we cannot know whether schema was updated after last
+        // periodic refresh, so results may be stale
+        refreshIfUninitialized();
+        return getUnsupportedSchema(table -> true);
+    }
+
+    /**
+     * @return Schema for all tables within given keyspaces (only those not 
supported by Java driver).
+     */
+    @NotNull
+    public String getKeyspaceSchema(@NotNull Name keyspace)
+    {
+        // we cannot know whether schema was updated after last
+        // periodic refresh, so results may be stale
+        refreshIfUninitialized();
+        return getUnsupportedSchema(table -> 
keyspace.equals(table.getKeyspace()));
+    }
+
+    /**
+     * @return Schema for table not supported by Java driver's metadata, 
{@code null} otherwise.
+     */
+    @Nullable
+    public String getTableSchema(@NotNull Name keyspace, @NotNull Name table)
+    {
+        return getTableSchema(keyspace, table, true);
+    }
+
+    /**
+     * @param allowRefresh Flag indicating whether lookup of table schema via 
CQL query
+     *                     is allowed, when data not found in cache.
+     * @return Schema for table not supported by Java driver's metadata, 
{@code null} otherwise.
+     */
+    @Nullable
+    public String getTableSchema(@NotNull Name keyspace, @NotNull Name table, 
boolean allowRefresh)
+    {
+        refreshIfUninitialized();

Review Comment:
   Does this mean that `allowRefresh = false` might not be honored when the cache is uninitialized? `refreshIfUninitialized()` is called before `allowRefresh` is checked.



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/sstableuploads/SSTableUploadHandler.java:
##########
@@ -209,13 +214,17 @@ private Future<SSTableUploadRequestParam> 
validateKeyspaceAndTable(String host,
                            logger.error(message);
                            return 
Future.failedFuture(wrapHttpException(HttpResponseStatus.BAD_REQUEST, message));
                        }
-
                        if (MetadataUtils.table(keyspaceMetadata, 
request.table()) == null)
                        {
-                           String message = String.format("Invalid table name 
'%s' supplied for keyspace '%s'",
-                                                          request.table(), 
request.keyspace());
-                           logger.error(message);
-                           return 
Future.failedFuture(wrapHttpException(HttpResponseStatus.BAD_REQUEST, message));
+                           // check for tables that are not parseable by Java 
driver's metadata feature
+                           boolean tableExists = 
driverUnsupportedSchemaCache.getTableSchema(request.keyspace(), 
request.table(), false) != null;

Review Comment:
   Same issue: `getTableSchema` may block unconditionally if the cache has not been initialized yet.



##########
server/src/main/java/org/apache/cassandra/sidecar/db/DriverUnsupportedSchemaCache.java:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.server.data.Name;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import org.apache.cassandra.sidecar.common.utils.StringUtils;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException;
+import org.apache.cassandra.sidecar.tasks.PeriodicTask;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * The {@link DriverUnsupportedSchemaCache} class maintains cache of CQL 
schema for tables whose definition is not
+ * supported by driver natively. Java driver 3.x does not return {@link 
TableMetadata} for tables which schema
+ * could not be parsed. Since it does not currently support vector type, any 
table containing vector would not
+ * be visible. Cache is refreshed for the first time as soon as CQL connection 
is established or upon first lookup.
+ * Later, it is periodically refreshed according to configured schedule.
+ * TODO: Remove after upgrade to Java driver 4.x (CASSSIDECAR-421).
+ */
+@Singleton
+public class DriverUnsupportedSchemaCache implements PeriodicTask
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DriverUnsupportedSchemaCache.class);
+    private static final String STATEMENT_DELIMITER = "\n\n";
+
+    private final SidecarConfiguration sidecarConfiguration;
+    private final CQLSessionProvider sessionProvider;
+    private final CQLSchemaAccessor schemaAccessor;
+
+    // cache contains only schemas unparseable by Java driver
+    // during every cache refresh, reference is being reassigned to achieve 
atomic swap
+    private volatile SortedMap<QualifiedTableName, String> schemaCache;
+    private volatile boolean initialized; // flag indicating whether cache has 
been populated at least once
+
+    private PreparedStatement tableListStatement;
+
+    public DriverUnsupportedSchemaCache(SidecarConfiguration 
sidecarConfiguration,
+                                        CQLSessionProvider sessionProvider)
+    {
+        this.sidecarConfiguration = sidecarConfiguration;
+        this.sessionProvider = sessionProvider;
+        this.schemaAccessor = new CQLSchemaAccessor(sessionProvider);
+        this.schemaCache = createCache();
+        this.initialized = false;
+    }
+
+    /**
+     * @return Schema for all tables across all keyspaces (only those not 
supported by Java driver).
+     */
+    @NotNull
+    public String getFullSchema()
+    {
+        // we cannot know whether schema was updated after last
+        // periodic refresh, so results may be stale
+        refreshIfUninitialized();
+        return getUnsupportedSchema(table -> true);
+    }
+
+    /**
+     * @return Schema for all tables within given keyspaces (only those not 
supported by Java driver).
+     */
+    @NotNull
+    public String getKeyspaceSchema(@NotNull Name keyspace)
+    {
+        // we cannot know whether schema was updated after last
+        // periodic refresh, so results may be stale
+        refreshIfUninitialized();
+        return getUnsupportedSchema(table -> 
keyspace.equals(table.getKeyspace()));
+    }
+
+    /**
+     * @return Schema for table not supported by Java driver's metadata, 
{@code null} otherwise.
+     */
+    @Nullable
+    public String getTableSchema(@NotNull Name keyspace, @NotNull Name table)
+    {
+        return getTableSchema(keyspace, table, true);
+    }
+
+    /**
+     * @param allowRefresh Flag indicating whether lookup of table schema via 
CQL query
+     *                     is allowed, when data not found in cache.
+     * @return Schema for table not supported by Java driver's metadata, 
{@code null} otherwise.
+     */
+    @Nullable
+    public String getTableSchema(@NotNull Name keyspace, @NotNull Name table, 
boolean allowRefresh)
+    {
+        refreshIfUninitialized();
+        QualifiedTableName name = new QualifiedTableName(keyspace, table);
+        String schema = schemaCache.get(name);
+        if (schema == null && allowRefresh)
+        {
+            // proactively fetch table schema, not to provide
+            // false-negative when table is not cached yet
+            // attention, method can block
+            schema = populateSchemaCache(schemaCache, name);
+        }
+        return schema;
+    }
+
+    @Override
+    public DurationSpec delay()
+    {
+        return 
sidecarConfiguration.driverConfiguration().unsupportedTableSchemaRefreshTime();
+    }
+
+    @Override
+    public void execute(Promise<Void> promise)
+    {
+        try
+        {
+            refresh(false);
+            promise.tryComplete();
+        }
+        catch (Throwable t)
+        {
+            promise.fail(t);
+        }
+    }
+
+    private String getUnsupportedSchema(Predicate<QualifiedTableName> 
condition)
+    {
+        StringBuilder result = new StringBuilder();
+        for (Map.Entry<QualifiedTableName, String> entry : 
schemaCache.entrySet())
+        {
+            if (condition.test(entry.getKey()))
+            {
+                result.append(entry.getValue()).append(STATEMENT_DELIMITER);
+            }
+        }
+        return result.toString().trim();
+    }
+
+    private void refreshIfUninitialized()
+    {
+        if (!initialized)
+        {
+            refresh(true);
+        }
+    }
+
+    public synchronized void refresh(boolean initializeOnly)
+    {
+        if (initialized && initializeOnly)
+        {
+            // cache has been already initialized, early exit
+            return;
+        }
+        try
+        {
+            Session session = sessionProvider.get();
+            prepareStatements(session);
+
+            Set<QualifiedTableName> tables = queryAllTables(session);
+            Set<QualifiedTableName> driverKnownTables = 
driverKnownTables(session);
+
+            tables.removeAll(driverKnownTables);
+
+            SortedMap<QualifiedTableName, String> newCache = createCache();
+            if (!tables.isEmpty())
+            {
+                LOGGER.debug("Tables not supported by Java driver metadata: 
{}", tables);
+                tables.forEach(table -> populateSchemaCache(newCache, table));
+                // replacing cache, because some tables might have been 
removed in the meanwhile
+            }
+            schemaCache = newCache;
+
+            initialized = true;
+        }
+        catch (CassandraUnavailableException ignored)
+        {
+            LOGGER.debug("Not yet connected to Cassandra cluster");
+        }
+    }
+
+    private String populateSchemaCache(Map<QualifiedTableName, String> cache, 
QualifiedTableName table)
+    {
+        List<String> cqlSchema = 
schemaAccessor.getTableSchema(table.getKeyspace(), table.table());

Review Comment:
   `getKeyspace()` and `table()` can return null, but the `getTableSchema` method expects `@NotNull` arguments.



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/KeyspaceSchemaHandler.java:
##########
@@ -130,6 +144,27 @@ private Future<Metadata> metadata(String host)
         });
     }
 
+    /**
+     * Get CQL schema not parseable by Java driver (and therefore not present 
in {@link Metadata}).
+     *
+     * @param keyspace optional keyspace name, {@code null} includes schema 
for all keyspaces
+     * @return {@link Future} containing CQL schema
+     */
+    private Future<String> unsupportedSchema(Name keyspace)
+    {
+        return executorPools.service().executeBlocking(() -> {
+            // schema cache can block if it was not successfully initialized 
at least once

Review Comment:
   👍 



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/validations/ValidateTableExistenceHandler.java:
##########
@@ -97,7 +103,14 @@ protected void handleInternal(RoutingContext context,
             }
 
             TableMetadata tableMetadata = keyspaceMetadata.getTable(table);
-            if (tableMetadata == null)
+            boolean tableExists = tableMetadata != null;
+            if (!tableExists)
+            {
+                tableExists = driverUnsupportedSchemaCache.getTableSchema(new 
Name(keyspaceMetadata.getName()),
+                                                                          new 
Name(table),
+                                                                          
false) != null;

Review Comment:
   If the cache is not initialized yet, this call may still block, even though `false` is passed.



##########
server/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableVectorUploadHandlerIntegrationTest.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes.sstableuploads;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.datastax.driver.core.Session;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.file.OpenOptions;
+import io.vertx.ext.web.client.HttpRequest;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import 
org.apache.cassandra.sidecar.handlers.sstableuploads.SSTableImportHandler;
+import 
org.apache.cassandra.sidecar.handlers.sstableuploads.SSTableUploadHandler;
+import org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.IClusterExtension;
+import org.apache.cassandra.testing.utils.AssertionUtils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+/**
+ * Integration tests for {@link SSTableUploadHandler}, {@link 
SSTableImportHandler} and vector type
+ * TODO: Consider removing after upgrade to Java driver 4.x (CASSSIDECAR-421).
+ */
+@ExtendWith(VertxExtension.class)
+public class SSTableVectorUploadHandlerIntegrationTest extends 
IntegrationTestBase
+{
+    public static final SimpleCassandraVersion MIN_VERSION_WITH_VECTOR = 
SimpleCassandraVersion.create("5.0.0");
+
+    @CassandraIntegrationTest
+    void testSSTableImport(VertxTestContext vertxTestContext)
+    throws Exception
+    {
+        assumeThat(sidecarTestContext.version)
+        .as("Vector type is supported since Cassandra 5.0")
+        .isGreaterThanOrEqualTo(MIN_VERSION_WITH_VECTOR);
+
+        // Create a table. Insert some data, create a snapshot that we'll use 
for import.
+        // Move snapshot files to a temporary destination.
+        // Truncate the table and insert more data.
+        // Upload the SSTables.
+        // Test the import SSTable endpoint by importing data that was 
originally truncated.
+        // Verify by querying the table contains all the results before 
truncation and after truncation.
+
+        createTestKeyspace();
+        Session session = maybeGetSession();
+        QualifiedTableName tableName = 
createTestTableAndPopulate(sidecarTestContext, Arrays.asList("a", "b"));
+
+        // wait for schema to be present
+        // periodic refresh of Java driver unsupported schemas is configured 
to 3 seconds
+        AssertionUtils.loopAssert(15, 1000, () -> {
+            String testSchemaRoute = "/api/v1/cassandra/schema";
+            HttpResponse<Buffer> response = client.get(server.actualPort(), 
"127.0.0.1", testSchemaRoute).send()
+                                                  
.toCompletionStage().toCompletableFuture().join();
+            
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
+            
assertThat(response.bodyAsString()).contains(tableName.tableName());
+        });
+
+        // create a snapshot called <tableName>-snapshot for tbl1
+        IClusterExtension<? extends IInstance> cluster = 
sidecarTestContext.cluster();
+        final String snapshotStdout = cluster.get(1).nodetoolResult("snapshot",

Review Comment:
   nit: drop the `final` to be consistent with the code style. 
   
   (That said, I know it is easy to miss)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to