jmckenzie-dev commented on code in PR #347:
URL: https://github.com/apache/cassandra-sidecar/pull/347#discussion_r3237034254


##########
server/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java:
##########
@@ -119,21 +116,8 @@ String jarResourceName(String... parts)
 
     public ClassLoader buildClassLoader(String... resourceNames)
     {
-        URL[] urls = Arrays.stream(resourceNames)
-                           
.map(BaseCassandraBridgeFactory::copyClassResourceToFile)
-                           .map(jar -> {
-                               try
-                               {
-                                   return jar.toURI().toURL();
-                               }
-                               catch (MalformedURLException e)
-                               {
-                                   throw new RuntimeException(e);
-                               }
-                           }).toArray(URL[]::new);
-
         return AccessController.doPrivileged((PrivilegedAction<ClassLoader>) 
() ->
-                                                                             
new PostDelegationClassLoader(urls, 
Thread.currentThread().getContextClassLoader()));
+                                                                             
BaseCassandraBridgeFactory.buildClassLoader(resourceNames));

Review Comment:
   Can we left align w/1 tab here instead of having this aligned so far to the 
right?



##########
server/build.gradle:
##########
@@ -151,6 +151,7 @@ dependencies {
     implementation(group: "org.apache.cassandra", name: 
"cassandra-avro-converter_spark3_2.12", version: 
"${[project.analyticsVersion]}")
     implementation(group: "org.apache.cassandra", name: 
"cassandra-analytics-cdc_spark3_2.12", version: "${[project.analyticsVersion]}")
     implementation(group: "org.apache.cassandra", name: 
"cassandra-analytics-cdc-sidecar_spark3_2.12", version: 
"${[project.analyticsVersion]}")
+    compileOnly(group: "org.apache.cassandra", name: 
"cassandra-analytics-sidecar-client", version: "${[project.analyticsVersion]}")

Review Comment:
   We should comment on why this is here and `compileOnly` so it's clear for 
future maintainers. Not immediately obvious.



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java:
##########
@@ -176,17 +209,26 @@ void onSchemaChanged()
                         
tableHistoryDatabaseAccessor.insertTableSchemaHistory(cqlTable.keyspace(), 
cqlTable.table(), cqlTable.createStatement());
                     }
                     LOGGER.info("Re-generating Avro Schema after schema change 
keyspace={} table={}", tableIdentifier.keyspace(), tableIdentifier.table());
-                    return new SchemaCacheEntry(cqlTable, 
cqlToAvroSchemaConverter);
+                    return new SchemaCacheEntry(cqlTable, 
cqlToAvroSchemaConverter.convert(cqlTable));
                 }
                 return v;
             });
         }
-        loadPublisher();
-        publishSchemas();
+        try
+        {
+            loadPublisher();
+            publishSchemas();
+        }
+        catch (Exception e)
+        {
+            LOGGER.error("Failed to publish schemas to Kafka, CDC will still 
start", e);

Review Comment:
   Similar to publisher init above, why do we continue even if schema 
publishing fails? Let's document the reasoning and intent here. And if we're 
not 100% sure this is the right default, we should consider making this a 
configurable parameter for operators.



##########
server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcPublisherTests.java:
##########
@@ -111,204 +112,22 @@ void setUp()
 
         cdcPublisher = new CdcPublisher(
             vertx,
-            sidecarConfiguration,
             executorPools,
             clusterConfigProvider,
             schemaSupplier,
-            sidecarInstancesProvider,
-            clientConfig,
             instanceMetadataFetcher,
             cdcConfig,
             databaseAccessor,
             cdcStats,
             virtualTables,
             sidecarCdcStats,
-            avroSerializer,
-            rangeManager
-        );
-    }
-
-
-    @Test
-    void testSecretsProviderReturnsNullWhenSslDisabled()

Review Comment:
   I think we lost this test in the migration over to the new test file - was 
that deliberate?



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java:
##########
@@ -176,17 +209,26 @@ void onSchemaChanged()
                         
tableHistoryDatabaseAccessor.insertTableSchemaHistory(cqlTable.keyspace(), 
cqlTable.table(), cqlTable.createStatement());
                     }
                     LOGGER.info("Re-generating Avro Schema after schema change 
keyspace={} table={}", tableIdentifier.keyspace(), tableIdentifier.table());
-                    return new SchemaCacheEntry(cqlTable, 
cqlToAvroSchemaConverter);
+                    return new SchemaCacheEntry(cqlTable, 
cqlToAvroSchemaConverter.convert(cqlTable));
                 }
                 return v;
             });
         }
