This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new b712cfb930 RemoteToLocalVirtualKeyspace: supporting access to all
nodes' local virtual tables from any node in the cluster
b712cfb930 is described below
commit b712cfb930106972904e6fdb381c030bc649b554
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Sat Sep 13 10:50:32 2025 +0100
RemoteToLocalVirtualKeyspace: supporting access to all nodes' local virtual
tables from any node in the cluster
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20900
---
.../apache/cassandra/db/filter/ColumnFilter.java | 59 +-
.../cassandra/db/filter/ColumnSubselection.java | 14 +
.../org/apache/cassandra/db/filter/RowFilter.java | 50 +-
.../db/virtual/AccordDebugRemoteKeyspace.java | 31 +
.../db/virtual/RemoteToLocalVirtualKeyspace.java | 35 ++
.../db/virtual/RemoteToLocalVirtualTable.java | 625 +++++++++++++++++++++
.../cassandra/db/virtual/VirtualMutation.java | 38 ++
.../apache/cassandra/db/virtual/VirtualTable.java | 2 +
.../cassandra/exceptions/ExceptionSerializer.java | 2 +-
.../cassandra/exceptions/RequestFailure.java | 3 +-
.../apache/cassandra/locator/RemoteStrategy.java | 29 +
src/java/org/apache/cassandra/net/Verb.java | 3 +
.../apache/cassandra/schema/SchemaConstants.java | 3 +-
.../apache/cassandra/service/CassandraDaemon.java | 4 +
.../db/virtual/AccordDebugKeyspaceTest.java | 138 ++++-
15 files changed, 1025 insertions(+), 11 deletions(-)
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index bc17dd0158..86d66c59ff 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.btree.BTree;
/**
* Represents which (non-PK) columns (and optionally which sub-part of a
column for complex columns) are selected
@@ -64,7 +65,6 @@ import org.apache.cassandra.schema.TableMetadata;
*/
public abstract class ColumnFilter
{
-
public static final ColumnFilter NONE =
selection(RegularAndStaticColumns.NONE);
public static final Serializer serializer = new Serializer();
@@ -305,6 +305,20 @@ public abstract class ColumnFilter
return false;
}
+    /**
+     * Produces an equivalent filter whose selected columns are looked up, by name, in {@code newTable}.
+     * Selected columns that {@code newTable} does not define are dropped; a same-named column of a
+     * different type is intended to be rejected.
+     */
+    abstract ColumnFilter rebind(TableMetadata newTable);
+
+    /**
+     * Public entry point for {@link #rebind(TableMetadata)}, restricted to virtual target tables.
+     *
+     * @param filter   the filter to translate
+     * @param newTable the virtual table whose columns the result should reference
+     * @return a filter over {@code newTable}'s columns
+     * @throws UnsupportedOperationException if {@code newTable} is not a virtual table
+     */
+    public static ColumnFilter rebindVirtual(ColumnFilter filter, TableMetadata newTable)
+    {
+        // Nothing technically prevents rebinding onto a regular table, but there is no known use for it,
+        // so the restriction protects against accidental misuse.
+        if (newTable.isVirtual())
+            return filter.rebind(newTable);
+
+        throw new UnsupportedOperationException("This feature is intended only to be used with virtual keyspaces");
+    }
+
/**
* Returns the CQL string corresponding to this {@code ColumnFilter}.
*
@@ -630,6 +644,12 @@ public abstract class ColumnFilter
return true;
}
+        /**
+         * A wildcard filter is built from a single column set, so rebinding only needs to translate that set.
+         */
+        @Override
+        ColumnFilter rebind(TableMetadata newTable)
+        {
+            RegularAndStaticColumns rebound = ColumnFilter.rebind(newTable, fetchedAndQueried);
+            return new WildCardColumnFilter(rebound);
+        }
+
@Override
protected SortedSetMultimap<ColumnIdentifier, ColumnSubselection>
subSelections()
{
@@ -779,6 +799,21 @@ public abstract class ColumnFilter
return new
Tester(fetchingStrategy.fetchesAllColumns(column.isStatic()), s.iterator());
}
+        /**
+         * Rebinds queried, fetched and sub-selected columns onto {@code newTable} by name.
+         * Columns (including sub-selected ones) that {@code newTable} does not define are dropped.
+         */
+        @Override
+        ColumnFilter rebind(TableMetadata newTable)
+        {
+            RegularAndStaticColumns queried = ColumnFilter.rebind(newTable, this.queried);
+            // preserve instance sharing when fetched and queried are the same set
+            RegularAndStaticColumns fetched = this.queried == this.fetched ? queried : ColumnFilter.rebind(newTable, this.fetched);
+            SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = null;
+            if (this.subSelections != null)
+            {
+                subSelections = TreeMultimap.create();
+                for (Map.Entry<ColumnIdentifier, ColumnSubselection> e : this.subSelections.entries())
+                {
+                    // Skip sub-selections on columns newTable lacks, consistent with how missing queried/fetched
+                    // columns are dropped; rebinding them would produce a subselection with a null column.
+                    if (newTable.getColumn(e.getKey()) == null)
+                        continue;
+                    subSelections.put(e.getKey(), e.getValue().rebind(newTable));
+                }
+            }
+            return new SelectionColumnFilter(fetchingStrategy, queried, fetched, subSelections);
+        }
+
@Override
protected SortedSetMultimap<ColumnIdentifier, ColumnSubselection>
subSelections()
{
@@ -1003,4 +1038,26 @@ public abstract class ColumnFilter
return size;
}
}
+
+    /**
+     * Rebinds both the static and regular column sets onto {@code newTable}; see {@link #rebind(TableMetadata, Columns)}.
+     */
+    private static RegularAndStaticColumns rebind(TableMetadata newTable, RegularAndStaticColumns columns)
+    {
+        return new RegularAndStaticColumns(rebind(newTable, columns.statics), rebind(newTable, columns.regulars));
+    }
+
+    /**
+     * Maps each column in {@code columns} to the same-named column of {@code newTable}.
+     * Columns absent from {@code newTable} are dropped.
+     *
+     * @throws IllegalArgumentException if a same-named column has a type that is neither equal to nor
+     *                                  mutually compatible with the original (the documented contract of rebind)
+     */
+    private static Columns rebind(TableMetadata newTable, Columns columns)
+    {
+        if (columns.isEmpty())
+            return columns;
+
+        try (BTree.FastBuilder<ColumnMetadata> builder = BTree.fastBuilder())
+        {
+            for (ColumnMetadata in : columns)
+            {
+                ColumnMetadata out = newTable.getColumn(in.name);
+                if (out == null)
+                    continue;
+
+                // same acceptance rule as RowFilter.Expression.rebind(ColumnMetadata, TableMetadata)
+                if (!out.type.equals(in.type) && (!out.type.isCompatibleWith(in.type) || !in.type.isCompatibleWith(out.type)))
+                    throw new IllegalArgumentException("Column " + in.name + " has an incompatible type in " + newTable);
+
+                builder.add(out);
+            }
+            return Columns.from(builder);
+        }
+    }
}
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
b/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
index 459b636a5a..b4f0346c76 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
@@ -76,6 +76,8 @@ public abstract class ColumnSubselection implements
Comparable<ColumnSubselectio
protected abstract CellPath comparisonPath();
+    /**
+     * Returns an equivalent subselection over the column of {@code newTable} that has this
+     * subselection's column name.
+     * NOTE(review): the visible implementations pass {@code newTable.getColumn(...)} straight through,
+     * so a column missing from {@code newTable} yields a null column rather than an error — confirm
+     * callers only rebind columns known to exist.
+     */
+ protected abstract ColumnSubselection rebind(TableMetadata newTable);
+
public int compareTo(ColumnSubselection other)
{
assert other.column().name.equals(column().name);
@@ -118,6 +120,12 @@ public abstract class ColumnSubselection implements
Comparable<ColumnSubselectio
return from;
}
+        /** Same cell-path bounds, but against {@code newTable}'s definition of this column. */
+        @Override
+        protected ColumnSubselection rebind(TableMetadata newTable)
+        {
+            ColumnMetadata rebound = newTable.getColumn(column.name);
+            return new Slice(rebound, from, to);
+        }
+
public int compareInclusionOf(CellPath path)
{
Comparator<CellPath> cmp = column.cellPathComparator();
@@ -160,6 +168,12 @@ public abstract class ColumnSubselection implements
Comparable<ColumnSubselectio
return element;
}
+        /** Same selected element, but against {@code newTable}'s definition of this column. */
+        @Override
+        protected ColumnSubselection rebind(TableMetadata newTable)
+        {
+            ColumnMetadata rebound = newTable.getColumn(column.name);
+            return new Element(rebound, element);
+        }
+
public int compareInclusionOf(CellPath path)
{
return column.cellPathComparator().compare(path, element);
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java
b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index 12c98c7278..16ba694e14 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -141,7 +141,7 @@ public class RowFilter implements
Iterable<RowFilter.Expression>
add(new CustomExpression(metadata, targetIndex, value));
}
- private void add(Expression expression)
+ public void add(Expression expression)
{
expression.validate();
expressions.add(expression);
@@ -549,6 +549,28 @@ public class RowFilter implements
Iterable<RowFilter.Expression>
"Index expression values may not be larger than 64K");
}
+        /**
+         * Rebind this expression to a table metadata that is expected to have equivalent columns.
+         * If any referenced column is missing, returns null;
+         * if any referenced column has a different type throws an exception.
+         * Expression kinds that do not override this method do not support rebinding.
+         *
+         * @throws UnsupportedOperationException if this kind of expression cannot be rebound
+         */
+        public Expression rebind(TableMetadata newTable)
+        {
+            throw new UnsupportedOperationException("Expression " + toString(true) + " does not support rebinding to another table definition");
+        }
+
+        /**
+         * Looks up the column of {@code newTable} with the same name as {@code in}.
+         *
+         * @return the matching column, or null if {@code newTable} has no column of that name
+         * @throws IllegalArgumentException if the matching column's type is neither equal to {@code in}'s
+         *                                  nor compatible with it in both directions
+         */
+        protected static ColumnMetadata rebind(ColumnMetadata in, TableMetadata newTable)
+        {
+            ColumnMetadata out = newTable.getColumn(in.name);
+            if (out == null)
+                return null;
+
+            // && binds tighter than ||, so the grouping is spelled out: accept identical types, or types
+            // that are each compatible with the other.
+            boolean sameType = out.type.equals(in.type);
+            boolean mutuallyCompatible = out.type.isCompatibleWith(in.type) && in.type.isCompatibleWith(out.type);
+            if (!sameType && !mutuallyCompatible)
+                throw new IllegalArgumentException("The provided TableMetadata is not compatible with the expression");
+
+            return out;
+        }
+
/**
* Returns whether the provided row satisfied this expression or not.
*
@@ -734,6 +756,16 @@ public class RowFilter implements
Iterable<RowFilter.Expression>
super(column, operator, value);
}
+        /** Rebinds the restricted column; yields null when {@code newTable} lacks it. */
+        @Override
+        public Expression rebind(TableMetadata newTable)
+        {
+            ColumnMetadata rebound = rebind(column, newTable);
+            return rebound == null ? null : new SimpleExpression(rebound, operator, value);
+        }
+
@Override
public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey
partitionKey, Row row, long nowInSec)
{
@@ -853,6 +885,16 @@ public class RowFilter implements
Iterable<RowFilter.Expression>
checkBindValueSet(value, "Unsupported unset map value for column
%s", column.name);
}
+        /** Rebinds the map column, keeping the same key; yields null when {@code newTable} lacks the column. */
+        @Override
+        public Expression rebind(TableMetadata newTable)
+        {
+            ColumnMetadata rebound = rebind(column, newTable);
+            return rebound == null ? null : new MapElementExpression(rebound, key, operator, value);
+        }
+
@Override
public ByteBuffer getIndexValue()
{
@@ -978,6 +1020,12 @@ public class RowFilter implements
Iterable<RowFilter.Expression>
return Kind.CUSTOM;
}
+        /**
+         * Returns a copy of this custom expression bound to {@code newTable}.
+         * NOTE(review): {@code targetIndex} is carried over unchanged; confirm the index is meaningful for
+         * {@code newTable}.
+         */
+        @Override
+        public Expression rebind(TableMetadata newTable)
+        {
+            // Bind to newTable: reusing this expression's own table would silently ignore the rebind request
+            return new CustomExpression(newTable, targetIndex, value);
+        }
+
// Filtering by custom expressions isn't supported yet, so just accept
any row
@Override
public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey
partitionKey, Row row, long nowInSec)
diff --git
a/src/java/org/apache/cassandra/db/virtual/AccordDebugRemoteKeyspace.java
b/src/java/org/apache/cassandra/db/virtual/AccordDebugRemoteKeyspace.java
new file mode 100644
index 0000000000..8698b3ccd8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugRemoteKeyspace.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import org.apache.cassandra.schema.SchemaConstants;
+
+/**
+ * Exposes every table of {@link AccordDebugKeyspace} under the
+ * {@link SchemaConstants#VIRTUAL_ACCORD_DEBUG_REMOTE} keyspace, each wrapped so that a node's local
+ * debug tables can be addressed by node id from any node in the cluster.
+ */
+public class AccordDebugRemoteKeyspace extends RemoteToLocalVirtualKeyspace
+{
+    public static final AccordDebugRemoteKeyspace instance =
+        new AccordDebugRemoteKeyspace(SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.instance);
+
+    /**
+     * @param name name of the keyspace to create
+     * @param wrap node-local virtual keyspace whose tables are exposed
+     */
+    public AccordDebugRemoteKeyspace(String name, VirtualKeyspace wrap)
+    {
+        super(name, wrap);
+    }
+}
diff --git
a/src/java/org/apache/cassandra/db/virtual/RemoteToLocalVirtualKeyspace.java
b/src/java/org/apache/cassandra/db/virtual/RemoteToLocalVirtualKeyspace.java
new file mode 100644
index 0000000000..171f494036
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/RemoteToLocalVirtualKeyspace.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * A virtual keyspace whose tables mirror those of another (node-local) virtual keyspace, each one
+ * wrapped in a {@link RemoteToLocalVirtualTable} registered under this keyspace's name.
+ */
+public class RemoteToLocalVirtualKeyspace extends VirtualKeyspace
+{
+    /** Wraps every table of {@code wrap}. */
+    public RemoteToLocalVirtualKeyspace(String name, VirtualKeyspace wrap)
+    {
+        this(name, wrap, table -> true);
+    }
+
+    /** Wraps only those tables of {@code wrap} accepted by {@code include}. */
+    public RemoteToLocalVirtualKeyspace(String name, VirtualKeyspace wrap, Predicate<VirtualTable> include)
+    {
+        super(name, wrap.tables()
+                        .stream()
+                        .filter(include)
+                        .map(table -> new RemoteToLocalVirtualTable(name, table))
+                        .collect(Collectors.toList()));
+    }
+}
diff --git
a/src/java/org/apache/cassandra/db/virtual/RemoteToLocalVirtualTable.java
b/src/java/org/apache/cassandra/db/virtual/RemoteToLocalVirtualTable.java
new file mode 100644
index 0000000000..837c1feeb1
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/RemoteToLocalVirtualTable.java
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.function.Function;
+
+import accord.utils.Invariants;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.BufferClusteringBound;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionInfo;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.TruncateRequest;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.ColumnData;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestFailure;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Promise;
+import org.apache.cassandra.utils.concurrent.SyncPromise;
+
+import static org.apache.cassandra.db.ClusteringBound.BOTTOM;
+import static org.apache.cassandra.db.ClusteringBound.TOP;
+import static org.apache.cassandra.db.ClusteringBound.boundKind;
+import static org.apache.cassandra.db.ReadCommand.PotentialTxnConflicts.ALLOW;
+import static org.apache.cassandra.db.virtual.VirtualTable.Sorted.SORTED;
+
+public class RemoteToLocalVirtualTable extends AbstractLazyVirtualTable
+{
+    /** Upper bound on reads kept in flight across peers while collecting. */
+    private static final int MAX_CONCURRENCY = 8;
+    /** Metadata of the wrapped, node-local table that remote reads are issued against. */
+    final TableMetadata local;
+    /** Captured from the wrapped table at construction. */
+    final boolean allowFilteringImplicitly;
+    /** Whether the wrapped table allows filtering on its primary key without ALLOW FILTERING. */
+    final boolean allowFilteringLocalPartitionKeysImplicitly;
+
+    /**
+     * @param keyspace     keyspace to register the cluster-wide view under
+     * @param virtualTable node-local table to expose
+     */
+    public RemoteToLocalVirtualTable(String keyspace, VirtualTable virtualTable)
+    {
+        super(wrap(keyspace, virtualTable.name(), virtualTable.metadata()),
+              onTimeoutOf(virtualTable),
+              virtualTable.sorted(),
+              SORTED);
+        this.local = virtualTable.metadata();
+        this.allowFilteringImplicitly = virtualTable.allowFilteringImplicitly();
+        this.allowFilteringLocalPartitionKeysImplicitly = virtualTable.allowFilteringPrimaryKeysImplicitly();
+    }
+
+    /** Inherits the wrapped table's timeout policy when it has one, failing otherwise. */
+    private static OnTimeout onTimeoutOf(VirtualTable virtualTable)
+    {
+        if (virtualTable instanceof AbstractLazyVirtualTable)
+            return ((AbstractLazyVirtualTable) virtualTable).onTimeout();
+        return OnTimeout.FAIL;
+    }
+
+    /** Mirrors the wrapped table's setting, captured at construction. */
+    @Override
+    public boolean allowFilteringImplicitly()
+    {
+        return this.allowFilteringImplicitly;
+    }
+
+    /**
+     * Always permitted here: restrictions on the wrapped table's partition key columns are validated
+     * separately (against the wrapped table's own setting) when requests are built.
+     */
+    @Override
+    public boolean allowFilteringPrimaryKeysImplicitly()
+    {
+        return true;
+    }
+
+ private static TableMetadata wrap(String keyspace, String name,
TableMetadata local)
+ {
+ if (local.partitionKeyColumns().size() != 1 &&
!(local.partitionKeyType instanceof CompositeType))
+ throw new IllegalArgumentException("Underlying table must have a
single partition key, else use CompositeType for its partitioner");
+ TableMetadata.Builder builder = TableMetadata.builder(keyspace, name);
+ builder.partitioner(new LocalPartitioner(Int32Type.instance));
+ builder.addPartitionKeyColumn("node_id", Int32Type.instance);
+ for (ColumnMetadata cm : local.partitionKeyColumns())
+ builder.addClusteringColumn(cm.name, cm.type, cm.getMask(),
cm.getColumnConstraints());
+ for (ColumnMetadata cm : local.clusteringColumns())
+ builder.addClusteringColumn(cm.name, cm.type, cm.getMask(),
cm.getColumnConstraints());
+ // we don't add static columns as they can't be modelled correctly
with the insertion of a prefix partition column
+ for (ColumnMetadata cm : local.regularColumns())
+ {
+ if (cm.isComplex())
+ throw new IllegalArgumentException("Complex columns are not
currently supported by " + RemoteToLocalVirtualTable.class.getSimpleName());
+ builder.addRegularColumn(cm.name, cm.type, cm.getMask(),
cm.getColumnConstraints());
+ }
+ builder.kind(TableMetadata.Kind.VIRTUAL);
+ return builder.build();
+ }
+
+    /**
+     * Fans the query out to every peer selected by the node_id bounds of the requested key range.
+     * For each peer, each requested slice of this table is translated into one or more reads of the
+     * wrapped local table, which are sent as READ_REQ messages. At most MAX_CONCURRENCY reads are kept
+     * in flight (a read that selects a single row is awaited immediately), and responses are handed to
+     * the collector in the order the reads were issued (FIFO).
+     */
+ @Override
+ protected void collect(PartitionsCollector collector)
+ {
+ ClusterMetadata cm = ClusterMetadata.current();
+ NavigableSet<NodeId> matchingIds = cm.directory.peerIds();
+ DataRange dataRange = collector.dataRange();
+ AbstractBounds<PartitionPosition> bounds = dataRange.keyRange();
+        // Narrow the peer set to the node_id bounds of the key range; only exact keys are accepted as bounds
+ {
+ NodeId start = null;
+ if (!bounds.left.isMinimum())
+ {
+ if (!(bounds.left instanceof DecoratedKey))
+ throw new InvalidRequestException(metadata + " does not
support filtering by token or incomplete partition keys");
+ start = new NodeId(Int32Type.instance.compose(((DecoratedKey)
bounds.left).getKey()));
+ }
+ NodeId end = null;
+ if (!bounds.right.isMinimum())
+ {
+ if (!(bounds.right instanceof DecoratedKey))
+ throw new InvalidRequestException(metadata + " does not
support filtering by token or incomplete partition keys");
+ end = new NodeId(Int32Type.instance.compose(((DecoratedKey)
bounds.right).getKey()));
+ }
+            // NOTE(review): NavigableSet.subSet throws IllegalArgumentException when start > end;
+            // confirm such an inverted range cannot reach this point.
+ if (start != null && end != null) matchingIds =
matchingIds.subSet(start, bounds.isStartInclusive(), end,
bounds.isEndInclusive());
+ else if (start != null) matchingIds = matchingIds.tailSet(start,
bounds.isStartInclusive());
+ else if (end != null) matchingIds = matchingIds.headSet(end,
bounds.isEndInclusive());
+ }
+        // reversed queries visit peers in descending node id order
+ if (dataRange.isReversed())
+ matchingIds = matchingIds.descendingSet();
+
+        // row and column filters are expressed against this table; translate them to the wrapped local table
+ RowFilter rowFilter = rebind(local, collector.rowFilter());
+ ColumnFilter columnFilter =
ColumnFilter.rebindVirtual(collector.columnFilter(), local);
+ // TODO (expected): count this down as we progress where possible (or
have AbstractLazyVirtualTable do it for us)
+ DataLimits limits = collector.limits();
+
+ Function<DecoratedKey, ByteBuffer[]> pksToCks =
partitionKeyToClusterings(metadata, local);
+ ArrayDeque<RequestAndResponse> pending = new ArrayDeque<>();
+ matchingIds.forEach(id -> {
+ InetAddressAndPort endpoint = cm.directory.endpoint(id);
+ DecoratedKey remoteKey =
metadata.partitioner.decorateKey(Int32Type.instance.decompose(id.id()));
+ ClusteringIndexFilter filter =
dataRange.clusteringIndexFilter(remoteKey);
+ Slices slices = filter.getSlices(metadata);
+
+            // walk the slices back-to-front for reversed queries so results stay in the requested order
+ int i = 0, advance = 1, end = slices.size();
+ if (dataRange.isReversed())
+ {
+ i = slices.size() - 1;
+ end = -1;
+ advance = -1;
+ }
+
+ PartitionCollector partition = collector.partition(id.id());
+ while (i != end)
+ {
+ List<Request> request = rebind(local, slices.get(i),
dataRange.isReversed(), rowFilter, columnFilter);
+ for (Request send : request)
+ {
+                    // A request bounded to exactly one local partition becomes a single-partition read;
+                    // anything else is a range read. NOTE(review): the clustering filter is looked up with
+                    // remoteKey, presumably harmless as each Request carries one clustering filter — confirm.
+ ReadCommand readCommand;
+ if
(send.dataRange.startKey().equals(send.dataRange.stopKey()) &&
!send.dataRange.startKey().isMinimum())
+ readCommand = SinglePartitionReadCommand.create(local,
collector.nowInSeconds(), send.columnFilter, send.rowFilter, limits,
(DecoratedKey) send.dataRange.startKey(),
send.dataRange.clusteringIndexFilter(remoteKey), ALLOW);
+ else
+ readCommand = PartitionRangeReadCommand.create(local,
collector.nowInSeconds(), send.columnFilter, send.rowFilter, limits,
send.dataRange);
+
+ RequestAndResponse rr = new RequestAndResponse(id,
partition, readCommand);
+ send(rr, endpoint);
+ pending.addLast(rr);
+
+                    // bound the number of in-flight reads, draining oldest first
+ boolean selectsOneRow = selectsOneRow(local,
send.dataRange, remoteKey);
+ while (pending.size() >= (selectsOneRow ? 1 :
MAX_CONCURRENCY))
+ collect(collector, pending.pollFirst(), pksToCks);
+ }
+ i += advance;
+ }
+ });
+        // drain whatever is still outstanding
+ while (!pending.isEmpty())
+ collect(collector, pending.pollFirst(), pksToCks);
+ }
+
+    /**
+     * An in-flight read against one peer. The promise completes with that peer's {@link ReadResponse},
+     * with {@code null} if the peer was not considered alive, or exceptionally on failure.
+     */
+    private static class RequestAndResponse extends SyncPromise<ReadResponse>
+    {
+        final NodeId nodeId;                // peer the read was sent to
+        final PartitionCollector partition; // receives the rows this peer returns
+        final ReadCommand readCommand;      // the command sent; also needed to decode the response
+
+        private RequestAndResponse(NodeId nodeId, PartitionCollector partition, ReadCommand readCommand)
+        {
+            this.nodeId = nodeId;
+            this.partition = partition;
+            this.readCommand = readCommand;
+        }
+    }
+
+    /** One read to issue against the wrapped local table. */
+    private static class Request
+    {
+        final DataRange dataRange;       // local partition range plus clustering filter
+        final RowFilter rowFilter;       // may carry extra expressions on local partition key columns
+        final ColumnFilter columnFilter; // columns to return, bound to the local table
+
+        private Request(DataRange dataRange, RowFilter rowFilter, ColumnFilter columnFilter)
+        {
+            this.dataRange = dataRange;
+            this.rowFilter = rowFilter;
+            this.columnFilter = columnFilter;
+        }
+    }
+
+    /** Sends {@code rr}'s read command to {@code endpoint}; {@code rr} itself is completed with the reply. */
+    private void send(RequestAndResponse rr, InetAddressAndPort endpoint)
+    {
+        ReadCommand command = rr.readCommand;
+        send(Verb.READ_REQ, command, rr, endpoint);
+    }
+
+    /** Sends {@code payload} and returns a fresh promise that completes with the reply. */
+    private <Reply> Promise<Reply> send(Verb verb, Object payload, InetAddressAndPort endpoint)
+    {
+        AsyncPromise<Reply> result = new AsyncPromise<>();
+        send(verb, payload, result, endpoint);
+        return result;
+    }
+
+ private <Reply> void send(Verb verb, Object payload, Promise<Reply>
promise, InetAddressAndPort endpoint)
+ {
+ if (!FailureDetector.instance.isAlive(endpoint))
+ {
+ promise.trySuccess(null);
+ return;
+ }
+
+ // we have to send inline some of the MessagingService logic to
circumvent the requirement to use AbstractWriteResponseHandler
+ Message<?> message = Message.out(verb, payload);
+ RequestCallback<?> callback = new RequestCallback<Reply>()
+ {
+ @Override public void onResponse(Message<Reply> msg) {
promise.trySuccess(msg.payload); }
+ @Override public boolean invokeOnFailure() { return true; }
+ @Override public void onFailure(InetAddressAndPort from,
RequestFailure failure)
+ {
+ if (failure.failure == null) promise.tryFailure(new
RuntimeException(failure.reason.toString()));
+ else promise.tryFailure(failure.failure);
+ }
+ };
+
+ MessagingService.instance().sendWithCallback(message, endpoint,
callback);
+ }
+
+    /**
+     * Waits (up to the collector's deadline) for one peer's response and feeds its rows into that peer's
+     * partition. Each row's clustering is the wrapped table's partition key components followed by the
+     * row's own clustering components. A missing response (peer not alive) only produces a client warning.
+     *
+     * @throws InternalTimeoutException if the response does not arrive before the deadline
+     * @throws UnsupportedOperationException if the response contains range tombstones
+     */
+    private void collect(PartitionsCollector collector, RequestAndResponse rr, Function<DecoratedKey, ByteBuffer[]> pksToCks)
+    {
+        if (!rr.awaitUntilThrowUncheckedOnInterrupt(collector.deadlineNanos()))
+            throw new InternalTimeoutException();
+
+        rr.rethrowIfFailed();
+        ReadResponse response = rr.getNow();
+        if (response == null)
+        {
+            ClientWarn.instance.warn("No response from " + rr.nodeId);
+            return;
+        }
+
+        int pkCount = local.partitionKeyColumns().size();
+        try (UnfilteredPartitionIterator partitions = response.makeIterator(rr.readCommand))
+        {
+            while (partitions.hasNext())
+            {
+                try (UnfilteredRowIterator iter = partitions.next())
+                {
+                    ByteBuffer[] partitionPrefix = pksToCks.apply(iter.partitionKey());
+                    while (iter.hasNext())
+                    {
+                        Unfiltered next = iter.next();
+                        if (!next.isRow())
+                            throw new UnsupportedOperationException("Range tombstones not supported");
+
+                        Row row = (Row) next;
+                        // Give every row its own array: the lambda below captures it, and if the collector
+                        // defers running it, a single per-partition array would by then hold a later row's values.
+                        ByteBuffer[] clusterings = partitionPrefix.clone();
+                        Clustering<?> clustering = row.clustering();
+                        for (int j = 0 ; j < clustering.size(); ++j)
+                            clusterings[pkCount + j] = clustering.bufferAt(j);
+
+                        rr.partition.collect(rows -> {
+                            rows.add((Object[]) clusterings)
+                                .lazyCollect(columns -> {
+                                    row.forEach(cd -> {
+                                        Invariants.require(cd instanceof Cell);
+                                        columns.add(cd.column().name.toString(), ((Cell<?>) cd).buffer());
+                                    });
+                                });
+                        });
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Whether {@code dataRange} names exactly one partition of {@code metadata} and, within it, exactly one
+     * row. That is trivially true when the table has no clustering columns. Otherwise it requires a single
+     * slice whose inclusive start and end bounds both specify every clustering column with equal values.
+     */
+    private static boolean selectsOneRow(TableMetadata metadata, DataRange dataRange, DecoratedKey key)
+    {
+        if (dataRange.startKey().isMinimum() || !dataRange.startKey().equals(dataRange.stopKey()))
+            return false;
+
+        int clusteringCount = metadata.clusteringColumns().size();
+        if (clusteringCount == 0)
+            return true;
+
+        Slices slices = dataRange.clusteringIndexFilter(key).getSlices(metadata);
+        if (slices.size() != 1)
+            return false;
+
+        // Compare bound components rather than using start.equals(end). A prefix slice with equal bounds
+        // spans many rows, and bound equality is expected to take the (start vs end) bound kind into
+        // account. NOTE(review): confirm the latter against ClusteringPrefix.equals.
+        Slice slice = slices.get(0);
+        ClusteringBound<?> start = slice.start();
+        ClusteringBound<?> end = slice.end();
+        if (start.size() != clusteringCount || end.size() != clusteringCount || !start.isInclusive() || !end.isInclusive())
+            return false;
+
+        for (int i = 0 ; i < clusteringCount ; ++i)
+        {
+            if (!equalPart(start, end, i))
+                return false;
+        }
+        return true;
+    }
+
+    /**
+     * Returns a function mapping a wrapped-table partition key to a fresh clustering array for the
+     * {@code distributed} table, pre-filled with the partition key components (leading positions) and
+     * with the remaining clustering positions left null for the caller to fill.
+     */
+    private static Function<DecoratedKey, ByteBuffer[]> partitionKeyToClusterings(TableMetadata distributed, TableMetadata local)
+    {
+        int width = distributed.clusteringColumns().size();
+        if (local.partitionKeyColumns().size() == 1)
+        {
+            return pk -> {
+                ByteBuffer[] out = new ByteBuffer[width];
+                out[0] = pk.getKey();
+                return out;
+            };
+        }
+
+        // multi-column partition keys are CompositeType-encoded; split into their components
+        CompositeType type = (CompositeType) local.partitionKeyType;
+        int pkCount = type.types.size();
+        return pk -> {
+            ByteBuffer[] out = new ByteBuffer[width];
+            System.arraycopy(type.split(pk.getKey()), 0, out, 0, pkCount);
+            return out;
+        };
+    }
+
+    /**
+     * Translates {@code rowFilter} to the wrapped {@code local} table. Expressions on columns that
+     * {@code local} lacks are dropped; an empty filter is returned unchanged.
+     */
+    private static RowFilter rebind(TableMetadata local, RowFilter rowFilter)
+    {
+        if (!rowFilter.isEmpty())
+        {
+            RowFilter rebound = RowFilter.create(false);
+            for (RowFilter.Expression expression : rowFilter.getExpressions())
+            {
+                RowFilter.Expression translated = expression.rebind(local);
+                if (translated != null)
+                    rebound.add(translated);
+            }
+            return rebound;
+        }
+        return rowFilter;
+    }
+
+    /**
+     * Translates one slice of this table (whose clustering prefix is the wrapped table's partition key
+     * followed by its clustering columns) into reads of the wrapped {@code local} table.
+     * <ul>
+     * <li>If either bound restricts only part of the local partition key, the restriction is expressed as
+     *     row-filter expressions over a full scan (only if allowFilteringLocalPartitionKeysImplicitly).</li>
+     * <li>If neither bound reaches into the clustering columns, a single partition-range read suffices.</li>
+     * <li>Otherwise up to three reads are produced: the start partition from the start clustering bound,
+     *     the partitions strictly between, and the end partition up to the end clustering bound (one read if
+     *     both bounds fall in the same partition). The list is reversed for reversed queries.</li>
+     * </ul>
+     *
+     * @throws InvalidRequestException if partial partition key bounds are not permitted or not supported
+     */
+ private List<Request> rebind(TableMetadata local, Slice slice, boolean
reversed, RowFilter rowFilter, ColumnFilter columnFilter)
+ {
+ ClusteringBound<?> start = slice.start();
+ ClusteringBound<?> end = slice.end();
+ int pkCount = local.partitionKeyColumns().size();
+ // TODO (expected): we can filter by partition key by inserting a new
row filter, but need to impose ALLOW FILTERING restrictions
+ if (((start.size() > 0 && start.size() < pkCount) || (end.size() > 0
&& end.size() < pkCount)))
+ {
+ if (!allowFilteringLocalPartitionKeysImplicitly)
+ throw new InvalidRequestException("Must specify full partition
key bounds for the underlying table");
+
+ List<ColumnMetadata> pks = local.partitionKeyColumns();
+ ByteBuffer[] starts = start.getBufferArray();
+ ByteBuffer[] ends = end.getBufferArray();
+
+ int minCount = Math.min(start.size(), end.size());
+ int maxCount = Math.max(start.size(), end.size());
+ int commonPrefixLength = 0;
+ while (commonPrefixLength < minCount && equalPart(start, end,
commonPrefixLength))
+ ++commonPrefixLength;
+
+            // Leading components shared by both bounds become EQ restrictions; the caller's filter is
+            // copied first so it is not mutated.
+ RowFilter commonRowFilter = rowFilter;
+ if (commonPrefixLength > 0)
+ {
+ commonRowFilter = copy(commonRowFilter);
+ for (int i = 0 ; i < commonPrefixLength ; ++i)
+ commonRowFilter.add(pks.get(i), Operator.EQ, starts[i]);
+ }
+
+ Operator lastStartOp = start.isInclusive() ? Operator.GTE :
Operator.GT;
+ Operator lastEndOp = end.isInclusive() ? Operator.LTE :
Operator.LT;
+            // NOTE(review): this requires the common prefix to cover the whole shorter bound, so e.g.
+            // start=(2), end=(5) on a two-column key (exactly expressible as a >= 2 AND a <= 5) is
+            // rejected; confirm whether `commonPrefixLength >= maxCount - 1` was intended.
+ if (commonPrefixLength == Math.max(minCount, maxCount - 1))
+ {
+ // can simply add our remaining filters and continue on our way
+ addExpressions(commonRowFilter, pks, commonPrefixLength,
starts, Operator.GTE, lastStartOp);
+ addExpressions(commonRowFilter, pks, commonPrefixLength, ends,
Operator.LTE, lastEndOp);
+                // NOTE(review): DataRange.allData is not given `reversed`; confirm whether the reversed
+                // flag should be carried into this request.
+ return List.of(new
Request(DataRange.allData(local.partitioner), commonRowFilter, columnFilter));
+ }
+
+ throw new InvalidRequestException("This table currently does not
support the complex partial partition key filters implied for the underlying
table");
+ }
+
+        // Full (or absent) partition key bounds: map them to local partition positions
+ ByteBuffer[] startBuffers = start.getBufferArray();
+ PartitionPosition startBound;
+ if (start.size() == 0) startBound =
local.partitioner.getMinimumToken().minKeyBound();
+ else if (pkCount == 1) startBound =
local.partitioner.decorateKey(startBuffers[0]);
+ else startBound =
local.partitioner.decorateKey(CompositeType.build(ByteBufferAccessor.instance,
Arrays.copyOf(startBuffers, pkCount)));
+
+ ByteBuffer[] endBuffers = end.getBufferArray();
+ PartitionPosition endBound;
+ if (end.size() == 0) endBound =
local.partitioner.getMinimumToken().maxKeyBound();
+ else if (pkCount == 1) endBound =
local.partitioner.decorateKey(endBuffers[0]);
+ else endBound =
local.partitioner.decorateKey(CompositeType.build(ByteBufferAccessor.instance,
Arrays.copyOf(endBuffers, pkCount)));
+
+        // A bound that carries clustering components includes its partition (the clustering part narrows it)
+ AbstractBounds<PartitionPosition> bounds =
AbstractBounds.bounds(startBound, start.isEmpty() || start.size() > pkCount ||
start.isInclusive(),
+
endBound, end.isEmpty() || end.size() > pkCount || end.isInclusive());
+ boolean hasSlices = start.size() > pkCount || end.size() > pkCount;
+ if (!hasSlices)
+ return List.of(new Request(new DataRange(bounds, new
ClusteringIndexSliceFilter(Slices.ALL, reversed)), rowFilter, columnFilter));
+
+        // Clustering components beyond the partition key only constrain the first/last partition
+ ClusteringBound<?> startSlice = ClusteringBound.BOTTOM;
+ if (start.size() > pkCount)
+ startSlice = BufferClusteringBound.create(boundKind(true,
start.isInclusive()), Arrays.copyOfRange(startBuffers, pkCount,
startBuffers.length));
+
+ ClusteringBound<?> endSlice = ClusteringBound.TOP;
+ if (end.size() > pkCount)
+ endSlice = BufferClusteringBound.create(boundKind(false,
end.isInclusive()), Arrays.copyOfRange(endBuffers, pkCount, endBuffers.length));
+
+        // both bounds in the same partition: a single read with both clustering bounds
+ if (startBound.equals(endBound))
+ return List.of(new Request(new DataRange(bounds, filter(local,
startSlice, endSlice, reversed)), rowFilter, columnFilter));
+
+        // [start partition from startSlice] + [partitions strictly between] + [end partition up to endSlice]
+ List<Request> result = new ArrayList<>(3);
+ if (startSlice != BOTTOM)
+ {
+ AbstractBounds<PartitionPosition> startBoundOnly =
AbstractBounds.bounds(startBound, true, startBound, true);
+ result.add(new Request(new DataRange(startBoundOnly, filter(local,
startSlice, TOP, reversed)), rowFilter, columnFilter));
+ }
+ result.add(new Request(new
DataRange(AbstractBounds.bounds(bounds.left, bounds.inclusiveLeft() &&
startSlice == BOTTOM,
+ bounds.right,
bounds.inclusiveRight() && endSlice == TOP),
+ new ClusteringIndexSliceFilter(Slices.ALL,
reversed)), rowFilter, columnFilter)
+ );
+ if (endSlice != TOP)
+ {
+ AbstractBounds<PartitionPosition> endBoundOnly =
AbstractBounds.bounds(endBound, true, endBound, true);
+ result.add(new Request(new DataRange(endBoundOnly, filter(local,
BOTTOM, endSlice, reversed)), rowFilter, columnFilter));
+ }
+ if (reversed)
+ Collections.reverse(result);
+ return result;
+ }
+
+    /**
+     * Tests whether the i'th component of two clustering bounds holds the same value, comparing through
+     * the value accessors each bound was built with.
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private static boolean equalPart(ClusteringBound start, ClusteringBound end, int i)
+    {
+        Object left = start.get(i);
+        Object right = end.get(i);
+        return start.accessor().compare(left, right, end.accessor()) == 0;
+    }
+
+    /**
+     * Builds a new {@code RowFilter} (created with {@code RowFilter.create(false)}) holding every
+     * expression of {@code source}, so the result can be extended without touching the original.
+     */
+    private static RowFilter copy(RowFilter source)
+    {
+        RowFilter result = RowFilter.create(false);
+        for (RowFilter.Expression e : source)
+            result.add(e);
+        return result;
+    }
+
+    /**
+     * Adds one expression per value from index {@code start} onwards, pairing {@code values[i]} with
+     * {@code cms.get(i)}. Every expression uses {@code op} except the final one, which uses {@code lastOp}.
+     */
+    private static void addExpressions(RowFilter rowFilter, List<ColumnMetadata> cms, int start, ByteBuffer[] values, Operator op, Operator lastOp)
+    {
+        int last = values.length - 1;
+        for (int i = start ; i <= last ; ++i)
+        {
+            Operator operator = i == last ? lastOp : op;
+            rowFilter.add(cms.get(i), operator, values[i]);
+        }
+    }
+
+    /**
+     * Builds a clustering filter selecting the single slice [start, end] of {@code metadata}'s clustering,
+     * iterated in the requested direction.
+     */
+    private static ClusteringIndexSliceFilter filter(TableMetadata metadata, ClusteringBound<?> start, ClusteringBound<?> end, boolean reversed)
+    {
+        Slices slices = Slices.with(metadata.comparator, Slice.make(start, end));
+        return new ClusteringIndexSliceFilter(slices, reversed);
+    }
+
+    /**
+     * Applies a write against the remote table by translating it into mutations of the local table on the
+     * node identified by the update's partition key (an {@code int} node id).
+     * <p>
+     * A partition-level deletion truncates the local table on that node. Range deletions must pin down
+     * exactly one local partition. Rows are re-keyed: the leading local-partition-key components of the
+     * remote clustering become the local partition key, and the rest becomes the local clustering.
+     * All input is validated before anything is sent, so a rejected update is never partially applied.
+     *
+     * @throws InvalidRequestException if the node is unknown, a range deletion does not identify a single
+     *                                 local partition, or the update contains a static row
+     */
+    @Override
+    public void apply(PartitionUpdate update)
+    {
+        int nodeId = Int32Type.instance.compose(update.partitionKey().getKey());
+        InetAddressAndPort endpoint = ClusterMetadata.current().directory.endpoint(new NodeId(nodeId));
+        if (endpoint == null)
+            throw new InvalidRequestException("Unknown node " + nodeId);
+
+        DeletionInfo deletionInfo = update.deletionInfo();
+        if (!deletionInfo.getPartitionDeletion().isLive())
+        {
+            // deleting the whole remote partition truncates the local table on the target node
+            truncate(endpoint).syncThrowUncheckedOnInterrupt();
+            return;
+        }
+
+        int pkCount = local.partitionKeyColumns().size();
+
+        // Reject unsupported input BEFORE dispatching anything: maybeRolloverAndWait sends each completed local
+        // partition as soon as the key changes, so failing part-way through would leave the update partially applied.
+        if (deletionInfo.hasRanges())
+        {
+            Iterator<RangeTombstone> toValidate = deletionInfo.rangeIterator(false);
+            while (toValidate.hasNext())
+                validateRangeDeletion(toValidate.next(), pkCount);
+        }
+        if (!update.staticRow().isEmpty())
+            throw new InvalidRequestException("Static rows are not supported for remote table " + metadata);
+
+        // scratch buffers reused for every key; a single-column partition key needs no composite buffer
+        ByteBuffer[] pkBuffer = pkCount == 1 ? null : new ByteBuffer[pkCount];
+        ByteBuffer[] ckBuffer = new ByteBuffer[local.clusteringColumns().size()];
+
+        PartitionUpdate.Builder builder = null;
+        ArrayDeque<Promise<Void>> results = new ArrayDeque<>();
+
+        if (deletionInfo.hasRanges())
+        {
+            Iterator<RangeTombstone> iterator = deletionInfo.rangeIterator(false);
+            while (iterator.hasNext())
+            {
+                RangeTombstone rt = iterator.next();
+                ClusteringBound start = rt.deletedSlice().start();
+                ClusteringBound end = rt.deletedSlice().end();
+
+                DecoratedKey key = remoteClusteringToLocalPartitionKey(local, start, pkCount, pkBuffer);
+                builder = maybeRolloverAndWait(key, builder, results, endpoint);
+                if (start.size() == pkCount && end.size() == pkCount)
+                {
+                    // the range covers exactly one whole local partition
+                    builder.addPartitionDeletion(rt.deletionTime());
+                }
+                else
+                {
+                    // strip the local partition key prefix to obtain bounds over the local clustering
+                    start = ClusteringBound.create(start.kind(), Clustering.make(remoteClusteringToLocalClustering(start.clustering(), pkCount, ckBuffer)));
+                    end = ClusteringBound.create(end.kind(), Clustering.make(remoteClusteringToLocalClustering(end.clustering(), pkCount, ckBuffer)));
+                    builder.add(new RangeTombstone(Slice.make(start, end), rt.deletionTime()));
+                }
+            }
+        }
+
+        try (BTree.FastBuilder<ColumnData> columns = BTree.fastBuilder())
+        {
+            for (Row row : update)
+            {
+                Clustering<?> clustering = row.clustering();
+                DecoratedKey key = remoteClusteringToLocalPartitionKey(local, clustering, pkCount, pkBuffer);
+                builder = maybeRolloverAndWait(key, builder, results, endpoint);
+                Clustering<?> newClustering = Clustering.make(remoteClusteringToLocalClustering(clustering, pkCount, ckBuffer));
+                columns.reset();
+                for (ColumnData cd : row)
+                    columns.add(rebind(local, cd));
+                builder.add(BTreeRow.create(newClustering, row.primaryKeyLivenessInfo(), row.deletion(), columns.build()));
+            }
+        }
+
+        if (builder != null)
+            results.add(send(Verb.VIRTUAL_MUTATION_REQ, new VirtualMutation(builder.build()), endpoint));
+
+        // wait for every outstanding per-partition mutation to complete
+        while (!results.isEmpty())
+            results.pollFirst().syncThrowUncheckedOnInterrupt();
+    }
+
+    /**
+     * Rejects a remote range deletion that cannot be mapped onto a single local partition. Both bounds must
+     * carry the complete local partition key, and those key components must be equal on both sides.
+     */
+    private void validateRangeDeletion(RangeTombstone rt, int pkCount)
+    {
+        ClusteringBound start = rt.deletedSlice().start();
+        ClusteringBound end = rt.deletedSlice().end();
+        if (start.size() < pkCount || end.size() < pkCount)
+            throw new InvalidRequestException("Range deletions must specify a complete partition key in the underlying table " + metadata);
+
+        for (int i = 0 ; i < pkCount ; ++i)
+        {
+            if (!equalPart(start, end, i))
+                throw new InvalidRequestException("Range deletions must specify a single partition key in the underlying table " + metadata);
+        }
+    }
+
+    /**
+     * Returns a builder for the local partition {@code key}. If {@code builder} already targets that key it
+     * is returned unchanged. Otherwise the previous builder (if any) is sent to {@code endpoint}, a fresh
+     * builder is created, and the caller blocks until fewer than {@code MAX_CONCURRENCY} sends are in flight.
+     */
+    private PartitionUpdate.Builder maybeRolloverAndWait(DecoratedKey key, PartitionUpdate.Builder builder, ArrayDeque<Promise<Void>> waiting, InetAddressAndPort endpoint)
+    {
+        if (builder != null && builder.partitionKey().equals(key))
+            return builder;
+
+        // the previous partition is complete: dispatch it before starting the next one
+        if (builder != null)
+            waiting.add(send(Verb.VIRTUAL_MUTATION_REQ, new VirtualMutation(builder.build()), endpoint));
+
+        PartitionUpdate.Builder next = new PartitionUpdate.Builder(local, key, local.regularAndStaticColumns(), 8);
+        // bound the number of in-flight mutations by blocking on the oldest ones
+        while (waiting.size() >= MAX_CONCURRENCY)
+            waiting.pollFirst().syncThrowUncheckedOnInterrupt();
+        return next;
+    }
+
+    /** Sends a TRUNCATE_REQ for the underlying local table to {@code endpoint}. */
+    private Promise<Void> truncate(InetAddressAndPort endpoint)
+    {
+        TruncateRequest request = new TruncateRequest(local.keyspace, local.name);
+        return send(Verb.TRUNCATE_REQ, request, endpoint);
+    }
+
+    /**
+     * Re-points a simple cell from the remote table at the same-named column of {@code local}.
+     * Missing or complex columns violate an invariant.
+     */
+    private static ColumnData rebind(TableMetadata local, ColumnData cd)
+    {
+        ColumnMetadata localColumn = local.getColumn(cd.column().name);
+        Invariants.require(localColumn != null, cd.column() + " not found in " + local);
+        Invariants.require(!localColumn.isComplex(), "Complex column " + localColumn + " not supported; should have been removed from metadata");
+
+        // only simple columns get this far, so the data is a single Cell
+        Cell<?> cell = (Cell<?>) cd;
+        return cell.withUpdatedColumn(localColumn);
+    }
+
+    /**
+     * Decorates the local partition key formed by the first {@code pkCount} components of a remote
+     * clustering prefix. A single component is used directly; several are combined into a composite,
+     * using {@code pkBuffer} as scratch space.
+     */
+    private static DecoratedKey remoteClusteringToLocalPartitionKey(TableMetadata local, ClusteringPrefix<?> clustering, int pkCount, ByteBuffer[] pkBuffer)
+    {
+        if (pkCount == 1)
+            return local.partitioner.decorateKey(clustering.bufferAt(0));
+
+        for (int i = 0 ; i < pkBuffer.length ; ++i)
+            pkBuffer[i] = clustering.bufferAt(i);
+        return local.partitioner.decorateKey(CompositeType.build(ByteBufferAccessor.instance, pkBuffer));
+    }
+
+    /**
+     * Extracts the components of a remote clustering prefix that follow the first {@code pkCount}
+     * (the local partition key), staging them in {@code ckBuffer} and returning a right-sized copy.
+     */
+    private static ByteBuffer[] remoteClusteringToLocalClustering(ClusteringPrefix<?> clustering, int pkCount, ByteBuffer[] ckBuffer)
+    {
+        int localCount = clustering.size() - pkCount;
+        for (int j = 0 ; j < localCount ; ++j)
+            ckBuffer[j] = clustering.bufferAt(pkCount + j);
+
+        // copy out of the shared scratch buffer so later calls cannot clobber the result
+        return Arrays.copyOf(ckBuffer, localCount);
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
index ee98da29c1..41e9e96266 100644
--- a/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.db.virtual;
+import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
@@ -26,12 +27,20 @@ import javax.annotation.Nullable;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
+import accord.utils.Invariants;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.ReadCommand.PotentialTxnConflicts;
import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.ClientState;
@@ -42,6 +51,35 @@ import org.apache.cassandra.service.ClientState;
*/
public final class VirtualMutation implements IMutation
{
+    /**
+     * Wire format for a VirtualMutation: exactly one PartitionUpdate, serialized with
+     * {@code PartitionUpdate.serializer}. Deserialization uses {@code DeserializationHelper.Flag.FROM_REMOTE}.
+     */
+    public static final IVersionedSerializer<VirtualMutation> serializer = new IVersionedSerializer<VirtualMutation>()
+    {
+        /** Returns the only PartitionUpdate of {@code mutation}, requiring there to be exactly one. */
+        private PartitionUpdate single(VirtualMutation mutation)
+        {
+            Invariants.require(mutation.modifications.size() == 1);
+            return mutation.modifications.values().iterator().next();
+        }
+
+        @Override
+        public void serialize(VirtualMutation t, DataOutputPlus out, int version) throws IOException
+        {
+            PartitionUpdate.serializer.serialize(single(t), out, version);
+        }
+
+        @Override
+        public VirtualMutation deserialize(DataInputPlus in, int version) throws IOException
+        {
+            return new VirtualMutation(PartitionUpdate.serializer.deserialize(in, version, DeserializationHelper.Flag.FROM_REMOTE));
+        }
+
+        @Override
+        public long serializedSize(VirtualMutation t, int version)
+        {
+            return PartitionUpdate.serializer.serializedSize(single(t), version);
+        }
+    };
+
+ public static final IVerbHandler<VirtualMutation> handler = message -> {
+ message.payload.apply();
+ MessagingService.instance().respond(NoPayload.noPayload, message);
+ };
+
private final String keyspaceName;
private final DecoratedKey partitionKey;
private final ImmutableMap<TableId, PartitionUpdate> modifications;
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
index 3f05ed3a69..0423b12a4d 100644
--- a/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
@@ -100,4 +100,6 @@ public interface VirtualTable
{
return allowFilteringImplicitly();
}
+
+    /**
+     * @return {@code Sorted.UNSORTED} by default; implementations may override this to report a different value
+     */
+    default Sorted sorted()
+    {
+        return Sorted.UNSORTED;
+    }
}
diff --git a/src/java/org/apache/cassandra/exceptions/ExceptionSerializer.java
b/src/java/org/apache/cassandra/exceptions/ExceptionSerializer.java
index de379739a3..834abca49d 100644
--- a/src/java/org/apache/cassandra/exceptions/ExceptionSerializer.java
+++ b/src/java/org/apache/cassandra/exceptions/ExceptionSerializer.java
@@ -46,7 +46,7 @@ public class ExceptionSerializer
{
public static class RemoteException extends RuntimeException
{
- private final String originalClass;
+ public final String originalClass;
public RemoteException(String originalClass, String originalMessage,
StackTraceElement[] stackTrace)
{
diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailure.java
b/src/java/org/apache/cassandra/exceptions/RequestFailure.java
index b9bba7fc70..03d088a239 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestFailure.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailure.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.tcm.NotCMSException;
import static com.google.common.base.Preconditions.checkNotNull;
import static
org.apache.cassandra.exceptions.ExceptionSerializer.nullableRemoteExceptionSerializer;
+import static org.apache.cassandra.exceptions.RequestFailureReason.UNKNOWN;
/**
* Allow inclusion of a serialized exception in failure response messages
@@ -125,7 +126,7 @@ public class RequestFailure
if (t instanceof CoordinatorBehindException)
return COORDINATOR_BEHIND;
- return UNKNOWN;
+ return new RequestFailure(t);
}
public static RequestFailure forReason(RequestFailureReason reason)
diff --git a/src/java/org/apache/cassandra/locator/RemoteStrategy.java
b/src/java/org/apache/cassandra/locator/RemoteStrategy.java
new file mode 100644
index 0000000000..515462ee98
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/RemoteStrategy.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.Map;
+
+/**
+ * A {@link LocalStrategy} that adds no behaviour of its own; construction is delegated entirely to
+ * {@link LocalStrategy}. It exists as a distinct type, presumably so the remote-to-local virtual keyspaces
+ * can be told apart from ordinary local ones (NOTE(review): confirm against its users).
+ */
+public class RemoteStrategy extends LocalStrategy
+{
+    /**
+     * @param keyspace name of the keyspace this strategy is attached to
+     * @param options  replication options, passed through unchanged to {@link LocalStrategy}
+     */
+    public RemoteStrategy(String keyspace, Map<String, String> options)
+    {
+        super(keyspace, options);
+    }
+}
diff --git a/src/java/org/apache/cassandra/net/Verb.java
b/src/java/org/apache/cassandra/net/Verb.java
index d24c9e64ad..4a0ebb2c22 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.db.SnapshotCommand;
import org.apache.cassandra.db.TruncateRequest;
import org.apache.cassandra.db.TruncateResponse;
import org.apache.cassandra.db.TruncateVerbHandler;
+import org.apache.cassandra.db.virtual.VirtualMutation;
import org.apache.cassandra.exceptions.RequestFailure;
import org.apache.cassandra.gms.GossipDigestAck;
import org.apache.cassandra.gms.GossipDigestAck2;
@@ -195,6 +196,8 @@ public enum Verb
{
MUTATION_RSP (60, P1, writeTimeout, REQUEST_RESPONSE, () ->
NoPayload.serializer, RESPONSE_HANDLER
),
MUTATION_REQ (0, P3, writeTimeout, MUTATION, () ->
Mutation.serializer, () -> MutationVerbHandler.instance,
MUTATION_RSP ),
+ VIRTUAL_MUTATION_RSP (200, P1, writeTimeout, REQUEST_RESPONSE, () ->
NoPayload.serializer, RESPONSE_HANDLER
),
+ VIRTUAL_MUTATION_REQ (201, P3, writeTimeout, MUTATION, () ->
VirtualMutation.serializer, () -> VirtualMutation.handler,
VIRTUAL_MUTATION_RSP),
HINT_RSP (61, P1, writeTimeout, REQUEST_RESPONSE, () ->
NoPayload.serializer, RESPONSE_HANDLER
),
HINT_REQ (1, P4, writeTimeout, MUTATION, () ->
HintMessage.serializer, () -> HintVerbHandler.instance,
HINT_RSP ),
READ_REPAIR_RSP (62, P1, writeTimeout, REQUEST_RESPONSE, () ->
NoPayload.serializer, RESPONSE_HANDLER
),
diff --git a/src/java/org/apache/cassandra/schema/SchemaConstants.java
b/src/java/org/apache/cassandra/schema/SchemaConstants.java
index 6e3cb7a72e..ada413537d 100644
--- a/src/java/org/apache/cassandra/schema/SchemaConstants.java
+++ b/src/java/org/apache/cassandra/schema/SchemaConstants.java
@@ -57,6 +57,7 @@ public final class SchemaConstants
public static final String VIRTUAL_VIEWS = "system_views";
public static final String VIRTUAL_METRICS = "system_metrics";
public static final String VIRTUAL_ACCORD_DEBUG = "system_accord_debug";
+ public static final String VIRTUAL_ACCORD_DEBUG_REMOTE =
"system_accord_debug_remote";
public static final String DUMMY_KEYSPACE_OR_TABLE_NAME = "--dummy--";
@@ -66,7 +67,7 @@ public final class SchemaConstants
/* virtual table system keyspace names */
public static final Set<String> VIRTUAL_SYSTEM_KEYSPACE_NAMES =
- ImmutableSet.of(VIRTUAL_SCHEMA, VIRTUAL_VIEWS, VIRTUAL_METRICS);
+ ImmutableSet.of(VIRTUAL_SCHEMA, VIRTUAL_VIEWS, VIRTUAL_METRICS,
VIRTUAL_ACCORD_DEBUG, VIRTUAL_ACCORD_DEBUG_REMOTE);
/* replicate system keyspace names (the ones with a "true" replication
strategy) */
public static final Set<String> REPLICATED_SYSTEM_KEYSPACE_NAMES =
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java
b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index f4ed67a440..2f85e6e58e 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -61,6 +61,7 @@ import org.apache.cassandra.db.SystemKeyspaceMigrator41;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.virtual.AccordDebugKeyspace;
import org.apache.cassandra.db.virtual.ExceptionsTable;
+import org.apache.cassandra.db.virtual.AccordDebugRemoteKeyspace;
import org.apache.cassandra.db.virtual.LogMessagesTable;
import org.apache.cassandra.db.virtual.SlowQueriesTable;
import org.apache.cassandra.db.virtual.SystemViewsKeyspace;
@@ -559,7 +560,10 @@ public class CassandraDaemon
VirtualKeyspaceRegistry.instance.register(new
VirtualKeyspace(VIRTUAL_METRICS, createMetricsKeyspaceTables()));
if (DatabaseDescriptor.getAccord().enable_virtual_debug_only_keyspace)
+ {
VirtualKeyspaceRegistry.instance.register(AccordDebugKeyspace.instance);
+
VirtualKeyspaceRegistry.instance.register(AccordDebugRemoteKeyspace.instance);
+ }
// Flush log messages to system_views.system_logs virtual table as
there were messages already logged
// before that virtual table was instantiated.
diff --git
a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
index 6cac2f25fc..0889ebffe5 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
@@ -52,6 +52,8 @@ import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.exceptions.ExceptionSerializer;
+import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -88,54 +90,108 @@ public class AccordDebugKeyspaceTest extends CQLTester
private static final String QUERY_TXN_BLOCKED_BY =
String.format("SELECT * FROM %s.%s WHERE txn_id=?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_BLOCKED_BY);
+ private static final String QUERY_TXN_BLOCKED_BY_REMOTE =
+ String.format("SELECT * FROM %s.%s WHERE node_id = ? AND txn_id=?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.TXN_BLOCKED_BY);
+
private static final String QUERY_COMMANDS_FOR_KEY =
String.format("SELECT txn_id, status FROM %s.%s WHERE key=?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.COMMANDS_FOR_KEY);
+ private static final String QUERY_COMMANDS_FOR_KEY_REMOTE =
+ String.format("SELECT txn_id, status FROM %s.%s WHERE node_id = ? AND
key=?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.COMMANDS_FOR_KEY);
+
private static final String QUERY_TXN =
String.format("SELECT txn_id, save_status FROM %s.%s WHERE txn_id=?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN);
+ private static final String QUERY_TXN_REMOTE =
+ String.format("SELECT txn_id, save_status FROM %s.%s WHERE node_id = ?
AND txn_id=?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.TXN);
+
private static final String QUERY_TXNS =
String.format("SELECT save_status FROM %s.%s WHERE command_store_id =
? LIMIT 5", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN);
+ private static final String QUERY_TXNS_REMOTE =
+ String.format("SELECT save_status FROM %s.%s WHERE node_id = ? AND
command_store_id = ? LIMIT 5", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.TXN);
+
private static final String QUERY_TXNS_SEARCH =
String.format("SELECT save_status FROM %s.%s WHERE command_store_id =
? AND txn_id > ? LIMIT 5", SchemaConstants.VIRTUAL_ACCORD_DEBUG,
AccordDebugKeyspace.TXN);
+ private static final String QUERY_TXNS_SEARCH_REMOTE =
+ String.format("SELECT save_status FROM %s.%s WHERE node_id = ? AND
command_store_id = ? AND txn_id > ? LIMIT 5",
SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN);
+
private static final String QUERY_JOURNAL =
String.format("SELECT txn_id, save_status FROM %s.%s WHERE txn_id=?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.JOURNAL);
+ private static final String ERASE_JOURNAL_REMOTE =
+ String.format("DELETE FROM %s.%s WHERE node_id = ? AND
command_store_id = ? AND txn_id=?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.JOURNAL);
+
+ private static final String QUERY_JOURNAL_REMOTE =
+ String.format("SELECT txn_id, save_status FROM %s.%s WHERE node_id = ?
AND txn_id=?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.JOURNAL);
+
private static final String SET_TRACE =
String.format("UPDATE %s.%s SET permits = ? WHERE txn_id = ? AND
event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG,
AccordDebugKeyspace.TXN_TRACE);
+ private static final String SET_TRACE_REMOTE =
+ String.format("UPDATE %s.%s SET permits = ? WHERE node_id = ? AND
txn_id = ? AND event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.TXN_TRACE);
+
private static final String QUERY_TRACE =
String.format("SELECT * FROM %s.%s WHERE txn_id = ? AND event_type =
?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACE);
+ private static final String QUERY_TRACE_REMOTE =
+ String.format("SELECT * FROM %s.%s WHERE node_id = ? AND txn_id = ?
AND event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.TXN_TRACE);
+
private static final String UNSET_TRACE1 =
String.format("DELETE FROM %s.%s WHERE txn_id = ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACE);
+ private static final String UNSET_TRACE1_REMOTE =
+ String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN_TRACE);
+
private static final String UNSET_TRACE2 =
String.format("DELETE FROM %s.%s WHERE txn_id = ? AND event_type = ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACE);
+ private static final String UNSET_TRACE2_REMOTE =
+ String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ? AND
event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.TXN_TRACE);
+
private static final String QUERY_TRACES =
String.format("SELECT * FROM %s.%s WHERE txn_id = ? AND event_type =
?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACES);
+ private static final String QUERY_TRACES_REMOTE =
+ String.format("SELECT * FROM %s.%s WHERE node_id = ? AND txn_id = ?
AND event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.TXN_TRACES);
+
private static final String ERASE_TRACES1 =
String.format("DELETE FROM %s.%s WHERE txn_id = ? AND event_type = ?
AND id_micros < ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG,
AccordDebugKeyspace.TXN_TRACES);
+ private static final String ERASE_TRACES1_REMOTE =
+ String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ? AND
event_type = ? AND id_micros < ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.TXN_TRACES);
+
private static final String ERASE_TRACES2 =
String.format("DELETE FROM %s.%s WHERE txn_id = ? AND event_type = ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACES);
+ private static final String ERASE_TRACES2_REMOTE =
+ String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ? AND
event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.TXN_TRACES);
+
private static final String ERASE_TRACES3 =
String.format("DELETE FROM %s.%s WHERE txn_id = ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACES);
+ private static final String ERASE_TRACES3_REMOTE =
+ String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN_TRACES);
+
private static final String QUERY_REDUNDANT_BEFORE =
String.format("SELECT * FROM %s.%s",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.REDUNDANT_BEFORE);
+ private static final String QUERY_REDUNDANT_BEFORE_REMOTE =
+ String.format("SELECT * FROM %s.%s WHERE node_id = ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.REDUNDANT_BEFORE);
+
private static final String
QUERY_REDUNDANT_BEFORE_FILTER_QUORUM_APPLIED_GEQ =
String.format("SELECT * FROM %s.%s WHERE quorum_applied >= ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.REDUNDANT_BEFORE);
+ private static final String
QUERY_REDUNDANT_BEFORE_FILTER_QUORUM_APPLIED_GEQ_REMOTE =
+ String.format("SELECT * FROM %s.%s WHERE node_id = ? AND
quorum_applied >= ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.REDUNDANT_BEFORE);
+
private static final String
QUERY_REDUNDANT_BEFORE_FILTER_SHARD_APPLIED_GEQ =
String.format("SELECT * FROM %s.%s WHERE shard_applied >= ?",
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.REDUNDANT_BEFORE);
+ private static final String
QUERY_REDUNDANT_BEFORE_FILTER_SHARD_APPLIED_GEQ_REMOTE =
+ String.format("SELECT * FROM %s.%s WHERE node_id = ? AND shard_applied
>= ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE,
AccordDebugKeyspace.REDUNDANT_BEFORE);
+
@BeforeClass
public static void setUpClass()
{
@@ -170,43 +226,55 @@ public class AccordDebugKeyspaceTest extends CQLTester
public void tracing()
{
// simple test to confirm basic tracing functionality works, doesn't
validate specific behaviours only requesting/querying/erasing
+ String tableName = createTable("CREATE TABLE %s (k int, c int, v int,
PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
+ AccordService accord = accord();
+ DatabaseDescriptor.getAccord().fetch_txn = "1s";
+ int nodeId = accord.nodeId().id;
+
AccordMsgFilter filter = new AccordMsgFilter();
MessagingService.instance().outboundSink.add(filter);
try
{
- String tableName = createTable("CREATE TABLE %s (k int, c int, v
int, PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
- AccordService accord = accord();
- DatabaseDescriptor.getAccord().fetch_txn = "1s";
TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write,
Routable.Domain.Key);
Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k,
c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0);
+ filter.appliesTo(id);
execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS");
assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"),
row(id.toString(), "WAIT_PROGRESS", 1));
+ assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(),
"WAIT_PROGRESS"), row(nodeId, id.toString(), "WAIT_PROGRESS", 1));
execute(SET_TRACE, 0, id.toString(), "WAIT_PROGRESS");
assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"));
+ assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(),
"WAIT_PROGRESS"));
execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS");
assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"),
row(id.toString(), "WAIT_PROGRESS", 1));
+ assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(),
"WAIT_PROGRESS"), row(nodeId, id.toString(), "WAIT_PROGRESS", 1));
execute(UNSET_TRACE1, id.toString());
assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"));
+ assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(),
"WAIT_PROGRESS"));
execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS");
assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"),
row(id.toString(), "WAIT_PROGRESS", 1));
+ assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(),
"WAIT_PROGRESS"), row(nodeId, id.toString(), "WAIT_PROGRESS", 1));
execute(UNSET_TRACE2, id.toString(), "WAIT_PROGRESS");
assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"));
+ assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(),
"WAIT_PROGRESS"));
execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS");
assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"),
row(id.toString(), "WAIT_PROGRESS", 1));
- accord.node().coordinate(id, txn).beginAsResult();
+ assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(),
"WAIT_PROGRESS"), row(nodeId, id.toString(), "WAIT_PROGRESS", 1));
filter.appliesTo(id);
+ accord.node().coordinate(id, txn).beginAsResult();
filter.preAccept.awaitThrowUncheckedOnInterrupt();
-
filter.apply.awaitThrowUncheckedOnInterrupt();
spinUntilSuccess(() -> Assertions.assertThat(execute(QUERY_TRACES,
id.toString(), "WAIT_PROGRESS").size()).isGreaterThan(0));
+ spinUntilSuccess(() ->
Assertions.assertThat(execute(QUERY_TRACES_REMOTE, nodeId, id.toString(),
"WAIT_PROGRESS").size()).isGreaterThan(0));
execute(ERASE_TRACES1, id.toString(), "FETCH", Long.MAX_VALUE);
execute(ERASE_TRACES2, id.toString(), "FETCH");
execute(ERASE_TRACES1, id.toString(), "WAIT_PROGRESS",
Long.MAX_VALUE);
Assertions.assertThat(execute(QUERY_TRACES, id.toString(),
"WAIT_PROGRESS").size()).isEqualTo(0);
+ Assertions.assertThat(execute(QUERY_TRACES_REMOTE, nodeId,
id.toString(), "WAIT_PROGRESS").size()).isEqualTo(0);
// just check other variants don't fail
execute(ERASE_TRACES2, id.toString(), "WAIT_PROGRESS");
execute(ERASE_TRACES3, id.toString());
+
}
finally
{
@@ -219,6 +287,7 @@ public class AccordDebugKeyspaceTest extends CQLTester
{
String tableName = createTable("CREATE TABLE %s (k int, c int, v int,
PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
var accord = accord();
+ int nodeId = accord.nodeId().id;
TableId tableId = Schema.instance.getTableMetadata(KEYSPACE,
tableName).id;
TxnId syncId1 = new TxnId(100, 200, Txn.Kind.ExclusiveSyncPoint,
Routable.Domain.Range, accord.nodeId());
TxnId syncId2 = new TxnId(101, 300, Txn.Kind.ExclusiveSyncPoint,
Routable.Domain.Range, accord.nodeId());
@@ -234,6 +303,27 @@ public class AccordDebugKeyspaceTest extends CQLTester
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_QUORUM_APPLIED_GEQ,
syncId2.toString()).size()).isEqualTo(1);
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_SHARD_APPLIED_GEQ,
syncId1.toString()).size()).isEqualTo(1);
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_SHARD_APPLIED_GEQ,
syncId2.toString()).size()).isEqualTo(0);
+ Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_REMOTE,
nodeId).size()).isGreaterThan(0);
+
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_QUORUM_APPLIED_GEQ_REMOTE,
nodeId, syncId1.toString()).size()).isEqualTo(2);
+
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_QUORUM_APPLIED_GEQ_REMOTE,
nodeId, syncId2.toString()).size()).isEqualTo(1);
+
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_SHARD_APPLIED_GEQ_REMOTE,
nodeId, syncId1.toString()).size()).isEqualTo(1);
+
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_SHARD_APPLIED_GEQ_REMOTE,
nodeId, syncId2.toString()).size()).isEqualTo(0);
+ }
+
+    /**
+     * Checks that an unsupported remote DELETE is rejected by the remote node and surfaces as a
+     * RemoteException wrapping InvalidRequestException. Fails if the statement is unexpectedly accepted.
+     */
+    @Test
+    public void reportInvalidRequestForUnsupportedRemoteToLocal()
+    {
+        AccordService accord = accord();
+        int nodeId = accord.nodeId().id;
+        TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key);
+        try
+        {
+            execute(ERASE_JOURNAL_REMOTE, nodeId, 1, id.toString());
+            // without this, the test would pass vacuously if the statement were accepted
+            Assertions.fail("Expected " + ERASE_JOURNAL_REMOTE + " to be rejected");
+        }
+        catch (ExceptionSerializer.RemoteException t)
+        {
+            Assertions.assertThat(t.originalClass).isEqualTo(InvalidRequestException.class.getName());
+        }
     }
