This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e4684a11ec Fix:  - DefaultLocalListeners.ComplexListeners iterator 
IndexOutOfBoundsException  - Race condition initialising empty ActiveEpochs, 
when minimum pending epoch can move backwards  - SyncPoints must be declared in 
an epoch containing the ranges, and PENDING_REMOVAL ranges will reject 
non-syncpoint transactions  - AccordExecutorMetrics is now registered on 
startup  - getRecentValues for non-cumulative histogram should not subtract 
prior values Improve:  - Report ephemera [...]
e4684a11ec is described below

commit e4684a11ec4fd52baaf0da141e24db6a92f63725
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Tue Dec 9 11:46:38 2025 +0000

    Fix:
     - DefaultLocalListeners.ComplexListeners iterator IndexOutOfBoundsException
     - Race condition initialising empty ActiveEpochs, when the minimum pending 
epoch can move backwards
     - SyncPoints must be declared in an epoch containing the ranges, and 
PENDING_REMOVAL ranges will reject non-syncpoint transactions
     - AccordExecutorMetrics is now registered on startup
     - getRecentValues for non-cumulative histogram should not subtract prior 
values
    Improve:
     - Report ephemeral read, epoch wait and epoch timeout metrics
     - Remove Topologies.SelectNodeOwnership, as there is no longer any need to SLICE
     - Introduce SystemEventListener for epoch waiting and timeout metrics
     - No-op (but log) if the gcBefore provided to CFK is in the past
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-21076
---
 modules/accord                                     |   2 +-
 .../cassandra/config/DatabaseDescriptor.java       |   2 +-
 .../cql3/statements/TransactionStatement.java      |   2 +-
 .../cassandra/db/virtual/AccordDebugKeyspace.java  |   2 +-
 .../cassandra/metrics/AccordCacheMetrics.java      |   2 +-
 .../metrics/AccordCoordinatorMetrics.java          |  10 +-
 .../cassandra/metrics/AccordExecutorMetrics.java   |   4 +
 .../cassandra/metrics/AccordSystemMetrics.java     |  46 ++++--
 .../metrics/CassandraMetricsRegistry.java          |  60 ++++++--
 .../apache/cassandra/metrics/LatencyMetrics.java   |   7 +-
 .../cassandra/metrics/OnDemandHistogram.java       |  10 +-
 .../cassandra/metrics/OverrideHistogram.java       |   7 +-
 .../apache/cassandra/metrics/OverrideMeter.java    |  29 ++++
 .../{ThreadLocalTimer.java => OverrideTimer.java}  |  33 +++--
 .../metrics/ShardedDecayingHistograms.java         |   6 +
 .../apache/cassandra/metrics/ShardedHistogram.java |  14 +-
 .../apache/cassandra/metrics/ThreadLocalTimer.java | 158 +--------------------
 src/java/org/apache/cassandra/metrics/Timer.java   |   5 +
 .../service/accord/AccordCommandStore.java         |   2 +-
 .../cassandra/service/accord/AccordKeyspace.java   |   2 +-
 .../cassandra/service/accord/AccordService.java    | 113 ++++++++++-----
 .../cassandra/service/accord/api/AccordAgent.java  |   7 +-
 .../accord/interop/AccordInteropAdapter.java       |  11 +-
 .../accord/interop/AccordInteropExecution.java     |   6 +-
 .../org/apache/cassandra/tcm/log/LocalLog.java     |   2 +-
 .../cassandra/distributed/shared/ClusterUtils.java |   3 +
 .../distributed/test/accord/AccordCQLTestBase.java |   3 +-
 .../test/accord/AccordSimpleFastPathTest.java      |   2 +-
 .../db/virtual/AccordVirtualTablesTest.java        |   2 +-
 .../cassandra/index/accord/RouteIndexTest.java     |   2 +-
 .../metrics/JmxVirtualTableMetricsTest.java        |   2 +-
 .../cassandra/service/accord/EpochSyncTest.java    |   4 +-
 32 files changed, 298 insertions(+), 262 deletions(-)

diff --git a/modules/accord b/modules/accord
index 8ccce74581..6bae51f6a4 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 8ccce745818cf80c7cff82c3554e4a88e9e540db
+Subproject commit 6bae51f6a4bd560a82840fa0809cd1d630696cc0
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 0c4eb44496..09100da8d3 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -5505,7 +5505,7 @@ public class DatabaseDescriptor
 
     public static boolean getAccordTransactionsEnabled()
     {
-        return conf == null ? false : conf.accord.enabled;
+        return conf != null && conf.accord.enabled;
     }
 
     public static void setAccordTransactionsEnabled(boolean b)
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
index 2a96bcc16d..caff467cd7 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
@@ -132,7 +132,7 @@ public class TransactionStatement implements 
CQLStatement.CompositeCQLStatement,
     public static final String WRITE_TXN_EMPTY_WITH_IGNORED_READS = "Write txn 
produced no mutation, and its reads do not return to the caller; ignoring...";
     public static final String WRITE_TXN_EMPTY_WITH_NO_READS = "Write txn 
produced no mutation, and had no reads; ignoring...";
 
