This is an automated email from the ASF dual-hosted git repository. smiklosovic pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 6cfea18dda Avoid duplicate hardlink error upon forceful taking of ephemeral snapshots during repair 6cfea18dda is described below commit 6cfea18ddaa23022a919d3251f161151be81ebf0 Author: Stefan Miklosovic <smikloso...@apache.org> AuthorDate: Thu Mar 27 14:31:54 2025 +0100 Avoid duplicate hardlink error upon forceful taking of ephemeral snapshots during repair patch by Stefan Miklosovic; reviewed by Francisco Guerrero for CASSANDRA-20490 --- CHANGES.txt | 1 + .../db/repair/CassandraTableRepairManager.java | 4 +- .../cassandra/io/sstable/format/SSTableReader.java | 31 +++- .../org/apache/cassandra/io/util/FileUtils.java | 14 +- .../snapshot/SnapshotDetailsTabularData.java | 5 +- .../service/snapshot/SnapshotManager.java | 31 +++- .../service/snapshot/SnapshotOptions.java | 34 ++--- .../cassandra/service/snapshot/TableSnapshot.java | 13 +- .../service/snapshot/TakeSnapshotTask.java | 32 +++- .../distributed/test/EphemeralSnapshotTest.java | 167 +++++++++++++++++++-- .../cassandra/distributed/test/SnapshotsTest.java | 90 ++++++++++- .../cassandra/service/ActiveRepairServiceTest.java | 25 ++- 12 files changed, 395 insertions(+), 52 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 0fc414363c..a030675ce9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Avoid duplicate hardlink error upon forceful taking of ephemeral snapshots during repair (CASSANDRA-20490) * When a custom disk error handler fails to initiate, fail the startup of a node instead of using the no-op handler (CASSANDRA-20614) * Rewrite constraint framework to remove column specification from constraint definition, introduce SQL-like NOT NULL (CASSANDRA-20563) * Fix a bug in AutoRepair duration metric calculation if schedule finishes quickly (CASSANDRA-20622) diff --git a/src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java 
b/src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java index 3b6535752e..bf5ab5a0e1 100644 --- a/src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java +++ b/src/java/org/apache/cassandra/db/repair/CassandraTableRepairManager.java @@ -87,7 +87,9 @@ public class CassandraTableRepairManager implements TableRepairManager !sstable.metadata().isIndex() && // exclude SSTables from 2i new Bounds<>(sstable.getFirst().getToken(), sstable.getLast().getToken()).intersects(ranges); - SnapshotOptions options = SnapshotOptions.systemSnapshot(name, SnapshotType.REPAIR, predicate, cfs.getKeyspaceTableName()).ephemeral().build(); + SnapshotOptions options = SnapshotOptions.systemSnapshot(name, SnapshotType.REPAIR, predicate, cfs.getKeyspaceTableName()) + .ephemeral() + .build(); SnapshotManager.instance.takeSnapshot(options); } }).get(); diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 3e011f8314..1055070cd8 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -92,6 +92,7 @@ import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.FileHandle; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.FileUtils.DuplicateHardlinkException; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.metrics.RestorableMeter; import org.apache.cassandra.schema.SchemaConstants; @@ -1127,15 +1128,29 @@ public abstract class SSTableReader extends SSTable implements UnfilteredSource, public void createLinks(String snapshotDirectoryPath, RateLimiter rateLimiter) { - createLinks(descriptor, components, snapshotDirectoryPath, rateLimiter); + createLinks(snapshotDirectoryPath, rateLimiter, false); + } + + public void 
createLinks(String snapshotDirectoryPath, RateLimiter rateLimiter, boolean ephemeralSnapshot) + { + createLinks(descriptor, components, snapshotDirectoryPath, rateLimiter, ephemeralSnapshot); } public static void createLinks(Descriptor descriptor, Set<Component> components, String snapshotDirectoryPath) { - createLinks(descriptor, components, snapshotDirectoryPath, null); + createLinks(descriptor, components, snapshotDirectoryPath, null, false); } - public static void createLinks(Descriptor descriptor, Set<Component> components, String snapshotDirectoryPath, RateLimiter limiter) + /** + * Create hardlinks for given set of components + * + * @param descriptor descriptor to use + * @param components components to create links for + * @param snapshotDirectoryPath directory path for snapshot + * @param limiter rate limiter to use + * @param force if true, if target link file exists, do not fail, otherwise throw RTE + */ + public static void createLinks(Descriptor descriptor, Set<Component> components, String snapshotDirectoryPath, RateLimiter limiter, boolean force) { for (Component component : components) { @@ -1145,7 +1160,15 @@ public abstract class SSTableReader extends SSTable implements UnfilteredSource, if (null != limiter) limiter.acquire(); File targetLink = new File(snapshotDirectoryPath, sourceFile.name()); - FileUtils.createHardLink(sourceFile, targetLink); + try + { + FileUtils.createHardLink(sourceFile, targetLink); + } + catch (DuplicateHardlinkException ex) + { + if (!force) + throw new RuntimeException(ex.getMessage()); + } } } diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java index d5ea8dcde1..c8c605d7da 100644 --- a/src/java/org/apache/cassandra/io/util/FileUtils.java +++ b/src/java/org/apache/cassandra/io/util/FileUtils.java @@ -166,7 +166,7 @@ public final class FileUtils public static void createHardLink(File from, File to) { if (to.exists()) - throw new RuntimeException("Tried 
to create duplicate hard link to " + to); + throw new DuplicateHardlinkException("Tried to create duplicate hard link from " + from + " to " + to); if (!from.exists()) throw new RuntimeException("Tried to hard link to file that does not exist " + from); @@ -195,6 +195,10 @@ public final class FileUtils { throw ex; } + catch (DuplicateHardlinkException ex) + { + throw new RuntimeException(ex.getMessage()); + } catch (Throwable t) { throw new RuntimeException(String.format("Unable to hardlink from %s to %s", from, to), t); @@ -818,4 +822,12 @@ public final class FileUtils f.tryDelete(); } } + + public static class DuplicateHardlinkException extends RuntimeException + { + public DuplicateHardlinkException(String message) + { + super(message); + } + } } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/service/snapshot/SnapshotDetailsTabularData.java b/src/java/org/apache/cassandra/service/snapshot/SnapshotDetailsTabularData.java index 3d9609f2e7..7b7635b0a4 100644 --- a/src/java/org/apache/cassandra/service/snapshot/SnapshotDetailsTabularData.java +++ b/src/java/org/apache/cassandra/service/snapshot/SnapshotDetailsTabularData.java @@ -81,7 +81,10 @@ public class SnapshotDetailsTabularData { try { - String totalSize = FileUtils.stringifyFileSize(details.computeSizeOnDiskBytes()); + // in case of forcibly taken snapshots, ephemeral snapshot might get more sstables + // which would inflate sizes which are otherwise cached, + // hence, refresh sizes, but for ephemeral snapshots only + String totalSize = FileUtils.stringifyFileSize(details.computeSizeOnDiskBytes(details.isEphemeral())); long trueSizeBytes = details.computeTrueSizeBytes(files); String liveSize = FileUtils.stringifyFileSize(trueSizeBytes); String createdAt = safeToString(details.getCreatedAt()); diff --git a/src/java/org/apache/cassandra/service/snapshot/SnapshotManager.java b/src/java/org/apache/cassandra/service/snapshot/SnapshotManager.java index b8b81915ce..f079f42a9a 100644 --- 
a/src/java/org/apache/cassandra/service/snapshot/SnapshotManager.java +++ b/src/java/org/apache/cassandra/service/snapshot/SnapshotManager.java @@ -19,6 +19,8 @@ package org.apache.cassandra.service.snapshot; import java.io.IOException; import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -568,18 +570,33 @@ public class SnapshotManager implements SnapshotManagerMBean, INotificationConsu private synchronized void prePopulateSnapshots(TakeSnapshotTask task) { Map<ColumnFamilyStore, TableSnapshot> snapshotsToCreate = task.getSnapshotsToCreate(); - for (Map.Entry<ColumnFamilyStore, TableSnapshot> toCreateEntry : snapshotsToCreate.entrySet()) + Map<ColumnFamilyStore, TableSnapshot> snapshotsToOverwrite = new HashMap<>(); + List<TableSnapshot> toCreate = new ArrayList<>(snapshotsToCreate.values()); + + for (TableSnapshot existingSnapshot : snapshots) { - if (snapshots.contains(toCreateEntry.getValue())) + for (Map.Entry<ColumnFamilyStore, TableSnapshot> toCreateEntry : snapshotsToCreate.entrySet()) { - throw new RuntimeException(format("Snapshot %s for %s.%s already exists.", - toCreateEntry.getValue().getTag(), - toCreateEntry.getValue().getKeyspaceName(), - toCreateEntry.getValue().getTableName())); + TableSnapshot snapshotToCreate = toCreateEntry.getValue(); + if (existingSnapshot.equals(toCreateEntry.getValue())) + { + if (!task.options.ephemeral) + { + throw new RuntimeException(format("Snapshot %s for %s.%s already exists.", + snapshotToCreate.getTag(), + snapshotToCreate.getKeyspaceName(), + snapshotToCreate.getTableName())); + } + + toCreate.remove(toCreateEntry.getValue()); + snapshotsToOverwrite.put(toCreateEntry.getKey(), existingSnapshot); + } } } - snapshots.addAll(snapshotsToCreate.values()); + snapshotsToCreate.putAll(snapshotsToOverwrite); + + snapshots.addAll(toCreate); } private static ScheduledExecutorPlus createSnapshotCleanupExecutor() diff --git 
a/src/java/org/apache/cassandra/service/snapshot/SnapshotOptions.java b/src/java/org/apache/cassandra/service/snapshot/SnapshotOptions.java index 409288f8bf..8f487d2453 100644 --- a/src/java/org/apache/cassandra/service/snapshot/SnapshotOptions.java +++ b/src/java/org/apache/cassandra/service/snapshot/SnapshotOptions.java @@ -50,27 +50,18 @@ public class SnapshotOptions public final Predicate<SSTableReader> sstableFilter; public final ColumnFamilyStore cfs; - private SnapshotOptions(SnapshotType type, - String tag, - DurationSpec.IntSecondsBound ttl, - Instant creationTime, - boolean skipFlush, - boolean ephemeral, - String[] entities, - RateLimiter rateLimiter, - Predicate<SSTableReader> sstableFilter, - ColumnFamilyStore cfs) + private SnapshotOptions(Builder builder) { - this.type = type; - this.tag = tag; - this.ttl = ttl; - this.creationTime = creationTime; - this.skipFlush = skipFlush; - this.ephemeral = ephemeral; - this.entities = entities; - this.rateLimiter = rateLimiter; - this.sstableFilter = sstableFilter; - this.cfs = cfs; + this.type = builder.type; + this.tag = builder.tag; + this.ttl = builder.ttl; + this.creationTime = builder.creationTime; + this.skipFlush = builder.skipFlush; + this.ephemeral = builder.ephemeral; + this.entities = builder.entities; + this.rateLimiter = builder.rateLimiter; + this.sstableFilter = builder.sstableFilter; + this.cfs = builder.cfs; } public static Builder systemSnapshot(String tag, SnapshotType type, String... 
entities) @@ -214,8 +205,7 @@ public class SnapshotOptions if (rateLimiter == null) rateLimiter = DatabaseDescriptor.getSnapshotRateLimiter(); - return new SnapshotOptions(type, tag, ttl, creationTime, skipFlush, ephemeral, entities, rateLimiter, - sstableFilter, cfs); + return new SnapshotOptions(this); } } diff --git a/src/java/org/apache/cassandra/service/snapshot/TableSnapshot.java b/src/java/org/apache/cassandra/service/snapshot/TableSnapshot.java index 7698ea1e3d..a23aa7a2b1 100644 --- a/src/java/org/apache/cassandra/service/snapshot/TableSnapshot.java +++ b/src/java/org/apache/cassandra/service/snapshot/TableSnapshot.java @@ -207,10 +207,21 @@ public class TableSnapshot } public long computeSizeOnDiskBytes() + { + return computeSizeOnDiskBytes(false); + } + + /** + * + * @param refresh true if a caller wants to recompute otherwise cached size + * @return on disk bytes + */ + public long computeSizeOnDiskBytes(boolean refresh) { long sum = sizeOnDisk; - if (sum == 0) + if (sum == 0 || refresh) { + sum = 0; for (File snapshotDir : snapshotDirs) sum += FileUtils.folderSize(snapshotDir); diff --git a/src/java/org/apache/cassandra/service/snapshot/TakeSnapshotTask.java b/src/java/org/apache/cassandra/service/snapshot/TakeSnapshotTask.java index a0d81a685c..236a3ab783 100644 --- a/src/java/org/apache/cassandra/service/snapshot/TakeSnapshotTask.java +++ b/src/java/org/apache/cassandra/service/snapshot/TakeSnapshotTask.java @@ -163,7 +163,7 @@ public class TakeSnapshotTask extends AbstractSnapshotTask<List<TableSnapshot>> for (SSTableReader ssTable : currentView.sstables) { File snapshotDirectory = Directories.getSnapshotDirectory(ssTable.descriptor, snapshotName); - ssTable.createLinks(snapshotDirectory.path(), options.rateLimiter); // hard links + ssTable.createLinks(snapshotDirectory.path(), options.rateLimiter, options.ephemeral); // hard links if (logger.isTraceEnabled()) logger.trace("Snapshot for {} keyspace data file {} created in {}", cfs.keyspace, 
ssTable.getFilename(), snapshotDirectory); sstables.add(ssTable); @@ -268,12 +268,40 @@ public class TakeSnapshotTask extends AbstractSnapshotTask<List<TableSnapshot>> } + private SnapshotManifest createSnapshotManifest(SnapshotManifest manifest, File manifestFile) + { + SnapshotManifest oldManifest = null; + if (manifestFile.exists()) + { + try + { + oldManifest = SnapshotManifest.deserializeFromJsonFile(manifestFile); + } + catch (Throwable t) + { + logger.warn("Unable to read the content of old manifest {}", manifestFile); + } + } + + if (oldManifest != null) + { + Set<String> deduplicates = new HashSet<>(); // set to deduplicate + deduplicates.addAll(oldManifest.getFiles()); + deduplicates.addAll(manifest.files); + + return new SnapshotManifest(new ArrayList<>(deduplicates), options.ttl, creationTime, options.ephemeral); + } + + return manifest; + } + private void writeSnapshotManifest(SnapshotManifest manifest, File manifestFile) { try { + SnapshotManifest toCreate = createSnapshotManifest(manifest, manifestFile); manifestFile.parent().tryCreateDirectories(); - manifest.serializeToJsonFile(manifestFile); + toCreate.serializeToJsonFile(manifestFile); } catch (IOException e) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/EphemeralSnapshotTest.java b/test/distributed/org/apache/cassandra/distributed/test/EphemeralSnapshotTest.java index f4d423cf23..4a4d9ccf8d 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/EphemeralSnapshotTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/EphemeralSnapshotTest.java @@ -18,13 +18,22 @@ package org.apache.cassandra.distributed.test; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.TabularData; +import 
javax.management.openmbean.TabularDataSupport; import com.google.common.util.concurrent.Futures; import org.junit.Test; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.IInvokableInstance; @@ -32,6 +41,8 @@ import org.apache.cassandra.distributed.api.IIsolatedExecutor; import org.apache.cassandra.io.util.File; import org.apache.cassandra.service.snapshot.SnapshotManager; import org.apache.cassandra.service.snapshot.SnapshotManifest; +import org.apache.cassandra.service.snapshot.SnapshotOptions; +import org.apache.cassandra.service.snapshot.SnapshotType; import org.apache.cassandra.utils.Pair; import static java.lang.String.format; @@ -40,6 +51,8 @@ import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE; import static org.apache.cassandra.distributed.api.Feature.GOSSIP; import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL; import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.apache.cassandra.schema.SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES; +import static org.apache.cassandra.schema.SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES; import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -58,7 +71,7 @@ public class EphemeralSnapshotTest extends TestBaseImpl .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL)) .start())) { - Pair<String, String[]> initialisationData = initialise(c); + Pair<String, String[]> initialisationData = initialise(c, tableName); rewriteManifestToEphemeral(initialisationData.left, initialisationData.right); @@ -76,7 +89,7 @@ public class EphemeralSnapshotTest extends TestBaseImpl .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL)) .start())) { - Pair<String, String[]> 
initialisationData = initialise(c); + Pair<String, String[]> initialisationData = initialise(c, tableName); String tableId = initialisationData.left; String[] dataDirs = initialisationData.right; @@ -106,7 +119,7 @@ public class EphemeralSnapshotTest extends TestBaseImpl { IInvokableInstance instance = c.get(1); - Pair<String, String[]> initialisationData = initialise(c); + Pair<String, String[]> initialisationData = initialise(c, tableName); rewriteManifestToEphemeral(initialisationData.left, initialisationData.right); c.get(1).runOnInstance((IIsolatedExecutor.SerializableRunnable) () -> SnapshotManager.instance.restart(true)); @@ -130,7 +143,7 @@ public class EphemeralSnapshotTest extends TestBaseImpl { IInvokableInstance instance = c.get(1); - Pair<String, String[]> initialisationData = initialise(c); + Pair<String, String[]> initialisationData = initialise(c, tableName); rewriteManifestToEphemeral(initialisationData.left, initialisationData.right); @@ -142,13 +155,104 @@ public class EphemeralSnapshotTest extends TestBaseImpl } } - private Pair<String, String[]> initialise(Cluster c) + /** + * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-20490">CASSANDRA-20490</a> + */ + @Test + public void testForceEphemeralSnapshotWhenAlreadyExists() throws Exception + { + try (Cluster c = init(builder().withNodes(1) + .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL)) + .start())) + { + IInvokableInstance instance = c.get(1); + + c.schemaChange(withKeyspace("CREATE TABLE IF NOT EXISTS %s." + tableName + " (cityid int PRIMARY KEY, name text)")); + c.coordinator(1).execute(withKeyspace("INSERT INTO %s." 
+ tableName + "(cityid, name) VALUES (1, 'Canberra');"), ONE); + + instance.flush(KEYSPACE); + + takeEphemeralSnapshotForcibly(c, KEYSPACE, tableName, snapshotName); + assertTrue(instance.nodetoolResult("listsnapshots", "-e").getStdout().contains(snapshotName)); + float firstSnapshotSize = getSnapshotSizeOnDisk(c, KEYSPACE, tableName, snapshotName); + + SnapshotManifest snapshotManifest = SnapshotManifest.deserializeFromJsonFile(new File(findManifest(getDataDirs(c), getTableId(c, KEYSPACE, tableName)))); + assertEquals(1, snapshotManifest.getFiles().size()); + + // list sstables + List<String> snapshotFilesAfterFirstSnapshot = getSnapshotFiles(c, snapshotName); + assertFalse(snapshotFilesAfterFirstSnapshot.isEmpty()); + + // add more data + insertData(c, tableName); + + takeEphemeralSnapshotForcibly(c, KEYSPACE, tableName, snapshotName); + assertTrue(instance.nodetoolResult("listsnapshots", "-e").getStdout().contains(snapshotName)); + SnapshotManifest secondSnapshotManifest = SnapshotManifest.deserializeFromJsonFile(new File(findManifest(getDataDirs(c), getTableId(c, KEYSPACE, tableName)))); + assertEquals(2, secondSnapshotManifest.getFiles().size()); + + List<String> snapshotFilesAfterSecondSnapshot = getSnapshotFiles(c, snapshotName); + assertFalse(snapshotFilesAfterSecondSnapshot.isEmpty()); + + // list again and check it is superset of previous listing + assertTrue(snapshotFilesAfterSecondSnapshot.size() > snapshotFilesAfterFirstSnapshot.size()); + assertTrue(snapshotFilesAfterSecondSnapshot.containsAll(snapshotFilesAfterFirstSnapshot)); + assertTrue(secondSnapshotManifest.getFiles().containsAll(snapshotManifest.getFiles())); + + float secondSnapshotSize = getSnapshotSizeOnDisk(c, KEYSPACE, tableName, snapshotName); + + assertTrue(secondSnapshotSize > firstSnapshotSize); + } + } + + private Float getSnapshotSizeOnDisk(Cluster c, String keyspace, String table, String snapshotName) + { + return 
c.get(1).applyOnInstance((IIsolatedExecutor.SerializableTriFunction<String, String, String, Float>) (ks, tb, name) -> { + + Map<String, TabularData> stringTabularDataMap = SnapshotManager.instance.listSnapshots(Map.of("include_ephemeral", "true")); + + TabularDataSupport tabularData = (TabularDataSupport) stringTabularDataMap.get(name); + for (Object value : tabularData.values()) + { + CompositeDataSupport cds = (CompositeDataSupport) value; + return Float.parseFloat(((String) cds.get("Size on disk")).split(" ")[0]); + } + + return 0F; + }, keyspace, table, snapshotName); + } + + private void takeEphemeralSnapshotForcibly(Cluster c, String keyspace, String table, String snapshotName) + { + c.get(1).applyOnInstance((IIsolatedExecutor.SerializableTriFunction<String, String, String, Void>) (ks, tb, name) -> + { + ColumnFamilyStore cfs = Keyspace.getValidKeyspace(ks).getColumnFamilyStore(tb); + try + { + SnapshotManager.instance.takeSnapshot(SnapshotOptions.systemSnapshot(name, SnapshotType.REPAIR, (sstable) -> true, cfs.getKeyspaceTableName()) + .ephemeral() + .build()); + } + catch (Throwable t) + { + throw new RuntimeException(t.getMessage()); + } + return null; + }, keyspace, table, snapshotName); + } + + private void insertData(Cluster c, String tableName) { c.schemaChange(withKeyspace("CREATE TABLE IF NOT EXISTS %s." + tableName + " (cityid int PRIMARY KEY, name text)")); c.coordinator(1).execute(withKeyspace("INSERT INTO %s." + tableName + "(cityid, name) VALUES (1, 'Canberra');"), ONE); IInvokableInstance instance = c.get(1); - instance.flush(KEYSPACE); + } + + private Pair<String, String[]> initialise(Cluster c, String tableName) + { + insertData(c, tableName); + IInvokableInstance instance = c.get(1); assertEquals(0, instance.nodetool("snapshot", "-kt", withKeyspace("%s." 
+ tableName), "-t", snapshotName)); waitForSnapshot(instance, snapshotName); @@ -158,15 +262,17 @@ public class EphemeralSnapshotTest extends TestBaseImpl assertEquals(0, instance.nodetool("snapshot", "-kt", withKeyspace("%s." + tableName), "-t", snapshotName2)); waitForSnapshot(instance, snapshotName2); - String tableId = instance.callOnInstance((IIsolatedExecutor.SerializableCallable<String>) () -> { - return Keyspace.open(KEYSPACE).getMetadata().tables.get(tableName).get().id.asUUID().toString().replaceAll("-", ""); - }); + String tableId = getTableId(c, KEYSPACE, tableName); - String[] dataDirs = (String[]) instance.config().get("data_file_directories"); + String[] dataDirs = getDataDirs(c); return Pair.create(tableId, dataDirs); } + private String[] getDataDirs(Cluster c) + { + return (String[]) c.get(1).config().get("data_file_directories"); + } private void verify(IInvokableInstance instance) { @@ -220,4 +326,45 @@ public class EphemeralSnapshotTest extends TestBaseImpl throw new IllegalStateException("Unable to find manifest!"); } + + private List<String> getSnapshotFiles(Cluster cluster, String snapshotName) + { + return cluster.get(1).applyOnInstance((IIsolatedExecutor.SerializableFunction<String, List<String>>) (name) -> { + List<String> result = new ArrayList<>(); + + for (Keyspace keyspace : Keyspace.all()) + { + if (LOCAL_SYSTEM_KEYSPACE_NAMES.contains(keyspace.getName()) || REPLICATED_SYSTEM_KEYSPACE_NAMES.contains(keyspace.getName())) + continue; + + for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) + { + for (String dataDir : DatabaseDescriptor.getAllDataFileLocations()) + { + File snapshotDir = new File(dataDir, format("%s/%s-%s/snapshots/%s", keyspace.getName(), cfs.name, cfs.metadata().id.toHexString(), name)); + if (snapshotDir.exists()) + { + try + { + Files.list(snapshotDir.toPath()).forEach(p -> result.add(p.toString())); + } + catch (IOException e) + { + throw new RuntimeException("Unable to list " + snapshotDir.toPath(), e); + 
} + } + } + } + } + + return result; + }, snapshotName); + } + + private String getTableId(Cluster c, String keyspace, String tableName) + { + return c.get(1).applyOnInstance((IIsolatedExecutor.SerializableBiFunction<String, String, String>) (ks, tb) -> { + return Keyspace.open(ks).getMetadata().tables.get(tb).get().id.asUUID().toString().replaceAll("-", ""); + }, keyspace, tableName); + } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/SnapshotsTest.java b/test/distributed/org/apache/cassandra/distributed/test/SnapshotsTest.java index 7227d6613f..4d2c42bcb6 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/SnapshotsTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/SnapshotsTest.java @@ -24,6 +24,7 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.UUID; import java.util.regex.Pattern; import org.junit.After; @@ -43,6 +44,9 @@ import org.apache.cassandra.distributed.api.NodeToolResult; import org.apache.cassandra.distributed.shared.WithProperties; import org.apache.cassandra.io.util.File; import org.apache.cassandra.service.snapshot.SnapshotManager; +import org.apache.cassandra.service.snapshot.SnapshotManifest; +import org.apache.cassandra.service.snapshot.SnapshotOptions; +import org.apache.cassandra.service.snapshot.SnapshotType; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.FBUtilities; @@ -199,7 +203,7 @@ public class SnapshotsTest extends TestBaseImpl cluster.get(1).shutdown(true); // remove manifest only in the first data dir - removeAllManifests(new String[] {dataDirs[0]}, paths); + removeAllManifests(new String[]{ dataDirs[0]}, paths); // they will be still created for that first dir cluster.get(1).startup(); @@ -555,6 +559,90 @@ public class SnapshotsTest extends TestBaseImpl assertTrue(snapshots.get(0).contains("tagks1tbl")); } + @Test + public void testForcedSnapshot() throws Throwable + { + 
try (Cluster cluster = init(Cluster.build(1) + .withDataDirCount(3) // 3 dirs to disperse SSTables among different dirs + .start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk uuid primary key)"); + + cluster.get(1).runOnInstance((IIsolatedExecutor.SerializableRunnable) () -> { + Keyspace.open("distributed_test_keyspace").getColumnFamilyStore("tbl").disableAutoCompaction(); + }); + + for (int i = 0; i < 10; i++) + { + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk) values (?)", UUID.randomUUID()); + cluster.get(1).flush(KEYSPACE); + } + + takeEphemeralSnapshotWithSameName(cluster); + List<File> manifests1 = getManifests(cluster); + List<String> ssTablesFromManifest1 = getSSTablesFromManifest(manifests1.get(0)); + + for (int i = 0; i < 10; i++) + { + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk) values (?)", UUID.randomUUID()); + cluster.get(1).flush(KEYSPACE); + } + takeEphemeralSnapshotWithSameName(cluster); + List<File> manifests2 = getManifests(cluster); + List<String> ssTablesFromManifest2 = getSSTablesFromManifest(manifests2.get(0)); + + assertEquals(manifests1, manifests2); + assertTrue(ssTablesFromManifest1.size() < ssTablesFromManifest2.size()); + assertTrue(ssTablesFromManifest2.containsAll(ssTablesFromManifest1)); + } + } + + private List<String> getSSTablesFromManifest(File manifest) throws Throwable + { + SnapshotManifest snapshotManifest = SnapshotManifest.deserializeFromJsonFile(manifest); + return snapshotManifest.getFiles(); + } + + private List<File> getManifests(Cluster cluster) + { + List<String> manifestsPaths = cluster.get(1).callOnInstance((SerializableCallable<List<String>>) () -> { + ColumnFamilyStore cfs = Keyspace.open("distributed_test_keyspace").getColumnFamilyStore("tbl"); + + List<String> allManifests = new ArrayList<>(); + for (File file : cfs.getDirectories().getSnapshotDirsWithoutCreation("a_snapshot")) + { + File maybeManifest = new File(file, "manifest.json"); + 
if (maybeManifest.exists()) + allManifests.add(maybeManifest.absolutePath()); + } + + assertEquals(3, allManifests.size()); // 3 because 3 data dirs + return allManifests; + }); + + List<File> manifests = new ArrayList<>(); + for (String manifest : manifestsPaths) + manifests.add(new File(manifest)); + + return manifests; + } + + private void takeEphemeralSnapshotWithSameName(Cluster cluster) + { + cluster.get(1).runOnInstance((IIsolatedExecutor.SerializableRunnable) () -> { + try + { + SnapshotManager.instance.takeSnapshot(SnapshotOptions.systemSnapshot("a_snapshot", SnapshotType.REPAIR, (r) -> true, "distributed_test_keyspace.tbl") + .ephemeral() + .build()); + } + catch (Throwable t) + { + throw new RuntimeException(t); + } + }); + } + private void populate(Cluster cluster) { for (int i = 0; i < 100; i++) diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java index 5293149b1b..ee4fa293a2 100644 --- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; - import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; @@ -63,6 +62,7 @@ import org.apache.cassandra.locator.Replica; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.disk.usage.DiskUsageMonitor; +import org.apache.cassandra.service.snapshot.SnapshotManager; import org.apache.cassandra.service.snapshot.TableSnapshot; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.tcm.ClusterMetadata; @@ -76,7 +76,7 @@ import org.apache.cassandra.utils.concurrent.Condition; import org.apache.cassandra.utils.concurrent.Refs; import org.mockito.Mock; -import 
static org.apache.cassandra.ServerTestUtils.*; +import static org.apache.cassandra.ServerTestUtils.resetCMS; import static org.apache.cassandra.config.CassandraRelevantProperties.ORG_APACHE_CASSANDRA_DISABLE_MBEAN_REGISTRATION; import static org.apache.cassandra.repair.messages.RepairOption.DATACENTERS_KEY; import static org.apache.cassandra.repair.messages.RepairOption.FORCE_REPAIR_KEY; @@ -307,6 +307,26 @@ public class ActiveRepairServiceTest } } + @Test + public void testForcedSnapshot() throws Throwable + { + ColumnFamilyStore store = prepareColumnFamilyStore(); + TimeUUID prsId = nextTimeUUID(); + Collection<Range<Token>> ranges = Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())); + ActiveRepairService.instance().registerParentRepairSession(prsId, FBUtilities.getBroadcastAddressAndPort(), Collections.singletonList(store), + ranges, true, System.currentTimeMillis(), false, PreviewKind.NONE); + + // snapshot twice, would not be possible before CASSANDRA-20490 + store.getRepairManager().snapshot(prsId.toString(), ranges, true); + store.getRepairManager().snapshot(prsId.toString(), ranges, true); + + List<TableSnapshot> snapshots = SnapshotManager.instance.getSnapshots(p -> p.getKeyspaceName().equals(store.getKeyspaceName()) && p.getTableName().equals(store.getTableName())); + Assert.assertEquals(1, snapshots.size()); + TableSnapshot snapshot = snapshots.get(0); + Assert.assertTrue(snapshot.isEphemeral()); + Assert.assertEquals(prsId.toString(), snapshot.getTag()); + } + private ColumnFamilyStore prepareColumnFamilyStore() { Keyspace keyspace = Keyspace.open(KEYSPACE5); @@ -314,6 +334,7 @@ public class ActiveRepairServiceTest store.truncateBlocking(); store.disableAutoCompaction(); createSSTables(store, 10); + SnapshotManager.instance.clearAllSnapshots(); return store; } --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org