This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new e768418696 Accord: Improve Tracing - Introduce pattern tracing,
which can intercept failed or new coordinations matching various filters -
Support additional tracing event collection modes (SAMPLE and RING)
e768418696 is described below
commit e76841869647c10c0e95ddef1e0f70f7ad16d7a8
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Sun Sep 14 14:28:04 2025 +0100
Accord: Improve Tracing
- Introduce pattern tracing, which can intercept failed or new
coordinations matching various filters
- Support additional tracing event collection modes (SAMPLE and RING)
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20911
---
modules/accord | 2 +-
.../cassandra/db/virtual/AccordDebugKeyspace.java | 561 ++++++++++--
.../metrics/AccordCoordinatorMetrics.java | 2 -
.../cassandra/metrics/AccordReplicaMetrics.java | 2 -
.../cassandra/metrics/AccordSystemMetrics.java | 4 +
.../cassandra/service/accord/AccordService.java | 2 +
.../cassandra/service/accord/AccordTracing.java | 936 ++++++++++++++++++---
.../cassandra/service/accord/TokenRange.java | 17 +
.../cassandra/service/accord/api/AccordAgent.java | 13 +-
.../cassandra/service/accord/api/TokenKey.java | 15 +-
.../db/virtual/AccordDebugKeyspaceTest.java | 264 ++++--
11 files changed, 1525 insertions(+), 293 deletions(-)
diff --git a/modules/accord b/modules/accord
index 657f344eb3..e896c9c328 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 657f344eb3b3570966bf8cff7731bef6eeea98f1
+Subproject commit e896c9c328d3e8a3b7ddd85381edeaf1b4b8ccb2
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
index 1f796bdd15..efe8c390cf 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
@@ -27,24 +27,31 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
+import java.util.function.IntFunction;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
+import com.google.common.collect.ImmutableSet;
+
import accord.coordinate.AbstractCoordination;
import accord.coordinate.Coordination;
+import accord.coordinate.Coordination.CoordinationKind;
import accord.coordinate.Coordinations;
import accord.coordinate.PrepareRecovery;
import accord.coordinate.tracking.AbstractTracker;
import accord.local.cfk.CommandsForKey.TxnInfo;
+import accord.primitives.Ranges;
+import accord.primitives.Routable;
import accord.primitives.RoutingKeys;
import accord.utils.SortedListMap;
+import accord.utils.TinyEnumSet;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.TxnIdUtf8Type;
@@ -52,7 +59,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.api.RoutingKey;
-import accord.api.TraceEventType;
import accord.coordinate.FetchData;
import accord.coordinate.FetchRoute;
import accord.coordinate.MaybeRecover;
@@ -114,9 +120,14 @@ import org.apache.cassandra.service.accord.AccordJournal;
import org.apache.cassandra.service.accord.AccordKeyspace;
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.AccordTracing;
+import org.apache.cassandra.service.accord.AccordTracing.BucketMode;
+import org.apache.cassandra.service.accord.AccordTracing.CoordinationKinds;
+import org.apache.cassandra.service.accord.AccordTracing.TracePattern;
+import org.apache.cassandra.service.accord.AccordTracing.TxnKindsAndDomains;
import org.apache.cassandra.service.accord.DebugBlockedTxns;
import org.apache.cassandra.service.accord.IAccordService;
import org.apache.cassandra.service.accord.JournalKey;
+import org.apache.cassandra.service.accord.TokenRange;
import org.apache.cassandra.service.accord.api.AccordAgent;
import org.apache.cassandra.service.accord.api.TokenKey;
import
org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
@@ -124,7 +135,6 @@ import
org.apache.cassandra.service.consensus.migration.TableMigrationState;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.LocalizeString;
-import static accord.api.TraceEventType.RECOVER;
import static accord.coordinate.Infer.InvalidIf.NotKnownToBeInvalid;
import static accord.local.RedundantStatus.Property.GC_BEFORE;
import static accord.local.RedundantStatus.Property.LOCALLY_APPLIED;
@@ -151,13 +161,14 @@ import static
org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
public class AccordDebugKeyspace extends VirtualKeyspace
{
- public static final String COORDINATIONS = "coordinations";
- public static final String EXECUTORS = "executors";
public static final String COMMANDS_FOR_KEY = "commands_for_key";
public static final String COMMANDS_FOR_KEY_UNMANAGED =
"commands_for_key_unmanaged";
+ public static final String CONSTANTS = "constants";
+ public static final String COORDINATIONS = "coordinations";
public static final String DURABILITY_SERVICE = "durability_service";
public static final String DURABLE_BEFORE = "durable_before";
public static final String EXECUTOR_CACHE = "executor_cache";
+ public static final String EXECUTORS = "executors";
public static final String JOURNAL = "journal";
public static final String MAX_CONFLICTS = "max_conflicts";
public static final String MIGRATION_STATE = "migration_state";
@@ -166,6 +177,8 @@ public class AccordDebugKeyspace extends VirtualKeyspace
public static final String REJECT_BEFORE = "reject_before";
public static final String TXN = "txn";
public static final String TXN_BLOCKED_BY = "txn_blocked_by";
+ public static final String TXN_PATTERN_TRACE = "txn_pattern_trace";
+ public static final String TXN_PATTERN_TRACES = "txn_pattern_traces";
public static final String TXN_TRACE = "txn_trace";
public static final String TXN_TRACES = "txn_traces";
public static final String TXN_OPS = "txn_ops";
@@ -194,11 +207,42 @@ public class AccordDebugKeyspace extends VirtualKeyspace
new TxnTable(),
new TxnTraceTable(),
new TxnTracesTable(),
- new TxnOpsTable()
+ new TxnPatternTraceTable(),
+ new TxnPatternTracesTable(),
+ new TxnOpsTable(),
+ new ConstantsTable()
));
}
- // TODO (desired): human readable packed key tracker (but requires loading
Txn, so might be preferable to only do conditionally)
+    /**
+     * Read-only reference table of named constants used by this keyspace: one row per {@link CoordinationKind}
+     * (kind "CoordinationKind") and one row per {@link TxnOpsTable.Op} (kind "Op") carrying its description.
+     */
+    public static final class ConstantsTable extends AbstractVirtualTable
+    {
+        private final SimpleDataSet rows;
+
+        private ConstantsTable()
+        {
+            super(parse(VIRTUAL_ACCORD_DEBUG, CONSTANTS,
+                        "Accord Debug Keyspace Constants",
+                        "CREATE TABLE %s (\n" +
+                        " kind text,\n" +
+                        " name text,\n" +
+                        " description text,\n" +
+                        " PRIMARY KEY (kind, name)" +
+                        ')', UTF8Type.instance));
+
+            // The contents are fixed, so the data set is built once here and handed out by data().
+            SimpleDataSet constants = new SimpleDataSet(metadata());
+            for (CoordinationKind kind : CoordinationKind.values())
+                constants.row("CoordinationKind", kind.name());
+            for (TxnOpsTable.Op op : TxnOpsTable.Op.values())
+                constants.row("Op", op.name()).column("description", op.description);
+            rows = constants;
+        }
+
+        @Override
+        public DataSet data()
+        {
+            return rows;
+        }
+    }
+
public static final class ExecutorsTable extends AbstractLazyVirtualTable
{
private ExecutorsTable()
@@ -913,7 +957,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
* UPDATE system_accord_debug.txn_trace SET permits = N WHERE txn_id = ?
AND event_type = ?
* SELECT * FROM system_accord_debug.txn_traces WHERE txn_id = ? AND
event_type = ?
*/
- public static final class TxnTraceTable extends AbstractMutableVirtualTable
+ public static final class TxnTraceTable extends
AbstractMutableLazyVirtualTable
{
private TxnTraceTable()
{
@@ -921,63 +965,111 @@ public class AccordDebugKeyspace extends VirtualKeyspace
"Accord Transaction Trace Configuration",
"CREATE TABLE %s (\n" +
" txn_id 'TxnIdUtf8Type',\n" +
- " event_type text,\n" +
- " permits int,\n" +
- " PRIMARY KEY (txn_id, event_type)" +
- ')', TxnIdUtf8Type.instance));
+ " bucket_mode text,\n" +
+ " bucket_seen int,\n" +
+ " bucket_size int,\n" +
+ " bucket_sub_size int,\n" +
+ " chance float,\n" +
+ " current_size int,\n" +
+ " trace_events text,\n" +
+ " managed_by_pattern boolean,\n" +
+ " PRIMARY KEY (txn_id)" +
+ ')', TxnIdUtf8Type.instance), FAIL, UNSORTED);
}
+ /**
+ * Reports one row per traced transaction: its bucket settings (bucket_mode, bucket_seen, bucket_size,
+ * bucket_sub_size), the configured chance, the number of events currently held (current_size), the
+ * coordination kinds being traced (trace_events), and whether the trace has an owning pattern (managed_by_pattern).
+ */
@Override
- public DataSet data()
+ protected void collect(PartitionsCollector collector)
{
AccordTracing tracing = tracing();
- SimpleDataSet dataSet = new SimpleDataSet(metadata());
- tracing.forEach(id -> true, (txnId, eventType, permits, events) ->
{
- dataSet.row(txnId.toString(),
eventType.toString()).column("permits", permits);
+ tracing.forEach(id -> true, (txnId, events) -> {
+ collector.row(txnId.toString())
+ .eagerCollect(columns -> {
+ columns.add("bucket_mode",
events.bucketMode().name())
+ .add("bucket_seen", events.bucketSeen())
+ .add("bucket_size", events.bucketSize())
+ .add("bucket_sub_size",
events.bucketSubSize())
+ .add("chance", events.chance())
+ .add("current_size", events.size())
+ .add("trace_events", events.traceEvents(),
TO_STRING)
+ .add("managed_by_pattern",
events.hasOwner())
+ ;
+ });
});
- return dataSet;
- }
-
- private AccordTracing tracing()
- {
- return ((AccordAgent)AccordService.instance().agent()).tracing();
}
+ /** Deleting a txn_id partition stops tracing that transaction. */
@Override
- protected void applyPartitionDeletion(ColumnValues partitionKey)
+ protected void applyPartitionDeletion(Object[] partitionKey)
{
- TxnId txnId = TxnId.parse(partitionKey.value(0));
- tracing().erasePermits(txnId);
+ TxnId txnId = TxnId.parse((String)partitionKey[0]);
+ tracing().stopTracing(txnId);
}
+ /**
+ * The primary key is just txn_id (no clustering columns), so a row deletion is handled exactly like a
+ * partition deletion: tracing of the transaction is stopped.
+ */
@Override
- protected void applyRowDeletion(ColumnValues partitionKey,
ColumnValues clusteringColumns)
+ protected void applyRowDeletion(Object[] partitionKeys, Object[]
clusteringKeys)
{
- TxnId txnId = TxnId.parse(partitionKey.value(0));
- tracing().erasePermits(txnId,
parseEventType(clusteringColumns.value(0)));
+ TxnId txnId = TxnId.parse((String)partitionKeys[0]);
+ tracing().stopTracing(txnId);
}
+ /**
+ * Applies an UPDATE to a transaction's trace configuration. Writable columns are bucket_mode, bucket_seen,
+ * bucket_size, bucket_sub_size, chance, trace_events and managed_by_pattern (which may only be unset);
+ * any other column is rejected. Setting bucket_size to 0 stops tracing, and may not be combined with a
+ * bucket_mode, a chance, non-empty trace_events, or a positive bucket_seen/bucket_sub_size. Otherwise the
+ * collected settings are passed to AccordTracing#set.
+ */
@Override
- protected void applyColumnDeletion(ColumnValues partitionKey,
ColumnValues clusteringColumns, String columnName)
+ protected void applyRowUpdate(Object[] partitionKeys, @Nullable
Object[] clusteringKeys, ColumnMetadata[] columns, Object[] values)
{
- TxnId txnId = TxnId.parse(partitionKey.value(0));
- TraceEventType eventType =
parseEventType(clusteringColumns.value(0));
- tracing().erasePermits(txnId, eventType);
- }
+ TxnId txnId = TxnId.parse((String)partitionKeys[0]);
+ // null / -1 / NaN mark settings not supplied by this UPDATE (as the bucket_size == 0 validation below assumes)
+ CoordinationKinds newTrace = null;
+ BucketMode newBucketMode = null;
+ boolean unsetManagedByOwner = false;
+ int newBucketSize = -1, newBucketSubSize = -1, newBucketSeen = -1;
+ float newChance = Float.NaN;
+ for (int i = 0 ; i < columns.length ; ++i)
+ {
+ String name = columns[i].name.toString();
+ switch (name)
+ {
+ default: throw new InvalidRequestException("Cannot update
'" + name + '\'');
+ case "bucket_mode":
+ newBucketMode = checkBucketMode(values[i]);
+ break;
+ case "chance":
+ newChance = checkChance(values[i], name);
+ break;
+ case "managed_by_pattern":
+ if (values[i] != null && (Boolean)values[i])
+ throw new InvalidRequestException("Can only unset
'" + name + '\'');
+ unsetManagedByOwner = true;
+ break;
+ case "bucket_size":
+ newBucketSize = checkNonNegative(values[i], name, 0);
+ break;
+ case "bucket_sub_size":
+ newBucketSubSize = checkNonNegative(values[i], name,
0);
+ break;
+ case "bucket_seen":
+ newBucketSeen = checkNonNegative(values[i], name, 0);
+ break;
+ case "trace_events":
+ newTrace = tryParseCoordinationKinds(values[i]);
+ break;
+ }
+ }
- @Override
- protected void applyColumnUpdate(ColumnValues partitionKey,
ColumnValues clusteringColumns, Optional<ColumnValue> columnValue)
- {
- TxnId txnId = TxnId.parse(partitionKey.value(0));
- TraceEventType eventType =
parseEventType(clusteringColumns.value(0));
- if (columnValue.isEmpty()) tracing().erasePermits(txnId,
eventType);
- else tracing().setPermits(txnId, eventType,
columnValue.get().value());
+ // checkNonNegative maps a null value to 0, so SET bucket_size = null also stops tracing
+ if (newBucketSize == 0)
+ {
+ if (newBucketMode != null || newBucketSeen > 0 ||
newBucketSubSize > 0 || !Float.isNaN(newChance) || (newTrace != null &&
!newTrace.isEmpty()))
+ throw new InvalidRequestException("Setting bucket size to
zero clears config; cannot set other fields.");
+ tracing().stopTracing(txnId);
+ }
+ else
+ {
+ // likewise bucket_sub_size = null maps to 0, and is rejected here
+ if (newBucketSubSize == 0)
+ throw new InvalidRequestException("Cannot set
bucket_sub_size to zero.");
+ tracing().set(txnId, newTrace, newBucketMode, newBucketSize,
newBucketSubSize, newBucketSeen, newChance, unsetManagedByOwner);
+ }
}
+ /** TRUNCATE delegates to AccordTracing#eraseAllBuckets (presumably dropping every per-transaction trace bucket). */
@Override
public void truncate()
{
- tracing().eraseAllEvents();
+ tracing().eraseAllBuckets();
}
}
@@ -989,20 +1081,15 @@ public class AccordDebugKeyspace extends VirtualKeyspace
"Accord Transaction Traces",
"CREATE TABLE %s (\n" +
" txn_id 'TxnIdUtf8Type',\n" +
- " event_type text,\n" +
" id_micros bigint,\n" +
+ " event text,\n" +
" at_micros bigint,\n" +
" command_store_id int,\n" +
" message text,\n" +
- " PRIMARY KEY (txn_id, event_type, id_micros,
at_micros)" +
+ " PRIMARY KEY (txn_id, id_micros, event, at_micros)" +
')', TxnIdUtf8Type.instance), FAIL, UNSORTED,
UNSORTED);
}
- private AccordTracing tracing()
- {
- return ((AccordAgent)AccordService.instance().agent()).tracing();
- }
-
@Override
protected void applyPartitionDeletion(Object[] partitionKeys)
{
@@ -1014,21 +1101,22 @@ public class AccordDebugKeyspace extends VirtualKeyspace
protected void applyRangeTombstone(Object[] partitionKeys, Object[] starts, boolean startInclusive, Object[] ends, boolean endInclusive)
{
TxnId txnId = TxnId.parse((String) partitionKeys[0]);
-        if (!startInclusive) throw invalidRequest("May restrict deletion by at most one event_type");
-        if (starts.length != 1) throw invalidRequest("Deletion restricted by lower bound on id_micros or at_micros is unsupported");
-        if (ends.length == 0 || (ends.length == 1 && !endInclusive)) throw invalidRequest("Range deletion must specify one event_type");
-        if (!ends[0].equals(starts[0])) throw invalidRequest("May restrict deletion by at most one event_type");
-        if (ends.length > 2) throw invalidRequest("Deletion restricted by upper bound on at_micros is unsupported");
-        TraceEventType eventType = parseEventType((String) starts[0]);
-        if (ends.length == 1)
-        {
-            tracing().eraseEvents(txnId, eventType);
-        }
-        else
-        {
-            long before = (Long)ends[1];
-            tracing().eraseEventsBefore(txnId, eventType, before);
-        }
+        // Only the first clustering column (id_micros) may bound the deletion.
+        if (starts.length > 1 || ends.length > 1) throw invalidRequest("May only delete on txn_id and id_micros");
+
+        // Normalise the (possibly exclusive) bounds to an inclusive [minId, maxId]; a missing bound is unbounded.
+        long minId = Long.MIN_VALUE, maxId = Long.MAX_VALUE;
+        if (starts.length == 1)
+        {
+            minId = (Long) starts[0];
+            if (!startInclusive)
+            {
+                // nothing is strictly greater than Long.MAX_VALUE; keeping it inclusive would wrongly delete it
+                if (minId == Long.MAX_VALUE)
+                    return;
+                ++minId;
+            }
+        }
+        if (ends.length == 1)
+        {
+            maxId = (Long) ends[0];
+            if (!endInclusive)
+            {
+                // nothing is strictly less than Long.MIN_VALUE; keeping it inclusive would wrongly delete it
+                if (maxId == Long.MIN_VALUE)
+                    return;
+                --maxId;
+            }
+        }
+        // e.g. id_micros > 5 AND id_micros < 6 selects nothing
+        if (minId > maxId)
+            return;
+
+        tracing().eraseEventsBetween(txnId, minId, maxId);
}
@Override
@@ -1040,11 +1128,11 @@ public class AccordDebugKeyspace extends VirtualKeyspace
@Override
public void collect(PartitionsCollector collector)
{
- tracing().forEach(id -> true, (txnId, eventType, permits, events)
-> {
+ tracing().forEach(id -> true, (txnId, events) -> {
events.forEach(e -> {
if (e.messages().isEmpty())
{
- collector.row(txnId.toString(), eventType.name(),
e.idMicros, 0L)
+ collector.row(txnId.toString(), e.idMicros,
e.kind.name(), 0L)
.eagerCollect(columns -> {
columns.add("message", "<Initialised but
no events (yet) recorded>");
});
@@ -1052,7 +1140,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
else
{
e.messages().forEach(m -> {
- collector.row(txnId.toString(), eventType.name(),
e.idMicros, NANOSECONDS.toMicros(m.atNanos - e.atNanos))
+ collector.row(txnId.toString(), e.idMicros,
e.kind.name(), NANOSECONDS.toMicros(m.atNanos - e.atNanos))
.eagerCollect(columns -> {
columns.add("command_store_id",
m.commandStoreId)
.add("message", m.message);
@@ -1064,6 +1152,226 @@ public class AccordDebugKeyspace extends VirtualKeyspace
}
}
+    /**
+     * Configures pattern tracing, one row per pattern {@code id}. Each row combines:
+     * <ul>
+     *   <li>a {@link TracePattern} filter: {@code if_intersects}, {@code if_kind}, {@code chance}, and the
+     *       coordination kinds whose failed ({@code on_failure}) or new ({@code on_new}) coordinations it
+     *       intercepts;</li>
+     *   <li>the bucket of matched transactions ({@code bucket_mode}, {@code bucket_size}, {@code bucket_seen},
+     *       and the read-only {@code current_size}), whose contents are listed by {@code txn_pattern_traces};</li>
+     *   <li>the trace configuration applied to each matched transaction ({@code trace_bucket_mode},
+     *       {@code trace_bucket_size}, {@code trace_bucket_sub_size}, {@code trace_events}).</li>
+     * </ul>
+     * Usage:
+     *   UPDATE system_accord_debug.txn_pattern_trace SET on_new = ?, trace_events = ?, trace_bucket_size = N WHERE id = ?
+     *   SELECT * FROM system_accord_debug.txn_pattern_traces WHERE id = ?
+     *   DELETE FROM system_accord_debug.txn_pattern_trace WHERE id = ?
+     * <p>
+     * {@code if_intersects} is written and reported as a '|'-separated list of token keys, or of token ranges
+     * when its first element ends with ']'; an empty string means no keys.
+     */
+    public static final class TxnPatternTraceTable extends AbstractMutableLazyVirtualTable
+    {
+        private TxnPatternTraceTable()
+        {
+            super(parse(VIRTUAL_ACCORD_DEBUG, TXN_PATTERN_TRACE,
+                        "Accord Transaction Pattern Trace Configuration",
+                        "CREATE TABLE %s (\n" +
+                        " id int,\n" +
+                        " bucket_mode text,\n" +
+                        " bucket_seen int,\n" +
+                        " bucket_size int,\n" +
+                        " chance float,\n" +
+                        " current_size int,\n" +
+                        " if_intersects text,\n" +
+                        " if_kind text,\n" +
+                        " on_failure text,\n" +
+                        " on_new text,\n" +
+                        " trace_bucket_mode text,\n" +
+                        " trace_bucket_size int,\n" +
+                        " trace_bucket_sub_size int,\n" +
+                        " trace_events text,\n" +
+                        " PRIMARY KEY (id)" +
+                        ')', Int32Type.instance), FAIL, SORTED);
+        }
+
+        @Override
+        protected void collect(PartitionsCollector collector)
+        {
+            AccordTracing tracing = tracing();
+            tracing.forEachPattern((state) -> {
+                collector.row(state.id())
+                         .eagerCollect(columns -> {
+                             columns.add("bucket_mode", state.mode().name())
+                                    .add("bucket_size", state.bucketSize())
+                                    .add("bucket_seen", state.bucketSeen())
+                                    .add("chance", state.pattern().chance)
+                                    .add("current_size", state.currentSize())
+                                    .add("if_intersects", state.pattern().intersects, TxnPatternTraceTable::toString)
+                                    .add("if_kind", state.pattern().kinds, TO_STRING)
+                                    .add("on_failure", state.pattern().traceFailures, TO_STRING)
+                                    .add("on_new", state.pattern().traceNew, TO_STRING)
+                                    .add("trace_bucket_mode", state.traceWithMode().name())
+                                    .add("trace_bucket_size", state.traceBucketSize())
+                                    .add("trace_bucket_sub_size", state.traceBucketSubSize())
+                                    .add("trace_events", state.traceEvents(), TO_STRING)
+                             ;
+                         });
+            });
+        }
+
+        @Override
+        protected void applyPartitionDeletion(Object[] partitionKeys)
+        {
+            int id = (Integer) partitionKeys[0];
+            tracing().erasePattern(id);
+        }
+
+        /**
+         * Applies an UPDATE to pattern {@code id}. Filter columns are folded into a {@link TracePattern} transform;
+         * bucket and trace settings left at -1 / null are those not supplied by this statement.
+         * {@code current_size} and unknown columns are rejected.
+         */
+        @Override
+        protected void applyRowUpdate(Object[] partitionKeys, @Nullable Object[] clusteringKeys, ColumnMetadata[] columns, Object[] values)
+        {
+            int id = (Integer)partitionKeys[0];
+            Function<TracePattern, TracePattern> pattern = Function.identity();
+            CoordinationKinds newTraceEvents = null;
+            BucketMode newBucketMode = null, newTraceBucketMode = null;
+            int newBucketSize = -1, newTraceBucketSize = -1, newTraceBucketSubSize = -1;
+            int newBucketSeen = -1;
+            for (int i = 0 ; i < columns.length ; ++i)
+            {
+                String name = columns[i].name.toString();
+                switch (name)
+                {
+                    default: throw new InvalidRequestException("Cannot update '" + name + '\'');
+                    case "bucket_mode":
+                        newBucketMode = checkBucketMode(values[i]);
+                        break;
+                    case "bucket_seen":
+                        newBucketSeen = checkNonNegative(values[i], name, 0);
+                        break;
+                    case "bucket_size":
+                        newBucketSize = checkNonNegative(values[i], name, 0);
+                        break;
+                    case "chance":
+                        float newChance = checkChance(values[i], name);
+                        pattern = pattern.andThen(p -> p.withChance(newChance));
+                        break;
+                    case "if_intersects":
+                        Participants<?> intersects = parseParticipants(values[i]);
+                        pattern = pattern.andThen(p -> p.withIntersects(intersects));
+                        break;
+                    case "if_kind":
+                        TxnKindsAndDomains kinds = tryParseTxnKinds(values[i]);
+                        pattern = pattern.andThen(p -> p.withKinds(kinds));
+                        break;
+                    case "on_failure":
+                        CoordinationKinds traceFailures = tryParseCoordinationKinds(values[i]);
+                        pattern = pattern.andThen(p -> p.withTraceFailures(traceFailures));
+                        break;
+                    case "on_new":
+                        CoordinationKinds traceNew = tryParseCoordinationKinds(values[i]);
+                        pattern = pattern.andThen(p -> p.withTraceNew(traceNew));
+                        break;
+                    case "trace_bucket_mode":
+                        newTraceBucketMode = checkBucketMode(values[i]);
+                        break;
+                    case "trace_bucket_size":
+                        newTraceBucketSize = checkNonNegative(values[i], name, 0);
+                        break;
+                    case "trace_bucket_sub_size":
+                        newTraceBucketSubSize = checkNonNegative(values[i], name, 0);
+                        break;
+                    case "trace_events":
+                        newTraceEvents = tryParseCoordinationKinds(values[i]);
+                        break;
+                }
+            }
+
+            tracing().setPattern(id, pattern, newBucketMode, newBucketSeen, newBucketSize, newTraceBucketMode, newTraceBucketSize, newTraceBucketSubSize, newTraceEvents);
+        }
+
+        /** Renders participants as a '|'-separated list, the format accepted by {@link #parseParticipants}. */
+        private static String toString(Participants<?> participants)
+        {
+            StringBuilder out = new StringBuilder();
+            for (Routable r : participants)
+            {
+                if (out.length() != 0)
+                    out.append('|');
+                out.append(r);
+            }
+            return out.toString();
+        }
+
+        /**
+         * Parses an {@code if_intersects} value. null yields null; an empty string (or only separators) yields
+         * {@link RoutingKeys#EMPTY}; otherwise a '|'-separated list of token keys, or of token ranges when the
+         * first element ends with ']'.
+         *
+         * @throws InvalidRequestException naming the first element that fails to parse
+         */
+        private static Participants<?> parseParticipants(Object input)
+        {
+            if (input == null)
+                return null;
+
+            String str = (String) input;
+            // String.split returns [""] (not an empty array) for "", so treat the empty string explicitly
+            String[] vs = str.isEmpty() ? new String[0] : str.split("\\|");
+            if (vs.length == 0)
+                return RoutingKeys.EMPTY;
+
+            if (!vs[0].endsWith("]"))
+            {
+                RoutingKey[] keys = new RoutingKey[vs.length];
+                for (int i = 0 ; i < keys.length ; ++i)
+                {
+                    try { keys[i] = TokenKey.parse(vs[i], DatabaseDescriptor.getPartitioner()); }
+                    catch (Throwable t) { throw new InvalidRequestException("Could not parse TokenKey " + vs[i]); }
+                }
+                return RoutingKeys.of(keys);
+            }
+            else
+            {
+                TokenRange[] ranges = new TokenRange[vs.length];
+                for (int i = 0 ; i < ranges.length ; ++i)
+                {
+                    // parse each element in turn (previously every slot was parsed from vs[0])
+                    try { ranges[i] = TokenRange.parse(vs[i], DatabaseDescriptor.getPartitioner()); }
+                    catch (Throwable t) { throw new InvalidRequestException("Could not parse TokenRange " + vs[i]); }
+                }
+                return Ranges.of(ranges);
+            }
+        }
+
+        @Override
+        public void truncate()
+        {
+            tracing().eraseAllPatterns();
+        }
+    }
+
+    /**
+     * Lists, per pattern {@code id}, the transactions currently held by that pattern. A pattern holding none is
+     * reported as a single row with an empty {@code txn_id}.
+     */
+    public static final class TxnPatternTracesTable extends AbstractMutableLazyVirtualTable
+    {
+        private TxnPatternTracesTable()
+        {
+            super(parse(VIRTUAL_ACCORD_DEBUG, TXN_PATTERN_TRACES,
+                        "Accord Transaction Pattern Traces",
+                        "CREATE TABLE %s (\n" +
+                        " id int,\n" +
+                        " txn_id 'TxnIdUtf8Type',\n" +
+                        " PRIMARY KEY (id, txn_id)" +
+                        ')', Int32Type.instance), FAIL, SORTED, SORTED);
+        }
+
+        @Override
+        public void collect(PartitionsCollector collector)
+        {
+            tracing().forEachPattern(pattern -> {
+                if (pattern.currentSize() == 0)
+                {
+                    // placeholder row, so that a pattern with no matches is still visible
+                    collector.row(pattern.id(), "").eagerCollect(ignore -> {});
+                    return;
+                }
+
+                for (int i = 0, size = pattern.currentSize(); i < size ; ++i)
+                    collector.row(pattern.id(), pattern.get(i).toString()).eagerCollect(ignore -> {});
+            });
+        }
+
+        @Override
+        protected void applyPartitionDeletion(Object[] partitionKeys)
+        {
+            int patternId = (Integer) partitionKeys[0];
+            tracing().erasePatternTraces(patternId);
+        }
+
+        @Override
+        public void truncate()
+        {
+            tracing().eraseAllPatternTraces();
+        }
+    }
+
// TODO (desired): don't report null as "null"
abstract static class AbstractJournalTable extends AbstractLazyVirtualTable
{
@@ -1291,7 +1599,24 @@ public class AccordDebugKeyspace extends VirtualKeyspace
public static final class TxnOpsTable extends
AbstractMutableLazyVirtualTable
{
// TODO (expected): test each of these operations
- enum Op { ERASE_VESTIGIAL, INVALIDATE, TRY_EXECUTE, FORCE_APPLY,
FORCE_UPDATE, RECOVER, FETCH, RESET_PROGRESS_LOG }
+ /**
+ * Operations that can be requested through txn_ops. The written value is upper-cased before lookup, so names
+ * match case-insensitively. Each constant's description is also exposed by ConstantsTable, in declaration order.
+ */
+ enum Op
+ {
+ LOCALLY_ERASE_VESTIGIAL("USE WITH CAUTION: Move the command to the
vestigial status, erasing its contents. This has distributed state machine
implications."),
+ LOCALLY_INVALIDATE("USE WITH CAUTION: Move the command to the
invalidated status, erasing its contents. This has distributed state machine
implications."),
+ TRY_EXECUTE("Try to execute a stuck transaction. This is safe, and
will no-op if not able to."),
+ FORCE_APPLY("USE WITH CAUTION: Apply the command if we have the
relevant information locally."),
+ FORCE_UPDATE("Try to reset in-memory book-keeping related to a
command."),
+ RECOVER("Initiate recovery for a command."),
+ FETCH("Initiate a fetch request for a command."),
+ REQUEUE_PROGRESS_LOG("Ask the progress log to queue both home and
waiting states.");
+
+ // human-readable summary of the operation and its risk, reported by the constants table
+ final String description;
+
+ Op(String description)
+ {
+ this.description = description;
+ }
+ }
private TxnOpsTable()
{
super(parse(VIRTUAL_ACCORD_DEBUG, TXN_OPS,
@@ -1315,14 +1640,16 @@ public class AccordDebugKeyspace extends VirtualKeyspace
{
TxnId txnId = TxnId.parse((String) partitionKeys[0]);
int commandStoreId = (Integer) clusteringKeys[0];
- Op op = Op.valueOf((String)values[0]);
+
+ Op op = tryParse(values[0], true, Op.class, Op::valueOf);
+
switch (op)
{
default: throw new UnhandledEnum(op);
- case ERASE_VESTIGIAL:
+ case LOCALLY_ERASE_VESTIGIAL:
cleanup(txnId, commandStoreId, Cleanup.VESTIGIAL);
break;
- case INVALIDATE:
+ case LOCALLY_INVALIDATE:
cleanup(txnId, commandStoreId, Cleanup.INVALIDATE);
break;
case TRY_EXECUTE:
@@ -1360,7 +1687,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
recover(txnId, route, result);
});
break;
- case RESET_PROGRESS_LOG:
+ case REQUEUE_PROGRESS_LOG:
run(txnId, commandStoreId, safeStore -> {
((DefaultProgressLog)safeStore.progressLog()).requeue(safeStore,
TxnStateKind.Waiting, txnId);
((DefaultProgressLog)safeStore.progressLog()).requeue(safeStore,
TxnStateKind.Home, txnId);
@@ -1381,7 +1708,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
BiConsumer<Route<?>, AsyncResult.Settable<Void>> consumer =
apply.apply(command);
if (command.route() == null)
{
- FetchRoute.fetchRoute(node, txnId,
command.maxContactable(), LatentStoreSelector.standard(), (success, fail) -> {
+ FetchRoute.fetchRoute(node, txnId,
command.maxParticipants(), LatentStoreSelector.standard(), (success, fail) -> {
if (fail != null) result.setFailure(fail);
else consumer.accept(success, result);
});
@@ -1411,7 +1738,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
PrepareRecovery.recover(node, node.someSequentialExecutor(),
txnId, NotKnownToBeInvalid, (FullRoute<?>) route, null,
LatentStoreSelector.standard(), (success, fail) -> {
if (fail != null) result.setFailure(fail);
else result.setSuccess(null);
- }, node.agent().trace(txnId, RECOVER));
+ });
}
else
{
@@ -1497,21 +1824,6 @@ public class AccordDebugKeyspace extends VirtualKeyspace
}
}
- private static TableId tableId(int commandStoreId, CommandStores
commandStores)
- {
- AccordCommandStore commandStore = (AccordCommandStore)
commandStores.forId(commandStoreId);
- if (commandStore == null)
- return null;
- return commandStore.tableId();
- }
-
- private static TableMetadata tableMetadata(TableId tableId)
- {
- if (tableId == null)
- return null;
- return Schema.instance.getTableMetadata(tableId);
- }
-
private static String printToken(RoutingKey routingKey)
{
TokenKey key = (TokenKey) routingKey;
@@ -1539,10 +1851,9 @@ public class AccordDebugKeyspace extends VirtualKeyspace
return av + " (" + bv + ')';
}
-    private static TraceEventType parseEventType(String input)
+    /**
+     * Parses a {@link CoordinationKind} by its exact constant name; no case conversion is applied.
+     *
+     * @throws InvalidRequestException if {@code input} names no CoordinationKind
+     */
+    private static CoordinationKind parseEventType(String input)
{
-        try { return TraceEventType.valueOf(LocalizeString.toUpperCaseLocalized(input, Locale.ENGLISH)); }
-        catch (Throwable t) { throw invalidRequest("event_type must be one of %s; received %s", TraceEventType.values(), input); }
+        final boolean upperCaseFirst = false;
+        return tryParse(input, upperCaseFirst, CoordinationKind.class, CoordinationKind::valueOf);
}
private static String toStringOrNull(Object o)
@@ -1559,4 +1870,80 @@ public class AccordDebugKeyspace extends VirtualKeyspace
return "<error: " + t.getLocalizedMessage() + '>';
}
}
+
+    /**
+     * Parses a bucket_mode column value case-insensitively (upper-cased with {@link Locale#ENGLISH}).
+     *
+     * @throws InvalidRequestException if the value cannot be parsed as a {@link BucketMode}
+     */
+    private static BucketMode checkBucketMode(Object value)
+    {
+        try
+        {
+            String upper = LocalizeString.toUpperCaseLocalized((String) value, Locale.ENGLISH);
+            return BucketMode.valueOf(upper);
+        }
+        catch (NullPointerException | IllegalArgumentException invalid)
+        {
+            throw new InvalidRequestException("Unknown bucket_mode '" + value + '\'');
+        }
+    }
+
+    /**
+     * Validates an optional int column value.
+     *
+     * @return {@code ifNull} when {@code value} is null, otherwise the value itself
+     * @throws InvalidRequestException if the value is negative
+     */
+    private static int checkNonNegative(Object value, String field, int ifNull)
+    {
+        if (value == null) return ifNull;
+
+        int candidate = (Integer) value;
+        if (candidate >= 0)
+            return candidate;
+        throw new InvalidRequestException("Cannot set '" + field + "' to negative value");
+    }
+
+    /**
+     * Validates a chance column value.
+     *
+     * @return 1.0f when {@code value} is null, otherwise the value itself
+     * @throws InvalidRequestException if the value is NaN or outside (0..1]
+     */
+    private static float checkChance(Object value, String field)
+    {
+        if (value == null)
+            return 1.0f;
+
+        float v = (Float)value;
+        // Written as a negated conjunction so NaN (for which every comparison is false) is rejected too;
+        // the previous `v <= 0 || v > 1.0f` let NaN through.
+        if (!(v > 0 && v <= 1.0f))
+            throw new InvalidRequestException("Cannot set '" + field + "' to value outside the range (0..1]");
+        return v;
+    }
+
+    /** Collects the {@link Enum#name() names} of {@code set}'s members, resolved through {@code lookup}, in iteration order. */
+    private static <T extends Enum<T>> Set<String> toStrings(TinyEnumSet<T> set, IntFunction<T> lookup)
+    {
+        ImmutableSet.Builder<String> names = ImmutableSet.builder();
+        for (T member : set.iterable(lookup))
+        {
+            String name = member.name();
+            names.add(name);
+        }
+        return names.build();
+    }
+
+    /** Returns the {@link AccordTracing} held by the running {@link AccordService}'s {@link AccordAgent}. */
+    private static AccordTracing tracing()
+    {
+        AccordAgent agent = (AccordAgent) AccordService.instance().agent();
+        return agent.tracing();
+    }
+
+    /**
+     * Parses {@code input} as a constant of {@code clazz} via {@code valueOf}, first upper-casing it with
+     * {@link Locale#ENGLISH} when {@code toUpperCase} is set.
+     *
+     * @throws InvalidRequestException if {@code input} is null or names no constant; the message lists the
+     *         accepted names
+     */
+    private static <E extends Enum<E>> E tryParse(Object input, boolean toUpperCase, Class<E> clazz, Function<String, E> valueOf)
+    {
+        try
+        {
+            String str = (String) input;
+            if (toUpperCase)
+                str = LocalizeString.toUpperCaseLocalized(str, Locale.ENGLISH);
+            return valueOf.apply(str);
+        }
+        catch (IllegalArgumentException | NullPointerException e)
+        {
+            // getSimpleName avoids exposing binary names such as AccordDebugKeyspace$TxnOpsTable$Op to users,
+            // and listing the constants restores the hint the previous event_type parser gave
+            String accepted = java.util.Arrays.stream(clazz.getEnumConstants())
+                                              .map(Enum::name)
+                                              .collect(Collectors.joining(", "));
+            throw new InvalidRequestException("Unknown " + clazz.getSimpleName() + ": '" + input + "'; expected one of [" + accepted + ']');
+        }
+    }
+
+    /** Parses a {@link CoordinationKinds} column value, mapping null to null. */
+    private static CoordinationKinds tryParseCoordinationKinds(Object input)
+    {
+        return input == null ? null : CoordinationKinds.parse((String) input);
+    }
+
+    /** Parses a {@link TxnKindsAndDomains} column value, mapping null to null. */
+    private static TxnKindsAndDomains tryParseTxnKinds(Object input)
+    {
+        return input == null ? null : TxnKindsAndDomains.parse((String) input);
+    }
}
diff --git
a/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java
b/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java
index f062e95076..c4a37ebf2e 100644
--- a/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java
@@ -198,8 +198,6 @@ public class AccordCoordinatorMetrics
public static class Listener implements CoordinatorEventListener
{
- public static final Listener instance = new Listener();
-
private AccordCoordinatorMetrics forTransaction(TxnId txnId)
{
if (txnId != null)
diff --git a/src/java/org/apache/cassandra/metrics/AccordReplicaMetrics.java
b/src/java/org/apache/cassandra/metrics/AccordReplicaMetrics.java
index 551a7ee37d..91973e4a95 100644
--- a/src/java/org/apache/cassandra/metrics/AccordReplicaMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordReplicaMetrics.java
@@ -112,8 +112,6 @@ public class AccordReplicaMetrics
public static class Listener implements ReplicaEventListener
{
- public static final Listener instance = new Listener();
-
private AccordReplicaMetrics forTransaction(TxnId txnId)
{
if (txnId != null)
diff --git a/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java
b/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java
index c9f8130225..11f5f970b3 100644
--- a/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java
@@ -132,6 +132,10 @@ public class AccordSystemMetrics
}
}
+ /**
+ * Intentionally empty. It is called during AccordService setup alongside AccordReplicaMetrics.touch();
+ * presumably it exists so that calling it forces this class to be loaded and statically initialised — confirm.
+ */
+ public static void touch()
+ {
+ }
+
private AccordSystemMetrics()
{
Invariants.expect(AccordService.isSetup());
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 6742f24843..33de4a5006 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -41,6 +41,7 @@ import com.google.common.primitives.Ints;
import accord.api.ConfigurationService.EpochReady;
import accord.primitives.Txn;
import org.apache.cassandra.metrics.AccordReplicaMetrics;
+import org.apache.cassandra.metrics.AccordSystemMetrics;
import org.apache.cassandra.service.accord.api.AccordViolationHandler;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.concurrent.AsyncFuture;
@@ -307,6 +308,7 @@ public class AccordService implements IAccordService,
Shutdownable
instance = as;
AccordReplicaMetrics.touch();
+ AccordSystemMetrics.touch();
AccordViolationHandler.setup();
WatermarkCollector.fetchAndReportWatermarksAsync(as.configService);
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTracing.java
b/src/java/org/apache/cassandra/service/accord/AccordTracing.java
index 9de40299a0..8bbae998ca 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTracing.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTracing.java
@@ -18,40 +18,75 @@
package org.apache.cassandra.service.accord;
-import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.Predicate;
+import javax.annotation.Nullable;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.api.Tracing;
-import accord.api.TraceEventType;
+import accord.coordinate.Coordination;
+import accord.coordinate.Coordination.CoordinationKind;
+import accord.coordinate.tracking.AbstractTracker;
import accord.local.CommandStore;
+import accord.local.Node;
+import accord.primitives.Participants;
+import accord.primitives.Routable;
+import accord.primitives.Txn;
import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import accord.utils.SortedListMap;
+import accord.utils.TinyEnumSet;
+import accord.utils.UnhandledEnum;
+import org.apache.cassandra.metrics.AccordCoordinatorMetrics;
+import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.NoSpamLogger;
-public class AccordTracing
+import static
org.apache.cassandra.service.accord.AccordTracing.BucketMode.LEAKY;
+import static
org.apache.cassandra.service.accord.AccordTracing.BucketMode.SAMPLE;
+
+public class AccordTracing extends AccordCoordinatorMetrics.Listener // AccordAgent returns this as its CoordinatorEventListener, so it can observe failed coordinations (onFailed)
{
private static final int MAX_EVENTS = 10000;
private static final Logger logger =
LoggerFactory.getLogger(AccordTracing.class);
private static final NoSpamLogger noSpamLogger =
NoSpamLogger.getLogger(logger, 1L, TimeUnit.MINUTES);
+ public enum BucketMode // policy applied when a bucket is already full
+ {
+ LEAKY, SAMPLE, RING; // LEAKY: reject new entries; SAMPLE: admit with probability permits/total into a random slot; RING: overwrite slots round-robin
+
+ int position(int permits, int total) // slot to overwrite; callers reject the new entry when the result is >= permits (SAMPLE needs total > 0, RING needs permits > 0)
+ {
+ switch (this)
+ {
+ default: throw UnhandledEnum.unknown(this);
+ case LEAKY: return Integer.MAX_VALUE;
+ case RING: return total % permits;
+ case SAMPLE: return ThreadLocalRandom.current().nextInt(total);
+ }
+ }
+ }
+
public interface ConsumeState
{
- void accept(TxnId txnId, TraceEventType eventType, int permits,
List<Event> events);
+ void accept(TxnId txnId, TxnEvents state); // invoked by forEach from inside ConcurrentHashMap.compute for txnId, i.e. atomically w.r.t. that entry
}
public static class Message
@@ -74,11 +109,18 @@ public class AccordTracing
}
}
- public static class Event implements Tracing, Comparable<Event>
+ public static class TxnEvent implements Tracing, Comparable<TxnEvent> // one traced coordination; instances are ordered by idMicros
{
+ public final CoordinationKind kind; // selects the event's per-kind sub-list in TxnEvents
public final long idMicros = uniqueNowMicros();
public final long atNanos = Clock.Global.nanoTime();
final List<Message> messages = new ArrayList<>();
+ int index = -1, subIndex = -1; // slots in the owning TxnEvents list and in its per-kind sub-list; -1 until added
+
+ public TxnEvent(CoordinationKind kind)
+ {
+ this.kind = kind;
+ }
@Override
public void trace(CommandStore commandStore, String s)
@@ -91,7 +133,7 @@ public class AccordTracing
}
@Override
- public int compareTo(Event that)
+ public int compareTo(TxnEvent that)
{
return Long.compareUnsigned(this.idMicros, that.idMicros);
}
@@ -102,37 +144,594 @@ public class AccordTracing
}
}
- static class TraceState extends AbstractList<Event>
+ private static class TxnEventsList // growable array of events, allocated lazily on the first add
{
- int permits;
- int size;
- Event[] events;
+ TxnEvent[] events;
+ int size, bucketSeen; // bucketSeen: counter fed to BucketMode.position once the bucket is full
- void addInternal(Event event)
+ void addInternal(TxnEvent event)
{
- if (events == null) events = new Event[10];
+ if (events == null) events = new TxnEvent[10];
else if (size == events.length) events = Arrays.copyOf(events,
events.length * 2);
events[size++] = event;
}
- void truncate(int eraseBefore)
+ void truncateInternal(int eraseBefore) // drops the first eraseBefore events; NOTE: System.arraycopy throws NPE while events is still null
{
System.arraycopy(events, eraseBefore, events, 0, size -
eraseBefore);
Arrays.fill(events, size - eraseBefore, size, null);
size -= eraseBefore;
}
- @Override
- public Event get(int index)
+ public TxnEvent get(int index)
{
return events[index];
}
- @Override
+ public boolean isEmpty()
+ {
+ return size == 0;
+ }
+
public int size()
{
return size;
}
+
+ void incrementSeen() // saturates at Integer.MAX_VALUE instead of wrapping negative
+ {
+ if (++bucketSeen < 0)
+ bucketSeen = Integer.MAX_VALUE;
+ }
+ }
+
+ public static class TxnEvents extends TxnEventsList // all events traced for one TxnId, indexed overall and per CoordinationKind
+ {
+ private final EnumMap<CoordinationKind, TxnEventsList> subLists = new
EnumMap<>(CoordinationKind.class);
+ private CoordinationKinds traceEvents = CoordinationKinds.ALL;
+ private BucketMode mode = LEAKY;
+ private TracePatternState owner;
+ private int bucketSize, bucketSubSize;
+ private float chance = 1.0f;
+
+ void remove(int index) // O(1): moves the last event into the freed slot of both the main list and the event's sub-list
+ {
+ TxnEvent removing = get(index);
+ TxnEventsList subList = subLists.get(removing.kind);
+ if (--subList.size == 0)
+ {
+ subLists.remove(removing.kind);
+ }
+ else
+ {
+ if (removing.subIndex < subList.size)
+ {
+ TxnEvent replaceWith = subList.events[subList.size];
+ subList.events[removing.subIndex] = replaceWith;
+ replaceWith.subIndex = removing.subIndex;
+ }
+ subList.events[subList.size] = null;
+ }
+ --size;
+ if (removing.index < size)
+ {
+ TxnEvent replaceWith = events[size];
+ events[index] = replaceWith;
+ replaceWith.index = removing.index;
+ }
+ events[size] = null;
+ }
+
+ TxnEvents eraseEvents(CoordinationKind kind, GlobalCount globalCount)
+ {
+ TxnEventsList subList = subLists.get(kind); // not remove(): remove(int) must still find the sub-list, and drops it itself once empty
+ if (subList == null)
+ return this;
+
+ while (!subList.isEmpty())
+ {
+ remove(subList.get(subList.size() - 1).index);
+ globalCount.decrementAndGet();
+ }
+
+ return nullIfDefunct();
+ }
+
+ TxnEvents eraseEventsBetween(long minId, long maxId, GlobalCount
globalCount)
+ {
+ int i = 0;
+ while (i < size())
+ {
+ TxnEvent event = get(i);
+ if (event.idMicros >= minId && event.idMicros <= maxId)
+ {
+ remove(i); // the last event now occupies slot i, so slot i is examined again
+ globalCount.decrementAndGet();
+ }
+ else ++i;
+ }
+ return nullIfDefunct();
+ }
+
+ TxnEvents stopTracing() // keeps recorded events; returns null (entry removable) only if none remain
+ {
+ bucketSize = bucketSubSize = 0;
+ return nullIfDefunct();
+ }
+
+ TxnEvents eraseEvents(GlobalCount globalCount)
+ {
+ globalCount.addAndGet(-size());
+ subLists.clear();
+ if (!isEmpty()) truncateInternal(size()); // events stays null until the first add, and System.arraycopy rejects null
+ return nullIfDefunct();
+ }
+
+ TxnEvents nullIfDefunct() // null tells the caller's compute() to drop the entry: no events and no longer tracing
+ {
+ return isEmpty() && bucketSize == 0 ? null : this;
+ }
+
+ TxnEvent trace(CoordinationKind kind, GlobalCount globalCount) // null if filtered, sampled out, or rejected by bucket/global limits
+ {
+ if (bucketSize == 0 || !traceEvents.contains(kind))
+ return null;
+
+ if (chance < 1.0f && ThreadLocalRandom.current().nextFloat() >=
chance)
+ return null;
+
+ TxnEventsList subList = subLists.get(kind);
+ if (subList != null && bucketSubSize <= subList.size)
+ {
+ subList.incrementSeen();
+ int position = mode.position(bucketSubSize,
subList.bucketSeen);
+ if (position >= bucketSubSize)
+ return null;
+
+ remove(subList.get(position).index);
+ }
+ else if (bucketSize <= size)
+ {
+ incrementSeen();
+ int position = mode.position(bucketSize, bucketSeen);
+ if (position >= bucketSize)
+ return null;
+
+ remove(position);
+ }
+ else
+ {
+ if (!globalCount.admit())
+ return null;
+ }
+
+ return newTrace(kind, subList);
+ }
+
+ TxnEvent forceTrace(CoordinationKind kind, GlobalCount globalCount)
+ {
+ // ignore all LOCAL accounting and filters, just add a new trace
+ if (!globalCount.admit())
+ return null;
+
+ return newTrace(kind, null);
+ }
+
+ private TxnEvent newTrace(CoordinationKind kind, TxnEventsList subList)
+ {
+ if (subList == null) // e.g. forceTrace: reuse any existing sub-list instead of replacing (and orphaning) it
+ subList = subLists.computeIfAbsent(kind, ignore -> new TxnEventsList());
+ else subLists.putIfAbsent(kind, subList); // remove() may have dropped it from the map when it emptied
+ TxnEvent event = new TxnEvent(kind);
+ event.subIndex = subList.size;
+ subList.addInternal(event);
+ event.index = size;
+ addInternal(event);
+ return event;
+ }
+
+ public boolean hasOwner()
+ {
+ return owner != null;
+ }
+
+ public CoordinationKinds traceEvents()
+ {
+ return traceEvents;
+ }
+
+ public int bucketSize()
+ {
+ return bucketSize;
+ }
+
+ public int bucketSubSize()
+ {
+ return bucketSubSize;
+ }
+
+ public int bucketSeen()
+ {
+ return bucketSeen;
+ }
+
+ public float chance()
+ {
+ return chance;
+ }
+
+ public BucketMode bucketMode()
+ {
+ return mode;
+ }
+
+ public void forEach(Consumer<TxnEvent> forEach)
+ {
+ for (int i = 0 ; i < size ; ++i)
+ forEach.accept(events[i]);
+ }
+ }
+
+ enum NewOrFailure // selects which TracePattern filter (traceNew or traceFailures) a match is tested against
+ {
+ NEW, FAILURE
+ }
+
+ public static class CoordinationKinds extends TinyEnumSet<CoordinationKind> // set of CoordinationKind with a parseable text form
+ {
+ private static final int ALL_BITS = CoordinationKind.ALL.bitset();
+ public static final CoordinationKinds ALL = new
CoordinationKinds(false, ALL_BITS);
+ public static final CoordinationKinds NONE = new
CoordinationKinds(false, 0);
+
+ final boolean printAsSubtraction;
+ public CoordinationKinds(boolean printAsSubtraction, int bitset)
+ {
+ super(bitset);
+ this.printAsSubtraction = printAsSubtraction;
+ }
+
+ @Override
+ public String toString()
+ {
+ if (bitset == ALL_BITS)
+ return "*";
+ if (printAsSubtraction)
+ return '-' + toString(ALL_BITS & ~bitset); // print the excluded kinds, mirroring the "-{...}" input form
+ return toString(bitset);
+ }
+
+ public static CoordinationKinds parse(String input) // accepts "*", "{}", "{A, B}" or "-{A, B}" (every kind except A and B)
+ {
+ input = input.trim();
+ if (input.equals("*"))
+ return ALL;
+ if (input.equals("{}"))
+ return NONE;
+
+ boolean subtraction = false;
+ if (input.length() >= 1 && input.charAt(0) == '-')
+ {
+ subtraction = true;
+ input = input.substring(1);
+ }
+ if (input.length() < 2 || input.charAt(0) != '{' ||
input.charAt(input.length() - 1) != '}')
+ throw new IllegalArgumentException("Invalid CoordinationKinds
specification: " + input);
+
+ int bits = 0;
+ for (String name : input.substring(1, input.length() -
1).trim().split("\\s*,\\s*"))
+ if (!name.isEmpty()) bits |= TinyEnumSet.encode(CoordinationKind.valueOf(name)); // empty body, e.g. "-{}" or "{ }"
+
+ if (subtraction)
+ bits = ALL_BITS & ~bits;
+ return new CoordinationKinds(subtraction, bits);
+ }
+
+ private static String toString(int bitset)
+ {
+ return TinyEnumSet.toString(bitset, CoordinationKind::forOrdinal);
+ }
+ }
+
+ public static class TxnKindsAndDomains // Txn.Kind filter, kept separately for key-domain and range-domain transactions
+ {
+ static final int ALL_KINDS = Txn.Kind.All.bitset();
+ static final TxnKindsAndDomains ALL = new TxnKindsAndDomains(false,
ALL_KINDS, ALL_KINDS);
+ static final TxnKindsAndDomains NONE = new TxnKindsAndDomains(false,
0, 0);
+
+ final boolean printAsSubtraction;
+ final int keys, ranges;
+ public TxnKindsAndDomains(boolean printAsSubtraction, int keys, int
ranges)
+ {
+ this.printAsSubtraction = printAsSubtraction;
+ this.keys = keys;
+ this.ranges = ranges;
+ }
+
+ boolean matches(TxnId txnId)
+ {
+ int bits = txnId.is(Routable.Domain.Key) ? keys : ranges;
+ return TinyEnumSet.contains(bits, txnId.kind());
+ }
+
+ @Override
+ public String toString()
+ {
+ if (keys == ALL_KINDS && ranges == ALL_KINDS)
+ return "*";
+ if (printAsSubtraction)
+ return '-' + toString(ALL_KINDS & ~keys, ALL_KINDS & ~ranges);
+ return '+' + toString(keys, ranges); // parse() accepts this '+' prefix, so the output round-trips
+ }
+
+ public static TxnKindsAndDomains parse(String input) // "*", "{}", or an optionally '-'/'+'-prefixed {..} list of K?/R?/*? elements (? = Txn.Kind short name or '*')
+ {
+ input = input.trim();
+ if (input.equals("*"))
+ return ALL;
+ if (input.equals("{}"))
+ return NONE;
+
+ boolean subtraction = false;
+ if (input.length() >= 1 && (input.charAt(0) == '-' || input.charAt(0) == '+')) // '+' is the prefix toString() emits
+ {
+ subtraction = input.charAt(0) == '-';
+ input = input.substring(1);
+ }
+ if (input.length() < 2 || input.charAt(0) != '{' ||
input.charAt(input.length() - 1) != '}')
+ throw new IllegalArgumentException("Invalid TxnKindsAndDomain
specification: " + input);
+
+ int keys = 0, ranges = 0;
+ for (String element : input.substring(1, input.length() -
1).trim().split("\\s*,\\s*"))
+ {
+ if (element.isEmpty()) continue; // empty body, e.g. "-{}" or "+{}"
+ if (element.length() != 2)
+ throw new IllegalArgumentException("Invalid
TxnKindsAndDomain element: " + element);
+ int kinds;
+ if (element.charAt(1) == '*') kinds = ALL_KINDS;
+ else
+ {
+ Txn.Kind kind = Txn.Kind.forShortName(element.charAt(1));
+ if (kind == null) throw new
IllegalArgumentException("Unknown Txn.Kind: " + element.charAt(1));
+ kinds = TinyEnumSet.encode(kind);
+ }
+
+ switch (element.charAt(0))
+ {
+ default: throw new IllegalArgumentException("Invalid
TxnKindsAndDomain element: " + element);
+ case '*': keys |= kinds; ranges |= kinds; break;
+ case 'K': keys |= kinds; break;
+ case 'R': ranges |= kinds; break;
+ }
+ }
+
+ if (subtraction)
+ {
+ keys = ALL_KINDS & ~keys;
+ ranges = ALL_KINDS & ~ranges;
+ }
+ return new TxnKindsAndDomains(subtraction, keys, ranges);
+ }
+
+ private static String toString(int keys, int ranges)
+ {
+ StringBuilder out = new StringBuilder("{");
+ if (keys != 0)
+ {
+ if (keys == ALL_KINDS) out.append("K*");
+ else TinyEnumSet.append(keys, Txn.Kind::forOrdinal, k -> "K" +
k.shortName(), out);
+ }
+
+ if (ranges != 0)
+ {
+ if (keys != 0) out.append(',');
+ if (ranges == ALL_KINDS) out.append("R*");
+ else TinyEnumSet.append(ranges, Txn.Kind::forOrdinal, k -> "R"
+ k.shortName(), out);
+ }
+ out.append('}');
+ return out.toString();
+ }
+ }
+
+ public static class TracePattern // filter deciding which coordinations a TracePatternState may adopt; all fields are final
+ {
+ private static final TracePattern EMPTY = new TracePattern(null, null,
null, null, 1.0f); // matches nothing: traceNew and traceFailures are both null
+
+ public final TxnKindsAndDomains kinds; // null = any txn kind/domain
+ public final Participants<?> intersects; // null = no key/range filter
+ public final CoordinationKinds traceNew; // null = never match new coordinations
+ public final CoordinationKinds traceFailures; // null = never match failed coordinations
+ public final float chance; // probability a otherwise-matching coordination is accepted; >= 1.0 always accepts
+
+ public TracePattern(TxnKindsAndDomains kinds, @Nullable
Participants<?> intersects, CoordinationKinds traceNew, CoordinationKinds
traceFailures, float chance)
+ {
+ this.kinds = kinds;
+ this.intersects = intersects;
+ this.traceNew = traceNew;
+ this.traceFailures = traceFailures;
+ this.chance = chance;
+ }
+
+ public TracePattern withKinds(TxnKindsAndDomains kinds) // each with*() returns a new TracePattern with one field replaced
+ {
+ return new TracePattern(kinds, intersects, traceNew,
traceFailures, chance);
+ }
+
+ public TracePattern withIntersects(Participants<?> intersects)
+ {
+ return new TracePattern(kinds, intersects, traceNew,
traceFailures, chance);
+ }
+
+ public TracePattern withTraceNew(CoordinationKinds traceNew)
+ {
+ return new TracePattern(kinds, intersects, traceNew,
traceFailures, chance);
+ }
+
+ public TracePattern withTraceFailures(CoordinationKinds traceFailures)
+ {
+ return new TracePattern(kinds, intersects, traceNew,
traceFailures, chance);
+ }
+
+ public TracePattern withChance(float chance)
+ {
+ return new TracePattern(kinds, intersects, traceNew,
traceFailures, chance);
+ }
+
+ boolean matches(TxnId txnId, @Nullable Participants<?> participants,
CoordinationKind kind, NewOrFailure newOrFailure) // every configured filter must pass; missing participants fail an intersects filter
+ {
+ if (kinds != null && !kinds.matches(txnId))
+ return false;
+
+ TinyEnumSet<CoordinationKind> testKind = newOrFailure ==
NewOrFailure.NEW ? traceNew : traceFailures;
+ if (testKind == null || !testKind.contains(kind))
+ return false;
+
+ if (intersects != null && (participants == null ||
!intersects.intersects(participants)))
+ return false;
+
+ return chance >= 1.0f || ThreadLocalRandom.current().nextFloat()
<= chance;
+ }
+ }
+
+ public class TracePatternState // runtime state of one registered pattern; its synchronized methods share this monitor
+ {
+ final int id;
+
+ private volatile TracePattern pattern;
+ private BucketMode bucketMode = SAMPLE;
+ private int bucketSize;
+ private int bucketSeen;
+ private BucketMode traceBucketMode = SAMPLE;
+ private int traceBucketSize, traceBucketSubSize;
+ private CoordinationKinds traceEvents = CoordinationKinds.ALL; // copied into every adopted TxnEvents; ALL matches TxnEvents' own default
+
+ private final List<TxnId> txnIds = new ArrayList<>();
+
+ public TracePatternState(int id)
+ {
+ this.pattern = TracePattern.EMPTY;
+ this.id = id;
+ }
+
+ public int id() { return id; }
+ public TracePattern pattern() { return pattern; }
+ public int bucketSize() { return bucketSize; }
+ public BucketMode mode() { return bucketMode; }
+ public int bucketSeen() { return bucketSeen; }
+ public BucketMode traceWithMode() { return traceBucketMode; }
+ public int traceBucketSize() { return traceBucketSize; }
+ public int traceBucketSubSize() { return traceBucketSubSize; }
+ public CoordinationKinds traceEvents() { return traceEvents; }
+
+ public int currentSize()
+ {
+ return txnIds.size();
+ }
+
+ public TxnId get(int index)
+ {
+ return txnIds.get(index);
+ }
+
+ TxnEvents maybeAdd(TxnId txnId, @Nullable Participants<?>
participants, CoordinationKind kind, NewOrFailure newOrFailure)
+ {
+ if (!pattern.matches(txnId, participants, kind, newOrFailure))
+ return null;
+
+ return maybeAdd(txnId);
+ }
+
+ private synchronized TxnEvents maybeAdd(TxnId txnId)
+ {
+ if (bucketSize == 0)
+ return null;
+
+ if (++bucketSeen < 0)
+ bucketSeen = Integer.MAX_VALUE;
+
+ if (bucketSize > txnIds.size())
+ {
+ TxnEvents added = trace(txnId);
+ if (added != null)
+ txnIds.add(txnId);
+ return added;
+ }
+
+ int position = bucketMode.position(bucketSize, bucketSeen);
+
+ if (position >= bucketSize)
+ return null;
+
+ TxnEvents added = trace(txnId);
+ if (added == null)
+ return null;
+
+ if (!txnIds.get(position).equals(txnId)) untrace(txnIds.get(position)); // a stale slot may already name txnId: don't evict the entry just added
+ txnIds.set(position, txnId);
+ return added;
+ }
+
+ private synchronized void untrace(TxnId txnId)
+ {
+ txnIdMap.compute(txnId, (ignore, cur) -> {
+ boolean ownedByThis = cur != null && cur.owner == this;
+ if (ownedByThis) globalCount.addAndGet(-cur.size()); // the entry's events are discarded with it, so release them from the global budget
+ return ownedByThis ? null : cur;
+ });
+ }
+
+ private synchronized TxnEvents trace(TxnId txnId)
+ {
+ TxnEvents events = new TxnEvents();
+ events.mode = traceBucketMode;
+ events.bucketSize = traceBucketSize;
+ events.bucketSubSize = traceBucketSubSize > 0 ? traceBucketSubSize : traceBucketSize; // unset: default to the bucket size, as AccordTracing.set does
+ events.traceEvents = traceEvents; // honour the pattern's configured event filter
+ events.owner = this;
+ if (null == txnIdMap.putIfAbsent(txnId, events))
+ return events;
+ return null;
+ }
+
+ synchronized void set(Function<TracePattern, TracePattern> pattern,
BucketMode newBucketMode, int newBucketSeen, int newBucketSize, BucketMode
newTraceBucketMode, int newTraceBucketSize, int newTraceBucketSubSize,
CoordinationKinds newTraceEvents)
+ {
+ Invariants.require(newBucketSize != 0);
+ Invariants.require(newTraceBucketSize != 0);
+ this.pattern = pattern.apply(this.pattern);
+ if (newBucketMode != null)
+ this.bucketMode = newBucketMode;
+ if (newBucketSize >= 0)
+ this.bucketSize = newBucketSize;
+ if (newBucketSeen >= 0)
+ this.bucketSeen = newBucketSeen;
+ if (newTraceBucketMode != null)
+ this.traceBucketMode = newTraceBucketMode;
+ if (newTraceBucketSize >= 0)
+ this.traceBucketSize = newTraceBucketSize;
+ if (newTraceBucketSubSize >= 0)
+ this.traceBucketSubSize = newTraceBucketSubSize;
+ if (newTraceEvents != null)
+ this.traceEvents = newTraceEvents;
+ }
+
+ synchronized void clear()
+ {
+ for (TxnId txnId : txnIds)
+ untrace(txnId);
+ txnIds.clear();
+ }
+ }
+
+ static class GlobalCount extends AtomicInteger // number of trace events stored across all txns, capped at MAX_EVENTS
+ {
+ public boolean admit() // reserves one slot; on failure undoes the reservation, warns, and returns false
+ {
+ if (incrementAndGet() <= MAX_EVENTS)
+ return true;
+
+ decrementAndGet(); // undo the speculative increment
+ ClientWarn.instance.warn("Too many Accord trace events stored
already; delete some to continue tracing");
+ noSpamLogger.warn("Too many Accord trace events stored already;
delete some to continue tracing");
+ return false;
+ }
}
private static final AtomicLong lastNowMicros = new AtomicLong();
@@ -149,176 +748,265 @@ public class AccordTracing
}
}
- final Map<TxnId, EnumMap<TraceEventType, TraceState>> stateMap = new
ConcurrentHashMap<>();
- final AtomicInteger count = new AtomicInteger();
+ final Map<TxnId, TxnEvents> txnIdMap = new ConcurrentHashMap<>(); // per-txn trace state; entries are mutated inside compute(), which is atomic per key
+ final CopyOnWriteArrayList<TracePatternState> allPatterns = new
CopyOnWriteArrayList<>(); // every registered pattern (used to match failures)
+ final CopyOnWriteArrayList<TracePatternState> traceNewPatterns = new
CopyOnWriteArrayList<>(); // subset of allPatterns whose pattern has a traceNew filter
+ final GlobalCount globalCount = new GlobalCount(); // events stored across all txns
- public Tracing trace(TxnId txnId, TraceEventType eventType)
+ public Tracing trace(TxnId txnId, @Nullable Participants<?> participants,
CoordinationKind kind) // null when txnId is not (and does not become) traced, or the event is not recorded
{
- if (!stateMap.containsKey(txnId))
+ if (!txnIdMap.containsKey(txnId) && null == maybeTrace(txnId,
participants, kind, NewOrFailure.NEW, traceNewPatterns)) // untraced txn: give NEW patterns a chance to adopt it
return null;
- class Register implements BiFunction<TxnId, EnumMap<TraceEventType,
TraceState>, EnumMap<TraceEventType, TraceState>>
+ class Register implements BiFunction<TxnId, TxnEvents, TxnEvents>
{
- Event event;
+ TxnEvent event;
@Override
- public EnumMap<TraceEventType, TraceState> apply(TxnId id,
EnumMap<TraceEventType, TraceState> cur)
+ public TxnEvents apply(TxnId id, TxnEvents state)
{
- if (cur == null)
+ if (state == null)
return null;
- TraceState curState = cur.get(eventType);
- if (curState == null || curState.permits == 0)
- return cur;
-
- if (count.incrementAndGet() >= MAX_EVENTS)
- {
- count.decrementAndGet();
- noSpamLogger.warn("Too many Accord trace events stored
already; delete some to continue tracing");
- }
- else
- {
- curState.permits--;
- curState.addInternal(event = new Event());
- }
- return cur;
+ event = state.trace(kind, globalCount); // null if filtered out or rejected by bucket/global limits
+ return state;
}
}
Register register = new Register();
- stateMap.compute(txnId, register);
+ txnIdMap.compute(txnId, register);
return register.event;
}
- public void setPermits(TxnId txnId, TraceEventType eventType, int
newPermits)
+ // Creates or updates txnId's trace config. Null values, values < 0 and a NaN chance are ignored (the field keeps its current value); creating a new entry requires a bucket size, and the sub-bucket size then defaults to it.
+ public void set(TxnId txnId, CoordinationKinds trace, BucketMode
newBucketMode, int newBucketSize, int newBucketSubSize, int newBucketSeen,
float newChance, boolean unsetManagedByPattern)
{
- stateMap.compute(txnId, (id, cur) -> {
- if (newPermits != 0)
- {
- if (cur == null)
- cur = new EnumMap<>(TraceEventType.class);
- cur.computeIfAbsent(eventType, ignore -> new
TraceState()).permits = newPermits;
- }
- else if (cur != null)
+ Invariants.requireArgument(newBucketSize != 0);
+ Invariants.requireArgument(newBucketSubSize != 0);
+ Invariants.requireArgument(Float.isNaN(newChance) || (newChance <=
1.0f && newChance > 0f));
+ txnIdMap.compute(txnId, (id, cur) -> {
+ if (cur == null)
{
- TraceState curState = cur.get(eventType);
- if (curState != null)
- {
- if (!curState.isEmpty()) curState.permits = 0;
- else
- {
- cur.remove(eventType);
- if (cur.isEmpty())
- return null;
- }
- }
+ if (newBucketSize < 0)
+ throw new IllegalArgumentException("Must specify bucket
size for new trace config.");
+
+ cur = new TxnEvents();
+ if (newBucketSubSize < 0)
+ cur.bucketSubSize = newBucketSize;
}
+
+ if (newBucketMode != null)
+ cur.mode = newBucketMode;
+ if (newBucketSize >= 0)
+ cur.bucketSize = newBucketSize;
+ if (newBucketSubSize >= 0)
+ cur.bucketSubSize = newBucketSubSize;
+ if (newBucketSeen >= 0)
+ cur.bucketSeen = newBucketSeen;
+ if (!Float.isNaN(newChance))
+ cur.chance = newChance;
+ if (trace != null)
+ cur.traceEvents = trace;
+ if (unsetManagedByPattern)
+ cur.owner = null; // detach from its pattern, so that pattern's untrace/clear no longer removes the entry
return cur;
});
}
- public void erasePermits(TxnId txnId)
+ public void stopTracing(TxnId txnId) // stops recording new events but keeps existing ones; the entry is dropped only if it is empty
{
- stateMap.compute(txnId, (id, cur) -> {
+ txnIdMap.compute(txnId, (id, cur) -> {
if (cur == null)
return null;
- Iterator<TraceState> iter = cur.values().iterator();
- while (iter.hasNext())
- {
- TraceState state = iter.next();
- state.permits = 0;
- if (state.isEmpty()) iter.remove();
- }
- return cur.isEmpty() ? null : cur;
+ return cur.stopTracing();
});
}
- public void erasePermits(TxnId txnId, TraceEventType eventType)
- {
- setPermits(txnId, eventType, 0);
- }
-
public void eraseEvents(TxnId txnId)
{
- stateMap.compute(txnId, (id, cur) -> {
+ txnIdMap.compute(txnId, (id, cur) -> {
if (cur == null)
return null;
- Iterator<TraceState> iter = cur.values().iterator();
- while (iter.hasNext())
- {
- TraceState state = iter.next();
- count.addAndGet(-state.size());
- state.truncate(state.size());
- if (state.permits == 0) iter.remove();
- }
- return cur.isEmpty() ? null : cur;
+ return cur.eraseEvents(globalCount); // entry dropped if it is no longer tracing (bucketSize == 0)
});
}
- public void eraseEvents(TxnId txnId, TraceEventType eventType)
+ public void eraseEvents(TxnId txnId, CoordinationKind kind)
{
- stateMap.compute(txnId, (id, cur) -> {
- if (cur != null)
- {
- TraceState state = cur.get(eventType);
- if (state == null)
- return cur;
+ txnIdMap.compute(txnId, (id, cur) -> {
+ if (cur == null)
+ return null;
- count.addAndGet(-state.size());
- state.truncate(state.size());
- if (state.permits == 0)
- cur.remove(eventType);
- if (cur.isEmpty())
- return null;
- }
- return cur;
+ return cur.eraseEvents(kind, globalCount); // erases only events of this CoordinationKind
});
}
- public void eraseEventsBefore(TxnId txnId, TraceEventType eventType, long
timestamp)
+ public void eraseEventsBetween(TxnId txnId, long minIdInclusive, long
maxIdInclusive) // bounds are compared against TxnEvent.idMicros
{
- stateMap.compute(txnId, (id, cur) -> {
- if (cur != null)
- {
- TraceState state = cur.get(eventType);
- if (state == null)
- return cur;
+ txnIdMap.compute(txnId, (id, cur) -> {
+ if (cur == null)
+ return null;
- int i = 0;
- while (i < state.size() && state.get(i).idMicros < timestamp)
- ++i;
- state.truncate(i);
- count.addAndGet(-i);
- if (cur.isEmpty())
- return null;
- }
- return cur;
+ return cur.eraseEventsBetween(minIdInclusive, maxIdInclusive,
globalCount);
});
}
public void eraseAllEvents()
{
- stateMap.keySet().forEach(this::eraseEvents);
+ txnIdMap.keySet().forEach(this::eraseEvents);
}
- public void eraseAllPermits()
+ public void eraseAllBuckets() // stops tracing every txn; already recorded events are kept
{
- stateMap.keySet().forEach(this::erasePermits);
+ txnIdMap.keySet().forEach(this::stopTracing);
}
public void forEach(Predicate<TxnId> include, ConsumeState forEach)
{
- stateMap.forEach((txnId, state) -> {
+ txnIdMap.forEach((txnId, state) -> {
if (include.test(txnId))
{
// ensure lock is held for duration of callback
- stateMap.compute(txnId, (id, cur) -> {
- if (cur != null)
- cur.forEach((event, events) -> forEach.accept(txnId,
event, events.permits, Collections.unmodifiableList(events)));
+ txnIdMap.compute(txnId, (id, cur) -> {
+ if (cur != null) forEach.accept(txnId, cur); // the entry may have been removed since the outer iteration saw it
return cur;
});
}
});
}
+
+ public void setPattern(int id, Function<TracePattern, TracePattern>
pattern, BucketMode newBucketMode, int newBucketSeen, int newBucketSize,
BucketMode newTraceBucketMode, int newTraceBucketSize, int
newTraceBucketSubSize, CoordinationKinds newTraceEvents) // creates the pattern if absent; keeps traceNewPatterns in sync with pattern.traceNew
+ {
+ synchronized (allPatterns)
+ {
+ TracePatternState state = findPattern(id, false);
+ TracePatternState update = state != null ? state : new
TracePatternState(id);
+ boolean prevTraceNew = state != null && state.pattern.traceNew !=
null;
+ update.set(pattern, newBucketMode, newBucketSeen, newBucketSize,
newTraceBucketMode, newTraceBucketSize, newTraceBucketSubSize, newTraceEvents);
+ if (state == null)
+ allPatterns.add(update);
+ if (update.pattern.traceNew != null && !prevTraceNew)
+ traceNewPatterns.add(update);
+ else if (update.pattern.traceNew == null && prevTraceNew)
+ traceNewPatterns.remove(update);
+ }
+ }
+
+ public void erasePattern(int id) // unregisters the pattern, then drops the trace entries it still owns
+ {
+ TracePatternState removed = findPattern(id, true);
+ if (removed != null)
+ removed.clear();
+ }
+
+
+ public void erasePatternTraces(int id) // keeps the pattern registered, dropping only the trace entries it still owns
+ {
+ TracePatternState state = findPattern(id, false);
+ if (state != null)
+ state.clear();
+ }
+
+ private TracePatternState findPattern(int id, boolean remove) // linear lookup by id under the allPatterns lock; optionally unregisters the match
+ {
+ synchronized (allPatterns)
+ {
+ for (int i = 0; i < allPatterns.size() ; ++i)
+ {
+ TracePatternState state = allPatterns.get(i);
+ if (state.id == id)
+ {
+ if (remove)
+ {
+ allPatterns.remove(i);
+ if (state.pattern.traceNew != null)
+ traceNewPatterns.remove(state);
+ }
+ return state;
+ }
+ }
+ }
+ return null;
+ }
+
+ public void eraseAllPatterns() // unregisters every pattern, then drops the trace entries each still owns
+ {
+ List<TracePatternState> removed = new ArrayList<>();
+ synchronized (allPatterns) { removed.addAll(allPatterns); allPatterns.clear(); traceNewPatterns.clear(); } // same lock as setPattern/findPattern; traceNewPatterns must be emptied too or erased patterns keep adopting new coordinations
+ removed.forEach(TracePatternState::clear);
+ }
+
+ public void eraseAllPatternTraces() // keeps every pattern registered, dropping only the trace entries each still owns
+ {
+ for (TracePatternState state : allPatterns)
+ state.clear();
+ }
+
+ public void forEachPattern(Consumer<TracePatternState> consumer)
+ {
+ allPatterns.forEach(pattern -> {
+ synchronized (pattern) // same monitor as TracePatternState's synchronized methods, so the consumer sees a consistent state
+ {
+ consumer.accept(pattern);
+ }
+ });
+ }
+
+ @Override // NOTE(review): replaces any onFailed inherited from AccordCoordinatorMetrics.Listener - confirm no failure metrics are lost (call super?)
+ public void onFailed(Throwable failure, TxnId txnId, Participants<?>
participants, Coordination coordination) // if a FAILURE pattern adopts txnId, records one event dumping the failed coordination's state
+ {
+ TxnEvents tracing = maybeTrace(txnId, participants,
coordination.kind(), NewOrFailure.FAILURE, allPatterns);
+ if (tracing != null)
+ {
+ txnIdMap.compute(txnId, (id, cur) -> {
+ if (cur != tracing)
+ return cur;
+
+ TxnEvent event = tracing.forceTrace(coordination.kind(),
globalCount);
+ if (event == null) // we still honour global limit
+ return cur;
+
+ event.trace(null, "Failed Coordination Dump: %s", failure); // include the failure itself, otherwise it is lost from the dump
+ {
+ String description = coordination.describe();
+ if (description != null)
+ event.trace(null, "Description: %s", description);
+ }
+ {
+ Participants<?> scope = coordination.scope();
+ if (scope != null)
+ event.trace(null, "Scope: %s", scope);
+ }
+ {
+ AbstractTracker<?> tracker = coordination.tracker();
+ if (tracker != null)
+ event.trace(null, "Tracker: %s",
tracker.summariseTracker());
+ }
+ {
+ SortedListMap<Node.Id, ?> replies = coordination.replies();
+ if (replies != null)
+ {
+ for (int i = 0 ; i < replies.domainSize() ; ++i)
+ event.trace(null, "from %s: %s",
replies.getKey(i), replies.getValue(i));
+ }
+ }
+ return cur;
+ });
+ }
+ }
+
+ private TxnEvents maybeTrace(TxnId txnId, @Nullable Participants<?>
participants, CoordinationKind kind, NewOrFailure newOrFailure,
List<TracePatternState> patterns) // the first pattern to adopt txnId wins; returns its newly registered TxnEvents, else null
+ {
+ if (patterns.isEmpty()) // fast path when no patterns are registered
+ return null;
+
+ for (TracePatternState state : patterns)
+ {
+ TxnEvents added = state.maybeAdd(txnId, participants, kind,
newOrFailure);
+ if (added != null)
+ return added;
+ }
+ return null;
+ }
}
diff --git a/src/java/org/apache/cassandra/service/accord/TokenRange.java
b/src/java/org/apache/cassandra/service/accord/TokenRange.java
index 08750a841f..91be38ec2f 100644
--- a/src/java/org/apache/cassandra/service/accord/TokenRange.java
+++ b/src/java/org/apache/cassandra/service/accord/TokenRange.java
@@ -173,4 +173,21 @@ public class TokenRange extends Range.EndInclusive
+ TokenKey.noTableSerializer.serializedSize(t.end());
}
};
+
+ public static TokenRange parse(String str, IPartitioner partitioner)
+ {
+ TableId tableId;
+ {
+ int split = str.indexOf(':', str.startsWith("tid:") ? 4 : 0);
+ if (split < 0 || str.length() < split + 3) throw new IllegalArgumentException("Invalid TokenRange: " + str); // expects <tableId>:<open><start>,<end><close>
+ tableId = TableId.fromString(str.substring(0, split));
+ str = str.substring(split + 2, str.length() - 1); // drop the ':' plus one leading and one trailing character (presumably the range brackets)
+ }
+
+ String[] bounds = str.split(",");
+ if (bounds.length != 2)
+ throw new IllegalArgumentException("Invalid TokenRange: " + str);
+
+ return new TokenRange(TokenKey.parse(tableId, bounds[0], partitioner),
TokenKey.parse(tableId, bounds[1], partitioner));
+ }
}
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
index e09a565c41..4f3e0ace11 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
@@ -35,7 +35,7 @@ import accord.api.ReplicaEventListener;
import accord.api.ProgressLog.BlockedUntil;
import accord.api.RoutingKey;
import accord.api.Tracing;
-import accord.api.TraceEventType;
+import accord.coordinate.Coordination;
import accord.local.Command;
import accord.local.Node;
import accord.local.SafeCommand;
@@ -43,6 +43,7 @@ import accord.local.SafeCommandStore;
import accord.local.TimeService;
import accord.messages.ReplyContext;
import accord.primitives.Keys;
+import accord.primitives.Participants;
import accord.primitives.Ranges;
import accord.primitives.Routable;
import accord.primitives.Status;
@@ -62,7 +63,6 @@ import accord.utils.async.AsyncChains;
import accord.utils.async.Cancellable;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.RequestTimeoutException;
-import org.apache.cassandra.metrics.AccordCoordinatorMetrics;
import org.apache.cassandra.metrics.AccordReplicaMetrics;
import org.apache.cassandra.net.ResponseContext;
import org.apache.cassandra.service.accord.AccordService;
@@ -98,6 +98,7 @@ public class AccordAgent implements Agent,
OwnershipEventListener
{
private static final Logger logger =
LoggerFactory.getLogger(AccordAgent.class);
private static final NoSpamLogger noSpamLogger =
NoSpamLogger.getLogger(logger, 1L, MINUTES);
+ private static final ReplicaEventListener replicaEventListener = new
AccordReplicaMetrics.Listener(); // shared listener, replacing the removed AccordReplicaMetrics.Listener.instance singleton
private static BiConsumer<TxnId, Throwable> onFailedBarrier;
public static void setOnFailedBarrier(BiConsumer<TxnId, Throwable>
newOnFailedBarrier) { onFailedBarrier = newOnFailedBarrier; }
@@ -121,9 +122,9 @@ public class AccordAgent implements Agent,
OwnershipEventListener
}
@Override
- public @Nullable Tracing trace(TxnId txnId, TraceEventType eventType)
+ public @Nullable Tracing trace(TxnId txnId, Participants<?> participants,
Coordination.CoordinationKind eventType) // participants forwarded as-is; AccordTracing.trace accepts it as @Nullable
{
- return tracing.trace(txnId, eventType);
+ return tracing.trace(txnId, participants, eventType);
}
@Override
@@ -225,13 +226,13 @@ public class AccordAgent implements Agent,
OwnershipEventListener
@Override
public CoordinatorEventListener coordinatorEvents()
{
- return AccordCoordinatorMetrics.Listener.instance;
+ return tracing; // AccordTracing extends AccordCoordinatorMetrics.Listener, inheriting its metrics callbacks (it overrides onFailed)
}
@Override
public ReplicaEventListener replicaEvents()
{
- return AccordReplicaMetrics.Listener.instance;
+ return replicaEventListener;
}
private static final long ONE_SECOND = SECONDS.toMicros(1L);
diff --git a/src/java/org/apache/cassandra/service/accord/api/TokenKey.java
b/src/java/org/apache/cassandra/service/accord/api/TokenKey.java
index b71be2f333..88a32babd2 100644
--- a/src/java/org/apache/cassandra/service/accord/api/TokenKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/TokenKey.java
@@ -149,12 +149,15 @@ public final class TokenKey extends AccordRoutableKey
implements RoutingKey, Ran
public static TokenKey parse(String str, IPartitioner partitioner)
{
- TableId tableId;
- {
- int split = str.indexOf(':', str.startsWith("tid:") ? 4 : 0);
- tableId = TableId.fromString(str.substring(0, split));
- str = str.substring(split + 1);
- }
+
+ int split = str.indexOf(':', str.startsWith("tid:") ? 4 : 0);
+ TableId tableId = TableId.fromString(str.substring(0, split));
+ str = str.substring(split + 1);
+ return parse(tableId, str, partitioner);
+ }
+
+ public static TokenKey parse(TableId tableId, String str, IPartitioner
partitioner)
+ {
if (str.endsWith("Inf"))
{
return new TokenKey(tableId, str.charAt(0) == '-' ?
MIN_TABLE_SENTINEL : MAX_TABLE_SENTINEL, partitioner.getMinimumToken());
diff --git
a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
index 0889ebffe5..c69d205517 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.virtual;
import java.util.Collections;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -127,16 +128,16 @@ public class AccordDebugKeyspaceTest extends CQLTester
String.format("SELECT txn_id, save_status FROM %s.%s WHERE node_id = ?
AND txn_id=?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.JOURNAL);
private static final String SET_TRACE =
- String.format("UPDATE %s.%s SET permits = ? WHERE txn_id = ? AND
event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG,
AccordDebugKeyspace.TXN_TRACE);
+ String.format("UPDATE %s.%s SET bucket_size = ?, trace_events = ?
WHERE txn_id = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG,
AccordDebugKeyspace.TXN_TRACE);
private static final String SET_TRACE_REMOTE =
- String.format("UPDATE %s.%s SET permits = ? WHERE node_id = ? AND
txn_id = ? AND event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.TXN_TRACE);
+ String.format("UPDATE %s.%s SET bucket_size = ?, trace_events = ?
WHERE node_id = ? AND txn_id = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.TXN_TRACE);
private static final String QUERY_TRACE =
- String.format("SELECT * FROM %s.%s WHERE txn_id = ? AND event_type =
?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACE);
+ String.format("SELECT txn_id, bucket_size, trace_events FROM %s.%s
WHERE txn_id = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG,
AccordDebugKeyspace.TXN_TRACE);
private static final String QUERY_TRACE_REMOTE =
- String.format("SELECT * FROM %s.%s WHERE node_id = ? AND txn_id = ?
AND event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.TXN_TRACE);
+ String.format("SELECT node_id, txn_id, bucket_size, trace_events FROM
%s.%s WHERE node_id = ? AND txn_id = ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN_TRACE);
private static final String UNSET_TRACE1 =
String.format("DELETE FROM %s.%s WHERE txn_id = ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACE);
@@ -144,33 +145,27 @@ public class AccordDebugKeyspaceTest extends CQLTester
private static final String UNSET_TRACE1_REMOTE =
String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN_TRACE);
- private static final String UNSET_TRACE2 =
- String.format("DELETE FROM %s.%s WHERE txn_id = ? AND event_type = ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACE);
-
- private static final String UNSET_TRACE2_REMOTE =
- String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ? AND
event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.TXN_TRACE);
+ private static final String QUERY_ALL_TRACES =
+ String.format("SELECT * FROM %s.%s WHERE txn_id = ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACES);
private static final String QUERY_TRACES =
- String.format("SELECT * FROM %s.%s WHERE txn_id = ? AND event_type =
?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACES);
+ String.format("SELECT * FROM %s.%s WHERE txn_id = ? AND event = ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACES);
private static final String QUERY_TRACES_REMOTE =
- String.format("SELECT * FROM %s.%s WHERE node_id = ? AND txn_id = ?
AND event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.TXN_TRACES);
+ String.format("SELECT * FROM %s.%s WHERE node_id = ? AND txn_id = ?
AND event = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.TXN_TRACES);
private static final String ERASE_TRACES1 =
- String.format("DELETE FROM %s.%s WHERE txn_id = ? AND event_type = ?
AND id_micros < ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG,
AccordDebugKeyspace.TXN_TRACES);
+ String.format("DELETE FROM %s.%s WHERE txn_id = ? AND id_micros < ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACES);
private static final String ERASE_TRACES1_REMOTE =
- String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ? AND
event_type = ? AND id_micros < ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.TXN_TRACES);
-
- private static final String ERASE_TRACES2 =
- String.format("DELETE FROM %s.%s WHERE txn_id = ? AND event_type = ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACES);
-
- private static final String ERASE_TRACES2_REMOTE =
- String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ? AND
event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.TXN_TRACES);
+ String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ? AND
id_micros < ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.TXN_TRACES);
private static final String ERASE_TRACES3 =
String.format("DELETE FROM %s.%s WHERE txn_id = ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACES);
+ private static final String TRUNCATE_TRACES =
+ String.format("TRUNCATE %s.%s", SchemaConstants.VIRTUAL_ACCORD_DEBUG,
AccordDebugKeyspace.TXN_TRACES);
+
private static final String ERASE_TRACES3_REMOTE =
String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN_TRACES);
@@ -192,6 +187,15 @@ public class AccordDebugKeyspaceTest extends CQLTester
private static final String
QUERY_REDUNDANT_BEFORE_FILTER_SHARD_APPLIED_GEQ_REMOTE =
String.format("SELECT * FROM %s.%s WHERE node_id = ? AND shard_applied
>= ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.REDUNDANT_BEFORE);
+ private static final String SET_PATTERN_TRACE =
+ String.format("UPDATE %s.%s SET bucket_mode = ?, bucket_seen = ?,
bucket_size = ?, chance = ?, if_intersects = ?, if_kind = ?, on_failure = ?,
on_new = ?, trace_bucket_mode = ?, trace_bucket_size = ?, trace_bucket_sub_size
= ?, trace_events = ? WHERE id = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG,
AccordDebugKeyspace.TXN_PATTERN_TRACE);
+
+ private static final String UNSET_PATTERN_TRACE =
+ String.format("DELETE FROM %s.%s WHERE id = ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_PATTERN_TRACE);
+
+ private static final String QUERY_PATTERN_TRACE =
+ String.format("SELECT * FROM %s.%s WHERE id = ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_PATTERN_TRACE);
+
@BeforeClass
public static void setUpClass()
{
@@ -239,40 +243,31 @@ public class AccordDebugKeyspaceTest extends CQLTester
Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k,
c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0);
filter.appliesTo(id);
- execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS");
- assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"),
row(id.toString(), "WAIT_PROGRESS", 1));
- assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(),
"WAIT_PROGRESS"), row(nodeId, id.toString(), "WAIT_PROGRESS", 1));
- execute(SET_TRACE, 0, id.toString(), "WAIT_PROGRESS");
- assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"));
- assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(),
"WAIT_PROGRESS"));
- execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS");
- assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"),
row(id.toString(), "WAIT_PROGRESS", 1));
- assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(),
"WAIT_PROGRESS"), row(nodeId, id.toString(), "WAIT_PROGRESS", 1));
+ execute(SET_TRACE, 1, "{WaitProgress}", id.toString());
+ assertRows(execute(QUERY_TRACE, id.toString()), row(id.toString(),
1, "{WaitProgress}"));
+ assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString()),
row(nodeId, id.toString(), 1, "{WaitProgress}"));
+ execute(SET_TRACE, 0, "{}", id.toString());
+ assertRows(execute(QUERY_TRACE, id.toString()));
+ assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString()));
+ execute(SET_TRACE, 1, "{WaitProgress}", id.toString());
+ assertRows(execute(QUERY_TRACE, id.toString()), row(id.toString(),
1, "{WaitProgress}"));
+ assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString()),
row(nodeId, id.toString(), 1, "{WaitProgress}"));
execute(UNSET_TRACE1, id.toString());
- assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"));
- assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(),
"WAIT_PROGRESS"));
- execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS");
- assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"),
row(id.toString(), "WAIT_PROGRESS", 1));
- assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(),
"WAIT_PROGRESS"), row(nodeId, id.toString(), "WAIT_PROGRESS", 1));
- execute(UNSET_TRACE2, id.toString(), "WAIT_PROGRESS");
- assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"));
- assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(),
"WAIT_PROGRESS"));
- execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS");
- assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"),
row(id.toString(), "WAIT_PROGRESS", 1));
- assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(),
"WAIT_PROGRESS"), row(nodeId, id.toString(), "WAIT_PROGRESS", 1));
- filter.appliesTo(id);
+ assertRows(execute(QUERY_TRACE, id.toString()));
+ assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString()));
+ execute(SET_TRACE, 1, "{WaitProgress}", id.toString());
+ assertRows(execute(QUERY_TRACE, id.toString()), row(id.toString(),
1, "{WaitProgress}"));
+ assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString()),
row(nodeId, id.toString(), 1, "{WaitProgress}"));
accord.node().coordinate(id, txn).beginAsResult();
filter.preAccept.awaitThrowUncheckedOnInterrupt();
filter.apply.awaitThrowUncheckedOnInterrupt();
- spinUntilSuccess(() -> Assertions.assertThat(execute(QUERY_TRACES,
id.toString(), "WAIT_PROGRESS").size()).isGreaterThan(0));
- spinUntilSuccess(() ->
Assertions.assertThat(execute(QUERY_TRACES_REMOTE, nodeId, id.toString(),
"WAIT_PROGRESS").size()).isGreaterThan(0));
- execute(ERASE_TRACES1, id.toString(), "FETCH", Long.MAX_VALUE);
- execute(ERASE_TRACES2, id.toString(), "FETCH");
- execute(ERASE_TRACES1, id.toString(), "WAIT_PROGRESS",
Long.MAX_VALUE);
- Assertions.assertThat(execute(QUERY_TRACES, id.toString(),
"WAIT_PROGRESS").size()).isEqualTo(0);
- Assertions.assertThat(execute(QUERY_TRACES_REMOTE, nodeId,
id.toString(), "WAIT_PROGRESS").size()).isEqualTo(0);
+ spinUntilSuccess(() -> Assertions.assertThat(execute(QUERY_TRACES,
id.toString(), "WaitProgress").size()).isGreaterThan(0));
+ spinUntilSuccess(() ->
Assertions.assertThat(execute(QUERY_TRACES_REMOTE, nodeId, id.toString(),
"WaitProgress").size()).isGreaterThan(0));
+ execute(ERASE_TRACES1, id.toString(), Long.MAX_VALUE);
+ execute(ERASE_TRACES1, id.toString(), Long.MAX_VALUE);
+ Assertions.assertThat(execute(QUERY_TRACES, id.toString(),
"WaitProgress").size()).isEqualTo(0);
+ Assertions.assertThat(execute(QUERY_TRACES_REMOTE, nodeId,
id.toString(), "WaitProgress").size()).isEqualTo(0);
// just check other variants don't fail
- execute(ERASE_TRACES2, id.toString(), "WAIT_PROGRESS");
execute(ERASE_TRACES3, id.toString());
}
@@ -280,6 +275,137 @@ public class AccordDebugKeyspaceTest extends CQLTester
{
MessagingService.instance().outboundSink.remove(filter);
}
+
+ filter = new AccordMsgFilter();
+ MessagingService.instance().outboundSink.add(filter);
+ try
+ {
+ TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write,
Routable.Domain.Key);
+ Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k,
c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 1, 1, 1);
+ filter.appliesTo(id);
+
+ execute(SET_TRACE_REMOTE, 1, "{WaitProgress}", nodeId,
id.toString());
+ assertRows(execute(QUERY_TRACE, id.toString()), row(id.toString(),
1, "{WaitProgress}"));
+ assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString()),
row(nodeId, id.toString(), 1, "{WaitProgress}"));
+ execute(SET_TRACE_REMOTE, 0, "{}", nodeId, id.toString());
+ assertRows(execute(QUERY_TRACE, id.toString()));
+ assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString()));
+ execute(SET_TRACE_REMOTE, 1, "{WaitProgress}", nodeId,
id.toString());
+ assertRows(execute(QUERY_TRACE, id.toString()), row(id.toString(),
1, "{WaitProgress}"));
+ assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString()),
row(nodeId, id.toString(), 1, "{WaitProgress}"));
+ execute(UNSET_TRACE1_REMOTE, nodeId, id.toString());
+ assertRows(execute(QUERY_TRACE, id.toString()));
+ assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString()));
+ execute(SET_TRACE_REMOTE, 1, "{WaitProgress}", nodeId,
id.toString());
+ assertRows(execute(QUERY_TRACE, id.toString()), row(id.toString(),
1, "{WaitProgress}"));
+ assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString()),
row(nodeId, id.toString(), 1, "{WaitProgress}"));
+ accord.node().coordinate(id, txn).beginAsResult();
+ filter.preAccept.awaitThrowUncheckedOnInterrupt();
+ filter.apply.awaitThrowUncheckedOnInterrupt();
+ spinUntilSuccess(() -> Assertions.assertThat(execute(QUERY_TRACES,
id.toString(), "WaitProgress").size()).isGreaterThan(0));
+ spinUntilSuccess(() ->
Assertions.assertThat(execute(QUERY_TRACES_REMOTE, nodeId, id.toString(),
"WaitProgress").size()).isGreaterThan(0));
+ execute(ERASE_TRACES1_REMOTE, nodeId, id.toString(),
Long.MAX_VALUE);
+ execute(ERASE_TRACES1_REMOTE, nodeId, id.toString(),
Long.MAX_VALUE);
+ Assertions.assertThat(execute(QUERY_TRACES, id.toString(),
"WaitProgress").size()).isEqualTo(0);
+ Assertions.assertThat(execute(QUERY_TRACES_REMOTE, nodeId,
id.toString(), "WaitProgress").size()).isEqualTo(0);
+ // just check other variants don't fail
+ execute(ERASE_TRACES3_REMOTE, nodeId, id.toString());
+
+ }
+ finally
+ {
+ MessagingService.instance().outboundSink.remove(filter);
+ }
+ }
+
+ @Test
+ public void patternTracing()
+ {
+ // simple test to confirm basic tracing functionality works, doesn't
validate specific behaviours only requesting/querying/erasing
+ String tableName = createTable("CREATE TABLE %s (k int, c int, v int,
PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
+ AccordService accord = accord();
+ DatabaseDescriptor.getAccord().fetch_txn = "1s";
+
+ execute(SET_PATTERN_TRACE, "leaky", 0, 5, 1.0f, "tid:1:1|tid:1:2",
"-{*X}", "-{WaitProgress}", "{}", "ring", 5, 1, "*", 1);
+ assertRows(execute(QUERY_PATTERN_TRACE, 1), row(1, "LEAKY", 0, 5,
1.0f, 0, "tid:1:1|tid:1:2", "-{KX,RX}", "-{WaitProgress}", "{}", "RING", 5, 1,
"*"));
+ execute(UNSET_PATTERN_TRACE, 1);
+ assertRows(execute(QUERY_PATTERN_TRACE, 1));
+
+ RoutingKey matchKey;
+ {
+ Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k,
c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0);
+ matchKey = (RoutingKey) txn.keys().toParticipants().get(0);
+ }
+
+ int count = 5;
+ {
+ List<TxnId> txnIds = new ArrayList<>();
+ execute(SET_PATTERN_TRACE, "leaky", 0, count, 1.0f,
matchKey.toString(), "*", "{}", "*", "leaky", 1, 1, "*", 1);
+ for (int i = 0 ; i < count + 1 ; ++i)
+ {
+ TxnId id =
accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key);
+ Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO
%s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, i, 0);
+ getBlocking(accord.node().coordinate(id, txn));
+ if (i < count) assertRows(execute(QUERY_TRACE, id.toString()),
row(id.toString(), 1, "*"));
+ else assertRows(execute(QUERY_TRACE, id.toString()));
+ txnIds.add(id);
+ }
+
+ execute(UNSET_PATTERN_TRACE, 1);
+ for (int i = 0 ; i < count ; ++i)
+ assertRows(execute(QUERY_TRACE, txnIds.get(i).toString()));
+ }
+
+ {
+ execute(SET_PATTERN_TRACE, "leaky", 0, count, 1.0f,
matchKey.asRange().toString(), "{KE}", "{}", "{PreAccept}", "leaky", 1, 1, "*",
1);
+ for (int i = 0 ; i < count ; ++i)
+ {
+ TxnId id =
accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key);
+ Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO
%s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, i, 0);
+ getBlocking(accord.node().coordinate(id, txn));
+ assertRows(execute(QUERY_TRACE, id.toString()));
+ }
+
+ List<TxnId> txnIds = new ArrayList<>();
+ for (int i = 0 ; i < count + 1 ; ++i)
+ {
+ TxnId id =
accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.EphemeralRead,
Routable.Domain.Key);
+ Txn txn = createTxn(wrapInTxn(String.format("SELECT * FROM
%s.%s WHERE k = ? AND c = ?", KEYSPACE, tableName)), 0, i);
+ getBlocking(accord.node().coordinate(id, txn));
+ if (i < count) assertRows(execute(QUERY_TRACE, id.toString()),
row(id.toString(), 1, "*"));
+ else assertRows(execute(QUERY_TRACE, id.toString()));
+ txnIds.add(id);
+ }
+
+ execute(UNSET_PATTERN_TRACE, 1);
+ for (int i = 0 ; i < count ; ++i)
+ assertRows(execute(QUERY_TRACE, txnIds.get(i).toString()));
+ }
+
+ {
+ TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write,
Routable.Domain.Key);
+ Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k,
c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 1, 1, 1);
+ execute(SET_PATTERN_TRACE, "leaky", 0, count, 1.0f, "" +
txn.keys().get(0).toUnseekable(), "{KW}", "*", "{}", "leaky", 1, 1, "{}", 1);
+
+ AccordMsgFilter filter = new AccordMsgFilter();
+ filter.dropVerbs = EnumSet.allOf(Verb.class);
+ filter.appliesTo(id);
+ MessagingService.instance().outboundSink.add(filter);
+ try
+ {
+ boolean failed = false;
+ try { getBlocking(accord.node().coordinate(id, txn)); }
+ catch (Throwable ignore) { failed = true; }
+ Assertions.assertThat(failed).isTrue();
+ }
+ finally
+ {
+ MessagingService.instance().outboundSink.remove(filter);
+ }
+
+ spinUntilSuccess(() ->
Assertions.assertThat(execute(QUERY_ALL_TRACES,
id.toString()).size()).isGreaterThan(0), 60);
+ execute(UNSET_PATTERN_TRACE, 1);
+ }
}
@Test
@@ -332,19 +458,30 @@ public class AccordDebugKeyspaceTest extends CQLTester
String tableName = createTable("CREATE TABLE %s (k int, c int, v int,
PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
AccordService accord = accord();
int nodeId = accord.nodeId().id;
+ AccordMsgFilter filter = new AccordMsgFilter();
TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write,
Routable.Domain.Key);
Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c,
v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0);
- String keyStr = txn.keys().get(0).toUnseekable().toString();
- getBlocking(accord.node().coordinate(id, txn));
-
- spinUntilSuccess(() -> assertRows(execute(QUERY_TXN_BLOCKED_BY,
id.toString()),
- row(id.toString(), anyInt(), 0, "",
"", any(), anyOf(SaveStatus.ReadyToExecute.name(), SaveStatus.Applying.name(),
SaveStatus.Applied.name()))));
- assertRows(execute(QUERY_TXN, id.toString()), row(id.toString(),
"Applied"));
- assertRows(execute(QUERY_TXN_REMOTE, nodeId, id.toString()),
row(id.toString(), "Applied"));
- assertRows(execute(QUERY_JOURNAL, id.toString()), row(id.toString(),
"PreAccepted"), row(id.toString(), "Applying"), row(id.toString(), "Applied"),
row(id.toString(), null));
- assertRows(execute(QUERY_JOURNAL_REMOTE, nodeId, id.toString()),
row(id.toString(), "PreAccepted"), row(id.toString(), "Applying"),
row(id.toString(), "Applied"), row(id.toString(), null));
- assertRows(execute(QUERY_COMMANDS_FOR_KEY, keyStr), row(id.toString(),
"APPLIED_DURABLE"));
- assertRows(execute(QUERY_COMMANDS_FOR_KEY_REMOTE, nodeId, keyStr),
row(id.toString(), "APPLIED_DURABLE"));
+ filter.appliesTo(id);
+ filter.dropVerbs = Set.of();
+ MessagingService.instance().outboundSink.add(filter);
+ try
+ {
+ String keyStr = txn.keys().get(0).toUnseekable().toString();
+ getBlocking(accord.node().coordinate(id, txn));
+ filter.apply.awaitThrowUncheckedOnInterrupt();
+ spinUntilSuccess(() -> assertRows(execute(QUERY_TXN_BLOCKED_BY,
id.toString()),
+ row(id.toString(), anyInt(), 0,
"", "", any(), anyOf(SaveStatus.ReadyToExecute.name(),
SaveStatus.Applying.name(), SaveStatus.Applied.name()))));
+ assertRows(execute(QUERY_TXN, id.toString()), row(id.toString(),
anyOf("Applied", "Applying")));
+ assertRows(execute(QUERY_TXN_REMOTE, nodeId, id.toString()),
row(id.toString(), anyOf("Applied", "Applying")));
+ assertRows(execute(QUERY_JOURNAL, id.toString()),
row(id.toString(), "PreAccepted"), row(id.toString(), "Applying"),
row(id.toString(), "Applied"), row(id.toString(), null));
+ assertRows(execute(QUERY_JOURNAL_REMOTE, nodeId, id.toString()),
row(id.toString(), "PreAccepted"), row(id.toString(), "Applying"),
row(id.toString(), "Applied"), row(id.toString(), null));
+ assertRows(execute(QUERY_COMMANDS_FOR_KEY, keyStr),
row(id.toString(), "APPLIED_DURABLE"));
+ assertRows(execute(QUERY_COMMANDS_FOR_KEY_REMOTE, nodeId, keyStr),
row(id.toString(), "APPLIED_DURABLE"));
+ }
+ finally
+ {
+ MessagingService.instance().outboundSink.remove(filter);
+ }
}
@Test
@@ -503,13 +640,13 @@ public class AccordDebugKeyspaceTest extends CQLTester
@Test
public void patchJournalVestigialTest()
{
- testPatchJournal("ERASE_VESTIGIAL", "Vestigial");
+ testPatchJournal("LOCALLY_ERASE_VESTIGIAL", "Vestigial");
}
@Test
public void patchJournalInvalidateTest()
{
- testPatchJournal("INVALIDATE", "Invalidated");
+ testPatchJournal("LOCALLY_INVALIDATE", "Invalidated");
}
@Test
@@ -520,9 +657,8 @@ public class AccordDebugKeyspaceTest extends CQLTester
testPatchJournal("ERASE", "Erased");
Assert.fail("Should have thrown");
}
- catch (Throwable t)
+ catch (InvalidRequestException t)
{
- Assert.assertTrue(t.getMessage().contains("No enum constant"));
}
}
@@ -630,6 +766,4 @@ public class AccordDebugKeyspaceTest extends CQLTester
return !dropVerbs.contains(msg.verb());
}
}
-
-
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]