-    private static NoSpamLogger noSpamLogger = 
NoSpamLogger.getLogger(LoggerFactory.getLogger(TransactionStatement.class), 1, 
TimeUnit.MINUTES);
+    private static final NoSpamLogger noSpamLogger = 
NoSpamLogger.getLogger(LoggerFactory.getLogger(TransactionStatement.class), 1, 
TimeUnit.MINUTES);
 
     static class NamedSelect
     {
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java 
b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
index de989d7f6e..c3f08de148 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
@@ -2186,7 +2186,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
                 Map<TokenKey, List<ShardAndEpochs>> startLookup = null;
                 for (ActiveEpoch epoch : snapshot)
                 {
-                    Topology topology = epoch.global();
+                    Topology topology = epoch.all();
                     for (Shard shard : topology.shards())
                     {
                         Range range = shard.range;
diff --git a/src/java/org/apache/cassandra/metrics/AccordCacheMetrics.java 
b/src/java/org/apache/cassandra/metrics/AccordCacheMetrics.java
index 21f1297584..3df852af9d 100644
--- a/src/java/org/apache/cassandra/metrics/AccordCacheMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordCacheMetrics.java
@@ -90,7 +90,7 @@ public class AccordCacheMetrics
     public AccordCacheMetrics(String subTypeName)
     {
         DefaultNameFactory factory = new DefaultNameFactory(ACCORD_CACHE, 
subTypeName);
-        this.objectSize = 
Metrics.shardedHistogram(factory.createMetricName("EntrySize"));
+        this.objectSize = 
Metrics.shardedHistogram(factory.createMetricName("EntrySize"), false);
         this.hits = Metrics.gauge(factory.createMetricName("Hits"), 
hitRate::totalHits);
         this.misses = Metrics.gauge(factory.createMetricName("Misses"), 
hitRate::totalMisses);
         this.requests = Metrics.gauge(factory.createMetricName("Requests"), 
hitRate::totalRequests);
diff --git 
a/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java 
b/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java
index 1500517bcb..12c736614f 100644
--- a/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java
@@ -52,6 +52,7 @@ public class AccordCoordinatorMetrics
     public static final String COORDINATOR_PREACCEPT_LATENCY = 
"PreAcceptLatency";
     public static final String COORDINATOR_EXECUTE_LATENCY = "ExecuteLatency";
     public static final String COORDINATOR_APPLY_LATENCY = "ApplyLatency";
+    public static final String EPHEMERAL = "Ephemeral";
     public static final String FAST_PATHS = "FastPaths";
     public static final String MEDIUM_PATHS = "MediumPaths";
     public static final String SLOW_PATHS = "SlowPaths";
@@ -98,6 +99,11 @@ public class AccordCoordinatorMetrics
      */
     public final Histogram tables;
 
+    /**
+     * The number of ephemeral transactions executed on this coordinator.
+     */
+    public final Meter ephemeral;
+
     /**
      * The number of fast path transactions executed on this coordinator.
      */
@@ -159,6 +165,7 @@ public class AccordCoordinatorMetrics
         keys = 
Metrics.histogram(coordinator.createMetricName(COORDINATOR_KEYS), true);
         tables = 
Metrics.histogram(coordinator.createMetricName(COORDINATOR_TABLES), true);
 
+        ephemeral = Metrics.meter(coordinator.createMetricName(EPHEMERAL));
         fastPaths = Metrics.meter(coordinator.createMetricName(FAST_PATHS));
         mediumPaths = 
Metrics.meter(coordinator.createMetricName(MEDIUM_PATHS));
         slowPaths = Metrics.meter(coordinator.createMetricName(SLOW_PATHS));
@@ -168,7 +175,7 @@ public class AccordCoordinatorMetrics
         invalidations = 
Metrics.meter(coordinator.createMetricName(INVALIDATIONS));
         recoveryDelay = 
Metrics.timer(coordinator.createMetricName(RECOVERY_DELAY));
         recoveryDuration = 
Metrics.timer(coordinator.createMetricName(RECOVERY_TIME));
-        fastPathToTotal = new RatioGaugeSet(fastPaths, 
RatioGaugeSet.sum(fastPaths, mediumPaths, slowPaths), coordinator, 
FAST_PATH_TO_TOTAL + ".%s");
+        fastPathToTotal = new RatioGaugeSet(fastPaths, 
RatioGaugeSet.sum(ephemeral, fastPaths, mediumPaths, slowPaths), coordinator, 
FAST_PATH_TO_TOTAL + ".%s");
     }
 
     @Override
@@ -238,6 +245,7 @@ public class AccordCoordinatorMetrics
                 {
                     switch (path)
                     {
+                        case EPHEMERAL: metrics.ephemeral.mark(); break;
                         case FAST: metrics.fastPaths.mark(); break;
                         case MEDIUM: metrics.mediumPaths.mark(); break;
                         case SLOW: metrics.slowPaths.mark(); break;
diff --git a/src/java/org/apache/cassandra/metrics/AccordExecutorMetrics.java 
b/src/java/org/apache/cassandra/metrics/AccordExecutorMetrics.java
index c916c3239a..94d8e43ab5 100644
--- a/src/java/org/apache/cassandra/metrics/AccordExecutorMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordExecutorMetrics.java
@@ -58,4 +58,8 @@ public class AccordExecutorMetrics
         waitingToRun = 
Metrics.register(factory.createMetricName("WaitingToRun"), 
gauges.newGauge(AccordExecutor::unsafeWaitingToRunCount, Long::sum));
         running = Metrics.register(factory.createMetricName("Running"), 
gauges.newGauge(AccordExecutor::unsafeRunningCount, Long::sum));
     }
+
+    public static void touch()
+    {
+    }
 }
diff --git a/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java 
b/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java
index 313ed92bb2..1ef552588f 100644
--- a/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java
@@ -21,12 +21,14 @@ package org.apache.cassandra.metrics;
 import java.lang.reflect.Field;
 import java.util.concurrent.TimeUnit;
 
+import accord.api.SystemEventListener;
 import accord.impl.progresslog.DefaultProgressLog;
 import accord.local.MaxDecidedRX;
 import accord.local.RedundantBefore;
 import accord.primitives.TxnId;
 import accord.topology.TopologyManager;
 import accord.utils.Invariants;
+import com.codahale.metrics.Counter;
 import com.codahale.metrics.Counting;
 import com.codahale.metrics.Gauge;
 import org.apache.cassandra.metrics.LogLinearHistogram.LogLinearSnapshot;
@@ -44,7 +46,7 @@ import static 
org.apache.cassandra.metrics.AccordMetricUtils.fromDurabilityServi
 import static 
org.apache.cassandra.metrics.AccordMetricUtils.fromTopologyManager;
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
-public class AccordSystemMetrics
+public class AccordSystemMetrics implements SystemEventListener
 {
     public final static AccordSystemMetrics metrics = new 
AccordSystemMetrics();
     private static final long REFRESH_RATE = TimeUnit.SECONDS.toNanos(30);
@@ -52,6 +54,10 @@ public class AccordSystemMetrics
     public static final String ACCORD_SYSTEM = "AccordSystem";
     public static final String MIN_EPOCH = "MinEpoch";
     public static final String MAX_EPOCH = "MaxEpoch";
+    public static final String MAX_PENDING_EPOCH = "MaxPendingEpoch";
+    public static final String ERRORS = "Errors";
+    public static final String EPOCH_WAITS = "EpochWaits";
+    public static final String EPOCH_TIMEOUTS = "EpochTimeouts";
     public static final String PROGRESS_LOG_ACTIVE = "ProgressLogActive";
     public static final String PROGRESS_LOG_SIZE = "ProgressLogSize";
     public static final String PROGRESS_LOG_AGE = "ProgressLogAge";
@@ -66,6 +72,10 @@ public class AccordSystemMetrics
 
     public final Gauge<Long> minEpoch;
     public final Gauge<Long> maxEpoch;
+    public final Gauge<Long> maxPendingEpoch;
+    public final Counter errors;
+    public final Counter epochWaits;
+    public final Counter epochTimeouts;
     public final Gauge<Long> progressLogActive;
     public final Gauge<Long> durabilityQueueActive;
     public final Gauge<Long> durabilityQueuePending;
@@ -142,17 +152,33 @@ public class AccordSystemMetrics
         DefaultNameFactory factory = new DefaultNameFactory(ACCORD_SYSTEM);
         minEpoch = Metrics.gauge(factory.createMetricName(MIN_EPOCH), 
fromTopologyManager(TopologyManager::minEpoch));
         maxEpoch = Metrics.gauge(factory.createMetricName(MAX_EPOCH), 
fromTopologyManager(TopologyManager::epoch));
+        maxPendingEpoch = 
Metrics.gauge(factory.createMetricName(MAX_PENDING_EPOCH), 
fromTopologyManager(TopologyManager::pendingEpoch));
+        errors = Metrics.counter(factory.createMetricName(ERRORS));
+        epochTimeouts = 
Metrics.counter(factory.createMetricName(EPOCH_TIMEOUTS));
+        epochWaits = Metrics.counter(factory.createMetricName(EPOCH_WAITS));
         durabilityQueueActive = 
Metrics.gauge(factory.createMetricName(DURABILITY_QUEUE_ACTIVE), 
fromDurabilityService(durability -> (long)durability.queue().activeCount()));
         durabilityQueuePending = 
Metrics.gauge(factory.createMetricName(DURABILITY_QUEUE_PENDING), 
fromDurabilityService(durability -> (long)durability.queue().pendingCount()));
         progressLogActive = 
Metrics.gauge(factory.createMetricName(PROGRESS_LOG_ACTIVE), 
fromDurabilityService(durability -> (long)durability.queue().activeCount()));
-        progressLogSize = 
Metrics.onDemandHistogram(factory.createMetricName(PROGRESS_LOG_SIZE), () -> 
maybeRefreshHistograms().progressLogSize);
-        progressLogAge = 
Metrics.onDemandHistogram(factory.createMetricName(PROGRESS_LOG_AGE), () -> 
maybeRefreshHistograms().progressLogAge);
-        syncPointAgreedLag = 
Metrics.onDemandHistogram(factory.createMetricName(SYNCPOINT_AGREED_LAG), () -> 
maybeRefreshHistograms().syncPointAgreedLag);
-        locallyAppliedLag = 
Metrics.onDemandHistogram(factory.createMetricName(LOCALLY_APPLIED_LAG), () -> 
maybeRefreshHistograms().locallyAppliedLag);
-        locallyDurableLag = 
Metrics.onDemandHistogram(factory.createMetricName(LOCALLY_DURABLE_LAG), () -> 
maybeRefreshHistograms().locallyDurableLag);
-        quorumAppliedLag = 
Metrics.onDemandHistogram(factory.createMetricName(QUORUM_APPLIED_LAG), () -> 
maybeRefreshHistograms().quorumAppliedLag);
-        shardAppliedLag = 
Metrics.onDemandHistogram(factory.createMetricName(SHARD_APPLIED_LAG), () -> 
maybeRefreshHistograms().shardAppliedLag);
-        gcLag = Metrics.onDemandHistogram(factory.createMetricName(GC_LAG), () 
-> maybeRefreshHistograms().gcLag);
+        progressLogSize = 
Metrics.onDemandHistogram(factory.createMetricName(PROGRESS_LOG_SIZE), () -> 
maybeRefreshHistograms().progressLogSize, false);
+        progressLogAge = 
Metrics.onDemandHistogram(factory.createMetricName(PROGRESS_LOG_AGE), () -> 
maybeRefreshHistograms().progressLogAge, false);
+        syncPointAgreedLag = 
Metrics.onDemandHistogram(factory.createMetricName(SYNCPOINT_AGREED_LAG), () -> 
maybeRefreshHistograms().syncPointAgreedLag, false);
+        locallyAppliedLag = 
Metrics.onDemandHistogram(factory.createMetricName(LOCALLY_APPLIED_LAG), () -> 
maybeRefreshHistograms().locallyAppliedLag, false);
+        locallyDurableLag = 
Metrics.onDemandHistogram(factory.createMetricName(LOCALLY_DURABLE_LAG), () -> 
maybeRefreshHistograms().locallyDurableLag, false);
+        quorumAppliedLag = 
Metrics.onDemandHistogram(factory.createMetricName(QUORUM_APPLIED_LAG), () -> 
maybeRefreshHistograms().quorumAppliedLag, false);
+        shardAppliedLag = 
Metrics.onDemandHistogram(factory.createMetricName(SHARD_APPLIED_LAG), () -> 
maybeRefreshHistograms().shardAppliedLag, false);
+        gcLag = Metrics.onDemandHistogram(factory.createMetricName(GC_LAG), () 
-> maybeRefreshHistograms().gcLag, false);
+    }
+
+    @Override
+    public void onWaitingForEpoch(long epoch)
+    {
+        epochWaits.inc();
+    }
+
+    @Override
+    public void onTimeoutForEpoch(long epoch, int count)
+    {
+        epochTimeouts.inc(count);
     }
 
     private synchronized Snapshot maybeRefreshHistograms()
@@ -190,6 +216,8 @@ public class AccordSystemMetrics
             for (int i = 0 ; i < redundantBefore.size() ; ++i)
             {
                 RedundantBefore.Bounds bounds = redundantBefore.valueAt(i);
+                if (bounds == null)
+                    continue;
                 builder.locallyAppliedLag.increment(ageSeconds(nowSeconds, 
bounds.maxBound(LOCALLY_APPLIED)));
                 builder.locallyDurableLag.increment(ageSeconds(nowSeconds, 
bounds.maxBoundBoth(LOCALLY_DURABLE_TO_DATA_STORE, 
LOCALLY_DURABLE_TO_COMMAND_STORE)));
                 builder.quorumAppliedLag.increment(ageSeconds(nowSeconds, 
bounds.maxBound(QUORUM_APPLIED)));
diff --git 
a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java 
b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
index fc50797005..c11fd5d018 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
+++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
@@ -48,7 +48,6 @@ import com.codahale.metrics.Metered;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.MetricSet;
-import com.codahale.metrics.Timer;
 import org.apache.cassandra.db.virtual.CollectionVirtualTableAdapter;
 import org.apache.cassandra.db.virtual.VirtualTable;
 import org.apache.cassandra.db.virtual.model.CounterMetricRow;
@@ -177,8 +176,8 @@ public class CassandraMetricsRegistry extends MetricRegistry
             return Double.toString(((OverrideHistogram) 
metric).getSnapshot().getMedian());
         else if (metric instanceof Meter)
             return Long.toString(((Meter) metric).getCount());
-        else if (metric instanceof Timer)
-            return Long.toString(((Timer) metric).getCount());
+        else if (metric instanceof com.codahale.metrics.Timer)
+            return Long.toString(((com.codahale.metrics.Timer) 
metric).getCount());
         else
             throw new IllegalStateException("Unknown metric type: " + 
metric.getClass().getName());
     }
@@ -357,14 +356,14 @@ public class CassandraMetricsRegistry extends 
MetricRegistry
         return histogram;
     }
 
-    public ShardedHistogram shardedHistogram(MetricName name)
+    public ShardedHistogram shardedHistogram(MetricName name, boolean 
isCumulative)
     {
-        return register(name, new ShardedHistogram());
+        return register(name, new ShardedHistogram(isCumulative));
     }
 
-    public OnDemandHistogram onDemandHistogram(MetricName name, 
Supplier<LogLinearHistogram.LogLinearSnapshot> snapshot)
+    public OnDemandHistogram onDemandHistogram(MetricName name, 
Supplier<LogLinearHistogram.LogLinearSnapshot> snapshot, boolean isCumulative)
     {
-        return register(name, new OnDemandHistogram(snapshot));
+        return register(name, new OnDemandHistogram(snapshot, isCumulative));
     }
 
     public <T extends Gauge<?>> T gauge(MetricName name, T gauge)
@@ -379,7 +378,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
         return gaugeLoc;
     }
 
-    public Timer timer(MetricName name)
+    public SnapshottingTimer timer(MetricName name)
     {
         return timer(name, DEFAULT_TIMER_UNIT);
     }
@@ -773,17 +772,22 @@ public class CassandraMetricsRegistry extends 
MetricRegistry
         }
 
         /**
-         * Returns a histogram describing the values recorded since the last 
time this method was called.
+         * If the Histogram has cumulative data, returns a histogram 
describing the values recorded since the last time this method was called.
          *
-         * ex. If the counts are [0, 1, 2, 1] at the time the first caller 
arrives, but change to [1, 2, 3, 2] by the 
+         * ex. If the counts are [0, 1, 2, 1] at the time the first caller 
arrives, but change to [1, 2, 3, 2] by the
          * time a second caller arrives, the second caller will receive [1, 1, 
1, 1].
          *
+         * If the Histogram does not have cumulative data, simply returns the 
current snapshot.
+         *
          * @return a histogram whose bucket offsets are assumed to be in 
nanoseconds
          */
         @Override
         public synchronized long[] getRecentValues()
         {
-            long[] now = metric.getSnapshot().getValues();
+            long[] now = values();
+            if (!metric.isCumulative())
+                return now;
+
             long[] delta = delta(now, last);
             last = now;
             return delta;
@@ -968,6 +972,12 @@ public class CassandraMetricsRegistry extends 
MetricRegistry
         long[] getRecentValues();
 
         String getDurationUnit();
+
+        String bucketsId();
+
+        long[] rawBuckets(int count);
+
+        long[] rawValues();
     }
 
     static class JmxTimer extends JmxMeter implements JmxTimerMBean
