This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b712cfb930 RemoteToLocalVirtualKeyspace: supporting access to all 
nodes' local virtual tables from any node in the cluster
b712cfb930 is described below

commit b712cfb930106972904e6fdb381c030bc649b554
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Sat Sep 13 10:50:32 2025 +0100

    RemoteToLocalVirtualKeyspace: supporting access to all nodes' local virtual 
tables from any node in the cluster
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20900
---
 .../apache/cassandra/db/filter/ColumnFilter.java   |  59 +-
 .../cassandra/db/filter/ColumnSubselection.java    |  14 +
 .../org/apache/cassandra/db/filter/RowFilter.java  |  50 +-
 .../db/virtual/AccordDebugRemoteKeyspace.java      |  31 +
 .../db/virtual/RemoteToLocalVirtualKeyspace.java   |  35 ++
 .../db/virtual/RemoteToLocalVirtualTable.java      | 625 +++++++++++++++++++++
 .../cassandra/db/virtual/VirtualMutation.java      |  38 ++
 .../apache/cassandra/db/virtual/VirtualTable.java  |   2 +
 .../cassandra/exceptions/ExceptionSerializer.java  |   2 +-
 .../cassandra/exceptions/RequestFailure.java       |   3 +-
 .../apache/cassandra/locator/RemoteStrategy.java   |  29 +
 src/java/org/apache/cassandra/net/Verb.java        |   3 +
 .../apache/cassandra/schema/SchemaConstants.java   |   3 +-
 .../apache/cassandra/service/CassandraDaemon.java  |   4 +
 .../db/virtual/AccordDebugKeyspaceTest.java        | 138 ++++-
 15 files changed, 1025 insertions(+), 11 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java 
b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index bc17dd0158..86d66c59ff 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.btree.BTree;
 
 /**
  * Represents which (non-PK) columns (and optionally which sub-part of a 
column for complex columns) are selected
@@ -64,7 +65,6 @@ import org.apache.cassandra.schema.TableMetadata;
  */
 public abstract class ColumnFilter
 {
-
     public static final ColumnFilter NONE = 
selection(RegularAndStaticColumns.NONE);
 
     public static final Serializer serializer = new Serializer();
@@ -305,6 +305,20 @@ public abstract class ColumnFilter
         return false;
     }
 
+    /**
+     * Produces a copy of this filter whose columns are resolved, by name, against {@code newTable}.
+     * Columns that {@code newTable} does not define are dropped. The intent is also to reject a column whose
+     * counterpart has a different type (NOTE(review): confirm the implementations enforce this).
+     */
+    abstract ColumnFilter rebind(TableMetadata newTable);
+
+    /**
+     * Rebinds {@code filter} to the columns of {@code newTable}, which must be a virtual table.
+     * <p>
+     * Nothing technically prevents rebinding against a non-virtual table, but there is no known use for it,
+     * so the restriction exists purely as a guard against accidental misuse.
+     *
+     * @throws UnsupportedOperationException if {@code newTable} is not a virtual table
+     */
+    public static ColumnFilter rebindVirtual(ColumnFilter filter, TableMetadata newTable)
+    {
+        boolean isVirtual = newTable.isVirtual();
+        if (isVirtual)
+            return filter.rebind(newTable);
+
+        throw new UnsupportedOperationException("This feature is intended only to be used with virtual keyspaces");
+    }
+
     /**
      * Returns the CQL string corresponding to this {@code ColumnFilter}.
      *
@@ -630,6 +644,12 @@ public abstract class ColumnFilter
             return true;
         }
 
+        /**
+         * Rebinds the single {@code fetchedAndQueried} column set of this wildcard filter to {@code newTable}.
+         */
+        @Override
+        ColumnFilter rebind(TableMetadata newTable)
+        {
+            RegularAndStaticColumns rebound = ColumnFilter.rebind(newTable, fetchedAndQueried);
+            return new WildCardColumnFilter(rebound);
+        }
+
         @Override
         protected SortedSetMultimap<ColumnIdentifier, ColumnSubselection> 
subSelections()
         {
@@ -779,6 +799,21 @@ public abstract class ColumnFilter
             return new 
Tester(fetchingStrategy.fetchesAllColumns(column.isStatic()), s.iterator());
         }
 
+        /**
+         * Rebinds the queried and fetched columns, plus any sub-selections, to {@code newTable}.
+         * Columns absent from {@code newTable} are dropped, and so are sub-selections of such columns.
+         * Binding those would otherwise produce a sub-selection over a null column.
+         */
+        @Override
+        ColumnFilter rebind(TableMetadata newTable)
+        {
+            RegularAndStaticColumns queried = ColumnFilter.rebind(newTable, this.queried);
+            // keep queried and fetched as the same instance when they were so originally
+            RegularAndStaticColumns fetched = this.queried == this.fetched ? queried : ColumnFilter.rebind(newTable, this.fetched);
+            SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = null;
+            if (this.subSelections != null)
+            {
+                subSelections = TreeMultimap.create();
+                for (Map.Entry<ColumnIdentifier, ColumnSubselection> e : this.subSelections.entries())
+                {
+                    // the column was dropped from queried/fetched above, so drop its sub-selection too
+                    if (newTable.getColumn(e.getKey()) == null)
+                        continue;
+                    subSelections.put(e.getKey(), e.getValue().rebind(newTable));
+                }
+            }
+            return new SelectionColumnFilter(fetchingStrategy, queried, fetched, subSelections);
+        }
+
         @Override
         protected SortedSetMultimap<ColumnIdentifier, ColumnSubselection> 
subSelections()
         {
@@ -1003,4 +1038,26 @@ public abstract class ColumnFilter
             return size;
         }
     }