-        loadPublisher();
-        publishSchemas();
+        try
+        {
+            loadPublisher();
+            publishSchemas();
+        }
+        catch (Exception e)
+        {
+            LOGGER.error("Failed to publish schemas to Kafka, CDC will still 
start", e);
+        }
         // Remove any old schema entries for deleted tables, this operation 
can be done in the end as this is
         // only for removing stale entries and no one is going to use these 
entries once the table is removed.
         // This doesn't have to be an atomic operation.
-        
avroSchemasCache.keySet().retainAll(refreshedCdcTables.stream().map(cqlTable -> 
TableIdentifier.of(cqlTable.keyspace(), 
cqlTable.table())).collect(Collectors.toList()));
+        avroSchemasCache.keySet().retainAll(refreshedCdcTables.stream()
+                                                               .map(cqlTable 
-> TableIdentifier.of(cqlTable.keyspace(), cqlTable.table()))
+                                                               
.collect(Collectors.toList()));
         vertx.eventBus().publish(ON_CDC_CACHE_WARMED_UP.address(), "Cdc cache 
warmed up");

Review Comment:
   We only publish the `ON_CDC_CACHE_WARMED_UP` method on `onSchemaChanged`. 
Does this mean initialization has a logical and temporal coupling on some 
reactive, config-based, event driven thing to take place to warm up the cache?
   
   I'd expect we would initialize and be in a ready state out of the gate 
w/this published, not having to rely on a reactive pipeline of events to 
cascade through to trigger this.



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java:
##########
@@ -91,13 +105,13 @@ public class CachingSchemaStore implements SchemaStore
         this.tableHistoryDatabaseAccessor = tableHistoryDatabaseAccessor;
         this.sidecarSchema = sidecarSchema;
         this.cqlToAvroSchemaConverter = cqlToAvroSchemaConverter;
-        
this.avroSchemasCache.putAll(createSchemaCache(cassandraClusterSchemaMonitor.getCdcTables()));
-        AvroSchemas.registerLogicalTypes();
-        
cassandraClusterSchemaMonitor.addSchemaChangeListener(this::onSchemaChanged);
         this.vertx = vertx;
         this.cdcConfig = cdcConfig;
         this.sidecarCdcStats = sidecarCdcStats;
         this.schemaStorePublisherFactory = schemaStorePublisherFactory;
+        
this.avroSchemasCache.putAll(createSchemaCache(cassandraClusterSchemaMonitor.getCdcTables()));

Review Comment:
   Why the ordering change here? If there's a temporal or ordering based 
requirement, we should add a comment there about that so future maintainers 
don't regress.



##########
build.gradle:
##########
@@ -140,6 +140,10 @@ allprojects {
 
         // for dtest jar
         mavenLocal()
+
+        maven {
+            url 
"https://repository.apache.org/content/repositories/orgapachecassandra-1468/";

Review Comment:
   This needs to be removed before merge right?



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java:
##########


Review Comment:
   The fact that we don't call `initialize` during the regular `run` and 
startup path is quite surprising and confusing. Why is this constructed this 
way? The `initialize` method looks like it actually only increments metrics and 
sets `isInitialized` to true. The flow there seems generally somewhat odd to me.
   
   My naive assumption is we'd have some flow that is "initialize CDC 
publishing; when it's done, flag `isInitialized` as true." I'd expect both 
`run` and `restart` to lean on this shared code.
   
   Right now, best I can tell we call `startConsumers` in `run` and then don't 
call it again. This is a problem because we call `stopConsumers` in our 
`CdcPublisher#stop` call, meaning post restart our consumers will be stopped 
and never restarted, at least from the context inside this class. We should 
either be responsible for all start/stop on consumers or none in this file.



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/SidecarReplicationFactorSupplier.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.util.Comparator;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.sidecar.ReplicationFactorSupplier;
+import org.apache.cassandra.cdc.sidecar.SidecarCommitLogProvider;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.utils.FutureUtils;
+
+/**
+ * {@link ReplicationFactorSupplier} implementation that reads the actual 
replication factor
+ * from Cassandra cluster metadata via {@link CdcOptions}, rather than using 
the default RF=1
+ * SimpleStrategy fallback. Used by {@link SidecarCommitLogProvider} to build 
a correctly
+ * replicated {@link 
org.apache.cassandra.spark.data.partitioner.CassandraRing}.
+ */
+public class SidecarReplicationFactorSupplier implements 
ReplicationFactorSupplier
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SidecarReplicationFactorSupplier.class);
+    private final CdcOptions cdcOptions;
+    private final SchemaSupplier schemaSupplier;
+
+    public SidecarReplicationFactorSupplier(CdcOptions cdcOptions, 
SchemaSupplier schemaSupplier)
+    {
+        this.cdcOptions = cdcOptions;
+        this.schemaSupplier = schemaSupplier;
+    }
+
+    @Override
+    public ReplicationFactor getReplicationFactor(String keyspace)
+    {
+        return cdcOptions.replicationFactor(keyspace);
+    }
+
+    @Override
+    public ReplicationFactor getMaximalReplicationFactor()
+    {
+        String dc = cdcOptions.dc();
+        Set<CqlTable> tables = 
FutureUtils.get(schemaSupplier.getCdcEnabledTables());
+        return tables.stream()
+                     .map(CqlTable::replicationFactor)
+                     .filter(rf -> rf.getOptions().containsKey(dc))
+                     .max(Comparator.comparingInt(rf -> 
rf.getOptions().get(dc)))
+                     .orElseGet(() -> {
+                         LOGGER.warn("No CDC-enabled tables found for DC '{}'; 
falling back to RF=1 SimpleStrategy", dc);

Review Comment:
   Why do we _have_ this here? What's the cause for this fallback, what are the 
implications, etc. It's not clear to me why this wouldn't be a Bad Thing vs. 
failing out if we hit this point and can't get a replication factor for a table 
/ keyspace.



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java:
##########
@@ -98,39 +96,40 @@ public CdcManager(EventConsumer eventConsumer,
                       RangeManager rangeManager,
                       InstanceMetadataFetcher instanceFetcher,
                       ClusterConfigProvider clusterConfigProvider,
-                      CdcSidecarInstancesProvider sidecarInstancesProvider,
-                      SecretsProvider secretsProvider,
-                      SidecarCdcClient.ClientConfig clientConfig,
+                      SidecarCdcClient sidecarCdcClient,
                       ICdcStats cdcStats,
                       TaskExecutorPool taskExecutorPool,
-                      CdcDatabaseAccessor cdcDatabaseAccessor)
+                      CdcDatabaseAccessor cdcDatabaseAccessor,
+                      CdcOptions cdcOptions)
     {
         this.eventConsumer = eventConsumer;
         this.schemaSupplier = schemaSupplier;
         this.conf = conf;
         this.rangeManager = rangeManager;
         this.instanceFetcher = instanceFetcher;
         this.clusterConfigProvider = clusterConfigProvider;
-        this.sidecarInstancesProvider = sidecarInstancesProvider;
-        this.secretsProvider = secretsProvider;
-        this.clientConfig = clientConfig;
+        this.sidecarCdcClient = sidecarCdcClient;
         this.cdcStats = cdcStats;
-        this.taskExecutorPool = taskExecutorPool;
-        this.cdcDatabaseAccessor = cdcDatabaseAccessor;
+        this.cdcOptions = cdcOptions;
+        this.asyncExecutor = new ExecutorPoolsExecutor(taskExecutorPool);
+        this.cassandraClient = new 
StateSidecarCdcCassandraClient(cdcDatabaseAccessor);
+        this.rfSupplier = new SidecarReplicationFactorSupplier(cdcOptions, 
schemaSupplier);
     }
 
-    List<SidecarCdc> buildCdcConsumers()
+    List<CdcConsumerEntry> buildCdcConsumers()
     {
         Map<String, Set<TokenRange>> ownedRanges = 
rangeManager.ownedTokenRanges();
         if (ownedRanges == null || ownedRanges.isEmpty())
         {
             throw new IllegalStateException("No owned token ranges right now, 
cql session may still be initializing.");
         }
 
-        // NEW: Deduplicate by (instanceId, tokenRange) to prevent duplicate 
consumers
-        Map<String, SidecarCdc> uniqueConsumers = new 
HashMap<>(ownedRanges.values().stream().mapToInt(Set::size).sum());
+        // Deduplicate by (instanceId, tokenRange) to prevent duplicate 
consumers
+        Map<String, CdcConsumerEntry> uniqueEntries = new 
HashMap<>(ownedRanges.values().stream().mapToInt(Set::size).sum());

Review Comment:
   I'd prefer if we named this something like `uniqueCDCStates` or something 
else that, via the name, informs _what_ it is. "uniqueEntries" is tautological 
and doesn't really give us much to go off.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to