@@ -1049,7 +1059,10 @@ public class CassandraMetricsRegistry extends 
MetricRegistry
         @Override
         public long[] values()
         {
-            return metric.getSnapshot().getValues();
+            long[] values = metric.getSnapshot().getValues();
+            if (metric.bucketStrategy() == 
CassandraReservoir.BucketStrategy.log_linear)
+                values = 
metric.bucketStrategy().translateTo(CassandraReservoir.BucketStrategy.exp_12_nozero,
 values);
+            return values;
         }
 
         /**
@@ -1063,7 +1076,10 @@ public class CassandraMetricsRegistry extends 
MetricRegistry
         @Override
         public synchronized long[] getRecentValues()
         {
-            long[] now = metric.getSnapshot().getValues();
+            long[] now = values();
+            if (!metric.isCumulative())
+                return now;
+
             long[] delta = delta(now, last);
             last = now;
             return delta;
@@ -1074,6 +1090,24 @@ public class CassandraMetricsRegistry extends 
MetricRegistry
         {
             return durationUnit;
         }
+
+        @Override
+        public String bucketsId()
+        {
+            return metric.bucketStrategy().name();
+        }
+
+        @Override
+        public long[] rawBuckets(int count)
+        {
+            return metric.bucketStarts(count);
+        }
+
+        @Override
+        public long[] rawValues()
+        {
+            return metric.getSnapshot().getValues();
+        }
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/metrics/LatencyMetrics.java 
b/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
index e4abd40bbf..dda9844752 100644
--- a/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
@@ -24,9 +24,7 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.collect.Lists;
 
 import com.codahale.metrics.Counter;
-import com.codahale.metrics.Reservoir;
 import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
@@ -169,12 +167,11 @@ public class LatencyMetrics
         Metrics.remove(factory.createMetricName(namePrefix + "TotalLatency"));
     }
 
-    public class LatencyMetricsTimer extends Timer
+    public class LatencyMetricsTimer extends OverrideTimer implements 
org.apache.cassandra.metrics.Timer
     {
-
         long releasedLatencyCount = 0;
 
-        public LatencyMetricsTimer(Reservoir reservoir) 
+        public LatencyMetricsTimer(CassandraReservoir reservoir)
         {
             super(reservoir);
         }
diff --git a/src/java/org/apache/cassandra/metrics/OnDemandHistogram.java 
b/src/java/org/apache/cassandra/metrics/OnDemandHistogram.java
index 4ad890e3c0..f00966824b 100644
--- a/src/java/org/apache/cassandra/metrics/OnDemandHistogram.java
+++ b/src/java/org/apache/cassandra/metrics/OnDemandHistogram.java
@@ -28,9 +28,11 @@ import static 
org.apache.cassandra.metrics.CassandraReservoir.BucketStrategy.log
 public class OnDemandHistogram extends OverrideHistogram
 {
     final Supplier<LogLinearSnapshot> snapshot;
-    protected OnDemandHistogram(Supplier<LogLinearSnapshot> snapshot)
+    final boolean isCumulative;
+    protected OnDemandHistogram(Supplier<LogLinearSnapshot> snapshot, boolean 
isCumulative)
     {
         this.snapshot = snapshot;
+        this.isCumulative = isCumulative;
     }
 
     @Override
@@ -56,4 +58,10 @@ public class OnDemandHistogram extends OverrideHistogram
     {
         return LogLinearHistogram.bucketsWithLength(length);
     }
+
+    @Override
+    public boolean isCumulative()
+    {
+        return isCumulative;
+    }
 }
diff --git a/src/java/org/apache/cassandra/metrics/OverrideHistogram.java 
b/src/java/org/apache/cassandra/metrics/OverrideHistogram.java
index 2553d5e01f..e45480247d 100644
--- a/src/java/org/apache/cassandra/metrics/OverrideHistogram.java
+++ b/src/java/org/apache/cassandra/metrics/OverrideHistogram.java
@@ -21,7 +21,7 @@ package org.apache.cassandra.metrics;
 import com.codahale.metrics.Snapshot;
 import org.agrona.UnsafeAccess;
 
-public abstract class OverrideHistogram extends com.codahale.metrics.Histogram
+public class OverrideHistogram extends com.codahale.metrics.Histogram
 {
     private static final CassandraReservoir NO_RESERVOIR = new 
CassandraReservoir() {
         @Override public Snapshot getPercentileSnapshot() { return null; }
@@ -61,4 +61,9 @@ public abstract class OverrideHistogram extends 
com.codahale.metrics.Histogram
     {
         return reservoir.buckets(length);
     }
+
+    public boolean isCumulative()
+    {
+        return true;
+    }
 }
diff --git a/src/java/org/apache/cassandra/metrics/OverrideMeter.java 
b/src/java/org/apache/cassandra/metrics/OverrideMeter.java
new file mode 100644
index 0000000000..4daae7e1b7
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/OverrideMeter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import com.codahale.metrics.Clock;
+
+public class OverrideMeter extends com.codahale.metrics.Meter implements Meter
+{
+    public OverrideMeter(Clock clock)
+    {
+        super(clock);
+    }
+}
diff --git a/src/java/org/apache/cassandra/metrics/ThreadLocalTimer.java 
b/src/java/org/apache/cassandra/metrics/OverrideTimer.java
similarity index 88%
copy from src/java/org/apache/cassandra/metrics/ThreadLocalTimer.java
copy to src/java/org/apache/cassandra/metrics/OverrideTimer.java
index 771c68b04f..e2fe1e45fd 100644
--- a/src/java/org/apache/cassandra/metrics/ThreadLocalTimer.java
+++ b/src/java/org/apache/cassandra/metrics/OverrideTimer.java
@@ -34,18 +34,18 @@ import com.codahale.metrics.Snapshot;
  * NOTE: Dropwizard Timer is a concrete class and there is no an interface for 
Dropwizard Timer logic,
  *   so we have to create an alternative hierarchy.
  */
