This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 89a8155916ff7f94db9436de3f096aa22e047e35
Merge: d9dbd5e1d0 505f5af645
Author: Andrés de la Peña <[email protected]>
AuthorDate: Thu Feb 8 11:11:07 2024 +0000

    Merge branch 'cassandra-4.0' into cassandra-4.1

 CHANGES.txt                                        |   1 +
 conf/cassandra.yaml                                |  36 +++++-
 src/java/org/apache/cassandra/config/Config.java   |   2 +
 .../cassandra/config/DatabaseDescriptor.java       |  10 ++
 .../cassandra/repair/AbstractRepairTask.java       |   6 +-
 .../cassandra/repair/IncrementalRepairTask.java    |   4 +-
 .../apache/cassandra/repair/NormalRepairTask.java  |   4 +-
 .../apache/cassandra/repair/PreviewRepairTask.java |   4 +-
 .../org/apache/cassandra/repair/RepairJob.java     | 106 ++++++++--------
 .../apache/cassandra/repair/RepairRunnable.java    |   4 +-
 .../org/apache/cassandra/repair/RepairSession.java |   3 +
 .../org/apache/cassandra/repair/RepairTask.java    |   6 +-
 .../org/apache/cassandra/repair/Scheduler.java     | 118 ++++++++++++++++++
 .../cassandra/service/ActiveRepairService.java     |  17 ++-
 .../service/ActiveRepairServiceMBean.java          |   4 +
 .../test/OptimiseStreamsRepairTest.java            |  18 +--
 .../repair/ConcurrentValidationRequestsTest.java   | 136 +++++++++++++++++++++
 .../org/apache/cassandra/repair/RepairJobTest.java |   6 +-
 .../apache/cassandra/repair/RepairSessionTest.java |   2 +-
 19 files changed, 403 insertions(+), 84 deletions(-)

diff --cc CHANGES.txt
index 4b8393ebc2,baeb9b7f61..44ca779885
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,13 -1,11 +1,14 @@@
 -4.0.13
 +4.1.4
 + * Memoize Cassandra version and add a backoff interval for failed schema 
pulls (CASSANDRA-18902)
 + * Fix StackOverflowError on ALTER after many previous schema changes 
(CASSANDRA-19166)
 + * Fixed the inconsistency between distributedKeyspaces and 
distributedAndLocalKeyspaces (CASSANDRA-18747)
 + * Internode legacy SSL storage port certificate is not hot reloaded on 
update (CASSANDRA-18681)
 + * Nodetool paxos-only repair is no longer incremental (CASSANDRA-18466)
 + * Waiting indefinitely on ReceivedMessage response in 
StreamSession#receive() can cause deadlock (CASSANDRA-18733)
 + * Allow empty keystore_password in encryption_options (CASSANDRA-18778)
 + * Skip ColumnFamilyStore#topPartitions initialization when client or tool 
mode (CASSANDRA-18697)
 +Merged from 4.0:
+  * Add new concurrent_merkle_tree_requests config property to prevent OOM 
during multi-range and/or multi-table repairs (CASSANDRA-19336)
 -Merged from 3.11:
 -Merged from 3.0:
 - * Backport CASSANDRA-16418 to 3.x (CASSANDRA-18824)
 -
 -
 -4.0.12
   * Skip version check if an endpoint is dead state in 
Gossiper#upgradeFromVersionSupplier (CASSANDRA-19187)
   * Fix Gossiper::hasMajorVersion3Nodes to return false during minor upgrade 
(CASSANDRA-18999)
   * Revert unnecessary read lock acquisition when reading ring version in 
TokenMetadata introduced in CASSANDRA-16286 (CASSANDRA-19107)
diff --cc conf/cassandra.yaml
index 4b2711cfb7,7f162749d2..1986d6fa29
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -652,9 -565,29 +655,30 @@@ memtable_allocation_type: heap_buffer
  #
  # For more details see https://issues.apache.org/jira/browse/CASSANDRA-14096.
  #
 -# repair_session_space_in_mb:
 +# Min unit: MiB
 +# repair_session_space:
  
+ # The number of simultaneous Merkle tree requests during repairs that can
+ # be performed by a repair command. The size of each validation request is
+ # limited by the repair_session_space property, so setting this to 1 will make
+ # sure that a repair command doesn't exceed that limit, even if the repair
+ # command is repairing multiple tables or multiple virtual nodes.
+ #
+ # There isn't a limit by default for backwards compatibility, but this can
+ # produce OOM for commands repairing multiple tables or multiple virtual 
nodes.
+ # A limit of just 1 simultaneous Merkle tree request is generally recommended
+ # with no virtual nodes so repair_session_space, and therefore the Merkle tree
+ # resolution, can be high. For virtual nodes a value of 1 with the default
+ # repair_session_space value will produce higher resolution Merkle trees
+ # at the expense of speed. Alternatively, when working with virtual nodes it
+ # can make sense to reduce the repair_session_space and increase the value of
+ # concurrent_merkle_tree_requests because each range will contain less data.
+ #
+ # For more details see https://issues.apache.org/jira/browse/CASSANDRA-19336.
+ #
+ # A zero value means no limit.
+ # concurrent_merkle_tree_requests: 0
+ 
  # Total space to use for commit logs on disk.
  #
  # If space gets above this value, Cassandra will flush every dirty CF
diff --cc src/java/org/apache/cassandra/config/Config.java
index 8a59ca2cda,d7517124df..298903acf3
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -192,9 -132,12 +192,11 @@@ public class Confi
      // Limit the maximum depth of repair session merkle trees
      @Deprecated
      public volatile Integer repair_session_max_tree_depth = null;
 -    public volatile Integer repair_session_space_in_mb = null;
 -
 -    public volatile long repair_request_timeout_in_ms = 
TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES);
 +    @Replaces(oldName = "repair_session_space_in_mb", converter = 
Converters.MEBIBYTES_DATA_STORAGE_INT, deprecated = true)
 +    public volatile DataStorageSpec.IntMebibytesBound repair_session_space = 
null;
  
+     public volatile int concurrent_merkle_tree_requests = 0;
+ 
      public volatile boolean use_offheap_merkle_trees = true;
  
      public int storage_port = 7000;
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index a04e85c1bc,377f67117d..0805b1f9ec
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -3400,34 -2863,33 +3400,44 @@@ public class DatabaseDescripto
          conf.repair_session_max_tree_depth = depth;
      }
  
 -    public static int getRepairSessionSpaceInMegabytes()
 +    public static int getRepairSessionSpaceInMiB()
      {
 -        return conf.repair_session_space_in_mb;
 +        return conf.repair_session_space.toMebibytes();
      }
  
 -    public static void setRepairSessionSpaceInMegabytes(int sizeInMegabytes)
 +    public static void setRepairSessionSpaceInMiB(int sizeInMiB)
      {
 -        if (sizeInMegabytes < 1)
 -            throw new ConfigurationException("Cannot set 
repair_session_space_in_mb to " + sizeInMegabytes +
 -                                             " < 1 megabyte");
 -        else if (sizeInMegabytes > (int) (Runtime.getRuntime().maxMemory() / 
(4 * 1048576)))
 -            logger.warn("A repair_session_space_in_mb of " + 
conf.repair_session_space_in_mb +
 -                        " megabytes is likely to cause heap pressure.");
 +        if (sizeInMiB < 1)
 +            throw new ConfigurationException("Cannot set repair_session_space 
to " + sizeInMiB +
 +                                             " < 1 mebibyte");
 +        else if (sizeInMiB > (int) (Runtime.getRuntime().maxMemory() / (4 * 
1048576)))
 +            logger.warn("A repair_session_space of " + 
conf.repair_session_space +
 +                        " is likely to cause heap pressure.");
  
 -        conf.repair_session_space_in_mb = sizeInMegabytes;
 +        conf.repair_session_space = new 
DataStorageSpec.IntMebibytesBound(sizeInMiB);
      }
  
