This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f9e2f1b219 Properly cancel in-flight futures and reject requests in EpochAwareDebounce during shutdown
f9e2f1b219 is described below

commit f9e2f1b219c0b570eb59d529da42722178608572
Author: Caleb Rackliffe <crackli...@apple.com>
AuthorDate: Thu Oct 17 14:37:36 2024 -0500

    Properly cancel in-flight futures and reject requests in EpochAwareDebounce during shutdown
    
    patch by Caleb Rackliffe; reviewed by David Capwell and Sam Tunnicliffe for CASSANDRA-19848
    
    Co-authored-by: Caleb Rackliffe <calebrackli...@gmail.com>
    Co-authored-by: Sam Tunnicliffe <s...@apache.org>
---
 CHANGES.txt                                        |  1 +
 src/java/org/apache/cassandra/config/Config.java   |  1 +
 .../cassandra/config/DatabaseDescriptor.java       |  5 +++
 .../org/apache/cassandra/metrics/TCMMetrics.java   |  7 ++++
 .../apache/cassandra/tcm/EpochAwareDebounce.java   | 41 ++++++++++++++++++----
 5 files changed, 48 insertions(+), 7 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 23bb86b090..276969d4d7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Properly cancel in-flight futures and reject requests in EpochAwareDebounce during shutdown (CASSANDRA-19848)
  * Provide clearer exception message on failing commitlog_disk_access_mode combinations (CASSANDRA-19812)
  * Add total space used for a keyspace to nodetool tablestats (CASSANDRA-19671)
  * Ensure Relation#toRestriction() handles ReversedType properly (CASSANDRA-19950)
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index a5d18386e1..119cfbfb7d 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -177,6 +177,7 @@ public class Config
     public volatile DurationSpec.LongMillisecondsBound cms_await_timeout = new 
DurationSpec.LongMillisecondsBound("120000ms");
     public volatile int cms_default_max_retries = 10;
     public volatile DurationSpec.IntMillisecondsBound 
cms_default_retry_backoff = new DurationSpec.IntMillisecondsBound("50ms");
+    public volatile int epoch_aware_debounce_inflight_tracker_max_size = 100;
     /**
      * How often we should snapshot the cluster metadata.
      */
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4024d0ea0f..7699a809a4 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -5261,6 +5261,11 @@ public class DatabaseDescriptor
         return conf.cms_await_timeout;
     }
 
+    public static int getEpochAwareDebounceInFlightTrackerMaxSize()
+    {
+        return conf.epoch_aware_debounce_inflight_tracker_max_size;
+    }
+
     public static int getMetadataSnapshotFrequency()
     {
         return conf.metadata_snapshot_frequency;
diff --git a/src/java/org/apache/cassandra/metrics/TCMMetrics.java b/src/java/org/apache/cassandra/metrics/TCMMetrics.java
index 29858ead80..01061b725a 100644
--- a/src/java/org/apache/cassandra/metrics/TCMMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TCMMetrics.java
@@ -27,6 +27,7 @@ import com.codahale.metrics.Timer;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.EpochAwareDebounce;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
@@ -64,6 +65,7 @@ public class TCMMetrics
     public final Meter progressBarrierCLRelax;
     public final Meter coordinatorBehindSchema;
     public final Meter coordinatorBehindPlacements;
+    public final Gauge<Long> epochAwareDebounceTrackerSize;
 
     private TCMMetrics()
     {
@@ -98,6 +100,11 @@ public class TCMMetrics
             return metadata != null && needsReconfiguration(metadata) ? 1 : 0;
         });
 
+        epochAwareDebounceTrackerSize = 
Metrics.register(factory.createMetricName("EpochAwareDebounceTrackerEntries"), 
() -> {
+            // don't replace with a method reference because tests may access 
metrics before EAD is initialized
+            return EpochAwareDebounce.instance.inflightTrackerSize();
+        });
+
         fetchedPeerLogEntries = 