-public class ThreadLocalTimer extends com.codahale.metrics.Timer implements 
Timer
+public class OverrideTimer extends com.codahale.metrics.Timer implements Timer
 {
-    private final Meter meter;
-    private final ThreadLocalHistogram histogram;
+    protected final Meter meter;
+    protected final OverrideHistogram histogram;
     // usually we need precise clocks for timing, so we do not replace it with 
an approximate version
-    private final MetricClock clock;
+    protected final MetricClock clock;
 
     /**
      * Creates a new {@link Timer} using an {@link 
ExponentiallyDecayingReservoir} and the default
      * {@link MetricClock}.
      */
-    public ThreadLocalTimer()
+    public OverrideTimer()
     {
         this(new DecayingEstimatedHistogramReservoir());
     }
@@ -55,7 +55,7 @@ public class ThreadLocalTimer extends 
com.codahale.metrics.Timer implements Time
      *
      * @param reservoir the {@link Reservoir} implementation the timer should 
use
      */
-    public ThreadLocalTimer(CassandraReservoir reservoir)
+    public OverrideTimer(CassandraReservoir reservoir)
     {
         this(reservoir, MetricClock.defaultClock());
     }
@@ -66,14 +66,14 @@ public class ThreadLocalTimer extends 
com.codahale.metrics.Timer implements Time
      * @param reservoir the {@link Reservoir} implementation the timer should 
use
      * @param clock     the {@link MetricClock} implementation the timer 
should use
      */
-    public ThreadLocalTimer(CassandraReservoir reservoir, MetricClock clock)
+    public OverrideTimer(CassandraReservoir reservoir, MetricClock clock)
     {
         // the precise clock is intentionally not propagated to 
ThreadLocalMeter
         // we do not need a precise and more expensive time within the meter
-        this(new ThreadLocalMeter(), new ThreadLocalHistogram(reservoir), 
clock);
+        this(new OverrideMeter(clock), new OverrideHistogram(reservoir), 
clock);
     }
 
-    public ThreadLocalTimer(Meter meter, ThreadLocalHistogram histogram, 
MetricClock clock)
+    public OverrideTimer(Meter meter, OverrideHistogram histogram, MetricClock 
clock)
     {
         // original Codahale meter and histogram are set to null to reduce 
memory footprint
         super(null, null, clock);
@@ -221,4 +221,19 @@ public class ThreadLocalTimer extends 
com.codahale.metrics.Timer implements Time
             meter.mark();
         }
     }