+
+    /**
+     * Resolves both the static and the regular columns of {@code columns} against {@code newTable}.
+     *
+     * @throws IllegalArgumentException if a same-named column in {@code newTable} has a different kind or an incompatible type
+     */
+    private static RegularAndStaticColumns rebind(TableMetadata newTable, RegularAndStaticColumns columns)
+    {
+        return new RegularAndStaticColumns(rebind(newTable, columns.statics), rebind(newTable, columns.regulars));
+    }
+
+    /**
+     * Resolves each column, by name, against {@code newTable}.
+     * Columns that {@code newTable} does not define are silently dropped. A same-named column of a different
+     * kind (e.g. static vs regular), or of a type that is neither equal nor mutually compatible, is rejected.
+     * Such a column cannot be placed in the same {@link Columns} set.
+     *
+     * @throws IllegalArgumentException if a same-named column in {@code newTable} has a different kind or an incompatible type
+     */
+    private static Columns rebind(TableMetadata newTable, Columns columns)
+    {
+        if (columns.isEmpty())
+            return columns;
+
+        try (BTree.FastBuilder<ColumnMetadata> builder = BTree.fastBuilder())
+        {
+            for (ColumnMetadata in : columns)
+            {
+                ColumnMetadata out = newTable.getColumn(in.name);
+                if (out == null)
+                    continue;
+
+                boolean compatibleType = out.type.equals(in.type)
+                                         || (out.type.isCompatibleWith(in.type) && in.type.isCompatibleWith(out.type));
+                if (out.kind != in.kind || !compatibleType)
+                    throw new IllegalArgumentException("Column " + in.name + " of kind " + in.kind + " and type " + in.type
+                                                       + " cannot be rebound to " + out.kind + " column of type " + out.type + " in " + newTable);
+                builder.add(out);
+            }
+            return Columns.from(builder);
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java 
b/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
index 459b636a5a..b4f0346c76 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
@@ -76,6 +76,8 @@ public abstract class ColumnSubselection implements 
Comparable<ColumnSubselectio
 
     protected abstract CellPath comparisonPath();
 
+    /**
+     * Returns an equivalent sub-selection bound to the column of {@code newTable} that has the same name as
+     * this sub-selection's column. The cell path bounds/element are carried over unchanged.
+     * NOTE(review): presumably the column must exist in {@code newTable}; confirm how implementations handle a missing column.
+     */
+    protected abstract ColumnSubselection rebind(TableMetadata newTable);
+
     public int compareTo(ColumnSubselection other)
     {
         assert other.column().name.equals(column().name);
@@ -118,6 +120,12 @@ public abstract class ColumnSubselection implements 
Comparable<ColumnSubselectio
             return from;
         }
 
+        /**
+         * Rebinds this slice to the same-named column of {@code newTable}, keeping the same {@code from}/{@code to} bounds.
+         *
+         * @throws IllegalArgumentException if {@code newTable} has no column with this name
+         */
+        @Override
+        protected ColumnSubselection rebind(TableMetadata newTable)
+        {
+            ColumnMetadata rebound = newTable.getColumn(column.name);
+            // fail fast rather than building a sub-selection over a null column
+            if (rebound == null)
+                throw new IllegalArgumentException("Column " + column.name + " does not exist in " + newTable);
+            return new Slice(rebound, from, to);
+        }
+
         public int compareInclusionOf(CellPath path)
         {
             Comparator<CellPath> cmp = column.cellPathComparator();
@@ -160,6 +168,12 @@ public abstract class ColumnSubselection implements 
Comparable<ColumnSubselectio
             return element;
         }
 
+        /**
+         * Rebinds this element selection to the same-named column of {@code newTable}, keeping the same element.
+         *
+         * @throws IllegalArgumentException if {@code newTable} has no column with this name
+         */
+        @Override
+        protected ColumnSubselection rebind(TableMetadata newTable)
+        {
+            ColumnMetadata rebound = newTable.getColumn(column.name);
+            // fail fast rather than building a sub-selection over a null column
+            if (rebound == null)
+                throw new IllegalArgumentException("Column " + column.name + " does not exist in " + newTable);
+            return new Element(rebound, element);
+        }
+
         public int compareInclusionOf(CellPath path)
         {
             return column.cellPathComparator().compare(path, element);
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java 
b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index 12c98c7278..16ba694e14 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -141,7 +141,7 @@ public class RowFilter implements 
Iterable<RowFilter.Expression>
         add(new CustomExpression(metadata, targetIndex, value));
     }
 
-    private void add(Expression expression)
+    public void add(Expression expression)
     {
         expression.validate();
         expressions.add(expression);
@@ -549,6 +549,28 @@ public class RowFilter implements 
Iterable<RowFilter.Expression>
                        "Index expression values may not be larger than 64K");
         }
 
+        /**
+         * Rebind this expression to a table metadata that is expected to have equivalent columns.
+         * If any referenced column is missing, returns null;
+         * if any referenced column has an incompatible type, throws an exception.
+         * <p>
+         * This default implementation does not support rebinding and always throws.
+         *
+         * @throws UnsupportedOperationException if this kind of expression cannot be rebound
+         */
+        public Expression rebind(TableMetadata newTable)
+        {
+            throw new UnsupportedOperationException("Expression " + toString(true) + " does not support rebinding to another table definition");
+        }
+
+        /**
+         * Finds the column of {@code newTable} with the same name as {@code in}.
+         *
+         * @return the matching column, or null if {@code newTable} defines no column with that name
+         * @throws IllegalArgumentException if the matching column's type differs from {@code in}'s and the two
+         *                                  types are not compatible in both directions
+         */
+        protected static ColumnMetadata rebind(ColumnMetadata in, TableMetadata newTable)
+        {
+            ColumnMetadata out = newTable.getColumn(in.name);
+            if (out == null)
+                return null;
+
+            // identical types are always acceptable; otherwise require mutual compatibility.
+            // Parenthesised explicitly: the previous form `A && B || C` relied on && binding tighter than ||.
+            if (!out.type.equals(in.type) && (!out.type.isCompatibleWith(in.type) || !in.type.isCompatibleWith(out.type)))
+                throw new IllegalArgumentException("The provided TableMetadata is not compatible with the expression: column "
+                                                   + in.name + " has type " + out.type + ", expected " + in.type);
+
+            return out;
+        }
+
         /**
          * Returns whether the provided row satisfied this expression or not.
          *
@@ -734,6 +756,16 @@ public class RowFilter implements 
Iterable<RowFilter.Expression>
             super(column, operator, value);
         }
 
+        /**
+         * Rebinds this expression's column to {@code newTable}, keeping the operator and value.
+         * Returns null when {@code newTable} has no such column.
+         */
+        @Override
+        public Expression rebind(TableMetadata newTable)
+        {
+            ColumnMetadata rebound = rebind(column, newTable);
+            return rebound == null ? null : new SimpleExpression(rebound, operator, value);
+        }
+
         @Override
         public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey 
partitionKey, Row row, long nowInSec)
         {
@@ -853,6 +885,16 @@ public class RowFilter implements 
Iterable<RowFilter.Expression>
             checkBindValueSet(value, "Unsupported unset map value for column 
%s", column.name);
         }
 
+        /**
+         * Rebinds this map-element expression's column to {@code newTable}, keeping the key, operator and value.
+         * Returns null when {@code newTable} has no such column.
+         */
+        @Override
+        public Expression rebind(TableMetadata newTable)
+        {
+            ColumnMetadata rebound = rebind(column, newTable);
+            return rebound == null ? null : new MapElementExpression(rebound, key, operator, value);
+        }
+
         @Override
         public ByteBuffer getIndexValue()
         {
@@ -978,6 +1020,12 @@ public class RowFilter implements 
Iterable<RowFilter.Expression>
             return Kind.CUSTOM;
         }
 
+        /**
+         * Rebinds this expression to {@code newTable}. The target index is resolved by name among
+         * {@code newTable}'s indexes, so the result no longer refers to the original table.
+         *
+         * @throws IllegalArgumentException if {@code newTable} defines no index with the target index's name
+         */
+        @Override
+        public Expression rebind(TableMetadata newTable)
+        {
+            IndexMetadata index = newTable.indexes.get(targetIndex.name)
+                                                  .orElseThrow(() -> new IllegalArgumentException("Index " + targetIndex.name + " does not exist in " + newTable));
+            return new CustomExpression(newTable, index, value);
+        }
+
         // Filtering by custom expressions isn't supported yet, so just accept 
any row
         @Override
         public boolean isSatisfiedBy(TableMetadata metadata, DecoratedKey 
partitionKey, Row row, long nowInSec)
diff --git 
a/src/java/org/apache/cassandra/db/virtual/AccordDebugRemoteKeyspace.java 
b/src/java/org/apache/cassandra/db/virtual/AccordDebugRemoteKeyspace.java
new file mode 100644
index 0000000000..8698b3ccd8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugRemoteKeyspace.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import org.apache.cassandra.schema.SchemaConstants;
+
+/**
+ * Wraps the tables of {@link AccordDebugKeyspace} in a {@link RemoteToLocalVirtualKeyspace} published under
+ * {@code SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE}.
+ */
+public class AccordDebugRemoteKeyspace extends RemoteToLocalVirtualKeyspace
+{
+    public static final AccordDebugRemoteKeyspace instance =
+        new AccordDebugRemoteKeyspace(SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.instance);
+
+    /**
+     * @param name    the keyspace name the wrapped tables are published under
+     * @param wrapped the virtual keyspace whose tables are wrapped
+     */
+    public AccordDebugRemoteKeyspace(String name, VirtualKeyspace wrapped)
+    {
+        super(name, wrapped);
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/db/virtual/RemoteToLocalVirtualKeyspace.java 
b/src/java/org/apache/cassandra/db/virtual/RemoteToLocalVirtualKeyspace.java
new file mode 100644
index 0000000000..171f494036
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/RemoteToLocalVirtualKeyspace.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * A virtual keyspace that wraps each selected table of another virtual keyspace in a
+ * {@link RemoteToLocalVirtualTable} registered under this keyspace's name.
+ */
+public class RemoteToLocalVirtualKeyspace extends VirtualKeyspace
+{
+    /** Wraps every table of {@code wrap}. */
+    public RemoteToLocalVirtualKeyspace(String name, VirtualKeyspace wrap)
+    {
+        this(name, wrap, anyTable -> true);
+    }
+
+    /** Wraps only those tables of {@code wrap} accepted by {@code include}. */
+    public RemoteToLocalVirtualKeyspace(String name, VirtualKeyspace wrap, Predicate<VirtualTable> include)
+    {
+        super(name, wrap.tables()
+                        .stream()
+                        .filter(include)
+                        .map(table -> new RemoteToLocalVirtualTable(name, table))
+                        .collect(Collectors.toList()));
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/db/virtual/RemoteToLocalVirtualTable.java 
b/src/java/org/apache/cassandra/db/virtual/RemoteToLocalVirtualTable.java
new file mode 100644
index 0000000000..837c1feeb1
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/RemoteToLocalVirtualTable.java
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.function.Function;
+
+import accord.utils.Invariants;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.BufferClusteringBound;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionInfo;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.TruncateRequest;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.ColumnData;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestFailure;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Promise;
+import org.apache.cassandra.utils.concurrent.SyncPromise;
+
+import static org.apache.cassandra.db.ClusteringBound.BOTTOM;
+import static org.apache.cassandra.db.ClusteringBound.TOP;
+import static org.apache.cassandra.db.ClusteringBound.boundKind;
+import static org.apache.cassandra.db.ReadCommand.PotentialTxnConflicts.ALLOW;
+import static org.apache.cassandra.db.virtual.VirtualTable.Sorted.SORTED;
+
+public class RemoteToLocalVirtualTable extends AbstractLazyVirtualTable
+{
+    // upper bound on outstanding read requests before collect() blocks on the oldest one
+    private static final int MAX_CONCURRENCY = 8;
+    // metadata of the wrapped virtual table; read commands sent to peers are built against it
+    final TableMetadata local;
+    // copied from the wrapped table's allowFilteringImplicitly() at construction
+    final boolean allowFilteringImplicitly;
+    // copied from the wrapped table's allowFilteringPrimaryKeysImplicitly(); gates partial bounds on the wrapped partition key
+    final boolean allowFilteringLocalPartitionKeysImplicitly;
+
+    /**
+     * Wraps {@code virtualTable}, exposing it under {@code keyspace} with a leading {@code node_id} partition key.
+     *
+     * @param keyspace     the keyspace the wrapping table is registered under
+     * @param virtualTable the virtual table to wrap
+     */
+    public RemoteToLocalVirtualTable(String keyspace, VirtualTable virtualTable)
+    {
+        super(wrap(keyspace, virtualTable.name(), virtualTable.metadata()), onTimeoutOf(virtualTable), virtualTable.sorted(), SORTED);
+        this.local = virtualTable.metadata();
+        this.allowFilteringImplicitly = virtualTable.allowFilteringImplicitly();
+        this.allowFilteringLocalPartitionKeysImplicitly = virtualTable.allowFilteringPrimaryKeysImplicitly();
+    }
+
+    /** The wrapped table's own timeout policy if it is lazy, otherwise {@code OnTimeout.FAIL}. */
+    private static OnTimeout onTimeoutOf(VirtualTable virtualTable)
+    {
+        if (virtualTable instanceof AbstractLazyVirtualTable)
+            return ((AbstractLazyVirtualTable) virtualTable).onTimeout();
+        return OnTimeout.FAIL;
+    }
+
+    /** Mirrors the wrapped table's {@code allowFilteringImplicitly()}, as captured at construction. */
+    @Override
+    public boolean allowFilteringImplicitly()
+    {
+        return this.allowFilteringImplicitly;
+    }
+
+    /**
+     * Always true for the wrapping table. The {@code node_id} partition key is resolved by {@code collect}
+     * itself. Filtering on the wrapped table's partition key columns is separately gated by
+     * {@code allowFilteringLocalPartitionKeysImplicitly}.
+     */
+    @Override
+    public boolean allowFilteringPrimaryKeysImplicitly()
+    {
+        return true;
+    }
+
+    /**
+     * Derives the schema of the wrapping table from the wrapped table {@code local}:
+     * <ul>
+     *   <li>a new {@code node_id} int partition key, using a {@link LocalPartitioner};</li>
+     *   <li>the wrapped partition key columns followed by the wrapped clustering columns, all as clustering columns;</li>
+     *   <li>the wrapped regular columns.</li>
+     * </ul>
+     * Static columns are omitted because they cannot be modelled once a {@code node_id} partition key is prepended.
+     *
+     * @throws IllegalArgumentException if {@code local} has a multi-column partition key whose key type is not a
+     *                                  {@link CompositeType}, or if it has any complex regular column
+     */
+    private static TableMetadata wrap(String keyspace, String name, TableMetadata local)
+    {
+        boolean singleColumnKey = local.partitionKeyColumns().size() == 1;
+        if (!singleColumnKey && !(local.partitionKeyType instanceof CompositeType))
+            throw new IllegalArgumentException("Underlying table must have a single partition key, else use CompositeType for its partitioner");
+
+        TableMetadata.Builder builder = TableMetadata.builder(keyspace, name);
+        builder.partitioner(new LocalPartitioner(Int32Type.instance));
+        builder.addPartitionKeyColumn("node_id", Int32Type.instance);
+
+        // the wrapped primary key (partition key first, then clustering) becomes this table's clustering
+        List<ColumnMetadata> wrappedPrimaryKey = new ArrayList<>(local.partitionKeyColumns());
+        wrappedPrimaryKey.addAll(local.clusteringColumns());
+        for (ColumnMetadata keyColumn : wrappedPrimaryKey)
+            builder.addClusteringColumn(keyColumn.name, keyColumn.type, keyColumn.getMask(), keyColumn.getColumnConstraints());
+
+        // static columns are deliberately skipped (see above)
+        for (ColumnMetadata regular : local.regularColumns())
+        {
+            if (regular.isComplex())
+                throw new IllegalArgumentException("Complex columns are not currently supported by " + RemoteToLocalVirtualTable.class.getSimpleName());
+            builder.addRegularColumn(regular.name, regular.type, regular.getMask(), regular.getColumnConstraints());
+        }
+
+        builder.kind(TableMetadata.Kind.VIRTUAL);
+        return builder.build();
+    }
+
+    /**
+     * Fans the query out to every known peer whose node id falls within the requested partition range.
+     * Each remote partition's clustering slices are translated into read commands against the wrapped table.
+     * At most {@code MAX_CONCURRENCY} requests are kept in flight, or one when a request selects a single row.
+     * Responses are consumed in the order the requests were issued.
+     */
+    @Override
+    protected void collect(PartitionsCollector collector)
+    {
+        ClusterMetadata cm = ClusterMetadata.current();
+        DataRange dataRange = collector.dataRange();
+        NavigableSet<NodeId> matchingIds = nodeIdsInRange(cm.directory.peerIds(), dataRange);
+
+        RowFilter rowFilter = rebind(local, collector.rowFilter());
+        ColumnFilter columnFilter = ColumnFilter.rebindVirtual(collector.columnFilter(), local);
+        // TODO (expected): count this down as we progress where possible (or have AbstractLazyVirtualTable do it for us)
+        DataLimits limits = collector.limits();
+
+        Function<DecoratedKey, ByteBuffer[]> pksToCks = partitionKeyToClusterings(metadata, local);
+        ArrayDeque<RequestAndResponse> pending = new ArrayDeque<>();
+        boolean reversed = dataRange.isReversed();
+        for (NodeId id : matchingIds)
+        {
+            InetAddressAndPort endpoint = cm.directory.endpoint(id);
+            DecoratedKey remoteKey = metadata.partitioner.decorateKey(Int32Type.instance.decompose(id.id()));
+            Slices slices = dataRange.clusteringIndexFilter(remoteKey).getSlices(metadata);
+            PartitionCollector partition = collector.partition(id.id());
+
+            int sliceCount = slices.size();
+            for (int n = 0 ; n < sliceCount ; ++n)
+            {
+                // visit slices back-to-front for reversed queries
+                Slice slice = slices.get(reversed ? sliceCount - 1 - n : n);
+                for (Request request : rebind(local, slice, reversed, rowFilter, columnFilter))
+                {
+                    ReadCommand readCommand = newReadCommand(request, collector, limits, remoteKey);
+                    RequestAndResponse rr = new RequestAndResponse(id, partition, readCommand);
+                    send(rr, endpoint);
+                    pending.addLast(rr);
+
+                    boolean selectsOneRow = selectsOneRow(local, request.dataRange, remoteKey);
+                    while (pending.size() >= (selectsOneRow ? 1 : MAX_CONCURRENCY))
+                        collect(collector, pending.pollFirst(), pksToCks);
+                }
+            }
+        }
+        while (!pending.isEmpty())
+            collect(collector, pending.pollFirst(), pksToCks);
+    }
+
+    /**
+     * Restricts {@code peerIds} to the node ids covered by {@code dataRange}'s partition range.
+     * The result is in descending order when the query is reversed.
+     *
+     * @throws InvalidRequestException if a bound is a token or an incomplete partition key rather than a node id
+     */
+    private NavigableSet<NodeId> nodeIdsInRange(NavigableSet<NodeId> peerIds, DataRange dataRange)
+    {
+        AbstractBounds<PartitionPosition> bounds = dataRange.keyRange();
+        NodeId start = bounds.left.isMinimum() ? null : boundToNodeId(bounds.left);
+        NodeId end = bounds.right.isMinimum() ? null : boundToNodeId(bounds.right);
+
+        NavigableSet<NodeId> matchingIds = peerIds;
+        if (start != null && end != null) matchingIds = matchingIds.subSet(start, bounds.isStartInclusive(), end, bounds.isEndInclusive());
+        else if (start != null) matchingIds = matchingIds.tailSet(start, bounds.isStartInclusive());
+        else if (end != null) matchingIds = matchingIds.headSet(end, bounds.isEndInclusive());
+
+        return dataRange.isReversed() ? matchingIds.descendingSet() : matchingIds;
+    }
+
+    /** Interprets a (non-minimum) partition range bound as the node id it encodes. */
+    private NodeId boundToNodeId(PartitionPosition bound)
+    {
+        if (!(bound instanceof DecoratedKey))
+            throw new InvalidRequestException(metadata + " does not support filtering by token or incomplete partition keys");
+        return new NodeId(Int32Type.instance.compose(((DecoratedKey) bound).getKey()));
+    }
+
+    /** Builds a single-partition read when {@code request} targets exactly one wrapped partition, else a range read. */
+    private ReadCommand newReadCommand(Request request, PartitionsCollector collector, DataLimits limits, DecoratedKey remoteKey)
+    {
+        PartitionPosition startKey = request.dataRange.startKey();
+        if (startKey.equals(request.dataRange.stopKey()) && !startKey.isMinimum())
+            return SinglePartitionReadCommand.create(local, collector.nowInSeconds(), request.columnFilter, request.rowFilter, limits, (DecoratedKey) startKey, request.dataRange.clusteringIndexFilter(remoteKey), ALLOW);
+        return PartitionRangeReadCommand.create(local, collector.nowInSeconds(), request.columnFilter, request.rowFilter, limits, request.dataRange);
+    }
+
+    /**
+     * A read issued to a single node, together with the partition its rows are collected into.
+     * The promise completes with the node's {@link ReadResponse}, or with null if no request was sent
+     * because the failure detector considered the node down.
+     */
+    private static class RequestAndResponse extends SyncPromise<ReadResponse>
+    {
+        final NodeId nodeId;
+        final PartitionCollector partition;
+        final ReadCommand readCommand;
+
+        private RequestAndResponse(NodeId target, PartitionCollector into, ReadCommand command)
+        {
+            this.readCommand = command;
+            this.partition = into;
+            this.nodeId = target;
+        }
+    }
+
+    /** One read to issue against the wrapped table: the data range, plus its row and column filters. */
+    private static class Request
+    {
+        final DataRange dataRange;
+        final RowFilter rowFilter;
+        final ColumnFilter columnFilter;
+
+        private Request(DataRange range, RowFilter rows, ColumnFilter columns)
+        {
+            this.columnFilter = columns;
+            this.rowFilter = rows;
+            this.dataRange = range;
+        }
+    }
+
+    /** Sends {@code rr}'s read command as a {@code READ_REQ}, completing {@code rr} with the reply. */
+    private void send(RequestAndResponse rr, InetAddressAndPort endpoint)
+    {
+        ReadCommand command = rr.readCommand;
+        send(Verb.READ_REQ, command, rr, endpoint);
+    }
+
+    /** Sends {@code payload} to {@code endpoint} and returns a new promise for the reply. */
+    private <Reply> Promise<Reply> send(Verb verb, Object payload, InetAddressAndPort endpoint)
+    {
+        Promise<Reply> reply = new AsyncPromise<>();
+        send(verb, payload, reply, endpoint);
+        return reply;
+    }
+
+    /**
+     * Sends {@code payload} to {@code endpoint}, completing {@code promise} with the reply payload or the failure.
+     * If the failure detector considers {@code endpoint} down, nothing is sent and {@code promise} is
+     * completed successfully with null.
+     */
+    private <Reply> void send(Verb verb, Object payload, Promise<Reply> promise, InetAddressAndPort endpoint)
+    {
+        if (FailureDetector.instance.isAlive(endpoint))
+        {
+            // part of the MessagingService logic is inlined here to avoid requiring AbstractWriteResponseHandler
+            Message<?> message = Message.out(verb, payload);
+            RequestCallback<?> callback = new RequestCallback<Reply>()
+            {
+                @Override
+                public void onResponse(Message<Reply> msg)
+                {
+                    promise.trySuccess(msg.payload);
+                }
+
+                @Override
+                public boolean invokeOnFailure()
+                {
+                    return true;
+                }
+
+                @Override
+                public void onFailure(InetAddressAndPort from, RequestFailure failure)
+                {
+                    Throwable cause = failure.failure != null ? failure.failure : new RuntimeException(failure.reason.toString());
+                    promise.tryFailure(cause);
+                }
+            };
+            MessagingService.instance().sendWithCallback(message, endpoint, callback);
+        }
+        else
+        {
+            promise.trySuccess(null);
+        }
+    }
+
+    /**
+     * Waits for {@code rr} and feeds its rows into {@code rr.partition}. The wrapped partition key and clustering
+     * of each row together form the row's clustering in the wrapping table. A null response (no request was
+     * sent because the node was considered down) produces a client warning instead of an error.
+     *
+     * @throws InternalTimeoutException if no response arrives before the collector's deadline
+     * @throws UnsupportedOperationException if the response contains anything other than rows
+     */
+    private void collect(PartitionsCollector collector, RequestAndResponse rr, Function<DecoratedKey, ByteBuffer[]> pksToCks)
+    {
+        if (!rr.awaitUntilThrowUncheckedOnInterrupt(collector.deadlineNanos()))
+            throw new InternalTimeoutException();
+
+        rr.rethrowIfFailed();
+        ReadResponse response = rr.getNow();
+        if (response == null)
+        {
+            ClientWarn.instance.warn("No response from " + rr.nodeId);
+            return;
+        }
+
+        int pkCount = local.partitionKeyColumns().size();
+        try (UnfilteredPartitionIterator partitions = response.makeIterator(rr.readCommand))
+        {
+            while (partitions.hasNext())
+            {
+                try (UnfilteredRowIterator iter = partitions.next())
+                {
+                    // leading slots hold the wrapped partition key; the remaining slots are filled per row
+                    ByteBuffer[] partitionPrefix = pksToCks.apply(iter.partitionKey());
+                    while (iter.hasNext())
+                    {
+                        Unfiltered next = iter.next();
+                        if (!next.isRow())
+                            throw new UnsupportedOperationException("Range tombstones not supported");
+
+                        Row row = (Row) next;
+                        // copy per row: the array itself is passed to the collector (and captured by a lambda),
+                        // so it must not be overwritten by subsequent rows of the same partition
+                        ByteBuffer[] clusterings = partitionPrefix.clone();
+                        Clustering<?> clustering = row.clustering();
+                        for (int j = 0 ; j < clustering.size(); ++j)
+                            clusterings[pkCount + j] = clustering.bufferAt(j);
+
+                        rr.partition.collect(rows -> {
+                            rows.add((Object[]) clusterings)
+                                .lazyCollect(columns -> {
+                                    row.forEach(cd -> {
+                                        Invariants.require(cd instanceof Cell);
+                                        columns.add(cd.column().name.toString(), ((Cell<?>) cd).buffer());
+                                    });
+                                });
+                        });
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Whether {@code dataRange} targets a single partition and, if the table has clustering columns, a single
+     * fully-specified clustering within that partition.
+     */
+    private static boolean selectsOneRow(TableMetadata metadata, DataRange dataRange, DecoratedKey key)
+    {
+        if (dataRange.startKey().isMinimum() || !dataRange.startKey().equals(dataRange.stopKey()))
+            return false;
+
+        int clusteringCount = metadata.clusteringColumns().size();
+        if (clusteringCount == 0)
+            return true;
+
+        Slices slices = dataRange.clusteringIndexFilter(key).getSlices(metadata);
+        if (slices.size() != 1)
+            return false;
+
+        // slice.start().equals(slice.end()) can never hold: prefix equality includes the bound kind, and a
+        // slice's start and end bounds always have different kinds. Compare inclusivity and values instead.
+        Slice slice = slices.get(0);
+        ClusteringBound<?> start = slice.start();
+        ClusteringBound<?> end = slice.end();
+        return start.size() == clusteringCount
+               && end.size() == clusteringCount
+               && start.isInclusive()
+               && end.isInclusive()
+               && Arrays.equals(start.getBufferArray(), end.getBufferArray());
+    }
+
+    /**
+     * Returns a function that maps a wrapped-table partition key to a new array sized for the wrapping table's
+     * clustering columns. The leading slots hold the partition key component(s); the remaining slots are
+     * left null for the caller to fill in.
+     */
+    private static Function<DecoratedKey, ByteBuffer[]> partitionKeyToClusterings(TableMetadata distributed, TableMetadata local)
+    {
+        int width = distributed.clusteringColumns().size();
+        if (local.partitionKeyColumns().size() == 1)
+        {
+            return pk -> {
+                ByteBuffer[] out = new ByteBuffer[width];
+                out[0] = pk.getKey();
+                return out;
+            };
+        }
+
+        CompositeType type = (CompositeType) local.partitionKeyType;
+        int components = type.types.size();
+        return pk -> {
+            ByteBuffer[] out = new ByteBuffer[width];
+            System.arraycopy(type.split(pk.getKey()), 0, out, 0, components);
+            return out;
+        };
+    }
+
+    /**
+     * Rebinds every expression of {@code rowFilter} to {@code local}. Expressions whose rebinding yields null
+     * (i.e. the referenced column is absent) are dropped. An empty filter is returned unchanged.
+     */
+    private static RowFilter rebind(TableMetadata local, RowFilter rowFilter)
+    {
+        if (rowFilter.isEmpty())
+            return rowFilter;
+
+        RowFilter rebound = RowFilter.create(false);
+        for (RowFilter.Expression expression : rowFilter.getExpressions())
+        {
+            RowFilter.Expression translated = expression.rebind(local);
+            if (translated == null)
+                continue;
+            rebound.add(translated);
+        }
+        return rebound;
+    }
+
+    /**
+     * Translates a clustering {@code slice} of the remote table into one or more reads of the {@code local} table.
+     * <p>
+     * The first {@code pkCount} components of each slice bound are interpreted as the local partition key. Any
+     * further components form a bound on the local clustering.
+     * <ul>
+     *   <li>If either bound specifies only part of the local partition key, no partition range can be derived. If
+     *   {@code allowFilteringLocalPartitionKeysImplicitly} permits it, a single full scan is issued, with the bounds
+     *   translated into partition key {@link RowFilter} expressions. Otherwise an {@link InvalidRequestException}
+     *   is thrown.</li>
+     *   <li>Otherwise the bounds become a partition range. When a bound also restricts clustering, each boundary
+     *   partition is read with its own clustering slice. This yields a single request if both bounds fall in the
+     *   same partition, and otherwise up to three requests (start partition, intervening partitions, end partition).
+     *   The requests are listed in reverse order if {@code reversed}.</li>
+     * </ul>
+     *
+     * @throws InvalidRequestException if partial partition key bounds are used but not permitted, or cannot be
+     *                                 expressed as simple per-column row filter expressions
+     */
+    private List<Request> rebind(TableMetadata local, Slice slice, boolean reversed, RowFilter rowFilter, ColumnFilter columnFilter)
+    {
+        ClusteringBound<?> start = slice.start();
+        ClusteringBound<?> end = slice.end();
+        int pkCount = local.partitionKeyColumns().size();
+        // TODO (expected): we can filter by partition key by inserting a new row filter, but need to impose ALLOW FILTERING restrictions
+        // a non-empty bound shorter than the local partition key cannot be mapped to a partition position
+        if (((start.size() > 0 && start.size() < pkCount) || (end.size() > 0 && end.size() < pkCount)))
+        {
+            if (!allowFilteringLocalPartitionKeysImplicitly)
+                throw new InvalidRequestException("Must specify full partition key bounds for the underlying table");
+
+            List<ColumnMetadata> pks = local.partitionKeyColumns();
+            ByteBuffer[] starts = start.getBufferArray();
+            ByteBuffer[] ends = end.getBufferArray();
+
+            // number of leading components on which both bounds agree
+            int minCount = Math.min(start.size(), end.size());
+            int maxCount = Math.max(start.size(), end.size());
+            int commonPrefixLength = 0;
+            while (commonPrefixLength < minCount && equalPart(start, end, commonPrefixLength))
+                ++commonPrefixLength;
+
+            // the shared prefix becomes equality restrictions, added to a copy so the caller's filter is untouched
+            RowFilter commonRowFilter = rowFilter;
+            if (commonPrefixLength > 0)
+            {
+                commonRowFilter = copy(commonRowFilter);
+                for (int i = 0 ; i < commonPrefixLength ; ++i)
+                    commonRowFilter.add(pks.get(i), Operator.EQ, starts[i]);
+            }
+
+            Operator lastStartOp = start.isInclusive() ? Operator.GTE : Operator.GT;
+            Operator lastEndOp = end.isInclusive() ? Operator.LTE : Operator.LT;
+            // Only supported when the shorter bound is entirely shared and the longer bound has at most one further
+            // component. Otherwise the bounds would need a lexicographic comparison that independent per-column
+            // expressions cannot express.
+            // NOTE(review): a fully shared shorter bound contributes no expression, so its inclusivity is not
+            // applied (e.g. an exclusive bound equal to the shared prefix); confirm this cannot arise.
+            if (commonPrefixLength == Math.max(minCount, maxCount - 1))
+            {
+                // can simply add our remaining filters and continue on our way
+                addExpressions(commonRowFilter, pks, commonPrefixLength, starts, Operator.GTE, lastStartOp);
+                addExpressions(commonRowFilter, pks, commonPrefixLength, ends, Operator.LTE, lastEndOp);
+                return List.of(new Request(DataRange.allData(local.partitioner), commonRowFilter, columnFilter));
+            }
+
+            throw new InvalidRequestException("This table currently does not support the complex partial partition key filters implied for the underlying table");
+        }
+
+        // map each bound onto a local partition position; an empty bound is unbounded on that side
+        ByteBuffer[] startBuffers = start.getBufferArray();
+        PartitionPosition startBound;
+        if (start.size() == 0) startBound = local.partitioner.getMinimumToken().minKeyBound();
+        else if (pkCount == 1) startBound = local.partitioner.decorateKey(startBuffers[0]);
+        else startBound = local.partitioner.decorateKey(CompositeType.build(ByteBufferAccessor.instance, Arrays.copyOf(startBuffers, pkCount)));
+
+        ByteBuffer[] endBuffers = end.getBufferArray();
+        PartitionPosition endBound;
+        if (end.size() == 0) endBound = local.partitioner.getMinimumToken().maxKeyBound();
+        else if (pkCount == 1) endBound = local.partitioner.decorateKey(endBuffers[0]);
+        else endBound = local.partitioner.decorateKey(CompositeType.build(ByteBufferAccessor.instance, Arrays.copyOf(endBuffers, pkCount)));
+
+        // a bound longer than the partition key only restricts clustering within its boundary partition,
+        // so that partition itself must be included at the partition level
+        AbstractBounds<PartitionPosition> bounds = AbstractBounds.bounds(startBound, start.isEmpty() || start.size() > pkCount || start.isInclusive(),
+                                                                         endBound, end.isEmpty() || end.size() > pkCount || end.isInclusive());
+        boolean hasSlices = start.size() > pkCount || end.size() > pkCount;
+        if (!hasSlices)
+            return List.of(new Request(new DataRange(bounds, new ClusteringIndexSliceFilter(Slices.ALL, reversed)), rowFilter, columnFilter));
+
+        // the components beyond the partition key form the local clustering bounds (unbounded if absent)
+        ClusteringBound<?> startSlice = ClusteringBound.BOTTOM;
+        if (start.size() > pkCount)
+            startSlice = BufferClusteringBound.create(boundKind(true, start.isInclusive()), Arrays.copyOfRange(startBuffers, pkCount, startBuffers.length));
+
+        ClusteringBound<?> endSlice = ClusteringBound.TOP;
+        if (end.size() > pkCount)
+            endSlice = BufferClusteringBound.create(boundKind(false, end.isInclusive()), Arrays.copyOfRange(endBuffers, pkCount, endBuffers.length));
+
+        // both bounds within one partition: a single request with the combined clustering slice suffices
+        if (startBound.equals(endBound))
+            return List.of(new Request(new DataRange(bounds, filter(local, startSlice, endSlice, reversed)), rowFilter, columnFilter));
+
+        List<Request> result = new ArrayList<>(3);
+        // start partition, read from the start clustering bound onwards
+        if (startSlice != BOTTOM)
+        {
+            AbstractBounds<PartitionPosition> startBoundOnly = AbstractBounds.bounds(startBound, true, startBound, true);
+            result.add(new Request(new DataRange(startBoundOnly, filter(local, startSlice, TOP, reversed)), rowFilter, columnFilter));
+        }
+        // intervening partitions are read in full; a boundary partition with its own request is excluded here
+        result.add(new Request(new DataRange(AbstractBounds.bounds(bounds.left, bounds.inclusiveLeft() && startSlice == BOTTOM,
+                                                       bounds.right, bounds.inclusiveRight() && endSlice == TOP),
+                                 new ClusteringIndexSliceFilter(Slices.ALL, reversed)), rowFilter, columnFilter)
+        );
+        // end partition, read up to the end clustering bound
+        if (endSlice != TOP)
+        {
+            AbstractBounds<PartitionPosition> endBoundOnly = AbstractBounds.bounds(endBound, true, endBound, true);
+            result.add(new Request(new DataRange(endBoundOnly, filter(local, BOTTOM, endSlice, reversed)), rowFilter, columnFilter));
+        }
+        // keep the requests in the order their rows should be returned
+        if (reversed)
+            Collections.reverse(result);
+        return result;
+    }
+
+    /**
+     * Tests whether component {@code i} of two clustering bounds holds the same value, comparing across their
+     * (possibly different) value accessors.
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private static boolean equalPart(ClusteringBound start, ClusteringBound end, int i)
+    {
+        Object left = start.get(i);
+        Object right = end.get(i);
+        int cmp = start.accessor().compare(left, right, end.accessor());
+        return cmp == 0;
+    }
+
+    /**
+     * Creates a new filter via {@code RowFilter.create(false)} holding every expression of {@code source}. Further
+     * expressions can then be added to the result without modifying {@code source}.
+     */
+    private static RowFilter copy(RowFilter source)
+    {
+        RowFilter duplicate = RowFilter.create(false);
+        source.forEach(expression -> duplicate.add(expression));
+        return duplicate;
+    }
+
+    /**
+     * Appends one expression per element of {@code values}, starting at index {@code start}. Each
+     * {@code values[i]} is applied to column {@code cms.get(i)} with operator {@code op}, except the final value,
+     * which uses {@code lastOp}.
+     */
+    private static void addExpressions(RowFilter rowFilter, List<ColumnMetadata> cms, int start, ByteBuffer[] values, Operator op, Operator lastOp)
+    {
+        int last = values.length - 1;
+        for (int i = start ; i <= last ; ++i)
+        {
+            Operator operator = i == last ? lastOp : op;
+            rowFilter.add(cms.get(i), operator, values[i]);
+        }
+    }
+
+    /**
+     * Builds a clustering filter over {@code metadata}'s comparator that selects the single slice from
+     * {@code start} to {@code end}.
+     */
+    private static ClusteringIndexSliceFilter filter(TableMetadata metadata, ClusteringBound<?> start, ClusteringBound<?> end, boolean reversed)
+    {
+        Slice slice = Slice.make(start, end);
+        Slices slices = Slices.with(metadata.comparator, slice);
+        return new ClusteringIndexSliceFilter(slices, reversed);
+    }
+
+    /**
+     * Applies a write to the remote table by forwarding it to the node whose id is the update's partition key.
+     * <p>
+     * A partition deletion truncates the underlying local table on that node. Otherwise each range tombstone and row
+     * is re-keyed: the leading {@code pkCount} clustering components become the local partition key, and the rest
+     * become the local clustering. Consecutive changes to the same local partition are batched into one
+     * {@link VirtualMutation}. Outstanding mutations are bounded by {@code MAX_CONCURRENCY}, and all of them are
+     * awaited before returning.
+     * <p>
+     * Unsupported content is rejected before anything is sent, so an invalid update is never partially applied.
+     *
+     * @throws InvalidRequestException if the node is unknown, the update has a static row, or a range deletion does
+     *                                 not target exactly one complete local partition key
+     */
+    @Override
+    public void apply(PartitionUpdate update)
+    {
+        int nodeId = Int32Type.instance.compose(update.partitionKey().getKey());
+        InetAddressAndPort endpoint = ClusterMetadata.current().directory.endpoint(new NodeId(nodeId));
+        if (endpoint == null)
+            throw new InvalidRequestException("Unknown node " + nodeId);
+
+        DeletionInfo deletionInfo = update.deletionInfo();
+        if (!deletionInfo.getPartitionDeletion().isLive())
+        {
+            truncate(endpoint).syncThrowUncheckedOnInterrupt();
+            return;
+        }
+
+        // validate everything before dispatching any mutation, so a rejected update is not partially applied
+        if (!update.staticRow().isEmpty())
+            throw new InvalidRequestException("Static rows are not supported for remote table " + metadata);
+
+        int pkCount = local.partitionKeyColumns().size();
+        if (deletionInfo.hasRanges())
+        {
+            Iterator<RangeTombstone> toValidate = deletionInfo.rangeIterator(false);
+            while (toValidate.hasNext())
+                validateRangeDeletion(toValidate.next(), pkCount);
+        }
+
+        // scratch buffers reused while building local partition keys and clusterings
+        ByteBuffer[] pkBuffer = pkCount == 1 ? null : new ByteBuffer[pkCount];
+        ByteBuffer[] ckBuffer = new ByteBuffer[local.clusteringColumns().size()];
+
+        PartitionUpdate.Builder builder = null;
+        ArrayDeque<Promise<Void>> results = new ArrayDeque<>();
+
+        if (deletionInfo.hasRanges())
+        {
+            Iterator<RangeTombstone> iterator = deletionInfo.rangeIterator(false);
+            while (iterator.hasNext())
+            {
+                RangeTombstone rt = iterator.next();
+                ClusteringBound start = rt.deletedSlice().start();
+                ClusteringBound end = rt.deletedSlice().end();
+                DecoratedKey key = remoteClusteringToLocalPartitionKey(local, start, pkCount, pkBuffer);
+                builder = maybeRolloverAndWait(key, builder, results, endpoint);
+                if (start.size() == pkCount && end.size() == pkCount)
+                {
+                    // the deletion covers the whole local partition
+                    builder.addPartitionDeletion(rt.deletionTime());
+                }
+                else
+                {
+                    start = ClusteringBound.create(start.kind(), Clustering.make(remoteClusteringToLocalClustering(start.clustering(), pkCount, ckBuffer)));
+                    end = ClusteringBound.create(end.kind(), Clustering.make(remoteClusteringToLocalClustering(end.clustering(), pkCount, ckBuffer)));
+                    builder.add(new RangeTombstone(Slice.make(start, end), rt.deletionTime()));
+                }
+            }
+        }
+
+        try (BTree.FastBuilder<ColumnData> columns = BTree.fastBuilder())
+        {
+            for (Row row : update)
+            {
+                Clustering<?> clustering = row.clustering();
+                DecoratedKey key = remoteClusteringToLocalPartitionKey(local, clustering, pkCount, pkBuffer);
+                builder = maybeRolloverAndWait(key, builder, results, endpoint);
+                Clustering<?> newClustering = Clustering.make(remoteClusteringToLocalClustering(clustering, pkCount, ckBuffer));
+                columns.reset();
+                for (ColumnData cd : row)
+                    columns.add(rebind(local, cd));
+                builder.add(BTreeRow.create(newClustering, row.primaryKeyLivenessInfo(), row.deletion(), columns.build()));
+            }
+        }
+
+        if (builder != null)
+            results.add(send(Verb.VIRTUAL_MUTATION_REQ, new VirtualMutation(builder.build()), endpoint));
+
+        while (!results.isEmpty())
+            results.pollFirst().syncThrowUncheckedOnInterrupt();
+    }
+
+    /**
+     * Checks that a range deletion on the remote table targets exactly one complete local partition key. Both bounds
+     * must contain at least {@code pkCount} components and agree on each of them.
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private void validateRangeDeletion(RangeTombstone rt, int pkCount)
+    {
+        ClusteringBound start = rt.deletedSlice().start();
+        ClusteringBound end = rt.deletedSlice().end();
+        if (start.size() < pkCount || end.size() < pkCount)
+            throw new InvalidRequestException("Range deletions must specify a complete partition key in the underlying table " + metadata);
+
+        for (int i = 0 ; i < pkCount ; ++i)
+        {
+            if (0 != start.accessor().compare(start.get(i), end.get(i), end.accessor()))
+                throw new InvalidRequestException("Range deletions must specify a single partition key in the underlying table " + metadata);
+        }
+    }
+
+    /**
+     * Returns a builder for local partition {@code key}, reusing {@code builder} if it already targets that key.
+     * Otherwise any existing builder is sent to {@code endpoint} as a {@link VirtualMutation}, with its promise
+     * appended to {@code waiting}. A fresh builder is then created, and we block on the oldest outstanding
+     * promises until fewer than {@code MAX_CONCURRENCY} remain.
+     */
+    private PartitionUpdate.Builder maybeRolloverAndWait(DecoratedKey key, PartitionUpdate.Builder builder, ArrayDeque<Promise<Void>> waiting, InetAddressAndPort endpoint)
+    {
+        boolean sameKey = builder != null && builder.partitionKey().equals(key);
+        if (sameKey)
+            return builder;
+
+        if (builder != null)
+            waiting.add(send(Verb.VIRTUAL_MUTATION_REQ, new VirtualMutation(builder.build()), endpoint));
+
+        PartitionUpdate.Builder next = new PartitionUpdate.Builder(local, key, local.regularAndStaticColumns(), 8);
+        while (waiting.size() >= MAX_CONCURRENCY)
+            waiting.pollFirst().syncThrowUncheckedOnInterrupt();
+        return next;
+    }
+
+    /**
+     * Sends a {@code TRUNCATE_REQ} for the underlying local table to {@code endpoint}, returning the promise
+     * produced by {@code send}.
+     */
+    private Promise<Void> truncate(InetAddressAndPort endpoint)
+    {
+        TruncateRequest request = new TruncateRequest(local.keyspace, local.name);
+        return send(Verb.TRUNCATE_REQ, request, endpoint);
+    }
+
+    /**
+     * Re-targets a simple cell onto the column of the same name in {@code local}. An invariant fails if that column
+     * is missing or is complex.
+     */
+    private static ColumnData rebind(TableMetadata local, ColumnData cd)
+    {
+        ColumnMetadata target = local.getColumn(cd.column().name);
+        Invariants.require(target != null, cd.column() + " not found in " + local);
+        Invariants.require(!target.isComplex(), "Complex column " + target + " not supported; should have been removed from metadata");
+
+        Cell<?> cell = (Cell<?>) cd;
+        return cell.withUpdatedColumn(target);
+    }
+
+    /**
+     * Builds the local table's decorated partition key from the leading components of a remote clustering prefix.
+     * For a single-column key that is component 0. For composite keys, the first {@code pkBuffer.length} components
+     * are written into {@code pkBuffer} (overwriting it) and combined.
+     */
+    private static DecoratedKey remoteClusteringToLocalPartitionKey(TableMetadata local, ClusteringPrefix<?> clustering, int pkCount, ByteBuffer[] pkBuffer)
+    {
+        if (pkCount == 1)
+            return local.partitioner.decorateKey(clustering.bufferAt(0));
+
+        int i = 0;
+        while (i < pkBuffer.length)
+        {
+            pkBuffer[i] = clustering.bufferAt(i);
+            ++i;
+        }
+        return local.partitioner.decorateKey(CompositeType.build(ByteBufferAccessor.instance, pkBuffer));
+    }
+
+    /**
+     * Extracts the components of a remote clustering that follow the first {@code pkCount}, i.e. those not part of
+     * the local partition key. They are staged in {@code ckBuffer`, and a right-sized copy is returned.
+     */
+    private static ByteBuffer[] remoteClusteringToLocalClustering(ClusteringPrefix<?> clustering, int pkCount, ByteBuffer[] ckBuffer)
+    {
+        int count = clustering.size() - pkCount;
+        for (int j = 0 ; j < count ; ++j)
+            ckBuffer[j] = clustering.bufferAt(pkCount + j);
+        return Arrays.copyOf(ckBuffer, count);
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java 
b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
index ee98da29c1..41e9e96266 100644
--- a/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.db.virtual;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
@@ -26,12 +27,20 @@ import javax.annotation.Nullable;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableMap;
 
+import accord.utils.Invariants;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadCommand.PotentialTxnConflicts;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.DeserializationHelper;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.NoPayload;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.ClientState;
 
@@ -42,6 +51,35 @@ import org.apache.cassandra.service.ClientState;
  */
 public final class VirtualMutation implements IMutation
 {
+    /**
+     * Wire serializer for a {@link VirtualMutation} carrying exactly one {@link PartitionUpdate}; only that update
+     * is written. Registered as the payload serializer of {@code Verb.VIRTUAL_MUTATION_REQ}.
+     */
+    public static final IVersionedSerializer<VirtualMutation> serializer = new IVersionedSerializer<VirtualMutation>()
+    {
+        /** Returns the sole update carried by {@code mutation}; the invariant fails unless there is exactly one. */
+        private PartitionUpdate onlyUpdate(VirtualMutation mutation)
+        {
+            Invariants.require(mutation.modifications.size() == 1);
+            return mutation.modifications.values().iterator().next();
+        }
+
+        @Override
+        public void serialize(VirtualMutation t, DataOutputPlus out, int version) throws IOException
+        {
+            PartitionUpdate.serializer.serialize(onlyUpdate(t), out, version);
+        }
+
+        @Override
+        public VirtualMutation deserialize(DataInputPlus in, int version) throws IOException
+        {
+            return new VirtualMutation(PartitionUpdate.serializer.deserialize(in, version, DeserializationHelper.Flag.FROM_REMOTE));
+        }
+
+        @Override
+        public long serializedSize(VirtualMutation t, int version)
+        {
+            return PartitionUpdate.serializer.serializedSize(onlyUpdate(t), version);
+        }
+    };
+
+    public static final IVerbHandler<VirtualMutation> handler = message -> {
+        message.payload.apply();
+        MessagingService.instance().respond(NoPayload.noPayload, message);
+    };
+
     private final String keyspaceName;
     private final DecoratedKey partitionKey;
     private final ImmutableMap<TableId, PartitionUpdate> modifications;
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualTable.java 
b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
index 3f05ed3a69..0423b12a4d 100644
--- a/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
@@ -100,4 +100,6 @@ public interface VirtualTable
     {
         return allowFilteringImplicitly();
     }
+
+    /**
+     * Returns this table's {@code Sorted} descriptor, which is {@code Sorted.UNSORTED} unless overridden.
+     * NOTE(review): presumably describes the ordering of rows this table produces; confirm against {@code Sorted}.
+     */
+    default Sorted sorted()
+    {
+        return Sorted.UNSORTED;
+    }
 }
diff --git a/src/java/org/apache/cassandra/exceptions/ExceptionSerializer.java 
b/src/java/org/apache/cassandra/exceptions/ExceptionSerializer.java
index de379739a3..834abca49d 100644
--- a/src/java/org/apache/cassandra/exceptions/ExceptionSerializer.java
+++ b/src/java/org/apache/cassandra/exceptions/ExceptionSerializer.java
@@ -46,7 +46,7 @@ public class ExceptionSerializer
 {
     public static class RemoteException extends RuntimeException
     {
-        private final String originalClass;
+        public final String originalClass;
 
         public RemoteException(String originalClass, String originalMessage, 
StackTraceElement[] stackTrace)
         {
diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailure.java 
b/src/java/org/apache/cassandra/exceptions/RequestFailure.java
index b9bba7fc70..03d088a239 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestFailure.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailure.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.tcm.NotCMSException;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static 
org.apache.cassandra.exceptions.ExceptionSerializer.nullableRemoteExceptionSerializer;
+import static org.apache.cassandra.exceptions.RequestFailureReason.UNKNOWN;
 
 /**
  * Allow inclusion of a serialized exception in failure response messages
@@ -125,7 +126,7 @@ public class RequestFailure
         if (t instanceof CoordinatorBehindException)
             return COORDINATOR_BEHIND;
 
-        return UNKNOWN;
+        return new RequestFailure(t);
     }
 
     public static RequestFailure forReason(RequestFailureReason reason)
diff --git a/src/java/org/apache/cassandra/locator/RemoteStrategy.java 
b/src/java/org/apache/cassandra/locator/RemoteStrategy.java
new file mode 100644
index 0000000000..515462ee98
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/RemoteStrategy.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.Map;
+
+/**
+ * A replication strategy that adds nothing to {@link LocalStrategy}, to which it delegates entirely.
+ * NOTE(review): presumably exists as a distinct type so that remote-to-local virtual keyspaces can be recognised;
+ * confirm against its usages.
+ */
+public class RemoteStrategy extends LocalStrategy
+{
+    /**
+     * @param name    the keyspace this strategy is configured for
+     * @param options replication options, passed unchanged to {@link LocalStrategy}
+     */
+    public RemoteStrategy(String name, Map<String, String> options)
+    {
+        super(name, options);
+    }
+}
diff --git a/src/java/org/apache/cassandra/net/Verb.java 
b/src/java/org/apache/cassandra/net/Verb.java
index d24c9e64ad..4a0ebb2c22 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.db.SnapshotCommand;
 import org.apache.cassandra.db.TruncateRequest;
 import org.apache.cassandra.db.TruncateResponse;
 import org.apache.cassandra.db.TruncateVerbHandler;
+import org.apache.cassandra.db.virtual.VirtualMutation;
 import org.apache.cassandra.exceptions.RequestFailure;
 import org.apache.cassandra.gms.GossipDigestAck;
 import org.apache.cassandra.gms.GossipDigestAck2;
@@ -195,6 +196,8 @@ public enum Verb
 {
     MUTATION_RSP           (60,  P1, writeTimeout,    REQUEST_RESPONSE,  () -> 
NoPayload.serializer,                 RESPONSE_HANDLER                          
   ),
     MUTATION_REQ           (0,   P3, writeTimeout,    MUTATION,          () -> 
Mutation.serializer,                  () -> MutationVerbHandler.instance,       
 MUTATION_RSP        ),
+    VIRTUAL_MUTATION_RSP   (200, P1, writeTimeout,    REQUEST_RESPONSE,  () -> 
NoPayload.serializer,                 RESPONSE_HANDLER                          
   ),
+    VIRTUAL_MUTATION_REQ   (201, P3, writeTimeout,    MUTATION,          () -> 
VirtualMutation.serializer,           () -> VirtualMutation.handler,            
 VIRTUAL_MUTATION_RSP),
     HINT_RSP               (61,  P1, writeTimeout,    REQUEST_RESPONSE,  () -> 
NoPayload.serializer,                 RESPONSE_HANDLER                          
   ),
     HINT_REQ               (1,   P4, writeTimeout,    MUTATION,          () -> 
HintMessage.serializer,               () -> HintVerbHandler.instance,           
 HINT_RSP            ),
     READ_REPAIR_RSP        (62,  P1, writeTimeout,    REQUEST_RESPONSE,  () -> 
NoPayload.serializer,                 RESPONSE_HANDLER                          
   ),
diff --git a/src/java/org/apache/cassandra/schema/SchemaConstants.java 
b/src/java/org/apache/cassandra/schema/SchemaConstants.java
index 6e3cb7a72e..ada413537d 100644
--- a/src/java/org/apache/cassandra/schema/SchemaConstants.java
+++ b/src/java/org/apache/cassandra/schema/SchemaConstants.java
@@ -57,6 +57,7 @@ public final class SchemaConstants
     public static final String VIRTUAL_VIEWS = "system_views";
     public static final String VIRTUAL_METRICS = "system_metrics";
     public static final String VIRTUAL_ACCORD_DEBUG = "system_accord_debug";
+    public static final String VIRTUAL_ACCORD_DEBUG_REMOTE = 
"system_accord_debug_remote";
 
     public static final String DUMMY_KEYSPACE_OR_TABLE_NAME = "--dummy--";
 
@@ -66,7 +67,7 @@ public final class SchemaConstants
 
     /* virtual table system keyspace names */
     public static final Set<String> VIRTUAL_SYSTEM_KEYSPACE_NAMES =
-        ImmutableSet.of(VIRTUAL_SCHEMA, VIRTUAL_VIEWS, VIRTUAL_METRICS);
+        ImmutableSet.of(VIRTUAL_SCHEMA, VIRTUAL_VIEWS, VIRTUAL_METRICS, 
VIRTUAL_ACCORD_DEBUG, VIRTUAL_ACCORD_DEBUG_REMOTE);
 
     /* replicate system keyspace names (the ones with a "true" replication 
strategy) */
     public static final Set<String> REPLICATED_SYSTEM_KEYSPACE_NAMES =
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java 
b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index f4ed67a440..2f85e6e58e 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -61,6 +61,7 @@ import org.apache.cassandra.db.SystemKeyspaceMigrator41;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.virtual.AccordDebugKeyspace;
 import org.apache.cassandra.db.virtual.ExceptionsTable;
+import org.apache.cassandra.db.virtual.AccordDebugRemoteKeyspace;
 import org.apache.cassandra.db.virtual.LogMessagesTable;
 import org.apache.cassandra.db.virtual.SlowQueriesTable;
 import org.apache.cassandra.db.virtual.SystemViewsKeyspace;
@@ -559,7 +560,10 @@ public class CassandraDaemon
         VirtualKeyspaceRegistry.instance.register(new 
VirtualKeyspace(VIRTUAL_METRICS, createMetricsKeyspaceTables()));
 
         if (DatabaseDescriptor.getAccord().enable_virtual_debug_only_keyspace)
+        {
             
VirtualKeyspaceRegistry.instance.register(AccordDebugKeyspace.instance);
+            
VirtualKeyspaceRegistry.instance.register(AccordDebugRemoteKeyspace.instance);
+        }
 
         // Flush log messages to system_views.system_logs virtual table as 
there were messages already logged
         // before that virtual table was instantiated.
diff --git 
a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java 
b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
index 6cac2f25fc..0889ebffe5 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
@@ -52,6 +52,8 @@ import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.exceptions.ExceptionSerializer;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -88,54 +90,108 @@ public class AccordDebugKeyspaceTest extends CQLTester
     private static final String QUERY_TXN_BLOCKED_BY =
         String.format("SELECT * FROM %s.%s WHERE txn_id=?", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_BLOCKED_BY);
 
+    private static final String QUERY_TXN_BLOCKED_BY_REMOTE =
+        String.format("SELECT * FROM %s.%s WHERE node_id = ? AND txn_id=?", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, 
AccordDebugKeyspace.TXN_BLOCKED_BY);
+
     private static final String QUERY_COMMANDS_FOR_KEY =
         String.format("SELECT txn_id, status FROM %s.%s WHERE key=?", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.COMMANDS_FOR_KEY);
 
+    private static final String QUERY_COMMANDS_FOR_KEY_REMOTE =
+        String.format("SELECT txn_id, status FROM %s.%s WHERE node_id = ? AND 
key=?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, 
AccordDebugKeyspace.COMMANDS_FOR_KEY);
+
     private static final String QUERY_TXN =
         String.format("SELECT txn_id, save_status FROM %s.%s WHERE txn_id=?", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN);
 
+    private static final String QUERY_TXN_REMOTE =
+        String.format("SELECT txn_id, save_status FROM %s.%s WHERE node_id = ? 
AND txn_id=?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, 
AccordDebugKeyspace.TXN);
+
     private static final String QUERY_TXNS =
         String.format("SELECT save_status FROM %s.%s WHERE command_store_id = 
? LIMIT 5", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN);
 
+    private static final String QUERY_TXNS_REMOTE =
+        String.format("SELECT save_status FROM %s.%s WHERE node_id = ? AND 
command_store_id = ? LIMIT 5", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, 
AccordDebugKeyspace.TXN);
+
     private static final String QUERY_TXNS_SEARCH =
         String.format("SELECT save_status FROM %s.%s WHERE command_store_id = 
? AND txn_id > ? LIMIT 5", SchemaConstants.VIRTUAL_ACCORD_DEBUG, 
AccordDebugKeyspace.TXN);
 
+    private static final String QUERY_TXNS_SEARCH_REMOTE =
+        String.format("SELECT save_status FROM %s.%s WHERE node_id = ? AND 
command_store_id = ? AND txn_id > ? LIMIT 5", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN);
+
     private static final String QUERY_JOURNAL =
         String.format("SELECT txn_id, save_status FROM %s.%s WHERE txn_id=?", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.JOURNAL);
 
+    private static final String ERASE_JOURNAL_REMOTE =
+        String.format("DELETE FROM %s.%s WHERE node_id = ? AND 
command_store_id = ? AND txn_id=?", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.JOURNAL);
+
+    private static final String QUERY_JOURNAL_REMOTE =
+        String.format("SELECT txn_id, save_status FROM %s.%s WHERE node_id = ? 
AND txn_id=?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, 
AccordDebugKeyspace.JOURNAL);
+
     private static final String SET_TRACE =
         String.format("UPDATE %s.%s SET permits = ? WHERE txn_id = ? AND 
event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, 
AccordDebugKeyspace.TXN_TRACE);
 
+    private static final String SET_TRACE_REMOTE =
+        String.format("UPDATE %s.%s SET permits = ? WHERE node_id = ? AND 
txn_id = ? AND event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, 
AccordDebugKeyspace.TXN_TRACE);
+
     private static final String QUERY_TRACE =
         String.format("SELECT * FROM %s.%s WHERE txn_id = ? AND event_type = 
?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACE);
 
+    private static final String QUERY_TRACE_REMOTE =
+        String.format("SELECT * FROM %s.%s WHERE node_id = ? AND txn_id = ? 
AND event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, 
AccordDebugKeyspace.TXN_TRACE);
+
     private static final String UNSET_TRACE1 =
         String.format("DELETE FROM %s.%s WHERE txn_id = ?", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACE);
 
+    private static final String UNSET_TRACE1_REMOTE =
+        String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ?", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN_TRACE);
+
     private static final String UNSET_TRACE2 =
         String.format("DELETE FROM %s.%s WHERE txn_id = ? AND event_type = ?", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACE);
 
+    private static final String UNSET_TRACE2_REMOTE =
+        String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ? AND 
event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, 
AccordDebugKeyspace.TXN_TRACE);
+
     private static final String QUERY_TRACES =
         String.format("SELECT * FROM %s.%s WHERE txn_id = ? AND event_type = 
?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACES);
 
+    private static final String QUERY_TRACES_REMOTE =
+        String.format("SELECT * FROM %s.%s WHERE node_id = ? AND txn_id = ? 
AND event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, 
AccordDebugKeyspace.TXN_TRACES);
+
     private static final String ERASE_TRACES1 =
         String.format("DELETE FROM %s.%s WHERE txn_id = ? AND event_type = ? 
AND id_micros < ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, 
AccordDebugKeyspace.TXN_TRACES);
 
+    private static final String ERASE_TRACES1_REMOTE =
+        String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ? AND 
event_type = ? AND id_micros < ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, 
AccordDebugKeyspace.TXN_TRACES);
+
     private static final String ERASE_TRACES2 =
         String.format("DELETE FROM %s.%s WHERE txn_id = ? AND event_type = ?", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACES);
 
+    private static final String ERASE_TRACES2_REMOTE =
+        String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ? AND 
event_type = ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, 
AccordDebugKeyspace.TXN_TRACES);
+
     private static final String ERASE_TRACES3 =
         String.format("DELETE FROM %s.%s WHERE txn_id = ?", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_TRACES);
 
+    private static final String ERASE_TRACES3_REMOTE =
+        String.format("DELETE FROM %s.%s WHERE node_id = ? AND txn_id = ?", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, AccordDebugKeyspace.TXN_TRACES);
+
     private static final String QUERY_REDUNDANT_BEFORE =
         String.format("SELECT * FROM %s.%s", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.REDUNDANT_BEFORE);
 
+    private static final String QUERY_REDUNDANT_BEFORE_REMOTE =
+        String.format("SELECT * FROM %s.%s WHERE node_id = ?", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, 
AccordDebugKeyspace.REDUNDANT_BEFORE);
+
     private static final String 
QUERY_REDUNDANT_BEFORE_FILTER_QUORUM_APPLIED_GEQ =
         String.format("SELECT * FROM %s.%s WHERE quorum_applied >= ?", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.REDUNDANT_BEFORE);
 
+    private static final String 
QUERY_REDUNDANT_BEFORE_FILTER_QUORUM_APPLIED_GEQ_REMOTE =
+        String.format("SELECT * FROM %s.%s WHERE node_id = ? AND 
quorum_applied >= ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, 
AccordDebugKeyspace.REDUNDANT_BEFORE);
+
     private static final String 
QUERY_REDUNDANT_BEFORE_FILTER_SHARD_APPLIED_GEQ =
         String.format("SELECT * FROM %s.%s WHERE shard_applied >= ?", 
SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.REDUNDANT_BEFORE);
 
+    private static final String 
QUERY_REDUNDANT_BEFORE_FILTER_SHARD_APPLIED_GEQ_REMOTE =
+        String.format("SELECT * FROM %s.%s WHERE node_id = ? AND shard_applied 
>= ?", SchemaConstants.VIRTUAL_ACCORD_DEBUG_REMOTE, 
AccordDebugKeyspace.REDUNDANT_BEFORE);
+
     @BeforeClass
     public static void setUpClass()
     {
@@ -170,43 +226,55 @@ public class AccordDebugKeyspaceTest extends CQLTester
     public void tracing()
     {
         // simple test to confirm basic tracing functionality works, doesn't 
validate specific behaviours only requesting/querying/erasing
+        String tableName = createTable("CREATE TABLE %s (k int, c int, v int, 
PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
+        AccordService accord = accord();
+        DatabaseDescriptor.getAccord().fetch_txn = "1s";
+        int nodeId = accord.nodeId().id;
+
         AccordMsgFilter filter = new AccordMsgFilter();
         MessagingService.instance().outboundSink.add(filter);
         try
         {
-            String tableName = createTable("CREATE TABLE %s (k int, c int, v 
int, PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
-            AccordService accord = accord();
-            DatabaseDescriptor.getAccord().fetch_txn = "1s";
             TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, 
Routable.Domain.Key);
             Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, 
c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0);
+            filter.appliesTo(id);
 
             execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS");
             assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"), 
row(id.toString(), "WAIT_PROGRESS", 1));
+            assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(), 
"WAIT_PROGRESS"), row(nodeId, id.toString(), "WAIT_PROGRESS", 1));
             execute(SET_TRACE, 0, id.toString(), "WAIT_PROGRESS");
             assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"));
+            assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(), 
"WAIT_PROGRESS"));
             execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS");
             assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"), 
row(id.toString(), "WAIT_PROGRESS", 1));
+            assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(), 
"WAIT_PROGRESS"), row(nodeId, id.toString(), "WAIT_PROGRESS", 1));
             execute(UNSET_TRACE1, id.toString());
             assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"));
+            assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(), 
"WAIT_PROGRESS"));
             execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS");
             assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"), 
row(id.toString(), "WAIT_PROGRESS", 1));
+            assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(), 
"WAIT_PROGRESS"), row(nodeId, id.toString(), "WAIT_PROGRESS", 1));
             execute(UNSET_TRACE2, id.toString(), "WAIT_PROGRESS");
             assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"));
+            assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(), 
"WAIT_PROGRESS"));
             execute(SET_TRACE, 1, id.toString(), "WAIT_PROGRESS");
             assertRows(execute(QUERY_TRACE, id.toString(), "WAIT_PROGRESS"), 
row(id.toString(), "WAIT_PROGRESS", 1));
-            accord.node().coordinate(id, txn).beginAsResult();
+            assertRows(execute(QUERY_TRACE_REMOTE, nodeId, id.toString(), 
"WAIT_PROGRESS"), row(nodeId, id.toString(), "WAIT_PROGRESS", 1));
             filter.appliesTo(id);
+            accord.node().coordinate(id, txn).beginAsResult();
             filter.preAccept.awaitThrowUncheckedOnInterrupt();
-
             filter.apply.awaitThrowUncheckedOnInterrupt();
             spinUntilSuccess(() -> Assertions.assertThat(execute(QUERY_TRACES, 
id.toString(), "WAIT_PROGRESS").size()).isGreaterThan(0));
+            spinUntilSuccess(() -> 
Assertions.assertThat(execute(QUERY_TRACES_REMOTE, nodeId, id.toString(), 
"WAIT_PROGRESS").size()).isGreaterThan(0));
             execute(ERASE_TRACES1, id.toString(), "FETCH", Long.MAX_VALUE);
             execute(ERASE_TRACES2, id.toString(), "FETCH");
             execute(ERASE_TRACES1, id.toString(), "WAIT_PROGRESS", 
Long.MAX_VALUE);
             Assertions.assertThat(execute(QUERY_TRACES, id.toString(), 
"WAIT_PROGRESS").size()).isEqualTo(0);
+            Assertions.assertThat(execute(QUERY_TRACES_REMOTE, nodeId, 
id.toString(), "WAIT_PROGRESS").size()).isEqualTo(0);
             // just check other variants don't fail
             execute(ERASE_TRACES2, id.toString(), "WAIT_PROGRESS");
             execute(ERASE_TRACES3, id.toString());
+
         }
         finally
         {
@@ -219,6 +287,7 @@ public class AccordDebugKeyspaceTest extends CQLTester
     {
         String tableName = createTable("CREATE TABLE %s (k int, c int, v int, 
PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
         var accord = accord();
+        int nodeId = accord.nodeId().id;
         TableId tableId = Schema.instance.getTableMetadata(KEYSPACE, 
tableName).id;
         TxnId syncId1 = new TxnId(100, 200, Txn.Kind.ExclusiveSyncPoint, 
Routable.Domain.Range, accord.nodeId());
         TxnId syncId2 = new TxnId(101, 300, Txn.Kind.ExclusiveSyncPoint, 
Routable.Domain.Range, accord.nodeId());
@@ -234,6 +303,27 @@ public class AccordDebugKeyspaceTest extends CQLTester
         
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_QUORUM_APPLIED_GEQ, 
syncId2.toString()).size()).isEqualTo(1);
         
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_SHARD_APPLIED_GEQ, 
syncId1.toString()).size()).isEqualTo(1);
         
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_SHARD_APPLIED_GEQ, 
syncId2.toString()).size()).isEqualTo(0);
+        Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_REMOTE, 
nodeId).size()).isGreaterThan(0);
+        
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_QUORUM_APPLIED_GEQ_REMOTE,
 nodeId, syncId1.toString()).size()).isEqualTo(2);
