This is an automated email from the ASF dual-hosted git repository.

bernardobotella pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aa93d664 CASSANALYTICS-94: Expose SidecarCdc (#147)
aa93d664 is described below

commit aa93d664aabd85a375a0ac7d9fc39dfe99d3c8e9
Author: Bernardo Botella <[email protected]>
AuthorDate: Mon Nov 3 13:35:18 2025 +0100

    CASSANALYTICS-94: Expose SidecarCdc (#147)
    
    Patch by Bernardo Botella; reviewed by Jyothsna Konisa, Francisco Guerrero 
for CASSANALYTICS-94
---
 CHANGES.txt                                        |   1 +
 .../cassandra/cdc/sidecar/CdcSidecarInstance.java  |  35 ++++
 .../cdc/sidecar/CdcSidecarInstancesProvider.java   |  34 +++
 .../apache/cassandra/cdc/sidecar/SidecarCdc.java   |  32 +++
 .../cassandra/cdc/sidecar/SidecarCdcBuilder.java   |  17 +-
 .../cassandra/cdc/sidecar/SidecarCdcClient.java    | 230 ++++++++++++++++++++-
 .../cdc/sidecar/SidecarCdcCommitLogSegment.java    |   3 +-
 .../cassandra/cdc/sidecar/SidecarCdcTest.java      |  79 +++++++
 8 files changed, 421 insertions(+), 10 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 96b619ab..28cd7e3f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.2.0
 -----
+ * Expose SidecarCdc builders and interfaces (CASSANALYTICS-94)
  * Fix bulk reader node availability comparator ordering (CASSANALYTICS-99)
  * Remove not needed buffer flips (CASSANALYTICS-95)
  * Refactor BulkWriterContext broadcasting to use immutable config class 
(CASSANALYTICS-89)
diff --git 
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/CdcSidecarInstance.java
 
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/CdcSidecarInstance.java
new file mode 100644
index 00000000..e87fe421
--- /dev/null
+++ 
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/CdcSidecarInstance.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cdc.sidecar;
+
+/**
+ * Holds information about the Cassandra Sidecar Instance host and port
+ */
+public interface CdcSidecarInstance
+{
+    /**
+     * @return the port where the Cassandra Sidecar instance is listening
+     */
+    int port();
+
+    /**
+     * @return the hostname where the Cassandra Sidecar instance is running
+     */
+    String hostname();
+}
diff --git 
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/CdcSidecarInstancesProvider.java
 
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/CdcSidecarInstancesProvider.java
new file mode 100644
index 00000000..ec0cfbec
--- /dev/null
+++ 
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/CdcSidecarInstancesProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cdc.sidecar;
+
+import java.util.List;
+
+
+/**
+ * A class that provides the list of {@link CdcSidecarInstance}s. This class 
allows for statically or dynamically
+ * providing a list of instances. It is meant to support expansions and shrink 
of Cassandra clusters.
+ */
+public interface CdcSidecarInstancesProvider
+{
+    /**
+     * @return the list of {@link CdcSidecarInstance}s
+     */
+    List<CdcSidecarInstance> instances();
+}
diff --git 
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdc.java
 
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdc.java
index 6a652f39..e5ac33b6 100644
--- 
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdc.java
+++ 
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdc.java
@@ -19,11 +19,18 @@
 
 package org.apache.cassandra.cdc.sidecar;
 
+import java.io.IOException;
 import java.util.Comparator;
 import java.util.Optional;
 import java.util.Set;
 
 import org.apache.cassandra.cdc.Cdc;
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.EventConsumer;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.api.TokenRangeSupplier;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.secrets.SecretsProvider;
 import org.apache.cassandra.spark.data.CqlTable;
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.utils.FutureUtils;
@@ -43,6 +50,31 @@ public class SidecarCdc extends Cdc
         initSchema();
     }
 
+    public static SidecarCdcBuilder builder(@NotNull String jobId,
+                                            int partitionId,
+                                            CdcOptions cdcOptions,
+                                            ClusterConfigProvider 
clusterConfigProvider,
+                                            EventConsumer eventConsumer,
+                                            SchemaSupplier schemaSupplier,
+                                            TokenRangeSupplier 
tokenRangeSupplier,
+                                            CdcSidecarInstancesProvider 
sidecarInstancesProvider,
+                                            SidecarCdcClient.ClientConfig 
clientConfig,
+                                            SecretsProvider secretsProvider,
+                                            ICdcStats cdcStats) throws 
IOException
+    {
+        return new SidecarCdcBuilder(jobId,
+                                     partitionId,
+                                     cdcOptions,
+                                     clusterConfigProvider,
+                                     eventConsumer,
+                                     schemaSupplier,
+                                     tokenRangeSupplier,
+                                     sidecarInstancesProvider,
+                                     clientConfig,
+                                     secretsProvider,
+                                     cdcStats);
+    }
+
     public void initSchema()
     {
         Set<CqlTable> tables = 
FutureUtils.get(schemaSupplier.getCdcEnabledTables());
diff --git 
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java
 
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java
index b03695b3..ecc5fa93 100644
--- 
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java
+++ 
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java
@@ -20,9 +20,12 @@
 package org.apache.cassandra.cdc.sidecar;
 
 import java.io.IOException;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
 
+import o.a.c.sidecar.client.shaded.client.SidecarInstanceImpl;
+import o.a.c.sidecar.client.shaded.client.SimpleSidecarInstancesProvider;
 import org.apache.cassandra.cdc.CdcBuilder;
 import org.apache.cassandra.cdc.api.CdcOptions;
 import org.apache.cassandra.cdc.api.EventConsumer;
@@ -32,7 +35,6 @@ import org.apache.cassandra.cdc.stats.ICdcStats;
 import org.apache.cassandra.clients.Sidecar;
 import org.apache.cassandra.secrets.SecretsProvider;
 import o.a.c.sidecar.client.shaded.client.SidecarClient;
-import o.a.c.sidecar.client.shaded.client.SidecarInstancesProvider;
 import org.apache.cassandra.spark.utils.AsyncExecutor;
 import org.jetbrains.annotations.NotNull;
 
@@ -54,8 +56,8 @@ public class SidecarCdcBuilder extends CdcBuilder
                       EventConsumer eventConsumer,
                       SchemaSupplier schemaSupplier,
                       TokenRangeSupplier tokenRangeSupplier,
-                      SidecarInstancesProvider sidecarInstancesProvider,
-                      Sidecar.ClientConfig clientConfig,
+                      CdcSidecarInstancesProvider sidecarInstancesProvider,
+                      SidecarCdcClient.ClientConfig clientConfig,
                       SecretsProvider secretsProvider,
                       ICdcStats cdcStats) throws IOException
     {
@@ -68,7 +70,10 @@ public class SidecarCdcBuilder extends CdcBuilder
         schemaSupplier,
         tokenRangeSupplier,
         clientConfig,
-        Sidecar.from(sidecarInstancesProvider, clientConfig, secretsProvider),
+        Sidecar.from(new 
SimpleSidecarInstancesProvider(sidecarInstancesProvider.instances().stream()
+                                                                               
 .map(i -> new SidecarInstanceImpl(i.hostname(), i.port()))
+                                                                               
 .collect(Collectors.toList())),
+                     clientConfig.toGenericSidecarConfig(), secretsProvider),
         cdcStats
         );
     }