+
+    public CassandraReservoir.BucketStrategy bucketStrategy()
+    {
+        return histogram.bucketStrategy();
+    }
+
+    public long[] bucketStarts(int length)
+    {
+        return histogram.reservoir.buckets(length);
+    }
+
+    public boolean isCumulative()
+    {
+        return true;
+    }
 }
diff --git 
a/src/java/org/apache/cassandra/metrics/ShardedDecayingHistograms.java 
b/src/java/org/apache/cassandra/metrics/ShardedDecayingHistograms.java
index db91857b3c..92620fb034 100644
--- a/src/java/org/apache/cassandra/metrics/ShardedDecayingHistograms.java
+++ b/src/java/org/apache/cassandra/metrics/ShardedDecayingHistograms.java
@@ -74,6 +74,12 @@ public class ShardedDecayingHistograms
         {
             return shard.histograms.get(histogramIndex);
         }
+
+        @Override
+        public boolean isCumulative()
+        {
+            return false;
+        }
     }
 
     public static class DecayingHistogramsShard
diff --git a/src/java/org/apache/cassandra/metrics/ShardedHistogram.java 
b/src/java/org/apache/cassandra/metrics/ShardedHistogram.java
index 66defdac56..3ac77a9717 100644
--- a/src/java/org/apache/cassandra/metrics/ShardedHistogram.java
+++ b/src/java/org/apache/cassandra/metrics/ShardedHistogram.java
@@ -58,11 +58,12 @@ public class ShardedHistogram extends OverrideHistogram
     }
 
     final CopyOnWriteArrayList<HistogramShard> shards = new 
CopyOnWriteArrayList<>();
+    final boolean isCumulative;
     final long initialMaxValue;
 
-    public ShardedHistogram()
+    public ShardedHistogram(boolean isCumulative)
     {
-        this(1 << 16);
+        this(isCumulative, 1 << 16);
     }
 
     @Override
@@ -77,8 +78,9 @@ public class ShardedHistogram extends OverrideHistogram
         return LogLinearHistogram.bucketsWithLength(length);
     }
 
-    public ShardedHistogram(long initialMaxValue)
+    public ShardedHistogram(boolean isCumulative, long initialMaxValue)
     {
+        this.isCumulative = isCumulative;
         this.initialMaxValue = initialMaxValue;
     }
 
@@ -130,4 +132,10 @@ public class ShardedHistogram extends OverrideHistogram
     {
         return maybeRefresh();
     }
+
+    @Override
+    public boolean isCumulative()
+    {
+        return isCumulative;
+    }
 }
diff --git a/src/java/org/apache/cassandra/metrics/ThreadLocalTimer.java 
b/src/java/org/apache/cassandra/metrics/ThreadLocalTimer.java
index 771c68b04f..c653a1dc60 100644
--- a/src/java/org/apache/cassandra/metrics/ThreadLocalTimer.java
+++ b/src/java/org/apache/cassandra/metrics/ThreadLocalTimer.java
@@ -18,14 +18,8 @@
 
 package org.apache.cassandra.metrics;
 
-import java.time.Duration;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
 import com.codahale.metrics.ExponentiallyDecayingReservoir;
 import com.codahale.metrics.Reservoir;
-import com.codahale.metrics.Snapshot;
 
 /**
  * An alternative to Dropwizard Timer which implements the same kind of API.
@@ -34,13 +28,8 @@ import com.codahale.metrics.Snapshot;
  * NOTE: Dropwizard Timer is a concrete class and there is no an interface for 
Dropwizard Timer logic,
  *   so we have to create an alternative hierarchy.
  */
-public class ThreadLocalTimer extends com.codahale.metrics.Timer implements 
Timer
+public class ThreadLocalTimer extends OverrideTimer
 {
-    private final Meter meter;
-    private final ThreadLocalHistogram histogram;
-    // usually we need precise clocks for timing, so we do not replace it with 
an approximate version
-    private final MetricClock clock;
-
     /**
      * Creates a new {@link Timer} using an {@link 
ExponentiallyDecayingReservoir} and the default
      * {@link MetricClock}.
@@ -76,149 +65,6 @@ public class ThreadLocalTimer extends 
com.codahale.metrics.Timer implements Time
     public ThreadLocalTimer(Meter meter, ThreadLocalHistogram histogram, 
MetricClock clock)
     {
         // original Codahale meter and histogram are set to null to reduce 
memory footprint
-        super(null, null, clock);
-        this.meter = meter;
-        this.histogram = histogram;
-        this.clock = clock;
-    }
-
-    /**
-     * Adds a recorded duration.
-     *
-     * @param duration the length of the duration
-     * @param unit     the scale unit of {@code duration}
-     */
-    @Override
-    public void update(long duration, TimeUnit unit)
-    {
-        update(unit.toNanos(duration));
-    }
-
-    /**
-     * Adds a recorded duration.
-     *
-     * @param duration the {@link Duration} to add to the timer. Negative or 
zero value are ignored.
-     */
-    @Override
-    public void update(Duration duration)
-    {
-        update(duration.toNanos());
-    }
-
-    /**
-     * Times and records the duration of event.
-     *
-     * @param event a {@link Callable} whose {@link Callable#call()} method 
implements a process
-     *              whose duration should be timed
-     * @param <T>   the type of the value returned by {@code event}
-     * @return the value returned by {@code event}
-     * @throws Exception if {@code event} throws an {@link Exception}
-     */
-    @Override
-    public <T> T time(Callable<T> event) throws Exception
-    {
-        final long startTime = clock.getTick();
-        try
-        {
-            return event.call();
-        }
-        finally
-        {
-            update(clock.getTick() - startTime);
-        }
-    }
-
-    /**
-     * Times and records the duration of event. Should not throw exceptions, 
for that use the
-     * {@link #time(Callable)} method.
-     *
-     * @param event a {@link Supplier} whose {@link Supplier#get()} method 
implements a process
-     *              whose duration should be timed
-     * @param <T>   the type of the value returned by {@code event}
-     * @return the value returned by {@code event}
-     */
-    @Override
-    public <T> T timeSupplier(Supplier<T> event)
-    {
-        final long startTime = clock.getTick();
-        try
-        {
-            return event.get();
-        }
-        finally
-        {
-            update(clock.getTick() - startTime);
-        }
-    }
-
-    /**
-     * Times and records the duration of event.
-     *
-     * @param event a {@link Runnable} whose {@link Runnable#run()} method 
implements a process
-     *              whose duration should be timed
-     */
-    @Override
-    public void time(Runnable event)
-    {
-        final long startTime = clock.getTick();
-        try
-        {
-            event.run();
-        }
-        finally
-        {
-            update(clock.getTick() - startTime);
-        }
-    }
-
-    @Override
-    public Timer.Context startTime()
-    {
-        return new Timer.Context(this, clock);
-    }
-
-    @Override
-    public long getCount()
-    {
-        return histogram.getCount();
-    }
-
-    @Override
-    public double getFifteenMinuteRate()
-    {
-        return meter.getFifteenMinuteRate();
-    }
-
-    @Override
-    public double getFiveMinuteRate()
-    {
-        return meter.getFiveMinuteRate();
-    }
-
-    @Override
-    public double getMeanRate()
-    {
-        return meter.getMeanRate();
-    }
-
-    @Override
-    public double getOneMinuteRate()
-    {
-        return meter.getOneMinuteRate();
-    }
-
-    @Override
-    public Snapshot getSnapshot()
-    {
-        return histogram.getSnapshot();
-    }
-
-    private void update(long duration)
-    {
-        if (duration >= 0)
-        {
-            histogram.update(duration);
-            meter.mark();
-        }
+        super(meter, histogram, clock);
     }
 }
