This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 856db2afa03c23e40f739c122d09903c1216c1a3
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Fri Nov 7 17:29:06 2025 +0000

    Retry without time limit calculates wait time correctly
    
    Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-21002
---
 CHANGES.txt                                       |   1 +
 src/java/org/apache/cassandra/tcm/Processor.java  |  36 +++----
 src/java/org/apache/cassandra/tcm/Retry.java      |   2 +-
 test/unit/org/apache/cassandra/tcm/RetryTest.java | 111 ++++++++++++++++++++++
 4 files changed, 126 insertions(+), 24 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 75983caa80..bf947bc7e2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Retry without time limit calculates wait time incorrectly (CASSANDRA-21002)
  * Don't submit AlterSchemaStatements which produce no effect locally to the CMS (CASSANDRA-21001)
  * Avoid iterating all prepared statements when getting PreparedStatementsCacheSize metric (CASSANDRA-21038)
  * Reduce performance impact of TableMetadataRef.get and KeyspaceMetadataRef.get (CASSANDRA-20465)
diff --git a/src/java/org/apache/cassandra/tcm/Processor.java 
b/src/java/org/apache/cassandra/tcm/Processor.java
index e4e99aa08e..f80277b983 100644
--- a/src/java/org/apache/cassandra/tcm/Processor.java
+++ b/src/java/org/apache/cassandra/tcm/Processor.java
@@ -18,17 +18,17 @@
 
 package org.apache.cassandra.tcm;
 
-import java.util.concurrent.TimeUnit;
-
 import com.codahale.metrics.Meter;
-
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
 import org.apache.cassandra.metrics.TCMMetrics;
+import org.apache.cassandra.service.RetryStrategy;
+import org.apache.cassandra.service.TimeoutStrategy;
 import org.apache.cassandra.service.WaitStrategy;
 import org.apache.cassandra.tcm.log.Entry;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static 
org.apache.cassandra.config.DatabaseDescriptor.getCmsAwaitTimeout;
-import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
 public interface Processor
 {
@@ -51,28 +51,18 @@ public interface Processor
     }
 
     /**
-     * Since we are using message expiration for communicating timeouts to CMS 
nodes, we have to be careful not
-     * to overflow the long, since messaging is using only 32 bits for 
deadlines. To achieve that, we are
-     * giving `timeoutNanos` every time we retry, but will retry indefinitely.
+     * To be used only when submitting a STARTUP transformation when a node is 
restarted with a new set of addresses or
+     * running a new release version.
      */
-    private static Retry unsafeRetryIndefinitely()
+    static Retry unsafeRetryIndefinitely()
     {
-        long timeoutNanos = getCmsAwaitTimeout().to(NANOSECONDS);
         Meter retryMeter = TCMMetrics.instance.commitRetries;
-        return Retry.withNoTimeLimit(retryMeter, new WaitStrategy()
-        {
-            @Override
-            public long computeWaitUntil(int attempts)
-            {
-                return nanoTime() + timeoutNanos;
-            }
-
-            @Override
-            public long computeWait(int attempts, TimeUnit units)
-            {
-                return units.convert(timeoutNanos, NANOSECONDS);
-            }
-        });
+        DurationSpec.IntMillisecondsBound defaultBackoff = 
DatabaseDescriptor.getDefaultRetryBackoff();
+        DurationSpec.IntMillisecondsBound defaultMaxBackoff = 
DatabaseDescriptor.getDefaultMaxRetryBackoff();
+        String spec = (defaultBackoff == null ? "100ms" : 
defaultBackoff.toMilliseconds() + "ms")
+                      + "*attempts <=" + (defaultMaxBackoff == null ? "10s" : 
defaultMaxBackoff.toMilliseconds() + "ms");
+        WaitStrategy wait = RetryStrategy.parse(spec, 
TimeoutStrategy.LatencySourceFactory.none());
+        return Retry.withNoTimeLimit(retryMeter, wait);
     }
 
     Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch 
