This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 856db2afa03c23e40f739c122d09903c1216c1a3 Author: Sam Tunnicliffe <[email protected]> AuthorDate: Fri Nov 7 17:29:06 2025 +0000 Retry without time limit calculates wait time correctly Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-21002 --- CHANGES.txt | 1 + src/java/org/apache/cassandra/tcm/Processor.java | 36 +++---- src/java/org/apache/cassandra/tcm/Retry.java | 2 +- test/unit/org/apache/cassandra/tcm/RetryTest.java | 111 ++++++++++++++++++++++ 4 files changed, 126 insertions(+), 24 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 75983caa80..bf947bc7e2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Retry without time limit calculates wait time incorrectly (CASSANDRA-21002) * Don't submit AlterSchemaStatements which produce no effect locally to the CMS (CASSANDRA-21001) * Avoid iterating all prepared statements when getting PreparedStatementsCacheSize metric (CASSANDRA-21038) * Reduce performance impact of TableMetadataRef.get and KeyspaceMetadataRef.get (CASSANDRA-20465) diff --git a/src/java/org/apache/cassandra/tcm/Processor.java b/src/java/org/apache/cassandra/tcm/Processor.java index e4e99aa08e..f80277b983 100644 --- a/src/java/org/apache/cassandra/tcm/Processor.java +++ b/src/java/org/apache/cassandra/tcm/Processor.java @@ -18,17 +18,17 @@ package org.apache.cassandra.tcm; -import java.util.concurrent.TimeUnit; - import com.codahale.metrics.Meter; - +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.DurationSpec; import org.apache.cassandra.metrics.TCMMetrics; +import org.apache.cassandra.service.RetryStrategy; +import org.apache.cassandra.service.TimeoutStrategy; import org.apache.cassandra.service.WaitStrategy; import org.apache.cassandra.tcm.log.Entry; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.cassandra.config.DatabaseDescriptor.getCmsAwaitTimeout; -import static org.apache.cassandra.utils.Clock.Global.nanoTime; public 
interface Processor { @@ -51,28 +51,18 @@ public interface Processor } /** - * Since we are using message expiration for communicating timeouts to CMS nodes, we have to be careful not - * to overflow the long, since messaging is using only 32 bits for deadlines. To achieve that, we are - * giving `timeoutNanos` every time we retry, but will retry indefinitely. + * To be used only when submitting a STARTUP transformation when a node is restarted with a new set of addresses or + * running a new release version. */ - private static Retry unsafeRetryIndefinitely() + static Retry unsafeRetryIndefinitely() { - long timeoutNanos = getCmsAwaitTimeout().to(NANOSECONDS); Meter retryMeter = TCMMetrics.instance.commitRetries; - return Retry.withNoTimeLimit(retryMeter, new WaitStrategy() - { - @Override - public long computeWaitUntil(int attempts) - { - return nanoTime() + timeoutNanos; - } - - @Override - public long computeWait(int attempts, TimeUnit units) - { - return units.convert(timeoutNanos, NANOSECONDS); - } - }); + DurationSpec.IntMillisecondsBound defaultBackoff = DatabaseDescriptor.getDefaultRetryBackoff(); + DurationSpec.IntMillisecondsBound defaultMaxBackoff = DatabaseDescriptor.getDefaultMaxRetryBackoff(); + String spec = (defaultBackoff == null ? "100ms" : defaultBackoff.toMilliseconds() + "ms") + + "*attempts <=" + (defaultMaxBackoff == null ? 
"10s" : defaultMaxBackoff.toMilliseconds() + "ms"); + WaitStrategy wait = RetryStrategy.parse(spec, TimeoutStrategy.LatencySourceFactory.none()); + return Retry.withNoTimeLimit(retryMeter, wait); } Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch lastKnown, Retry retryPolicy); diff --git a/src/java/org/apache/cassandra/tcm/Retry.java b/src/java/org/apache/cassandra/tcm/Retry.java index a66f64e113..ca294aaee0 100644 --- a/src/java/org/apache/cassandra/tcm/Retry.java +++ b/src/java/org/apache/cassandra/tcm/Retry.java @@ -104,7 +104,7 @@ public class Retry implements WaitStrategy return -1; if (deadlineNanos == Long.MAX_VALUE) - return wait; + return units.convert(wait, TimeUnit.NANOSECONDS); long now = nanoTime(); wait = Math.min(deadlineNanos - now, wait); diff --git a/test/unit/org/apache/cassandra/tcm/RetryTest.java b/test/unit/org/apache/cassandra/tcm/RetryTest.java new file mode 100644 index 0000000000..0c2f52c830 --- /dev/null +++ b/test/unit/org/apache/cassandra/tcm/RetryTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm; + +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.codahale.metrics.Meter; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.service.WaitStrategy; +import org.apache.cassandra.tcm.log.Entry; +import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.tcm.transformations.Startup; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.apache.cassandra.tcm.membership.MembershipUtils.node; +import static org.apache.cassandra.tcm.membership.MembershipUtils.nodeAddresses; +import static org.junit.Assert.assertEquals; + +public class RetryTest +{ + private static final Logger logger = LoggerFactory.getLogger(RetryTest.class); + private Random random; + + @BeforeClass + public static void setupClass() + { + DatabaseDescriptor.daemonInitialization(); + } + + @Before + public void setup() + { + long seed = System.nanoTime(); + logger.info("Running test with seed {}", seed); + random = new Random(seed); + } + + @Test + public void testRetryWithNoTimeLimitObservesTimeUnit() + { + Meter meter = new Meter(); + final long waitTimeNanos = Math.abs(random.nextLong()); + WaitStrategy fixed = new WaitStrategy() + { + @Override + public long computeWaitUntil(int attempts) {throw new UnsupportedOperationException();} + + @Override + public long computeWait(int attempts, TimeUnit units) + { + assertEquals(NANOSECONDS, units); + return waitTimeNanos; + } + }; + + Retry retry = Retry.withNoTimeLimit(meter, fixed); + long waitTimeMillis = retry.computeWait(1, MILLISECONDS); + assertEquals(MILLISECONDS.convert(waitTimeNanos, NANOSECONDS), waitTimeMillis); + } + + + @Test + public void testProcessorIndefiniteRetryBehaviour() + { + new 
Processor() + { + @Override + public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry retryPolicy) {return null;} + + @Override + public Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch lastKnown, Retry retryPolicy) + { + // Assert the properties of the Retry provided by the static Processor::unsafeRetryIndefinitely + for (int i = 1; i<1000;i++) + { + // backoff increases in 100ms steps, up to a max of 10000ms + long waitTime = retryPolicy.computeWait(i, MILLISECONDS); + assertEquals(Math.min((i+1) * 100, 10000), waitTime); + } + // Retry indefinitely means no explicit deadline is set + assertEquals(Long.MAX_VALUE, retryPolicy.deadlineNanos); + return null; + } + }.commit(new Entry.Id(0L), new Startup(node(random), nodeAddresses(random), NodeVersion.CURRENT), Epoch.EMPTY); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