diff --git a/src/java/org/apache/cassandra/metrics/Timer.java 
b/src/java/org/apache/cassandra/metrics/Timer.java
index ed764c03c9..84369be485 100644
--- a/src/java/org/apache/cassandra/metrics/Timer.java
+++ b/src/java/org/apache/cassandra/metrics/Timer.java
@@ -25,6 +25,7 @@ import java.util.function.Supplier;
 
 import com.codahale.metrics.Metered;
 import com.codahale.metrics.Sampling;
+import org.apache.cassandra.metrics.CassandraReservoir.BucketStrategy;
 
 /**
  * An interface which mimics {@link com.codahale.metrics.Timer} API and allows 
alternative implementations
@@ -70,6 +71,10 @@ public interface Timer extends Metered, Sampling
         }
     }
 
+    BucketStrategy bucketStrategy();
+    long[] bucketStarts(int length);
+    boolean isCumulative();
+
     void update(long duration, TimeUnit unit);
 
     void update(Duration duration);
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index d88314c320..2e34080f99 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -320,7 +320,7 @@ public class AccordCommandStore extends CommandStore
         RedundantBefore.QuickBounds bounds = 
unsafeGetRedundantBefore().get(key);
         if (bounds == null)
             return cfk; // TODO (required): I don't think this should be 
possible? but we hit it on some test
-        return cfk.withRedundantBeforeAtLeast(bounds.gcBefore, false);
+        return cfk.withGcBeforeAtLeast(bounds.gcBefore, false);
     }
 
     boolean validateCommandsForKey(RoutableKey key, CommandsForKey evicting)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java 
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index 0de343feb7..783299dc4f 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -303,7 +303,7 @@ public class AccordKeyspace
                 return null;
 
             // TODO (desired): consider whether better to not compact any 
validation failures, since we expect is already overwritten
-            CommandsForKey updated = 
current.withRedundantBeforeAtLeast(redundantBefore.gcBefore(), false);
+            CommandsForKey updated = 
current.withGcBeforeAtLeast(redundantBefore.gcBefore(), false);
             if (current == updated)
                 return row;
 
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index e7e5762fb1..e9874fae0a 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -46,6 +46,7 @@ import accord.primitives.Txn;
 import org.apache.cassandra.config.AccordSpec;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.SystemKeyspace.BootstrapState;
+import org.apache.cassandra.metrics.AccordExecutorMetrics;
 import org.apache.cassandra.metrics.AccordReplicaMetrics;
 import org.apache.cassandra.metrics.AccordSystemMetrics;
 import org.apache.cassandra.service.accord.api.AccordViolationHandler;
@@ -161,51 +162,79 @@ public class AccordService implements IAccordService, 
Shutdownable
         // Listener is initialized before Accord is initialized
         public static MetadataChangeListener instance = new 
MetadataChangeListener();
 
-        private MetadataChangeListener() {}
-
-        private final AtomicReference<ChangeListener> collector = new 
AtomicReference<>(new PreInitStateCollector());
-
-        @Override
-        public void notifyPreCommit(ClusterMetadata prev, ClusterMetadata 
next, boolean fromSnapshot)
-        {
-            collector.get().notifyPreCommit(prev, next, fromSnapshot);
-        }
-
-        @Override
-        public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata 
next, boolean fromSnapshot)
+        interface Sink
         {
-            collector.get().notifyPostCommit(prev, next, fromSnapshot);
-        }
-
-        @VisibleForTesting
-        public void resetForTesting(ClusterMetadata metadata)
-        {
-            PreInitStateCollector stateCollector = new PreInitStateCollector();
-            stateCollector.items.add(metadata);
-            collector.set(stateCollector);
+            Sink update(ClusterMetadata next);
         }
 
         /**
          * Collects TCM events from startup util full Accord initialization to 
avoid races with TCM and creating gaps between
          * epochs restored from journal and reported by TCM.
          **/
-
-        static class PreInitStateCollector implements ChangeListener
+        static final class Collector implements Sink
         {
-            private final List<ClusterMetadata> items = new ArrayList<>(4);
+            static final Collector EMPTY = new Collector(new 
ClusterMetadata[0]);
+            final ClusterMetadata[] saved;
+
+            Collector(ClusterMetadata[] saved)
+            {
+                this.saved = saved;
+            }
 
             @Override
-            public synchronized void notifyPostCommit(ClusterMetadata prev, 
ClusterMetadata next, boolean fromSnapshot)
+            public Sink update(ClusterMetadata next)
             {
-                logger.debug("Saving epoch {} to deliver after startup", 
next.epoch);
-                items.add(next);
+                ClusterMetadata[] newSaved = Arrays.copyOf(saved, saved.length 
+ 1);
+                newSaved[saved.length] = next;
+                return new Collector(newSaved);
             }
 
-            public synchronized List<ClusterMetadata> getItems()
+            public List<ClusterMetadata> getItems()
             {
-                return new ArrayList<>(items);
+                return Arrays.asList(saved);
             }
         }
+
+        private final AtomicReference<Sink> sink = new 
AtomicReference<>(Collector.EMPTY);
+
+        private MetadataChangeListener() {}
+
+        @Override
+        public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata 
next, boolean fromSnapshot)
+        {
+            while (true)
+            {
+                Sink curSink = sink.get();
+                Sink nextSink = curSink.update(next);
+                if (nextSink == curSink || sink.compareAndSet(curSink, 
nextSink))
+                    return;
+            }
+        }
+
+        Collector replaceSinkIfEmpty(Sink newSink)
+        {
+            while (true)
+            {
+                Sink curSink = sink.get();
+                Invariants.require(curSink instanceof Collector);
+                if (curSink == Collector.EMPTY)
+                {
+                    if (sink.compareAndSet(curSink, newSink))
+                        return null;
+                }
+                else
+                {
+                    if (sink.compareAndSet(curSink, Collector.EMPTY))
+                        return (Collector) curSink;
+                }
+            }
+        }
+
+        @VisibleForTesting
+        public void unsafeResetForTesting(ClusterMetadata metadata)
+        {
+            sink.set(Collector.EMPTY.update(metadata));
+        }
     }
 
     private static final Logger logger = 
LoggerFactory.getLogger(AccordService.class);
@@ -325,6 +354,7 @@ public class AccordService implements IAccordService, 
Shutdownable
 
         AccordReplicaMetrics.touch();
         AccordSystemMetrics.touch();
+        AccordExecutorMetrics.touch();
         AccordViolationHandler.setup();
         return as;
     }
@@ -422,6 +452,7 @@ public class AccordService implements IAccordService, 
Shutdownable
             journal.start(node);
             node.load();
 
+
             ClusterMetadata metadata = ClusterMetadata.current();
             endpointMapper.updateMapping(metadata);
 