@Test
@@ -241,6 +331,7 @@ public class AccordDebugKeyspaceTest extends CQLTester
{
String tableName = createTable("CREATE TABLE %s (k int, c int, v int,
PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
AccordService accord = accord();
+ int nodeId = accord.nodeId().id;
TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write,
Routable.Domain.Key);
Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c,
v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0);
String keyStr = txn.keys().get(0).toUnseekable().toString();
@@ -249,15 +340,19 @@ public class AccordDebugKeyspaceTest extends CQLTester
spinUntilSuccess(() -> assertRows(execute(QUERY_TXN_BLOCKED_BY,
id.toString()),
row(id.toString(), anyInt(), 0, "",
"", any(), anyOf(SaveStatus.ReadyToExecute.name(), SaveStatus.Applying.name(),
SaveStatus.Applied.name()))));
assertRows(execute(QUERY_TXN, id.toString()), row(id.toString(),
"Applied"));
+ assertRows(execute(QUERY_TXN_REMOTE, nodeId, id.toString()),
row(id.toString(), "Applied"));
assertRows(execute(QUERY_JOURNAL, id.toString()), row(id.toString(),
"PreAccepted"), row(id.toString(), "Applying"), row(id.toString(), "Applied"),
row(id.toString(), null));
+ assertRows(execute(QUERY_JOURNAL_REMOTE, nodeId, id.toString()),
row(id.toString(), "PreAccepted"), row(id.toString(), "Applying"),
row(id.toString(), "Applied"), row(id.toString(), null));
assertRows(execute(QUERY_COMMANDS_FOR_KEY, keyStr), row(id.toString(),
"APPLIED_DURABLE"));
+ assertRows(execute(QUERY_COMMANDS_FOR_KEY_REMOTE, nodeId, keyStr),
row(id.toString(), "APPLIED_DURABLE"));
}
@Test
- public void manyTxns() throws ExecutionException, InterruptedException
+ public void manyTxns()
{
String tableName = createTable("CREATE TABLE %s (k int, c int, v int,
PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
AccordService accord = accord();
+ int nodeId = accord.nodeId().id;
List<IAccordService.IAccordResult> await = new ArrayList<>();
Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c,
v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0);
for (int i = 0 ; i < 100; ++i)
@@ -281,6 +376,22 @@ public class AccordDebugKeyspaceTest extends CQLTester
row("Applied"),
row("Applied")
);
+
+ assertRows(execute(QUERY_TXNS_REMOTE, nodeId, commandStore.id()),
+ row("Applied"),
+ row("Applied"),
+ row("Applied"),
+ row("Applied"),
+ row("Applied")
+ );
+
+ assertRows(execute(QUERY_TXNS_SEARCH_REMOTE, nodeId,
commandStore.id(), TxnId.NONE.toString()),
+ row("Applied"),
+ row("Applied"),
+ row("Applied"),
+ row("Applied"),
+ row("Applied")
+ );
}
@Test
@@ -293,6 +404,7 @@ public class AccordDebugKeyspaceTest extends CQLTester
{
String tableName = createTable("CREATE TABLE %s (k int, c int, v
int, PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
AccordService accord = accord();
+ int nodeId = accord.nodeId().id;
TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write,
Routable.Domain.Key);
String insertTxn = String.format("BEGIN TRANSACTION\n" +
" LET r = (SELECT * FROM %s.%s
WHERE k = ? AND c = ?);\n" +
@@ -307,9 +419,13 @@ public class AccordDebugKeyspaceTest extends CQLTester
filter.preAccept.awaitThrowUncheckedOnInterrupt();
assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
row(id.toString(), anyInt(), 0, "", "", any(),
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+ assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId,
id.toString()),
+ row(nodeId, id.toString(), anyInt(), 0, "", "", any(),
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
filter.apply.awaitThrowUncheckedOnInterrupt();
assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
row(id.toString(), anyInt(), 0, "", "", any(),
SaveStatus.ReadyToExecute.name()));
+ assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId,
id.toString()),
+ row(nodeId, id.toString(), anyInt(), 0, "", "", any(),
SaveStatus.ReadyToExecute.name()));
}
finally
{
@@ -329,6 +445,7 @@ public class AccordDebugKeyspaceTest extends CQLTester
{
String tableName = createTable("CREATE TABLE %s (k int, c int, v
int, PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
AccordService accord = accord();
+ int nodeId = accord.nodeId().id;
TxnId first =
accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key);
String insertTxn = String.format("BEGIN TRANSACTION\n" +
" LET r = (SELECT * FROM %s.%s
WHERE k = ? AND c = ?);\n" +
@@ -343,9 +460,13 @@ public class AccordDebugKeyspaceTest extends CQLTester
filter.preAccept.awaitThrowUncheckedOnInterrupt();
assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
row(first.toString(), anyInt(), 0, "", any(), any(),
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+ assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId,
first.toString()),
+ row(nodeId, first.toString(), anyInt(), 0, "", any(),
any(), anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
filter.apply.awaitThrowUncheckedOnInterrupt();
assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
row(first.toString(), anyInt(), 0, "", any(),
anyNonNull(), SaveStatus.ReadyToExecute.name()));
+ assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId,
first.toString()),
+ row(nodeId, first.toString(), anyInt(), 0, "", any(),
anyNonNull(), SaveStatus.ReadyToExecute.name()));
filter.reset();
@@ -366,6 +487,11 @@ public class AccordDebugKeyspaceTest extends CQLTester
row(second.toString(), anyInt(), 1, any(),
first.toString(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
assertRows(execute(QUERY_TXN_BLOCKED_BY + " AND depth < 1",
second.toString()),
row(second.toString(), anyInt(), 0, any(), "",
anyNonNull(), SaveStatus.Stable.name()));
+ assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId,
second.toString()),
+ row(nodeId, second.toString(), anyInt(), 0, "", "",
anyNonNull(), SaveStatus.Stable.name()),
+ row(nodeId, second.toString(), anyInt(), 1, any(),
first.toString(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
+ assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE + " AND depth < 1",
nodeId, second.toString()),
+ row(nodeId, second.toString(), anyInt(), 0, any(), "",
anyNonNull(), SaveStatus.Stable.name()));
}
finally
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]