aweisberg commented on code in PR #4696:
URL: https://github.com/apache/cassandra/pull/4696#discussion_r3040771625
##########
src/java/org/apache/cassandra/repair/RepairCoordinator.java:
##########
@@ -496,26 +552,60 @@ private Future<?> prepare(List<ColumnFamilyStore> columnFamilies, Set<InetAddres
private Future<Pair<CoordinatedRepairResult, Supplier<String>>> repair(String[] cfnames, NeighborsAndRanges neighborsAndRanges)
{
- RepairTask task;
+ ExecutorPlus executor = createExecutor();
+ state.phase.repairSubmitted();
+
if (state.options.isPreview())
{
- task = new PreviewRepairTask(this, state.id, neighborsAndRanges.filterCommonRanges(state.keyspace, cfnames), neighborsAndRanges.shouldExcludeDeadParticipants, cfnames);
+ RepairTask task = new PreviewRepairTask(this, state.id, neighborsAndRanges.filterCommonRanges(state.keyspace, cfnames), neighborsAndRanges.shouldExcludeDeadParticipants, cfnames);
+ return task.perform(executor, validationScheduler)
+ .<Pair<CoordinatedRepairResult, Supplier<String>>>map(r -> Pair.create(r, task::successMessage))
+ .addCallback((s, f) -> executor.shutdown());
+ }
+ else if (useMutationTracking)
+ {
+ RepairTask mtTask = new MutationTrackingIncrementalRepairTask(this, state.id, neighborsAndRanges, cfnames);
+ if (mutationTrackingMigrationInProgress)
+ {
+ // During migration, run incremental repair first, then mutation tracking sync.
+ // Propagate the IR result on success since it drives migration advancement.
+ RepairTask incrementalTask = new IncrementalRepairTask(this, state.id, neighborsAndRanges, cfnames);
+ AsyncPromise<Pair<CoordinatedRepairResult, Supplier<String>>> result = new AsyncPromise<>();
+ logger.info("Migration to mutation tracking in progress for {}; running incremental repair before MT sync", state.keyspace);
+ incrementalTask.perform(executor, validationScheduler).addCallback(
+ irResult -> {
+ logger.info("Incremental repair completed for migration keyspace {}: hasFailed={}", state.keyspace, irResult.hasFailed());
+ Pair<CoordinatedRepairResult, Supplier<String>> irPair = Pair.create(irResult, incrementalTask::successMessage);
+ mtTask.perform(executor, validationScheduler)
+ .addCallback(
+ mtResult -> result.trySuccess(irPair),
Review Comment:
A failure at any step is a failure of the entire repair, because the repair
as a whole did not complete. This code should do exactly that: return failure
immediately as soon as any step fails.
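
One possible shape for that fail-fast chaining is sketched below. It is an
illustration only: it uses the JDK's CompletableFuture as a stand-in for the
Future/AsyncPromise types in the diff, and FailFastChain and runBoth are
hypothetical names that do not appear in the PR. Failure is modelled here as
an exceptionally completed future; a result that reports failure through a
flag such as hasFailed() would need an explicit check before the second step
is started.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper, not part of the PR: chains two async steps so that
// a failure in either one fails the combined future.
public class FailFastChain
{
    /**
     * Runs the first step, then the second step only if the first succeeded.
     * The returned future completes exceptionally as soon as either step
     * fails, and carries the first step's result only when both succeed.
     */
    static <A, B> CompletableFuture<A> runBoth(Supplier<CompletableFuture<A>> first,
                                               Supplier<CompletableFuture<B>> second)
    {
        // thenCompose skips the lambda when the first future failed, so the
        // second step is never started and the failure propagates unchanged.
        return first.get().thenCompose(a -> second.get().thenApply(b -> a));
    }

    public static void main(String[] args)
    {
        CompletableFuture<String> ok =
            runBoth(() -> CompletableFuture.completedFuture("ir-result"),
                    () -> CompletableFuture.completedFuture("mt-result"));
        System.out.println(ok.join()); // prints "ir-result"

        CompletableFuture<String> failedFirst = new CompletableFuture<>();
        failedFirst.completeExceptionally(new RuntimeException("incremental repair failed"));
        CompletableFuture<String> combined =
            runBoth(() -> failedFirst,
                    () -> CompletableFuture.completedFuture("mt-result"));
        System.out.println(combined.isCompletedExceptionally()); // prints "true"
    }
}
```

With this shape the caller still receives the first step's result on success,
which matches the intent stated in the diff comment about propagating the IR
result, while any failure surfaces on the combined future.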
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]