+        
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_QUORUM_APPLIED_GEQ_REMOTE,
 nodeId, syncId2.toString()).size()).isEqualTo(1);
+        
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_SHARD_APPLIED_GEQ_REMOTE,
 nodeId, syncId1.toString()).size()).isEqualTo(1);
+        
Assertions.assertThat(execute(QUERY_REDUNDANT_BEFORE_FILTER_SHARD_APPLIED_GEQ_REMOTE,
 nodeId, syncId2.toString()).size()).isEqualTo(0);
+    }
+
+    @Test
+    public void reportInvalidRequestForUnsupportedRemoteToLocal()
+    {
+        AccordService accord = accord();
+        int nodeId = accord.nodeId().id;
+        TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, 
Routable.Domain.Key);
+        try
+        {
+            execute(ERASE_JOURNAL_REMOTE, nodeId, 1, id.toString());
+        }
+        catch (ExceptionSerializer.RemoteException t)
+        {
+            
Assertions.assertThat(t.originalClass).isEqualTo(InvalidRequestException.class.getName());
+        }
     }
 
     @Test
@@ -241,6 +331,7 @@ public class AccordDebugKeyspaceTest extends CQLTester
     {
         String tableName = createTable("CREATE TABLE %s (k int, c int, v int, 
PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
         AccordService accord = accord();
+        int nodeId = accord.nodeId().id;
         TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, 
Routable.Domain.Key);
         Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c, 
v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0);
         String keyStr = txn.keys().get(0).toUnseekable().toString();
