This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b31d15b9b5 Avoid failing queries when epoch changes and replica goes up/down
b31d15b9b5 is described below

commit b31d15b9b58926676436807ddc1efdd5616e13b3
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Wed Mar 26 15:48:13 2025 +0100

    Avoid failing queries when epoch changes and replica goes up/down
    
    Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-20489
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/locator/ReplicaLayout.java    |  16 ++-
 .../org/apache/cassandra/locator/ReplicaPlan.java  |  38 ++++---
 .../org/apache/cassandra/locator/ReplicaPlans.java |  33 +++---
 .../org/apache/cassandra/service/paxos/Paxos.java  |   6 ++
 .../distributed/test/RepairDigestTrackingTest.java |   6 +-
 .../test/tcm/FailureDetectorRecomputeTest.java     | 113 +++++++++++++++++++++
 .../cassandra/service/reads/DataResolverTest.java  |   2 +-
 .../service/reads/DigestResolverTest.java          |   2 +-
 .../cassandra/service/reads/ReadExecutorTest.java  |   2 +-
 .../reads/repair/AbstractReadRepairTest.java       |   1 +
 11 files changed, 172 insertions(+), 48 deletions(-)
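
For readers skimming the diff, the core of the change is in ReplicaPlan.ForRead#stillAppliesTo: the plan now carries the failure-detector-independent replica set (liveAndDown) and compares that across epochs, instead of the live-filtered candidate set it compared before. The following standalone sketch (plain JDK types and invented names such as ReplicaPlanSketch, not the real Cassandra classes) models the scenario behind CASSANDRA-20489: an epoch change arrives while a replica is flapping, and the query should keep going because the replicas already contacted still cover the quorum.

    import java.util.List;
    import java.util.Set;

    public class ReplicaPlanSketch
    {
        final Set<String> liveAndDown;   // all natural replicas, ignoring the failure detector
        final List<String> contacted;    // replicas the coordinator has already contacted
        final int readQuorum;

        ReplicaPlanSketch(Set<String> liveAndDown, List<String> contacted, int readQuorum)
        {
            this.liveAndDown = liveAndDown;
            this.contacted = contacted;
            this.readQuorum = readQuorum;
        }

        // Simplified model of ForRead#stillAppliesTo after this patch: first compare the
        // failure-detector-independent sets; if they differ, check whether the replicas
        // already contacted still count towards the quorum of the recomputed plan
        // (the "remaining <= 0" step is an assumption about the code outside this hunk).
        boolean stillAppliesTo(ReplicaPlanSketch newPlan)
        {
            if (liveAndDown.equals(newPlan.liveAndDown))
                return true;

            int remaining = newPlan.readQuorum;
            for (String addr : contacted)
                if (newPlan.liveAndDown.contains(addr))
                    remaining--;
            return remaining <= 0;
        }

        public static void main(String[] args)
        {
            // Three replicas, QUORUM = 2; n1 and n2 have been contacted when the epoch changes
            // and the failure detector briefly marks n3 down. Before the patch the equivalent
            // check compared candidate sets that were already live-filtered ({n1,n2,n3} vs
            // {n1,n2}) and could fail the query; comparing liveAndDown keeps it running.
            ReplicaPlanSketch current = new ReplicaPlanSketch(Set.of("n1", "n2", "n3"), List.of("n1", "n2"), 2);
            ReplicaPlanSketch recomputed = new ReplicaPlanSketch(Set.of("n1", "n2", "n3"), List.of(), 2);
            System.out.println(current.stillAppliesTo(recomputed)); // true
        }
    }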

diff --git a/CHANGES.txt b/CHANGES.txt
index 45a4f16b56..eee55c65e6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Avoid failing queries when epoch changes and replica goes up/down (CASSANDRA-20489)
  * Split out truncation record lock (CASSANDRA-20480)
  * Throw new IndexBuildInProgressException when queries fail during index build, instead of IndexNotAvailableException (CASSANDRA-20402)
  * Fix Paxos repair interrupts running transactions (CASSANDRA-20469)
diff --git a/src/java/org/apache/cassandra/locator/ReplicaLayout.java b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
index f0069f2555..30a52be73a 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaLayout.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
@@ -25,7 +25,6 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.utils.FBUtilities;
@@ -354,32 +353,31 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
     }
 
     /**
-     * @return the read layout for a token - this includes only live natural replicas, i.e. those that are not pending
-     * and not marked down by the failure detector. these are reverse sorted by the badness score of the configured snitch
+     * @return the read layout for a token - this includes natural replicas, i.e. those that are not pending.
+     * They are reverse sorted by the badness score of the configured snitch
      */
-    static ReplicaLayout.ForTokenRead forTokenReadLiveSorted(ClusterMetadata metadata, Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, Token token)
+    static ReplicaLayout.ForTokenRead forTokenReadSorted(ClusterMetadata metadata, Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, Token token)
     {
         EndpointsForToken replicas = keyspace.getMetadata().params.replication.isLocal()
                                      ? forLocalStrategyToken(metadata, replicationStrategy, token)
                                      : forNonLocalStrategyTokenRead(metadata, keyspace.getMetadata(), token);
+
         replicas = DatabaseDescriptor.getNodeProximity().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas);
-        replicas = replicas.filter(FailureDetector.isReplicaAlive);
+
         return new ReplicaLayout.ForTokenRead(replicationStrategy, replicas);
     }
 
     /**
      * TODO: we should really double check that the provided range does not overlap multiple token ring regions
-     * @return the read layout for a range - this includes only live natural replicas, i.e. those that are not pending
-     * and not marked down by the failure detector. these are reverse sorted by the badness score of the configured snitch
+     * @return the read layout for a range - these are reverse sorted by the badness score of the configured snitch
      */
-    static ReplicaLayout.ForRangeRead forRangeReadLiveSorted(ClusterMetadata metadata, Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, AbstractBounds<PartitionPosition> range)
+    static ReplicaLayout.ForRangeRead forRangeReadSorted(ClusterMetadata metadata, Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, AbstractBounds<PartitionPosition> range)
     {
         EndpointsForRange replicas = keyspace.getMetadata().params.replication.isLocal()
                                      ? forLocalStrategyRange(metadata, replicationStrategy, range)
                                      : forNonLocalStategyRangeRead(metadata, keyspace.getMetadata(), range);
 
         replicas = DatabaseDescriptor.getNodeProximity().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas);
-        replicas = replicas.filter(FailureDetector.isReplicaAlive);
         return new ReplicaLayout.ForRangeRead(replicationStrategy, range, replicas);
     }
 
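
The net effect of the ReplicaLayout change above is that forTokenReadSorted/forRangeReadSorted now return every natural replica sorted by proximity, and the failure-detector filter is applied later, in ReplicaPlans, where the unfiltered set can also be retained on the plan. A rough standalone sketch of that split (JDK collections only; the proximity scores and the isAlive predicate are made up for illustration):

    import java.util.Comparator;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Predicate;
    import java.util.stream.Collectors;

    public class LayoutSketch
    {
        public static void main(String[] args)
        {
            Map<String, Double> proximity = Map.of("n1", 0.1, "n2", 0.5, "n3", 0.9);
            Predicate<String> isAlive = n -> !n.equals("n3"); // pretend the failure detector marks n3 down

            // What the layout now produces: all natural replicas, proximity-sorted, no liveness filter.
            List<String> liveAndDown = proximity.keySet().stream()
                                                .sorted(Comparator.comparingDouble(proximity::get))
                                                .collect(Collectors.toList());

            // What the plan builder derives from it while keeping liveAndDown around.
            List<String> liveCandidates = liveAndDown.stream().filter(isAlive).collect(Collectors.toList());

            System.out.println(liveAndDown);    // [n1, n2, n3]
            System.out.println(liveCandidates); // [n1, n2]
        }
    }
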
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlan.java b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
index 62db6f85f3..7d08b341b8 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlan.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.tcm.ClusterMetadata;
-import org.apache.cassandra.utils.FBUtilities;
 
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -44,6 +43,7 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
     ConsistencyLevel consistencyLevel();
 
     E contacts();
+    E liveAndDown();
 
     Replica lookup(InetAddressAndPort endpoint);
     P withContacts(E contacts);
@@ -82,29 +82,28 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
         //  - paxos, includes all live replicas (natural+pending), for this DC if SERIAL_LOCAL
         //      ==> live.all()  (if consistencyLevel.isDCLocal(), then .filter(consistencyLevel.isLocal))
         protected final E contacts;
+        protected final E liveAndDown;
 
         protected final Function<ClusterMetadata, P> recompute;
         protected List<InetAddressAndPort> contacted = new CopyOnWriteArrayList<>();
 
-        AbstractReplicaPlan(Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, E contacts, Function<ClusterMetadata, P> recompute, Epoch epoch)
+        AbstractReplicaPlan(Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, E contacts, E liveAndDown, Function<ClusterMetadata, P> recompute, Epoch epoch)
         {
             assert contacts != null;
             this.keyspace = keyspace;
             this.replicationStrategy = replicationStrategy;
             this.consistencyLevel = consistencyLevel;
             this.contacts = contacts;
+            this.liveAndDown = liveAndDown;
             this.recompute = recompute;
             this.epoch = epoch;
         }
 
         public E contacts() { return contacts; }
+        public E liveAndDown() { return liveAndDown; }
         public Keyspace keyspace() { return keyspace; }
         public AbstractReplicationStrategy replicationStrategy() { return replicationStrategy; }
         public ConsistencyLevel consistencyLevel() { return consistencyLevel; }
-        public boolean canDoLocalRequest()
-        {
-            return contacts.contains(FBUtilities.getBroadcastAddressAndPort());
-        }
 
         public Epoch epoch()
         {
@@ -132,10 +131,11 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
                         ConsistencyLevel consistencyLevel,
                         E candidates,
                         E contacts,
+                        E liveAndDown,
                         Function<ClusterMetadata, P> recompute,
                         Epoch epoch)
         {
-            super(keyspace, replicationStrategy, consistencyLevel, contacts, recompute, epoch);
+            super(keyspace, replicationStrategy, consistencyLevel, contacts, liveAndDown, recompute, epoch);
             this.candidates = candidates;
             this.readQuorum = consistencyLevel.blockFor(replicationStrategy);
         }
@@ -171,13 +171,13 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
 
             ForRead<?, ?> newPlan = recompute.apply(newMetadata);
 
-            if (readCandidates().equals(newPlan.readCandidates()))
+            if (liveAndDown().equals(newPlan.liveAndDown()))
                 return true;
 
             int readQuorum = newPlan.readQuorum();
             for (InetAddressAndPort addr : contacted)
             {
-                if (newPlan.readCandidates().contains(addr))
+                if (newPlan.liveAndDown().contains(addr))
                     readQuorum--;
             }
 
@@ -204,17 +204,18 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
                             ConsistencyLevel consistencyLevel,
                             EndpointsForToken candidates,
                             EndpointsForToken contacts,
+                            EndpointsForToken liveAndDown,
                             Function<ClusterMetadata, ReplicaPlan.ForTokenRead> recompute,
                             Function<ReplicaPlan<?, ?>, ReplicaPlan.ForWrite> repairPlan,
                             Epoch epoch)
         {
-            super(keyspace, replicationStrategy, consistencyLevel, candidates, contacts, recompute, epoch);
+            super(keyspace, replicationStrategy, consistencyLevel, candidates, contacts, liveAndDown, recompute, epoch);
             this.repairPlan = repairPlan;
         }
 
         public ForTokenRead withContacts(EndpointsForToken newContacts)
         {
-            ForTokenRead res = new ForTokenRead(keyspace, replicationStrategy, consistencyLevel, candidates, newContacts, recompute, repairPlan, epoch);
+            ForTokenRead res = new ForTokenRead(keyspace, replicationStrategy, consistencyLevel, candidates, newContacts, liveAndDown, recompute, repairPlan, epoch);
             res.contacted.addAll(contacted);
             return res;
         }
@@ -240,12 +241,13 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
                             AbstractBounds<PartitionPosition> range,
                             EndpointsForRange candidates,
                             EndpointsForRange contact,
+                            EndpointsForRange liveAndDown,
                             int vnodeCount,
                             Function<ClusterMetadata, ReplicaPlan.ForRangeRead> recompute,
                             BiFunction<ReplicaPlan<?, ?>, Token, ReplicaPlan.ForWrite> repairPlan,
                             Epoch epoch)
         {
-            super(keyspace, replicationStrategy, consistencyLevel, candidates, contact, recompute, epoch);
+            super(keyspace, replicationStrategy, consistencyLevel, candidates, contact, liveAndDown, recompute, epoch);
             this.range = range;
             this.vnodeCount = vnodeCount;
             this.repairPlan = repairPlan;
@@ -260,7 +262,7 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
 
         public ForRangeRead withContacts(EndpointsForRange newContact)
         {
-            ForRangeRead res = new ForRangeRead(keyspace, replicationStrategy, consistencyLevel, range, readCandidates(), newContact, vnodeCount, recompute, repairPlan, epoch);
+            ForRangeRead res = new ForRangeRead(keyspace, replicationStrategy, consistencyLevel, range, readCandidates(), newContact, liveAndDown, vnodeCount, recompute, repairPlan, epoch);
             res.contacted.addAll(contacted);
             return res;
         }
@@ -284,6 +286,7 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
                                 AbstractBounds<PartitionPosition> range,
                                 EndpointsForRange candidates,
                                 EndpointsForRange contact,
+                                EndpointsForRange liveAndDown,
                                 int vnodeCount,
                                 Epoch epoch)
         {
@@ -291,7 +294,7 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
             // the epoch change during the course of query execution so no recomputation function is supplied. Likewise,
             // no read repair is expected to be performed during this type of query so a null is also used in place of a
             // function for calculating the repair plan.
-            super(keyspace, replicationStrategy, consistencyLevel, range, candidates, contact, vnodeCount, null, null, epoch);
+            super(keyspace, replicationStrategy, consistencyLevel, range, candidates, contact, liveAndDown, vnodeCount, null, null, epoch);
         }
 
         @Override
@@ -305,7 +308,6 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
     {
         // TODO: this is only needed because of poor isolation of concerns elsewhere - we can remove it soon, and will do so in a follow-up patch
         final EndpointsForToken pending;
-        final EndpointsForToken liveAndDown;
         final EndpointsForToken live;
         final int writeQuorum;
 
@@ -319,9 +321,8 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
                         Function<ClusterMetadata, ForWrite> recompute,
                         Epoch epoch)
         {
-            super(keyspace, replicationStrategy, consistencyLevel, contact, recompute, epoch);
+            super(keyspace, replicationStrategy, consistencyLevel, contact, liveAndDown, recompute, epoch);
             this.pending = pending;
-            this.liveAndDown = liveAndDown;
             this.live = live;
             this.writeQuorum = consistencyLevel.blockForWrite(replicationStrategy, pending);
         }
@@ -331,9 +332,6 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
         /** Replicas that a region of the ring is moving to; not yet ready to serve reads, but should receive writes */
         public EndpointsForToken pending() { return pending; }
 
-        /** Replicas that can participate in the write - this always includes all nodes (pending and natural) in all DCs, except for paxos LOCAL_QUORUM (which is local DC only) */
-        public EndpointsForToken liveAndDown() { return liveAndDown; }
-
         /** The live replicas present in liveAndDown, usually derived from FailureDetector.isReplicaAlive */
         public EndpointsForToken live() { return live; }
 
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
index b6a03b683b..53b32797c6 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -839,13 +839,9 @@ public class ReplicaPlans
 
     private static ReplicaPlan.ForTokenRead forSingleReplicaRead(ClusterMetadata metadata, Keyspace keyspace, Token token, Replica replica)
     {
-        // todo; replica does not always contain token, figure out why
-//        if (!metadata.placements.get(keyspace.getMetadata().params.replication).reads.forToken(token).contains(replica))
-//            throw UnavailableException.create(ConsistencyLevel.ONE, 1, 1, 0, 0);
-
         EndpointsForToken one = EndpointsForToken.of(token, replica);
 
-        return new ReplicaPlan.ForTokenRead(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, one, one,
+        return new ReplicaPlan.ForTokenRead(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, one, one, one,
                                             (newClusterMetadata) -> forSingleReplicaRead(newClusterMetadata, keyspace, token, replica),
                                             (self) -> {
                                                 throw new IllegalStateException("Read repair is not supported for short read/replica filtering protection.");
@@ -866,7 +862,7 @@ public class ReplicaPlans
         // TODO: this is unsafe, as one.range() may be inconsistent with our supplied range; should refactor Range/AbstractBounds to single class
         EndpointsForRange one = EndpointsForRange.of(replica);
 
-        return new ReplicaPlan.ForRangeRead(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, range, one, one, vnodeCount,
+        return new ReplicaPlan.ForRangeRead(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, range, one, one, one, vnodeCount,
                                             (newClusterMetadata) -> forSingleReplicaRead(metadata, keyspace, range, replica, vnodeCount),
                                             (self, token) -> {
                                                 throw new IllegalStateException("Read repair is not supported for short read/replica filtering protection.");
@@ -901,17 +897,24 @@ public class ReplicaPlans
         return forRead(metadata, keyspace, token, indexQueryPlan, consistencyLevel, retry, true);
     }
 
-    private static ReplicaPlan.ForTokenRead forRead(ClusterMetadata metadata, Keyspace keyspace, Token token, @Nullable Index.QueryPlan indexQueryPlan, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry,  boolean throwOnInsufficientLiveReplicas)
+    private static ReplicaPlan.ForTokenRead forRead(ClusterMetadata metadata,
+                                                    Keyspace keyspace,
+                                                    Token token,
+                                                    @Nullable Index.QueryPlan indexQueryPlan,
+                                                    ConsistencyLevel consistencyLevel,
+                                                    SpeculativeRetryPolicy retry,
+                                                    boolean throwOnInsufficientLiveReplicas)
     {
         AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy();
-        ReplicaLayout.ForTokenRead forTokenRead = ReplicaLayout.forTokenReadLiveSorted(metadata, keyspace, replicationStrategy, token);
-        EndpointsForToken candidates = candidatesForRead(keyspace, indexQueryPlan, consistencyLevel, forTokenRead.natural());
+        ReplicaLayout.ForTokenRead forTokenReadLiveAndDown = ReplicaLayout.forTokenReadSorted(metadata, keyspace, replicationStrategy, token);
+        ReplicaLayout.ForTokenRead forTokenReadLive = forTokenReadLiveAndDown.filter(FailureDetector.isReplicaAlive);
+        EndpointsForToken candidates = candidatesForRead(keyspace, indexQueryPlan, consistencyLevel, forTokenReadLive.all());
         EndpointsForToken contacts = contactForRead(metadata.locator, replicationStrategy, consistencyLevel, retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE), candidates);
 
         if (throwOnInsufficientLiveReplicas)
             assureSufficientLiveReplicasForRead(metadata.locator, replicationStrategy, consistencyLevel, contacts);
 
-        return new ReplicaPlan.ForTokenRead(keyspace, replicationStrategy, consistencyLevel, candidates, contacts,
+        return new ReplicaPlan.ForTokenRead(keyspace, replicationStrategy, consistencyLevel, candidates, contacts, forTokenReadLiveAndDown.all(),
                                            (newClusterMetadata) -> forRead(newClusterMetadata, keyspace, token, indexQueryPlan, consistencyLevel, retry, false),
                                            (self) -> forReadRepair(self, metadata, keyspace, consistencyLevel, token, FailureDetector.isReplicaAlive),
                                             metadata.epoch);
@@ -942,8 +945,9 @@ public class ReplicaPlans
                                                         boolean throwOnInsufficientLiveReplicas)
     {
         AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy();
-        ReplicaLayout.ForRangeRead forRangeRead = ReplicaLayout.forRangeReadLiveSorted(metadata, keyspace, replicationStrategy, range);
-        EndpointsForRange candidates = candidatesForRead(keyspace, indexQueryPlan, consistencyLevel, forRangeRead.natural());
+        ReplicaLayout.ForRangeRead forRangeReadLiveAndDown = ReplicaLayout.forRangeReadSorted(metadata, keyspace, replicationStrategy, range);
+        ReplicaLayout.ForRangeRead forRangeReadLive = forRangeReadLiveAndDown.filter(FailureDetector.isReplicaAlive);
+        EndpointsForRange candidates = candidatesForRead(keyspace, indexQueryPlan, consistencyLevel, forRangeReadLive.natural());
         EndpointsForRange contacts = contactForRead(metadata.locator, replicationStrategy, consistencyLevel, false, candidates);
 
         if (throwOnInsufficientLiveReplicas)
@@ -955,6 +959,7 @@ public class ReplicaPlans
                                             range,
                                             candidates,
                                             contacts,
+                                            forRangeReadLiveAndDown.all(),
                                             vnodeCount,
                                            (newClusterMetadata) -> forRangeRead(newClusterMetadata, keyspace, indexQueryPlan, consistencyLevel, range, vnodeCount, false),
                                            (self, token) -> forReadRepair(self, metadata, keyspace, consistencyLevel, token, FailureDetector.isReplicaAlive),
@@ -983,7 +988,7 @@ public class ReplicaPlans
         EndpointsForRange contacts = builder.build();
 
         ClusterMetadata metadata = ClusterMetadata.current();
-        return new ReplicaPlan.ForFullRangeRead(keyspace, replicationStrategy, consistencyLevel, range, contacts, contacts, vnodeCount, metadata.epoch);
+        return new ReplicaPlan.ForFullRangeRead(keyspace, replicationStrategy, consistencyLevel, range, contacts, contacts, contacts, vnodeCount, metadata.epoch);
     }
 
     /**
@@ -1000,6 +1005,7 @@ public class ReplicaPlans
         if (!left.epoch.equals(right.epoch))
             return null;
 
+        EndpointsForRange mergedLiveAndDown = left.liveAndDown().keep(right.liveAndDown().endpoints());
         EndpointsForRange mergedCandidates = left.readCandidates().keep(right.readCandidates().endpoints());
         AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy();
         EndpointsForRange contacts = contactForRead(metadata.locator, replicationStrategy, consistencyLevel, false, mergedCandidates);
@@ -1023,6 +1029,7 @@ public class ReplicaPlans
                                             newRange,
                                             mergedCandidates,
                                             contacts,
+                                            mergedLiveAndDown,
                                             newVnodeCount,
                                            (newClusterMetadata) -> forRangeRead(newClusterMetadata,
                                                                                 keyspace,
diff --git a/src/java/org/apache/cassandra/service/paxos/Paxos.java b/src/java/org/apache/cassandra/service/paxos/Paxos.java
index 06f90907d5..15d2b320fb 100644
--- a/src/java/org/apache/cassandra/service/paxos/Paxos.java
+++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java
@@ -415,6 +415,12 @@ public class Paxos
             return electorateNatural;
         }
 
+        @Override
+        public EndpointsForToken liveAndDown()
+        {
+            return all;
+        }
+
         @Override
         public boolean stillAppliesTo(ClusterMetadata newMetadata)
         {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
index 1e5773c67a..ce3df571d8 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
@@ -380,7 +380,7 @@ public class RepairDigestTrackingTest extends TestBaseImpl
      * local reads triggered by read repair (after speculative reads) execute at roughly the same time.
      *
      * This test depends on whether node1 gets a data or a digest request first, we force it to be a digest request
-     * in the forTokenReadLiveSorted ByteBuddy rule below.
+     * in the forTokenReadSorted ByteBuddy rule below.
      */
     @Test
     public void testLocalDataAndRemoteRequestConcurrency() throws Exception
@@ -440,7 +440,7 @@ public class RepairDigestTrackingTest extends TestBaseImpl
                               .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
 
                 new ByteBuddy().rebase(ReplicaLayout.class)
-                               .method(named("forTokenReadLiveSorted").and(takesArguments(ClusterMetadata.class, Keyspace.class, AbstractReplicationStrategy.class, Token.class)))
+                               .method(named("forTokenReadSorted").and(takesArguments(ClusterMetadata.class, Keyspace.class, AbstractReplicationStrategy.class, Token.class)))
                                .intercept(MethodDelegation.to(BBHelper.class))
                                .make()
                               .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
@@ -475,7 +475,7 @@ public class RepairDigestTrackingTest extends TestBaseImpl
         }
 
         @SuppressWarnings({ "unused" })
-        public static ReplicaLayout.ForTokenRead forTokenReadLiveSorted(ClusterMetadata metadata, Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, Token token)
+        public static ReplicaLayout.ForTokenRead forTokenReadSorted(ClusterMetadata metadata, Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, Token token)
         {
             try
             {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/FailureDetectorRecomputeTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/FailureDetectorRecomputeTest.java
new file mode 100644
index 0000000000..7abfd7386c
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/FailureDetectorRecomputeTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tcm;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.transformations.CustomTransformation;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+public class FailureDetectorRecomputeTest extends TestBaseImpl
+{
+    @Test
+    public void readTest() throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(3)
+                                           .withInstanceInitializer(BB::install)
+                                           .start()))
+        {
+            cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)"));
+            cluster.get(1).runOnInstance(() -> BB.enabled.set(true));
+            for (int i = 0; i < 10; i++)
+                cluster.coordinator(1).execute(withKeyspace("select * from %s.tbl where id=?"), ConsistencyLevel.QUORUM, i);
+        }
+    }
+
+    @Test
+    public void writeTest() throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(3)
+                                           .withInstanceInitializer(BB::install)
+                                           .start()))
+        {
+            cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)"));
+            cluster.get(1).runOnInstance(() -> BB.enabled.set(true));
+            for (int i = 0; i < 10; i++)
+                cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (id) values (?)"), ConsistencyLevel.QUORUM, i);
+        }
+    }
+
+    public static class BB
+    {
+        public static AtomicBoolean enabled = new AtomicBoolean();
+
+        public static void install(ClassLoader cl, int i)
+        {
+            new ByteBuddy().rebase(FailureDetector.class)
+                           .method(named("isAlive").and(takesArguments(1)))
+                           .intercept(MethodDelegation.to(FailureDetectorRecomputeTest.BB.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+
+            new ByteBuddy().rebase(ReplicaPlan.AbstractForRead.class)
+                           .method(named("stillAppliesTo").and(takesArguments(1)))
+                           .intercept(MethodDelegation.to(FailureDetectorRecomputeTest.BB.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        static int downNode = 1;
+        public static boolean isAlive(InetAddressAndPort ep)
+        {
+            if (!enabled.get())
+                return true;
+            enabled.set(false);
+            ClusterMetadataService.instance().commit(CustomTransformation.make("hello"));
+            enabled.set(true);
+            return !ep.equals(InetAddressAndPort.getByNameUnchecked("127.0.0." + ((downNode % 3) + 1)));
+        }
+
+        public static boolean stillAppliesTo(ClusterMetadata metadata, @SuperCall Callable<Boolean> zuper) throws Exception
+        {
+            if (!enabled.get())
+                return true;
+            downNode++;
+            return zuper.call();
+        }
+    }
+}
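
The new test drives the race entirely through ByteBuddy: the intercepted FailureDetector.isAlive commits a dummy cluster-metadata transformation (forcing an epoch bump mid-query) and reports a rotating node as down, while the stillAppliesTo interceptor advances which node that is. For anyone unfamiliar with the delegation pattern used here, a minimal standalone sketch (toy Service/Interceptor classes, and subclassing rather than the in-JVM rebase the test performs):

    import java.util.concurrent.Callable;

    import net.bytebuddy.ByteBuddy;
    import net.bytebuddy.implementation.MethodDelegation;
    import net.bytebuddy.implementation.bind.annotation.SuperCall;

    import static net.bytebuddy.matcher.ElementMatchers.named;

    public class DelegationSketch
    {
        public static class Service
        {
            public String greet() { return "hello"; }
        }

        public static class Interceptor
        {
            // @SuperCall hands us the original implementation, just like BB.stillAppliesTo above.
            public static String greet(@SuperCall Callable<String> zuper) throws Exception
            {
                return "intercepted " + zuper.call();
            }
        }

        public static void main(String[] args) throws Exception
        {
            Service service = new ByteBuddy().subclass(Service.class)
                                             .method(named("greet"))
                                             .intercept(MethodDelegation.to(Interceptor.class))
                                             .make()
                                             .load(DelegationSketch.class.getClassLoader())
                                             .getLoaded()
                                             .getDeclaredConstructor()
                                             .newInstance();
            System.out.println(service.greet()); // intercepted hello
        }
    }
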
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
index 9c56f00a80..d281025666 100644
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
@@ -1331,7 +1331,7 @@ public class DataResolverTest extends AbstractReadResponseTest
                                                                ks.getReplicationStrategy(),
                                                                consistencyLevel,
                                                                ReplicaUtils.FULL_BOUNDS,
-                                                               replicas, replicas,
+                                                               replicas, replicas, replicas,
                                                                1, null,
                                                                repairPlan,
                                                                Epoch.EMPTY));
diff --git a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
index 84a1167295..17baa4fa55 100644
--- a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
@@ -215,7 +215,7 @@ public class DigestResolverTest extends AbstractReadResponseTest
 
     private ReplicaPlan.SharedForTokenRead plan(ConsistencyLevel consistencyLevel, EndpointsForToken replicas)
     {
-        return ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks, ks.getReplicationStrategy(), consistencyLevel, replicas, replicas, null, (self) -> null, Epoch.EMPTY));
+        return ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks, ks.getReplicationStrategy(), consistencyLevel, replicas, replicas, replicas, null, (self) -> null, Epoch.EMPTY));
     }
 
     private void waitForLatch(CountDownLatch startlatch)
diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
index 046da259e9..e23c7078b4 100644
--- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
@@ -278,6 +278,6 @@ public class ReadExecutorTest
 
     private ReplicaPlan.ForTokenRead plan(ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken selected)
     {
-        return new ReplicaPlan.ForTokenRead(ks, ks.getReplicationStrategy(), consistencyLevel, natural, selected, (cm) -> null, (self) -> null, Epoch.EMPTY);
+        return new ReplicaPlan.ForTokenRead(ks, ks.getReplicationStrategy(), consistencyLevel, natural, selected, natural, (cm) -> null, (self) -> null, Epoch.EMPTY);
     }
 }
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
index f0026d35b9..1689069cf9 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
@@ -357,6 +357,7 @@ public abstract  class AbstractReadRepairTest
                                             ReplicaUtils.FULL_BOUNDS,
                                             replicas,
                                             targets,
+                                            replicas,
                                             1,
                                             null,
                                            (self, token) -> forReadRepair(self, ClusterMetadata.current(), keyspace, consistencyLevel, token, (r) -> true),

