This is an automated email from the ASF dual-hosted git repository. bereng pushed a commit to branch cassandra-4.1 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 5e37fcde46d3d9b1925f11a745da46e048e47675 Merge: 8670d98498 366ee9da6c Author: Bereng <[email protected]> AuthorDate: Wed Sep 18 08:43:39 2024 +0200 Merge branch 'cassandra-4.0' into cassandra-4.1 * cassandra-4.0: Add extra compaction junit .../cassandra/repair/consistent/LocalSessions.java | 2 +- .../db/compaction/AbstractPendingRepairTest.java | 7 ++ ...CompactionStrategyManagerPendingRepairTest.java | 93 ++++++++++++++++++++++ .../repair/consistent/LocalSessionTest.java | 2 +- 4 files changed, 102 insertions(+), 2 deletions(-) diff --cc test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java index b0bb23cc64,3b45a9d841..834ea3e264 --- a/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/AbstractPendingRepairTest.java @@@ -20,7 -20,9 +20,8 @@@ package org.apache.cassandra.db.compact import java.io.IOException; import java.util.HashSet; + import java.util.List; import java.util.Set; -import java.util.UUID; import org.junit.Before; import org.junit.BeforeClass; @@@ -122,4 -122,10 +123,10 @@@ public class AbstractPendingRepairTest { mutateRepaired(sstable, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair, isTransient); } + - public static void mutateRepaired(List<SSTableReader> sstables, UUID pendingRepair, boolean isTransient) ++ public static void mutateRepaired(List<SSTableReader> sstables, TimeUUID pendingRepair, boolean isTransient) + { - for (SSTableReader sstable: sstables) ++ for (SSTableReader sstable : sstables) + mutateRepaired(sstable, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair, isTransient); + } } diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java index 76bbd88e3f,c5ce83b9ae..45011f1e47 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java @@@ -18,8 -18,12 +18,9 @@@ package org.apache.cassandra.db.compaction; -import java.io.IOException; + import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.UUID; import com.google.common.collect.Iterables; import org.junit.Assert; @@@ -30,7 -35,7 +32,8 @@@ import org.apache.cassandra.notificatio import org.apache.cassandra.notifications.SSTableDeletingNotification; import org.apache.cassandra.notifications.SSTableListChangedNotification; import org.apache.cassandra.notifications.SSTableRepairStatusChanged; +import org.apache.cassandra.repair.NoSuchRepairSessionException; + import org.apache.cassandra.repair.consistent.LocalSession; import org.apache.cassandra.repair.consistent.LocalSessionAccessor; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.FBUtilities; @@@ -307,6 -311,99 +310,96 @@@ public class CompactionStrategyManagerP Assert.assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt); } + /** + * Tests that finalized repairs racing with compactions on the same set of sstables don't leave unrepaired sstables behind + * + * This test checks that when a repair has been finalized but there are still pending sstables a finalize repair + * compaction task is issued for that repair session. + */ + @Test - public void testFinalizedAndCompactionRace() throws IOException ++ public void testFinalizedAndCompactionRace() throws NoSuchRepairSessionException + { - UUID repairID = registerSession(cfs, true, true); ++ TimeUUID repairID = registerSession(cfs, true, true); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + + int numberOfSStables = 4; + List<SSTableReader> sstables = new ArrayList<>(numberOfSStables); + for (int i = 0; i < numberOfSStables; i++) + { + SSTableReader sstable = makeSSTable(true); + sstables.add(sstable); + Assert.assertFalse(sstable.isRepaired()); + Assert.assertFalse(sstable.isPendingRepair()); + } + + // change to pending repair + mutateRepaired(sstables, repairID, false); + csm.handleNotification(new SSTableAddedNotification(sstables, null), cfs.getTracker()); + for (SSTableReader sstable : sstables) + { + Assert.assertFalse(sstable.isRepaired()); + Assert.assertTrue(sstable.isPendingRepair()); + Assert.assertEquals(repairID, sstable.getPendingRepair()); + } + + // Get a compaction taks based on the sstables marked as pending repair + cfs.getCompactionStrategyManager().enable(); + for (SSTableReader sstable : sstables) + pendingContains(sstable); + AbstractCompactionTask compactionTask = csm.getNextBackgroundTask(FBUtilities.nowInSeconds()); + Assert.assertNotNull(compactionTask); + + // Finalize the repair session + LocalSessionAccessor.finalizeUnsafe(repairID); + LocalSession session = ARS.consistent.local.getSession(repairID); + ARS.consistent.local.sessionCompleted(session); + Assert.assertTrue(hasPendingStrategiesFor(repairID)); + + // run the compaction + compactionTask.execute(ActiveCompactionsTracker.NOOP); + + // The repair session is finalized but there is an sstable left behind pending repair! + SSTableReader compactedSSTable = cfs.getLiveSSTables().iterator().next(); - if (!compactedSSTable.isRepaired()) ++ Assert.assertEquals(repairID, compactedSSTable.getPendingRepair()); ++ Assert.assertEquals(1, cfs.getLiveSSTables().size()); ++ ++ System.out.println("*********************************************************************************************"); ++ System.out.println(compactedSSTable); ++ System.out.println("Pending repair UUID: " + compactedSSTable.getPendingRepair()); ++ System.out.println("Repaired at: " + compactedSSTable.getRepairedAt()); ++ System.out.println("Creation time: " + compactedSSTable.getCreationTimeFor(Component.DATA)); ++ System.out.println("Live sstables: " + cfs.getLiveSSTables().size()); ++ System.out.println("*********************************************************************************************"); ++ ++ // Run compaction again. It should pick up the pending repair sstable ++ compactionTask = csm.getNextBackgroundTask(FBUtilities.nowInSeconds()); ++ if (compactionTask != null) + { - Assert.assertEquals(repairID, compactedSSTable.getPendingRepair()); - Assert.assertEquals(1, cfs.getLiveSSTables().size()); - - System.out.println("*********************************************************************************************"); - System.out.println(compactedSSTable); - System.out.println("Pending repair UUID: " + compactedSSTable.getPendingRepair()); - System.out.println("Repaired at: " + compactedSSTable.getRepairedAt()); - System.out.println("Creation time: " + compactedSSTable.getCreationTimeFor(Component.DATA)); - System.out.println("Live sstables: " + cfs.getLiveSSTables().size()); - System.out.println("*********************************************************************************************"); - - // Run compaction again. It should pick up the pending repair sstable - compactionTask = csm.getNextBackgroundTask(FBUtilities.nowInSeconds()); - if (compactionTask != null) - { - Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass()); - compactionTask.execute(ActiveCompactionsTracker.NOOP); - } ++ Assert.assertSame(PendingRepairManager.RepairFinishedCompactionTask.class, compactionTask.getClass()); ++ compactionTask.execute(ActiveCompactionsTracker.NOOP); + } + + System.out.println("*********************************************************************************************"); + System.out.println(compactedSSTable); + System.out.println("Pending repair UUID: " + compactedSSTable.getPendingRepair()); + System.out.println("Repaired at: " + compactedSSTable.getRepairedAt()); + System.out.println("Creation time: " + compactedSSTable.getCreationTimeFor(Component.DATA)); + System.out.println("Live sstables: " + cfs.getLiveSSTables().size()); + System.out.println("*********************************************************************************************"); + + Assert.assertEquals(1, cfs.getLiveSSTables().size()); + Assert.assertFalse(hasPendingStrategiesFor(repairID)); + Assert.assertFalse(hasTransientStrategiesFor(repairID)); + Assert.assertTrue(repairedContains(compactedSSTable)); + Assert.assertFalse(unrepairedContains(compactedSSTable)); + Assert.assertFalse(pendingContains(compactedSSTable)); + // sstable should have pendingRepair cleared, and repairedAt set correctly + long expectedRepairedAt = ActiveRepairService.instance.getParentRepairSession(repairID).repairedAt; + Assert.assertFalse(compactedSSTable.isPendingRepair()); + Assert.assertTrue(compactedSSTable.isRepaired()); + Assert.assertEquals(expectedRepairedAt, compactedSSTable.getSSTableMetadata().repairedAt); + } + @Test public void finalizedSessionTransientCleanup() { diff --cc test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java index c83335bfbd,2f8dae6a1d..745edfec52 --- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java @@@ -194,11 -192,11 +194,11 @@@ public class LocalSessionTest extends A return true; } - public Map<UUID, Integer> completedSessions = new HashMap<>(); + public Map<TimeUUID, Integer> completedSessions = new HashMap<>(); - protected void sessionCompleted(LocalSession session) + public void sessionCompleted(LocalSession session) { - UUID sessionID = session.sessionID; + TimeUUID sessionID = session.sessionID; int calls = completedSessions.getOrDefault(sessionID, 0); completedSessions.put(sessionID, calls + 1); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
