This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
     new b31d15b9b5 Avoid failing queries when epoch changes and replica goes up/down
b31d15b9b5 is described below

commit b31d15b9b58926676436807ddc1efdd5616e13b3
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Wed Mar 26 15:48:13 2025 +0100

    Avoid failing queries when epoch changes and replica goes up/down

    Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-20489
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/locator/ReplicaLayout.java    |  16 ++-
 .../org/apache/cassandra/locator/ReplicaPlan.java  |  38 ++++---
 .../org/apache/cassandra/locator/ReplicaPlans.java |  33 +++---
 .../org/apache/cassandra/service/paxos/Paxos.java  |   6 ++
 .../distributed/test/RepairDigestTrackingTest.java |   6 +-
 .../test/tcm/FailureDetectorRecomputeTest.java     | 113 +++++++++++++++++++++
 .../cassandra/service/reads/DataResolverTest.java  |   2 +-
 .../service/reads/DigestResolverTest.java          |   2 +-
 .../cassandra/service/reads/ReadExecutorTest.java  |   2 +-
 .../reads/repair/AbstractReadRepairTest.java       |   1 +
 11 files changed, 172 insertions(+), 48 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 45a4f16b56..eee55c65e6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Avoid failing queries when epoch changes and replica goes up/down (CASSANDRA-20489)
  * Split out truncation record lock (CASSANDRA-20480)
  * Throw new IndexBuildInProgressException when queries fail during index build, instead of IndexNotAvailableException (CASSANDRA-20402)
  * Fix Paxos repair interrupts running transactions (CASSANDRA-20469)
diff --git a/src/java/org/apache/cassandra/locator/ReplicaLayout.java b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
index f0069f2555..30a52be73a 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaLayout.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
@@ -25,7 +25,6 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.utils.FBUtilities;
@@ -354,32 +353,31 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
     }
 
     /**
-     * @return the read layout for a token - this includes only live natural replicas, i.e. those that are not pending
-     * and not marked down by the failure detector. these are reverse sorted by the badness score of the configured snitch
+     * @return the read layout for a token - this includes natural replicas, i.e. those that are not pending.
+     * They are reverse sorted by the badness score of the configured snitch
      */
-    static ReplicaLayout.ForTokenRead forTokenReadLiveSorted(ClusterMetadata metadata, Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, Token token)
+    static ReplicaLayout.ForTokenRead forTokenReadSorted(ClusterMetadata metadata, Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, Token token)
     {
         EndpointsForToken replicas = keyspace.getMetadata().params.replication.isLocal() ?
                                     forLocalStrategyToken(metadata, replicationStrategy, token) :
                                     forNonLocalStrategyTokenRead(metadata, keyspace.getMetadata(), token);
+
         replicas = DatabaseDescriptor.getNodeProximity().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas);
-        replicas = replicas.filter(FailureDetector.isReplicaAlive);
+
         return new ReplicaLayout.ForTokenRead(replicationStrategy, replicas);
     }
 
     /**
      * TODO: we should really double check that the provided range does not overlap multiple token ring regions
-     * @return the read layout for a range - this includes only live natural replicas, i.e. those that are not pending
-     * and not marked down by the failure detector. these are reverse sorted by the badness score of the configured snitch
+     * @return the read layout for a range - these are reverse sorted by the badness score of the configured snitch
      */
-    static ReplicaLayout.ForRangeRead forRangeReadLiveSorted(ClusterMetadata metadata, Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, AbstractBounds<PartitionPosition> range)
+    static ReplicaLayout.ForRangeRead forRangeReadSorted(ClusterMetadata metadata, Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, AbstractBounds<PartitionPosition> range)
     {
         EndpointsForRange replicas = keyspace.getMetadata().params.replication.isLocal() ?
                                      forLocalStrategyRange(metadata, replicationStrategy, range) :
                                      forNonLocalStategyRangeRead(metadata, keyspace.getMetadata(), range);
         replicas = DatabaseDescriptor.getNodeProximity().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas);
-        replicas = replicas.filter(FailureDetector.isReplicaAlive);
         return new ReplicaLayout.ForRangeRead(replicationStrategy, range, replicas);
     }
 
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlan.java b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
index 62db6f85f3..7d08b341b8 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlan.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.tcm.ClusterMetadata;
-import org.apache.cassandra.utils.FBUtilities;
 
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -44,6 +43,7 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
    ConsistencyLevel consistencyLevel();
 
    E contacts();
+   E liveAndDown();
    Replica lookup(InetAddressAndPort endpoint);
 
    P withContacts(E contacts);
@@ -82,29 +82,28 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
        //  - paxos, includes all live replicas (natural+pending), for this DC if SERIAL_LOCAL
        //      ==> live.all()  (if consistencyLevel.isDCLocal(), then .filter(consistencyLevel.isLocal))
        protected final E contacts;
+       protected final E liveAndDown;
        protected final Function<ClusterMetadata, P> recompute;
        protected List<InetAddressAndPort> contacted = new CopyOnWriteArrayList<>();
 
-       AbstractReplicaPlan(Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, E contacts, Function<ClusterMetadata, P> recompute, Epoch epoch)
+       AbstractReplicaPlan(Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, E contacts, E liveAndDown, Function<ClusterMetadata, P> recompute, Epoch epoch)
        {
            assert contacts != null;
            this.keyspace = keyspace;
            this.replicationStrategy = replicationStrategy;
           this.consistencyLevel = consistencyLevel;
           this.contacts = contacts;
+           this.liveAndDown = liveAndDown;
           this.recompute = recompute;
           this.epoch = epoch;
        }
 
        public E contacts() { return contacts; }
+       public E liveAndDown() { return liveAndDown; }
        public Keyspace keyspace() { return keyspace; }
        public AbstractReplicationStrategy replicationStrategy() { return replicationStrategy; }
        public ConsistencyLevel consistencyLevel() { return consistencyLevel; }
 
-       public boolean canDoLocalRequest()
-       {
-           return contacts.contains(FBUtilities.getBroadcastAddressAndPort());
-       }
 
        public Epoch epoch()
        {
@@ -132,10 +131,11 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
                       ConsistencyLevel consistencyLevel,
                       E candidates,
                       E contacts,
+                      E liveAndDown,
                       Function<ClusterMetadata, P> recompute,
                       Epoch epoch)
        {
-           super(keyspace, replicationStrategy, consistencyLevel, contacts, recompute, epoch);
+           super(keyspace, replicationStrategy, consistencyLevel, contacts, liveAndDown, recompute, epoch);
            this.candidates = candidates;
            this.readQuorum = consistencyLevel.blockFor(replicationStrategy);
        }
@@ -171,13 +171,13 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
            ForRead<?, ?> newPlan = recompute.apply(newMetadata);
 
-           if (readCandidates().equals(newPlan.readCandidates()))
+           if (liveAndDown().equals(newPlan.liveAndDown()))
                return true;
 
            int readQuorum = newPlan.readQuorum();
            for (InetAddressAndPort addr : contacted)
            {
-               if (newPlan.readCandidates().contains(addr))
+               if (newPlan.liveAndDown().contains(addr))
                    readQuorum--;
            }
@@ -204,17 +204,18 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
                            ConsistencyLevel consistencyLevel,
                            EndpointsForToken candidates,
                            EndpointsForToken contacts,
+                           EndpointsForToken liveAndDown,
                            Function<ClusterMetadata, ReplicaPlan.ForTokenRead> recompute,
                            Function<ReplicaPlan<?, ?>, ReplicaPlan.ForWrite> repairPlan,
                            Epoch epoch)
        {
-           super(keyspace, replicationStrategy, consistencyLevel, candidates, contacts, recompute, epoch);
+           super(keyspace, replicationStrategy, consistencyLevel, candidates, contacts, liveAndDown, recompute, epoch);
            this.repairPlan = repairPlan;
        }
 
        public ForTokenRead withContacts(EndpointsForToken newContacts)
        {
-           ForTokenRead res = new ForTokenRead(keyspace, replicationStrategy, consistencyLevel, candidates, newContacts, recompute, repairPlan, epoch);
+           ForTokenRead res = new ForTokenRead(keyspace, replicationStrategy, consistencyLevel, candidates, newContacts, liveAndDown, recompute, repairPlan, epoch);
            res.contacted.addAll(contacted);
            return res;
        }
@@ -240,12 +241,13 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
                            AbstractBounds<PartitionPosition> range,
                            EndpointsForRange candidates,
                            EndpointsForRange contact,
+                           EndpointsForRange liveAndDown,
                            int vnodeCount,
                            Function<ClusterMetadata, ReplicaPlan.ForRangeRead> recompute,
                            BiFunction<ReplicaPlan<?, ?>, Token, ReplicaPlan.ForWrite> repairPlan,
                            Epoch epoch)
        {
-           super(keyspace, replicationStrategy, consistencyLevel, candidates, contact, recompute, epoch);
+           super(keyspace, replicationStrategy, consistencyLevel, candidates, contact, liveAndDown, recompute, epoch);
            this.range = range;
            this.vnodeCount = vnodeCount;
            this.repairPlan = repairPlan;
@@ -260,7 +262,7 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
 
        public ForRangeRead withContacts(EndpointsForRange newContact)
        {
-           ForRangeRead res = new ForRangeRead(keyspace, replicationStrategy, consistencyLevel, range, readCandidates(), newContact, vnodeCount, recompute, repairPlan, epoch);
+           ForRangeRead res = new ForRangeRead(keyspace, replicationStrategy, consistencyLevel, range, readCandidates(), newContact, liveAndDown, vnodeCount, recompute, repairPlan, epoch);
            res.contacted.addAll(contacted);
            return res;
        }
@@ -284,6 +286,7 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
                                AbstractBounds<PartitionPosition> range,
                                EndpointsForRange candidates,
                                EndpointsForRange contact,
+                               EndpointsForRange liveAndDown,
                                int vnodeCount,
                                Epoch epoch)
        {
@@ -291,7 +294,7 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
            // the epoch change during the course of query execution so no recomputation function is supplied. Likewise,
            // no read repair is expected to be performed during this type of query so a null is also used in place of a
            // function for calculating the repair plan.
-           super(keyspace, replicationStrategy, consistencyLevel, range, candidates, contact, vnodeCount, null, null, epoch);
+           super(keyspace, replicationStrategy, consistencyLevel, range, candidates, contact, liveAndDown, vnodeCount, null, null, epoch);
        }
 
        @Override
@@ -305,7 +308,6 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
    {
        // TODO: this is only needed because of poor isolation of concerns elsewhere - we can remove it soon, and will do so in a follow-up patch
        final EndpointsForToken pending;
-       final EndpointsForToken liveAndDown;
        final EndpointsForToken live;
        final int writeQuorum;
 
@@ -319,9 +321,8 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
                 Function<ClusterMetadata, ForWrite> recompute,
                 Epoch epoch)
        {
-           super(keyspace, replicationStrategy, consistencyLevel, contact, recompute, epoch);
+           super(keyspace, replicationStrategy, consistencyLevel, contact, liveAndDown, recompute, epoch);
            this.pending = pending;
-           this.liveAndDown = liveAndDown;
            this.live = live;
            this.writeQuorum = consistencyLevel.blockForWrite(replicationStrategy, pending);
        }
@@ -331,9 +332,6 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
        /** Replicas that a region of the ring is moving to; not yet ready to serve reads, but should receive writes */
        public EndpointsForToken pending() { return pending; }
 
-       /** Replicas that can participate in the write - this always includes all nodes (pending and natural) in all DCs, except for paxos LOCAL_QUORUM (which is local DC only) */
-       public EndpointsForToken liveAndDown() { return liveAndDown; }
-
        /** The live replicas present in liveAndDown, usually derived from FailureDetector.isReplicaAlive */
        public EndpointsForToken live() { return live; }
 
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
index b6a03b683b..53b32797c6 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -839,13 +839,9 @@ public class ReplicaPlans
 
     private static ReplicaPlan.ForTokenRead forSingleReplicaRead(ClusterMetadata metadata, Keyspace keyspace, Token token, Replica replica)
     {
-        // todo; replica does not always contain token, figure out why
-//        if (!metadata.placements.get(keyspace.getMetadata().params.replication).reads.forToken(token).contains(replica))
-//            throw UnavailableException.create(ConsistencyLevel.ONE, 1, 1, 0, 0);
-
         EndpointsForToken one = EndpointsForToken.of(token, replica);
-        return new ReplicaPlan.ForTokenRead(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, one, one,
+        return new ReplicaPlan.ForTokenRead(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, one, one, one,
                                             (newClusterMetadata) -> forSingleReplicaRead(newClusterMetadata, keyspace, token, replica),
                                             (self) -> {
                                                 throw new IllegalStateException("Read repair is not supported for short read/replica filtering protection.");
@@ -866,7 +862,7 @@ public class ReplicaPlans
        // TODO: this is unsafe, as one.range() may be inconsistent with our supplied range; should refactor Range/AbstractBounds to single class
        EndpointsForRange one = EndpointsForRange.of(replica);
 
-        return new ReplicaPlan.ForRangeRead(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, range, one, one, vnodeCount,
+        return new ReplicaPlan.ForRangeRead(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, range, one, one, one, vnodeCount,
                                             (newClusterMetadata) -> forSingleReplicaRead(metadata, keyspace, range, replica, vnodeCount),
                                             (self, token) -> {
                                                 throw new IllegalStateException("Read repair is not supported for short read/replica filtering protection.");
@@ -901,17 +897,24 @@ public class ReplicaPlans
        return forRead(metadata, keyspace, token, indexQueryPlan, consistencyLevel, retry, true);
    }
 
-    private static ReplicaPlan.ForTokenRead forRead(ClusterMetadata metadata, Keyspace keyspace, Token token, @Nullable Index.QueryPlan indexQueryPlan, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry, boolean throwOnInsufficientLiveReplicas)
+    private static ReplicaPlan.ForTokenRead forRead(ClusterMetadata metadata,
+                                                    Keyspace keyspace,
+                                                    Token token,
+                                                    @Nullable Index.QueryPlan indexQueryPlan,
+                                                    ConsistencyLevel consistencyLevel,
+                                                    SpeculativeRetryPolicy retry,
+                                                    boolean throwOnInsufficientLiveReplicas)
    {
        AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy();
-        ReplicaLayout.ForTokenRead forTokenRead = ReplicaLayout.forTokenReadLiveSorted(metadata, keyspace, replicationStrategy, token);
-        EndpointsForToken candidates = candidatesForRead(keyspace, indexQueryPlan, consistencyLevel, forTokenRead.natural());
+        ReplicaLayout.ForTokenRead forTokenReadLiveAndDown = ReplicaLayout.forTokenReadSorted(metadata, keyspace, replicationStrategy, token);
+        ReplicaLayout.ForTokenRead forTokenReadLive = forTokenReadLiveAndDown.filter(FailureDetector.isReplicaAlive);
+        EndpointsForToken candidates = candidatesForRead(keyspace, indexQueryPlan, consistencyLevel, forTokenReadLive.all());
        EndpointsForToken contacts = contactForRead(metadata.locator, replicationStrategy, consistencyLevel, retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE), candidates);
 
        if (throwOnInsufficientLiveReplicas)
            assureSufficientLiveReplicasForRead(metadata.locator, replicationStrategy, consistencyLevel, contacts);
 
-        return new ReplicaPlan.ForTokenRead(keyspace, replicationStrategy, consistencyLevel, candidates, contacts,
+        return new ReplicaPlan.ForTokenRead(keyspace, replicationStrategy, consistencyLevel, candidates, contacts, forTokenReadLiveAndDown.all(),
                                            (newClusterMetadata) -> forRead(newClusterMetadata, keyspace, token, indexQueryPlan, consistencyLevel, retry, false),
                                            (self) -> forReadRepair(self, metadata, keyspace, consistencyLevel, token, FailureDetector.isReplicaAlive),
                                            metadata.epoch);
@@ -942,8 +945,9 @@ public class ReplicaPlans
                                                         boolean throwOnInsufficientLiveReplicas)
    {
        AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy();
-        ReplicaLayout.ForRangeRead forRangeRead = ReplicaLayout.forRangeReadLiveSorted(metadata, keyspace, replicationStrategy, range);
-        EndpointsForRange candidates = candidatesForRead(keyspace, indexQueryPlan, consistencyLevel, forRangeRead.natural());
+        ReplicaLayout.ForRangeRead forRangeReadLiveAndDown = ReplicaLayout.forRangeReadSorted(metadata, keyspace, replicationStrategy, range);
+        ReplicaLayout.ForRangeRead forRangeReadLive = forRangeReadLiveAndDown.filter(FailureDetector.isReplicaAlive);
+        EndpointsForRange candidates = candidatesForRead(keyspace, indexQueryPlan, consistencyLevel, forRangeReadLive.natural());
        EndpointsForRange contacts = contactForRead(metadata.locator, replicationStrategy, consistencyLevel, false, candidates);
 
        if (throwOnInsufficientLiveReplicas)
@@ -955,6 +959,7 @@ public class ReplicaPlans
                                            range,
                                            candidates,
                                            contacts,
+                                           forRangeReadLiveAndDown.all(),
                                            vnodeCount,
                                            (newClusterMetadata) -> forRangeRead(newClusterMetadata, keyspace, indexQueryPlan, consistencyLevel, range, vnodeCount, false),
                                            (self, token) -> forReadRepair(self, metadata, keyspace, consistencyLevel, token, FailureDetector.isReplicaAlive),
@@ -983,7 +988,7 @@ public class ReplicaPlans
        EndpointsForRange contacts = builder.build();
        ClusterMetadata metadata = ClusterMetadata.current();
 
-        return new ReplicaPlan.ForFullRangeRead(keyspace, replicationStrategy, consistencyLevel, range, contacts, contacts, vnodeCount, metadata.epoch);
+        return new ReplicaPlan.ForFullRangeRead(keyspace, replicationStrategy, consistencyLevel, range, contacts, contacts, contacts, vnodeCount, metadata.epoch);
    }
 
    /**
@@ -1000,6 +1005,7 @@ public class ReplicaPlans
        if (!left.epoch.equals(right.epoch))
            return null;
 
+        EndpointsForRange mergedLiveAndDown = left.liveAndDown().keep(right.liveAndDown().endpoints());
        EndpointsForRange mergedCandidates = left.readCandidates().keep(right.readCandidates().endpoints());
        AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy();
        EndpointsForRange contacts = contactForRead(metadata.locator, replicationStrategy, consistencyLevel, false, mergedCandidates);
@@ -1023,6 +1029,7 @@ public class ReplicaPlans
                                           newRange,
                                           mergedCandidates,
                                           contacts,
+                                           mergedLiveAndDown,
                                           newVnodeCount,
                                           (newClusterMetadata) -> forRangeRead(newClusterMetadata,
                                                                                keyspace,
diff --git a/src/java/org/apache/cassandra/service/paxos/Paxos.java b/src/java/org/apache/cassandra/service/paxos/Paxos.java
index 06f90907d5..15d2b320fb 100644
--- a/src/java/org/apache/cassandra/service/paxos/Paxos.java
+++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java
@@ -415,6 +415,12 @@ public class Paxos
            return electorateNatural;
        }
 
+        @Override
+        public EndpointsForToken liveAndDown()
+        {
+            return all;
+        }
+
        @Override
        public boolean stillAppliesTo(ClusterMetadata newMetadata)
        {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
index 1e5773c67a..ce3df571d8 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
@@ -380,7 +380,7 @@ public class RepairDigestTrackingTest extends TestBaseImpl
     * local reads triggered by read repair (after speculative reads) execute at roughly the same time.
     *
     * This test depends on whether node1 gets a data or a digest request first, we force it to be a digest request
-     * in the forTokenReadLiveSorted ByteBuddy rule below.
+     * in the forTokenReadSorted ByteBuddy rule below.
     */
    @Test
    public void testLocalDataAndRemoteRequestConcurrency() throws Exception
@@ -440,7 +440,7 @@ public class RepairDigestTrackingTest extends TestBaseImpl
                           .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
 
            new ByteBuddy().rebase(ReplicaLayout.class)
-                           .method(named("forTokenReadLiveSorted").and(takesArguments(ClusterMetadata.class, Keyspace.class, AbstractReplicationStrategy.class, Token.class)))
+                           .method(named("forTokenReadSorted").and(takesArguments(ClusterMetadata.class, Keyspace.class, AbstractReplicationStrategy.class, Token.class)))
                           .intercept(MethodDelegation.to(BBHelper.class))
                           .make()
                           .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
@@ -475,7 +475,7 @@ public class RepairDigestTrackingTest extends TestBaseImpl
        }
 
        @SuppressWarnings({ "unused" })
-        public static ReplicaLayout.ForTokenRead forTokenReadLiveSorted(ClusterMetadata metadata, Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, Token token)
+        public static ReplicaLayout.ForTokenRead forTokenReadSorted(ClusterMetadata metadata, Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, Token token)
        {
            try
            {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/FailureDetectorRecomputeTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/FailureDetectorRecomputeTest.java
new file mode 100644
index 0000000000..7abfd7386c
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/FailureDetectorRecomputeTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tcm;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.transformations.CustomTransformation;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+public class FailureDetectorRecomputeTest extends TestBaseImpl
+{
+    @Test
+    public void readTest() throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(3)
+                                           .withInstanceInitializer(BB::install)
+                                           .start()))
+        {
+            cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)"));
+            cluster.get(1).runOnInstance(() -> BB.enabled.set(true));
+            for (int i = 0; i < 10; i++)
+                cluster.coordinator(1).execute(withKeyspace("select * from %s.tbl where id=?"), ConsistencyLevel.QUORUM, i);
+        }
+    }
+
+    @Test
+    public void writeTest() throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(3)
+                                           .withInstanceInitializer(BB::install)
+                                           .start()))
+        {
+            cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)"));
+            cluster.get(1).runOnInstance(() -> BB.enabled.set(true));
+            for (int i = 0; i < 10; i++)
+                cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (id) values (?)"), ConsistencyLevel.QUORUM, i);
+        }
+    }
+
+    public static class BB
+    {
+        public static AtomicBoolean enabled = new AtomicBoolean();
+
+        public static void install(ClassLoader cl, int i)
+        {
+            new ByteBuddy().rebase(FailureDetector.class)
+                           .method(named("isAlive").and(takesArguments(1)))
+                           .intercept(MethodDelegation.to(FailureDetectorRecomputeTest.BB.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+
+            new ByteBuddy().rebase(ReplicaPlan.AbstractForRead.class)
+                           .method(named("stillAppliesTo").and(takesArguments(1)))
+                           .intercept(MethodDelegation.to(FailureDetectorRecomputeTest.BB.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        static int downNode = 1;
+        public static boolean isAlive(InetAddressAndPort ep)
+        {
+            if (!enabled.get())
+                return true;
+            enabled.set(false);
+            ClusterMetadataService.instance().commit(CustomTransformation.make("hello"));
+            enabled.set(true);
+            return !ep.equals(InetAddressAndPort.getByNameUnchecked("127.0.0." + ((downNode % 3) + 1)));
+        }
+
+        public static boolean stillAppliesTo(ClusterMetadata metadata, @SuperCall Callable<Boolean> zuper) throws Exception
+        {
+            if (!enabled.get())
+                return true;
+            downNode++;
+            return zuper.call();
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
index 9c56f00a80..d281025666 100644
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
@@ -1331,7 +1331,7 @@ public class DataResolverTest extends AbstractReadResponseTest
                                                           ks.getReplicationStrategy(),
                                                           consistencyLevel,
                                                           ReplicaUtils.FULL_BOUNDS,
-                                                           replicas, replicas,
+                                                           replicas, replicas, replicas,
                                                           1, null, repairPlan,
                                                           Epoch.EMPTY));
diff --git a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
index 84a1167295..17baa4fa55 100644
--- a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
@@ -215,7 +215,7 @@ public class DigestResolverTest extends AbstractReadResponseTest
 
    private ReplicaPlan.SharedForTokenRead plan(ConsistencyLevel consistencyLevel, EndpointsForToken replicas)
    {
-        return ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks, ks.getReplicationStrategy(), consistencyLevel, replicas, replicas, null, (self) -> null, Epoch.EMPTY));
+        return ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks, ks.getReplicationStrategy(), consistencyLevel, replicas, replicas, replicas, null, (self) -> null, Epoch.EMPTY));
    }
 
    private void waitForLatch(CountDownLatch startlatch)
diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
index 046da259e9..e23c7078b4 100644
--- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
@@ -278,6 +278,6 @@ public class ReadExecutorTest
 
    private ReplicaPlan.ForTokenRead plan(ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken selected)
    {
-        return new ReplicaPlan.ForTokenRead(ks, ks.getReplicationStrategy(), consistencyLevel, natural, selected, (cm) -> null, (self) -> null, Epoch.EMPTY);
+        return new ReplicaPlan.ForTokenRead(ks, ks.getReplicationStrategy(), consistencyLevel, natural, selected, natural, (cm) -> null, (self) -> null, Epoch.EMPTY);
    }
 }
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
index f0026d35b9..1689069cf9 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
@@ -357,6 +357,7 @@ public abstract class AbstractReadRepairTest
                                           ReplicaUtils.FULL_BOUNDS,
                                           replicas,
                                           targets,
+                                           replicas,
                                           1,
                                           null,
                                           (self, token) -> forReadRepair(self, ClusterMetadata.current(), keyspace, consistencyLevel, token, (r) -> true),
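
For readers following the patch: the crux of CASSANDRA-20489 is that a read plan now carries the full live-and-down replica set, and stillAppliesTo() compares that set against a plan recomputed from the new ClusterMetadata, so a replica merely flapping up or down between epochs no longer fails the query. Below is a minimal, self-contained sketch of that recheck using plain Java collections; it is not Cassandra's API (the RecomputedPlan type and the final quorum condition are assumptions made for illustration), just a model of the logic visible in the ReplicaPlan.java hunks above.

import java.util.List;
import java.util.Set;

// Simplified model of the ReplicaPlan "stillAppliesTo" recheck after an epoch change.
// Endpoints are plain Strings here; "RecomputedPlan" is a hypothetical stand-in for the
// plan produced by the recompute function, NOT a Cassandra class.
public class StillAppliesToSketch
{
    public record RecomputedPlan(Set<String> liveAndDown, int readQuorum) {}

    public static boolean stillAppliesTo(Set<String> oldLiveAndDown,
                                         List<String> contacted,
                                         RecomputedPlan newPlan)
    {
        // Same owners under the new epoch: a replica flapping up/down must not fail the query.
        if (oldLiveAndDown.equals(newPlan.liveAndDown()))
            return true;

        // Ownership changed: count the already-contacted nodes that are still replicas,
        // since the responses we are waiting for from them remain usable.
        int readQuorum = newPlan.readQuorum();
        for (String endpoint : contacted)
        {
            if (newPlan.liveAndDown().contains(endpoint))
                readQuorum--;
        }
        // Assumption for the sketch: the query can proceed if the required number of
        // responses can still be met by nodes we have already contacted.
        return readQuorum <= 0;
    }

    public static void main(String[] args)
    {
        RecomputedPlan recomputed = new RecomputedPlan(Set.of("n1", "n2", "n3"), 2);
        // Same replica set, one node flapped in between: prints true, the query keeps going.
        System.out.println(stillAppliesTo(Set.of("n1", "n2", "n3"), List.of("n1", "n2"), recomputed));
    }
}

The flapping scenario in main() is the same one the new FailureDetectorRecomputeTest provokes with its ByteBuddy rules: isAlive() commits a metadata change and reports a different node down on every recomputation.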