+     public static int getConcurrentMerkleTreeRequests()
+     {
+         return conf.concurrent_merkle_tree_requests;
+     }
+ 
+     public static void setConcurrentMerkleTreeRequests(int value)
+     {
+         conf.concurrent_merkle_tree_requests = value;
+     }
+ 
 +    public static int getPaxosRepairParallelism()
 +    {
 +        return conf.paxos_repair_parallelism;
 +    }
 +
 +    public static void setPaxosRepairParallelism(int v)
 +    {
 +        Preconditions.checkArgument(v > 0);
 +        conf.paxos_repair_parallelism = v;
 +    }
 +
      public static Float getMemtableCleanupThreshold()
      {
          return conf.memtable_cleanup_threshold;
diff --cc src/java/org/apache/cassandra/repair/AbstractRepairTask.java
index d2a6f1a786,0000000000..f2449f703a
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/repair/AbstractRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/AbstractRepairTask.java
@@@ -1,124 -1,0 +1,128 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.repair;
 +
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.List;
 +import java.util.Objects;
 +
 +import com.google.common.collect.Lists;
 +import com.google.common.util.concurrent.FutureCallback;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.ExecutorPlus;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.repair.messages.RepairOption;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.utils.TimeUUID;
 +import org.apache.cassandra.utils.concurrent.Future;
 +import org.apache.cassandra.utils.concurrent.FutureCombiner;
 +
 +public abstract class AbstractRepairTask implements RepairTask
 +{
 +    protected static final Logger logger = 
LoggerFactory.getLogger(AbstractRepairTask.class);
 +
 +    protected final RepairOption options;
 +    protected final String keyspace;
 +    protected final RepairNotifier notifier;
 +
 +    protected AbstractRepairTask(RepairOption options, String keyspace, 
RepairNotifier notifier)
 +    {
 +        this.options = Objects.requireNonNull(options);
 +        this.keyspace = Objects.requireNonNull(keyspace);
 +        this.notifier = Objects.requireNonNull(notifier);
 +    }
 +
 +    private List<RepairSession> submitRepairSessions(TimeUUID parentSession,
 +                                                     boolean isIncremental,
 +                                                     ExecutorPlus executor,
++                                                     Scheduler 
validationScheduler,
 +                                                     List<CommonRange> 
commonRanges,
 +                                                     String... cfnames)
 +    {
 +        List<RepairSession> futures = new 
ArrayList<>(options.getRanges().size());
 +
 +        for (CommonRange commonRange : commonRanges)
 +        {
 +            logger.info("Starting RepairSession for {}", commonRange);
 +            RepairSession session = 
ActiveRepairService.instance.submitRepairSession(parentSession,
 +                                                                              
       commonRange,
 +                                                                              
       keyspace,
 +                                                                              
       options.getParallelism(),
 +                                                                              
       isIncremental,
 +                                                                              
       options.isPullRepair(),
 +                                                                              
       options.getPreviewKind(),
 +                                                                              
       options.optimiseStreams(),
 +                                                                              
       options.repairPaxos(),
 +                                                                              
       options.paxosOnly(),
 +                                                                              
       executor,
++                                                                              
       validationScheduler,
 +                                                                              
       cfnames);
++
 +            if (session == null)
 +                continue;
 +            session.addCallback(new RepairSessionCallback(session));
 +            futures.add(session);
 +        }
 +        return futures;
 +    }
 +
 +    protected Future<CoordinatedRepairResult> runRepair(TimeUUID 
parentSession,
 +                                                        boolean isIncremental,
 +                                                        ExecutorPlus executor,
++                                                        Scheduler 
validationScheduler,
 +                                                        List<CommonRange> 
commonRanges,
 +                                                        String... cfnames)
 +    {
-         List<RepairSession> allSessions = submitRepairSessions(parentSession, 
isIncremental, executor, commonRanges, cfnames);
++        List<RepairSession> allSessions = submitRepairSessions(parentSession, 
isIncremental, executor, validationScheduler, commonRanges, cfnames);
 +        List<Collection<Range<Token>>> ranges = Lists.transform(allSessions, 
RepairSession::ranges);
 +        Future<List<RepairSessionResult>> f = 
FutureCombiner.successfulOf(allSessions);
 +        return f.map(results -> {
 +            logger.debug("Repair result: {}", results);
 +            return CoordinatedRepairResult.create(ranges, results);
 +        });
 +    }
 +
 +    private class RepairSessionCallback implements 
FutureCallback<RepairSessionResult>
 +    {
 +        private final RepairSession session;
 +
 +        public RepairSessionCallback(RepairSession session)
 +        {
 +            this.session = session;
 +        }
 +
 +        public void onSuccess(RepairSessionResult result)
 +        {
 +            String message = String.format("Repair session %s for range %s 
finished", session.getId(),
 +                                           session.ranges().toString());
 +            notifier.notifyProgress(message);
 +        }
 +
 +        public void onFailure(Throwable t)
 +        {
 +            String message = String.format("Repair session %s for range %s 
failed with error %s",
 +                                           session.getId(), 
session.ranges().toString(), t.getMessage());
 +            notifier.notifyError(new RuntimeException(message, t));
 +        }
 +    }
 +}
diff --cc src/java/org/apache/cassandra/repair/IncrementalRepairTask.java
index af1a234d34,0000000000..6cfd6ff84b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/repair/IncrementalRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/IncrementalRepairTask.java
@@@ -1,75 -1,0 +1,75 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.repair;
 +
 +import java.util.List;
 +import java.util.Set;
 +
 +import com.google.common.collect.ImmutableSet;
 +
 +import org.apache.cassandra.concurrent.ExecutorPlus;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.repair.consistent.CoordinatorSession;
 +import org.apache.cassandra.repair.messages.RepairOption;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.TimeUUID;
 +import org.apache.cassandra.utils.concurrent.Future;
 +
 +public class IncrementalRepairTask extends AbstractRepairTask
 +{
 +    private final TimeUUID parentSession;
 +    private final RepairRunnable.NeighborsAndRanges neighborsAndRanges;
 +    private final String[] cfnames;
 +
 +    protected IncrementalRepairTask(RepairOption options,
 +                                    String keyspace,
 +                                    RepairNotifier notifier,
 +                                    TimeUUID parentSession,
 +                                    RepairRunnable.NeighborsAndRanges 
neighborsAndRanges,
 +                                    String[] cfnames)
 +    {
 +        super(options, keyspace, notifier);
 +        this.parentSession = parentSession;
 +        this.neighborsAndRanges = neighborsAndRanges;
 +        this.cfnames = cfnames;
 +    }
 +
 +    @Override
 +    public String name()
 +    {
 +        return "Repair";
 +    }
 +
 +    @Override
-     public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus 
executor) throws Exception
++    public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus 
executor, Scheduler validationScheduler) throws Exception
 +    {
 +        // the local node also needs to be included in the set of 
participants, since coordinator sessions aren't persisted
 +        Set<InetAddressAndPort> allParticipants = 
ImmutableSet.<InetAddressAndPort>builder()
 +                                                  
.addAll(neighborsAndRanges.participants)
 +                                                  
.add(FBUtilities.getBroadcastAddressAndPort())
 +                                                  .build();
 +        // Not necessary to include self for filtering. The common ranges 
only contain neighbor node endpoints.
 +        List<CommonRange> allRanges = 
neighborsAndRanges.filterCommonRanges(keyspace, cfnames);
 +
 +        CoordinatorSession coordinatorSession = 
ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession,
 allParticipants, neighborsAndRanges.shouldExcludeDeadParticipants);
 +
-         return coordinatorSession.execute(() -> runRepair(parentSession, 
true, executor, allRanges, cfnames));
++        return coordinatorSession.execute(() -> runRepair(parentSession, 
true, executor, validationScheduler, allRanges, cfnames));
 +
 +    }
 +}
