This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new f9e2f1b219 Properly cancel in-flight futures and reject requests in EpochAwareDebounce during shutdown f9e2f1b219 is described below commit f9e2f1b219c0b570eb59d529da42722178608572 Author: Caleb Rackliffe <crackli...@apple.com> AuthorDate: Thu Oct 17 14:37:36 2024 -0500 Properly cancel in-flight futures and reject requests in EpochAwareDebounce during shutdown patch by Caleb Rackliffe; reviewed by David Capwell and Sam Tunnicliffe for CASSANDRA-19848 Co-authored-by: Caleb Rackliffe <calebrackli...@gmail.com> Co-authored-by: Sam Tunnicliffe <s...@apache.org> --- CHANGES.txt | 1 + src/java/org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 5 +++ .../org/apache/cassandra/metrics/TCMMetrics.java | 7 ++++ .../apache/cassandra/tcm/EpochAwareDebounce.java | 41 ++++++++++++++++++---- 5 files changed, 48 insertions(+), 7 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 23bb86b090..276969d4d7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Properly cancel in-flight futures and reject requests in EpochAwareDebounce during shutdown (CASSANDRA-19848) * Provide clearer exception message on failing commitlog_disk_access_mode combinations (CASSANDRA-19812) * Add total space used for a keyspace to nodetool tablestats (CASSANDRA-19671) * Ensure Relation#toRestriction() handles ReversedType properly (CASSANDRA-19950) diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index a5d18386e1..119cfbfb7d 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -177,6 +177,7 @@ public class Config public volatile DurationSpec.LongMillisecondsBound cms_await_timeout = new DurationSpec.LongMillisecondsBound("120000ms"); public volatile int cms_default_max_retries = 10; public volatile DurationSpec.IntMillisecondsBound 
cms_default_retry_backoff = new DurationSpec.IntMillisecondsBound("50ms"); + public volatile int epoch_aware_debounce_inflight_tracker_max_size = 100; /** * How often we should snapshot the cluster metadata. */ diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 4024d0ea0f..7699a809a4 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -5261,6 +5261,11 @@ public class DatabaseDescriptor return conf.cms_await_timeout; } + public static int getEpochAwareDebounceInFlightTrackerMaxSize() + { + return conf.epoch_aware_debounce_inflight_tracker_max_size; + } + public static int getMetadataSnapshotFrequency() { return conf.metadata_snapshot_frequency; diff --git a/src/java/org/apache/cassandra/metrics/TCMMetrics.java b/src/java/org/apache/cassandra/metrics/TCMMetrics.java index 29858ead80..01061b725a 100644 --- a/src/java/org/apache/cassandra/metrics/TCMMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TCMMetrics.java @@ -27,6 +27,7 @@ import com.codahale.metrics.Timer; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.EpochAwareDebounce; import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; @@ -64,6 +65,7 @@ public class TCMMetrics public final Meter progressBarrierCLRelax; public final Meter coordinatorBehindSchema; public final Meter coordinatorBehindPlacements; + public final Gauge<Long> epochAwareDebounceTrackerSize; private TCMMetrics() { @@ -98,6 +100,11 @@ public class TCMMetrics return metadata != null && needsReconfiguration(metadata) ? 
1 : 0; }); + epochAwareDebounceTrackerSize = Metrics.register(factory.createMetricName("EpochAwareDebounceTrackerEntries"), () -> { + // don't replace with a method reference because tests may access metrics before EAD is initialized + return EpochAwareDebounce.instance.inflightTrackerSize(); + }); + fetchedPeerLogEntries = Metrics.histogram(factory.createMetricName("FetchedPeerLogEntries"), false); fetchPeerLogLatency = Metrics.timer(factory.createMetricName("FetchPeerLogLatency")); fetchedCMSLogEntries = Metrics.histogram(factory.createMetricName("FetchedCMSLogEntries"), false); diff --git a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java index 25456dbbce..d4d0e111b3 100644 --- a/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java +++ b/src/java/org/apache/cassandra/tcm/EpochAwareDebounce.java @@ -21,8 +21,15 @@ package org.apache.cassandra.tcm; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import org.apache.cassandra.utils.Closeable; import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.ImmediateFuture; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.cassandra.config.DatabaseDescriptor.getCmsAwaitTimeout; +import static org.apache.cassandra.config.DatabaseDescriptor.getEpochAwareDebounceInFlightTrackerMaxSize; /** * When debouncing from a replica we know exactly which epoch we need, so to avoid retries we @@ -35,6 +42,11 @@ public class EpochAwareDebounce implements Closeable public static final EpochAwareDebounce instance = new EpochAwareDebounce(); private final AtomicReference<EpochAwareFuture> currentFuture = new AtomicReference<>(); + private final Cache<Epoch, Future<ClusterMetadata>> inflight = Caffeine.newBuilder() + 
.maximumSize(getEpochAwareDebounceInFlightTrackerMaxSize()) + .expireAfterWrite(getCmsAwaitTimeout().to(MILLISECONDS), + MILLISECONDS) + .build(); private EpochAwareDebounce() { @@ -59,27 +71,42 @@ public class EpochAwareDebounce implements Closeable if (running == SENTINEL) continue; - if (running != null && !running.future.isDone() && running.epoch.isEqualOrAfter(epoch)) + // The inflight future is sufficient, or we are shutting down the debouncer + if ((running != null && !running.future.isDone() && running.epoch.isEqualOrAfter(epoch)) || running == CLOSED) return running.future; if (currentFuture.compareAndSet(running, SENTINEL)) { - EpochAwareFuture promise = new EpochAwareFuture(epoch, fetchFunction.get()); - boolean res = currentFuture.compareAndSet(SENTINEL, promise); - assert res : "Should not have happened"; - return promise.future; + Future<ClusterMetadata> cmFuture = fetchFunction.get(); + cmFuture.addListener(() -> inflight.asMap().remove(epoch)); + EpochAwareFuture promise = new EpochAwareFuture(epoch, cmFuture); + EpochAwareFuture current = currentFuture.compareAndExchange(SENTINEL, promise); + // we have to explicitly check here as close() unconditionally sets currentFuture + if (current == CLOSED) + return CLOSED.future; + + inflight.put(epoch, cmFuture); + return cmFuture; } } } private static final EpochAwareFuture SENTINEL = new EpochAwareFuture(Epoch.EMPTY, null); + private static final EpochAwareFuture CLOSED = new EpochAwareFuture(Epoch.EMPTY, ImmediateFuture.cancelled()); @Override public void close() { - EpochAwareFuture future = currentFuture.get(); - if (future != null && future != SENTINEL) + EpochAwareFuture future = currentFuture.getAndSet(CLOSED); + if (future != null && future != SENTINEL && future != CLOSED) future.future.cancel(true); + for (Future<ClusterMetadata> toCancel : inflight.asMap().values()) + toCancel.cancel(true); + } + + public long inflightTrackerSize() + { + return inflight.estimatedSize(); } private static class 
EpochAwareFuture --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org