lastKnown, Retry retryPolicy);
diff --git a/src/java/org/apache/cassandra/tcm/Retry.java 
b/src/java/org/apache/cassandra/tcm/Retry.java
index a66f64e113..ca294aaee0 100644
--- a/src/java/org/apache/cassandra/tcm/Retry.java
+++ b/src/java/org/apache/cassandra/tcm/Retry.java
@@ -104,7 +104,7 @@ public class Retry implements WaitStrategy
             return -1;
 
         if (deadlineNanos == Long.MAX_VALUE)
-            return wait;
+            return units.convert(wait, TimeUnit.NANOSECONDS);
 
         long now = nanoTime();
         wait = Math.min(deadlineNanos - now, wait);
diff --git a/test/unit/org/apache/cassandra/tcm/RetryTest.java 
b/test/unit/org/apache/cassandra/tcm/RetryTest.java
new file mode 100644
index 0000000000..0c2f52c830
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tcm/RetryTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Meter;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.WaitStrategy;
+import org.apache.cassandra.tcm.log.Entry;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.transformations.Startup;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.cassandra.tcm.membership.MembershipUtils.node;
+import static 
org.apache.cassandra.tcm.membership.MembershipUtils.nodeAddresses;
+import static org.junit.Assert.assertEquals;
+
+public class RetryTest
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(RetryTest.class);
+    private Random random;
+
+    @BeforeClass
+    public static void setupClass()
+    {
+        DatabaseDescriptor.daemonInitialization();
+    }
+
+    @Before
+    public void setup()
+    {
+        long seed = System.nanoTime();
+        logger.info("Running test with seed {}", seed);
+        random = new Random(seed);
+    }
+
+    @Test
+    public void testRetryWithNoTimeLimitObservesTimeUnit()
+    {
+        Meter meter = new Meter();
+        final long waitTimeNanos = Math.abs(random.nextLong());
+        WaitStrategy fixed = new WaitStrategy()
+        {
+            @Override
+            public long computeWaitUntil(int attempts) {throw new 
UnsupportedOperationException();}
+
+            @Override
+            public long computeWait(int attempts, TimeUnit units)
+            {
+                assertEquals(NANOSECONDS, units);
+                return waitTimeNanos;
+            }
+        };
+
+        Retry retry = Retry.withNoTimeLimit(meter, fixed);
+        long waitTimeMillis = retry.computeWait(1, MILLISECONDS);
+        assertEquals(MILLISECONDS.convert(waitTimeNanos, NANOSECONDS), 
waitTimeMillis);
+    }
+
+
+    @Test
+    public void testProcessorIndefiniteRetryBehaviour()
+    {
+        new Processor()
+        {
+            @Override
+            public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry 
retryPolicy) {return null;}
+
+            @Override
+            public Commit.Result commit(Entry.Id entryId, Transformation 
transform, Epoch lastKnown, Retry retryPolicy)
+            {
+                // Assert the properties of the Retry provided by the static (no longer private) 
Processor::unsafeRetryIndefinitely, reached here via the 3-arg commit overload
+                for (int i = 1; i<1000;i++)
+                {
+                    // expected wait: (i+1)*100ms capped at 10000ms, assuming the default 100ms/10s backoff; origin of the +1 offset not visible here - TODO confirm
+                    long waitTime = retryPolicy.computeWait(i, MILLISECONDS);
+                    assertEquals(Math.min((i+1) * 100, 10000), waitTime);
+                }
+                // No time limit: deadlineNanos stays at Long.MAX_VALUE, the sentinel Retry.computeWait special-cases
+                assertEquals(Long.MAX_VALUE, retryPolicy.deadlineNanos);
+                return null;
+            }
+        }.commit(new Entry.Id(0L), new Startup(node(random), 
nodeAddresses(random), NodeVersion.CURRENT), Epoch.EMPTY);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to