diff --cc src/java/org/apache/cassandra/repair/NormalRepairTask.java
index 56a03f8816,0000000000..a42b38012e
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/repair/NormalRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/NormalRepairTask.java
@@@ -1,57 -1,0 +1,57 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.repair;
 +
 +import java.util.List;
 +
 +import org.apache.cassandra.concurrent.ExecutorPlus;
 +import org.apache.cassandra.repair.messages.RepairOption;
 +import org.apache.cassandra.utils.TimeUUID;
 +import org.apache.cassandra.utils.concurrent.Future;
 +
 +public class NormalRepairTask extends AbstractRepairTask
 +{
 +    private final TimeUUID parentSession;
 +    private final List<CommonRange> commonRanges;
 +    private final String[] cfnames;
 +
 +    protected NormalRepairTask(RepairOption options,
 +                               String keyspace,
 +                               RepairNotifier notifier,
 +                               TimeUUID parentSession,
 +                               List<CommonRange> commonRanges,
 +                               String[] cfnames)
 +    {
 +        super(options, keyspace, notifier);
 +        this.parentSession = parentSession;
 +        this.commonRanges = commonRanges;
 +        this.cfnames = cfnames;
 +    }
 +
 +    @Override
 +    public String name()
 +    {
 +        return "Repair";
 +    }
 +
 +    @Override
-     public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus 
executor)
++    public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus 
executor, Scheduler validationScheduler)
 +    {
-         return runRepair(parentSession, false, executor, commonRanges, 
cfnames);
++        return runRepair(parentSession, false, executor, validationScheduler, 
commonRanges, cfnames);
 +    }
 +}
diff --cc src/java/org/apache/cassandra/repair/PreviewRepairTask.java
index 7ce7d1fbba,0000000000..c97632ef8f
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/repair/PreviewRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/PreviewRepairTask.java
@@@ -1,153 -1,0 +1,153 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.repair;
 +
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +
 +import com.google.common.base.Preconditions;
 +
 +import org.apache.cassandra.concurrent.ExecutorPlus;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.metrics.RepairMetrics;
 +import org.apache.cassandra.repair.consistent.SyncStatSummary;
 +import org.apache.cassandra.repair.messages.RepairOption;
 +import org.apache.cassandra.streaming.PreviewKind;
 +import org.apache.cassandra.utils.DiagnosticSnapshotService;
 +import org.apache.cassandra.utils.TimeUUID;
 +import org.apache.cassandra.utils.concurrent.Future;
 +
 +public class PreviewRepairTask extends AbstractRepairTask
 +{
 +    private final TimeUUID parentSession;
 +    private final List<CommonRange> commonRanges;
 +    private final String[] cfnames;
 +    private volatile String successMessage = name() + " completed 
successfully";
 +
 +    protected PreviewRepairTask(RepairOption options, String keyspace, 
RepairNotifier notifier, TimeUUID parentSession, List<CommonRange> 
commonRanges, String[] cfnames)
 +    {
 +        super(options, keyspace, notifier);
 +        this.parentSession = parentSession;
 +        this.commonRanges = commonRanges;
 +        this.cfnames = cfnames;
 +    }
 +
 +    @Override
 +    public String name()
 +    {
 +        return "Repair preview";
 +    }
 +
 +    @Override
 +    public String successMessage()
 +    {
 +        return successMessage;
 +    }
 +
 +    @Override
-     public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus 
executor)
++    public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus 
executor, Scheduler validationScheduler)
 +    {
-         Future<CoordinatedRepairResult> f = runRepair(parentSession, false, 
executor, commonRanges, cfnames);
++        Future<CoordinatedRepairResult> f = runRepair(parentSession, false, 
executor, validationScheduler, commonRanges, cfnames);
 +        return f.map(result -> {
 +            if (result.hasFailed())
 +                return result;
 +
 +            PreviewKind previewKind = options.getPreviewKind();
 +            Preconditions.checkState(previewKind != PreviewKind.NONE, 
"Preview is NONE");
 +            SyncStatSummary summary = new SyncStatSummary(true);
 +            summary.consumeSessionResults(result.results);
 +
 +            final String message;
 +            if (summary.isEmpty())
 +            {
 +                message = previewKind == PreviewKind.REPAIRED ? "Repaired 
data is in sync" : "Previewed data was in sync";
 +            }
 +            else
 +            {
 +                message = (previewKind == PreviewKind.REPAIRED ? "Repaired 
data is inconsistent\n" : "Preview complete\n") + summary;
 +                RepairMetrics.previewFailures.inc();
 +                if (previewKind == PreviewKind.REPAIRED)
 +                    maybeSnapshotReplicas(parentSession, keyspace, 
result.results.get()); // we know its present as summary used it
 +            }
 +            successMessage += "; " + message;
 +            notifier.notification(message);
 +
 +            return result;
 +        });
 +    }
 +
 +    private void maybeSnapshotReplicas(TimeUUID parentSession, String 
keyspace, List<RepairSessionResult> results)
 +    {
 +        if (!DatabaseDescriptor.snapshotOnRepairedDataMismatch())
 +            return;
 +
 +        try
 +        {
 +            Set<String> mismatchingTables = new HashSet<>();
 +            Set<InetAddressAndPort> nodes = new HashSet<>();
 +            for (RepairSessionResult sessionResult : results)
 +            {
 +                for (RepairResult repairResult : 
emptyIfNull(sessionResult.repairJobResults))
 +                {
 +                    for (SyncStat stat : emptyIfNull(repairResult.stats))
 +                    {
 +                        if (stat.numberOfDifferences > 0)
 +                            
mismatchingTables.add(repairResult.desc.columnFamily);
 +                        // snapshot all replicas, even if they don't have any 
differences
 +                        nodes.add(stat.nodes.coordinator);
 +                        nodes.add(stat.nodes.peer);
 +                    }
 +                }
 +            }
 +
 +            String snapshotName = 
DiagnosticSnapshotService.getSnapshotName(DiagnosticSnapshotService.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX);
 +            for (String table : mismatchingTables)
 +            {
 +                // we can just check snapshot existence locally since the 
repair coordinator is always a replica (unlike in the read case)
 +                if 
(!Keyspace.open(keyspace).getColumnFamilyStore(table).snapshotExists(snapshotName))
 +                {
 +                    logger.info("{} Snapshotting {}.{} for preview repair 
mismatch with tag {} on instances {}",
 +                                
options.getPreviewKind().logPrefix(parentSession),
 +                                keyspace, table, snapshotName, nodes);
 +                    
DiagnosticSnapshotService.repairedDataMismatch(Keyspace.open(keyspace).getColumnFamilyStore(table).metadata(),
 nodes);
 +                }
 +                else
 +                {
 +                    logger.info("{} Not snapshotting {}.{} - snapshot {} 
exists",
 +                                
options.getPreviewKind().logPrefix(parentSession),
 +                                keyspace, table, snapshotName);
 +                }
 +            }
 +        }
 +        catch (Exception e)
 +        {
 +            logger.error("{} Failed snapshotting replicas", 
options.getPreviewKind().logPrefix(parentSession), e);
 +        }
 +    }
 +
 +    private static <T> Iterable<T> emptyIfNull(Iterable<T> iter)
 +    {
 +        if (iter == null)
 +            return Collections.emptyList();
 +        return iter;
 +    }
 +}
diff --cc src/java/org/apache/cassandra/repair/RepairJob.java
index aba8bd8c85,95fe84f550..d6f8820e56
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@@ -120,49 -100,13 +120,48 @@@ public class RepairJob extends AsyncFut
          Keyspace ks = Keyspace.open(desc.keyspace);
          ColumnFamilyStore cfs = ks.getColumnFamilyStore(desc.columnFamily);
          cfs.metric.repairsStarted.inc();
 -        List<InetAddressAndPort> allEndpoints = new 
ArrayList<>(session.commonRange.endpoints);
 +        List<InetAddressAndPort> allEndpoints = new 
