This is an automated email from the ASF dual-hosted git repository.
bernardobotella pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new aa93d664 CASSANALYTICS-94: Expose SidecarCdc (#147)
aa93d664 is described below
commit aa93d664aabd85a375a0ac7d9fc39dfe99d3c8e9
Author: Bernardo Botella <[email protected]>
AuthorDate: Mon Nov 3 13:35:18 2025 +0100
CASSANALYTICS-94: Expose SidecarCdc (#147)
Patch by Bernardo Botella; reviewed by Jyothsna Konisa, Francisco Guerrero
for CASSANALYTICS-94
---
CHANGES.txt | 1 +
.../cassandra/cdc/sidecar/CdcSidecarInstance.java | 35 ++++
.../cdc/sidecar/CdcSidecarInstancesProvider.java | 34 +++
.../apache/cassandra/cdc/sidecar/SidecarCdc.java | 32 +++
.../cassandra/cdc/sidecar/SidecarCdcBuilder.java | 17 +-
.../cassandra/cdc/sidecar/SidecarCdcClient.java | 230 ++++++++++++++++++++-
.../cdc/sidecar/SidecarCdcCommitLogSegment.java | 3 +-
.../cassandra/cdc/sidecar/SidecarCdcTest.java | 79 +++++++
8 files changed, 421 insertions(+), 10 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 96b619ab..28cd7e3f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.2.0
-----
+ * Expose SidecarCdc builders and interfaces (CASSANALYTICS-94)
* Fix bulk reader node availability comparator ordering (CASSANALYTICS-99)
* Remove not needed buffer flips (CASSANALYTICS-95)
* Refactor BulkWriterContext broadcasting to use immutable config class
(CASSANALYTICS-89)
diff --git
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/CdcSidecarInstance.java
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/CdcSidecarInstance.java
new file mode 100644
index 00000000..e87fe421
--- /dev/null
+++
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/CdcSidecarInstance.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cdc.sidecar;
+
+/**
+ * Holds information about the Cassandra Sidecar Instance host and port
+ */
+public interface CdcSidecarInstance
+{
+ /**
+ * @return the port where the Cassandra Sidecar instance is listening
+ */
+ int port();
+
+ /**
+ * @return the hostname where the Cassandra Sidecar instance is running
+ */
+ String hostname();
+}
diff --git
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/CdcSidecarInstancesProvider.java
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/CdcSidecarInstancesProvider.java
new file mode 100644
index 00000000..ec0cfbec
--- /dev/null
+++
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/CdcSidecarInstancesProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cdc.sidecar;
+
+import java.util.List;
+
+
+/**
+ * Provides the list of {@link CdcSidecarInstance}s, statically or dynamically.
+ * It is meant to support expansion and shrinking of Cassandra clusters.
+ */
+public interface CdcSidecarInstancesProvider
+{
+ /**
+ * @return the list of {@link CdcSidecarInstance}s
+ */
+ List<CdcSidecarInstance> instances();
+}
diff --git
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdc.java
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdc.java
index 6a652f39..e5ac33b6 100644
---
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdc.java
+++
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdc.java
@@ -19,11 +19,18 @@
package org.apache.cassandra.cdc.sidecar;
+import java.io.IOException;
import java.util.Comparator;
import java.util.Optional;
import java.util.Set;
import org.apache.cassandra.cdc.Cdc;
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.EventConsumer;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.api.TokenRangeSupplier;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.secrets.SecretsProvider;
import org.apache.cassandra.spark.data.CqlTable;
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.utils.FutureUtils;
@@ -43,6 +50,31 @@ public class SidecarCdc extends Cdc
initSchema();
}
+ public static SidecarCdcBuilder builder(@NotNull String jobId,
+ int partitionId,
+ CdcOptions cdcOptions,
+ ClusterConfigProvider
clusterConfigProvider,
+ EventConsumer eventConsumer,
+ SchemaSupplier schemaSupplier,
+ TokenRangeSupplier
tokenRangeSupplier,
+ CdcSidecarInstancesProvider
sidecarInstancesProvider,
+ SidecarCdcClient.ClientConfig
clientConfig,
+ SecretsProvider secretsProvider,
+ ICdcStats cdcStats) throws
IOException
+ {
+ return new SidecarCdcBuilder(jobId,
+ partitionId,
+ cdcOptions,
+ clusterConfigProvider,
+ eventConsumer,
+ schemaSupplier,
+ tokenRangeSupplier,
+ sidecarInstancesProvider,
+ clientConfig,
+ secretsProvider,
+ cdcStats);
+ }
+
public void initSchema()
{
Set<CqlTable> tables =
FutureUtils.get(schemaSupplier.getCdcEnabledTables());
diff --git
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java
index b03695b3..ecc5fa93 100644
---
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java
+++
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java
@@ -20,9 +20,12 @@
package org.apache.cassandra.cdc.sidecar;
import java.io.IOException;
+import java.util.stream.Collectors;
import com.google.common.base.Preconditions;
+import o.a.c.sidecar.client.shaded.client.SidecarInstanceImpl;
+import o.a.c.sidecar.client.shaded.client.SimpleSidecarInstancesProvider;
import org.apache.cassandra.cdc.CdcBuilder;
import org.apache.cassandra.cdc.api.CdcOptions;
import org.apache.cassandra.cdc.api.EventConsumer;
@@ -32,7 +35,6 @@ import org.apache.cassandra.cdc.stats.ICdcStats;
import org.apache.cassandra.clients.Sidecar;
import org.apache.cassandra.secrets.SecretsProvider;
import o.a.c.sidecar.client.shaded.client.SidecarClient;
-import o.a.c.sidecar.client.shaded.client.SidecarInstancesProvider;
import org.apache.cassandra.spark.utils.AsyncExecutor;
import org.jetbrains.annotations.NotNull;
@@ -54,8 +56,8 @@ public class SidecarCdcBuilder extends CdcBuilder
EventConsumer eventConsumer,
SchemaSupplier schemaSupplier,
TokenRangeSupplier tokenRangeSupplier,
- SidecarInstancesProvider sidecarInstancesProvider,
- Sidecar.ClientConfig clientConfig,
+ CdcSidecarInstancesProvider sidecarInstancesProvider,
+ SidecarCdcClient.ClientConfig clientConfig,
SecretsProvider secretsProvider,
ICdcStats cdcStats) throws IOException
{
@@ -68,7 +70,10 @@ public class SidecarCdcBuilder extends CdcBuilder
schemaSupplier,
tokenRangeSupplier,
clientConfig,
- Sidecar.from(sidecarInstancesProvider, clientConfig, secretsProvider),
+ Sidecar.from(new
SimpleSidecarInstancesProvider(sidecarInstancesProvider.instances().stream()
+
.map(i -> new SidecarInstanceImpl(i.hostname(), i.port()))
+
.collect(Collectors.toList())),
+ clientConfig.toGenericSidecarConfig(), secretsProvider),
cdcStats
);
}
@@ -80,7 +85,7 @@ public class SidecarCdcBuilder extends CdcBuilder
EventConsumer eventConsumer,
SchemaSupplier schemaSupplier,
TokenRangeSupplier tokenRangeSupplier,
- Sidecar.ClientConfig clientConfig,
+ SidecarCdcClient.ClientConfig clientConfig,
SidecarClient sidecarClient,
ICdcStats cdcStats)
{
@@ -103,7 +108,7 @@ public class SidecarCdcBuilder extends CdcBuilder
return this;
}
- public SidecarCdcBuilder withSidecarClient(Sidecar.ClientConfig
clientConfig,
+ public SidecarCdcBuilder withSidecarClient(SidecarCdcClient.ClientConfig
clientConfig,
SidecarClient sidecarClient,
ICdcStats cdcStats)
{
diff --git
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java
index 585b48d2..2dc98803 100644
---
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java
+++
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java
@@ -21,7 +21,10 @@ package org.apache.cassandra.cdc.sidecar;
import java.nio.ByteBuffer;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@@ -32,18 +35,30 @@ import org.apache.cassandra.clients.Sidecar;
import o.a.c.sidecar.client.shaded.client.SidecarClient;
import o.a.c.sidecar.client.shaded.client.SidecarInstance;
import o.a.c.sidecar.client.shaded.client.StreamBuffer;
+import org.apache.cassandra.spark.data.FileType;
import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
import org.apache.cassandra.spark.exceptions.TransportFailureException;
+import org.apache.cassandra.spark.utils.MapUtils;
import org.apache.cassandra.spark.utils.ThrowableUtils;
import org.apache.cassandra.spark.utils.streaming.StreamConsumer;
+import org.jetbrains.annotations.Nullable;
+
+import static
org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_BUFFER_OVERRIDE;
+import static
org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_BUFFER_SIZE;
+import static
org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_MILLIS_TO_SLEEP;
+import static
org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_POOL_SIZE;
+import static org.apache.cassandra.spark.utils.Properties.DEFAULT_MAX_RETRIES;
+import static
org.apache.cassandra.spark.utils.Properties.DEFAULT_MILLIS_TO_SLEEP;
+import static org.apache.cassandra.spark.utils.Properties.DEFAULT_SIDECAR_PORT;
+import static
org.apache.cassandra.spark.utils.Properties.DEFAULT_TIMEOUT_SECONDS;
public class SidecarCdcClient
{
- final Sidecar.ClientConfig config;
+ final ClientConfig config;
final SidecarClient sidecarClient;
final ICdcStats stats;
- public SidecarCdcClient(Sidecar.ClientConfig config,
+ public SidecarCdcClient(ClientConfig config,
SidecarClient sidecarClient,
ICdcStats stats)
{
@@ -150,4 +165,215 @@ public class SidecarCdcClient
}
};
}
+
+ public static final class ClientConfig
+ {
+ public static final String MAX_BUFFER_SIZE_BYTES_KEY =
"maxBufferSizeBytes";
+ public static final String CHUNK_BUFFER_SIZE_BYTES_KEY =
"chunkBufferSizeBytes";
+ public static final String DEFAULT_CASSANDRA_ROLE = null;
+
+ private final int userProvidedPort;
+ private final int maxRetries;
+ private final int maxPoolSize;
+ private final int timeoutSeconds;
+ private final long millisToSleep;
+ private final long maxMillisToSleep;
+ private final long maxBufferSize;
+ private final long chunkSize;
+ private final String cassandraRole;
+ private final Map<FileType, Long> maxBufferOverride;
+ private final Map<FileType, Long> chunkBufferOverride;
+
+ // CHECKSTYLE IGNORE: Constructor with many parameters
+ private ClientConfig(int userProvidedPort,
+ int maxRetries,
+ long millisToSleep,
+ long maxMillisToSleep,
+ long maxBufferSize,
+ long chunkSize,
+ int maxPoolSize,
+ int timeoutSeconds,
+ String cassandraRole,
+ Map<FileType, Long> maxBufferOverride,
+ Map<FileType, Long> chunkBufferOverride)
+ {
+ this.userProvidedPort = userProvidedPort;
+ this.maxRetries = maxRetries;
+ this.millisToSleep = millisToSleep;
+ this.maxMillisToSleep = maxMillisToSleep;
+ this.maxBufferSize = maxBufferSize;
+ this.chunkSize = chunkSize;
+ this.maxPoolSize = maxPoolSize;
+ this.timeoutSeconds = timeoutSeconds;
+ this.cassandraRole = cassandraRole;
+ this.maxBufferOverride = maxBufferOverride;
+ this.chunkBufferOverride = chunkBufferOverride;
+ }
+
+ public int userProvidedPort()
+ {
+ return userProvidedPort;
+ }
+
+ public int effectivePort()
+ {
+ return userProvidedPort == -1 ? DEFAULT_SIDECAR_PORT :
userProvidedPort;
+ }
+
+ public int maxRetries()
+ {
+ return maxRetries;
+ }
+
+ public long millisToSleep()
+ {
+ return millisToSleep;
+ }
+
+ public long maxMillisToSleep()
+ {
+ return maxMillisToSleep;
+ }
+
+ public long maxBufferSize()
+ {
+ return maxBufferSize(FileType.COMMITLOG);
+ }
+
+ public long maxBufferSize(FileType fileType)
+ {
+ return maxBufferOverride.getOrDefault(fileType, maxBufferSize);
+ }
+
+ public Map<FileType, Long> maxBufferOverride()
+ {
+ return maxBufferOverride;
+ }
+
+ public long chunkBufferSize()
+ {
+ return chunkBufferSize(FileType.COMMITLOG);
+ }
+
+ public long chunkBufferSize(FileType fileType)
+ {
+ return chunkBufferOverride.getOrDefault(fileType, chunkSize);
+ }
+
+ public Map<FileType, Long> chunkBufferOverride()
+ {
+ return chunkBufferOverride;
+ }
+
+ public int maxPoolSize()
+ {
+ return maxPoolSize;
+ }
+
+ public int timeoutSeconds()
+ {
+ return timeoutSeconds;
+ }
+
+ @Nullable
+ public String cassandraRole()
+ {
+ return cassandraRole;
+ }
+
+ public static ClientConfig create()
+ {
+ return ClientConfig.create(-1, DEFAULT_MAX_RETRIES,
DEFAULT_MILLIS_TO_SLEEP);
+ }
+
+ public static ClientConfig create(int userProvidedPort, int
effectivePort)
+ {
+ return ClientConfig.create(userProvidedPort, DEFAULT_MAX_RETRIES,
DEFAULT_MILLIS_TO_SLEEP); // NOTE(review): the effectivePort argument is never read in this method
+ }
+
+ public static ClientConfig create(int userProvidedPort, int
maxRetries, long millisToSleep)
+ {
+ Map<FileType, Long> chunkOverride = new HashMap<>();
+ chunkOverride.put(FileType.COMMITLOG, 4 * 1024 * 1024L); // 4MB
chunks
+
+ return ClientConfig.create(userProvidedPort,
+ maxRetries,
+ millisToSleep,
+ DEFAULT_MAX_MILLIS_TO_SLEEP,
+ DEFAULT_MAX_BUFFER_SIZE,
+ 4 * 1024 * 1024L,
+ DEFAULT_MAX_POOL_SIZE,
+ DEFAULT_TIMEOUT_SECONDS,
+ DEFAULT_CASSANDRA_ROLE,
+ DEFAULT_MAX_BUFFER_OVERRIDE,
+ chunkOverride);
+ }
+
+ public static Map<FileType, Long> buildMaxBufferOverride(Map<String,
String> options,
+ Map<FileType,
Long> defaultValue)
+ {
+ return buildOverrideMap(MAX_BUFFER_SIZE_BYTES_KEY, options,
defaultValue);
+ }
+
+ public static Map<FileType, Long> buildChunkBufferOverride(Map<String,
String> options,
+
Map<FileType, Long> defaultValue)
+ {
+ return buildOverrideMap(CHUNK_BUFFER_SIZE_BYTES_KEY, options,
defaultValue);
+ }
+
+ private static Map<FileType, Long> buildOverrideMap(String keyPrefix,
+ Map<String,
String> options,
+ Map<FileType,
Long> defaultValue)
+ {
+ Map<FileType, Long> result = new HashMap<>(defaultValue);
+ for (FileType type : FileType.values())
+ {
+ // Override with DataSourceOptions if set, e.g.
maxBufferSizeBytes_Index.db
+ String key = MapUtils.lowerCaseKey(String.format("%s_%s",
keyPrefix, type.getFileSuffix()));
+
Optional.ofNullable(options.get(key)).map(Long::parseLong).ifPresent(s ->
result.put(type, s));
+ }
+ return result;
+ }
+
+ // CHECKSTYLE IGNORE: Method with many parameters
+ public static ClientConfig create(int userProvidedPort,
+ int maxRetries,
+ long millisToSleep,
+ long maxMillisToSleep,
+ long maxBufferSizeBytes,
+ long chunkSizeBytes,
+ int maxPoolSize,
+ int timeoutSeconds,
+ String cassandraRole,
+ Map<FileType, Long>
maxBufferOverride,
+ Map<FileType, Long>
chunkBufferOverride)
+ {
+ return new ClientConfig(userProvidedPort,
+ maxRetries,
+ millisToSleep,
+ maxMillisToSleep,
+ maxBufferSizeBytes,
+ chunkSizeBytes,
+ maxPoolSize,
+ timeoutSeconds,
+ cassandraRole,
+ maxBufferOverride,
+ chunkBufferOverride);
+ }
+
+ public Sidecar.ClientConfig toGenericSidecarConfig()
+ { // Converts this CDC client config into the generic Sidecar.ClientConfig, field by field
+ return Sidecar.ClientConfig.create(this.userProvidedPort,
+ this.maxRetries,
+ this.millisToSleep,
+ this.maxMillisToSleep,
+ this.maxBufferSize, // carry over the configured value instead of DEFAULT_MAX_BUFFER_SIZE
+ this.chunkSize, // carry over the configured value instead of a hard-coded 4MB
+ this.maxPoolSize,
+ this.timeoutSeconds,
+ this.cassandraRole,
+ this.maxBufferOverride,
+ this.chunkBufferOverride);
+ }
+ }
}
diff --git
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcCommitLogSegment.java
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcCommitLogSegment.java
index b20ca3e4..30662e29 100644
---
a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcCommitLogSegment.java
+++
b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcCommitLogSegment.java
@@ -25,7 +25,6 @@ import
o.a.c.sidecar.client.shaded.common.response.data.CdcSegmentInfo;
import o.a.c.sidecar.client.shaded.common.utils.HttpRange;
import org.apache.cassandra.cdc.api.CommitLog;
import org.apache.cassandra.cdc.stats.ICdcStats;
-import org.apache.cassandra.clients.Sidecar;
import org.apache.cassandra.spark.data.FileType;
import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
import org.apache.cassandra.spark.utils.streaming.CassandraFileSource;
@@ -46,7 +45,7 @@ public class SidecarCdcCommitLogSegment implements CommitLog
public SidecarCdcCommitLogSegment(SidecarCdcClient sidecar,
CassandraInstance instance,
CdcSegmentInfo segment,
- Sidecar.ClientConfig clientConfig)
+ SidecarCdcClient.ClientConfig
clientConfig)
{
this.instance = instance;
this.segment = segment;
diff --git
a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java
b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java
new file mode 100644
index 00000000..3ad306d8
--- /dev/null
+++
b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.cdc.sidecar;
+
+import org.junit.jupiter.api.Test;
+
+import o.a.c.sidecar.client.shaded.client.SidecarClient;
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.EventConsumer;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.api.TokenRangeSupplier;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for SidecarCdc class
+ */
+public class SidecarCdcTest
+{
+ @Test
+ public void testBuilderMethodCreatesValidBuilder()
+ {
+ String jobId = "test-job-123";
+ int partitionId = 0;
+ CdcOptions cdcOptions = mock(CdcOptions.class);
+ ClusterConfigProvider clusterConfigProvider =
mock(ClusterConfigProvider.class);
+ EventConsumer eventConsumer = mock(EventConsumer.class);
+ SchemaSupplier schemaSupplier = mock(SchemaSupplier.class);
+ TokenRangeSupplier tokenRangeSupplier = mock(TokenRangeSupplier.class);
+ SidecarCdcClient.ClientConfig clientConfig =
SidecarCdcClient.ClientConfig.create();
+ SidecarClient mockSidecarClient = mock(SidecarClient.class);
+ ICdcStats cdcStats = mock(ICdcStats.class);
+
+ SidecarCdcBuilder builder = new SidecarCdcBuilder(
+ jobId,
+ partitionId,
+ cdcOptions,
+ clusterConfigProvider,
+ eventConsumer,
+ schemaSupplier,
+ tokenRangeSupplier,
+ clientConfig,
+ mockSidecarClient,
+ cdcStats
+ );
+
+ // Verify the builder is properly created and configured
+ assertThat(builder).isNotNull();
+ assertThat(builder).isInstanceOf(SidecarCdcBuilder.class);
+
+ // Verify the builder has the cluster config provider set
+
assertThat(builder.clusterConfigProvider).isEqualTo(clusterConfigProvider);
+
+ // Verify the builder has a sidecar CDC client configured
+ assertThat(builder.sidecarCdcClient).isNotNull();
+
assertThat(builder.sidecarCdcClient.sidecarClient).isEqualTo(mockSidecarClient);
+ assertThat(builder.sidecarCdcClient.config).isEqualTo(clientConfig);
+ assertThat(builder.sidecarCdcClient.stats).isEqualTo(cdcStats);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]