@@ -80,7 +85,7 @@ public class SidecarCdcBuilder extends CdcBuilder
                       EventConsumer eventConsumer,
                       SchemaSupplier schemaSupplier,
                       TokenRangeSupplier tokenRangeSupplier,
-                      Sidecar.ClientConfig clientConfig,
+                      SidecarCdcClient.ClientConfig clientConfig,
                       SidecarClient sidecarClient,
                       ICdcStats cdcStats)
     {
@@ -103,7 +108,7 @@ public class SidecarCdcBuilder extends CdcBuilder
         return this;
     }
 
-    public SidecarCdcBuilder withSidecarClient(Sidecar.ClientConfig 
clientConfig,
+    public SidecarCdcBuilder withSidecarClient(SidecarCdcClient.ClientConfig 
clientConfig,
                                                SidecarClient sidecarClient,
                                                ICdcStats cdcStats)
     {
diff --git 
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java
 
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java
index 585b48d2..2dc98803 100644
--- 
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java
+++ 
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java
@@ -21,7 +21,10 @@ package org.apache.cassandra.cdc.sidecar;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
@@ -32,18 +35,30 @@ import org.apache.cassandra.clients.Sidecar;
 import o.a.c.sidecar.client.shaded.client.SidecarClient;
 import o.a.c.sidecar.client.shaded.client.SidecarInstance;
 import o.a.c.sidecar.client.shaded.client.StreamBuffer;
+import org.apache.cassandra.spark.data.FileType;
 import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
 import org.apache.cassandra.spark.exceptions.TransportFailureException;
+import org.apache.cassandra.spark.utils.MapUtils;
 import org.apache.cassandra.spark.utils.ThrowableUtils;
 import org.apache.cassandra.spark.utils.streaming.StreamConsumer;
+import org.jetbrains.annotations.Nullable;
+
+import static 
org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_BUFFER_OVERRIDE;
+import static 
org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_BUFFER_SIZE;
+import static 
org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_MILLIS_TO_SLEEP;
+import static 
org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_POOL_SIZE;
+import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_RETRIES;
+import static 
org.apache.cassandra.spark.utils.Properties.DEFAULT_MILLIS_TO_SLEEP;
+import static org.apache.cassandra.spark.utils.Properties.DEFAULT_SIDECAR_PORT;
+import static 
org.apache.cassandra.spark.utils.Properties.DEFAULT_TIMEOUT_SECONDS;
 
 public class SidecarCdcClient
 {
-    final Sidecar.ClientConfig config;
+    final ClientConfig config;
     final SidecarClient sidecarClient;
     final ICdcStats stats;
 
-    public SidecarCdcClient(Sidecar.ClientConfig config,
+    public SidecarCdcClient(ClientConfig config,
                             SidecarClient sidecarClient,
                             ICdcStats stats)
     {
@@ -150,4 +165,215 @@ public class SidecarCdcClient
             }
         };
     }
+
+    public static final class ClientConfig
+    {
+        public static final String MAX_BUFFER_SIZE_BYTES_KEY = 
"maxBufferSizeBytes";
+        public static final String CHUNK_BUFFER_SIZE_BYTES_KEY = 
"chunkBufferSizeBytes";
+        public static final String DEFAULT_CASSANDRA_ROLE = null;
+
+        private final int userProvidedPort;
+        private final int maxRetries;
+        private final int maxPoolSize;
+        private final int timeoutSeconds;
+        private final long millisToSleep;
+        private final long maxMillisToSleep;
+        private final long maxBufferSize;
+        private final long chunkSize;
+        private final String cassandraRole;
+        private final Map<FileType, Long> maxBufferOverride;
+        private final Map<FileType, Long> chunkBufferOverride;
+
+        // CHECKSTYLE IGNORE: Constructor with many parameters
+        private ClientConfig(int userProvidedPort,
+                             int maxRetries,
+                             long millisToSleep,
+                             long maxMillisToSleep,
+                             long maxBufferSize,
+                             long chunkSize,
+                             int maxPoolSize,
+                             int timeoutSeconds,
+                             String cassandraRole,
+                             Map<FileType, Long> maxBufferOverride,
+                             Map<FileType, Long> chunkBufferOverride)
+        {
+            this.userProvidedPort = userProvidedPort;
+            this.maxRetries = maxRetries;
+            this.millisToSleep = millisToSleep;
+            this.maxMillisToSleep = maxMillisToSleep;
+            this.maxBufferSize = maxBufferSize;
+            this.chunkSize = chunkSize;
+            this.maxPoolSize = maxPoolSize;
+            this.timeoutSeconds = timeoutSeconds;
+            this.cassandraRole = cassandraRole;
+            this.maxBufferOverride = maxBufferOverride;
+            this.chunkBufferOverride = chunkBufferOverride;
+        }
+
+        public int userProvidedPort()
+        {
+            return userProvidedPort;
+        }
+
+        public int effectivePort()
+        {
+            return userProvidedPort == -1 ? DEFAULT_SIDECAR_PORT : 
userProvidedPort;
+        }
+
+        public int maxRetries()
+        {
+            return maxRetries;
+        }
+
+        public long millisToSleep()
+        {
+            return millisToSleep;
+        }
+
+        public long maxMillisToSleep()
+        {
+            return maxMillisToSleep;
+        }
+
+        public long maxBufferSize()
+        {
+            return maxBufferSize(FileType.COMMITLOG);
+        }
+
+        public long maxBufferSize(FileType fileType)
+        {
+            return maxBufferOverride.getOrDefault(fileType, maxBufferSize);
+        }
+
+        public Map<FileType, Long> maxBufferOverride()
+        {
+            return maxBufferOverride;
+        }
+
+        public long chunkBufferSize()
+        {
+            return chunkBufferSize(FileType.COMMITLOG);
+        }
+
+        public long chunkBufferSize(FileType fileType)
+        {
+            return chunkBufferOverride.getOrDefault(fileType, chunkSize);
+        }
+
+        public Map<FileType, Long> chunkBufferOverride()
+        {
+            return chunkBufferOverride;
+        }
+
+        public int maxPoolSize()
+        {
+            return maxPoolSize;
+        }
+
+        public int timeoutSeconds()
+        {
+            return timeoutSeconds;
+        }
+
+        @Nullable
+        public String cassandraRole()
+        {
+            return cassandraRole;
+        }
+
+        public static ClientConfig create()
+        {
+            return ClientConfig.create(-1, DEFAULT_MAX_RETRIES, 
DEFAULT_MILLIS_TO_SLEEP);
+        }
+
+        public static ClientConfig create(int userProvidedPort, int 
effectivePort)
+        {
+            return ClientConfig.create(userProvidedPort, DEFAULT_MAX_RETRIES, 
DEFAULT_MILLIS_TO_SLEEP);
+        }
+
+        public static ClientConfig create(int userProvidedPort, int 
maxRetries, long millisToSleep)
+        {
+            Map<FileType, Long> chunkOverride = new HashMap<>();
+            chunkOverride.put(FileType.COMMITLOG, 4 * 1024 * 1024L);  // 4MB 
chunks
+
+            return ClientConfig.create(userProvidedPort,
+                                       maxRetries,
+                                       millisToSleep,
+                                       DEFAULT_MAX_MILLIS_TO_SLEEP,
+                                       DEFAULT_MAX_BUFFER_SIZE,
+                                       4 * 1024 * 1024L,
+                                       DEFAULT_MAX_POOL_SIZE,
+                                       DEFAULT_TIMEOUT_SECONDS,
+                                       DEFAULT_CASSANDRA_ROLE,
+                                       DEFAULT_MAX_BUFFER_OVERRIDE,
+                                       chunkOverride);
+        }
+
+        public static Map<FileType, Long> buildMaxBufferOverride(Map<String, 
String> options,
+                                                                 Map<FileType, 
Long> defaultValue)
+        {
+            return buildOverrideMap(MAX_BUFFER_SIZE_BYTES_KEY, options, 
defaultValue);
+        }
+
+        public static Map<FileType, Long> buildChunkBufferOverride(Map<String, 
String> options,
+                                                                   
Map<FileType, Long> defaultValue)
+        {
+            return buildOverrideMap(CHUNK_BUFFER_SIZE_BYTES_KEY, options, 
defaultValue);
+        }
+
+        private static Map<FileType, Long> buildOverrideMap(String keyPrefix,
+                                                            Map<String, 
String> options,
+                                                            Map<FileType, 
Long> defaultValue)
+        {
+            Map<FileType, Long> result = new HashMap<>(defaultValue);
+            for (FileType type : FileType.values())
+            {
+                // Override with DataSourceOptions if set, e.g. 
maxBufferSizeBytes_Index.db
+                String key = MapUtils.lowerCaseKey(String.format("%s_%s", 
keyPrefix, type.getFileSuffix()));
+                
Optional.ofNullable(options.get(key)).map(Long::parseLong).ifPresent(s -> 
result.put(type, s));
+            }
+            return result;
+        }
+
+        // CHECKSTYLE IGNORE: Method with many parameters
+        public static ClientConfig create(int userProvidedPort,
+                                          int maxRetries,
+                                          long millisToSleep,
+                                          long maxMillisToSleep,
+                                          long maxBufferSizeBytes,
+                                          long chunkSizeBytes,
+                                          int maxPoolSize,
+                                          int timeoutSeconds,
+                                          String cassandraRole,
+                                          Map<FileType, Long> 
maxBufferOverride,
+                                          Map<FileType, Long> 
chunkBufferOverride)
+        {
+            return new ClientConfig(userProvidedPort,
+                                    maxRetries,
+                                    millisToSleep,
+                                    maxMillisToSleep,
+                                    maxBufferSizeBytes,
+                                    chunkSizeBytes,
+                                    maxPoolSize,
+                                    timeoutSeconds,
+                                    cassandraRole,
+                                    maxBufferOverride,
+                                    chunkBufferOverride);
+        }
+
+        public Sidecar.ClientConfig toGenericSidecarConfig()
+        {
+            return Sidecar.ClientConfig.create(this.userProvidedPort,
+                                               this.maxRetries,
+                                               this.millisToSleep,
+                                               this.maxMillisToSleep,
+                                               DEFAULT_MAX_BUFFER_SIZE,
+                                               4 * 1024 * 1024L,
+                                               this.maxPoolSize,
+                                               this.timeoutSeconds,
+                                               this.cassandraRole,
+                                               this.maxBufferOverride,
+                                               this.chunkBufferOverride);
+        }
+    }
 }
diff --git 
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcCommitLogSegment.java
 
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcCommitLogSegment.java
index b20ca3e4..30662e29 100644
--- 
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcCommitLogSegment.java
+++ 
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcCommitLogSegment.java
@@ -25,7 +25,6 @@ import 
o.a.c.sidecar.client.shaded.common.response.data.CdcSegmentInfo;
 import o.a.c.sidecar.client.shaded.common.utils.HttpRange;
 import org.apache.cassandra.cdc.api.CommitLog;
 import org.apache.cassandra.cdc.stats.ICdcStats;
-import org.apache.cassandra.clients.Sidecar;
 import org.apache.cassandra.spark.data.FileType;
 import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
 import org.apache.cassandra.spark.utils.streaming.CassandraFileSource;
@@ -46,7 +45,7 @@ public class SidecarCdcCommitLogSegment implements CommitLog
     public SidecarCdcCommitLogSegment(SidecarCdcClient sidecar,
                                       CassandraInstance instance,
                                       CdcSegmentInfo segment,
-                                      Sidecar.ClientConfig clientConfig)
+                                      SidecarCdcClient.ClientConfig 
clientConfig)
     {
         this.instance = instance;
         this.segment = segment;
diff --git 
a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java
 
b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java
new file mode 100644
index 00000000..3ad306d8
--- /dev/null
+++ 
b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.sidecar;
+
+import org.junit.jupiter.api.Test;
+
+import o.a.c.sidecar.client.shaded.client.SidecarClient;
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.EventConsumer;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.api.TokenRangeSupplier;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for SidecarCdc class
+ */
+public class SidecarCdcTest
+{
+    @Test
+    public void testBuilderMethodCreatesValidBuilder()
+    {
+        String jobId = "test-job-123";
+        int partitionId = 0;
+        CdcOptions cdcOptions = mock(CdcOptions.class);
+        ClusterConfigProvider clusterConfigProvider = 
mock(ClusterConfigProvider.class);
+        EventConsumer eventConsumer = mock(EventConsumer.class);
+        SchemaSupplier schemaSupplier = mock(SchemaSupplier.class);
+        TokenRangeSupplier tokenRangeSupplier = mock(TokenRangeSupplier.class);
+        SidecarCdcClient.ClientConfig clientConfig = 
SidecarCdcClient.ClientConfig.create();
+        SidecarClient mockSidecarClient = mock(SidecarClient.class);
+        ICdcStats cdcStats = mock(ICdcStats.class);
+
+        SidecarCdcBuilder builder = new SidecarCdcBuilder(
+            jobId,
+            partitionId,
+            cdcOptions,
+            clusterConfigProvider,
+            eventConsumer,
+            schemaSupplier,
+            tokenRangeSupplier,
+            clientConfig,
+            mockSidecarClient,
+            cdcStats
+        );
+
+        // Verify the builder is properly created and configured
+        assertThat(builder).isNotNull();
+        assertThat(builder).isInstanceOf(SidecarCdcBuilder.class);
+
+        // Verify the builder has the cluster config provider set
+        
assertThat(builder.clusterConfigProvider).isEqualTo(clusterConfigProvider);
+
+        // Verify the builder has a sidecar CDC client configured
+        assertThat(builder.sidecarCdcClient).isNotNull();
+        
assertThat(builder.sidecarCdcClient.sidecarClient).isEqualTo(mockSidecarClient);
+        assertThat(builder.sidecarCdcClient.config).isEqualTo(clientConfig);
+        assertThat(builder.sidecarCdcClient.stats).isEqualTo(cdcStats);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to