This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new 42ffdef4 CASSSIDECAR-236: Fixing incorrect creation of MinuteBoundConfiguration & MillisecondBoundConfiguration in CdcConfigImpl (#213) 42ffdef4 is described below commit 42ffdef49656c58f800c2526e16a3a5ed37f45d9 Author: Jyothsna konisa <jkon...@apple.com> AuthorDate: Wed Apr 9 15:32:12 2025 -0700 CASSSIDECAR-236: Fixing incorrect creation of MinuteBoundConfiguration & MillisecondBoundConfiguration in CdcConfigImpl (#213) Patch by Jyothsna Konisa; Reviewed by Bernardo Botella, Yifan Cai for CASSSIDECAR-236 --- CHANGES.txt | 1 + .../java/org/apache/cassandra/sidecar/cdc/CdcConfig.java | 4 ++-- .../org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java | 8 ++++---- .../org/apache/cassandra/sidecar/cdc/CdcConfigImplTest.java | 13 ++++++++++++- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 233e305f..ff2fab10 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 0.2.0 ----- + * Fixing incorrect creation of MinuteBoundConfiguration & MillisecondBoundConfiguration in CdcConfigImpl (CASSSIDECAR-236) * Improvements to CDCConfig classes (CASSSIDECAR-235) * Hot Reload client and server SSL certificates (CASSSIDECAR-228) * Enhance the Cluster Lease Claim task feature (CASSSIDECAR-232) diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java index 85d469ce..776d494b 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java @@ -19,7 +19,7 @@ package org.apache.cassandra.sidecar.cdc; import java.util.Map; import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; -import org.apache.cassandra.sidecar.common.server.utils.MinuteBoundConfiguration; +import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -79,7 +79,7 @@ public interface CdcConfig /** * @return watermark window */ - MinuteBoundConfiguration watermarkWindow(); + SecondBoundConfiguration watermarkWindow(); /** * @return max Kafka record size in bytes. If value is non-negative then the KafkaPublisher will chunk larger records into multiple messages. diff --git a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java index 51b6dbaa..aabfabe8 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java @@ -36,7 +36,7 @@ import io.vertx.core.Promise; import org.apache.cassandra.sidecar.common.server.ThrowingRunnable; import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; -import org.apache.cassandra.sidecar.common.server.utils.MinuteBoundConfiguration; +import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration; import org.apache.cassandra.sidecar.config.CdcConfiguration; import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; import org.apache.cassandra.sidecar.config.SidecarConfiguration; @@ -165,7 +165,7 @@ public class CdcConfigImpl implements CdcConfig @Override public MillisecondBoundConfiguration persistDelay() { - return new MillisecondBoundConfiguration(getInt(ConfigKeys.PERSIST_DELAY_MILLIS.lowcaseName, 1000), TimeUnit.SECONDS); + return new MillisecondBoundConfiguration(getInt(ConfigKeys.PERSIST_DELAY_MILLIS.lowcaseName, 1000), TimeUnit.MILLISECONDS); } @Override @@ -175,11 +175,11 @@ public class CdcConfigImpl implements CdcConfig } @Override - public MinuteBoundConfiguration watermarkWindow() + public SecondBoundConfiguration watermarkWindow() { // this prop sets the maximum duration age accepted by CDC, any mutations with write timestamps older than // the watermark window will be dropped with log message "Exclude the update due to out of the allowed time window." - return new MinuteBoundConfiguration(getInt(ConfigKeys.WATERMARK_SECONDS.lowcaseName, DEFAULT_WATERMARK_WINDOW), TimeUnit.SECONDS); + return new SecondBoundConfiguration(getInt(ConfigKeys.WATERMARK_SECONDS.lowcaseName, DEFAULT_WATERMARK_WINDOW), TimeUnit.SECONDS); } @Override diff --git a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcConfigImplTest.java b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcConfigImplTest.java index 0edb68fc..63f44c89 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcConfigImplTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcConfigImplTest.java @@ -19,6 +19,7 @@ package org.apache.cassandra.sidecar.cdc; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -28,6 +29,7 @@ import io.vertx.core.Vertx; import org.apache.cassandra.sidecar.TestResourceReaper; import org.apache.cassandra.sidecar.common.server.ThrowingRunnable; import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; +import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration; import org.apache.cassandra.sidecar.concurrent.ExecutorPools; import org.apache.cassandra.sidecar.config.CdcConfiguration; import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; @@ -118,6 +120,8 @@ class CdcConfigImplTest assertThat(cdcConfig.env()).isEqualTo(""); assertThat(cdcConfig.kafkaTopic()).isNull(); assertThat(cdcConfig.logOnly()).isFalse(); + assertThat(cdcConfig.watermarkWindow()).isEqualTo(new SecondBoundConfiguration(72, TimeUnit.HOURS)); + assertThat(cdcConfig.persistDelay()).isEqualTo(new MillisecondBoundConfiguration(1, TimeUnit.SECONDS)); } @Test @@ -126,7 +130,12 @@ class CdcConfigImplTest CdcConfigAccessor cdcConfigAccessor = mockCdcConfigAccessor(); KafkaConfigAccessor kafkaConfigAccessor = mockKafkaConfigAccessor(); when(cdcConfigAccessor.getConfig().getConfigs()) - .thenReturn(Map.of("datacenter", "DC1", "env", "if", "log_only", "false", "topic", "topic1")); + .thenReturn(Map.of("datacenter", "DC1", + "env", "if", + "log_only", "false", + "topic", "topic1", + "watermark_seconds", "120", + "persist_delay_millis", "5000")); when(kafkaConfigAccessor.getConfig().getConfigs()) .thenReturn(Map.of("k1", "v1")); @@ -137,6 +146,8 @@ class CdcConfigImplTest assertThat(cdcConfig.env()).isEqualTo("if"); assertThat(cdcConfig.kafkaTopic()).isEqualTo("topic1"); assertThat(cdcConfig.logOnly()).isFalse(); + assertThat(cdcConfig.watermarkWindow()).isEqualTo(new SecondBoundConfiguration(2, TimeUnit.MINUTES)); + assertThat(cdcConfig.persistDelay()).isEqualTo(new MillisecondBoundConfiguration(5, TimeUnit.SECONDS)); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org