@@ -575,24 +606,28 @@ public class AccordService implements IAccordService, 
Shutdownable
             }
 
             // Subscribe to TCM events, and collect any we may have missed to 
report now
-            ChangeListener prevListener = 
MetadataChangeListener.instance.collector.getAndSet(new ChangeListener()
+            MetadataChangeListener.Sink sink = new 
MetadataChangeListener.Sink()
             {
                 @Override
-                public void notifyPostCommit(ClusterMetadata prev, 
ClusterMetadata next, boolean fromSnapshot)
+                public MetadataChangeListener.Sink update(ClusterMetadata next)
                 {
                     if (state != State.SHUTDOWN)
                         maybeReportMetadata(next);
+                    return this;
                 }
-            });
-
-            Invariants.require((prevListener instanceof 
MetadataChangeListener.PreInitStateCollector),
-                               "Listener should have been initialized with 
Accord pre-init state collector, but was " + prevListener.getClass());
+            };
 
-            MetadataChangeListener.PreInitStateCollector preinit = 
(MetadataChangeListener.PreInitStateCollector) prevListener;
-            for (ClusterMetadata item : preinit.getItems())
+            while (true)
             {
-                if (item.epoch.getEpoch() > highestKnown)
-                    maybeReportMetadata(item);
+                MetadataChangeListener.Collector collector = 
MetadataChangeListener.instance.replaceSinkIfEmpty(sink);
+                if (collector == null)
+                    break;
+
+                for (ClusterMetadata item : collector.getItems())
+                {
+                    if (item.epoch.getEpoch() > highestKnown)
+                        maybeReportMetadata(item);
+                }
             }
         }
         catch (InterruptedException e)
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java 
b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
index 636535e576..67825bad6b 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
@@ -67,6 +67,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.exceptions.RequestTimeoutException;
 import org.apache.cassandra.metrics.AccordReplicaMetrics;
+import org.apache.cassandra.metrics.AccordSystemMetrics;
 import org.apache.cassandra.net.ResponseContext;
 import org.apache.cassandra.service.RetryStrategy;
 import org.apache.cassandra.service.accord.AccordService;
@@ -197,7 +198,11 @@ public class AccordAgent implements Agent, 
OwnershipEventListener
 
     public static void handleException(Throwable t)
     {
-        if (t instanceof RequestTimeoutException || t instanceof 
CancellationException || t instanceof TimeoutException || t instanceof Timeout)
+        if (t instanceof RequestTimeoutException)
+            return;
+
+        AccordSystemMetrics.metrics.errors.inc();
+        if (t instanceof CancellationException || t instanceof 
TimeoutException || t instanceof Timeout)
             return;
         JVMStabilityInspector.uncaughtException(Thread.currentThread(), t);
     }
diff --git 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
index 2c5f405205..c98ba63a4a 100644
--- 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
+++ 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
@@ -41,7 +41,6 @@ import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.primitives.Writes;
 import accord.topology.Topologies;
-import accord.topology.Topologies.SelectNodeOwnership;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.service.accord.AccordEndpointMapper;
 import org.apache.cassandra.service.accord.txn.AccordUpdate;
@@ -90,12 +89,12 @@ public class AccordInteropAdapter extends TxnAdapter
     }
 
     @Override
-    public void persist(Node node, SequentialAsyncExecutor executor, 
Topologies any, Route<?> require, Route<?> sendTo, SelectNodeOwnership 
selectSendTo, FullRoute<?> route, Ballot ballot, CoordinationFlags flags, TxnId 
txnId, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result, 
boolean informDurableOnDone, BiConsumer<? super Result, Throwable> callback)
+    public void persist(Node node, SequentialAsyncExecutor executor, 
Topologies any, Route<?> require, Route<?> sendTo, FullRoute<?> route, Ballot 
ballot, CoordinationFlags flags, TxnId txnId, Txn txn, Timestamp executeAt, 
Deps deps, Writes writes, Result result, boolean informDurableOnDone, 
BiConsumer<? super Result, Throwable> callback)
     {
-        if (applyKind == Minimal && doInteropPersist(node, executor, any, 
require, sendTo, selectSendTo, ballot, txnId, txn, executeAt, deps, writes, 
result, route, informDurableOnDone, callback))
+        if (applyKind == Minimal && doInteropPersist(node, executor, any, 
require, sendTo, ballot, txnId, txn, executeAt, deps, writes, result, route, 
informDurableOnDone, callback))
             return;
 
-        super.persist(node, executor, any, require, sendTo, selectSendTo, 
route, ballot, flags, txnId, txn, executeAt, deps, writes, result, 
informDurableOnDone, callback);
+        super.persist(node, executor, any, require, sendTo, route, ballot, 
flags, txnId, txn, executeAt, deps, writes, result, informDurableOnDone, 
callback);
     }
 
     private boolean doInteropExecute(Node node, SequentialAsyncExecutor 
executor, FullRoute<?> route, Ballot ballot, TxnId txnId, Txn txn, Timestamp 
executeAt, Deps deps, BiConsumer<? super Result, Throwable> callback)
@@ -121,7 +120,7 @@ public class AccordInteropAdapter extends TxnAdapter
         return true;
     }
 
-    private boolean doInteropPersist(Node node, SequentialAsyncExecutor 
executor, Topologies any, Route<?> require, Route<?> sendTo, 
SelectNodeOwnership selectSendTo, Ballot ballot, TxnId txnId, Txn txn, 
Timestamp executeAt, Deps deps, Writes writes, Result result, FullRoute<?> 
fullRoute, boolean informDurableOnDone, BiConsumer<? super Result, Throwable> 
callback)
+    private boolean doInteropPersist(Node node, SequentialAsyncExecutor 
executor, Topologies any, Route<?> require, Route<?> sendTo, Ballot ballot, 
TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result 
result, FullRoute<?> fullRoute, boolean informDurableOnDone, BiConsumer<? super 
Result, Throwable> callback)
     {
         Update update = txn.update();
         ConsistencyLevel consistencyLevel = update instanceof AccordUpdate ? 
((AccordUpdate) update).cassandraCommitCL() : null;
@@ -132,7 +131,7 @@ public class AccordInteropAdapter extends TxnAdapter
         AccordInteropPersist persist;
         try
         {
-            Topologies all = execution(node, any, sendTo, selectSendTo, 
fullRoute, txnId, executeAt);
+            Topologies all = execution(node, any, sendTo, fullRoute, txnId, 
executeAt);
             persist = new AccordInteropPersist(node, executor, all, txnId, 
require, ballot, txn, executeAt, deps, writes, result, fullRoute, 
consistencyLevel, CoordinationFlags.none(), informDurableOnDone, Minimal, 
callback);
         }
         catch (Throwable t)
diff --git 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
index cbc6182674..6872ed79e1 100644
--- 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
+++ 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
@@ -95,7 +95,7 @@ import org.apache.cassandra.transport.Dispatcher;
 
 import static accord.coordinate.CoordinationAdapter.Factory.Kind.Standard;
 import static accord.primitives.Txn.Kind.Write;
-import static accord.topology.Topologies.SelectNodeOwnership.SHARE;
+import static accord.topology.SelectShards.LIVE;
 import static accord.utils.Invariants.illegalState;
 import static accord.utils.Invariants.requireArgument;
 import static 
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.accordReadMetrics;
@@ -159,9 +159,9 @@ public class AccordInteropExecution implements 
ReadCoordinator
 
         // TODO (required): compare this to latest logic in Accord, make sure 
it makes sense
         ActiveEpochs epochs = node.topology().active();
-        this.executes = epochs.forEpoch(route, executeAt.epoch(), SHARE);
+        this.executes = epochs.forEpoch(route, executeAt.epoch(), LIVE);
         this.allTopologies = txnId.epoch() != executeAt.epoch()
-                             ? epochs.preciseEpochs(route, txnId.epoch(), 
executeAt.epoch(), SHARE)
+                             ? epochs.preciseEpochs(route, txnId.epoch(), 
executeAt.epoch(), LIVE)
                              : executes;
         this.executeTopology = executes.getEpoch(executeAt.epoch());
         this.coordinateTopology = allTopologies.getEpoch(txnId.epoch());
diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java 
b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
index 0e8f681a48..232db188db 100644
--- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java
+++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
@@ -926,7 +926,7 @@ public abstract class LocalLog implements Closeable
         addListener(new MetadataSnapshotListener());
         addListener(new ClientNotificationListener());
         addListener(new UpgradeMigrationListener());
-        if (DatabaseDescriptor.getAccord().enabled)
+        if (DatabaseDescriptor.getAccordTransactionsEnabled())
             addListener(AccordService.MetadataChangeListener.instance);
     }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java 