@@ -249,15 +340,19 @@ public class AccordDebugKeyspaceTest extends CQLTester
         spinUntilSuccess(() -> assertRows(execute(QUERY_TXN_BLOCKED_BY, 
id.toString()),
                                           row(id.toString(), anyInt(), 0, "", 
"", any(), anyOf(SaveStatus.ReadyToExecute.name(), SaveStatus.Applying.name(), 
SaveStatus.Applied.name()))));
         assertRows(execute(QUERY_TXN, id.toString()), row(id.toString(), 
"Applied"));
+        assertRows(execute(QUERY_TXN_REMOTE, nodeId, id.toString()), 
row(id.toString(), "Applied"));
         assertRows(execute(QUERY_JOURNAL, id.toString()), row(id.toString(), 
"PreAccepted"), row(id.toString(), "Applying"), row(id.toString(), "Applied"), 
row(id.toString(), null));
+        assertRows(execute(QUERY_JOURNAL_REMOTE, nodeId, id.toString()), 
row(id.toString(), "PreAccepted"), row(id.toString(), "Applying"), 
row(id.toString(), "Applied"), row(id.toString(), null));
         assertRows(execute(QUERY_COMMANDS_FOR_KEY, keyStr), row(id.toString(), 
"APPLIED_DURABLE"));
+        assertRows(execute(QUERY_COMMANDS_FOR_KEY_REMOTE, nodeId, keyStr), 
row(id.toString(), "APPLIED_DURABLE"));
     }
 
     @Test