ArrayList<>(session.state.commonRange.endpoints);
          allEndpoints.add(FBUtilities.getBroadcastAddressAndPort());
  
-         Future<List<TreeResponse>> treeResponses;
 +        Future<Void> paxosRepair;
 +        if (paxosRepairEnabled() && ((useV2() && session.repairPaxos) || 
session.paxosOnly))
 +        {
 +            logger.info("{} {}.{} starting paxos repair", 
session.previewKind.logPrefix(session.getId()), desc.keyspace, 
desc.columnFamily);
 +            TableMetadata metadata = 
Schema.instance.getTableMetadata(desc.keyspace, desc.columnFamily);
 +            paxosRepair = PaxosCleanup.cleanup(allEndpoints, metadata, 
desc.ranges, session.state.commonRange.hasSkippedReplicas, taskExecutor);
 +        }
 +        else
 +        {
 +            logger.info("{} {}.{} not running paxos repair", 
session.previewKind.logPrefix(session.getId()), desc.keyspace, 
desc.columnFamily);
 +            paxosRepair = ImmediateFuture.success(null);
 +        }
 +
 +        if (session.paxosOnly)
 +        {
 +            paxosRepair.addCallback(new FutureCallback<Void>()
 +            {
 +                public void onSuccess(Void v)
 +                {
 +                    logger.info("{} {}.{} paxos repair completed", 
session.previewKind.logPrefix(session.getId()), desc.keyspace, 
desc.columnFamily);
 +                    trySuccess(new RepairResult(desc, 
Collections.emptyList()));
 +                }
 +
 +                /**
 +                 * Snapshot, validation and sync failures are all handled here
 +                 */
 +                public void onFailure(Throwable t)
 +                {
 +                    logger.warn("{} {}.{} paxos repair failed", 
session.previewKind.logPrefix(session.getId()), desc.keyspace, 
desc.columnFamily);
 +                    tryFailure(t);
 +                }
 +            }, taskExecutor);
 +            return;
 +        }
 +
          // Create a snapshot at all nodes unless we're using pure parallel 
repairs
 -        ListenableFuture<List<InetAddressAndPort>> allSnapshotTasks;
