This is an automated email from the ASF dual-hosted git repository. tolbertam pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new f2bf017e6d Fix AutoRepair Flaky InJvm dtest f2bf017e6d is described below commit f2bf017e6dd20fb6ba7e42a9a6e5cae4e92f8a0c Author: jaydeepkumar1984 <chovatia.jayd...@gmail.com> AuthorDate: Mon May 5 11:23:26 2025 -0700 Fix AutoRepair Flaky InJvm dtest Patch by Jaydeepkumar Chovatia; Reviewed by Andy Tolbert, Chris Lohfink for CASSANDRA-20620 --- CHANGES.txt | 1 + .../cassandra/repair/autorepair/AutoRepair.java | 29 +++++++++++----------- ...owParallelReplicaRepairAcrossSchedulesTest.java | 4 ++- .../test/repair/AutoRepairSchedulerTest.java | 15 ++++++----- 4 files changed, 28 insertions(+), 21 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index f214877478..46e0afd6b0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Fix AutoRepair flaky InJvm dtest (CASSANDRA-20620) * Increasing default for auto_repair.sstable_upper_threshold considering large Cassandra tables & revert three lines removed from CHANGES.txt due to a merge mistake (CASSANDRA-20586) * Fix token restrictions with MIN_TOKEN (CASSANDRO-20557) * Upgrade logback version to 1.5.18 and slf4j dependencies to 2.0.17 (CASSANDRA-20429) diff --git a/src/java/org/apache/cassandra/repair/autorepair/AutoRepair.java b/src/java/org/apache/cassandra/repair/autorepair/AutoRepair.java index 8c08ce7c80..09e4a62a48 100644 --- a/src/java/org/apache/cassandra/repair/autorepair/AutoRepair.java +++ b/src/java/org/apache/cassandra/repair/autorepair/AutoRepair.java @@ -196,7 +196,7 @@ public class AutoRepair boolean primaryRangeOnly = config.getRepairPrimaryTokenRangeOnly(repairType) && turn != MY_TURN_FORCE_REPAIR; - long startTime = timeFunc.get(); + long startTimeInMillis = timeFunc.get(); logger.info("My host id: {}, my turn to run repair...repair primary-ranges only? 
{}", myId, config.getRepairPrimaryTokenRangeOnly(repairType)); AutoRepairUtils.updateStartAutoRepairHistory(repairType, myId, timeFunc.get(), turn); @@ -242,7 +242,7 @@ public class AutoRepair repairKeyspace(repairType, primaryRangeOnly, repairAssignments.getKeyspaceName(), repairAssignments.getRepairAssignments(), collectedRepairStats); } - cleanupAndUpdateStats(turn, repairType, repairState, myId, startTime, collectedRepairStats); + cleanupAndUpdateStats(turn, repairType, repairState, myId, startTimeInMillis, collectedRepairStats); } else { @@ -318,8 +318,8 @@ public class AutoRepair while (retryCount <= config.getRepairMaxRetries(repairType)) { RepairCoordinator task = repairState.getRepairRunnable(keyspaceName, - Lists.newArrayList(curRepairAssignment.getTableNames()), - ranges, primaryRangeOnly); + Lists.newArrayList(curRepairAssignment.getTableNames()), + ranges, primaryRangeOnly); RepairProgressListener listener = new RepairProgressListener(repairType); task.addProgressListener(listener); f = repairRunnableExecutors.get(repairType).submit(task); @@ -449,7 +449,7 @@ public class AutoRepair } private void cleanupAndUpdateStats(RepairTurn turn, AutoRepairConfig.RepairType repairType, AutoRepairState repairState, UUID myId, - long startTime, CollectedRepairStats collectedRepairStats) throws InterruptedException + long startTimeInMillis, CollectedRepairStats collectedRepairStats) throws InterruptedException { //if it was due to priority then remove it now if (turn == MY_TURN_DUE_TO_PRIORITY) @@ -457,12 +457,19 @@ public class AutoRepair logger.info("Remove current host from priority list"); AutoRepairUtils.removePriorityStatus(repairType, myId); } - + long repairScheduleElapsedInMillis = timeFunc.get() - startTimeInMillis; + if (repairScheduleElapsedInMillis < SLEEP_IF_REPAIR_FINISHES_QUICKLY.toMilliseconds()) + { + //If repair finished quickly, which happens for a Cassandra cluster with empty (or tiny) data, in such cases, + //wait for some duration so that the JMX
metrics can detect the repairInProgress + logger.info("Wait for {}ms for repair type {}.", SLEEP_IF_REPAIR_FINISHES_QUICKLY.toMilliseconds() - repairScheduleElapsedInMillis, repairType); + Thread.sleep(SLEEP_IF_REPAIR_FINISHES_QUICKLY.toMilliseconds() - repairScheduleElapsedInMillis); + } repairState.setFailedTokenRangesCount(collectedRepairStats.failedTokenRanges); repairState.setSucceededTokenRangesCount(collectedRepairStats.succeededTokenRanges); repairState.setSkippedTokenRangesCount(collectedRepairStats.skippedTokenRanges); repairState.setSkippedTablesCount(collectedRepairStats.skippedTables); - repairState.setNodeRepairTimeInSec((int) TimeUnit.MILLISECONDS.toSeconds(timeFunc.get() - startTime)); + repairState.setNodeRepairTimeInSec((int) TimeUnit.MILLISECONDS.toSeconds(repairScheduleElapsedInMillis)); long timeInHours = TimeUnit.SECONDS.toHours(repairState.getNodeRepairTimeInSec()); logger.info("Local {} repair time {} hour(s), stats: repairKeyspaceCount {}, " + "repairTokenRangesSuccessCount {}, repairTokenRangesFailureCount {}, " + @@ -477,13 +484,7 @@ public class AutoRepair TimeUnit.SECONDS.toDays(repairState.getClusterRepairTimeInSec())); } repairState.setLastRepairTime(timeFunc.get()); - if (timeInHours == 0 && SLEEP_IF_REPAIR_FINISHES_QUICKLY.toSeconds() > 0) - { - //If repair finished quickly, happens for an empty instance, in such case - //wait for some duration so that the JMX metrics can detect the repairInProgress - logger.info("Wait for {} for repair type {}.", SLEEP_IF_REPAIR_FINISHES_QUICKLY, repairType); - Thread.sleep(SLEEP_IF_REPAIR_FINISHES_QUICKLY.toMilliseconds()); - } + repairState.setRepairInProgress(false); AutoRepairUtils.updateFinishAutoRepairHistory(repairType, myId, timeFunc.get()); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairSchedulerDisallowParallelReplicaRepairAcrossSchedulesTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairSchedulerDisallowParallelReplicaRepairAcrossSchedulesTest.java index a00e713cf2..e1ccbcb8da 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairSchedulerDisallowParallelReplicaRepairAcrossSchedulesTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairSchedulerDisallowParallelReplicaRepairAcrossSchedulesTest.java @@ -36,6 +36,7 @@ import org.apache.cassandra.distributed.test.TestBaseImpl; import org.apache.cassandra.repair.autorepair.AutoRepair; import org.apache.cassandra.repair.autorepair.AutoRepairConfig; import org.apache.cassandra.service.AutoRepairService; +import org.apache.cassandra.utils.FBUtilities; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; @@ -114,7 +115,8 @@ public class AutoRepairSchedulerDisallowParallelReplicaRepairAcrossSchedulesTest cluster.forEach(i -> i.runOnInstance(() -> { // Expect contention on incremental repair across schedules AutoRepairMetrics incrementalMetrics = AutoRepairMetricsManager.getMetrics(AutoRepairConfig.RepairType.INCREMENTAL); - Util.spinAssert("AutoRepair has not observed any replica contention in INCREMENTAL repair", + Util.spinAssert(String.format("%s: AutoRepair has not observed any replica contention in INCREMENTAL repair", + FBUtilities.getJustBroadcastAddress().toString()), greaterThan(0L), incrementalMetrics.repairDelayedBySchedule::getCount, 5, diff --git a/test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairSchedulerTest.java b/test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairSchedulerTest.java index e726dc1a17..4df58213aa 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairSchedulerTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairSchedulerTest.java @@ -44,6 +44,7 @@ import 
org.apache.cassandra.distributed.test.TestBaseImpl; import org.apache.cassandra.repair.autorepair.AutoRepair; import org.apache.cassandra.repair.autorepair.AutoRepairConfig; import org.apache.cassandra.service.AutoRepairService; +import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.schema.SchemaConstants.DISTRIBUTED_KEYSPACE_NAME; import static org.hamcrest.Matchers.greaterThan; @@ -78,11 +79,11 @@ public class AutoRepairSchedulerTest extends TestBaseImpl ImmutableMap.of( "initial_scheduler_delay", "5s", "enabled", "true", - "parallel_repair_count", "2", + "parallel_repair_count", "3", // Allow parallel replica repair to allow replicas // to execute full repair at same time. "allow_parallel_replica_repair", "true", - "min_repair_interval", "15s"), + "min_repair_interval", "5s"), AutoRepairConfig.RepairType.INCREMENTAL.getConfigName(), ImmutableMap.of( "initial_scheduler_delay", "5s", @@ -137,19 +138,21 @@ public class AutoRepairSchedulerTest extends TestBaseImpl // validate that the repair ran on all nodes cluster.forEach(i -> i.runOnInstance(() -> { + String broadcastAddress = FBUtilities.getJustBroadcastAddress().toString(); + // Reduce sleeping if repair finishes quickly to speed up test but make it non-zero to provoke some // contention. - AutoRepair.SLEEP_IF_REPAIR_FINISHES_QUICKLY = new DurationSpec.IntSecondsBound("1s"); + AutoRepair.SLEEP_IF_REPAIR_FINISHES_QUICKLY = new DurationSpec.IntSecondsBound("2s"); AutoRepairMetrics incrementalMetrics = AutoRepairMetricsManager.getMetrics(AutoRepairConfig.RepairType.INCREMENTAL); - Util.spinAssert("AutoRepair has not yet completed one INCREMENTAL repair cycle", + Util.spinAssert(String.format("%s: AutoRepair has not yet completed one INCREMENTAL repair cycle", broadcastAddress), greaterThan(0L), () -> incrementalMetrics.nodeRepairTimeInSec.getValue().longValue(), 5, TimeUnit.MINUTES); // Expect some contention on incremental repair. 
- Util.spinAssert("AutoRepair has not observed any replica contention in INCREMENTAL repair", + Util.spinAssert(String.format("%s: AutoRepair has not observed any replica contention in INCREMENTAL repair", broadcastAddress), greaterThan(0L), incrementalMetrics.repairDelayedByReplica::getCount, 5, @@ -159,7 +162,7 @@ public class AutoRepairSchedulerTest extends TestBaseImpl assertEquals(0L, incrementalMetrics.repairDelayedBySchedule.getCount()); AutoRepairMetrics fullMetrics = AutoRepairMetricsManager.getMetrics(AutoRepairConfig.RepairType.FULL); - Util.spinAssert("AutoRepair has not yet completed one FULL repair cycle", + Util.spinAssert(String.format("%s: AutoRepair has not yet completed one FULL repair cycle", broadcastAddress), greaterThan(0L), () -> fullMetrics.nodeRepairTimeInSec.getValue().longValue(), 5, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org