Metrics.histogram(factory.createMetricName("FetchedPeerLogEntries"), false);
         fetchPeerLogLatency = 
Metrics.timer(factory.createMetricName("FetchPeerLogLatency"));
         fetchedCMSLogEntries = 
Metrics.histogram(factory.createMetricName("FetchedCMSLogEntries"), false);
diff --git a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
index 25456dbbce..d4d0e111b3 100644
--- a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
+++ b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java
@@ -21,8 +21,15 @@ package org.apache.cassandra.tcm;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import org.apache.cassandra.utils.Closeable;
 import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.cassandra.config.DatabaseDescriptor.getCmsAwaitTimeout;
+import static 
org.apache.cassandra.config.DatabaseDescriptor.getEpochAwareDebounceInFlightTrackerMaxSize;
 
 /**
  * When debouncing from a replica we know exactly which epoch we need, so to 
avoid retries we
@@ -35,6 +42,11 @@ public class EpochAwareDebounce implements Closeable
     public static final EpochAwareDebounce instance = new EpochAwareDebounce();
 
     private final AtomicReference<EpochAwareFuture> currentFuture = new 
AtomicReference<>();
+    private final Cache<Epoch, Future<ClusterMetadata>> inflight = 
Caffeine.newBuilder()
+                                                                           
.maximumSize(getEpochAwareDebounceInFlightTrackerMaxSize())
+                                                                           
.expireAfterWrite(getCmsAwaitTimeout().to(MILLISECONDS),
+                                                                               
              MILLISECONDS)
+                                                                           
.build();
 
     private EpochAwareDebounce()
     {
@@ -59,27 +71,42 @@ public class EpochAwareDebounce implements Closeable
             if (running == SENTINEL)
                 continue;
 
-            if (running != null && !running.future.isDone() && 
running.epoch.isEqualOrAfter(epoch))
+            // The inflight future is sufficient, or we are shutting down the 
debouncer
+            if ((running != null && !running.future.isDone() && 
running.epoch.isEqualOrAfter(epoch)) || running == CLOSED)
                 return running.future;
 
             if (currentFuture.compareAndSet(running, SENTINEL))
             {
-                EpochAwareFuture promise = new EpochAwareFuture(epoch, 
fetchFunction.get());
-                boolean res = currentFuture.compareAndSet(SENTINEL, promise);
-                assert res : "Should not have happened";
-                return promise.future;
+                Future<ClusterMetadata> cmFuture = fetchFunction.get();
+                cmFuture.addListener(() -> inflight.asMap().remove(epoch));
+                EpochAwareFuture promise = new EpochAwareFuture(epoch, 
cmFuture);
+                EpochAwareFuture current = 
currentFuture.compareAndExchange(SENTINEL, promise);
+                // we have to explicitly check here as close() unconditionally 
sets currentFuture
+                if (current == CLOSED)
+                    return CLOSED.future;
+
+                inflight.put(epoch, cmFuture);
+                return cmFuture;
             }
         }
     }
 
     private static final EpochAwareFuture SENTINEL = new 
EpochAwareFuture(Epoch.EMPTY, null);
+    private static final EpochAwareFuture CLOSED = new 
EpochAwareFuture(Epoch.EMPTY, ImmediateFuture.cancelled());
 
     @Override
     public void close()
     {
-        EpochAwareFuture future = currentFuture.get();
-        if (future != null && future != SENTINEL)
+        EpochAwareFuture future = currentFuture.getAndSet(CLOSED);
+        if (future != null && future != SENTINEL && future != CLOSED)
             future.future.cancel(true);
+        for (Future<ClusterMetadata> toCancel : inflight.asMap().values())
+            toCancel.cancel(true);
+    }
+
+    public long inflightTrackerSize()
+    {
+        return inflight.estimatedSize();
     }
 
     private static class EpochAwareFuture


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to