++        final Future<?> allSnapshotTasks;
          if (parallelismDegree != RepairParallelism.PARALLEL)
          {
-             Future<?> allSnapshotTasks;
              if (session.isIncremental)
              {
                  // consistent repair does its own "snapshotting"
@@@ -171,45 -115,29 +170,34 @@@
              else
              {
                  // Request snapshot to all replica
 -                List<ListenableFuture<InetAddressAndPort>> snapshotTasks = 
new ArrayList<>(allEndpoints.size());
 -                for (InetAddressAndPort endpoint : allEndpoints)
 -                {
 -                    SnapshotTask snapshotTask = new SnapshotTask(desc, 
endpoint);
 -                    snapshotTasks.add(snapshotTask);
 -                    taskExecutor.execute(snapshotTask);
 -                }
 -                allSnapshotTasks = Futures.allAsList(snapshotTasks);
 +                allSnapshotTasks = paxosRepair.flatMap(input -> {
 +                    List<Future<InetAddressAndPort>> snapshotTasks = new 
ArrayList<>(allEndpoints.size());
 +                    state.phase.snapshotsSubmitted();
 +                    for (InetAddressAndPort endpoint : allEndpoints)
 +                    {
 +                        SnapshotTask snapshotTask = new SnapshotTask(desc, 
endpoint);
 +                        snapshotTasks.add(snapshotTask);
 +                        taskExecutor.execute(snapshotTask);
 +                    }
 +                    return FutureCombiner.allOf(snapshotTasks).map(a -> {
 +                        state.phase.snapshotsCompleted();
 +                        return a;
 +                    });
 +                });
              }
- 
-             // When all snapshot complete, send validation requests
-             treeResponses = allSnapshotTasks.flatMap(endpoints -> {
-                 if (parallelismDegree == RepairParallelism.SEQUENTIAL)
-                     return sendSequentialValidationRequest(allEndpoints);
-                 else
-                     return sendDCAwareValidationRequest(allEndpoints);
-                 }, taskExecutor);
          }
          else
          {
-             // If not sequential, just send validation request to all replica
-             treeResponses = paxosRepair.flatMap(input -> 
sendValidationRequest(allEndpoints));
+             allSnapshotTasks = null;
          }
-         treeResponses = treeResponses.map(a -> {
-             state.phase.validationCompleted();
-             return a;
-         });
  
-         // When all validations complete, submit sync tasks
-         Future<List<SyncStat>> syncResults = 
treeResponses.flatMap(session.optimiseStreams && !session.pullRepair ? 
this::optimisedSyncing : this::standardSyncing, taskExecutor);
+         // Run validations and the creation of sync tasks in the scheduler, 
so it can limit the number of Merkle trees
+         // that there are in memory at once. When all validations complete, 
submit sync tasks out of the scheduler.
 -        ListenableFuture<List<SyncStat>> syncResults = Futures.transformAsync(
 -                session.validationScheduler.schedule(() -> 
createSyncTasks(allSnapshotTasks, allEndpoints), taskExecutor),
 -                this::executeTasks, taskExecutor);
++        Future<List<SyncStat>> syncResults = 
session.validationScheduler.schedule(() -> createSyncTasks(paxosRepair, 
allSnapshotTasks, allEndpoints), taskExecutor)
++                                                                        
.flatMap(this::executeTasks, taskExecutor);
  
          // When all sync complete, set the final result
 -        Futures.addCallback(syncResults, new FutureCallback<List<SyncStat>>()
 +        syncResults.addCallback(new FutureCallback<List<SyncStat>>()
          {
              @Override
              public void onSuccess(List<SyncStat> stats)
@@@ -248,24 -170,46 +236,52 @@@
          }, taskExecutor);
      }
  
 -    private ListenableFuture<List<SyncTask>> 
createSyncTasks(ListenableFuture<List<InetAddressAndPort>> allSnapshotTasks, 
List<InetAddressAndPort> allEndpoints)
++    private Future<List<SyncTask>> createSyncTasks(Future<Void> paxosRepair, 
Future<?> allSnapshotTasks, List<InetAddressAndPort> allEndpoints)
+     {
 -        ListenableFuture<List<TreeResponse>> validations;
++        Future<List<TreeResponse>> treeResponses;
+         if (allSnapshotTasks != null)
+         {
+             // When all snapshot complete, send validation requests
 -            validations = Futures.transformAsync(allSnapshotTasks, endpoints 
-> {
++            treeResponses = allSnapshotTasks.flatMap(endpoints -> {
+                 if (parallelismDegree == RepairParallelism.SEQUENTIAL)
 -                    return sendSequentialValidationRequest(endpoints);
++                    return sendSequentialValidationRequest(allEndpoints);
+                 else
 -                    return sendDCAwareValidationRequest(endpoints);
++                    return sendDCAwareValidationRequest(allEndpoints);
+             }, taskExecutor);
+         }
+         else
+         {
+             // If not sequential, just send validation request to all replica
 -            validations = sendValidationRequest(allEndpoints);
++            treeResponses = paxosRepair.flatMap(input -> 
sendValidationRequest(allEndpoints));
+         }
+ 
 -        return Futures.transform(validations,
 -                                 session.optimiseStreams && 
!session.pullRepair ? this::optimisedSyncing : this::standardSyncing,
 -                                 taskExecutor);
++        treeResponses = treeResponses.map(a -> {
++            state.phase.validationCompleted();
++            return a;
++        });
++
++        return treeResponses.map(session.optimiseStreams && 
!session.pullRepair
++                                 ? this::createOptimisedSyncingSyncTasks
++                                 : this::createStandardSyncTasks, 
taskExecutor);
+     }
+ 
      private boolean isTransient(InetAddressAndPort ep)
      {
 -        return session.commonRange.transEndpoints.contains(ep);
 +        return session.state.commonRange.transEndpoints.contains(ep);
      }
  
-     private Future<List<SyncStat>> standardSyncing(List<TreeResponse> trees)
 -    private List<SyncTask> standardSyncing(List<TreeResponse> trees)
++    private List<SyncTask> createStandardSyncTasks(List<TreeResponse> trees)
      {
-         state.phase.streamSubmitted();
-         List<SyncTask> syncTasks = createStandardSyncTasks(desc,
-                                                            trees,
-                                                            
FBUtilities.getLocalAddressAndPort(),
-                                                            this::isTransient,
-                                                            
session.isIncremental,
-                                                            session.pullRepair,
-                                                            
session.previewKind);
-         return executeTasks(syncTasks);
+         return createStandardSyncTasks(desc,
+                                        trees,
+                                        FBUtilities.getLocalAddressAndPort(),
+                                        this::isTransient,
+                                        session.isIncremental,
+                                        session.pullRepair,
+                                        session.previewKind);
      }
  
++    @VisibleForTesting
      static List<SyncTask> createStandardSyncTasks(RepairJobDesc desc,
                                                    List<TreeResponse> trees,
                                                    InetAddressAndPort local,
@@@ -333,58 -277,35 +349,55 @@@
          return syncTasks;
      }
  
-     private Future<List<SyncStat>> optimisedSyncing(List<TreeResponse> trees)
-     {
-         state.phase.streamSubmitted();
-         List<SyncTask> syncTasks = createOptimisedSyncingSyncTasks(desc,
-                                                                    trees,
-                                                                    
FBUtilities.getLocalAddressAndPort(),
-                                                                    
this::isTransient,
-                                                                    
this::getDC,
-                                                                    
session.isIncremental,
-                                                                    
session.previewKind);
- 
-         return executeTasks(syncTasks);
-     }
- 
 -    private List<SyncTask> optimisedSyncing(List<TreeResponse> trees)
 +    @VisibleForTesting
 +    Future<List<SyncStat>> executeTasks(List<SyncTask> tasks)
 +    {
 +        try
 +        {
 +            
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
 +            syncTasks.addAll(tasks);
 +
 +            for (SyncTask task : tasks)
 +            {
 +                if (!task.isLocal())
 +                    session.trackSyncCompletion(Pair.create(desc, 
task.nodePair()), (CompletableRemoteSyncTask) task);
 +                taskExecutor.execute(task);
 +            }
 +
 +            return FutureCombiner.allOf(tasks);
 +        }
 +        catch (NoSuchRepairSessionException e)
 +        {
 +            return ImmediateFuture.failure(new 
NoSuchRepairSessionExceptionWrapper(e));
 +        }
 +    }
 +
 +    // provided so we can throw NoSuchRepairSessionException from 
executeTasks without
 +    // having to make it unchecked. Required as this is called as from 
standardSyncing/
 +    // optimisedSyncing passed as a Function to transform merkle tree 
responses and so
 +    // can't throw checked exceptions. These are unwrapped in the onFailure 
callback of
 +    // that transformation so as to not pollute the checked usage of
 +    // NoSuchRepairSessionException in the rest of the codebase.
 +    private static class NoSuchRepairSessionExceptionWrapper extends 
RuntimeException
 +    {
 +        private final NoSuchRepairSessionException wrapped;
 +        private 
NoSuchRepairSessionExceptionWrapper(NoSuchRepairSessionException wrapped)
 +        {
 +            this.wrapped = wrapped;
 +        }
 +    }
 +
++    private List<SyncTask> createOptimisedSyncingSyncTasks(List<TreeResponse> 
trees)
+     {
+         return createOptimisedSyncingSyncTasks(desc,
+                                                trees,
+                                                
FBUtilities.getLocalAddressAndPort(),
+                                                this::isTransient,
+                                                this::getDC,
+                                                session.isIncremental,
+                                                session.previewKind);
+     }
+ 
 -    @SuppressWarnings("UnstableApiUsage")
 -    @VisibleForTesting
 -    ListenableFuture<List<SyncStat>> executeTasks(List<SyncTask> tasks)
 -    {
 -        // this throws if the parent session has failed
 -        
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
 -        syncTasks.addAll(tasks);
 -
 -        for (SyncTask task : tasks)
 -        {
 -            if (!task.isLocal())
 -                session.trackSyncCompletion(Pair.create(desc, 
task.nodePair()), (CompletableRemoteSyncTask) task);
 -            taskExecutor.submit(task);
 -        }
 -
 -        return Futures.allAsList(tasks);
 -    }
 -
      static List<SyncTask> createOptimisedSyncingSyncTasks(RepairJobDesc desc,
                                                            List<TreeResponse> 
trees,
                                                            InetAddressAndPort 
local,
diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java
index 7607b045fa,14fab59741..a56a32a53d
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@@ -93,25 -98,37 +93,27 @@@ public class RepairRunnable implements 
  {
      private static final Logger logger = 
LoggerFactory.getLogger(RepairRunnable.class);
  
 +    private static final AtomicInteger THREAD_COUNTER = new AtomicInteger(1);
 +
 +    public final CoordinatorState state;
      private final StorageService storageService;
 -    private final int cmd;
 -    private final RepairOption options;
 -    private final String keyspace;
  
      private final String tag;
 -    private final AtomicInteger progressCounter = new AtomicInteger();
 -    private final int totalProgress;
 -
 -    private final long creationTimeMillis = System.currentTimeMillis();
 -    private final UUID parentSession = UUIDGen.getTimeUUID();
  
      private final List<ProgressListener> listeners = new ArrayList<>();
 -
 -    private static final AtomicInteger threadCounter = new AtomicInteger(1);
      private final AtomicReference<Throwable> firstError = new 
AtomicReference<>(null);
 -    private final Scheduler validationScheduler;
++    final Scheduler validationScheduler;
  
      private TraceState traceState;
  
      public RepairRunnable(StorageService storageService, int cmd, 
RepairOption options, String keyspace)
      {
 +        this.state = new CoordinatorState(cmd, keyspace, options);
          this.storageService = storageService;
 -        this.cmd = cmd;
 -        this.options = options;
 -        this.keyspace = keyspace;
+         this.validationScheduler = 
Scheduler.build(DatabaseDescriptor.getConcurrentMerkleTreeRequests());
  
          this.tag = "repair:" + cmd;
 -        // get valid column families, calculate neighbors, validation, 
prepare for repair + number of ranges to repair
 -        this.totalProgress = 4 + options.getRanges().size();
 +        ActiveRepairService.instance.register(state);
      }
  
      @Override
@@@ -415,27 -418,133 +417,27 @@@
          }
          else
          {
 -            normalRepair(parentSession,
 -                         creationTimeMillis,
 -                         traceState,
 -                         neighborsAndRanges.filterCommonRanges(keyspace, 
cfnames),
 -                         neighborsAndRanges.participants,
 -                         cfnames);
 +            task = new NormalRepairTask(state.options, state.keyspace, this, 
state.id, neighborsAndRanges.filterCommonRanges(state.keyspace, cfnames), 
cfnames);
          }
 -    }
 -
 -    private void normalRepair(UUID parentSession,
 -                              long startTime,
 -                              TraceState traceState,
 -                              List<CommonRange> commonRanges,
 -                              Set<InetAddressAndPort> preparedEndpoints,
 -                              String... cfnames)
 -    {
 -
 -        // Set up RepairJob executor for this repair command.
 -        ListeningExecutorService executor = createExecutor();
  
 -        // Setting the repairedAt time to UNREPAIRED_SSTABLE causes the 
repairedAt times to be preserved across streamed sstables
 -        final ListenableFuture<List<RepairSessionResult>> allSessions = 
submitRepairSessions(parentSession, false, executor, validationScheduler, 
commonRanges, cfnames);
 -
 -        // After all repair sessions completes(successful or not),
 -        // run anticompaction if necessary and send finish notice back to 
client
 -        final Collection<Range<Token>> successfulRanges = new ArrayList<>();
 -        final AtomicBoolean hasFailure = new AtomicBoolean();
 -        ListenableFuture repairResult = Futures.transformAsync(allSessions, 
new AsyncFunction<List<RepairSessionResult>, Object>()
 -        {
 -            @SuppressWarnings("unchecked")
 -            public ListenableFuture apply(List<RepairSessionResult> results)
 +        ExecutorPlus executor = createExecutor();
 +        state.phase.repairSubmitted();
-         Future<CoordinatedRepairResult> f = task.perform(executor);
++        Future<CoordinatedRepairResult> f = task.perform(executor, 
validationScheduler);
 +        f.addCallback((result, failure) -> {
 +            state.phase.repairCompleted();
 +            try
              {
 -                logger.debug("Repair result: {}", results);
 -                // filter out null(=failed) results and get successful ranges
 -                for (RepairSessionResult sessionResult : results)
 +                if (failure != null)
                  {
 -                    if (sessionResult != null)
 -                    {
 -                        // don't record successful repair if we had to skip 
ranges
 -                        if (!sessionResult.skippedReplicas)
 -                        {
 -                            successfulRanges.addAll(sessionResult.ranges);
 -                        }
 -                    }
 -                    else
 -                    {
 -                        hasFailure.compareAndSet(false, true);
 -                    }
 +                    notifyError(failure);
 +                    fail(failure.getMessage());
                  }
 -                return Futures.immediateFuture(null);
 -            }
 -        }, MoreExecutors.directExecutor());
 -        Futures.addCallback(repairResult,
 -                            new RepairCompleteCallback(parentSession,
 -                                                       successfulRanges,
 -                                                       preparedEndpoints,
 -                                                       startTime,
 -                                                       traceState,
 -                                                       hasFailure,
 -                                                       executor),
 -                            MoreExecutors.directExecutor());
 -    }
 -
 -    private void incrementalRepair(UUID parentSession,
 -                                   long startTime,
 -                                   TraceState traceState,
 -                                   NeighborsAndRanges neighborsAndRanges,
 -                                   Set<InetAddressAndPort> preparedEndpoints,
 -                                   String... cfnames)
 -    {
 -        // the local node also needs to be included in the set of 
participants, since coordinator sessions aren't persisted
 -        Set<InetAddressAndPort> allParticipants = 
ImmutableSet.<InetAddressAndPort>builder()
 -                                                  
.addAll(neighborsAndRanges.participants)
 -                                                  
.add(FBUtilities.getBroadcastAddressAndPort())
 -                                                  .build();
 -        // Not necessary to include self for filtering. The common ranges 
only contains neighbhor node endpoints.
 -        List<CommonRange> allRanges = 
neighborsAndRanges.filterCommonRanges(keyspace, cfnames);
 -
 -        CoordinatorSession coordinatorSession = 
ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession,
 allParticipants, neighborsAndRanges.shouldExcludeDeadParticipants);
 -        ListeningExecutorService executor = createExecutor();
 -        AtomicBoolean hasFailure = new AtomicBoolean(false);
 -        ListenableFuture repairResult = coordinatorSession.execute(() -> 
submitRepairSessions(parentSession, true, executor, validationScheduler, 
allRanges, cfnames),
 -                                                                   
hasFailure);
 -        Collection<Range<Token>> ranges = new HashSet<>();
 -        for (Collection<Range<Token>> range : Iterables.transform(allRanges, 
cr -> cr.ranges))
 -        {
 -            ranges.addAll(range);
 -        }
 -        Futures.addCallback(repairResult,
 -                            new RepairCompleteCallback(parentSession, ranges, 
preparedEndpoints, startTime, traceState, hasFailure, executor),
 -                            MoreExecutors.directExecutor());
 -    }
 -
 -    private void previewRepair(UUID parentSession,
 -                               long startTime,
 -                               List<CommonRange> commonRanges,
 -                               Set<InetAddressAndPort> preparedEndpoints,
 -                               String... cfnames)
 -    {
 -
 -        logger.debug("Starting preview repair for {}", parentSession);
 -        // Set up RepairJob executor for this repair command.
 -        ListeningExecutorService executor = createExecutor();
 -
 -        final ListenableFuture<List<RepairSessionResult>> allSessions = 
submitRepairSessions(parentSession, false, executor, validationScheduler, 
commonRanges, cfnames);
 -
 -        Futures.addCallback(allSessions, new 
FutureCallback<List<RepairSessionResult>>()
 -        {
 -            public void onSuccess(List<RepairSessionResult> results)
 -            {
 -                try
 +                else
                  {
 -                    if (results == null || results.stream().anyMatch(s -> s 
== null))
 +                    maybeStoreParentRepairSuccess(result.successfulRanges);
 +                    if (result.hasFailed())
                      {
 -                        // something failed
                          fail(null);
 -                        return;
 -                    }
 -                    PreviewKind previewKind = options.getPreviewKind();
 -                    Preconditions.checkState(previewKind != PreviewKind.NONE, 
"Preview is NONE");
 -                    SyncStatSummary summary = new SyncStatSummary(true);
 -                    summary.consumeSessionResults(results);
 -
 -                    final String message;
 -                    if (summary.isEmpty())
 -                    {
 -                        message = previewKind == PreviewKind.REPAIRED ? 
"Repaired data is in sync" : "Previewed data was in sync";
                      }
                      else
                      {
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index 6fb455b47a,1036f00f74..5e86a3922b
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -125,8 -111,9 +125,9 @@@ public class RepairSession extends Asyn
      private final ConcurrentMap<Pair<RepairJobDesc, SyncNodePair>, 
CompletableRemoteSyncTask> syncingTasks = new ConcurrentHashMap<>();
  
      // Tasks(snapshot, validate request, differencing, ...) are run on 
taskExecutor
 -    public final ListeningExecutorService taskExecutor;
 +    public final ExecutorPlus taskExecutor;
      public final boolean optimiseStreams;
+     public final Scheduler validationScheduler;
  
      private volatile boolean terminated = false;
  
@@@ -137,11 -125,11 +138,12 @@@
       * @param keyspace name of keyspace
       * @param parallelismDegree specifies the degree of parallelism when 
calculating the merkle trees
       * @param pullRepair true if the repair should be one way (from remote 
host to this host and only applicable between two hosts--see RepairOption)
 +     * @param repairPaxos true if incomplete paxos operations should be 
completed as part of repair
 +     * @param paxosOnly true if we should only complete paxos operations, not 
run a normal repair
       * @param cfnames names of columnfamilies
       */
 -    public RepairSession(UUID parentRepairSession,
 -                         UUID id,
 +    public RepairSession(TimeUUID parentRepairSession,
+                          Scheduler validationScheduler,
                           CommonRange commonRange,
                           String keyspace,
                           RepairParallelism parallelismDegree,
@@@ -149,15 -137,17 +151,16 @@@
                           boolean pullRepair,
                           PreviewKind previewKind,
                           boolean optimiseStreams,
 +                         boolean repairPaxos,
 +                         boolean paxosOnly,
                           String... cfnames)
      {
 -        assert cfnames.length > 0 : "Repairing no column families seems 
pointless, doesn't it";
 -
+         this.validationScheduler = validationScheduler;
 -        this.parentRepairSession = parentRepairSession;
 -        this.id = id;
 +        this.repairPaxos = repairPaxos;
 +        this.paxosOnly = paxosOnly;
 +        assert cfnames.length > 0 : "Repairing no column families seems 
pointless, doesn't it";
 +        this.state = new SessionState(parentRepairSession, keyspace, cfnames, 
commonRange);
          this.parallelismDegree = parallelismDegree;
 -        this.keyspace = keyspace;
 -        this.cfnames = cfnames;
 -        this.commonRange = commonRange;
          this.isIncremental = isIncremental;
          this.previewKind = previewKind;
          this.pullRepair = pullRepair;
diff --cc src/java/org/apache/cassandra/repair/RepairTask.java
index dc71d6e1b5,0000000000..8e3c0bfb64
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/repair/RepairTask.java
+++ b/src/java/org/apache/cassandra/repair/RepairTask.java
@@@ -1,48 -1,0 +1,48 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.repair;
 +
 +import org.apache.cassandra.concurrent.ExecutorPlus;
 +import org.apache.cassandra.utils.JVMStabilityInspector;
 +import org.apache.cassandra.utils.concurrent.Future;
 +import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 +
 +public interface RepairTask
 +{
 +    String name();
 +
 +    default String successMessage()
 +    {
 +        return name() + " completed successfully";
 +    }
 +
-     Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor) 
throws Exception;
++    Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor, 
Scheduler validationScheduler) throws Exception;
 +
-     default Future<CoordinatedRepairResult> perform(ExecutorPlus executor)
++    default Future<CoordinatedRepairResult> perform(ExecutorPlus executor, 
Scheduler validationScheduler)
 +    {
 +        try
 +        {
-             return performUnsafe(executor);
++            return performUnsafe(executor, validationScheduler);
 +        }
 +        catch (Exception | Error e)
 +        {
 +            JVMStabilityInspector.inspectThrowable(e);
 +            return ImmediateFuture.failure(e);
 +        }
 +    }
 +}
diff --cc src/java/org/apache/cassandra/repair/Scheduler.java
index 0000000000,c15113ffe1..cc08a0666c
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/repair/Scheduler.java
+++ b/src/java/org/apache/cassandra/repair/Scheduler.java
@@@ -1,0 -1,142 +1,118 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.repair;
+ 
+ import java.util.LinkedList;
+ import java.util.Queue;
+ import java.util.concurrent.Executor;
+ import java.util.function.Supplier;
+ import javax.annotation.concurrent.GuardedBy;
+ 
 -import com.google.common.util.concurrent.AbstractFuture;
 -import com.google.common.util.concurrent.FutureCallback;
 -import com.google.common.util.concurrent.Futures;
 -import com.google.common.util.concurrent.ListenableFuture;
 -
+ import org.apache.cassandra.utils.Pair;
++import org.apache.cassandra.utils.concurrent.AsyncFuture;
++import org.apache.cassandra.utils.concurrent.Future;
+ 
+ /**
+  * Task scheduler that limits the number of concurrent tasks across multiple 
executors.
+  */
+ public interface Scheduler
+ {
 -    default <T> ListenableFuture<T> schedule(Supplier<ListenableFuture<T>> 
task, Executor executor)
++    default <T> Future<T> schedule(Supplier<Future<T>> task, Executor 
executor)
+     {
 -        return schedule(new Task<>(task, executor), executor);
++        return schedule(new Task<>(task), executor);
+     }
+ 
+     <T> Task<T> schedule(Task<T> task, Executor executor);
+ 
+     static Scheduler build(int concurrentValidations)
+     {
+         return concurrentValidations <= 0
+                ? new NoopScheduler()
+                : new LimitedConcurrentScheduler(concurrentValidations);
+     }
+ 
+     final class NoopScheduler implements Scheduler
+     {
+         @Override
+         public <T> Task<T> schedule(Task<T> task, Executor executor)
+         {
+             executor.execute(task);
+             return task;
+         }
+     }
+ 
+     final class LimitedConcurrentScheduler implements Scheduler
+     {
+         private final int concurrentValidations;
+         @GuardedBy("this")
+         private int inflight = 0;
+         @GuardedBy("this")
+         private final Queue<Pair<Task<?>, Executor>> tasks = new 
LinkedList<>();
+ 
+         LimitedConcurrentScheduler(int concurrentValidations)
+         {
+             this.concurrentValidations = concurrentValidations;
+         }
+ 
+         @Override
+         public synchronized <T> Task<T> schedule(Task<T> task, Executor 
executor)
+         {
+             tasks.offer(Pair.create(task, executor));
+             maybeSchedule();
+             return task;
+         }
+ 
+         private synchronized void onDone()
+         {
+             inflight--;
+             maybeSchedule();
+         }
+ 
+         private void maybeSchedule()
+         {
+             if (inflight == concurrentValidations || tasks.isEmpty())
+                 return;
+             inflight++;
+             Pair<Task<?>, Executor> pair = tasks.poll();
 -            Futures.addCallback(pair.left, new FutureCallback<Object>() {
 -                @Override
 -                public void onSuccess(Object result)
 -                {
 -                    onDone();
 -                }
 -
 -                @Override
 -                public void onFailure(Throwable t)
 -                {
 -                    onDone();
 -                }
 -            }, pair.right);
++            pair.left.addCallback((s, f) -> onDone());
+             pair.right.execute(pair.left);
+         }
+     }
+ 
 -    class Task<T> extends AbstractFuture<T> implements Runnable
++    class Task<T> extends AsyncFuture<T> implements Runnable
+     {
 -        private final Supplier<ListenableFuture<T>> supplier;
 -        private final Executor executor;
++        private final Supplier<Future<T>> supplier;
+ 
 -        public Task(Supplier<ListenableFuture<T>> supplier, Executor executor)
++        public Task(Supplier<Future<T>> supplier)
+         {
+             this.supplier = supplier;
 -            this.executor = executor;
+         }
+ 
+         @Override
+         public void run()
+         {
 -            Futures.addCallback(supplier.get(), new FutureCallback<T>() {
 -                @Override
 -                public void onSuccess(T result)
 -                {
 -                    set(result);
 -                }
 -
 -                @Override
 -                public void onFailure(Throwable t)
 -                {
 -                    setException(t);
 -                }
 -            }, executor);
++            supplier.get().addCallback((s, f) -> {
++                if (f != null)
++                    tryFailure(f);
++                else
++                    trySuccess(s);
++            });
+         }
+     }
+ }
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index a0716b17df,99c93a1d7b..eed52b78f8
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -38,27 -32,15 +38,28 @@@ import com.google.common.cache.Cache
  import com.google.common.cache.CacheBuilder;
  import com.google.common.collect.ImmutableSet;
  import com.google.common.collect.Iterables;
 +import com.google.common.collect.Lists;
  import com.google.common.collect.Multimap;
 -import com.google.common.util.concurrent.AbstractFuture;
 -import com.google.common.util.concurrent.ListeningExecutorService;
 -import com.google.common.util.concurrent.MoreExecutors;
  
 -import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.ExecutorPlus;
 +import org.apache.cassandra.config.Config;
 +import org.apache.cassandra.config.DurationSpec;
  import org.apache.cassandra.db.compaction.CompactionManager;
++import org.apache.cassandra.repair.Scheduler;
 +import org.apache.cassandra.locator.AbstractReplicationStrategy;
  import org.apache.cassandra.locator.EndpointsByRange;
  import org.apache.cassandra.locator.EndpointsForRange;
 +import org.apache.cassandra.utils.ExecutorUtils;
 +import org.apache.cassandra.repair.state.CoordinatorState;
 +import org.apache.cassandra.repair.state.ParticipateState;
 +import org.apache.cassandra.repair.state.ValidationState;
 +import org.apache.cassandra.utils.Simulate;
 +import org.apache.cassandra.locator.EndpointsForToken;
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.streaming.PreviewKind;
 +import org.apache.cassandra.utils.TimeUUID;
 +import org.apache.cassandra.utils.concurrent.CountDownLatch;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -282,30 -248,18 +283,43 @@@ public class ActiveRepairService implem
          consistent.local.cancelSession(sessionID, force);
      }
  
 -    @Override
 +    @Deprecated
      public void setRepairSessionSpaceInMegabytes(int sizeInMegabytes)
      {
 -        DatabaseDescriptor.setRepairSessionSpaceInMegabytes(sizeInMegabytes);
 +        DatabaseDescriptor.setRepairSessionSpaceInMiB(sizeInMegabytes);
      }
  
 -    @Override
 +    @Deprecated
      public int getRepairSessionSpaceInMegabytes()
      {
 -        return DatabaseDescriptor.getRepairSessionSpaceInMegabytes();
 +        return DatabaseDescriptor.getRepairSessionSpaceInMiB();
 +    }
 +
 +    @Override
 +    public void setRepairSessionSpaceInMebibytes(int sizeInMebibytes)
 +    {
 +        DatabaseDescriptor.setRepairSessionSpaceInMiB(sizeInMebibytes);
 +    }
 +
 +    @Override
 +    public int getRepairSessionSpaceInMebibytes()
 +    {
 +        return DatabaseDescriptor.getRepairSessionSpaceInMiB();
 +    }
 +
++    @Override
++    public int getConcurrentMerkleTreeRequests()
++    {
++        return DatabaseDescriptor.getConcurrentMerkleTreeRequests();
++    }
++
++    @Override
++    public void setConcurrentMerkleTreeRequests(int value)
++    {
++        logger.info("Setting concurrent_merkle_tree_requests to {}", value);
++        DatabaseDescriptor.setConcurrentMerkleTreeRequests(value);
+     }
+ 
     public List<CompositeData> getRepairStats(List<String> schemaArgs, String rangeString)
      {
          List<CompositeData> stats = new ArrayList<>();
@@@ -384,24 -338,20 +398,25 @@@
                                               boolean pullRepair,
                                               PreviewKind previewKind,
                                               boolean optimiseStreams,
 -                                             ListeningExecutorService executor,
 +                                             boolean repairPaxos,
 +                                             boolean paxosOnly,
 +                                             ExecutorPlus executor,
+                                              Scheduler validationScheduler,
                                               String... cfnames)
      {
 +        if (repairPaxos && previewKind != PreviewKind.NONE)
 +            throw new IllegalArgumentException("cannot repair paxos in a preview repair");
 +
          if (range.endpoints.isEmpty())
              return null;
  
          if (cfnames.length == 0)
              return null;
  
-         final RepairSession session = new RepairSession(parentRepairSession, range, keyspace,
 -        final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(),
 -                                                        validationScheduler, range, keyspace,
++        final RepairSession session = new RepairSession(parentRepairSession, validationScheduler, range, keyspace,
                                                          parallelismDegree, isIncremental, pullRepair,
 -                                                        previewKind, optimiseStreams, cfnames);
 +                                                        previewKind, optimiseStreams, repairPaxos, paxosOnly, cfnames);
 +        repairs.getIfPresent(parentRepairSession).register(session.state);
  
          sessions.put(session.getId(), session);
          // register listeners
diff --cc src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
index 009ad562af,b68cb6f507..532b17c6f5
--- a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
@@@ -29,14 -29,9 +29,18 @@@ public interface ActiveRepairServiceMBe
     public List<Map<String, String>> getSessions(boolean all, String rangesStr);
      public void failSession(String session, boolean force);
  
 +    @Deprecated
      public void setRepairSessionSpaceInMegabytes(int sizeInMegabytes);
 +    @Deprecated
      public int getRepairSessionSpaceInMegabytes();
  
 +    public void setRepairSessionSpaceInMebibytes(int sizeInMebibytes);
 +    public int getRepairSessionSpaceInMebibytes();
 +
++    int getConcurrentMerkleTreeRequests();
++
++    void setConcurrentMerkleTreeRequests(int value);
++
      public boolean getUseOffheapMerkleTrees();
      public void setUseOffheapMerkleTrees(boolean value);
  
diff --cc test/unit/org/apache/cassandra/repair/RepairJobTest.java
index c7c68c4689,8d3b6a7099..b164b4f2e8
--- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
@@@ -122,22 -106,19 +122,22 @@@ public class RepairJobTes
      {
         private final List<Callable<?>> syncCompleteCallbacks = new ArrayList<>();
  
 -        public MeasureableRepairSession(UUID parentRepairSession, UUID id, CommonRange commonRange, String keyspace,
 +        public MeasureableRepairSession(TimeUUID parentRepairSession, CommonRange commonRange, String keyspace,
                                         RepairParallelism parallelismDegree, boolean isIncremental, boolean pullRepair,
 -                                        PreviewKind previewKind, boolean optimiseStreams, String... cfnames)
 +                                        PreviewKind previewKind, boolean optimiseStreams, boolean repairPaxos, boolean paxosOnly,
 +                                        String... cfnames)
         {
-             super(parentRepairSession, commonRange, keyspace, parallelismDegree, isIncremental, pullRepair,
-                   previewKind, optimiseStreams, repairPaxos, paxosOnly, cfnames);
 -            super(parentRepairSession, id, Scheduler.build(0), commonRange, keyspace, parallelismDegree, isIncremental,
 -                  pullRepair, previewKind, optimiseStreams, cfnames);
++            super(parentRepairSession, new Scheduler.NoopScheduler(), commonRange, keyspace, parallelismDegree,
++                  isIncremental, pullRepair, previewKind, optimiseStreams, repairPaxos, paxosOnly, cfnames);
          }
  
 -        protected DebuggableThreadPoolExecutor createExecutor()
 +        protected ExecutorPlus createExecutor()
          {
 -            DebuggableThreadPoolExecutor executor = super.createExecutor();
 -            executor.setKeepAliveTime(THREAD_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
 -            return executor;
 +            return ExecutorFactory.Global.executorFactory()
 +                    .configurePooled("RepairJobTask", Integer.MAX_VALUE)
 +                    .withDefaultThreadGroup()
 +                    .withKeepAlive(THREAD_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
 +                    .build();
          }
  
          @Override
@@@ -314,34 -291,6 +314,34 @@@
              .containsOnly(Verb.SYNC_REQ);
      }
  
 +    @Test
 +    public void testValidationFailure() throws InterruptedException, TimeoutException
 +    {
 +        Map<InetAddressAndPort, MerkleTrees> mockTrees = new HashMap<>();
 +        mockTrees.put(addr1, createInitialTree(false));
 +        mockTrees.put(addr2, createInitialTree(false));
 +        mockTrees.put(addr3, null);
 +
 +        interceptRepairMessages(mockTrees, new ArrayList<>());
 +
 +        try 
 +        {
 +            job.run();
 +            job.get(TEST_TIMEOUT_S, TimeUnit.SECONDS);
 +            fail("The repair job should have failed on a simulated validation error.");
 +        }
 +        catch (ExecutionException e)
 +        {
-             Assertions.assertThat(e.getCause()).isInstanceOf(RepairException.class);
++            Assertions.assertThat(e).hasRootCauseInstanceOf(RepairException.class);
 +        }
 +
 +        // When the job fails, all three outstanding validation tasks should be aborted.
 +        List<ValidationTask> tasks = job.validationTasks;
 +        assertEquals(3, tasks.size());
 +        assertFalse(tasks.stream().anyMatch(ValidationTask::isActive));
 +        assertFalse(tasks.stream().allMatch(ValidationTask::isDone));
 +    }
 +
      @Test
      public void testCreateStandardSyncTasks()
      {
diff --cc test/unit/org/apache/cassandra/repair/RepairSessionTest.java
index 4db6efb680,77b7102117..88eff6ca0a
--- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@@ -63,7 -63,7 +63,7 @@@ public class RepairSessionTes
          IPartitioner p = Murmur3Partitioner.instance;
         Range<Token> repairRange = new Range<>(p.getToken(ByteBufferUtil.bytes(0)), p.getToken(ByteBufferUtil.bytes(100)));
         Set<InetAddressAndPort> endpoints = Sets.newHashSet(remote);
-         RepairSession session = new RepairSession(parentSessionId,
 -        RepairSession session = new RepairSession(parentSessionId, sessionId, Scheduler.build(0),
++        RepairSession session = new RepairSession(parentSessionId, new Scheduler.NoopScheduler(),
                                                    new CommonRange(endpoints, Collections.emptySet(), Arrays.asList(repairRange)),
                                                    "Keyspace1", RepairParallelism.SEQUENTIAL,
                                                    false, false,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to