This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 42ffdef4 CASSSIDECAR-236: Fixing incorrect creation of MinuteBoundConfiguration & MillisecondBoundConfiguration in CdcConfigImpl (#213)
42ffdef4 is described below

commit 42ffdef49656c58f800c2526e16a3a5ed37f45d9
Author: Jyothsna konisa <jkon...@apple.com>
AuthorDate: Wed Apr 9 15:32:12 2025 -0700

    CASSSIDECAR-236: Fixing incorrect creation of MinuteBoundConfiguration & MillisecondBoundConfiguration in CdcConfigImpl (#213)
    
    Patch by Jyothsna Konisa; Reviewed by Bernardo Botella, Yifan Cai for CASSSIDECAR-236
---
 CHANGES.txt                                                 |  1 +
 .../java/org/apache/cassandra/sidecar/cdc/CdcConfig.java    |  4 ++--
 .../org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java     |  8 ++++----
 .../org/apache/cassandra/sidecar/cdc/CdcConfigImplTest.java | 13 ++++++++++++-
 4 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 233e305f..ff2fab10 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.2.0
 -----
+ * Fixing incorrect creation of MinuteBoundConfiguration & MillisecondBoundConfiguration in CdcConfigImpl (CASSSIDECAR-236)
  * Improvements to CDCConfig classes (CASSSIDECAR-235)
  * Hot Reload client and server SSL certificates (CASSSIDECAR-228)
  * Enhance the Cluster Lease Claim task feature (CASSSIDECAR-232)
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java 
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java
index 85d469ce..776d494b 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.sidecar.cdc;
 
 import java.util.Map;
 import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
-import 
org.apache.cassandra.sidecar.common.server.utils.MinuteBoundConfiguration;
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -79,7 +79,7 @@ public interface CdcConfig
     /**
      * @return watermark window
      */
-    MinuteBoundConfiguration watermarkWindow();
+    SecondBoundConfiguration watermarkWindow();
 
     /**
      * @return max Kafka record size in bytes. If value is non-negative then 
the KafkaPublisher will chunk larger records into multiple messages.
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java 
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java
index 51b6dbaa..aabfabe8 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java
@@ -36,7 +36,7 @@ import io.vertx.core.Promise;
 import org.apache.cassandra.sidecar.common.server.ThrowingRunnable;
 import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
 import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
-import 
org.apache.cassandra.sidecar.common.server.utils.MinuteBoundConfiguration;
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
 import org.apache.cassandra.sidecar.config.CdcConfiguration;
 import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration;
 import org.apache.cassandra.sidecar.config.SidecarConfiguration;
@@ -165,7 +165,7 @@ public class CdcConfigImpl implements CdcConfig
     @Override
     public MillisecondBoundConfiguration persistDelay()
     {
-        return new 
MillisecondBoundConfiguration(getInt(ConfigKeys.PERSIST_DELAY_MILLIS.lowcaseName,
 1000), TimeUnit.SECONDS);
+        return new 
MillisecondBoundConfiguration(getInt(ConfigKeys.PERSIST_DELAY_MILLIS.lowcaseName,
 1000), TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -175,11 +175,11 @@ public class CdcConfigImpl implements CdcConfig
     }
 
     @Override
-    public MinuteBoundConfiguration watermarkWindow()
+    public SecondBoundConfiguration watermarkWindow()
     {
         // this prop sets the maximum duration age accepted by CDC, any 
mutations with write timestamps older than
         // the watermark window will be dropped with log message "Exclude the 
update due to out of the allowed time window."
-        return new 
MinuteBoundConfiguration(getInt(ConfigKeys.WATERMARK_SECONDS.lowcaseName, 
DEFAULT_WATERMARK_WINDOW), TimeUnit.SECONDS);
+        return new 
SecondBoundConfiguration(getInt(ConfigKeys.WATERMARK_SECONDS.lowcaseName, 
DEFAULT_WATERMARK_WINDOW), TimeUnit.SECONDS);
     }
 
     @Override
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcConfigImplTest.java 
b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcConfigImplTest.java
index 0edb68fc..63f44c89 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcConfigImplTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcConfigImplTest.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.sidecar.cdc;
 
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -28,6 +29,7 @@ import io.vertx.core.Vertx;
 import org.apache.cassandra.sidecar.TestResourceReaper;
 import org.apache.cassandra.sidecar.common.server.ThrowingRunnable;
 import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.config.CdcConfiguration;
 import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration;
@@ -118,6 +120,8 @@ class CdcConfigImplTest
         assertThat(cdcConfig.env()).isEqualTo("");
         assertThat(cdcConfig.kafkaTopic()).isNull();
         assertThat(cdcConfig.logOnly()).isFalse();
+        assertThat(cdcConfig.watermarkWindow()).isEqualTo(new 
SecondBoundConfiguration(72, TimeUnit.HOURS));
+        assertThat(cdcConfig.persistDelay()).isEqualTo(new 
MillisecondBoundConfiguration(1, TimeUnit.SECONDS));
     }
 
     @Test
@@ -126,7 +130,12 @@ class CdcConfigImplTest
         CdcConfigAccessor cdcConfigAccessor = mockCdcConfigAccessor();
         KafkaConfigAccessor kafkaConfigAccessor = mockKafkaConfigAccessor();
         when(cdcConfigAccessor.getConfig().getConfigs())
-                .thenReturn(Map.of("datacenter", "DC1", "env", "if", 
"log_only", "false", "topic", "topic1"));
+                .thenReturn(Map.of("datacenter", "DC1",
+                                   "env", "if",
+                                   "log_only", "false",
+                                   "topic", "topic1",
+                                   "watermark_seconds", "120",
+                                   "persist_delay_millis", "5000"));
         when(kafkaConfigAccessor.getConfig().getConfigs())
                 .thenReturn(Map.of("k1", "v1"));
 
@@ -137,6 +146,8 @@ class CdcConfigImplTest
         assertThat(cdcConfig.env()).isEqualTo("if");
         assertThat(cdcConfig.kafkaTopic()).isEqualTo("topic1");
         assertThat(cdcConfig.logOnly()).isFalse();
+        assertThat(cdcConfig.watermarkWindow()).isEqualTo(new 
SecondBoundConfiguration(2, TimeUnit.MINUTES));
+        assertThat(cdcConfig.persistDelay()).isEqualTo(new 
MillisecondBoundConfiguration(5, TimeUnit.SECONDS));
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to