-    public void manyTxns() throws ExecutionException, InterruptedException
+    public void manyTxns()
     {
         String tableName = createTable("CREATE TABLE %s (k int, c int, v int, 
PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
         AccordService accord = accord();
+        int nodeId = accord.nodeId().id;
         List<IAccordService.IAccordResult> await = new ArrayList<>();
         Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c, 
v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0);
         for (int i = 0 ; i < 100; ++i)
@@ -281,6 +376,22 @@ public class AccordDebugKeyspaceTest extends CQLTester
                    row("Applied"),
                    row("Applied")
         );
+
+        assertRows(execute(QUERY_TXNS_REMOTE, nodeId, commandStore.id()),
+                   row("Applied"),
+                   row("Applied"),
+                   row("Applied"),
+                   row("Applied"),
+                   row("Applied")
+        );
+
+        assertRows(execute(QUERY_TXNS_SEARCH_REMOTE, nodeId, 
commandStore.id(), TxnId.NONE.toString()),
+                   row("Applied"),
+                   row("Applied"),
+                   row("Applied"),
+                   row("Applied"),
+                   row("Applied")
+        );
     }
 
     @Test
@@ -293,6 +404,7 @@ public class AccordDebugKeyspaceTest extends CQLTester
         {
             String tableName = createTable("CREATE TABLE %s (k int, c int, v 
int, PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
             AccordService accord = accord();
+            int nodeId = accord.nodeId().id;
             TxnId id = accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, 
Routable.Domain.Key);
             String insertTxn = String.format("BEGIN TRANSACTION\n" +
                                              "    LET r = (SELECT * FROM %s.%s 
WHERE k = ? AND c = ?);\n" +
@@ -307,9 +419,13 @@ public class AccordDebugKeyspaceTest extends CQLTester
             filter.preAccept.awaitThrowUncheckedOnInterrupt();
             assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
                        row(id.toString(), anyInt(), 0, "", "", any(), 
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+            assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId, 
id.toString()),
+                       row(nodeId, id.toString(), anyInt(), 0, "", "", any(), 
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
             filter.apply.awaitThrowUncheckedOnInterrupt();
             assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
                        row(id.toString(), anyInt(), 0, "", "", any(), 
SaveStatus.ReadyToExecute.name()));
+            assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId, 
id.toString()),
+                       row(nodeId, id.toString(), anyInt(), 0, "", "", any(), 
SaveStatus.ReadyToExecute.name()));
         }
         finally
         {
@@ -329,6 +445,7 @@ public class AccordDebugKeyspaceTest extends CQLTester
         {
             String tableName = createTable("CREATE TABLE %s (k int, c int, v 
int, PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
             AccordService accord = accord();
+            int nodeId = accord.nodeId().id;
             TxnId first = 
accord.node().nextTxnIdWithDefaultFlags(Txn.Kind.Write, Routable.Domain.Key);
             String insertTxn = String.format("BEGIN TRANSACTION\n" +
                                              "    LET r = (SELECT * FROM %s.%s 
WHERE k = ? AND c = ?);\n" +
@@ -343,9 +460,13 @@ public class AccordDebugKeyspaceTest extends CQLTester
             filter.preAccept.awaitThrowUncheckedOnInterrupt();
             assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
                        row(first.toString(), anyInt(), 0, "", any(), any(), 
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+            assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId, 
first.toString()),
+                       row(nodeId, first.toString(), anyInt(), 0, "", any(), 
any(), anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
             filter.apply.awaitThrowUncheckedOnInterrupt();
             assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
                        row(first.toString(), anyInt(), 0, "", any(), 
anyNonNull(), SaveStatus.ReadyToExecute.name()));
+            assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId, 
first.toString()),
+                       row(nodeId, first.toString(), anyInt(), 0, "", any(), 
anyNonNull(), SaveStatus.ReadyToExecute.name()));
 
             filter.reset();
 
@@ -366,6 +487,11 @@ public class AccordDebugKeyspaceTest extends CQLTester
                        row(second.toString(), anyInt(), 1, any(), 
first.toString(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
             assertRows(execute(QUERY_TXN_BLOCKED_BY + " AND depth < 1", 
second.toString()),
                        row(second.toString(), anyInt(), 0, any(), "", 
anyNonNull(), SaveStatus.Stable.name()));
+            assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId, 
second.toString()),
+                       row(nodeId, second.toString(), anyInt(), 0, "", "", 
anyNonNull(), SaveStatus.Stable.name()),
+                       row(nodeId, second.toString(), anyInt(), 1, any(), 
first.toString(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
+            assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE + " AND depth < 1", 
nodeId, second.toString()),
+                       row(nodeId, second.toString(), anyInt(), 0, any(), "", 
anyNonNull(), SaveStatus.Stable.name()));
         }
         finally
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org

Reply via email to