lukasz-antoniak commented on code in PR #340:
URL: https://github.com/apache/cassandra-sidecar/pull/340#discussion_r3161689454


##########
server/src/main/java/org/apache/cassandra/sidecar/db/DriverUnsupportedSchemaCache.java:
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.server.data.Name;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException;
+import org.apache.cassandra.sidecar.tasks.PeriodicTask;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * The {@link DriverUnsupportedSchemaCache} class maintains a cache of the CQL schema for tables whose
+ * definition is not natively supported by the driver. Java driver 3.x does not return
+ * {@link TableMetadata} for tables whose schema could not be parsed. Since it does not currently
+ * support the vector type, any table containing a vector would not be visible. The cache is refreshed
+ * for the first time as soon as the CQL connection is established or upon first lookup.
+ * Later, it is periodically refreshed according to the configured schedule.
+ * TODO: Remove after upgrade to Java driver 4.x (CASSSIDECAR-421).
+ */
+@Singleton
+public class DriverUnsupportedSchemaCache implements PeriodicTask
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DriverUnsupportedSchemaCache.class);
+    private static final String STATEMENT_DELIMITER = "\n\n";
+
+    private final SidecarConfiguration sidecarConfiguration;
+    private final CQLSessionProvider sessionProvider;
+    private final CQLSchemaAccessor schemaAccessor;
+
+    // the cache contains only schemas that cannot be parsed by the Java driver;
+    // during every cache refresh the reference is reassigned to achieve an atomic swap
+    private volatile SortedMap<QualifiedTableName, String> schemaCache;
+    private volatile boolean initialized; // flag indicating whether cache has 
been populated at least once
+
+    private PreparedStatement tableListStatement;
+
+    public DriverUnsupportedSchemaCache(SidecarConfiguration 
sidecarConfiguration,
+                                        CQLSessionProvider sessionProvider)
+    {
+        this.sidecarConfiguration = sidecarConfiguration; // read later by delay() for the refresh interval
+        this.sessionProvider = sessionProvider; // source of the CQL session used by refresh()
+        this.schemaAccessor = new CQLSchemaAccessor(sessionProvider); // accessor is built on the same provider
+        this.schemaCache = createCache(); // presumably an empty sorted map - createCache() is not shown here
+        this.initialized = false; // nothing loaded yet; refresh() sets this to true after a successful load
+    }
+
+    /**
+     * Returns the cached schema of every table that is not supported by the Java driver.
+     * Triggers an initial refresh first when the cache has never been initialized.
+     *
+     * @return schema for all tables across all keyspaces (only those not supported by the Java driver),
+     *         with statements separated by a blank line; an empty string when nothing is cached
+     */
+    @NotNull
+    public String getFullSchema()
+    {
+        // we cannot know whether the schema was updated after the last
+        // periodic refresh, so the results may be stale
+        refreshIfUninitialized();
+        return getUnsupportedSchema(table -> true);
+    }
+
+    /**
+     * Returns the cached schema of the tables in one keyspace that are not supported by the Java driver.
+     * Triggers an initial refresh first when the cache has never been initialized.
+     *
+     * @param keyspace keyspace whose cached tables are included in the result
+     * @return schema for all tables within the given keyspace (only those not supported by the
+     *         Java driver); an empty string when no cached table belongs to that keyspace
+     */
+    @NotNull
+    public String getKeyspaceSchema(@NotNull Name keyspace)
+    {
+        // we cannot know whether the schema was updated after the last
+        // periodic refresh, so the results may be stale
+        refreshIfUninitialized();
+        return getUnsupportedSchema(table -> 
keyspace.equals(table.getKeyspace()));
+    }
+
+    /**
+     * Looks up the schema of a single table. Delegates to
+     * {@link #getTableSchema(Name, Name, boolean)} with {@code allowRefresh} set to {@code true},
+     * so a cache miss may result in a blocking lookup.
+     *
+     * @param keyspace keyspace of the table
+     * @param table    name of the table
+     * @return schema of a table not supported by the Java driver's metadata, {@code null} otherwise
+     */
+    @Nullable
+    public String getTableSchema(@NotNull Name keyspace, @NotNull Name table)
+    {
+        return getTableSchema(keyspace, table, true);
+    }
+
+    /**
+     * Looks up the cached schema of a single table, triggering an initial refresh first when the
+     * cache has never been initialized.
+     *
+     * @param keyspace     keyspace of the table
+     * @param table        name of the table
+     * @param allowRefresh flag indicating whether a lookup of the table schema via CQL query
+     *                     is allowed when the table is not found in the cache
+     * @return schema of a table not supported by the Java driver's metadata, {@code null} otherwise
+     */
+    @Nullable
+    public String getTableSchema(@NotNull Name keyspace, @NotNull Name table, 
boolean allowRefresh)
+    {
+        refreshIfUninitialized();
+        QualifiedTableName name = new QualifiedTableName(keyspace, table);
+        String schema = schemaCache.get(name);
+        if (schema == null && allowRefresh)
+        {
+            // cache miss: proactively fetch the table schema, so that a table which is
+            // not cached yet is not reported as a false negative; attention, this call can block
+            // NOTE(review): runs on the live cache map outside synchronized refresh() - confirm thread-safety
+            schema = populateSchemaCache(schemaCache, name);
+        }
+        return schema;
+    }
+
+    @Override
+    public DurationSpec delay()
+    { // the delay is read from the driver section of the Sidecar configuration on every call
+        return 
sidecarConfiguration.driverConfiguration().unsupportedTableSchemaRefreshTime();
+    }
+
+    @Override
+    public void execute(Promise<Void> promise)
+    { // one periodic run: reload the cache, then report the outcome through the promise
+        try
+        {
+            refresh(false); // false = reload even when the cache has already been initialized
+            promise.tryComplete();
+        }
+        catch (Throwable t)
+        {
+            promise.fail(t); // NOTE(review): success uses tryComplete but failure uses fail, not tryFail - confirm
+        }
+    }
+
+    private String getUnsupportedSchema(Predicate<QualifiedTableName> 
condition)
+    { // joins the cached schema of every table accepted by the condition, in the sorted map's key order
+        StringBuilder result = new StringBuilder();
+        for (Map.Entry<QualifiedTableName, String> entry : 
schemaCache.entrySet())
+        {
+            if (condition.test(entry.getKey()))
+            {
+                result.append(entry.getValue()).append(STATEMENT_DELIMITER); // blank line after each statement
+            }
+        }
+        return result.toString().trim(); // trim drops the trailing delimiter; yields "" when nothing matched
+    }
+
+    private void refreshIfUninitialized()
+    {
+        if (!initialized) // unsynchronized check of the volatile flag, skips the lock once initialized
+        {
+            refresh(true); // initializeOnly: refresh() re-checks the flag inside its synchronized body
+        }
+    }
+
+    public synchronized void refresh(boolean initializeOnly)
+    {
+        if (initialized && initializeOnly)
+        {
+            // cache has been already initialized, early exit
+            return;
+        }
+        try
+        {
+            Session session = sessionProvider.get();
+            prepareStatements(session);
+
+            Set<QualifiedTableName> tables = queryAllTables(session);
+            Set<QualifiedTableName> driverKnownTables = 
driverKnownTables(session);
+
+            tables.removeAll(driverKnownTables);
+
+            if (!tables.isEmpty())
+            {
+                LOGGER.debug("Tables not supported by Java driver metadata: 
{}", tables);
+                SortedMap<QualifiedTableName, String> newCache = createCache();
+                tables.forEach(table -> populateSchemaCache(newCache, table));
+                // replacing cache, because some tables might have been 
removed in the meanwhile
+                schemaCache = newCache;
+            }
+            else
+            {
+                schemaCache.clear();
+            }
+
+            initialized = true;
+        }
+        catch (CassandraUnavailableException ignored)
+        {
+            LOGGER.debug("Not yet connect to Cassandra cluster");

Review Comment:
   That was a copy-paste from `CachedLocalTokenRanges` :). I am fixing it in both places.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to