This is an automated email from the ASF dual-hosted git repository. adelapena pushed a commit to branch cassandra-4.1 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 89a8155916ff7f94db9436de3f096aa22e047e35 Merge: d9dbd5e1d0 505f5af645 Author: Andrés de la Peña <[email protected]> AuthorDate: Thu Feb 8 11:11:07 2024 +0000 Merge branch 'cassandra-4.0' into cassandra-4.1 CHANGES.txt | 1 + conf/cassandra.yaml | 36 +++++- src/java/org/apache/cassandra/config/Config.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 10 ++ .../cassandra/repair/AbstractRepairTask.java | 6 +- .../cassandra/repair/IncrementalRepairTask.java | 4 +- .../apache/cassandra/repair/NormalRepairTask.java | 4 +- .../apache/cassandra/repair/PreviewRepairTask.java | 4 +- .../org/apache/cassandra/repair/RepairJob.java | 106 ++++++++-------- .../apache/cassandra/repair/RepairRunnable.java | 4 +- .../org/apache/cassandra/repair/RepairSession.java | 3 + .../org/apache/cassandra/repair/RepairTask.java | 6 +- .../org/apache/cassandra/repair/Scheduler.java | 118 ++++++++++++++++++ .../cassandra/service/ActiveRepairService.java | 17 ++- .../service/ActiveRepairServiceMBean.java | 4 + .../test/OptimiseStreamsRepairTest.java | 18 +-- .../repair/ConcurrentValidationRequestsTest.java | 136 +++++++++++++++++++++ .../org/apache/cassandra/repair/RepairJobTest.java | 6 +- .../apache/cassandra/repair/RepairSessionTest.java | 2 +- 19 files changed, 403 insertions(+), 84 deletions(-) diff --cc CHANGES.txt index 4b8393ebc2,baeb9b7f61..44ca779885 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,13 -1,11 +1,14 @@@ -4.0.13 +4.1.4 + * Memoize Cassandra verion and add a backoff interval for failed schema pulls (CASSANDRA-18902) + * Fix StackOverflowError on ALTER after many previous schema changes (CASSANDRA-19166) + * Fixed the inconsistency between distributedKeyspaces and distributedAndLocalKeyspaces (CASSANDRA-18747) + * Internode legacy SSL storage port certificate is not hot reloaded on update (CASSANDRA-18681) + * Nodetool paxos-only repair is no longer incremental (CASSANDRA-18466) + * Waiting indefinitely on ReceivedMessage response in StreamSession#receive() can cause 
deadlock (CASSANDRA-18733) + * Allow empty keystore_password in encryption_options (CASSANDRA-18778) + * Skip ColumnFamilyStore#topPartitions initialization when client or tool mode (CASSANDRA-18697) +Merged from 4.0: + * Add new concurrent_merkle_tree_requests config property to prevent OOM during multi-range and/or multi-table repairs (CASSANDRA-19336) -Merged from 3.11: -Merged from 3.0: - * Backport CASSANDRA-16418 to 3.x (CASSANDRA-18824) - - -4.0.12 * Skip version check if an endpoint is dead state in Gossiper#upgradeFromVersionSupplier (CASSANDRA-19187) * Fix Gossiper::hasMajorVersion3Nodes to return false during minor upgrade (CASSANDRA-18999) * Revert unnecessary read lock acquisition when reading ring version in TokenMetadata introduced in CASSANDRA-16286 (CASSANDRA-19107) diff --cc conf/cassandra.yaml index 4b2711cfb7,7f162749d2..1986d6fa29 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@@ -652,9 -565,29 +655,30 @@@ memtable_allocation_type: heap_buffer # # For more details see https://issues.apache.org/jira/browse/CASSANDRA-14096. # -# repair_session_space_in_mb: +# Min unit: MiB +# repair_session_space: + # The number of simultaneous Merkle tree requests during repairs that can + # be performed by a repair command. The size of each validation request is + # limited by the repair_session_space property, so setting this to 1 will make + # sure that a repair command doesn't exceed that limit, even if the repair + # command is repairing multiple tables or multiple virtual nodes. + # + # There isn't a limit by default for backwards compatibility, but this can + # produce OOM for commands repairing multiple tables or multiple virtual nodes. + # A limit of just 1 simultaneous Merkle tree request is generally recommended + # with no virtual nodes so that repair_session_space, and thereby the Merkle tree + # resolution, can be high.
For virtual nodes a value of 1 with the default + # repair_session_space value will produce higher resolution Merkle trees + # at the expense of speed. Alternatively, when working with virtual nodes it + # can make sense to reduce the repair_session_space and increase the value of + # concurrent_merkle_tree_requests because each range will contain fewer data. + # + # For more details see https://issues.apache.org/jira/browse/CASSANDRA-19336. + # + # A zero value means no limit. + # concurrent_merkle_tree_requests: 0 + # Total space to use for commit logs on disk. # # If space gets above this value, Cassandra will flush every dirty CF diff --cc src/java/org/apache/cassandra/config/Config.java index 8a59ca2cda,d7517124df..298903acf3 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@@ -192,9 -132,12 +192,11 @@@ public class Confi // Limit the maximum depth of repair session merkle trees @Deprecated public volatile Integer repair_session_max_tree_depth = null; - public volatile Integer repair_session_space_in_mb = null; - - public volatile long repair_request_timeout_in_ms = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES); + @Replaces(oldName = "repair_session_space_in_mb", converter = Converters.MEBIBYTES_DATA_STORAGE_INT, deprecated = true) + public volatile DataStorageSpec.IntMebibytesBound repair_session_space = null; + public volatile int concurrent_merkle_tree_requests = 0; + public volatile boolean use_offheap_merkle_trees = true; public int storage_port = 7000; diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java index a04e85c1bc,377f67117d..0805b1f9ec --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@@ -3400,34 -2863,33 +3400,44 @@@ public class DatabaseDescripto conf.repair_session_max_tree_depth = depth; } - public static int getRepairSessionSpaceInMegabytes() + public static int 
getRepairSessionSpaceInMiB() { - return conf.repair_session_space_in_mb; + return conf.repair_session_space.toMebibytes(); } - public static void setRepairSessionSpaceInMegabytes(int sizeInMegabytes) + public static void setRepairSessionSpaceInMiB(int sizeInMiB) { - if (sizeInMegabytes < 1) - throw new ConfigurationException("Cannot set repair_session_space_in_mb to " + sizeInMegabytes + - " < 1 megabyte"); - else if (sizeInMegabytes > (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576))) - logger.warn("A repair_session_space_in_mb of " + conf.repair_session_space_in_mb + - " megabytes is likely to cause heap pressure."); + if (sizeInMiB < 1) + throw new ConfigurationException("Cannot set repair_session_space to " + sizeInMiB + + " < 1 mebibyte"); + else if (sizeInMiB > (int) (Runtime.getRuntime().maxMemory() / (4 * 1048576))) + logger.warn("A repair_session_space of " + conf.repair_session_space + + " is likely to cause heap pressure."); - conf.repair_session_space_in_mb = sizeInMegabytes; + conf.repair_session_space = new DataStorageSpec.IntMebibytesBound(sizeInMiB); } + public static int getConcurrentMerkleTreeRequests() + { + return conf.concurrent_merkle_tree_requests; + } + + public static void setConcurrentMerkleTreeRequests(int value) + { + conf.concurrent_merkle_tree_requests = value; + } + + public static int getPaxosRepairParallelism() + { + return conf.paxos_repair_parallelism; + } + + public static void setPaxosRepairParallelism(int v) + { + Preconditions.checkArgument(v > 0); + conf.paxos_repair_parallelism = v; + } + public static Float getMemtableCleanupThreshold() { return conf.memtable_cleanup_threshold; diff --cc src/java/org/apache/cassandra/repair/AbstractRepairTask.java index d2a6f1a786,0000000000..f2449f703a mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/repair/AbstractRepairTask.java +++ b/src/java/org/apache/cassandra/repair/AbstractRepairTask.java @@@ -1,124 -1,0 +1,128 @@@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; + +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.ExecutorPlus; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.repair.messages.RepairOption; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.TimeUUID; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.FutureCombiner; + +public abstract class AbstractRepairTask implements RepairTask +{ + protected static final Logger logger = LoggerFactory.getLogger(AbstractRepairTask.class); + + protected final RepairOption options; + protected final String keyspace; + protected final RepairNotifier notifier; + + protected AbstractRepairTask(RepairOption options, String keyspace, RepairNotifier notifier) + { + this.options = Objects.requireNonNull(options); + this.keyspace = Objects.requireNonNull(keyspace); + 
this.notifier = Objects.requireNonNull(notifier); + } + + private List<RepairSession> submitRepairSessions(TimeUUID parentSession, + boolean isIncremental, + ExecutorPlus executor, ++ Scheduler validationScheduler, + List<CommonRange> commonRanges, + String... cfnames) + { + List<RepairSession> futures = new ArrayList<>(options.getRanges().size()); + + for (CommonRange commonRange : commonRanges) + { + logger.info("Starting RepairSession for {}", commonRange); + RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession, + commonRange, + keyspace, + options.getParallelism(), + isIncremental, + options.isPullRepair(), + options.getPreviewKind(), + options.optimiseStreams(), + options.repairPaxos(), + options.paxosOnly(), + executor, ++ validationScheduler, + cfnames); ++ + if (session == null) + continue; + session.addCallback(new RepairSessionCallback(session)); + futures.add(session); + } + return futures; + } + + protected Future<CoordinatedRepairResult> runRepair(TimeUUID parentSession, + boolean isIncremental, + ExecutorPlus executor, ++ Scheduler validationScheduler, + List<CommonRange> commonRanges, + String... 
cfnames) + { - List<RepairSession> allSessions = submitRepairSessions(parentSession, isIncremental, executor, commonRanges, cfnames); ++ List<RepairSession> allSessions = submitRepairSessions(parentSession, isIncremental, executor, validationScheduler, commonRanges, cfnames); + List<Collection<Range<Token>>> ranges = Lists.transform(allSessions, RepairSession::ranges); + Future<List<RepairSessionResult>> f = FutureCombiner.successfulOf(allSessions); + return f.map(results -> { + logger.debug("Repair result: {}", results); + return CoordinatedRepairResult.create(ranges, results); + }); + } + + private class RepairSessionCallback implements FutureCallback<RepairSessionResult> + { + private final RepairSession session; + + public RepairSessionCallback(RepairSession session) + { + this.session = session; + } + + public void onSuccess(RepairSessionResult result) + { + String message = String.format("Repair session %s for range %s finished", session.getId(), + session.ranges().toString()); + notifier.notifyProgress(message); + } + + public void onFailure(Throwable t) + { + String message = String.format("Repair session %s for range %s failed with error %s", + session.getId(), session.ranges().toString(), t.getMessage()); + notifier.notifyError(new RuntimeException(message, t)); + } + } +} diff --cc src/java/org/apache/cassandra/repair/IncrementalRepairTask.java index af1a234d34,0000000000..6cfd6ff84b mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/repair/IncrementalRepairTask.java +++ b/src/java/org/apache/cassandra/repair/IncrementalRepairTask.java @@@ -1,75 -1,0 +1,75 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.util.List; +import java.util.Set; + +import com.google.common.collect.ImmutableSet; + +import org.apache.cassandra.concurrent.ExecutorPlus; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.repair.consistent.CoordinatorSession; +import org.apache.cassandra.repair.messages.RepairOption; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.TimeUUID; +import org.apache.cassandra.utils.concurrent.Future; + +public class IncrementalRepairTask extends AbstractRepairTask +{ + private final TimeUUID parentSession; + private final RepairRunnable.NeighborsAndRanges neighborsAndRanges; + private final String[] cfnames; + + protected IncrementalRepairTask(RepairOption options, + String keyspace, + RepairNotifier notifier, + TimeUUID parentSession, + RepairRunnable.NeighborsAndRanges neighborsAndRanges, + String[] cfnames) + { + super(options, keyspace, notifier); + this.parentSession = parentSession; + this.neighborsAndRanges = neighborsAndRanges; + this.cfnames = cfnames; + } + + @Override + public String name() + { + return "Repair"; + } + + @Override - public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor) throws Exception ++ public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor, Scheduler validationScheduler) throws Exception + { + // the local node also needs to be included in the set of participants, since coordinator sessions aren't 
persisted + Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder() + .addAll(neighborsAndRanges.participants) + .add(FBUtilities.getBroadcastAddressAndPort()) + .build(); + // Not necessary to include self for filtering. The common ranges only contains neighbhor node endpoints. + List<CommonRange> allRanges = neighborsAndRanges.filterCommonRanges(keyspace, cfnames); + + CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants, neighborsAndRanges.shouldExcludeDeadParticipants); + - return coordinatorSession.execute(() -> runRepair(parentSession, true, executor, allRanges, cfnames)); ++ return coordinatorSession.execute(() -> runRepair(parentSession, true, executor, validationScheduler, allRanges, cfnames)); + + } +} diff --cc src/java/org/apache/cassandra/repair/NormalRepairTask.java index 56a03f8816,0000000000..a42b38012e mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/repair/NormalRepairTask.java +++ b/src/java/org/apache/cassandra/repair/NormalRepairTask.java @@@ -1,57 -1,0 +1,57 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.repair; + +import java.util.List; + +import org.apache.cassandra.concurrent.ExecutorPlus; +import org.apache.cassandra.repair.messages.RepairOption; +import org.apache.cassandra.utils.TimeUUID; +import org.apache.cassandra.utils.concurrent.Future; + +public class NormalRepairTask extends AbstractRepairTask +{ + private final TimeUUID parentSession; + private final List<CommonRange> commonRanges; + private final String[] cfnames; + + protected NormalRepairTask(RepairOption options, + String keyspace, + RepairNotifier notifier, + TimeUUID parentSession, + List<CommonRange> commonRanges, + String[] cfnames) + { + super(options, keyspace, notifier); + this.parentSession = parentSession; + this.commonRanges = commonRanges; + this.cfnames = cfnames; + } + + @Override + public String name() + { + return "Repair"; + } + + @Override - public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor) ++ public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor, Scheduler validationScheduler) + { - return runRepair(parentSession, false, executor, commonRanges, cfnames); ++ return runRepair(parentSession, false, executor, validationScheduler, commonRanges, cfnames); + } +} diff --cc src/java/org/apache/cassandra/repair/PreviewRepairTask.java index 7ce7d1fbba,0000000000..c97632ef8f mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/repair/PreviewRepairTask.java +++ b/src/java/org/apache/cassandra/repair/PreviewRepairTask.java @@@ -1,153 -1,0 +1,153 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import com.google.common.base.Preconditions; + +import org.apache.cassandra.concurrent.ExecutorPlus; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.metrics.RepairMetrics; +import org.apache.cassandra.repair.consistent.SyncStatSummary; +import org.apache.cassandra.repair.messages.RepairOption; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.utils.DiagnosticSnapshotService; +import org.apache.cassandra.utils.TimeUUID; +import org.apache.cassandra.utils.concurrent.Future; + +public class PreviewRepairTask extends AbstractRepairTask +{ + private final TimeUUID parentSession; + private final List<CommonRange> commonRanges; + private final String[] cfnames; + private volatile String successMessage = name() + " completed successfully"; + + protected PreviewRepairTask(RepairOption options, String keyspace, RepairNotifier notifier, TimeUUID parentSession, List<CommonRange> commonRanges, String[] cfnames) + { + super(options, keyspace, notifier); + this.parentSession = parentSession; + this.commonRanges = commonRanges; + this.cfnames = cfnames; + } + + @Override + public String name() + { + return "Repair preview"; + } + + @Override + public String successMessage() + { + return successMessage; + } + + @Override - public 
Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor) ++ public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor, Scheduler validationScheduler) + { - Future<CoordinatedRepairResult> f = runRepair(parentSession, false, executor, commonRanges, cfnames); ++ Future<CoordinatedRepairResult> f = runRepair(parentSession, false, executor, validationScheduler, commonRanges, cfnames); + return f.map(result -> { + if (result.hasFailed()) + return result; + + PreviewKind previewKind = options.getPreviewKind(); + Preconditions.checkState(previewKind != PreviewKind.NONE, "Preview is NONE"); + SyncStatSummary summary = new SyncStatSummary(true); + summary.consumeSessionResults(result.results); + + final String message; + if (summary.isEmpty()) + { + message = previewKind == PreviewKind.REPAIRED ? "Repaired data is in sync" : "Previewed data was in sync"; + } + else + { + message = (previewKind == PreviewKind.REPAIRED ? "Repaired data is inconsistent\n" : "Preview complete\n") + summary; + RepairMetrics.previewFailures.inc(); + if (previewKind == PreviewKind.REPAIRED) + maybeSnapshotReplicas(parentSession, keyspace, result.results.get()); // we know its present as summary used it + } + successMessage += "; " + message; + notifier.notification(message); + + return result; + }); + } + + private void maybeSnapshotReplicas(TimeUUID parentSession, String keyspace, List<RepairSessionResult> results) + { + if (!DatabaseDescriptor.snapshotOnRepairedDataMismatch()) + return; + + try + { + Set<String> mismatchingTables = new HashSet<>(); + Set<InetAddressAndPort> nodes = new HashSet<>(); + for (RepairSessionResult sessionResult : results) + { + for (RepairResult repairResult : emptyIfNull(sessionResult.repairJobResults)) + { + for (SyncStat stat : emptyIfNull(repairResult.stats)) + { + if (stat.numberOfDifferences > 0) + mismatchingTables.add(repairResult.desc.columnFamily); + // snapshot all replicas, even if they don't have any differences + 
nodes.add(stat.nodes.coordinator); + nodes.add(stat.nodes.peer); + } + } + } + + String snapshotName = DiagnosticSnapshotService.getSnapshotName(DiagnosticSnapshotService.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX); + for (String table : mismatchingTables) + { + // we can just check snapshot existence locally since the repair coordinator is always a replica (unlike in the read case) + if (!Keyspace.open(keyspace).getColumnFamilyStore(table).snapshotExists(snapshotName)) + { + logger.info("{} Snapshotting {}.{} for preview repair mismatch with tag {} on instances {}", + options.getPreviewKind().logPrefix(parentSession), + keyspace, table, snapshotName, nodes); + DiagnosticSnapshotService.repairedDataMismatch(Keyspace.open(keyspace).getColumnFamilyStore(table).metadata(), nodes); + } + else + { + logger.info("{} Not snapshotting {}.{} - snapshot {} exists", + options.getPreviewKind().logPrefix(parentSession), + keyspace, table, snapshotName); + } + } + } + catch (Exception e) + { + logger.error("{} Failed snapshotting replicas", options.getPreviewKind().logPrefix(parentSession), e); + } + } + + private static <T> Iterable<T> emptyIfNull(Iterable<T> iter) + { + if (iter == null) + return Collections.emptyList(); + return iter; + } +} diff --cc src/java/org/apache/cassandra/repair/RepairJob.java index aba8bd8c85,95fe84f550..d6f8820e56 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@@ -120,49 -100,13 +120,48 @@@ public class RepairJob extends AsyncFut Keyspace ks = Keyspace.open(desc.keyspace); ColumnFamilyStore cfs = ks.getColumnFamilyStore(desc.columnFamily); cfs.metric.repairsStarted.inc(); - List<InetAddressAndPort> allEndpoints = new ArrayList<>(session.commonRange.endpoints); + List<InetAddressAndPort> allEndpoints = new ArrayList<>(session.state.commonRange.endpoints); allEndpoints.add(FBUtilities.getBroadcastAddressAndPort()); - Future<List<TreeResponse>> treeResponses; + Future<Void> paxosRepair; 
+ if (paxosRepairEnabled() && ((useV2() && session.repairPaxos) || session.paxosOnly)) + { + logger.info("{} {}.{} starting paxos repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily); + TableMetadata metadata = Schema.instance.getTableMetadata(desc.keyspace, desc.columnFamily); + paxosRepair = PaxosCleanup.cleanup(allEndpoints, metadata, desc.ranges, session.state.commonRange.hasSkippedReplicas, taskExecutor); + } + else + { + logger.info("{} {}.{} not running paxos repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily); + paxosRepair = ImmediateFuture.success(null); + } + + if (session.paxosOnly) + { + paxosRepair.addCallback(new FutureCallback<Void>() + { + public void onSuccess(Void v) + { + logger.info("{} {}.{} paxos repair completed", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily); + trySuccess(new RepairResult(desc, Collections.emptyList())); + } + + /** + * Snapshot, validation and sync failures are all handled here + */ + public void onFailure(Throwable t) + { + logger.warn("{} {}.{} paxos repair failed", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily); + tryFailure(t); + } + }, taskExecutor); + return; + } + // Create a snapshot at all nodes unless we're using pure parallel repairs - ListenableFuture<List<InetAddressAndPort>> allSnapshotTasks; ++ final Future<?> allSnapshotTasks; if (parallelismDegree != RepairParallelism.PARALLEL) { - Future<?> allSnapshotTasks; if (session.isIncremental) { // consistent repair does it's own "snapshotting" @@@ -171,45 -115,29 +170,34 @@@ else { // Request snapshot to all replica - List<ListenableFuture<InetAddressAndPort>> snapshotTasks = new ArrayList<>(allEndpoints.size()); - for (InetAddressAndPort endpoint : allEndpoints) - { - SnapshotTask snapshotTask = new SnapshotTask(desc, endpoint); - snapshotTasks.add(snapshotTask); - taskExecutor.execute(snapshotTask); - } - 
allSnapshotTasks = Futures.allAsList(snapshotTasks); + allSnapshotTasks = paxosRepair.flatMap(input -> { + List<Future<InetAddressAndPort>> snapshotTasks = new ArrayList<>(allEndpoints.size()); + state.phase.snapshotsSubmitted(); + for (InetAddressAndPort endpoint : allEndpoints) + { + SnapshotTask snapshotTask = new SnapshotTask(desc, endpoint); + snapshotTasks.add(snapshotTask); + taskExecutor.execute(snapshotTask); + } + return FutureCombiner.allOf(snapshotTasks).map(a -> { + state.phase.snapshotsCompleted(); + return a; + }); + }); } - - // When all snapshot complete, send validation requests - treeResponses = allSnapshotTasks.flatMap(endpoints -> { - if (parallelismDegree == RepairParallelism.SEQUENTIAL) - return sendSequentialValidationRequest(allEndpoints); - else - return sendDCAwareValidationRequest(allEndpoints); - }, taskExecutor); } else { - // If not sequential, just send validation request to all replica - treeResponses = paxosRepair.flatMap(input -> sendValidationRequest(allEndpoints)); + allSnapshotTasks = null; } - treeResponses = treeResponses.map(a -> { - state.phase.validationCompleted(); - return a; - }); - // When all validations complete, submit sync tasks - Future<List<SyncStat>> syncResults = treeResponses.flatMap(session.optimiseStreams && !session.pullRepair ? this::optimisedSyncing : this::standardSyncing, taskExecutor); + // Run validations and the creation of sync tasks in the scheduler, so it can limit the number of Merkle trees + // that there are in memory at once. When all validations complete, submit sync tasks out of the scheduler. 
- ListenableFuture<List<SyncStat>> syncResults = Futures.transformAsync( - session.validationScheduler.schedule(() -> createSyncTasks(allSnapshotTasks, allEndpoints), taskExecutor), - this::executeTasks, taskExecutor); ++ Future<List<SyncStat>> syncResults = session.validationScheduler.schedule(() -> createSyncTasks(paxosRepair, allSnapshotTasks, allEndpoints), taskExecutor) ++ .flatMap(this::executeTasks, taskExecutor); // When all sync complete, set the final result - Futures.addCallback(syncResults, new FutureCallback<List<SyncStat>>() + syncResults.addCallback(new FutureCallback<List<SyncStat>>() { @Override public void onSuccess(List<SyncStat> stats) @@@ -248,24 -170,46 +236,52 @@@ }, taskExecutor); } - private ListenableFuture<List<SyncTask>> createSyncTasks(ListenableFuture<List<InetAddressAndPort>> allSnapshotTasks, List<InetAddressAndPort> allEndpoints) ++ private Future<List<SyncTask>> createSyncTasks(Future<Void> paxosRepair, Future<?> allSnapshotTasks, List<InetAddressAndPort> allEndpoints) + { - ListenableFuture<List<TreeResponse>> validations; ++ Future<List<TreeResponse>> treeResponses; + if (allSnapshotTasks != null) + { + // When all snapshot complete, send validation requests - validations = Futures.transformAsync(allSnapshotTasks, endpoints -> { ++ treeResponses = allSnapshotTasks.flatMap(endpoints -> { + if (parallelismDegree == RepairParallelism.SEQUENTIAL) - return sendSequentialValidationRequest(endpoints); ++ return sendSequentialValidationRequest(allEndpoints); + else - return sendDCAwareValidationRequest(endpoints); ++ return sendDCAwareValidationRequest(allEndpoints); + }, taskExecutor); + } + else + { + // If not sequential, just send validation request to all replica - validations = sendValidationRequest(allEndpoints); ++ treeResponses = paxosRepair.flatMap(input -> sendValidationRequest(allEndpoints)); + } + - return Futures.transform(validations, - session.optimiseStreams && !session.pullRepair ? 
this::optimisedSyncing : this::standardSyncing, - taskExecutor); ++ treeResponses = treeResponses.map(a -> { ++ state.phase.validationCompleted(); ++ return a; ++ }); ++ ++ return treeResponses.map(session.optimiseStreams && !session.pullRepair ++ ? this::createOptimisedSyncingSyncTasks ++ : this::createStandardSyncTasks, taskExecutor); + } + private boolean isTransient(InetAddressAndPort ep) { - return session.commonRange.transEndpoints.contains(ep); + return session.state.commonRange.transEndpoints.contains(ep); } - private Future<List<SyncStat>> standardSyncing(List<TreeResponse> trees) - private List<SyncTask> standardSyncing(List<TreeResponse> trees) ++ private List<SyncTask> createStandardSyncTasks(List<TreeResponse> trees) { - state.phase.streamSubmitted(); - List<SyncTask> syncTasks = createStandardSyncTasks(desc, - trees, - FBUtilities.getLocalAddressAndPort(), - this::isTransient, - session.isIncremental, - session.pullRepair, - session.previewKind); - return executeTasks(syncTasks); + return createStandardSyncTasks(desc, + trees, + FBUtilities.getLocalAddressAndPort(), + this::isTransient, + session.isIncremental, + session.pullRepair, + session.previewKind); } ++ @VisibleForTesting static List<SyncTask> createStandardSyncTasks(RepairJobDesc desc, List<TreeResponse> trees, InetAddressAndPort local, @@@ -333,58 -277,35 +349,55 @@@ return syncTasks; } - private Future<List<SyncStat>> optimisedSyncing(List<TreeResponse> trees) - { - state.phase.streamSubmitted(); - List<SyncTask> syncTasks = createOptimisedSyncingSyncTasks(desc, - trees, - FBUtilities.getLocalAddressAndPort(), - this::isTransient, - this::getDC, - session.isIncremental, - session.previewKind); - - return executeTasks(syncTasks); - } - - private List<SyncTask> optimisedSyncing(List<TreeResponse> trees) + @VisibleForTesting + Future<List<SyncStat>> executeTasks(List<SyncTask> tasks) + { + try + { + ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId); + 
syncTasks.addAll(tasks); + + for (SyncTask task : tasks) + { + if (!task.isLocal()) + session.trackSyncCompletion(Pair.create(desc, task.nodePair()), (CompletableRemoteSyncTask) task); + taskExecutor.execute(task); + } + + return FutureCombiner.allOf(tasks); + } + catch (NoSuchRepairSessionException e) + { + return ImmediateFuture.failure(new NoSuchRepairSessionExceptionWrapper(e)); + } + } + + // provided so we can throw NoSuchRepairSessionException from executeTasks without + // having to make it unchecked. Required as this is called as from standardSyncing/ + // optimisedSyncing passed as a Function to transform merkle tree responses and so + // can't throw checked exceptions. These are unwrapped in the onFailure callback of + // that transformation so as to not pollute the checked usage of + // NoSuchRepairSessionException in the rest of the codebase. + private static class NoSuchRepairSessionExceptionWrapper extends RuntimeException + { + private final NoSuchRepairSessionException wrapped; + private NoSuchRepairSessionExceptionWrapper(NoSuchRepairSessionException wrapped) + { + this.wrapped = wrapped; + } + } + ++ private List<SyncTask> createOptimisedSyncingSyncTasks(List<TreeResponse> trees) + { + return createOptimisedSyncingSyncTasks(desc, + trees, + FBUtilities.getLocalAddressAndPort(), + this::isTransient, + this::getDC, + session.isIncremental, + session.previewKind); + } + - @SuppressWarnings("UnstableApiUsage") - @VisibleForTesting - ListenableFuture<List<SyncStat>> executeTasks(List<SyncTask> tasks) - { - // this throws if the parent session has failed - ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId); - syncTasks.addAll(tasks); - - for (SyncTask task : tasks) - { - if (!task.isLocal()) - session.trackSyncCompletion(Pair.create(desc, task.nodePair()), (CompletableRemoteSyncTask) task); - taskExecutor.submit(task); - } - - return Futures.allAsList(tasks); - } - static List<SyncTask> 
createOptimisedSyncingSyncTasks(RepairJobDesc desc, List<TreeResponse> trees, InetAddressAndPort local, diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java index 7607b045fa,14fab59741..a56a32a53d --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@@ -93,25 -98,37 +93,27 @@@ public class RepairRunnable implements { private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class); + private static final AtomicInteger THREAD_COUNTER = new AtomicInteger(1); + + public final CoordinatorState state; private final StorageService storageService; - private final int cmd; - private final RepairOption options; - private final String keyspace; private final String tag; - private final AtomicInteger progressCounter = new AtomicInteger(); - private final int totalProgress; - - private final long creationTimeMillis = System.currentTimeMillis(); - private final UUID parentSession = UUIDGen.getTimeUUID(); private final List<ProgressListener> listeners = new ArrayList<>(); - - private static final AtomicInteger threadCounter = new AtomicInteger(1); private final AtomicReference<Throwable> firstError = new AtomicReference<>(null); - private final Scheduler validationScheduler; ++ final Scheduler validationScheduler; private TraceState traceState; public RepairRunnable(StorageService storageService, int cmd, RepairOption options, String keyspace) { + this.state = new CoordinatorState(cmd, keyspace, options); this.storageService = storageService; - this.cmd = cmd; - this.options = options; - this.keyspace = keyspace; + this.validationScheduler = Scheduler.build(DatabaseDescriptor.getConcurrentMerkleTreeRequests()); this.tag = "repair:" + cmd; - // get valid column families, calculate neighbors, validation, prepare for repair + number of ranges to repair - this.totalProgress = 4 + options.getRanges().size(); + ActiveRepairService.instance.register(state); } @Override @@@ -415,27 
-418,133 +417,27 @@@ } else { - normalRepair(parentSession, - creationTimeMillis, - traceState, - neighborsAndRanges.filterCommonRanges(keyspace, cfnames), - neighborsAndRanges.participants, - cfnames); + task = new NormalRepairTask(state.options, state.keyspace, this, state.id, neighborsAndRanges.filterCommonRanges(state.keyspace, cfnames), cfnames); } - } - - private void normalRepair(UUID parentSession, - long startTime, - TraceState traceState, - List<CommonRange> commonRanges, - Set<InetAddressAndPort> preparedEndpoints, - String... cfnames) - { - - // Set up RepairJob executor for this repair command. - ListeningExecutorService executor = createExecutor(); - // Setting the repairedAt time to UNREPAIRED_SSTABLE causes the repairedAt times to be preserved across streamed sstables - final ListenableFuture<List<RepairSessionResult>> allSessions = submitRepairSessions(parentSession, false, executor, validationScheduler, commonRanges, cfnames); - - // After all repair sessions completes(successful or not), - // run anticompaction if necessary and send finish notice back to client - final Collection<Range<Token>> successfulRanges = new ArrayList<>(); - final AtomicBoolean hasFailure = new AtomicBoolean(); - ListenableFuture repairResult = Futures.transformAsync(allSessions, new AsyncFunction<List<RepairSessionResult>, Object>() - { - @SuppressWarnings("unchecked") - public ListenableFuture apply(List<RepairSessionResult> results) + ExecutorPlus executor = createExecutor(); + state.phase.repairSubmitted(); - Future<CoordinatedRepairResult> f = task.perform(executor); ++ Future<CoordinatedRepairResult> f = task.perform(executor, validationScheduler); + f.addCallback((result, failure) -> { + state.phase.repairCompleted(); + try { - logger.debug("Repair result: {}", results); - // filter out null(=failed) results and get successful ranges - for (RepairSessionResult sessionResult : results) + if (failure != null) { - if (sessionResult != null) - { - // don't record 
successful repair if we had to skip ranges - if (!sessionResult.skippedReplicas) - { - successfulRanges.addAll(sessionResult.ranges); - } - } - else - { - hasFailure.compareAndSet(false, true); - } + notifyError(failure); + fail(failure.getMessage()); } - return Futures.immediateFuture(null); - } - }, MoreExecutors.directExecutor()); - Futures.addCallback(repairResult, - new RepairCompleteCallback(parentSession, - successfulRanges, - preparedEndpoints, - startTime, - traceState, - hasFailure, - executor), - MoreExecutors.directExecutor()); - } - - private void incrementalRepair(UUID parentSession, - long startTime, - TraceState traceState, - NeighborsAndRanges neighborsAndRanges, - Set<InetAddressAndPort> preparedEndpoints, - String... cfnames) - { - // the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted - Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder() - .addAll(neighborsAndRanges.participants) - .add(FBUtilities.getBroadcastAddressAndPort()) - .build(); - // Not necessary to include self for filtering. The common ranges only contains neighbhor node endpoints. 
- List<CommonRange> allRanges = neighborsAndRanges.filterCommonRanges(keyspace, cfnames); - - CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants, neighborsAndRanges.shouldExcludeDeadParticipants); - ListeningExecutorService executor = createExecutor(); - AtomicBoolean hasFailure = new AtomicBoolean(false); - ListenableFuture repairResult = coordinatorSession.execute(() -> submitRepairSessions(parentSession, true, executor, validationScheduler, allRanges, cfnames), - hasFailure); - Collection<Range<Token>> ranges = new HashSet<>(); - for (Collection<Range<Token>> range : Iterables.transform(allRanges, cr -> cr.ranges)) - { - ranges.addAll(range); - } - Futures.addCallback(repairResult, - new RepairCompleteCallback(parentSession, ranges, preparedEndpoints, startTime, traceState, hasFailure, executor), - MoreExecutors.directExecutor()); - } - - private void previewRepair(UUID parentSession, - long startTime, - List<CommonRange> commonRanges, - Set<InetAddressAndPort> preparedEndpoints, - String... cfnames) - { - - logger.debug("Starting preview repair for {}", parentSession); - // Set up RepairJob executor for this repair command. 
- ListeningExecutorService executor = createExecutor(); - - final ListenableFuture<List<RepairSessionResult>> allSessions = submitRepairSessions(parentSession, false, executor, validationScheduler, commonRanges, cfnames); - - Futures.addCallback(allSessions, new FutureCallback<List<RepairSessionResult>>() - { - public void onSuccess(List<RepairSessionResult> results) - { - try + else { - if (results == null || results.stream().anyMatch(s -> s == null)) + maybeStoreParentRepairSuccess(result.successfulRanges); + if (result.hasFailed()) { - // something failed fail(null); - return; - } - PreviewKind previewKind = options.getPreviewKind(); - Preconditions.checkState(previewKind != PreviewKind.NONE, "Preview is NONE"); - SyncStatSummary summary = new SyncStatSummary(true); - summary.consumeSessionResults(results); - - final String message; - if (summary.isEmpty()) - { - message = previewKind == PreviewKind.REPAIRED ? "Repaired data is in sync" : "Previewed data was in sync"; } else { diff --cc src/java/org/apache/cassandra/repair/RepairSession.java index 6fb455b47a,1036f00f74..5e86a3922b --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@@ -125,8 -111,9 +125,9 @@@ public class RepairSession extends Asyn private final ConcurrentMap<Pair<RepairJobDesc, SyncNodePair>, CompletableRemoteSyncTask> syncingTasks = new ConcurrentHashMap<>(); // Tasks(snapshot, validate request, differencing, ...) 
are run on taskExecutor - public final ListeningExecutorService taskExecutor; + public final ExecutorPlus taskExecutor; public final boolean optimiseStreams; + public final Scheduler validationScheduler; private volatile boolean terminated = false; @@@ -137,11 -125,11 +138,12 @@@ * @param keyspace name of keyspace * @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees * @param pullRepair true if the repair should be one way (from remote host to this host and only applicable between two hosts--see RepairOption) + * @param repairPaxos true if incomplete paxos operations should be completed as part of repair + * @param paxosOnly true if we should only complete paxos operations, not run a normal repair * @param cfnames names of columnfamilies */ - public RepairSession(UUID parentRepairSession, - UUID id, + public RepairSession(TimeUUID parentRepairSession, + Scheduler validationScheduler, CommonRange commonRange, String keyspace, RepairParallelism parallelismDegree, @@@ -149,15 -137,17 +151,16 @@@ boolean pullRepair, PreviewKind previewKind, boolean optimiseStreams, + boolean repairPaxos, + boolean paxosOnly, String... 
cfnames) { - assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it"; - + this.validationScheduler = validationScheduler; - this.parentRepairSession = parentRepairSession; - this.id = id; + this.repairPaxos = repairPaxos; + this.paxosOnly = paxosOnly; + assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it"; + this.state = new SessionState(parentRepairSession, keyspace, cfnames, commonRange); this.parallelismDegree = parallelismDegree; - this.keyspace = keyspace; - this.cfnames = cfnames; - this.commonRange = commonRange; this.isIncremental = isIncremental; this.previewKind = previewKind; this.pullRepair = pullRepair; diff --cc src/java/org/apache/cassandra/repair/RepairTask.java index dc71d6e1b5,0000000000..8e3c0bfb64 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/repair/RepairTask.java +++ b/src/java/org/apache/cassandra/repair/RepairTask.java @@@ -1,48 -1,0 +1,48 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.repair; + +import org.apache.cassandra.concurrent.ExecutorPlus; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.ImmediateFuture; + +public interface RepairTask +{ + String name(); + + default String successMessage() + { + return name() + " completed successfully"; + } + - Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor) throws Exception; ++ Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor, Scheduler validationScheduler) throws Exception; + - default Future<CoordinatedRepairResult> perform(ExecutorPlus executor) ++ default Future<CoordinatedRepairResult> perform(ExecutorPlus executor, Scheduler validationScheduler) + { + try + { - return performUnsafe(executor); ++ return performUnsafe(executor, validationScheduler); + } + catch (Exception | Error e) + { + JVMStabilityInspector.inspectThrowable(e); + return ImmediateFuture.failure(e); + } + } +} diff --cc src/java/org/apache/cassandra/repair/Scheduler.java index 0000000000,c15113ffe1..cc08a0666c mode 000000,100644..100644 --- a/src/java/org/apache/cassandra/repair/Scheduler.java +++ b/src/java/org/apache/cassandra/repair/Scheduler.java @@@ -1,0 -1,142 +1,118 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.repair; + + import java.util.LinkedList; + import java.util.Queue; + import java.util.concurrent.Executor; + import java.util.function.Supplier; + import javax.annotation.concurrent.GuardedBy; + -import com.google.common.util.concurrent.AbstractFuture; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; - + import org.apache.cassandra.utils.Pair; ++import org.apache.cassandra.utils.concurrent.AsyncFuture; ++import org.apache.cassandra.utils.concurrent.Future; + + /** + * Task scheduler that limits the number of concurrent tasks across multiple executors. + */ + public interface Scheduler + { - default <T> ListenableFuture<T> schedule(Supplier<ListenableFuture<T>> task, Executor executor) ++ default <T> Future<T> schedule(Supplier<Future<T>> task, Executor executor) + { - return schedule(new Task<>(task, executor), executor); ++ return schedule(new Task<>(task), executor); + } + + <T> Task<T> schedule(Task<T> task, Executor executor); + + static Scheduler build(int concurrentValidations) + { + return concurrentValidations <= 0 + ? 
new NoopScheduler() + : new LimitedConcurrentScheduler(concurrentValidations); + } + + final class NoopScheduler implements Scheduler + { + @Override + public <T> Task<T> schedule(Task<T> task, Executor executor) + { + executor.execute(task); + return task; + } + } + + final class LimitedConcurrentScheduler implements Scheduler + { + private final int concurrentValidations; + @GuardedBy("this") + private int inflight = 0; + @GuardedBy("this") + private final Queue<Pair<Task<?>, Executor>> tasks = new LinkedList<>(); + + LimitedConcurrentScheduler(int concurrentValidations) + { + this.concurrentValidations = concurrentValidations; + } + + @Override + public synchronized <T> Task<T> schedule(Task<T> task, Executor executor) + { + tasks.offer(Pair.create(task, executor)); + maybeSchedule(); + return task; + } + + private synchronized void onDone() + { + inflight--; + maybeSchedule(); + } + + private void maybeSchedule() + { + if (inflight == concurrentValidations || tasks.isEmpty()) + return; + inflight++; + Pair<Task<?>, Executor> pair = tasks.poll(); - Futures.addCallback(pair.left, new FutureCallback<Object>() { - @Override - public void onSuccess(Object result) - { - onDone(); - } - - @Override - public void onFailure(Throwable t) - { - onDone(); - } - }, pair.right); ++ pair.left.addCallback((s, f) -> onDone()); + pair.right.execute(pair.left); + } + } + - class Task<T> extends AbstractFuture<T> implements Runnable ++ class Task<T> extends AsyncFuture<T> implements Runnable + { - private final Supplier<ListenableFuture<T>> supplier; - private final Executor executor; ++ private final Supplier<Future<T>> supplier; + - public Task(Supplier<ListenableFuture<T>> supplier, Executor executor) ++ public Task(Supplier<Future<T>> supplier) + { + this.supplier = supplier; - this.executor = executor; + } + + @Override + public void run() + { - Futures.addCallback(supplier.get(), new FutureCallback<T>() { - @Override - public void onSuccess(T result) - { - set(result); - } - 
- @Override - public void onFailure(Throwable t) - { - setException(t); - } - }, executor); ++ supplier.get().addCallback((s, f) -> { ++ if (f != null) ++ tryFailure(f); ++ else ++ trySuccess(s); ++ }); + } + } + } diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java index a0716b17df,99c93a1d7b..eed52b78f8 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@@ -38,27 -32,15 +38,28 @@@ import com.google.common.cache.Cache import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.collect.Multimap; -import com.google.common.util.concurrent.AbstractFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; +import org.apache.cassandra.concurrent.ExecutorPlus; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DurationSpec; import org.apache.cassandra.db.compaction.CompactionManager; ++import org.apache.cassandra.repair.Scheduler; +import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.EndpointsByRange; import org.apache.cassandra.locator.EndpointsForRange; +import org.apache.cassandra.utils.ExecutorUtils; +import org.apache.cassandra.repair.state.CoordinatorState; +import org.apache.cassandra.repair.state.ParticipateState; +import org.apache.cassandra.repair.state.ValidationState; +import org.apache.cassandra.utils.Simulate; +import org.apache.cassandra.locator.EndpointsForToken; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.utils.TimeUUID; +import 
org.apache.cassandra.utils.concurrent.CountDownLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -282,30 -248,18 +283,43 @@@ public class ActiveRepairService implem consistent.local.cancelSession(sessionID, force); } - @Override + @Deprecated public void setRepairSessionSpaceInMegabytes(int sizeInMegabytes) { - DatabaseDescriptor.setRepairSessionSpaceInMegabytes(sizeInMegabytes); + DatabaseDescriptor.setRepairSessionSpaceInMiB(sizeInMegabytes); } - @Override + @Deprecated public int getRepairSessionSpaceInMegabytes() { - return DatabaseDescriptor.getRepairSessionSpaceInMegabytes(); + return DatabaseDescriptor.getRepairSessionSpaceInMiB(); + } + + @Override + public void setRepairSessionSpaceInMebibytes(int sizeInMebibytes) + { + DatabaseDescriptor.setRepairSessionSpaceInMiB(sizeInMebibytes); + } + + @Override + public int getRepairSessionSpaceInMebibytes() + { + return DatabaseDescriptor.getRepairSessionSpaceInMiB(); + } + ++ @Override ++ public int getConcurrentMerkleTreeRequests() ++ { ++ return DatabaseDescriptor.getConcurrentMerkleTreeRequests(); ++ } ++ ++ @Override ++ public void setConcurrentMerkleTreeRequests(int value) ++ { ++ logger.info("Setting concurrent_merkle_tree_requests to {}", value); ++ DatabaseDescriptor.setConcurrentMerkleTreeRequests(value); + } + public List<CompositeData> getRepairStats(List<String> schemaArgs, String rangeString) { List<CompositeData> stats = new ArrayList<>(); @@@ -384,24 -338,20 +398,25 @@@ boolean pullRepair, PreviewKind previewKind, boolean optimiseStreams, - ListeningExecutorService executor, + boolean repairPaxos, + boolean paxosOnly, + ExecutorPlus executor, + Scheduler validationScheduler, String... 
cfnames) { + if (repairPaxos && previewKind != PreviewKind.NONE) + throw new IllegalArgumentException("cannot repair paxos in a preview repair"); + if (range.endpoints.isEmpty()) return null; if (cfnames.length == 0) return null; - final RepairSession session = new RepairSession(parentRepairSession, range, keyspace, - final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), - validationScheduler, range, keyspace, ++ final RepairSession session = new RepairSession(parentRepairSession, validationScheduler, range, keyspace, parallelismDegree, isIncremental, pullRepair, - previewKind, optimiseStreams, cfnames); + previewKind, optimiseStreams, repairPaxos, paxosOnly, cfnames); + repairs.getIfPresent(parentRepairSession).register(session.state); sessions.put(session.getId(), session); // register listeners diff --cc src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java index 009ad562af,b68cb6f507..532b17c6f5 --- a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java @@@ -29,14 -29,9 +29,18 @@@ public interface ActiveRepairServiceMBe public List<Map<String, String>> getSessions(boolean all, String rangesStr); public void failSession(String session, boolean force); + @Deprecated public void setRepairSessionSpaceInMegabytes(int sizeInMegabytes); + @Deprecated public int getRepairSessionSpaceInMegabytes(); + public void setRepairSessionSpaceInMebibytes(int sizeInMebibytes); + public int getRepairSessionSpaceInMebibytes(); + ++ int getConcurrentMerkleTreeRequests(); ++ ++ void setConcurrentMerkleTreeRequests(int value); ++ public boolean getUseOffheapMerkleTrees(); public void setUseOffheapMerkleTrees(boolean value); diff --cc test/unit/org/apache/cassandra/repair/RepairJobTest.java index c7c68c4689,8d3b6a7099..b164b4f2e8 --- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java +++ 
b/test/unit/org/apache/cassandra/repair/RepairJobTest.java @@@ -122,22 -106,19 +122,22 @@@ public class RepairJobTes { private final List<Callable<?>> syncCompleteCallbacks = new ArrayList<>(); - public MeasureableRepairSession(UUID parentRepairSession, UUID id, CommonRange commonRange, String keyspace, + public MeasureableRepairSession(TimeUUID parentRepairSession, CommonRange commonRange, String keyspace, RepairParallelism parallelismDegree, boolean isIncremental, boolean pullRepair, - PreviewKind previewKind, boolean optimiseStreams, String... cfnames) + PreviewKind previewKind, boolean optimiseStreams, boolean repairPaxos, boolean paxosOnly, + String... cfnames) { - super(parentRepairSession, commonRange, keyspace, parallelismDegree, isIncremental, pullRepair, - previewKind, optimiseStreams, repairPaxos, paxosOnly, cfnames); - super(parentRepairSession, id, Scheduler.build(0), commonRange, keyspace, parallelismDegree, isIncremental, - pullRepair, previewKind, optimiseStreams, cfnames); ++ super(parentRepairSession, new Scheduler.NoopScheduler(), commonRange, keyspace, parallelismDegree, ++ isIncremental, pullRepair, previewKind, optimiseStreams, repairPaxos, paxosOnly, cfnames); } - protected DebuggableThreadPoolExecutor createExecutor() + protected ExecutorPlus createExecutor() { - DebuggableThreadPoolExecutor executor = super.createExecutor(); - executor.setKeepAliveTime(THREAD_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); - return executor; + return ExecutorFactory.Global.executorFactory() + .configurePooled("RepairJobTask", Integer.MAX_VALUE) + .withDefaultThreadGroup() + .withKeepAlive(THREAD_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS) + .build(); } @Override @@@ -314,34 -291,6 +314,34 @@@ .containsOnly(Verb.SYNC_REQ); } + @Test + public void testValidationFailure() throws InterruptedException, TimeoutException + { + Map<InetAddressAndPort, MerkleTrees> mockTrees = new HashMap<>(); + mockTrees.put(addr1, createInitialTree(false)); + mockTrees.put(addr2, 
createInitialTree(false)); + mockTrees.put(addr3, null); + + interceptRepairMessages(mockTrees, new ArrayList<>()); + + try + { + job.run(); + job.get(TEST_TIMEOUT_S, TimeUnit.SECONDS); + fail("The repair job should have failed on a simulated validation error."); + } + catch (ExecutionException e) + { - Assertions.assertThat(e.getCause()).isInstanceOf(RepairException.class); ++ Assertions.assertThat(e).hasRootCauseInstanceOf(RepairException.class); + } + + // When the job fails, all three outstanding validation tasks should be aborted. + List<ValidationTask> tasks = job.validationTasks; + assertEquals(3, tasks.size()); + assertFalse(tasks.stream().anyMatch(ValidationTask::isActive)); + assertFalse(tasks.stream().allMatch(ValidationTask::isDone)); + } + @Test public void testCreateStandardSyncTasks() { diff --cc test/unit/org/apache/cassandra/repair/RepairSessionTest.java index 4db6efb680,77b7102117..88eff6ca0a --- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java @@@ -63,7 -63,7 +63,7 @@@ public class RepairSessionTes IPartitioner p = Murmur3Partitioner.instance; Range<Token> repairRange = new Range<>(p.getToken(ByteBufferUtil.bytes(0)), p.getToken(ByteBufferUtil.bytes(100))); Set<InetAddressAndPort> endpoints = Sets.newHashSet(remote); - RepairSession session = new RepairSession(parentSessionId, - RepairSession session = new RepairSession(parentSessionId, sessionId, Scheduler.build(0), ++ RepairSession session = new RepairSession(parentSessionId, new Scheduler.NoopScheduler(), new CommonRange(endpoints, Collections.emptySet(), Arrays.asList(repairRange)), "Keyspace1", RepairParallelism.SEQUENTIAL, false, false, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
