Repository: cassandra Updated Branches: refs/heads/trunk 176f2a444 -> 45c0f860f
Run repair with down replicas Patch by Sankalp Kohli & Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-10446 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/45c0f860 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/45c0f860 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/45c0f860 Branch: refs/heads/trunk Commit: 45c0f860f3c7f8e0a7c80809c4ff47f4acf65557 Parents: 176f2a4 Author: Blake Eggleston <bdeggles...@gmail.com> Authored: Wed Oct 12 10:14:16 2016 -0700 Committer: Blake Eggleston <bdeggles...@gmail.com> Committed: Fri Jun 30 11:31:15 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/cassandra/repair/RepairRunnable.java | 12 +++++- .../apache/cassandra/repair/RepairSession.java | 39 ++++++++++++++++++-- .../cassandra/repair/RepairSessionResult.java | 15 +++++++- .../cassandra/repair/messages/RepairOption.java | 25 ++++++++++++- .../cassandra/service/ActiveRepairService.java | 15 ++++++-- .../apache/cassandra/tools/nodetool/Repair.java | 4 ++ .../cassandra/repair/RepairSessionTest.java | 2 +- .../consistent/CoordinatorSessionTest.java | 2 +- .../repair/messages/RepairOptionTest.java | 22 +++++++++++ 10 files changed, 125 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 866c6fd..6444994 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 4.0 + * Run repair with down replicas (CASSANDRA-10446) + * Added started & completed repair metrics (CASSANDRA-13598) * Added started & completed repair metrics (CASSANDRA-13598) * Improve secondary index (re)build failure and concurrency handling (CASSANDRA-10130) * Improve calculation of available disk space for compaction (CASSANDRA-13068) http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index eca162e..29347a4 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -289,9 +289,18 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti // filter out null(=failed) results and get successful ranges for (RepairSessionResult sessionResult : results) { + logger.debug("Repair result: {}", results); if (sessionResult != null) { - successfulRanges.addAll(sessionResult.ranges); + // don't promote sstables for sessions we skipped replicas for + if (!sessionResult.skippedReplicas) + { + successfulRanges.addAll(sessionResult.ranges); + } + else + { + logger.debug("Skipping anticompaction for {}", results); + } } else { @@ -424,6 +433,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti p.left, isConsistent, options.isPullRepair(), + options.isForcedRepair(), options.getPreviewKind(), executor, cfnames); http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index c1b3f41..98ed1a3 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -36,6 +36,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.*; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.SessionSummary; +import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MerkleTrees; @@ -89,6 +90,10 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement private final String[] cfnames; public final RepairParallelism parallelismDegree; public final boolean pullRepair; + + // indicates some replicas were not included in the repair. Only relevant for --force option + public final boolean skippedReplicas; + /** Range to repair */ public final Collection<Range<Token>> ranges; public final Set<InetAddress> endpoints; @@ -116,8 +121,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement * @param keyspace name of keyspace * @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees * @param endpoints the data centers that should be part of the repair; null for all DCs - * @param repairedAt when the repair occurred (millis) * @param pullRepair true if the repair should be one way (from remote host to this host and only applicable between two hosts--see RepairOption) + * @param force true if the repair should ignore dead endpoints (instead of failing) * @param cfnames names of columnfamilies */ public RepairSession(UUID parentRepairSession, @@ -128,6 +133,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement Set<InetAddress> endpoints, boolean isConsistent, boolean pullRepair, + boolean force, PreviewKind previewKind, String... cfnames) { @@ -139,10 +145,35 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement this.keyspace = keyspace; this.cfnames = cfnames; this.ranges = ranges; + + //If force then filter out dead endpoints + boolean forceSkippedReplicas = false; + if (force) + { + logger.debug("force flag set, removing dead endpoints"); + final Set<InetAddress> removeCandidates = new HashSet<>(); + for (final InetAddress endpoint : endpoints) + { + if (!FailureDetector.instance.isAlive(endpoint)) + { + logger.info("Removing a dead node from Repair due to -force " + endpoint); + removeCandidates.add(endpoint); + } + } + if (!removeCandidates.isEmpty()) + { + // we shouldn't be promoting sstables to repaired if any replicas are excluded from the repair + forceSkippedReplicas = true; + endpoints = new HashSet<>(endpoints); + endpoints.removeAll(removeCandidates); + } + } + this.endpoints = endpoints; this.isConsistent = isConsistent; this.previewKind = previewKind; this.pullRepair = pullRepair; + this.skippedReplicas = forceSkippedReplicas; } public UUID getId() @@ -241,7 +272,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement { logger.info("{} {}", previewKind.logPrefix(getId()), message = String.format("No neighbors to repair with on range %s: session completed", ranges)); Tracing.traceRepair(message); - set(new RepairSessionResult(id, keyspace, ranges, Lists.<RepairResult>newArrayList())); + set(new RepairSessionResult(id, keyspace, ranges, Lists.<RepairResult>newArrayList(), skippedReplicas)); if (!previewKind.isPreview()) { SystemDistributedKeyspace.failRepairs(getId(), keyspace, cfnames, new RuntimeException(message)); @@ -252,7 +283,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement // Checking all nodes are live for (InetAddress endpoint : endpoints) { - if (!FailureDetector.instance.isAlive(endpoint)) + if (!FailureDetector.instance.isAlive(endpoint) && !skippedReplicas) { message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint); logger.error("{} {}", previewKind.logPrefix(getId()), message); @@ -283,7 +314,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement // this repair session is completed logger.info("{} {}", previewKind.logPrefix(getId()), "Session completed successfully"); Tracing.traceRepair("Completed sync of range {}", ranges); - set(new RepairSessionResult(id, keyspace, ranges, results)); + set(new RepairSessionResult(id, keyspace, ranges, results, skippedReplicas)); taskExecutor.shutdown(); // mark this session as terminated http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/src/java/org/apache/cassandra/repair/RepairSessionResult.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSessionResult.java b/src/java/org/apache/cassandra/repair/RepairSessionResult.java index d4fff37..491ab2f 100644 --- a/src/java/org/apache/cassandra/repair/RepairSessionResult.java +++ b/src/java/org/apache/cassandra/repair/RepairSessionResult.java @@ -32,12 +32,25 @@ public class RepairSessionResult public final String keyspace; public final Collection<Range<Token>> ranges; public final Collection<RepairResult> repairJobResults; + public final boolean skippedReplicas; - public RepairSessionResult(UUID sessionId, String keyspace, Collection<Range<Token>> ranges, Collection<RepairResult> repairJobResults) + public RepairSessionResult(UUID sessionId, String keyspace, Collection<Range<Token>> ranges, Collection<RepairResult> repairJobResults, boolean skippedReplicas) { this.sessionId = sessionId; this.keyspace = keyspace; this.ranges = ranges; this.repairJobResults = repairJobResults; + this.skippedReplicas = skippedReplicas; + } + + public String toString() + { + return "RepairSessionResult{" + + "sessionId=" + sessionId + + ", keyspace='" + keyspace + '\'' + + ", ranges=" + ranges + + ", repairJobResults=" + repairJobResults + + ", skippedReplicas=" + skippedReplicas + + '}'; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/src/java/org/apache/cassandra/repair/messages/RepairOption.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java index 6d69cf0..a95ee19 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java @@ -48,6 +48,7 @@ public class RepairOption public static final String TRACE_KEY = "trace"; public static final String SUB_RANGE_REPAIR_KEY = "sub_range_repair"; public static final String PULL_REPAIR_KEY = "pullRepair"; + public static final String FORCE_REPAIR_KEY = "forceRepair"; public static final String PREVIEW = "previewKind"; // we don't want to push nodes too much for repair @@ -125,6 +126,11 @@ public class RepairOption * This is only allowed if exactly 2 hosts are specified along with a token range that they share.</td> * <td>false</td> * </tr> + * <tr> + * <td>forceRepair</td> + * <td>"true" if the repair should continue, even if one of the replicas involved is down. + * <td>false</td> + * </tr> * </tbody> * </table> * @@ -140,6 +146,7 @@ public class RepairOption boolean incremental = Boolean.parseBoolean(options.get(INCREMENTAL_KEY)); PreviewKind previewKind = PreviewKind.valueOf(options.getOrDefault(PREVIEW, PreviewKind.NONE.toString())); boolean trace = Boolean.parseBoolean(options.get(TRACE_KEY)); + boolean force = Boolean.parseBoolean(options.get(FORCE_REPAIR_KEY)); boolean pullRepair = Boolean.parseBoolean(options.get(PULL_REPAIR_KEY)); int jobThreads = 1; @@ -178,7 +185,7 @@ public class RepairOption } } - RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, previewKind); + RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, force, previewKind); // data centers String dataCentersStr = options.get(DATACENTERS_KEY); @@ -249,6 +256,11 @@ public class RepairOption throw new IllegalArgumentException("Incremental repairs cannot be run against a subset of tokens or ranges"); } + if (option.isIncremental() && option.isForcedRepair()) + { + throw new IllegalArgumentException("Cannot force incremental repair"); + } + return option; } @@ -259,6 +271,7 @@ public class RepairOption private final int jobThreads; private final boolean isSubrangeRepair; private final boolean pullRepair; + private final boolean forceRepair; private final PreviewKind previewKind; private final Collection<String> columnFamilies = new HashSet<>(); @@ -266,7 +279,7 @@ public class RepairOption private final Collection<String> hosts = new HashSet<>(); private final Collection<Range<Token>> ranges = new HashSet<>(); - public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair, PreviewKind previewKind) + public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair, boolean forceRepair, PreviewKind previewKind) { if (FBUtilities.isWindows && (DatabaseDescriptor.getDiskAccessMode() != Config.DiskAccessMode.standard || DatabaseDescriptor.getIndexAccessMode() != Config.DiskAccessMode.standard) && @@ -285,6 +298,7 @@ public class RepairOption this.ranges.addAll(ranges); this.isSubrangeRepair = isSubrangeRepair; this.pullRepair = pullRepair; + this.forceRepair = forceRepair; this.previewKind = previewKind; } @@ -313,6 +327,11 @@ public class RepairOption return pullRepair; } + public boolean isForcedRepair() + { + return forceRepair; + } + public int getJobThreads() { return jobThreads; @@ -376,6 +395,7 @@ public class RepairOption ", previewKind: " + previewKind + ", # of ranges: " + ranges.size() + ", pull repair: " + pullRepair + + ", force repair: " + forceRepair + ')'; } @@ -393,6 +413,7 @@ public class RepairOption options.put(TRACE_KEY, Boolean.toString(trace)); options.put(RANGES_KEY, Joiner.on(",").join(ranges)); options.put(PULL_REPAIR_KEY, Boolean.toString(pullRepair)); + options.put(FORCE_REPAIR_KEY, Boolean.toString(forceRepair)); options.put(PREVIEW, previewKind.toString()); return options; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index a397ca2..d50dc3f 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -187,6 +187,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai Set<InetAddress> endpoints, boolean isConsistent, boolean pullRepair, + boolean force, PreviewKind previewKind, ListeningExecutorService executor, String... cfnames) @@ -197,7 +198,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai if (cfnames.length == 0) return null; - final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isConsistent, pullRepair, previewKind, cfnames); + final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isConsistent, pullRepair, force, previewKind, cfnames); sessions.put(session.getId(), session); // register listeners @@ -389,8 +390,16 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai } else { - // bailout early to avoid potentially waiting for a long time. - failRepair(parentRepairSession, "Endpoint not alive: " + neighbour); + if (options.isForcedRepair()) + { + prepareLatch.countDown(); + } + else + { + // bailout early to avoid potentially waiting for a long time. + failRepair(parentRepairSession, "Endpoint not alive: " + neighbour); + } + } } try http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/src/java/org/apache/cassandra/tools/nodetool/Repair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java b/src/java/org/apache/cassandra/tools/nodetool/Repair.java index 317a677..77ad214 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java @@ -74,6 +74,9 @@ public class Repair extends NodeToolCmd @Option(title = "full", name = {"-full", "--full"}, description = "Use -full to issue a full repair.") private boolean fullRepair = false; + @Option(title = "force", name = {"-force", "--force"}, description = "Use -force to filter out down endpoints") + private boolean force = false; + @Option(title = "preview", name = {"-prv", "--preview"}, description = "Determine ranges and amount of data to be streamed, but don't actually perform repair") private boolean preview = false; @@ -139,6 +142,7 @@ public class Repair extends NodeToolCmd options.put(RepairOption.TRACE_KEY, Boolean.toString(trace)); options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(cfnames, ",")); options.put(RepairOption.PULL_REPAIR_KEY, Boolean.toString(pullRepair)); + options.put(RepairOption.FORCE_REPAIR_KEY, Boolean.toString(force)); options.put(RepairOption.PREVIEW, getPreviewKind().toString()); if (!startToken.isEmpty() || !endToken.isEmpty()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/test/unit/org/apache/cassandra/repair/RepairSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java index 5a4e5b1..efae538 100644 --- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java @@ -65,7 +65,7 @@ public class RepairSessionTest Set<InetAddress> endpoints = Sets.newHashSet(remote); RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, - endpoints, false, false, + endpoints, false, false, false, PreviewKind.NONE, "Standard1"); // perform convict http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java index 4f5b7e6..3c27b5e 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java @@ -76,7 +76,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest private static RepairSessionResult createResult(CoordinatorSession coordinator) { - return new RepairSessionResult(coordinator.sessionID, "ks", coordinator.ranges, null); + return new RepairSessionResult(coordinator.sessionID, "ks", coordinator.ranges, null, false); } private static void assertMessageSent(InstrumentedCoordinatorSession coordinator, InetAddress participant, RepairMessage expected) http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java index 9eb7c86..13d7575 100644 --- a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java +++ b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.junit.Assert; import org.junit.Test; import com.google.common.collect.ImmutableMap; @@ -161,6 +162,27 @@ public class RepairOptionTest } + @Test + public void testForceOption() throws Exception + { + RepairOption option; + Map<String, String> options = new HashMap<>(); + + // default value + option = RepairOption.parse(options, Murmur3Partitioner.instance); + Assert.assertFalse(option.isForcedRepair()); + + // explicit true + options.put(RepairOption.FORCE_REPAIR_KEY, "true"); + option = RepairOption.parse(options, Murmur3Partitioner.instance); + Assert.assertTrue(option.isForcedRepair()); + + // explicit false + options.put(RepairOption.FORCE_REPAIR_KEY, "false"); + option = RepairOption.parse(options, Murmur3Partitioner.instance); + Assert.assertFalse(option.isForcedRepair()); + } + private void assertParseThrowsIllegalArgumentExceptionWithMessage(Map<String, String> optionsToParse, String expectedErrorMessage) { try --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org