This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new e4684a11ec Fix: - DefaultLocalListeners.ComplexListeners iterator
IndexOutOfBoundsException - Race condition initialising empty ActiveEpochs,
when minimum pending epoch can move backwards - SyncPoints must be declared in
an epoch containing the ranges, and PENDING_REMOVAL ranges will reject
non-syncpoint transactions - AccordExecutorMetrics is now registered on
startup - getRecentValues for non-cumulative histogram should not subtract
prior values Improve: - Report ephemera [...]
e4684a11ec is described below
commit e4684a11ec4fd52baaf0da141e24db6a92f63725
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Tue Dec 9 11:46:38 2025 +0000
Fix:
- DefaultLocalListeners.ComplexListeners iterator IndexOutOfBoundsException
- Race condition initialising empty ActiveEpochs, when minimum pending
epoch can move backwards
- SyncPoints must be declared in an epoch containing the ranges, and
PENDING_REMOVAL ranges will reject non-syncpoint transactions
- AccordExecutorMetrics is now registered on startup
- getRecentValues for non-cumulative histogram should not subtract prior
values
Improve:
- Report ephemeral read, epoch waits and timeout metrics
- Remove Topologies.SelectNodeOwnership, as there is no longer any need to SLICE
- Introduce SystemEventListener for epoch waiting and timeout metrics
- No-op (but log) if the gcBefore provided to CFK is in the past
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-21076
---
modules/accord | 2 +-
.../cassandra/config/DatabaseDescriptor.java | 2 +-
.../cql3/statements/TransactionStatement.java | 2 +-
.../cassandra/db/virtual/AccordDebugKeyspace.java | 2 +-
.../cassandra/metrics/AccordCacheMetrics.java | 2 +-
.../metrics/AccordCoordinatorMetrics.java | 10 +-
.../cassandra/metrics/AccordExecutorMetrics.java | 4 +
.../cassandra/metrics/AccordSystemMetrics.java | 46 ++++--
.../metrics/CassandraMetricsRegistry.java | 60 ++++++--
.../apache/cassandra/metrics/LatencyMetrics.java | 7 +-
.../cassandra/metrics/OnDemandHistogram.java | 10 +-
.../cassandra/metrics/OverrideHistogram.java | 7 +-
.../apache/cassandra/metrics/OverrideMeter.java | 29 ++++
.../{ThreadLocalTimer.java => OverrideTimer.java} | 33 +++--
.../metrics/ShardedDecayingHistograms.java | 6 +
.../apache/cassandra/metrics/ShardedHistogram.java | 14 +-
.../apache/cassandra/metrics/ThreadLocalTimer.java | 158 +--------------------
src/java/org/apache/cassandra/metrics/Timer.java | 5 +
.../service/accord/AccordCommandStore.java | 2 +-
.../cassandra/service/accord/AccordKeyspace.java | 2 +-
.../cassandra/service/accord/AccordService.java | 113 ++++++++++-----
.../cassandra/service/accord/api/AccordAgent.java | 7 +-
.../accord/interop/AccordInteropAdapter.java | 11 +-
.../accord/interop/AccordInteropExecution.java | 6 +-
.../org/apache/cassandra/tcm/log/LocalLog.java | 2 +-
.../cassandra/distributed/shared/ClusterUtils.java | 3 +
.../distributed/test/accord/AccordCQLTestBase.java | 3 +-
.../test/accord/AccordSimpleFastPathTest.java | 2 +-
.../db/virtual/AccordVirtualTablesTest.java | 2 +-
.../cassandra/index/accord/RouteIndexTest.java | 2 +-
.../metrics/JmxVirtualTableMetricsTest.java | 2 +-
.../cassandra/service/accord/EpochSyncTest.java | 4 +-
32 files changed, 298 insertions(+), 262 deletions(-)
diff --git a/modules/accord b/modules/accord
index 8ccce74581..6bae51f6a4 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 8ccce745818cf80c7cff82c3554e4a88e9e540db
+Subproject commit 6bae51f6a4bd560a82840fa0809cd1d630696cc0
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 0c4eb44496..09100da8d3 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -5505,7 +5505,7 @@ public class DatabaseDescriptor
public static boolean getAccordTransactionsEnabled()
{
- return conf == null ? false : conf.accord.enabled;
+ return conf != null && conf.accord.enabled;
}
public static void setAccordTransactionsEnabled(boolean b)
diff --git
a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
index 2a96bcc16d..caff467cd7 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
@@ -132,7 +132,7 @@ public class TransactionStatement implements
CQLStatement.CompositeCQLStatement,
public static final String WRITE_TXN_EMPTY_WITH_IGNORED_READS = "Write txn
produced no mutation, and its reads do not return to the caller; ignoring...";
public static final String WRITE_TXN_EMPTY_WITH_NO_READS = "Write txn
produced no mutation, and had no reads; ignoring...";
- private static NoSpamLogger noSpamLogger =
NoSpamLogger.getLogger(LoggerFactory.getLogger(TransactionStatement.class), 1,
TimeUnit.MINUTES);
+ private static final NoSpamLogger noSpamLogger =
NoSpamLogger.getLogger(LoggerFactory.getLogger(TransactionStatement.class), 1,
TimeUnit.MINUTES);
static class NamedSelect
{
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
index de989d7f6e..c3f08de148 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
@@ -2186,7 +2186,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
Map<TokenKey, List<ShardAndEpochs>> startLookup = null;
for (ActiveEpoch epoch : snapshot)
{
- Topology topology = epoch.global();
+ Topology topology = epoch.all();
for (Shard shard : topology.shards())
{
Range range = shard.range;
diff --git a/src/java/org/apache/cassandra/metrics/AccordCacheMetrics.java
b/src/java/org/apache/cassandra/metrics/AccordCacheMetrics.java
index 21f1297584..3df852af9d 100644
--- a/src/java/org/apache/cassandra/metrics/AccordCacheMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordCacheMetrics.java
@@ -90,7 +90,7 @@ public class AccordCacheMetrics
public AccordCacheMetrics(String subTypeName)
{
DefaultNameFactory factory = new DefaultNameFactory(ACCORD_CACHE,
subTypeName);
- this.objectSize =
Metrics.shardedHistogram(factory.createMetricName("EntrySize"));
+ this.objectSize =
Metrics.shardedHistogram(factory.createMetricName("EntrySize"), false);
this.hits = Metrics.gauge(factory.createMetricName("Hits"),
hitRate::totalHits);
this.misses = Metrics.gauge(factory.createMetricName("Misses"),
hitRate::totalMisses);
this.requests = Metrics.gauge(factory.createMetricName("Requests"),
hitRate::totalRequests);
diff --git
a/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java
b/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java
index 1500517bcb..12c736614f 100644
--- a/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java
@@ -52,6 +52,7 @@ public class AccordCoordinatorMetrics
public static final String COORDINATOR_PREACCEPT_LATENCY =
"PreAcceptLatency";
public static final String COORDINATOR_EXECUTE_LATENCY = "ExecuteLatency";
public static final String COORDINATOR_APPLY_LATENCY = "ApplyLatency";
+ public static final String EPHEMERAL = "Ephemeral";
public static final String FAST_PATHS = "FastPaths";
public static final String MEDIUM_PATHS = "MediumPaths";
public static final String SLOW_PATHS = "SlowPaths";
@@ -98,6 +99,11 @@ public class AccordCoordinatorMetrics
*/
public final Histogram tables;
+ /**
+ * The number of ephemeral transactions executed on this coordinator.
+ */
+ public final Meter ephemeral;
+
/**
* The number of fast path transactions executed on this coordinator.
*/
@@ -159,6 +165,7 @@ public class AccordCoordinatorMetrics
keys =
Metrics.histogram(coordinator.createMetricName(COORDINATOR_KEYS), true);
tables =
Metrics.histogram(coordinator.createMetricName(COORDINATOR_TABLES), true);
+ ephemeral = Metrics.meter(coordinator.createMetricName(EPHEMERAL));
fastPaths = Metrics.meter(coordinator.createMetricName(FAST_PATHS));
mediumPaths =
Metrics.meter(coordinator.createMetricName(MEDIUM_PATHS));
slowPaths = Metrics.meter(coordinator.createMetricName(SLOW_PATHS));
@@ -168,7 +175,7 @@ public class AccordCoordinatorMetrics
invalidations =
Metrics.meter(coordinator.createMetricName(INVALIDATIONS));
recoveryDelay =
Metrics.timer(coordinator.createMetricName(RECOVERY_DELAY));
recoveryDuration =
Metrics.timer(coordinator.createMetricName(RECOVERY_TIME));
- fastPathToTotal = new RatioGaugeSet(fastPaths,
RatioGaugeSet.sum(fastPaths, mediumPaths, slowPaths), coordinator,
FAST_PATH_TO_TOTAL + ".%s");
+ fastPathToTotal = new RatioGaugeSet(fastPaths,
RatioGaugeSet.sum(ephemeral, fastPaths, mediumPaths, slowPaths), coordinator,
FAST_PATH_TO_TOTAL + ".%s");
}
@Override
@@ -238,6 +245,7 @@ public class AccordCoordinatorMetrics
{
switch (path)
{
+ case EPHEMERAL: metrics.ephemeral.mark(); break;
case FAST: metrics.fastPaths.mark(); break;
case MEDIUM: metrics.mediumPaths.mark(); break;
case SLOW: metrics.slowPaths.mark(); break;
diff --git a/src/java/org/apache/cassandra/metrics/AccordExecutorMetrics.java
b/src/java/org/apache/cassandra/metrics/AccordExecutorMetrics.java
index c916c3239a..94d8e43ab5 100644
--- a/src/java/org/apache/cassandra/metrics/AccordExecutorMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordExecutorMetrics.java
@@ -58,4 +58,8 @@ public class AccordExecutorMetrics
waitingToRun =
Metrics.register(factory.createMetricName("WaitingToRun"),
gauges.newGauge(AccordExecutor::unsafeWaitingToRunCount, Long::sum));
running = Metrics.register(factory.createMetricName("Running"),
gauges.newGauge(AccordExecutor::unsafeRunningCount, Long::sum));
}
+
+ public static void touch()
+ {
+ }
}
diff --git a/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java
b/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java
index 313ed92bb2..1ef552588f 100644
--- a/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java
@@ -21,12 +21,14 @@ package org.apache.cassandra.metrics;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
+import accord.api.SystemEventListener;
import accord.impl.progresslog.DefaultProgressLog;
import accord.local.MaxDecidedRX;
import accord.local.RedundantBefore;
import accord.primitives.TxnId;
import accord.topology.TopologyManager;
import accord.utils.Invariants;
+import com.codahale.metrics.Counter;
import com.codahale.metrics.Counting;
import com.codahale.metrics.Gauge;
import org.apache.cassandra.metrics.LogLinearHistogram.LogLinearSnapshot;
@@ -44,7 +46,7 @@ import static
org.apache.cassandra.metrics.AccordMetricUtils.fromDurabilityServi
import static
org.apache.cassandra.metrics.AccordMetricUtils.fromTopologyManager;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
-public class AccordSystemMetrics
+public class AccordSystemMetrics implements SystemEventListener
{
public final static AccordSystemMetrics metrics = new
AccordSystemMetrics();
private static final long REFRESH_RATE = TimeUnit.SECONDS.toNanos(30);
@@ -52,6 +54,10 @@ public class AccordSystemMetrics
public static final String ACCORD_SYSTEM = "AccordSystem";
public static final String MIN_EPOCH = "MinEpoch";
public static final String MAX_EPOCH = "MaxEpoch";
+ public static final String MAX_PENDING_EPOCH = "MaxPendingEpoch";
+ public static final String ERRORS = "Errors";
+ public static final String EPOCH_WAITS = "EpochWaits";
+ public static final String EPOCH_TIMEOUTS = "EpochTimeouts";
public static final String PROGRESS_LOG_ACTIVE = "ProgressLogActive";
public static final String PROGRESS_LOG_SIZE = "ProgressLogSize";
public static final String PROGRESS_LOG_AGE = "ProgressLogAge";
@@ -66,6 +72,10 @@ public class AccordSystemMetrics
public final Gauge<Long> minEpoch;
public final Gauge<Long> maxEpoch;
+ public final Gauge<Long> maxPendingEpoch;
+ public final Counter errors;
+ public final Counter epochWaits;
+ public final Counter epochTimeouts;
public final Gauge<Long> progressLogActive;
public final Gauge<Long> durabilityQueueActive;
public final Gauge<Long> durabilityQueuePending;
@@ -142,17 +152,33 @@ public class AccordSystemMetrics
DefaultNameFactory factory = new DefaultNameFactory(ACCORD_SYSTEM);
minEpoch = Metrics.gauge(factory.createMetricName(MIN_EPOCH),
fromTopologyManager(TopologyManager::minEpoch));
maxEpoch = Metrics.gauge(factory.createMetricName(MAX_EPOCH),
fromTopologyManager(TopologyManager::epoch));
+ maxPendingEpoch =
Metrics.gauge(factory.createMetricName(MAX_PENDING_EPOCH),
fromTopologyManager(TopologyManager::pendingEpoch));
+ errors = Metrics.counter(factory.createMetricName(ERRORS));
+ epochTimeouts =
Metrics.counter(factory.createMetricName(EPOCH_TIMEOUTS));
+ epochWaits = Metrics.counter(factory.createMetricName(EPOCH_WAITS));
durabilityQueueActive =
Metrics.gauge(factory.createMetricName(DURABILITY_QUEUE_ACTIVE),
fromDurabilityService(durability -> (long)durability.queue().activeCount()));
durabilityQueuePending =
Metrics.gauge(factory.createMetricName(DURABILITY_QUEUE_PENDING),
fromDurabilityService(durability -> (long)durability.queue().pendingCount()));
progressLogActive =
Metrics.gauge(factory.createMetricName(PROGRESS_LOG_ACTIVE),
fromDurabilityService(durability -> (long)durability.queue().activeCount()));
- progressLogSize =
Metrics.onDemandHistogram(factory.createMetricName(PROGRESS_LOG_SIZE), () ->
maybeRefreshHistograms().progressLogSize);
- progressLogAge =
Metrics.onDemandHistogram(factory.createMetricName(PROGRESS_LOG_AGE), () ->
maybeRefreshHistograms().progressLogAge);
- syncPointAgreedLag =
Metrics.onDemandHistogram(factory.createMetricName(SYNCPOINT_AGREED_LAG), () ->
maybeRefreshHistograms().syncPointAgreedLag);
- locallyAppliedLag =
Metrics.onDemandHistogram(factory.createMetricName(LOCALLY_APPLIED_LAG), () ->
maybeRefreshHistograms().locallyAppliedLag);
- locallyDurableLag =
Metrics.onDemandHistogram(factory.createMetricName(LOCALLY_DURABLE_LAG), () ->
maybeRefreshHistograms().locallyDurableLag);
- quorumAppliedLag =
Metrics.onDemandHistogram(factory.createMetricName(QUORUM_APPLIED_LAG), () ->
maybeRefreshHistograms().quorumAppliedLag);
- shardAppliedLag =
Metrics.onDemandHistogram(factory.createMetricName(SHARD_APPLIED_LAG), () ->
maybeRefreshHistograms().shardAppliedLag);
- gcLag = Metrics.onDemandHistogram(factory.createMetricName(GC_LAG), ()
-> maybeRefreshHistograms().gcLag);
+ progressLogSize =
Metrics.onDemandHistogram(factory.createMetricName(PROGRESS_LOG_SIZE), () ->
maybeRefreshHistograms().progressLogSize, false);
+ progressLogAge =
Metrics.onDemandHistogram(factory.createMetricName(PROGRESS_LOG_AGE), () ->
maybeRefreshHistograms().progressLogAge, false);
+ syncPointAgreedLag =
Metrics.onDemandHistogram(factory.createMetricName(SYNCPOINT_AGREED_LAG), () ->
maybeRefreshHistograms().syncPointAgreedLag, false);
+ locallyAppliedLag =
Metrics.onDemandHistogram(factory.createMetricName(LOCALLY_APPLIED_LAG), () ->
maybeRefreshHistograms().locallyAppliedLag, false);
+ locallyDurableLag =
Metrics.onDemandHistogram(factory.createMetricName(LOCALLY_DURABLE_LAG), () ->
maybeRefreshHistograms().locallyDurableLag, false);
+ quorumAppliedLag =
Metrics.onDemandHistogram(factory.createMetricName(QUORUM_APPLIED_LAG), () ->
maybeRefreshHistograms().quorumAppliedLag, false);
+ shardAppliedLag =
Metrics.onDemandHistogram(factory.createMetricName(SHARD_APPLIED_LAG), () ->
maybeRefreshHistograms().shardAppliedLag, false);
+ gcLag = Metrics.onDemandHistogram(factory.createMetricName(GC_LAG), ()
-> maybeRefreshHistograms().gcLag, false);
+ }
+
+ @Override
+ public void onWaitingForEpoch(long epoch)
+ {
+ epochWaits.inc();
+ }
+
+ @Override
+ public void onTimeoutForEpoch(long epoch, int count)
+ {
+ epochTimeouts.inc(count);
}
private synchronized Snapshot maybeRefreshHistograms()
@@ -190,6 +216,8 @@ public class AccordSystemMetrics
for (int i = 0 ; i < redundantBefore.size() ; ++i)
{
RedundantBefore.Bounds bounds = redundantBefore.valueAt(i);
+ if (bounds == null)
+ continue;
builder.locallyAppliedLag.increment(ageSeconds(nowSeconds,
bounds.maxBound(LOCALLY_APPLIED)));
builder.locallyDurableLag.increment(ageSeconds(nowSeconds,
bounds.maxBoundBoth(LOCALLY_DURABLE_TO_DATA_STORE,
LOCALLY_DURABLE_TO_COMMAND_STORE)));
builder.quorumAppliedLag.increment(ageSeconds(nowSeconds,
bounds.maxBound(QUORUM_APPLIED)));
diff --git
a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
index fc50797005..c11fd5d018 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
+++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
@@ -48,7 +48,6 @@ import com.codahale.metrics.Metered;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
-import com.codahale.metrics.Timer;
import org.apache.cassandra.db.virtual.CollectionVirtualTableAdapter;
import org.apache.cassandra.db.virtual.VirtualTable;
import org.apache.cassandra.db.virtual.model.CounterMetricRow;
@@ -177,8 +176,8 @@ public class CassandraMetricsRegistry extends MetricRegistry
return Double.toString(((OverrideHistogram)
metric).getSnapshot().getMedian());
else if (metric instanceof Meter)
return Long.toString(((Meter) metric).getCount());
- else if (metric instanceof Timer)
- return Long.toString(((Timer) metric).getCount());
+ else if (metric instanceof com.codahale.metrics.Timer)
+ return Long.toString(((com.codahale.metrics.Timer)
metric).getCount());
else
throw new IllegalStateException("Unknown metric type: " +
metric.getClass().getName());
}
@@ -357,14 +356,14 @@ public class CassandraMetricsRegistry extends
MetricRegistry
return histogram;
}
- public ShardedHistogram shardedHistogram(MetricName name)
+ public ShardedHistogram shardedHistogram(MetricName name, boolean
isCumulative)
{
- return register(name, new ShardedHistogram());
+ return register(name, new ShardedHistogram(isCumulative));
}
- public OnDemandHistogram onDemandHistogram(MetricName name,
Supplier<LogLinearHistogram.LogLinearSnapshot> snapshot)
+ public OnDemandHistogram onDemandHistogram(MetricName name,
Supplier<LogLinearHistogram.LogLinearSnapshot> snapshot, boolean isCumulative)
{
- return register(name, new OnDemandHistogram(snapshot));
+ return register(name, new OnDemandHistogram(snapshot, isCumulative));
}
public <T extends Gauge<?>> T gauge(MetricName name, T gauge)
@@ -379,7 +378,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
return gaugeLoc;
}
- public Timer timer(MetricName name)
+ public SnapshottingTimer timer(MetricName name)
{
return timer(name, DEFAULT_TIMER_UNIT);
}
@@ -773,17 +772,22 @@ public class CassandraMetricsRegistry extends
MetricRegistry
}
/**
- * Returns a histogram describing the values recorded since the last
time this method was called.
+ * If the Histogram has cumulative data, returns a histogram
describing the values recorded since the last time this method was called.
*
- * ex. If the counts are [0, 1, 2, 1] at the time the first caller
arrives, but change to [1, 2, 3, 2] by the
+ * ex. If the counts are [0, 1, 2, 1] at the time the first caller
arrives, but change to [1, 2, 3, 2] by the
* time a second caller arrives, the second caller will receive [1, 1,
1, 1].
*
+ * If the Histogram does not have cumulative data, simply returns the
current snapshot.
+ *
* @return a histogram whose bucket offsets are assumed to be in
nanoseconds
*/
@Override
public synchronized long[] getRecentValues()
{
- long[] now = metric.getSnapshot().getValues();
+ long[] now = values();
+ if (!metric.isCumulative())
+ return now;
+
long[] delta = delta(now, last);
last = now;
return delta;
@@ -968,6 +972,12 @@ public class CassandraMetricsRegistry extends
MetricRegistry
long[] getRecentValues();
String getDurationUnit();
+
+ String bucketsId();
+
+ long[] rawBuckets(int count);
+
+ long[] rawValues();
}
static class JmxTimer extends JmxMeter implements JmxTimerMBean
@@ -1049,7 +1059,10 @@ public class CassandraMetricsRegistry extends
MetricRegistry
@Override
public long[] values()
{
- return metric.getSnapshot().getValues();
+ long[] values = metric.getSnapshot().getValues();
+ if (metric.bucketStrategy() ==
CassandraReservoir.BucketStrategy.log_linear)
+ values =
metric.bucketStrategy().translateTo(CassandraReservoir.BucketStrategy.exp_12_nozero,
values);
+ return values;
}
/**
@@ -1063,7 +1076,10 @@ public class CassandraMetricsRegistry extends
MetricRegistry
@Override
public synchronized long[] getRecentValues()
{
- long[] now = metric.getSnapshot().getValues();
+ long[] now = values();
+ if (!metric.isCumulative())
+ return now;
+
long[] delta = delta(now, last);
last = now;
return delta;
@@ -1074,6 +1090,24 @@ public class CassandraMetricsRegistry extends
MetricRegistry
{
return durationUnit;
}
+
+ @Override
+ public String bucketsId()
+ {
+ return metric.bucketStrategy().name();
+ }
+
+ @Override
+ public long[] rawBuckets(int count)
+ {
+ return metric.bucketStarts(count);
+ }
+
+ @Override
+ public long[] rawValues()
+ {
+ return metric.getSnapshot().getValues();
+ }
}
/**
diff --git a/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
b/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
index e4abd40bbf..dda9844752 100644
--- a/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
@@ -24,9 +24,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
import com.codahale.metrics.Counter;
-import com.codahale.metrics.Reservoir;
import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
@@ -169,12 +167,11 @@ public class LatencyMetrics
Metrics.remove(factory.createMetricName(namePrefix + "TotalLatency"));
}
- public class LatencyMetricsTimer extends Timer
+ public class LatencyMetricsTimer extends OverrideTimer implements
org.apache.cassandra.metrics.Timer
{
-
long releasedLatencyCount = 0;
- public LatencyMetricsTimer(Reservoir reservoir)
+ public LatencyMetricsTimer(CassandraReservoir reservoir)
{
super(reservoir);
}
diff --git a/src/java/org/apache/cassandra/metrics/OnDemandHistogram.java
b/src/java/org/apache/cassandra/metrics/OnDemandHistogram.java
index 4ad890e3c0..f00966824b 100644
--- a/src/java/org/apache/cassandra/metrics/OnDemandHistogram.java
+++ b/src/java/org/apache/cassandra/metrics/OnDemandHistogram.java
@@ -28,9 +28,11 @@ import static
org.apache.cassandra.metrics.CassandraReservoir.BucketStrategy.log
public class OnDemandHistogram extends OverrideHistogram
{
final Supplier<LogLinearSnapshot> snapshot;
- protected OnDemandHistogram(Supplier<LogLinearSnapshot> snapshot)
+ final boolean isCumulative;
+ protected OnDemandHistogram(Supplier<LogLinearSnapshot> snapshot, boolean
isCumulative)
{
this.snapshot = snapshot;
+ this.isCumulative = isCumulative;
}
@Override
@@ -56,4 +58,10 @@ public class OnDemandHistogram extends OverrideHistogram
{
return LogLinearHistogram.bucketsWithLength(length);
}
+
+ @Override
+ public boolean isCumulative()
+ {
+ return isCumulative;
+ }
}
diff --git a/src/java/org/apache/cassandra/metrics/OverrideHistogram.java
b/src/java/org/apache/cassandra/metrics/OverrideHistogram.java
index 2553d5e01f..e45480247d 100644
--- a/src/java/org/apache/cassandra/metrics/OverrideHistogram.java
+++ b/src/java/org/apache/cassandra/metrics/OverrideHistogram.java
@@ -21,7 +21,7 @@ package org.apache.cassandra.metrics;
import com.codahale.metrics.Snapshot;
import org.agrona.UnsafeAccess;
-public abstract class OverrideHistogram extends com.codahale.metrics.Histogram
+public class OverrideHistogram extends com.codahale.metrics.Histogram
{
private static final CassandraReservoir NO_RESERVOIR = new
CassandraReservoir() {
@Override public Snapshot getPercentileSnapshot() { return null; }
@@ -61,4 +61,9 @@ public abstract class OverrideHistogram extends
com.codahale.metrics.Histogram
{
return reservoir.buckets(length);
}
+
+ public boolean isCumulative()
+ {
+ return true;
+ }
}
diff --git a/src/java/org/apache/cassandra/metrics/OverrideMeter.java
b/src/java/org/apache/cassandra/metrics/OverrideMeter.java
new file mode 100644
index 0000000000..4daae7e1b7
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/OverrideMeter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import com.codahale.metrics.Clock;
+
+public class OverrideMeter extends com.codahale.metrics.Meter implements Meter
+{
+ public OverrideMeter(Clock clock)
+ {
+ super(clock);
+ }
+}
diff --git a/src/java/org/apache/cassandra/metrics/ThreadLocalTimer.java
b/src/java/org/apache/cassandra/metrics/OverrideTimer.java
similarity index 88%
copy from src/java/org/apache/cassandra/metrics/ThreadLocalTimer.java
copy to src/java/org/apache/cassandra/metrics/OverrideTimer.java
index 771c68b04f..e2fe1e45fd 100644
--- a/src/java/org/apache/cassandra/metrics/ThreadLocalTimer.java
+++ b/src/java/org/apache/cassandra/metrics/OverrideTimer.java
@@ -34,18 +34,18 @@ import com.codahale.metrics.Snapshot;
* NOTE: Dropwizard Timer is a concrete class and there is no an interface for
Dropwizard Timer logic,
* so we have to create an alternative hierarchy.
*/
-public class ThreadLocalTimer extends com.codahale.metrics.Timer implements
Timer
+public class OverrideTimer extends com.codahale.metrics.Timer implements Timer
{
- private final Meter meter;
- private final ThreadLocalHistogram histogram;
+ protected final Meter meter;
+ protected final OverrideHistogram histogram;
// usually we need precise clocks for timing, so we do not replace it with
an approximate version
- private final MetricClock clock;
+ protected final MetricClock clock;
/**
* Creates a new {@link Timer} using an {@link
ExponentiallyDecayingReservoir} and the default
* {@link MetricClock}.
*/
- public ThreadLocalTimer()
+ public OverrideTimer()
{
this(new DecayingEstimatedHistogramReservoir());
}
@@ -55,7 +55,7 @@ public class ThreadLocalTimer extends
com.codahale.metrics.Timer implements Time
*
* @param reservoir the {@link Reservoir} implementation the timer should
use
*/
- public ThreadLocalTimer(CassandraReservoir reservoir)
+ public OverrideTimer(CassandraReservoir reservoir)
{
this(reservoir, MetricClock.defaultClock());
}
@@ -66,14 +66,14 @@ public class ThreadLocalTimer extends
com.codahale.metrics.Timer implements Time
* @param reservoir the {@link Reservoir} implementation the timer should
use
* @param clock the {@link MetricClock} implementation the timer
should use
*/
- public ThreadLocalTimer(CassandraReservoir reservoir, MetricClock clock)
+ public OverrideTimer(CassandraReservoir reservoir, MetricClock clock)
{
// the precise clock is intentionally not propagated to
ThreadLocalMeter
// we do not need a precise and more expensive time within the meter
- this(new ThreadLocalMeter(), new ThreadLocalHistogram(reservoir),
clock);
+ this(new OverrideMeter(clock), new OverrideHistogram(reservoir),
clock);
}
- public ThreadLocalTimer(Meter meter, ThreadLocalHistogram histogram,
MetricClock clock)
+ public OverrideTimer(Meter meter, OverrideHistogram histogram, MetricClock
clock)
{
// original Codahale meter and histogram are set to null to reduce
memory footprint
super(null, null, clock);
@@ -221,4 +221,19 @@ public class ThreadLocalTimer extends
com.codahale.metrics.Timer implements Time
meter.mark();
}
}
+
+ public CassandraReservoir.BucketStrategy bucketStrategy()
+ {
+ return histogram.bucketStrategy();
+ }
+
+ public long[] bucketStarts(int length)
+ {
+ return histogram.reservoir.buckets(length);
+ }
+
+ public boolean isCumulative()
+ {
+ return true;
+ }
}
diff --git
a/src/java/org/apache/cassandra/metrics/ShardedDecayingHistograms.java
b/src/java/org/apache/cassandra/metrics/ShardedDecayingHistograms.java
index db91857b3c..92620fb034 100644
--- a/src/java/org/apache/cassandra/metrics/ShardedDecayingHistograms.java
+++ b/src/java/org/apache/cassandra/metrics/ShardedDecayingHistograms.java
@@ -74,6 +74,12 @@ public class ShardedDecayingHistograms
{
return shard.histograms.get(histogramIndex);
}
+
+ @Override
+ public boolean isCumulative()
+ {
+ return false;
+ }
}
public static class DecayingHistogramsShard
diff --git a/src/java/org/apache/cassandra/metrics/ShardedHistogram.java
b/src/java/org/apache/cassandra/metrics/ShardedHistogram.java
index 66defdac56..3ac77a9717 100644
--- a/src/java/org/apache/cassandra/metrics/ShardedHistogram.java
+++ b/src/java/org/apache/cassandra/metrics/ShardedHistogram.java
@@ -58,11 +58,12 @@ public class ShardedHistogram extends OverrideHistogram
}
final CopyOnWriteArrayList<HistogramShard> shards = new
CopyOnWriteArrayList<>();
+ final boolean isCumulative;
final long initialMaxValue;
- public ShardedHistogram()
+ public ShardedHistogram(boolean isCumulative)
{
- this(1 << 16);
+ this(isCumulative, 1 << 16);
}
@Override
@@ -77,8 +78,9 @@ public class ShardedHistogram extends OverrideHistogram
return LogLinearHistogram.bucketsWithLength(length);
}
- public ShardedHistogram(long initialMaxValue)
+ public ShardedHistogram(boolean isCumulative, long initialMaxValue)
{
+ this.isCumulative = isCumulative;
this.initialMaxValue = initialMaxValue;
}
@@ -130,4 +132,10 @@ public class ShardedHistogram extends OverrideHistogram
{
return maybeRefresh();
}
+
+ @Override
+ public boolean isCumulative()
+ {
+ return isCumulative;
+ }
}
diff --git a/src/java/org/apache/cassandra/metrics/ThreadLocalTimer.java
b/src/java/org/apache/cassandra/metrics/ThreadLocalTimer.java
index 771c68b04f..c653a1dc60 100644
--- a/src/java/org/apache/cassandra/metrics/ThreadLocalTimer.java
+++ b/src/java/org/apache/cassandra/metrics/ThreadLocalTimer.java
@@ -18,14 +18,8 @@
package org.apache.cassandra.metrics;
-import java.time.Duration;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Reservoir;
-import com.codahale.metrics.Snapshot;
/**
* An alternative to Dropwizard Timer which implements the same kind of API.
@@ -34,13 +28,8 @@ import com.codahale.metrics.Snapshot;
* NOTE: Dropwizard Timer is a concrete class and there is no an interface for
Dropwizard Timer logic,
* so we have to create an alternative hierarchy.
*/
-public class ThreadLocalTimer extends com.codahale.metrics.Timer implements
Timer
+public class ThreadLocalTimer extends OverrideTimer
{
- private final Meter meter;
- private final ThreadLocalHistogram histogram;
- // usually we need precise clocks for timing, so we do not replace it with
an approximate version
- private final MetricClock clock;
-
/**
* Creates a new {@link Timer} using an {@link
ExponentiallyDecayingReservoir} and the default
* {@link MetricClock}.
@@ -76,149 +65,6 @@ public class ThreadLocalTimer extends
com.codahale.metrics.Timer implements Time
public ThreadLocalTimer(Meter meter, ThreadLocalHistogram histogram,
MetricClock clock)
{
// original Codahale meter and histogram are set to null to reduce
memory footprint
- super(null, null, clock);
- this.meter = meter;
- this.histogram = histogram;
- this.clock = clock;
- }
-
- /**
- * Adds a recorded duration.
- *
- * @param duration the length of the duration
- * @param unit the scale unit of {@code duration}
- */
- @Override
- public void update(long duration, TimeUnit unit)
- {
- update(unit.toNanos(duration));
- }
-
- /**
- * Adds a recorded duration.
- *
- * @param duration the {@link Duration} to add to the timer. Negative or
zero value are ignored.
- */
- @Override
- public void update(Duration duration)
- {
- update(duration.toNanos());
- }
-
- /**
- * Times and records the duration of event.
- *
- * @param event a {@link Callable} whose {@link Callable#call()} method
implements a process
- * whose duration should be timed
- * @param <T> the type of the value returned by {@code event}
- * @return the value returned by {@code event}
- * @throws Exception if {@code event} throws an {@link Exception}
- */
- @Override
- public <T> T time(Callable<T> event) throws Exception
- {
- final long startTime = clock.getTick();
- try
- {
- return event.call();
- }
- finally
- {
- update(clock.getTick() - startTime);
- }
- }
-
- /**
- * Times and records the duration of event. Should not throw exceptions,
for that use the
- * {@link #time(Callable)} method.
- *
- * @param event a {@link Supplier} whose {@link Supplier#get()} method
implements a process
- * whose duration should be timed
- * @param <T> the type of the value returned by {@code event}
- * @return the value returned by {@code event}
- */
- @Override
- public <T> T timeSupplier(Supplier<T> event)
- {
- final long startTime = clock.getTick();
- try
- {
- return event.get();
- }
- finally
- {
- update(clock.getTick() - startTime);
- }
- }
-
- /**
- * Times and records the duration of event.
- *
- * @param event a {@link Runnable} whose {@link Runnable#run()} method
implements a process
- * whose duration should be timed
- */
- @Override
- public void time(Runnable event)
- {
- final long startTime = clock.getTick();
- try
- {
- event.run();
- }
- finally
- {
- update(clock.getTick() - startTime);
- }
- }
-
- @Override
- public Timer.Context startTime()
- {
- return new Timer.Context(this, clock);
- }
-
- @Override
- public long getCount()
- {
- return histogram.getCount();
- }
-
- @Override
- public double getFifteenMinuteRate()
- {
- return meter.getFifteenMinuteRate();
- }
-
- @Override
- public double getFiveMinuteRate()
- {
- return meter.getFiveMinuteRate();
- }
-
- @Override
- public double getMeanRate()
- {
- return meter.getMeanRate();
- }
-
- @Override
- public double getOneMinuteRate()
- {
- return meter.getOneMinuteRate();
- }
-
- @Override
- public Snapshot getSnapshot()
- {
- return histogram.getSnapshot();
- }
-
- private void update(long duration)
- {
- if (duration >= 0)
- {
- histogram.update(duration);
- meter.mark();
- }
+ super(meter, histogram, clock);
}
}
diff --git a/src/java/org/apache/cassandra/metrics/Timer.java
b/src/java/org/apache/cassandra/metrics/Timer.java
index ed764c03c9..84369be485 100644
--- a/src/java/org/apache/cassandra/metrics/Timer.java
+++ b/src/java/org/apache/cassandra/metrics/Timer.java
@@ -25,6 +25,7 @@ import java.util.function.Supplier;
import com.codahale.metrics.Metered;
import com.codahale.metrics.Sampling;
+import org.apache.cassandra.metrics.CassandraReservoir.BucketStrategy;
/**
* An interface which mimics {@link com.codahale.metrics.Timer} API and allows
alternative implementations
@@ -70,6 +71,10 @@ public interface Timer extends Metered, Sampling
}
}
+ BucketStrategy bucketStrategy();
+ long[] bucketStarts(int length);
+ boolean isCumulative();
+
void update(long duration, TimeUnit unit);
void update(Duration duration);
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index d88314c320..2e34080f99 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -320,7 +320,7 @@ public class AccordCommandStore extends CommandStore
RedundantBefore.QuickBounds bounds =
unsafeGetRedundantBefore().get(key);
if (bounds == null)
return cfk; // TODO (required): I don't think this should be
possible? but we hit it on some test
- return cfk.withRedundantBeforeAtLeast(bounds.gcBefore, false);
+ return cfk.withGcBeforeAtLeast(bounds.gcBefore, false);
}
boolean validateCommandsForKey(RoutableKey key, CommandsForKey evicting)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index 0de343feb7..783299dc4f 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -303,7 +303,7 @@ public class AccordKeyspace
return null;
// TODO (desired): consider whether better to not compact any
validation failures, since we expect it is already overwritten
- CommandsForKey updated =
current.withRedundantBeforeAtLeast(redundantBefore.gcBefore(), false);
+ CommandsForKey updated =
current.withGcBeforeAtLeast(redundantBefore.gcBefore(), false);
if (current == updated)
return row;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index e7e5762fb1..e9874fae0a 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -46,6 +46,7 @@ import accord.primitives.Txn;
import org.apache.cassandra.config.AccordSpec;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.SystemKeyspace.BootstrapState;
+import org.apache.cassandra.metrics.AccordExecutorMetrics;
import org.apache.cassandra.metrics.AccordReplicaMetrics;
import org.apache.cassandra.metrics.AccordSystemMetrics;
import org.apache.cassandra.service.accord.api.AccordViolationHandler;
@@ -161,51 +162,79 @@ public class AccordService implements IAccordService,
Shutdownable
// Listener is initialized before Accord is initialized
public static MetadataChangeListener instance = new
MetadataChangeListener();
- private MetadataChangeListener() {}
-
- private final AtomicReference<ChangeListener> collector = new
AtomicReference<>(new PreInitStateCollector());
-
- @Override
- public void notifyPreCommit(ClusterMetadata prev, ClusterMetadata
next, boolean fromSnapshot)
- {
- collector.get().notifyPreCommit(prev, next, fromSnapshot);
- }
-
- @Override
- public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata
next, boolean fromSnapshot)
+ interface Sink
{
- collector.get().notifyPostCommit(prev, next, fromSnapshot);
- }
-
- @VisibleForTesting
- public void resetForTesting(ClusterMetadata metadata)
- {
- PreInitStateCollector stateCollector = new PreInitStateCollector();
- stateCollector.items.add(metadata);
- collector.set(stateCollector);
+ Sink update(ClusterMetadata next);
}
/**
* Collects TCM events from startup until full Accord initialization to
avoid races with TCM and creating gaps between
* epochs restored from journal and reported by TCM.
**/
-
- static class PreInitStateCollector implements ChangeListener
+ static final class Collector implements Sink
{
- private final List<ClusterMetadata> items = new ArrayList<>(4);
+ static final Collector EMPTY = new Collector(new
ClusterMetadata[0]);
+ final ClusterMetadata[] saved;
+
+ Collector(ClusterMetadata[] saved)
+ {
+ this.saved = saved;
+ }
@Override
- public synchronized void notifyPostCommit(ClusterMetadata prev,
ClusterMetadata next, boolean fromSnapshot)
+ public Sink update(ClusterMetadata next)
{
- logger.debug("Saving epoch {} to deliver after startup",
next.epoch);
- items.add(next);
+ ClusterMetadata[] newSaved = Arrays.copyOf(saved, saved.length
+ 1);
+ newSaved[saved.length] = next;
+ return new Collector(newSaved);
}
- public synchronized List<ClusterMetadata> getItems()
+ public List<ClusterMetadata> getItems()
{
- return new ArrayList<>(items);
+ return Arrays.asList(saved);
}
}
+
+ private final AtomicReference<Sink> sink = new
AtomicReference<>(Collector.EMPTY);
+
+ private MetadataChangeListener() {}
+
+ @Override
+ public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata
next, boolean fromSnapshot)
+ {
+ while (true)
+ {
+ Sink curSink = sink.get();
+ Sink nextSink = curSink.update(next);
+ if (nextSink == curSink || sink.compareAndSet(curSink,
nextSink))
+ return;
+ }
+ }
+
+ Collector replaceSinkIfEmpty(Sink newSink)
+ {
+ while (true)
+ {
+ Sink curSink = sink.get();
+ Invariants.require(curSink instanceof Collector);
+ if (curSink == Collector.EMPTY)
+ {
+ if (sink.compareAndSet(curSink, newSink))
+ return null;
+ }
+ else
+ {
+ if (sink.compareAndSet(curSink, Collector.EMPTY))
+ return (Collector) curSink;
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public void unsafeResetForTesting(ClusterMetadata metadata)
+ {
+ sink.set(Collector.EMPTY.update(metadata));
+ }
}
private static final Logger logger =
LoggerFactory.getLogger(AccordService.class);
@@ -325,6 +354,7 @@ public class AccordService implements IAccordService,
Shutdownable
AccordReplicaMetrics.touch();
AccordSystemMetrics.touch();
+ AccordExecutorMetrics.touch();
AccordViolationHandler.setup();
return as;
}
@@ -422,6 +452,7 @@ public class AccordService implements IAccordService,
Shutdownable
journal.start(node);
node.load();
+
ClusterMetadata metadata = ClusterMetadata.current();
endpointMapper.updateMapping(metadata);
@@ -575,24 +606,28 @@ public class AccordService implements IAccordService,
Shutdownable
}
// Subscribe to TCM events, and collect any we may have missed to
report now
- ChangeListener prevListener =
MetadataChangeListener.instance.collector.getAndSet(new ChangeListener()
+ MetadataChangeListener.Sink sink = new
MetadataChangeListener.Sink()
{
@Override
- public void notifyPostCommit(ClusterMetadata prev,
ClusterMetadata next, boolean fromSnapshot)
+ public MetadataChangeListener.Sink update(ClusterMetadata next)
{
if (state != State.SHUTDOWN)
maybeReportMetadata(next);
+ return this;
}
- });
-
- Invariants.require((prevListener instanceof
MetadataChangeListener.PreInitStateCollector),
- "Listener should have been initialized with
Accord pre-init state collector, but was " + prevListener.getClass());
+ };
- MetadataChangeListener.PreInitStateCollector preinit =
(MetadataChangeListener.PreInitStateCollector) prevListener;
- for (ClusterMetadata item : preinit.getItems())
+ while (true)
{
- if (item.epoch.getEpoch() > highestKnown)
- maybeReportMetadata(item);
+ MetadataChangeListener.Collector collector =
MetadataChangeListener.instance.replaceSinkIfEmpty(sink);
+ if (collector == null)
+ break;
+
+ for (ClusterMetadata item : collector.getItems())
+ {
+ if (item.epoch.getEpoch() > highestKnown)
+ maybeReportMetadata(item);
+ }
}
}
catch (InterruptedException e)
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
index 636535e576..67825bad6b 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
@@ -67,6 +67,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.exceptions.RequestTimeoutException;
import org.apache.cassandra.metrics.AccordReplicaMetrics;
+import org.apache.cassandra.metrics.AccordSystemMetrics;
import org.apache.cassandra.net.ResponseContext;
import org.apache.cassandra.service.RetryStrategy;
import org.apache.cassandra.service.accord.AccordService;
@@ -197,7 +198,11 @@ public class AccordAgent implements Agent,
OwnershipEventListener
public static void handleException(Throwable t)
{
- if (t instanceof RequestTimeoutException || t instanceof
CancellationException || t instanceof TimeoutException || t instanceof Timeout)
+ if (t instanceof RequestTimeoutException)
+ return;
+
+ AccordSystemMetrics.metrics.errors.inc();
+ if (t instanceof CancellationException || t instanceof
TimeoutException || t instanceof Timeout)
return;
JVMStabilityInspector.uncaughtException(Thread.currentThread(), t);
}
diff --git
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
index 2c5f405205..c98ba63a4a 100644
---
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
+++
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
@@ -41,7 +41,6 @@ import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.primitives.Writes;
import accord.topology.Topologies;
-import accord.topology.Topologies.SelectNodeOwnership;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.service.accord.AccordEndpointMapper;
import org.apache.cassandra.service.accord.txn.AccordUpdate;
@@ -90,12 +89,12 @@ public class AccordInteropAdapter extends TxnAdapter
}
@Override
- public void persist(Node node, SequentialAsyncExecutor executor,
Topologies any, Route<?> require, Route<?> sendTo, SelectNodeOwnership
selectSendTo, FullRoute<?> route, Ballot ballot, CoordinationFlags flags, TxnId
txnId, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result result,
boolean informDurableOnDone, BiConsumer<? super Result, Throwable> callback)
+ public void persist(Node node, SequentialAsyncExecutor executor,
Topologies any, Route<?> require, Route<?> sendTo, FullRoute<?> route, Ballot
ballot, CoordinationFlags flags, TxnId txnId, Txn txn, Timestamp executeAt,
Deps deps, Writes writes, Result result, boolean informDurableOnDone,
BiConsumer<? super Result, Throwable> callback)
{
- if (applyKind == Minimal && doInteropPersist(node, executor, any,
require, sendTo, selectSendTo, ballot, txnId, txn, executeAt, deps, writes,
result, route, informDurableOnDone, callback))
+ if (applyKind == Minimal && doInteropPersist(node, executor, any,
require, sendTo, ballot, txnId, txn, executeAt, deps, writes, result, route,
informDurableOnDone, callback))
return;
- super.persist(node, executor, any, require, sendTo, selectSendTo,
route, ballot, flags, txnId, txn, executeAt, deps, writes, result,
informDurableOnDone, callback);
+ super.persist(node, executor, any, require, sendTo, route, ballot,
flags, txnId, txn, executeAt, deps, writes, result, informDurableOnDone,
callback);
}
private boolean doInteropExecute(Node node, SequentialAsyncExecutor
executor, FullRoute<?> route, Ballot ballot, TxnId txnId, Txn txn, Timestamp
executeAt, Deps deps, BiConsumer<? super Result, Throwable> callback)
@@ -121,7 +120,7 @@ public class AccordInteropAdapter extends TxnAdapter
return true;
}
- private boolean doInteropPersist(Node node, SequentialAsyncExecutor
executor, Topologies any, Route<?> require, Route<?> sendTo,
SelectNodeOwnership selectSendTo, Ballot ballot, TxnId txnId, Txn txn,
Timestamp executeAt, Deps deps, Writes writes, Result result, FullRoute<?>
fullRoute, boolean informDurableOnDone, BiConsumer<? super Result, Throwable>
callback)
+ private boolean doInteropPersist(Node node, SequentialAsyncExecutor
executor, Topologies any, Route<?> require, Route<?> sendTo, Ballot ballot,
TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result
result, FullRoute<?> fullRoute, boolean informDurableOnDone, BiConsumer<? super
Result, Throwable> callback)
{
Update update = txn.update();
ConsistencyLevel consistencyLevel = update instanceof AccordUpdate ?
((AccordUpdate) update).cassandraCommitCL() : null;
@@ -132,7 +131,7 @@ public class AccordInteropAdapter extends TxnAdapter
AccordInteropPersist persist;
try
{
- Topologies all = execution(node, any, sendTo, selectSendTo,
fullRoute, txnId, executeAt);
+ Topologies all = execution(node, any, sendTo, fullRoute, txnId,
executeAt);
persist = new AccordInteropPersist(node, executor, all, txnId,
require, ballot, txn, executeAt, deps, writes, result, fullRoute,
consistencyLevel, CoordinationFlags.none(), informDurableOnDone, Minimal,
callback);
}
catch (Throwable t)
diff --git
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
index cbc6182674..6872ed79e1 100644
---
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
+++
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
@@ -95,7 +95,7 @@ import org.apache.cassandra.transport.Dispatcher;
import static accord.coordinate.CoordinationAdapter.Factory.Kind.Standard;
import static accord.primitives.Txn.Kind.Write;
-import static accord.topology.Topologies.SelectNodeOwnership.SHARE;
+import static accord.topology.SelectShards.LIVE;
import static accord.utils.Invariants.illegalState;
import static accord.utils.Invariants.requireArgument;
import static
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.accordReadMetrics;
@@ -159,9 +159,9 @@ public class AccordInteropExecution implements
ReadCoordinator
// TODO (required): compare this to latest logic in Accord, make sure
it makes sense
ActiveEpochs epochs = node.topology().active();
- this.executes = epochs.forEpoch(route, executeAt.epoch(), SHARE);
+ this.executes = epochs.forEpoch(route, executeAt.epoch(), LIVE);
this.allTopologies = txnId.epoch() != executeAt.epoch()
- ? epochs.preciseEpochs(route, txnId.epoch(),
executeAt.epoch(), SHARE)
+ ? epochs.preciseEpochs(route, txnId.epoch(),
executeAt.epoch(), LIVE)
: executes;
this.executeTopology = executes.getEpoch(executeAt.epoch());
this.coordinateTopology = allTopologies.getEpoch(txnId.epoch());
diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java
b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
index 0e8f681a48..232db188db 100644
--- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java
+++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
@@ -926,7 +926,7 @@ public abstract class LocalLog implements Closeable
addListener(new MetadataSnapshotListener());
addListener(new ClientNotificationListener());
addListener(new UpgradeMigrationListener());
- if (DatabaseDescriptor.getAccord().enabled)
+ if (DatabaseDescriptor.getAccordTransactionsEnabled())
addListener(AccordService.MetadataChangeListener.instance);
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index a362938aba..d6479e1813 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -717,6 +717,9 @@ public class ClusterUtils
public static Epoch maxEpoch(ICluster<IInvokableInstance> cluster, int...
nodes)
{
+ if (nodes == null || nodes.length == 0)
+ return maxEpoch(cluster);
+
Epoch max = null;
for (int id : nodes)
{
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
index 2577d02a7a..bdee5765ac 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
@@ -47,6 +47,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.primitives.Unseekables;
+import accord.topology.SelectShards;
import accord.topology.Topologies;
import accord.topology.TopologyException;
import org.apache.cassandra.config.Config.PaxosVariant;
@@ -711,7 +712,7 @@ public abstract class AccordCQLTestBase extends
AccordTestBase
Unseekables<?> routables =
AccordTestUtils.createTxn(sb.toString()).keys().toParticipants();
long epoch = AccordService.instance().topology().epoch();
Topologies topology;
- try { topology =
AccordService.instance().topology().active().withUnsyncedEpochs(routables,
epoch, epoch); }
+ try { topology =
AccordService.instance().topology().active().withUnsyncedEpochs(routables,
epoch, epoch, SelectShards.ALL); }
catch (TopologyException e) { throw new RuntimeException(e); }
// we don't detect out-of-bounds read/write yet, so use this
to validate we reach different shards
Assertions.assertThat(topology.totalShards()).isEqualTo(2);
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordSimpleFastPathTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordSimpleFastPathTest.java
index 8443526e9f..01632f1cd3 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordSimpleFastPathTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordSimpleFastPathTest.java
@@ -110,7 +110,7 @@ public class AccordSimpleFastPathTest extends TestBaseImpl
long epoch = cm.epoch.getEpoch();
TopologyManager tm = AccordService.instance().topology();
- Topology topology = tm.active().getKnown(epoch).global();
+ Topology topology = tm.active().getKnown(epoch).all();
Assert.assertFalse(topology.shards().isEmpty());
topology.shards().forEach(shard ->
Assert.assertEquals(idSet(1, 2, 3), shard.nodes.without(shard.notInFastPath)));
return cm.epoch.getEpoch();
diff --git
a/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java
b/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java
index e4e6de559f..ce927926d9 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java
@@ -124,7 +124,7 @@ public class AccordVirtualTablesTest extends CQLTester
// range is no longer "added" so doesn't show up as synced!
long e2 = 2;
- ActiveEpoch a2 = active(topology(e2, T1), EpochReady.done(e2),
a1.global());
+ ActiveEpoch a2 = active(topology(e2, T1), EpochReady.done(e2),
a1.all());
tm.unsafeSetActive(ActiveEpochs.unsafeNew(tm, new ActiveEpoch[] { a2,
a1 }, -1));
assertRows(execute("SELECT * FROM " + VIRTUAL_VIEWS + "." +
AccordVirtualTables.TABLE_EPOCHS),
row(e1, T1_META.keyspace, T1_META.name, FULL_RANGE,
List.of(), List.of(), List.of(), FULL_RANGE));
diff --git a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
index aa37658fc6..865d1b799e 100644
--- a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
@@ -518,7 +518,7 @@ public class RouteIndexTest extends CQLTester
AccordService startAccord()
{
ClusterMetadata metadata = ClusterMetadata.current();
-
AccordService.MetadataChangeListener.instance.resetForTesting(metadata);
+
AccordService.MetadataChangeListener.instance.unsafeResetForTesting(metadata);
NodeId tcmNodeId = metadata.myNodeId();
AccordService.unsafeSetNewAccordService(null);
AccordService.localStartup(tcmNodeId);
diff --git
a/test/unit/org/apache/cassandra/metrics/JmxVirtualTableMetricsTest.java
b/test/unit/org/apache/cassandra/metrics/JmxVirtualTableMetricsTest.java
index 9f79d656c1..45348f2775 100644
--- a/test/unit/org/apache/cassandra/metrics/JmxVirtualTableMetricsTest.java
+++ b/test/unit/org/apache/cassandra/metrics/JmxVirtualTableMetricsTest.java
@@ -68,7 +68,7 @@ public class JmxVirtualTableMetricsTest extends CQLTester
metricToNameMap.put(MetricType.METER, registry.meter("meter"));
metricToNameMap.put(MetricType.COUNTER, registry.counter("counter"));
metricToNameMap.put(MetricType.HISTOGRAM,
registry.histogram("histogram", () -> new ClearableHistogram(new
DecayingEstimatedHistogramReservoir(true))));
- metricToNameMap.put(MetricType.TIMER, registry.timer("timer"));
+ metricToNameMap.put(MetricType.TIMER, registry.timer("timer", () ->
new SnapshottingTimer(new DecayingEstimatedHistogramReservoir())));
metricToNameMap.put(MetricType.GAUGE, registry.gauge("gauge", () ->
gaugeValue::get));
CassandraMetricsRegistry.metricGroups.forEach(group -> {
diff --git a/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
index b94237befc..aee4fabb44 100644
--- a/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
@@ -392,7 +392,7 @@ public class EpochSyncTest
.isFalse();
// validate topology manager
- Ranges ranges =
tm.active().getKnown(epoch).global().ranges().mergeTouching();
+ Ranges ranges =
tm.active().getKnown(epoch).all().ranges().mergeTouching();
Ranges actual =
tm.unsafeQuorumReady(epoch).mergeTouching();
Assertions.assertThat(actual)
.describedAs("node%s does not have all
expected sync ranges for epoch %d; missing %s", id, epoch,
ranges.without(actual))
@@ -404,7 +404,7 @@ public class EpochSyncTest
continue;
Assertions.assertThat(tm.active().hasEpoch(epoch)).describedAs("node%s does not
have epoch %d", id, epoch).isTrue();
- Topology topology =
tm.active().getKnown(epoch).global();
+ Topology topology = tm.active().getKnown(epoch).all();
Ranges ranges = topology.ranges().mergeTouching();
Ranges actual =
tm.unsafeQuorumReady(epoch).mergeTouching();
// TopologyManager defines syncComplete for an epoch
as (epoch - 1).syncComplete. This means that an epoch has reached quorum, but
will still miss ranges as previous epochs have not
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]