b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index a362938aba..d6479e1813 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -717,6 +717,9 @@ public class ClusterUtils
 
     public static Epoch maxEpoch(ICluster<IInvokableInstance> cluster, int... 
nodes)
     {
+        if (nodes == null || nodes.length == 0)
+            return maxEpoch(cluster);
+
         Epoch max = null;
         for (int id : nodes)
         {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
index 2577d02a7a..bdee5765ac 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
@@ -47,6 +47,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.primitives.Unseekables;
+import accord.topology.SelectShards;
 import accord.topology.Topologies;
 import accord.topology.TopologyException;
 import org.apache.cassandra.config.Config.PaxosVariant;
@@ -711,7 +712,7 @@ public abstract class AccordCQLTestBase extends 
AccordTestBase
                 Unseekables<?> routables = 
AccordTestUtils.createTxn(sb.toString()).keys().toParticipants();
                 long epoch = AccordService.instance().topology().epoch();
                 Topologies topology;
-                try { topology = 
AccordService.instance().topology().active().withUnsyncedEpochs(routables, 
epoch, epoch); }
+                try { topology = 
AccordService.instance().topology().active().withUnsyncedEpochs(routables, 
epoch, epoch, SelectShards.ALL); }
                 catch (TopologyException e) { throw new RuntimeException(e); }
                 // we don't detect out-of-bounds read/write yet, so use this 
to validate we reach different shards
                 Assertions.assertThat(topology.totalShards()).isEqualTo(2);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordSimpleFastPathTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordSimpleFastPathTest.java
index 8443526e9f..01632f1cd3 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordSimpleFastPathTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordSimpleFastPathTest.java
@@ -110,7 +110,7 @@ public class AccordSimpleFastPathTest extends TestBaseImpl
 
                 long epoch = cm.epoch.getEpoch();
                 TopologyManager tm = AccordService.instance().topology();
-                Topology topology = tm.active().getKnown(epoch).global();
+                Topology topology = tm.active().getKnown(epoch).all();
                 Assert.assertFalse(topology.shards().isEmpty());
                 topology.shards().forEach(shard -> 
Assert.assertEquals(idSet(1, 2, 3), shard.nodes.without(shard.notInFastPath)));
                 return cm.epoch.getEpoch();
diff --git 
a/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java 
b/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java
index e4e6de559f..ce927926d9 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java
@@ -124,7 +124,7 @@ public class AccordVirtualTablesTest extends CQLTester
 
         // range is no longer "added" so doesn't show up as synced!
         long e2 = 2;
-        ActiveEpoch a2 = active(topology(e2, T1), EpochReady.done(e2), 
a1.global());
+        ActiveEpoch a2 = active(topology(e2, T1), EpochReady.done(e2), 
a1.all());
         tm.unsafeSetActive(ActiveEpochs.unsafeNew(tm, new ActiveEpoch[] { a2, 
a1 }, -1));
         assertRows(execute("SELECT * FROM " + VIRTUAL_VIEWS + "." + 
AccordVirtualTables.TABLE_EPOCHS),
                    row(e1, T1_META.keyspace, T1_META.name, FULL_RANGE, 
List.of(), List.of(), List.of(), FULL_RANGE));
diff --git a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java 
b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
index aa37658fc6..865d1b799e 100644
--- a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
@@ -518,7 +518,7 @@ public class RouteIndexTest extends CQLTester
         AccordService startAccord()
         {
             ClusterMetadata metadata = ClusterMetadata.current();
-            
AccordService.MetadataChangeListener.instance.resetForTesting(metadata);
+            
AccordService.MetadataChangeListener.instance.unsafeResetForTesting(metadata);
             NodeId tcmNodeId = metadata.myNodeId();
             AccordService.unsafeSetNewAccordService(null);
             AccordService.localStartup(tcmNodeId);
diff --git 
a/test/unit/org/apache/cassandra/metrics/JmxVirtualTableMetricsTest.java 
b/test/unit/org/apache/cassandra/metrics/JmxVirtualTableMetricsTest.java
index 9f79d656c1..45348f2775 100644
--- a/test/unit/org/apache/cassandra/metrics/JmxVirtualTableMetricsTest.java
+++ b/test/unit/org/apache/cassandra/metrics/JmxVirtualTableMetricsTest.java
@@ -68,7 +68,7 @@ public class JmxVirtualTableMetricsTest extends CQLTester
         metricToNameMap.put(MetricType.METER, registry.meter("meter"));
         metricToNameMap.put(MetricType.COUNTER, registry.counter("counter"));
         metricToNameMap.put(MetricType.HISTOGRAM, 
registry.histogram("histogram", () -> new ClearableHistogram(new 
DecayingEstimatedHistogramReservoir(true))));
-        metricToNameMap.put(MetricType.TIMER, registry.timer("timer"));
+        metricToNameMap.put(MetricType.TIMER, registry.timer("timer", () -> 
new SnapshottingTimer(new DecayingEstimatedHistogramReservoir())));
         metricToNameMap.put(MetricType.GAUGE, registry.gauge("gauge", () -> 
gaugeValue::get));
 
         CassandraMetricsRegistry.metricGroups.forEach(group -> {
diff --git a/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java 
b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
index b94237befc..aee4fabb44 100644
--- a/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
@@ -392,7 +392,7 @@ public class EpochSyncTest
                                   .isFalse();
 
                         // validate topology manager
-                        Ranges ranges = 
tm.active().getKnown(epoch).global().ranges().mergeTouching();
+                        Ranges ranges = 
tm.active().getKnown(epoch).all().ranges().mergeTouching();
                         Ranges actual = 
tm.unsafeQuorumReady(epoch).mergeTouching();
                         Assertions.assertThat(actual)
                                   .describedAs("node%s does not have all 
expected sync ranges for epoch %d; missing %s", id, epoch, 
ranges.without(actual))
@@ -404,7 +404,7 @@ public class EpochSyncTest
                             continue;
 
                         
Assertions.assertThat(tm.active().hasEpoch(epoch)).describedAs("node%s does not 
have epoch %d", id, epoch).isTrue();
-                        Topology topology = 
tm.active().getKnown(epoch).global();
+                        Topology topology = tm.active().getKnown(epoch).all();
                         Ranges ranges = topology.ranges().mergeTouching();
                         Ranges actual = 
tm.unsafeQuorumReady(epoch).mergeTouching();
                         // TopologyManager defines syncComplete for an epoch 
as (epoch - 1).syncComplete.  This means that an epoch has reached quorum, but 
will still miss ranges as previous epochs have not


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to