This is an automated email from the ASF dual-hosted git repository. bereng pushed a commit to branch cassandra-4.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push: new 366ee9da6c Add extra compaction junit 366ee9da6c is described below commit 366ee9da6c70dce35498d944f92a8c2a2db0c234 Author: Bereng <berenguerbl...@gmail.com> AuthorDate: Tue Sep 10 10:02:37 2024 +0200 Add extra compaction junit patch by Berenguer Blasi; reviewed by Branimir Lambov for CASSANDRA-19863 --- .../cassandra/repair/consistent/LocalSessions.java | 2 +- .../db/compaction/AbstractPendingRepairTest.java | 7 ++ ...CompactionStrategyManagerPendingRepairTest.java | 98 ++++++++++++++++++++++ .../repair/consistent/LocalSessionTest.java | 2 +- 4 files changed, 107 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java index e61ad47003..d527c962bf 100644 --- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java +++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java @@ -930,7 +930,7 @@ public class LocalSessions } @VisibleForTesting - protected void sessionCompleted(LocalSession session) + public void sessionCompleted(LocalSession session) { for (TableId tid: session.tableIds) { diff --git a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java index de7ddfcb1f..3b45a9d841 100644 --- a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction; import java.io.IOException; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.UUID; @@ -121,4 +122,10 @@ public class AbstractPendingRepairTest extends AbstractRepairTest { mutateRepaired(sstable, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair, isTransient); } + + public static void mutateRepaired(List<SSTableReader> sstables, UUID pendingRepair, boolean isTransient) + { + for (SSTableReader sstable: sstables) + mutateRepaired(sstable, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair, isTransient); + } } diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java index 9f2bc2ea75..c5ce83b9ae 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java @@ -18,6 +18,9 @@ package org.apache.cassandra.db.compaction; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.UUID; @@ -26,11 +29,13 @@ import com.google.common.collect.Iterables; import org.junit.Assert; import org.junit.Test; +import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.notifications.SSTableAddedNotification; import org.apache.cassandra.notifications.SSTableDeletingNotification; import org.apache.cassandra.notifications.SSTableListChangedNotification; import org.apache.cassandra.notifications.SSTableRepairStatusChanged; +import org.apache.cassandra.repair.consistent.LocalSession; import org.apache.cassandra.repair.consistent.LocalSessionAccessor; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.FBUtilities; @@ -306,6 +311,99 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR Assert.assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt); } + /** + * Tests that finalized repairs racing with compactions on the same set of sstables don't leave unrepaired sstables behind + * + * This test checks that when a repair has been finalized but there are still pending sstables a finalize repair + * compaction task is issued for that repair session. + */ + @Test + public void testFinalizedAndCompactionRace() throws IOException + { + UUID repairID = registerSession(cfs, true, true); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + + int numberOfSStables = 4; + List<SSTableReader> sstables = new ArrayList<>(numberOfSStables); + for (int i = 0; i < numberOfSStables; i++) + { + SSTableReader sstable = makeSSTable(true); + sstables.add(sstable); + Assert.assertFalse(sstable.isRepaired()); + Assert.assertFalse(sstable.isPendingRepair()); + } + + // change to pending repair + mutateRepaired(sstables, repairID, false); + csm.handleNotification(new SSTableAddedNotification(sstables, null), cfs.getTracker()); + for (SSTableReader sstable : sstables) + { + Assert.assertFalse(sstable.isRepaired()); + Assert.assertTrue(sstable.isPendingRepair()); + Assert.assertEquals(repairID, sstable.getPendingRepair()); + } + + // Get a compaction taks based on the sstables marked as pending repair + cfs.getCompactionStrategyManager().enable(); + for (SSTableReader sstable : sstables) + pendingContains(sstable); + AbstractCompactionTask compactionTask = csm.getNextBackgroundTask(FBUtilities.nowInSeconds()); + Assert.assertNotNull(compactionTask); + + // Finalize the repair session + LocalSessionAccessor.finalizeUnsafe(repairID); + LocalSession session = ARS.consistent.local.getSession(repairID); + ARS.consistent.local.sessionCompleted(session); + Assert.assertTrue(hasPendingStrategiesFor(repairID)); + + // run the compaction + compactionTask.execute(ActiveCompactionsTracker.NOOP); + + // The repair session is finalized but there is an sstable left behind pending repair! + SSTableReader compactedSSTable = cfs.getLiveSSTables().iterator().next(); + if (!compactedSSTable.isRepaired()) + { + Assert.assertEquals(repairID, compactedSSTable.getPendingRepair()); + Assert.assertEquals(1, cfs.getLiveSSTables().size()); + + System.out.println("*********************************************************************************************"); + System.out.println(compactedSSTable); + System.out.println("Pending repair UUID: " + compactedSSTable.getPendingRepair()); + System.out.println("Repaired at: " + compactedSSTable.getRepairedAt()); + System.out.println("Creation time: " + compactedSSTable.getCreationTimeFor(Component.DATA)); + System.out.println("Live sstables: " + cfs.getLiveSSTables().size()); + System.out.println("*********************************************************************************************"); + + // Run compaction again. It should pick up the pending repair sstable + compactionTask = csm.getNextBackgroundTask(FBUtilities.nowInSeconds()); + if (compactionTask != null) + { + Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass()); + compactionTask.execute(ActiveCompactionsTracker.NOOP); + } + } + + System.out.println("*********************************************************************************************"); + System.out.println(compactedSSTable); + System.out.println("Pending repair UUID: " + compactedSSTable.getPendingRepair()); + System.out.println("Repaired at: " + compactedSSTable.getRepairedAt()); + System.out.println("Creation time: " + compactedSSTable.getCreationTimeFor(Component.DATA)); + System.out.println("Live sstables: " + cfs.getLiveSSTables().size()); + System.out.println("*********************************************************************************************"); + + Assert.assertEquals(1, cfs.getLiveSSTables().size()); + Assert.assertFalse(hasPendingStrategiesFor(repairID)); + Assert.assertFalse(hasTransientStrategiesFor(repairID)); + Assert.assertTrue(repairedContains(compactedSSTable)); + Assert.assertFalse(unrepairedContains(compactedSSTable)); + Assert.assertFalse(pendingContains(compactedSSTable)); + // sstable should have pendingRepair cleared, and repairedAt set correctly + long expectedRepairedAt = ActiveRepairService.instance.getParentRepairSession(repairID).repairedAt; + Assert.assertFalse(compactedSSTable.isPendingRepair()); + Assert.assertTrue(compactedSSTable.isRepaired()); + Assert.assertEquals(expectedRepairedAt, compactedSSTable.getSSTableMetadata().repairedAt); + } + @Test public void finalizedSessionTransientCleanup() { diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java index 80a12c0f30..2f8dae6a1d 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java @@ -194,7 +194,7 @@ public class LocalSessionTest extends AbstractRepairTest public Map<UUID, Integer> completedSessions = new HashMap<>(); - protected void sessionCompleted(LocalSession session) + public void sessionCompleted(LocalSession session) { UUID sessionID = session.sessionID; int calls = completedSessions.getOrDefault(sessionID, 0); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org