This is an automated email from the ASF dual-hosted git repository. smiklosovic pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit e81b2f54b4cb05ea25720a8a481ec951a20b809a Merge: aa644c9dfa 016dd6ca37 Author: Stefan Miklosovic <[email protected]> AuthorDate: Mon Jan 22 12:44:59 2024 +0100 Merge branch 'cassandra-5.0' into trunk CHANGES.txt | 1 + src/java/org/apache/cassandra/config/Config.java | 18 +- .../org/apache/cassandra/db/ColumnFamilyStore.java | 88 +- .../cassandra/db/ColumnFamilyStoreMBean.java | 41 +- .../org/apache/cassandra/db/SSTableImporter.java | 72 +- .../apache/cassandra/db/memtable/TrieMemtable.java | 19 +- .../db/streaming/CassandraStreamReceiver.java | 2 +- src/java/org/apache/cassandra/index/Index.java | 13 +- .../cassandra/index/SecondaryIndexManager.java | 7 +- .../cassandra/index/sai/SSTableContextManager.java | 2 +- .../index/sai/StorageAttachedIndexBuilder.java | 2 +- .../index/sai/StorageAttachedIndexGroup.java | 12 +- .../index/sai/disk/format/IndexDescriptor.java | 35 +- .../sai/disk/v1/segment/SegmentTrieBuffer.java | 4 +- .../cassandra/index/sai/view/IndexViewManager.java | 2 +- .../io/sstable/AbstractSSTableSimpleWriter.java | 13 +- .../cassandra/io/sstable/CQLSSTableWriter.java | 171 ++- src/java/org/apache/cassandra/tools/NodeProbe.java | 12 +- .../apache/cassandra/tools/nodetool/Import.java | 15 +- test/unit/org/apache/cassandra/db/ImportTest.java | 222 +++- .../org/apache/cassandra/index/sai/SAITester.java | 4 +- .../io/sstable/CQLSSTableWriterClientTest.java | 78 +- .../sstable/CQLSSTableWriterConcurrencyTest.java | 2 +- .../io/sstable/CQLSSTableWriterDaemonTest.java | 44 + .../cassandra/io/sstable/CQLSSTableWriterTest.java | 1209 ++++++++++++-------- 25 files changed, 1406 insertions(+), 682 deletions(-) diff --cc CHANGES.txt index ebf48c2315,a7859ee9ec..290185e085 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,13 -1,5 +1,14 @@@ -5.0-beta2 +5.1 + * Limit cassandra startup to supported JDKs, allow higher JDKs by setting CASSANDRA_JDK_UNSUPPORTED (CASSANDRA-18688) + * Standardize nodetool tablestats formatting of data units (CASSANDRA-19104) + * Make nodetool tablestats use number of significant digits for time and average values consistently (CASSANDRA-19015) + * Upgrade jackson to 2.15.3 and snakeyaml to 2.1 (CASSANDRA-18875) + * Transactional Cluster Metadata [CEP-21] (CASSANDRA-18330) + * Add ELAPSED command to cqlsh (CASSANDRA-18861) + * Add the ability to disable bulk loading of SSTables (CASSANDRA-18781) + * Clean up obsolete functions and simplify cql_version handling in cqlsh (CASSANDRA-18787) +Merged from 5.0: + * Make CQLSSTableWriter to support building of SAI indexes (CASSANDRA-18714) * Append additional JVM options when using JDK17+ (CASSANDRA-19001) * Upgrade Python driver to 3.29.0 (CASSANDRA-19245) * Creating a SASI index after creating an SAI index does not break secondary index queries (CASSANDRA-18939) diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 1dc2687c40,bcf4dc7073..be6136dd4a --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -389,20 -383,22 +391,21 @@@ public class ColumnFamilyStore implemen // only update these runtime-modifiable settings if they have not been modified. if (!minCompactionThreshold.isModified()) for (ColumnFamilyStore cfs : concatWithIndexes()) - cfs.minCompactionThreshold = new DefaultValue(metadata().params.compaction.minCompactionThreshold()); + cfs.minCompactionThreshold = new DefaultValue<>(tableMetadata.params.compaction.minCompactionThreshold()); if (!maxCompactionThreshold.isModified()) for (ColumnFamilyStore cfs : concatWithIndexes()) - cfs.maxCompactionThreshold = new DefaultValue(metadata().params.compaction.maxCompactionThreshold()); + cfs.maxCompactionThreshold = new DefaultValue<>(tableMetadata.params.compaction.maxCompactionThreshold()); if (!crcCheckChance.isModified()) for (ColumnFamilyStore cfs : concatWithIndexes()) - cfs.crcCheckChance = new DefaultValue(metadata().params.crcCheckChance); - - compactionStrategyManager.maybeReloadParamsFromSchema(metadata().params.compaction); + cfs.crcCheckChance = new DefaultValue<>(tableMetadata.params.crcCheckChance); - indexManager.reload(); + compactionStrategyManager.maybeReloadParamsFromSchema(tableMetadata.params.compaction); - memtableFactory = metadata().params.memtable.factory(); + indexManager.reload(tableMetadata); + memtableFactory = tableMetadata.params.memtable.factory(); - switchMemtableOrNotify(FlushReason.SCHEMA_CHANGE, tableMetadata, Memtable::metadataUpdated); + if (DatabaseDescriptor.isDaemonInitialized()) - switchMemtableOrNotify(FlushReason.SCHEMA_CHANGE, Memtable::metadataUpdated); ++ switchMemtableOrNotify(FlushReason.SCHEMA_CHANGE, tableMetadata, Memtable::metadataUpdated); } public static Runnable getBackgroundCompactionTaskSubmitter() diff --cc src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index 60e2179c0f,7e503829aa..3211b9576a --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@@ -44,6 -47,10 +47,9 @@@ import org.apache.cassandra.cql3.statem import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; import org.apache.cassandra.cql3.statements.schema.CreateTypeStatement; import org.apache.cassandra.db.Clustering; + import org.apache.cassandra.db.ColumnFamilyStore; + import org.apache.cassandra.db.Directories; -import org.apache.cassandra.db.Directories.DataDirectory; + import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Slice; import org.apache.cassandra.db.Slices; import org.apache.cassandra.db.marshal.AbstractType; @@@ -55,8 -63,9 +62,10 @@@ import org.apache.cassandra.io.sstable. import org.apache.cassandra.io.util.File; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; + import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; ++import org.apache.cassandra.schema.SchemaTransformation; import org.apache.cassandra.schema.SchemaTransformations; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableMetadataRef; @@@ -65,7 -74,6 +74,9 @@@ import org.apache.cassandra.schema.Type import org.apache.cassandra.schema.UserFunctions; import org.apache.cassandra.schema.Views; import org.apache.cassandra.service.ClientState; ++import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; ++import org.apache.cassandra.tcm.transformations.AlterSchema; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.JavaDriverUtils; @@@ -603,28 -638,54 +642,72 @@@ public class CQLSSTableWriter implement synchronized (CQLSSTableWriter.class) { String keyspaceName = schemaStatement.keyspace(); - - Schema.instance.transform(SchemaTransformations.addKeyspace(KeyspaceMetadata.create(keyspaceName, - KeyspaceParams.simple(1), - Tables.none(), - Views.none(), - Types.none(), - UserFunctions.none()), true)); - - KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(keyspaceName); - - TableMetadata tableMetadata = ksm.tables.getNullable(schemaStatement.table()); ++ String tableName = schemaStatement.table(); + + Schema.instance.submit(SchemaTransformations.addKeyspace(KeyspaceMetadata.create(keyspaceName, + KeyspaceParams.simple(1), + Tables.none(), + Views.none(), + Types.none(), + UserFunctions.none()), true)); + + KeyspaceMetadata ksm = KeyspaceMetadata.create(keyspaceName, + KeyspaceParams.simple(1), + Tables.none(), + Views.none(), + Types.none(), + UserFunctions.none()); + - TableMetadata tableMetadata = ksm.tables.getNullable(schemaStatement.table()); ++ TableMetadata tableMetadata = Schema.instance.getTableMetadata(keyspaceName, tableName); if (tableMetadata == null) { Types types = createTypes(keyspaceName); - Schema.instance.transform(SchemaTransformations.addTypes(types, true)); + Schema.instance.submit(SchemaTransformations.addTypes(types, true)); tableMetadata = createTable(types); + Schema.instance.submit(SchemaTransformations.addTable(tableMetadata, true)); + + if (buildIndexes && !indexStatements.isEmpty()) + { - tableMetadata = applyIndexes(ksm.withSwapped(ksm.tables.with(tableMetadata))); - Keyspace ks = Keyspace.openWithoutSSTables(keyspaceName); - Directories directories = new Directories(tableMetadata, Collections.singleton(new DataDirectory(new File(directory.toPath())))); - ColumnFamilyStore cfs = ColumnFamilyStore.createColumnFamilyStore(ks, - tableMetadata.name, - TableMetadataRef.forOfflineTools(tableMetadata), - directories, - false, - false, - true); - ks.initCfCustom(cfs); - - // this is the empty directory / leftover from times we initialized ColumnFamilyStore - // it will automatically create directories for keyspace and table on disk after initialization - // we set that directory to the destination of generated SSTables so we just remove empty directories here - try - { - new File(directory, keyspaceName).deleteRecursive(); - } - catch (UncheckedIOException ex) ++ // we need to commit keyspace metadata first so applyIndexes sees that keyspace from TCM ++ commitKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(tableMetadata))); ++ applyIndexes(keyspaceName); ++ } ++ ++ KeyspaceMetadata keyspaceMetadata = ClusterMetadata.current().schema.getKeyspaceMetadata(keyspaceName); ++ tableMetadata = keyspaceMetadata.tables.getNullable(tableName); ++ ++ Schema.instance.submit(SchemaTransformations.addTable(tableMetadata, true)); ++ } ++ ++ ColumnFamilyStore cfs = null; ++ if (buildIndexes && !indexStatements.isEmpty()) ++ { ++ KeyspaceMetadata keyspaceMetadata = ClusterMetadata.current().schema.getKeyspaceMetadata(keyspaceName); ++ Keyspace keyspace = Keyspace.mockKS(keyspaceMetadata); ++ Directories directories = new Directories(tableMetadata, Collections.singleton(new Directories.DataDirectory(new File(directory.toPath())))); ++ cfs = ColumnFamilyStore.createColumnFamilyStore(keyspace, ++ tableName, ++ tableMetadata, ++ directories, ++ false, ++ false); ++ ++ keyspace.initCfCustom(cfs); ++ ++ // this is the empty directory / leftover from times we initialized ColumnFamilyStore ++ // it will automatically create directories for keyspace and table on disk after initialization ++ // we set that directory to the destination of generated SSTables so we just remove empty directories here ++ try ++ { ++ new File(directory, keyspaceName).deleteRecursive(); ++ } ++ catch (UncheckedIOException ex) ++ { ++ if (!(ex.getCause() instanceof NoSuchFileException)) + { - if (!(ex.getCause() instanceof NoSuchFileException)) - { - throw ex; - } ++ throw ex; + } + } - - Schema.instance.transform(SchemaTransformations.addTable(tableMetadata, true)); } ModificationStatement preparedModificationStatement = prepareModificationStatement(); @@@ -637,6 -698,13 +720,13 @@@ if (format != null) writer.setSSTableFormatType(format); - if (buildIndexes && !indexStatements.isEmpty()) ++ if (buildIndexes && !indexStatements.isEmpty() && cfs != null) + { - StorageAttachedIndexGroup saiGroup = StorageAttachedIndexGroup.getIndexGroup(Schema.instance.getColumnFamilyStoreInstance(tableMetadata.id)); ++ StorageAttachedIndexGroup saiGroup = StorageAttachedIndexGroup.getIndexGroup(cfs); + if (saiGroup != null) + writer.addIndexGroup(saiGroup); + } + return new CQLSSTableWriter(writer, preparedModificationStatement, preparedModificationStatement.getBindVariables()); } } @@@ -654,6 -722,23 +744,29 @@@ return builder.build(); } + /** + * Applies any provided index definitions to the target table + * - * @param ksm the KeyspaceMetadata object that has the table defined - * @return an updated TableMetadata instance with the indexe create statements applied ++ * @param keyspaceName name of the keyspace to apply indexes for ++ * @return table metadata reflecting applied indexes + */ - private TableMetadata applyIndexes(KeyspaceMetadata ksm) ++ private void applyIndexes(String keyspaceName) + { + ClientState state = ClientState.forInternalCalls(); - Keyspaces keyspaces = Keyspaces.of(ksm); + + for (CreateIndexStatement.Raw statement : indexStatements) - keyspaces = statement.prepare(state).apply(keyspaces); ++ { ++ Keyspaces keyspaces = statement.prepare(state).apply(ClusterMetadata.current()); ++ commitKeyspaceMetadata(keyspaces.getNullable(keyspaceName)); ++ } ++ } + - return keyspaces.get(ksm.name).get().tables.get(schemaStatement.table()).get(); ++ private void commitKeyspaceMetadata(KeyspaceMetadata keyspaceMetadata) ++ { ++ SchemaTransformation schemaTransformation = metadata -> metadata.schema.getKeyspaces().withAddedOrUpdated(keyspaceMetadata); ++ ClusterMetadataService.instance().commit(new AlterSchema(schemaTransformation, Schema.instance)); + } + /** * Creates the table according to schema statement * diff --cc test/unit/org/apache/cassandra/db/ImportTest.java index 5cd0f15bdb,8f9c5de6fd..f6383b5ef8 --- a/test/unit/org/apache/cassandra/db/ImportTest.java +++ b/test/unit/org/apache/cassandra/db/ImportTest.java @@@ -45,14 -53,16 +53,18 @@@ import org.apache.cassandra.io.sstable. import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.PathUtils; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.locator.TokenMetadata; + import org.apache.cassandra.schema.Schema; import org.apache.cassandra.service.CacheService; -import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.NodeAddresses; +import org.apache.cassandra.tcm.membership.NodeState; +import org.apache.cassandra.tcm.transformations.Register; +import org.apache.cassandra.tcm.transformations.UnsafeJoin; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; + import static org.apache.lucene.codecs.CodecUtil.FOOTER_MAGIC; + import static org.apache.lucene.codecs.CodecUtil.writeBEInt; + import static org.apache.lucene.codecs.CodecUtil.writeBELong; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@@ -702,6 -721,215 +713,215 @@@ public class ImportTest extends CQLTest } } + @Test + public void mustNotFailOnBuiltSAIIndexesWhenRequiredTest() throws Throwable + { + try + { + schemaChange(String.format("CREATE TABLE %s.%s (id int primary key, d int)", KEYSPACE, "sai_test")); - createIndexAndWait(String.format("CREATE INDEX idx1 ON %s.%s (d) USING 'sai'", KEYSPACE, "sai_test"), "idx1"); ++ schemaChange(String.format("CREATE INDEX idx1 ON %s.%s (d) USING 'sai'", KEYSPACE, "sai_test")); + + for (int i = 0; i < 10; i++) + execute(String.format("INSERT INTO %s.%s (id, d) values (?, ?)", KEYSPACE, "sai_test"), i, i); + + ColumnFamilyStore cfs = getColumnFamilyStore(KEYSPACE, "sai_test"); + Util.flush(cfs); + + Set<SSTableReader> sstables = cfs.getLiveSSTables(); + cfs.clearUnsafe(); + + File backupDir = moveToBackupDir(sstables); + + assertEquals(0, execute(String.format("SELECT * FROM %s.%s", KEYSPACE, "sai_test")).size()); + + SSTableImporter importer = new SSTableImporter(cfs); + SSTableImporter.Options options = SSTableImporter.Options.options(backupDir.toString()) + .copyData(true) + .failOnMissingIndex(true) + .build(); + assertTrue(importer.importNewSSTables(options).isEmpty()); + assertEquals(10, execute(String.format("SELECT * FROM %s.%s WHERE d >= 0", KEYSPACE, "sai_test")).size()); + } + finally + { + execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, "sai_test")); + } + } + + @Test + public void mustNotFailOnMissingSAIIndexWhenSAIDoesNotExistTest() throws Throwable + { + try + { + schemaChange(String.format("CREATE TABLE %s.%s (id int primary key, d int)", KEYSPACE, "sai_less_test")); + + for (int i = 0; i < 10; i++) + execute(String.format("INSERT INTO %s.%s (id, d) values (?, ?)", KEYSPACE, "sai_less_test"), i, i); + + ColumnFamilyStore cfs = getColumnFamilyStore(KEYSPACE, "sai_less_test"); + Util.flush(cfs); + + Set<SSTableReader> sstables = cfs.getLiveSSTables(); + cfs.clearUnsafe(); + + File backupDir = moveToBackupDir(sstables); + + assertEquals(0, execute(String.format("SELECT * FROM %s.%s", KEYSPACE, "sai_less_test")).size()); + + SSTableImporter importer = new SSTableImporter(cfs); + SSTableImporter.Options options = SSTableImporter.Options.options(backupDir.toString()) + .copyData(true) + // this does not mean anything + // because our table does not have any SAI index + .failOnMissingIndex(true) + .build(); + assertTrue(importer.importNewSSTables(options).isEmpty()); + assertEquals(10, execute(String.format("SELECT * FROM %s.%s", KEYSPACE, "sai_less_test")).size()); + } + finally + { + execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, "sai_less_test")); + } + } + + @Test + public void mustFailOnMissingSAIWhenRequiredTest() throws Throwable + { + File backupDir = null; + try + { + schemaChange(String.format("CREATE TABLE %s.%s (id int primary key, d int)", KEYSPACE, "sai_test")); + + for (int i = 0; i < 10; i++) + execute(String.format("INSERT INTO %s.%s (id, d) values (?, ?)", KEYSPACE, "sai_test"), i, i); + + ColumnFamilyStore cfs = getColumnFamilyStore(KEYSPACE, "sai_test"); + Util.flush(cfs); + + Set<SSTableReader> sstables = cfs.getLiveSSTables(); + cfs.clearUnsafe(); + + backupDir = moveToBackupDir(sstables); + + assertEquals(0, execute(String.format("SELECT * FROM %s.%s", KEYSPACE, "sai_test")).size()); + + // create index and load sstables, they will be without indexes (because we created + // data when index was not created yet) - createIndexAndWait(String.format("CREATE INDEX idx1 ON %s.%s (d) USING 'sai'", KEYSPACE, "sai_test"), "idx1"); ++ schemaChange(String.format("CREATE INDEX idx1 ON %s.%s (d) USING 'sai'", KEYSPACE, "sai_test")); + + SSTableImporter importer = new SSTableImporter(cfs); + SSTableImporter.Options options = SSTableImporter.Options.options(backupDir.toString()) + .copyData(true) + .failOnMissingIndex(true) + .build(); + assertFalse(importer.importNewSSTables(options).isEmpty()); + assertEquals(0, execute(String.format("SELECT * FROM %s.%s WHERE d >= 0", KEYSPACE, "sai_test")).size()); + } + finally + { + if (backupDir != null) + { + backupDir.deleteRecursive(); + backupDir.parent().deleteRecursive(); + } + + execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, "sai_test")); + } + } + + @Test + public void skipIndexChecksumOnSAITest() throws Throwable + { + try + { + schemaChange(String.format("CREATE TABLE %s.%s (id int primary key, d int)", KEYSPACE, "sai_test")); - createIndexAndWait(String.format("CREATE INDEX idx1 ON %s.%s (d) USING 'sai'", KEYSPACE, "sai_test"), "idx1"); ++ schemaChange(String.format("CREATE INDEX idx1 ON %s.%s (d) USING 'sai'", KEYSPACE, "sai_test")); + + for (int i = 0; i < 10; i++) + execute(String.format("INSERT INTO %s.%s (id, d) values (?, ?)", KEYSPACE, "sai_test"), i, i); + + ColumnFamilyStore cfs = getColumnFamilyStore(KEYSPACE, "sai_test"); + Util.flush(cfs); + + Set<SSTableReader> sstables = cfs.getLiveSSTables(); + cfs.clearUnsafe(); + + File backupDir = moveToBackupDir(sstables); + + File[] dataFiles = backupDir.list(f -> f.name().endsWith('-' + BigFormat.Components.DATA.type.repr)); + + IndexDescriptor indexDescriptor = IndexDescriptor.create(Descriptor.fromFile(dataFiles[0]), + Murmur3Partitioner.instance, + Schema.instance.getTableMetadata(KEYSPACE, "sai_test").comparator); + IndexIdentifier indexIdentifier = new IndexIdentifier(KEYSPACE, "sai_test", "idx1"); + + // corrupt one of index files + try (IndexOutputWriter output = indexDescriptor.openPerIndexOutput(IndexComponent.COLUMN_COMPLETION_MARKER, indexIdentifier)) + { + SAICodecUtils.writeHeader(output); + output.writeByte((byte) 0); + // taken from SAICodecUtils#writeFooter + writeBEInt(output, FOOTER_MAGIC); + writeBEInt(output, 0); + writeBELong(output, 123); // some garbage checksum value to prove the point + } + + assertEquals(0, execute(String.format("SELECT * FROM %s.%s", KEYSPACE, "sai_test")).size()); + + SSTableImporter importer = new SSTableImporter(cfs); + SSTableImporter.Options options = SSTableImporter.Options.options(backupDir.toString()) + .copyData(true) + .failOnMissingIndex(true) + .validateIndexChecksum(false) + .build(); + + // even with corrupted column completion marker (wrong checksum), it will import + assertTrue(importer.importNewSSTables(options).isEmpty()); + assertEquals(10, execute(String.format("SELECT * FROM %s.%s", KEYSPACE, "sai_test")).size()); + } + finally + { + execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, "sai_test")); + } + } + + @Test + public void skipEmptyIndexChecksumOnSAITest() throws Throwable + { + try + { + schemaChange(String.format("CREATE TABLE %s.%s (id int primary key, d int)", KEYSPACE, "sai_test")); - createIndexAndWait(String.format("CREATE INDEX idx1 ON %s.%s (d) USING 'sai'", KEYSPACE, "sai_test"), "idx1"); ++ schemaChange(String.format("CREATE INDEX idx1 ON %s.%s (d) USING 'sai'", KEYSPACE, "sai_test")); + + // no data in indexed column = empty index + for (int i = 0; i < 10; i++) + execute(String.format("INSERT INTO %s.%s (id) values (?)", KEYSPACE, "sai_test"), i); + + ColumnFamilyStore cfs = getColumnFamilyStore(KEYSPACE, "sai_test"); + Util.flush(cfs); + + Set<SSTableReader> sstables = cfs.getLiveSSTables(); + cfs.clearUnsafe(); + + File backupDir = moveToBackupDir(sstables); + + assertEquals(0, execute(String.format("SELECT * FROM %s.%s", KEYSPACE, "sai_test")).size()); + + SSTableImporter importer = new SSTableImporter(cfs); + SSTableImporter.Options options = SSTableImporter.Options.options(backupDir.toString()) + .copyData(true) + .failOnMissingIndex(true) + .validateIndexChecksum(true) + .build(); + assertTrue(importer.importNewSSTables(options).isEmpty()); + assertEquals(10, execute(String.format("SELECT * FROM %s.%s", KEYSPACE, "sai_test")).size()); + } + finally + { + execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, "sai_test")); + } + } + private static class MockCFS extends ColumnFamilyStore { public MockCFS(ColumnFamilyStore cfs, Directories dirs) diff --cc test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java index 7764da0184,a402059ed4..b7322aa1a3 --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java @@@ -17,80 -17,37 +17,48 @@@ */ package org.apache.cassandra.io.sstable; + - import java.io.IOException; - import java.util.function.BiPredicate; - +import com.google.common.io.Files; - import org.apache.cassandra.io.util.File; import org.junit.After; import org.junit.Before; - import org.junit.Test; + import org.apache.cassandra.config.CassandraRelevantProperties; + import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; - import org.apache.cassandra.exceptions.InvalidRequestException; + import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.dht.ByteOrderedPartitioner; + import org.apache.cassandra.dht.IPartitioner; ++import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileUtils; - import static org.junit.Assert.assertEquals; - - public class CQLSSTableWriterClientTest + public class CQLSSTableWriterClientTest extends CQLSSTableWriterTest { + private File testDirectory; + private IPartitioner oldPartitioner; @Before -- public void setUp() - { - this.testDirectory = new File(Files.createTempDir()); - DatabaseDescriptor.clientInitialization(); - } - - @Test - public void testMultipleWritersWithDistinctTables() throws IOException ++ public void setup() { - testWriterInClientMode("table1", "table2"); - } ++ // setting this to true will execute a CQL query to table ++ // and this path is not enabled in client mode ++ verifyDataAfterLoading = false; + - @Test - public void testMultipleWritersWithSameTable() throws IOException - { - testWriterInClientMode("table1", "table1"); - } - - public void testWriterInClientMode(String table1, String table2) throws IOException, InvalidRequestException - { - String schema = "CREATE TABLE client_test.%s (" - + " k int PRIMARY KEY," - + " v1 text," - + " v2 int" - + ")"; - String insert = "INSERT INTO client_test.%s (k, v1, v2) VALUES (?, ?, ?)"; - - CQLSSTableWriter writer = CQLSSTableWriter.builder() - .inDirectory(this.testDirectory) - .forTable(String.format(schema, table1)) - .using(String.format(insert, table1)).build(); - - CQLSSTableWriter writer2 = CQLSSTableWriter.builder() - .inDirectory(this.testDirectory) - .forTable(String.format(schema, table2)) - .using(String.format(insert, table2)).build(); - - writer.addRow(0, "A", 0); - writer2.addRow(0, "A", 0); - writer.addRow(1, "B", 1); - writer2.addRow(1, "B", 1); - writer.close(); - writer2.close(); - - BiPredicate<File, String> filter = (dir, name) -> name.endsWith("-Data.db"); - - File[] dataFiles = this.testDirectory.tryList(filter); - assertEquals(2, dataFiles.length); ++ this.testDirectory = new File(Files.createTempDir()); + DatabaseDescriptor.clientInitialization(true, + () -> { + Config config = new Config(); - config.data_file_directories = new String[]{ dataDir.absolutePath() }; ++ config.data_file_directories = new String[]{ testDirectory.absolutePath() }; + return config; + }); + CassandraRelevantProperties.FORCE_LOAD_LOCAL_KEYSPACES.setBoolean(true); + oldPartitioner = DatabaseDescriptor.setPartitionerUnsafe(ByteOrderedPartitioner.instance); + Keyspace.setInitialized(); } @After public void tearDown() { + FileUtils.deleteRecursive(this.testDirectory); + DatabaseDescriptor.setPartitionerUnsafe(oldPartitioner); } } diff --cc test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterDaemonTest.java index 0000000000,8e96073705..64c617667c mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterDaemonTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterDaemonTest.java @@@ -1,0 -1,40 +1,44 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -package org.apache.cassandra.io.sstable; + ++package org.apache.cassandra.io.sstable; + + import org.junit.BeforeClass; + + import org.apache.cassandra.SchemaLoader; ++import org.apache.cassandra.ServerTestUtils; + import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.db.commitlog.CommitLog; ++import org.apache.cassandra.net.MessagingService; + import org.apache.cassandra.service.StorageService; + + public class CQLSSTableWriterDaemonTest extends CQLSSTableWriterTest + { + @BeforeClass + public static void setup() throws Exception + { + DatabaseDescriptor.daemonInitialization(); + CommitLog.instance.start(); + SchemaLoader.cleanupAndLeaveDirs(); + Keyspace.setInitialized(); ++ ServerTestUtils.prepareServerNoRegister(); ++ MessagingService.instance().waitUntilListeningUnchecked(); + StorageService.instance.initServer(); + } + } diff --cc test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java index ac0d1d682a,de592283cc..03fbf965ae --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java @@@ -15,13 -15,20 +15,20 @@@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.cassandra.io.sstable; + import java.io.IOException; import java.nio.ByteBuffer; - import java.util.*; + import java.util.Arrays; + import java.util.Collections; + import java.util.Iterator; + import java.util.LinkedHashMap; + import java.util.LinkedHashSet; + import java.util.List; + import java.util.Map; -import java.util.Set; + import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiPredicate; @@@ -41,22 -44,29 +44,36 @@@ import org.junit.Test import org.junit.rules.TemporaryFolder; import com.datastax.driver.core.utils.UUIDs; - import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; - import org.apache.cassandra.config.*; - import org.apache.cassandra.cql3.*; - import org.apache.cassandra.cql3.functions.types.*; - import org.apache.cassandra.db.Keyspace; - import org.apache.cassandra.db.commitlog.CommitLog; + import org.apache.cassandra.cql3.QueryProcessor; + import org.apache.cassandra.cql3.UntypedResultSet; + import org.apache.cassandra.cql3.functions.types.DataType; + import org.apache.cassandra.cql3.functions.types.LocalDate; + import org.apache.cassandra.cql3.functions.types.TypeCodec; + import org.apache.cassandra.cql3.functions.types.UDTValue; + import org.apache.cassandra.cql3.functions.types.UserType; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.marshal.UTF8Type; - import org.apache.cassandra.dht.*; - import org.apache.cassandra.exceptions.*; - import org.apache.cassandra.net.MessagingService; + import org.apache.cassandra.dht.ByteOrderedPartitioner; + import org.apache.cassandra.dht.Murmur3Partitioner; ++import org.apache.cassandra.dht.Range; ++import org.apache.cassandra.dht.Token; + import org.apache.cassandra.exceptions.InvalidRequestException; + import org.apache.cassandra.index.sai.disk.format.IndexDescriptor; + import org.apache.cassandra.index.sai.utils.IndexIdentifier; + import org.apache.cassandra.io.sstable.format.big.BigFormat; + import org.apache.cassandra.io.util.File; + import org.apache.cassandra.io.util.PathUtils; ++import org.apache.cassandra.locator.RangesAtEndpoint; ++import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.Schema; ++import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; - import org.apache.cassandra.service.StorageService; ++import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.transport.ProtocolVersion; - import org.apache.cassandra.utils.*; + import org.apache.cassandra.utils.ByteBufferUtil; ++import org.apache.cassandra.utils.FBUtilities; + import org.apache.cassandra.utils.JavaDriverUtils; ++import org.apache.cassandra.utils.OutputHandler; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; import static org.junit.Assert.assertEquals; @@@ -65,33 -75,20 +82,21 @@@ import static org.junit.Assert.assertNo import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - public class CQLSSTableWriterTest + @Ignore + public abstract class CQLSSTableWriterTest { private static final AtomicInteger idGen = new AtomicInteger(0); + private static final int NUMBER_WRITES_IN_RUNNABLE = 10; + - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - private String keyspace; - protected String table; + private String table; private String qualifiedTable; - protected File dataDir; + private File dataDir; - - static - { - DatabaseDescriptor.daemonInitialization(); - } ++ protected boolean verifyDataAfterLoading = true; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); - @BeforeClass - public static void setup() throws Exception - { - CommitLog.instance.start(); - SchemaLoader.cleanupAndLeaveDirs(); - Keyspace.setInitialized(); - ServerTestUtils.prepareServerNoRegister(); - MessagingService.instance().waitUntilListeningUnchecked(); - StorageService.instance.initServer(); - } - @Before public void perTestSetup() throws IOException { @@@ -105,13 -102,13 +110,13 @@@ @Test public void testUnsortedWriter() throws Exception { -- try (AutoCloseable switcher = Util.switchPartitioner(ByteOrderedPartitioner.instance)) ++ try (AutoCloseable ignored = Util.switchPartitioner(ByteOrderedPartitioner.instance)) { String schema = "CREATE TABLE " + qualifiedTable + " (" - + " k int PRIMARY KEY," - + " v1 text," - + " v2 int" - + ")"; + + " k int PRIMARY KEY," + + " v1 text," + + " v2 int" + + ")"; String insert = "INSERT INTO " + qualifiedTable + " (k, v1, v2) VALUES (?, ?, ?)"; CQLSSTableWriter writer = CQLSSTableWriter.builder() .inDirectory(dataDir) @@@ -125,34 -122,34 +130,37 @@@ writer.close(); -- loadSSTables(dataDir, keyspace); - - UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); - assertEquals(4, rs.size()); - - Iterator<UntypedResultSet.Row> iter = rs.iterator(); - UntypedResultSet.Row row; - - row = iter.next(); - assertEquals(0, row.getInt("k")); - assertEquals("test1", row.getString("v1")); - assertEquals(24, row.getInt("v2")); - - row = iter.next(); - assertEquals(1, row.getInt("k")); - assertEquals("test2", row.getString("v1")); - //assertFalse(row.has("v2")); - assertEquals(44, row.getInt("v2")); ++ loadSSTables(dataDir, keyspace, table); - UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); - assertEquals(4, rs.size()); - - Iterator<UntypedResultSet.Row> iter = rs.iterator(); - UntypedResultSet.Row row; - - row = iter.next(); - assertEquals(0, row.getInt("k")); - assertEquals("test1", row.getString("v1")); - assertEquals(24, row.getInt("v2")); - - row = iter.next(); - assertEquals(1, row.getInt("k")); - assertEquals("test2", row.getString("v1")); - //assertFalse(row.has("v2")); - assertEquals(44, row.getInt("v2")); - -- row = iter.next(); -- assertEquals(2, row.getInt("k")); -- assertEquals("test3", row.getString("v1")); -- assertEquals(42, row.getInt("v2")); -- -- row = iter.next(); -- assertEquals(3, row.getInt("k")); -- assertEquals(null, row.getBytes("v1")); // Using getBytes because we know it won't NPE -- assertEquals(12, row.getInt("v2")); ++ if (verifyDataAfterLoading) ++ { ++ UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); ++ assertEquals(4, rs.size()); ++ ++ Iterator<UntypedResultSet.Row> iter = rs.iterator(); ++ UntypedResultSet.Row row; ++ ++ row = iter.next(); ++ assertEquals(0, row.getInt("k")); ++ assertEquals("test1", row.getString("v1")); ++ assertEquals(24, row.getInt("v2")); ++ ++ row = iter.next(); ++ assertEquals(1, row.getInt("k")); ++ assertEquals("test2", row.getString("v1")); ++ //assertFalse(row.has("v2")); ++ assertEquals(44, row.getInt("v2")); ++ ++ row = iter.next(); ++ assertEquals(2, row.getInt("k")); ++ assertEquals("test3", row.getString("v1")); ++ assertEquals(42, row.getInt("v2")); ++ ++ row = iter.next(); ++ assertEquals(3, row.getInt("k")); ++ assertEquals(null, row.getBytes("v1")); // Using getBytes because we know it won't NPE ++ assertEquals(12, row.getInt("v2")); ++ } } } @@@ -245,8 -241,8 +253,12 @@@ + ")"; testUpdateStatement(); // start by adding some data -- UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); -- assertEquals(2, resultSet.size()); ++ ++ if (verifyDataAfterLoading) ++ { ++ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); ++ assertEquals(2, resultSet.size()); ++ } CQLSSTableWriter writer = CQLSSTableWriter.builder() .inDirectory(dataDir) @@@ -258,12 -254,12 +270,15 @@@ writer.addRow(1, 2, 3); writer.addRow(4, 5, 6); writer.close(); -- loadSSTables(dataDir, keyspace); ++ loadSSTables(dataDir, keyspace, table); -- resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); -- assertEquals(0, resultSet.size()); -- Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); -- assertFalse(iter.hasNext()); ++ if (verifyDataAfterLoading) ++ { ++ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); ++ assertEquals(0, resultSet.size()); ++ Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); ++ assertFalse(iter.hasNext()); ++ } } @Test @@@ -292,32 -288,32 +307,35 @@@ writer.addRow(2, 8, 9, "d"); writer.close(); -- loadSSTables(dataDir, keyspace); -- -- UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); -- assertEquals(4, resultSet.size()); -- Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); -- UntypedResultSet.Row r1 = iter.next(); -- assertEquals(1, r1.getInt("k")); -- assertEquals(2, r1.getInt("c1")); -- assertEquals(3, r1.getInt("c2")); -- assertEquals("a", r1.getString("v")); -- UntypedResultSet.Row r2 = iter.next(); -- assertEquals(1, r2.getInt("k")); -- assertEquals(4, r2.getInt("c1")); -- assertEquals(5, r2.getInt("c2")); -- assertEquals("b", r2.getString("v")); -- UntypedResultSet.Row r3 = iter.next(); -- assertEquals(1, r3.getInt("k")); -- assertEquals(6, r3.getInt("c1")); -- assertEquals(7, r3.getInt("c2")); -- assertEquals("c", r3.getString("v")); -- UntypedResultSet.Row r4 = iter.next(); -- assertEquals(2, r4.getInt("k")); -- assertEquals(8, r4.getInt("c1")); -- assertEquals(9, r4.getInt("c2")); -- assertEquals("d", r4.getString("v")); -- assertFalse(iter.hasNext()); ++ loadSSTables(dataDir, keyspace, table); ++ ++ if (verifyDataAfterLoading) ++ { ++ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); ++ assertEquals(4, resultSet.size()); ++ Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); ++ UntypedResultSet.Row r1 = iter.next(); ++ assertEquals(1, r1.getInt("k")); ++ assertEquals(2, r1.getInt("c1")); ++ assertEquals(3, r1.getInt("c2")); ++ assertEquals("a", r1.getString("v")); ++ UntypedResultSet.Row r2 = iter.next(); ++ assertEquals(1, r2.getInt("k")); ++ assertEquals(4, r2.getInt("c1")); ++ assertEquals(5, r2.getInt("c2")); ++ assertEquals("b", r2.getString("v")); ++ UntypedResultSet.Row r3 = iter.next(); ++ assertEquals(1, r3.getInt("k")); ++ assertEquals(6, r3.getInt("c1")); ++ assertEquals(7, r3.getInt("c2")); ++ assertEquals("c", r3.getString("v")); ++ UntypedResultSet.Row r4 = iter.next(); ++ assertEquals(2, r4.getInt("k")); ++ assertEquals(8, r4.getInt("c1")); ++ assertEquals(9, r4.getInt("c2")); ++ assertEquals("d", r4.getString("v")); ++ assertFalse(iter.hasNext()); ++ } writer = CQLSSTableWriter.builder() .inDirectory(dataDir) @@@ -328,17 -324,17 +346,21 @@@ writer.addRow(1); writer.close(); -- loadSSTables(dataDir, keyspace); -- -- resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); -- assertEquals(1, resultSet.size()); -- iter = resultSet.iterator(); -- UntypedResultSet.Row r5 = iter.next(); -- assertEquals(2, r5.getInt("k")); -- assertEquals(8, r5.getInt("c1")); -- assertEquals(9, r5.getInt("c2")); -- assertEquals("d", r5.getString("v")); -- assertFalse(iter.hasNext()); ++ loadSSTables(dataDir, keyspace, table); ++ ++ if (verifyDataAfterLoading) ++ { ++ ++ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); ++ assertEquals(1, resultSet.size()); ++ Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); ++ UntypedResultSet.Row r5 = iter.next(); ++ assertEquals(2, r5.getInt("k")); ++ assertEquals(8, r5.getInt("c1")); ++ assertEquals(9, r5.getInt("c2")); ++ assertEquals("d", r5.getString("v")); ++ assertFalse(iter.hasNext()); ++ } } @Test @@@ -364,41 -360,41 +386,46 @@@ .using(String.format("DELETE FROM %s WHERE k=? AND c1=? and c2>=?", qualifiedTable)) .build(); -- updateWriter.addRow("v0.0", "a", 0, 0); -- updateWriter.addRow("v0.1", "a", 0, 1); -- updateWriter.addRow("v0.2", "a", 0, 2); -- updateWriter.addRow("v0.0", "b", 0, 0); -- updateWriter.addRow("v0.1", "b", 0, 1); -- updateWriter.addRow("v0.2", "b", 0, 2); -- updateWriter.close(); -- deleteWriter.addRow("a", 0, 1); -- deleteWriter.addRow("b", 0, 2); ++ if (verifyDataAfterLoading) ++ { ++ updateWriter.addRow("v0.0", "a", 0, 0); ++ updateWriter.addRow("v0.1", "a", 0, 1); ++ updateWriter.addRow("v0.2", "a", 0, 2); ++ updateWriter.addRow("v0.0", "b", 0, 0); ++ updateWriter.addRow("v0.1", "b", 0, 1); ++ updateWriter.addRow("v0.2", "b", 0, 2); ++ updateWriter.close(); ++ deleteWriter.addRow("a", 0, 1); ++ deleteWriter.addRow("b", 0, 2); ++ } ++ deleteWriter.close(); -- loadSSTables(dataDir, keyspace); -- -- UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); -- assertEquals(3, resultSet.size()); -- -- Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); -- UntypedResultSet.Row r1 = iter.next(); -- assertEquals("a", r1.getString("k")); -- assertEquals(0, r1.getInt("c1")); -- assertEquals(0, r1.getInt("c2")); -- UntypedResultSet.Row r2 = iter.next(); -- assertEquals("b", r2.getString("k")); -- assertEquals(0, r2.getInt("c1")); -- assertEquals(0, r2.getInt("c2")); -- UntypedResultSet.Row r3 = iter.next(); -- assertEquals("b", r3.getString("k")); -- assertEquals(0, r3.getInt("c1")); -- assertEquals(1, r3.getInt("c2")); ++ loadSSTables(dataDir, keyspace, table); ++ ++ if (verifyDataAfterLoading) ++ { ++ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); ++ assertEquals(3, resultSet.size()); ++ ++ Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); ++ UntypedResultSet.Row r1 = iter.next(); ++ assertEquals("a", r1.getString("k")); ++ assertEquals(0, r1.getInt("c1")); ++ assertEquals(0, r1.getInt("c2")); ++ UntypedResultSet.Row r2 = iter.next(); ++ assertEquals("b", r2.getString("k")); ++ assertEquals(0, r2.getInt("c1")); ++ assertEquals(0, r2.getInt("c2")); ++ UntypedResultSet.Row r3 = iter.next(); ++ assertEquals("b", r3.getString("k")); ++ assertEquals(0, r3.getInt("c1")); ++ assertEquals(1, r3.getInt("c2")); ++ } } @Test public void testDeleteRangeEmptyKeyComponent() throws Exception { -- -- final String schema = "CREATE TABLE " + qualifiedTable + " (" + " k text," + " c1 int," @@@ -428,20 -424,20 +455,23 @@@ deleteWriter.addRow("a", 0); deleteWriter.addRow("b", 0); deleteWriter.close(); -- loadSSTables(dataDir, keyspace); -- -- UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); -- assertEquals(2, resultSet.size()); -- -- Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); -- UntypedResultSet.Row r1 = iter.next(); -- assertEquals("a", r1.getString("k")); -- assertEquals(1, r1.getInt("c1")); -- assertEquals(2, r1.getInt("c2")); -- UntypedResultSet.Row r2 = iter.next(); -- assertEquals("b", r2.getString("k")); -- assertEquals(1, r2.getInt("c1")); -- assertEquals(2, r2.getInt("c2")); ++ loadSSTables(dataDir, keyspace, table); ++ ++ if (verifyDataAfterLoading) ++ { ++ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); ++ assertEquals(2, resultSet.size()); ++ ++ Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); ++ UntypedResultSet.Row r1 = iter.next(); ++ assertEquals("a", r1.getString("k")); ++ assertEquals(1, r1.getInt("c1")); ++ assertEquals(2, r1.getInt("c2")); ++ UntypedResultSet.Row r2 = iter.next(); ++ assertEquals("b", r2.getString("k")); ++ assertEquals(1, r2.getInt("c1")); ++ assertEquals(2, r2.getInt("c2")); ++ } } @Test @@@ -482,79 -478,35 +512,43 @@@ updateWriter.addRow("v0.3", "b", 3, 4); updateWriter.close(); -- loadSSTables(dataDir, keyspace); -- -- UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); -- assertEquals(2, resultSet.size()); -- Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); -- UntypedResultSet.Row insertedRow = iter.next(); -- assertEquals("v0.2", insertedRow.getString("v")); -- assertEquals("a", insertedRow.getString("k")); -- assertEquals(1, insertedRow.getInt("c1")); -- assertEquals(2, insertedRow.getInt("c2")); -- UntypedResultSet.Row updatedRow = iter.next(); -- assertEquals("v0.3", updatedRow.getString("v")); -- assertEquals("b", updatedRow.getString("k")); -- assertEquals(3, updatedRow.getInt("c1")); -- assertEquals(4, updatedRow.getInt("c2")); -- -- deleteWriter.addRow("a", 1, 2); -- deleteWriter.addRow("b", 3, 4); - deleteWriter.close(); - loadSSTables(dataDir, keyspace); - - resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); - assertEquals(1, resultSet.size()); - iter = resultSet.iterator(); - UntypedResultSet.Row modifiedRow = iter.next(); - assertFalse(modifiedRow.has("v")); - assertEquals("a", modifiedRow.getString("k")); - assertEquals(1, modifiedRow.getInt("c1")); - assertEquals(2, modifiedRow.getInt("c2")); - } - - private static final int NUMBER_WRITES_IN_RUNNABLE = 10; - private class WriterThread extends Thread - { - private final File dataDir; - private final int id; - private final String qualifiedTable; - public volatile Exception exception; ++ loadSSTables(dataDir, keyspace, table); + - public WriterThread(File dataDir, int id, String qualifiedTable) ++ if (verifyDataAfterLoading) + { - this.dataDir = dataDir; - this.id = id; - this.qualifiedTable = qualifiedTable; ++ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); ++ assertEquals(2, resultSet.size()); ++ Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); ++ UntypedResultSet.Row insertedRow = iter.next(); ++ assertEquals("v0.2", insertedRow.getString("v")); ++ assertEquals("a", insertedRow.getString("k")); ++ assertEquals(1, insertedRow.getInt("c1")); ++ assertEquals(2, insertedRow.getInt("c2")); ++ UntypedResultSet.Row updatedRow = iter.next(); ++ assertEquals("v0.3", updatedRow.getString("v")); ++ assertEquals("b", updatedRow.getString("k")); ++ assertEquals(3, updatedRow.getInt("c1")); ++ assertEquals(4, updatedRow.getInt("c2")); ++ ++ deleteWriter.addRow("a", 1, 2); ++ deleteWriter.addRow("b", 3, 4); ++ + } + - @Override - public void run() - { - String schema = "CREATE TABLE " + qualifiedTable + " (" - + " k int," - + " v int," - + " PRIMARY KEY (k, v)" - + ")"; - String insert = "INSERT INTO " + qualifiedTable + " (k, v) VALUES (?, ?)"; - CQLSSTableWriter writer = CQLSSTableWriter.builder() - .inDirectory(dataDir) - .forTable(schema) - .using(insert).build(); + deleteWriter.close(); - loadSSTables(dataDir, keyspace); - - resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); - assertEquals(1, resultSet.size()); - iter = resultSet.iterator(); - UntypedResultSet.Row modifiedRow = iter.next(); - assertFalse(modifiedRow.has("v")); - assertEquals("a", modifiedRow.getString("k")); - assertEquals(1, modifiedRow.getInt("c1")); - assertEquals(2, modifiedRow.getInt("c2")); ++ loadSSTables(dataDir, keyspace, table); + - try - { - for (int i = 0; i < NUMBER_WRITES_IN_RUNNABLE; i++) - { - writer.addRow(id, i); - } - writer.close(); - } - catch (Exception e) - { - exception = e; - } ++ if (verifyDataAfterLoading) ++ { ++ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); ++ assertEquals(1, resultSet.size()); ++ Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); ++ UntypedResultSet.Row modifiedRow = iter.next(); ++ assertFalse(modifiedRow.has("v")); ++ assertEquals("a", modifiedRow.getString("k")); ++ assertEquals(1, modifiedRow.getInt("c1")); ++ assertEquals(2, modifiedRow.getInt("c2")); + } } @Test @@@ -578,10 -530,10 +572,13 @@@ } } -- loadSSTables(dataDir, keyspace); ++ loadSSTables(dataDir, keyspace, table); -- UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + ";"); -- assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size()); ++ if (verifyDataAfterLoading) ++ { ++ UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + ";"); ++ assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size()); ++ } } @Test @@@ -623,30 -575,31 +620,34 @@@ } writer.close(); -- loadSSTables(dataDir, keyspace); -- -- UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + keyspace + "." + table); -- TypeCodec collectionCodec = JavaDriverUtils.codecFor(DataType.CollectionType.list(tuple2Type)); -- TypeCodec tuple3Codec = JavaDriverUtils.codecFor(tuple3Type); - - assertEquals(resultSet.size(), 100); - int cnt = 0; - for (UntypedResultSet.Row row: resultSet) { - assertEquals(cnt, - row.getInt("k")); - List<UDTValue> values = (List<UDTValue>) collectionCodec.deserialize(row.getBytes("v1"), - ProtocolVersion.CURRENT); - assertEquals(values.get(0).getInt("a"), cnt * 10); - assertEquals(values.get(0).getInt("b"), cnt * 20); - assertEquals(values.get(1).getInt("a"), cnt * 30); - assertEquals(values.get(1).getInt("b"), cnt * 40); - - UDTValue v2 = (UDTValue) tuple3Codec.deserialize(row.getBytes("v2"), ProtocolVersion.CURRENT); - - assertEquals(v2.getInt("a"), cnt * 100); - assertEquals(v2.getInt("b"), cnt * 200); - assertEquals(v2.getInt("c"), cnt * 300); - cnt++; ++ loadSSTables(dataDir, keyspace, table); + - assertEquals(resultSet.size(), 100); - int cnt = 0; - for (UntypedResultSet.Row row : resultSet) ++ if (verifyDataAfterLoading) + { - assertEquals(cnt, - row.getInt("k")); - List<UDTValue> values = (List<UDTValue>) collectionCodec.deserialize(row.getBytes("v1"), - ProtocolVersion.CURRENT); - assertEquals(values.get(0).getInt("a"), cnt * 10); - assertEquals(values.get(0).getInt("b"), cnt * 20); - assertEquals(values.get(1).getInt("a"), cnt * 30); - assertEquals(values.get(1).getInt("b"), cnt * 40); - - UDTValue v2 = (UDTValue) tuple3Codec.deserialize(row.getBytes("v2"), ProtocolVersion.CURRENT); - - assertEquals(v2.getInt("a"), cnt * 100); - assertEquals(v2.getInt("b"), cnt * 200); - assertEquals(v2.getInt("c"), cnt * 300); - cnt++; ++ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + keyspace + "." + table); ++ TypeCodec collectionCodec = JavaDriverUtils.codecFor(DataType.CollectionType.list(tuple2Type)); ++ TypeCodec tuple3Codec = JavaDriverUtils.codecFor(tuple3Type); ++ ++ assertEquals(resultSet.size(), 100); ++ int cnt = 0; ++ for (UntypedResultSet.Row row : resultSet) ++ { ++ assertEquals(cnt, ++ row.getInt("k")); ++ List<UDTValue> values = (List<UDTValue>) collectionCodec.deserialize(row.getBytes("v1"), ++ ProtocolVersion.CURRENT); ++ assertEquals(values.get(0).getInt("a"), cnt * 10); ++ assertEquals(values.get(0).getInt("b"), cnt * 20); ++ assertEquals(values.get(1).getInt("a"), cnt * 30); ++ assertEquals(values.get(1).getInt("b"), cnt * 40); ++ ++ UDTValue v2 = (UDTValue) tuple3Codec.deserialize(row.getBytes("v2"), ProtocolVersion.CURRENT); ++ ++ assertEquals(v2.getInt("a"), cnt * 100); ++ assertEquals(v2.getInt("b"), cnt * 200); ++ assertEquals(v2.getInt("c"), cnt * 300); ++ cnt++; ++ } } } @@@ -687,23 -640,24 +688,27 @@@ } writer.close(); -- loadSSTables(dataDir, keyspace); - - UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + keyspace + "." + table); - - assertEquals(resultSet.size(), 100); - int cnt = 0; - for (UntypedResultSet.Row row: resultSet) { - assertEquals(cnt, - row.getInt("k")); - UDTValue nestedTpl = (UDTValue) nestedTupleCodec.deserialize(row.getBytes("v1"), - ProtocolVersion.CURRENT); - assertEquals(nestedTpl.getInt("c"), cnt * 100); - UDTValue tpl = nestedTpl.getUDTValue("tpl"); - assertEquals(tpl.getInt("a"), cnt * 200); - assertEquals(tpl.getInt("b"), cnt * 300); - - cnt++; ++ loadSSTables(dataDir, keyspace, table); + - UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + keyspace + "." + table); - - assertEquals(resultSet.size(), 100); - int cnt = 0; - for (UntypedResultSet.Row row : resultSet) ++ if (verifyDataAfterLoading) + { - assertEquals(cnt, - row.getInt("k")); - UDTValue nestedTpl = (UDTValue) nestedTupleCodec.deserialize(row.getBytes("v1"), - ProtocolVersion.CURRENT); - assertEquals(nestedTpl.getInt("c"), cnt * 100); - UDTValue tpl = nestedTpl.getUDTValue("tpl"); - assertEquals(tpl.getInt("a"), cnt * 200); - assertEquals(tpl.getInt("b"), cnt * 300); - - cnt++; ++ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + keyspace + "." + table); ++ ++ assertEquals(resultSet.size(), 100); ++ int cnt = 0; ++ for (UntypedResultSet.Row row : resultSet) ++ { ++ assertEquals(cnt, ++ row.getInt("k")); ++ UDTValue nestedTpl = (UDTValue) nestedTupleCodec.deserialize(row.getBytes("v1"), ++ ProtocolVersion.CURRENT); ++ assertEquals(nestedTpl.getInt("c"), cnt * 100); ++ UDTValue tpl = nestedTpl.getUDTValue("tpl"); ++ assertEquals(tpl.getInt("a"), cnt * 200); ++ assertEquals(tpl.getInt("b"), cnt * 300); ++ ++ cnt++; ++ } } } @@@ -771,36 -725,36 +776,39 @@@ writer.addRow(5, 5, 5, "5"); writer.close(); -- loadSSTables(dataDir, keyspace); -- -- UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); -- Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); -- UntypedResultSet.Row r1 = iter.next(); -- assertEquals(1, r1.getInt("k")); -- assertEquals(1, r1.getInt("c1")); -- assertEquals(1, r1.getInt("c2")); -- assertEquals(false, r1.has("v")); -- UntypedResultSet.Row r2 = iter.next(); -- assertEquals(2, r2.getInt("k")); -- assertEquals(2, r2.getInt("c1")); -- assertEquals(2, r2.getInt("c2")); -- assertEquals(false, r2.has("v")); -- UntypedResultSet.Row r3 = iter.next(); -- assertEquals(3, r3.getInt("k")); -- assertEquals(3, r3.getInt("c1")); -- assertEquals(3, r3.getInt("c2")); -- assertEquals(false, r3.has("v")); -- UntypedResultSet.Row r4 = iter.next(); -- assertEquals(4, r4.getInt("k")); -- assertEquals(4, r4.getInt("c1")); -- assertEquals(4, r4.getInt("c2")); -- assertEquals(false, r3.has("v")); -- UntypedResultSet.Row r5 = iter.next(); -- assertEquals(5, r5.getInt("k")); -- assertEquals(5, r5.getInt("c1")); -- assertEquals(5, r5.getInt("c2")); -- assertEquals(true, r5.has("v")); -- assertEquals("5", r5.getString("v")); ++ loadSSTables(dataDir, keyspace, table); ++ ++ if (verifyDataAfterLoading) ++ { ++ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); ++ Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); ++ UntypedResultSet.Row r1 = iter.next(); ++ assertEquals(1, r1.getInt("k")); ++ assertEquals(1, r1.getInt("c1")); ++ assertEquals(1, r1.getInt("c2")); ++ assertEquals(false, r1.has("v")); ++ UntypedResultSet.Row r2 = iter.next(); ++ assertEquals(2, r2.getInt("k")); ++ assertEquals(2, r2.getInt("c1")); ++ assertEquals(2, r2.getInt("c2")); ++ assertEquals(false, r2.has("v")); ++ UntypedResultSet.Row r3 = iter.next(); ++ assertEquals(3, r3.getInt("k")); ++ assertEquals(3, r3.getInt("c1")); ++ assertEquals(3, r3.getInt("c2")); ++ assertEquals(false, r3.has("v")); ++ UntypedResultSet.Row r4 = iter.next(); ++ assertEquals(4, r4.getInt("k")); ++ assertEquals(4, r4.getInt("c1")); ++ assertEquals(4, r4.getInt("c2")); ++ assertEquals(false, r3.has("v")); ++ UntypedResultSet.Row r5 = iter.next(); ++ assertEquals(5, r5.getInt("k")); ++ assertEquals(5, r5.getInt("c1")); ++ assertEquals(5, r5.getInt("c2")); ++ assertEquals(true, r5.has("v")); ++ assertEquals("5", r5.getString("v")); ++ } } @Test @@@ -826,23 -780,23 +834,26 @@@ writer.addRow(null, 7, 8, 9); writer.addRow(CQLSSTableWriter.UNSET_VALUE, 10, 11, 12); writer.close(); -- loadSSTables(dataDir, keyspace); -- -- UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); -- assertEquals(2, resultSet.size()); -- -- Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); -- UntypedResultSet.Row r1 = iter.next(); -- assertEquals(1, r1.getInt("k")); -- assertEquals(2, r1.getInt("c1")); -- assertEquals(3, r1.getInt("c2")); -- assertEquals("a", r1.getString("v")); -- UntypedResultSet.Row r2 = iter.next(); -- assertEquals(4, r2.getInt("k")); -- assertEquals(5, r2.getInt("c1")); -- assertEquals(6, r2.getInt("c2")); -- assertEquals("b", r2.getString("v")); -- assertFalse(iter.hasNext()); ++ loadSSTables(dataDir, keyspace, table); ++ ++ if (verifyDataAfterLoading) ++ { ++ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); ++ assertEquals(2, resultSet.size()); ++ ++ Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); ++ UntypedResultSet.Row r1 = iter.next(); ++ assertEquals(1, r1.getInt("k")); ++ assertEquals(2, r1.getInt("c1")); ++ assertEquals(3, r1.getInt("c2")); ++ assertEquals("a", r1.getString("v")); ++ UntypedResultSet.Row r2 = iter.next(); ++ assertEquals(4, r2.getInt("k")); ++ assertEquals(5, r2.getInt("c1")); ++ assertEquals(6, r2.getInt("c2")); ++ assertEquals("b", r2.getString("v")); ++ assertFalse(iter.hasNext()); ++ } } @Test @@@ -866,25 -820,25 +877,28 @@@ writer.addRow(4, 5, 6, "efg"); writer.close(); -- loadSSTables(dataDir, keyspace); - - UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); - assertEquals(2, resultSet.size()); - - Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); - UntypedResultSet.Row r1 = iter.next(); - assertEquals(1, r1.getInt("k")); - assertEquals(2, r1.getInt("c1")); - assertEquals(3, r1.getInt("c2")); - assertEquals(ByteBufferUtil.bytes("abc"), r1.getBytes("v")); ++ loadSSTables(dataDir, keyspace, table); - UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); - assertEquals(2, resultSet.size()); - - Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); - UntypedResultSet.Row r1 = iter.next(); - assertEquals(1, r1.getInt("k")); - assertEquals(2, r1.getInt("c1")); - assertEquals(3, r1.getInt("c2")); - assertEquals(ByteBufferUtil.bytes("abc"), r1.getBytes("v")); - -- UntypedResultSet.Row r2 = iter.next(); -- assertEquals(4, r2.getInt("k")); -- assertEquals(5, r2.getInt("c1")); -- assertEquals(6, r2.getInt("c2")); -- assertEquals(ByteBufferUtil.bytes("efg"), r2.getBytes("v")); -- -- assertFalse(iter.hasNext()); ++ if (verifyDataAfterLoading) ++ { ++ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); ++ assertEquals(2, resultSet.size()); ++ ++ Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); ++ UntypedResultSet.Row r1 = iter.next(); ++ assertEquals(1, r1.getInt("k")); ++ assertEquals(2, r1.getInt("c1")); ++ assertEquals(3, r1.getInt("c2")); ++ assertEquals(ByteBufferUtil.bytes("abc"), r1.getBytes("v")); ++ ++ UntypedResultSet.Row r2 = iter.next(); ++ assertEquals(4, r2.getInt("k")); ++ assertEquals(5, r2.getInt("c1")); ++ assertEquals(6, r2.getInt("c2")); ++ assertEquals(ByteBufferUtil.bytes("efg"), r2.getBytes("v")); ++ ++ assertFalse(iter.hasNext()); ++ } } @Test @@@ -913,10 -867,10 +927,13 @@@ } writer.close(); -- loadSSTables(dataDir, keyspace); ++ loadSSTables(dataDir, keyspace, table); -- UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); -- assertEquals(100, resultSet.size()); ++ if (verifyDataAfterLoading) ++ { ++ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); ++ assertEquals(100, resultSet.size()); ++ } } @Test @@@ -944,16 -899,17 +962,20 @@@ writer.addRow(i + ID_OFFSET, LocalDate.fromDaysSinceEpoch(i)); } writer.close(); -- loadSSTables(dataDir, keyspace); - - UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + ";"); - assertEquals(200, rs.size()); - Map<Integer, LocalDate> map = StreamSupport.stream(rs.spliterator(), false) - .collect(Collectors.toMap( r -> r.getInt("k"), r -> r.getDate("c"))); - for (int i = 0; i < 100; i++) { - final LocalDate expected = LocalDate.fromDaysSinceEpoch(i); - assertEquals(expected, map.get(i + ID_OFFSET)); - assertEquals(expected, map.get(i)); ++ loadSSTables(dataDir, keyspace, table); + - UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + ";"); - assertEquals(200, rs.size()); - Map<Integer, LocalDate> map = StreamSupport.stream(rs.spliterator(), false) - .collect(Collectors.toMap(r -> r.getInt("k"), r -> r.getDate("c"))); - for (int i = 0; i < 100; i++) ++ if (verifyDataAfterLoading) + { - final LocalDate expected = LocalDate.fromDaysSinceEpoch(i); - assertEquals(expected, map.get(i + ID_OFFSET)); - assertEquals(expected, map.get(i)); ++ UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + ";"); ++ assertEquals(200, rs.size()); ++ Map<Integer, LocalDate> map = StreamSupport.stream(rs.spliterator(), false) ++ .collect(Collectors.toMap(r -> r.getInt("k"), r -> r.getDate("c"))); ++ for (int i = 0; i < 100; i++) ++ { ++ final LocalDate expected = LocalDate.fromDaysSinceEpoch(i); ++ assertEquals(expected, map.get(i + ID_OFFSET)); ++ assertEquals(expected, map.get(i)); ++ } } } @@@ -988,33 -944,33 +1010,36 @@@ writer.addRow(String.valueOf(i), map); } writer.close(); -- loadSSTables(dataDir, keyspace); ++ loadSSTables(dataDir, keyspace, table); -- UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + ";"); -- assertEquals(200, rs.size()); -- Map<String, Map<String, String>> map = StreamSupport.stream(rs.spliterator(), false) -- .collect(Collectors.toMap(r -> r.getString("k"), r -> r.getFrozenMap("c", UTF8Type.instance, UTF8Type.instance))); -- for (int i = 0; i < 200; i++) ++ if (verifyDataAfterLoading) { -- final String expectedKey = String.valueOf(i); -- assertTrue(map.containsKey(expectedKey)); -- Map<String, String> innerMap = map.get(expectedKey); -- assertTrue(innerMap.containsKey("a_key")); -- assertEquals(innerMap.get("a_key"), "av" + i); -- assertTrue(innerMap.containsKey("b_key")); -- assertEquals(innerMap.get("b_key"), "zv" + i); -- } ++ UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + ";"); ++ assertEquals(200, rs.size()); ++ Map<String, Map<String, String>> map = StreamSupport.stream(rs.spliterator(), false) ++ .collect(Collectors.toMap(r -> r.getString("k"), r -> r.getFrozenMap("c", UTF8Type.instance, UTF8Type.instance))); ++ for (int i = 0; i < 200; i++) ++ { ++ final String expectedKey = String.valueOf(i); ++ assertTrue(map.containsKey(expectedKey)); ++ Map<String, String> innerMap = map.get(expectedKey); ++ assertTrue(innerMap.containsKey("a_key")); ++ assertEquals(innerMap.get("a_key"), "av" + i); ++ assertTrue(innerMap.containsKey("b_key")); ++ assertEquals(innerMap.get("b_key"), "zv" + i); ++ } -- // Make sure we can filter with map values regardless of which order we put the keys in -- UntypedResultSet filtered; -- filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='0' and c={'a_key': 'av0', 'b_key': 'zv0'};"); -- assertEquals(1, filtered.size()); -- filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='0' and c={'b_key': 'zv0', 'a_key': 'av0'};"); -- assertEquals(1, filtered.size()); -- filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='100' and c={'b_key': 'zv100', 'a_key': 'av100'};"); -- assertEquals(1, filtered.size()); -- filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='100' and c={'a_key': 'av100', 'b_key': 'zv100'};"); -- assertEquals(1, filtered.size()); ++ // Make sure we can filter with map values regardless of which order we put the keys in ++ UntypedResultSet filtered; ++ filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='0' and c={'a_key': 'av0', 'b_key': 'zv0'};"); ++ assertEquals(1, filtered.size()); ++ filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='0' and c={'b_key': 'zv0', 'a_key': 'av0'};"); ++ assertEquals(1, filtered.size()); ++ filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='100' and c={'b_key': 'zv100', 'a_key': 'av100'};"); ++ assertEquals(1, filtered.size()); ++ filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='100' and c={'a_key': 'av100', 'b_key': 'zv100'};"); ++ assertEquals(1, filtered.size()); ++ } } @Test @@@ -1049,26 -1005,26 +1074,29 @@@ writer.addRow(String.valueOf(2), map2); writer.close(); -- loadSSTables(dataDir, keyspace); -- -- UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + ";"); -- assertEquals(2, rs.size()); -- -- // Make sure we can filter with map values regardless of which order we put the keys in -- UntypedResultSet filtered; -- filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='1' and c={" + uuid1 + ": 1, " + uuid2 + ": 2};"); -- assertEquals(1, filtered.size()); -- filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='1' and c={" + uuid2 + ": 2, " + uuid1 + ": 1};"); -- assertEquals(1, filtered.size()); -- filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='2' and c={" + uuid3 + ": 1, " + uuid4 + ": 2};"); -- assertEquals(1, filtered.size()); -- filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='2' and c={" + uuid4 + ": 2, " + uuid3 + ": 1};"); -- assertEquals(1, filtered.size()); -- UUID other = UUIDs.startOf(1234L); // Just some other TimeUUID -- filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='2' and c={" + uuid3 + ": 1, " + other + ": 2};"); -- assertEquals(0, filtered.size()); -- filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='2' and c={" + uuid4 + ": 2, " + other + ": 1};"); -- assertEquals(0, filtered.size()); ++ loadSSTables(dataDir, keyspace, table); ++ ++ if (verifyDataAfterLoading) ++ { ++ UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + ";"); ++ assertEquals(2, rs.size()); ++ ++ // Make sure we can filter with map values regardless of which order we put the keys in ++ UntypedResultSet filtered; ++ filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='1' and c={" + uuid1 + ": 1, " + uuid2 + ": 2};"); ++ assertEquals(1, filtered.size()); ++ filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='1' and c={" + uuid2 + ": 2, " + uuid1 + ": 1};"); ++ assertEquals(1, filtered.size()); ++ filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='2' and c={" + uuid3 + ": 1, " + uuid4 + ": 2};"); ++ assertEquals(1, filtered.size()); ++ filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='2' and c={" + uuid4 + ": 2, " + uuid3 + ": 1};"); ++ assertEquals(1, filtered.size()); ++ UUID other = UUIDs.startOf(1234L); // Just some other TimeUUID ++ filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='2' and c={" + uuid3 + ": 1, " + other + ": 2};"); ++ assertEquals(0, filtered.size()); ++ filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='2' and c={" + uuid4 + ": 2, " + other + ": 1};"); ++ assertEquals(0, filtered.size()); ++ } } @Test @@@ -1101,26 -1057,26 +1129,29 @@@ writer.addRow(String.valueOf(2), set2); writer.close(); -- loadSSTables(dataDir, keyspace); -- -- UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + ";"); -- assertEquals(2, rs.size()); -- -- // Make sure we can filter with map values regardless of which order we put the keys in -- UntypedResultSet filtered; -- filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='1' and c={" + uuid1 + ", " + uuid2 + "};"); -- assertEquals(1, filtered.size()); -- filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='1' and c={" + uuid2 + ", " + uuid1 + "};"); -- assertEquals(1, filtered.size()); -- filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='2' and c={" + uuid1 + ", " + uuid2 + "};"); -- assertEquals(1, filtered.size()); -- filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='2' and c={" + uuid2 + ", " + uuid1 + "};"); -- assertEquals(1, filtered.size()); -- UUID other = UUIDs.startOf(10000000L + 1L); // Pick one that's really close just to make sure clustering filters are working -- filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='2' and c={" + uuid1 + ", " + other + "};"); -- assertEquals(0, filtered.size()); -- filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='2' and c={" + other + ", " + uuid1 + "};"); -- assertEquals(0, filtered.size()); ++ loadSSTables(dataDir, keyspace, table); ++ ++ if (verifyDataAfterLoading) ++ { ++ UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + ";"); ++ assertEquals(2, rs.size()); ++ ++ // Make sure we can filter with map values regardless of which order we put the keys in ++ UntypedResultSet filtered; ++ filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='1' and c={" + uuid1 + ", " + uuid2 + "};"); ++ assertEquals(1, filtered.size()); ++ filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='1' and c={" + uuid2 + ", " + uuid1 + "};"); ++ assertEquals(1, filtered.size()); ++ filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='2' and c={" + uuid1 + ", " + uuid2 + "};"); ++ assertEquals(1, filtered.size()); ++ filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='2' and c={" + uuid2 + ", " + uuid1 + "};"); ++ assertEquals(1, filtered.size()); ++ UUID other = UUIDs.startOf(10000000L + 1L); // Pick one that's really close just to make sure clustering filters are working ++ filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='2' and c={" + uuid1 + ", " + other + "};"); ++ assertEquals(0, filtered.size()); ++ filtered = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable + " where k='2' and c={" + other + ", " + uuid1 + "};"); ++ assertEquals(0, filtered.size()); ++ } } @Test @@@ -1145,22 -1101,23 +1176,26 @@@ // Note that, all other things being equal, Cassandra will sort these rows lexicographically, so we use "higher" values in the // row we expect to "win" so that we're sure that it isn't just accidentally picked due to the row sorting. - writer.addRow( 1, 4, 5, "b", now); // This write should be the one found at the end because it has a higher timestamp - writer.addRow( 1, 2, 3, "a", then); + writer.addRow(1, 4, 5, "b", now); // This write should be the one found at the end because it has a higher timestamp + writer.addRow(1, 2, 3, "a", then); writer.close(); -- loadSSTables(dataDir, keyspace); -- -- UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); -- assertEquals(1, resultSet.size()); -- -- Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); -- UntypedResultSet.Row r1 = iter.next(); -- assertEquals(1, r1.getInt("k")); -- assertEquals(4, r1.getInt("v1")); -- assertEquals(5, r1.getInt("v2")); -- assertEquals("b", r1.getString("v3")); -- assertFalse(iter.hasNext()); ++ loadSSTables(dataDir, keyspace, table); ++ ++ if (verifyDataAfterLoading) ++ { ++ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); ++ assertEquals(1, resultSet.size()); ++ ++ Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); ++ UntypedResultSet.Row r1 = iter.next(); ++ assertEquals(1, r1.getInt("k")); ++ assertEquals(4, r1.getInt("v1")); ++ assertEquals(5, r1.getInt("v2")); ++ assertEquals("b", r1.getString("v3")); ++ assertFalse(iter.hasNext()); ++ } } + @Test public void testWriteWithTtl() throws Exception { @@@ -1173,30 -1130,31 +1208,34 @@@ + ")"; CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder() - .inDirectory(dataDir) - .forTable(schema) - .using("INSERT INTO " + qualifiedTable + - " (k, v1, v2, v3) VALUES (?,?,?,?) using TTL ?"); + .inDirectory(dataDir) + .forTable(schema) + .using("INSERT INTO " + qualifiedTable + + " (k, v1, v2, v3) VALUES (?,?,?,?) using TTL ?"); CQLSSTableWriter writer = builder.build(); // add a row that _should_ show up - 1 hour TTL - writer.addRow( 1, 2, 3, "a", 3600); + writer.addRow(1, 2, 3, "a", 3600); // Insert a row with a TTL of 1 second - should not appear in results once we sleep - writer.addRow( 2, 4, 5, "b", 1); + writer.addRow(2, 4, 5, "b", 1); writer.close(); Thread.sleep(1200); // Slightly over 1 second, just to make sure -- loadSSTables(dataDir, keyspace); -- -- UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); -- assertEquals(1, resultSet.size()); -- -- Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); -- UntypedResultSet.Row r1 = iter.next(); -- assertEquals(1, r1.getInt("k")); -- assertEquals(2, r1.getInt("v1")); -- assertEquals(3, r1.getInt("v2")); -- assertEquals("a", r1.getString("v3")); -- assertFalse(iter.hasNext()); ++ loadSSTables(dataDir, keyspace, table); ++ ++ if (verifyDataAfterLoading) ++ { ++ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); ++ assertEquals(1, resultSet.size()); ++ ++ Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); ++ UntypedResultSet.Row r1 = iter.next(); ++ assertEquals(1, r1.getInt("k")); ++ assertEquals(2, r1.getInt("v1")); ++ assertEquals(3, r1.getInt("v2")); ++ assertEquals("a", r1.getString("v3")); ++ assertFalse(iter.hasNext()); ++ } } + @Test public void testWriteWithTimestampsAndTtl() throws Exception { @@@ -1220,23 -1178,23 +1259,27 @@@ long oneSecondFromNow = TimeUnit.MILLISECONDS.toMicros(currentTimeMillis() + 1000); // Insert some rows with a timestamp of 1 second from now, and different TTLs // add a row that _should_ show up - 1 hour TTL - writer.addRow( 1, 2, 3, "a", oneSecondFromNow, 3600); + writer.addRow(1, 2, 3, "a", oneSecondFromNow, 3600); // Insert a row "two seconds ago" with a TTL of 1 second - should not appear in results - writer.addRow( 2, 4, 5, "b", oneSecondFromNow, 1); + writer.addRow(2, 4, 5, "b", oneSecondFromNow, 1); writer.close(); -- loadSSTables(dataDir, keyspace); -- UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); -- Thread.sleep(1200); -- resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); -- assertEquals(1, resultSet.size()); -- -- Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); -- UntypedResultSet.Row r1 = iter.next(); -- assertEquals(1, r1.getInt("k")); -- assertEquals(2, r1.getInt("v1")); -- assertEquals(3, r1.getInt("v2")); -- assertEquals("a", r1.getString("v3")); -- assertFalse(iter.hasNext()); ++ loadSSTables(dataDir, keyspace, table); ++ ++ if (verifyDataAfterLoading) ++ { ++ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); ++ Thread.sleep(1200); ++ resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); ++ assertEquals(1, resultSet.size()); ++ ++ Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); ++ UntypedResultSet.Row r1 = iter.next(); ++ assertEquals(1, r1.getInt("k")); ++ assertEquals(2, r1.getInt("v1")); ++ assertEquals(3, r1.getInt("v2")); ++ assertEquals("a", r1.getString("v3")); ++ assertFalse(iter.hasNext()); ++ } } @Test @@@ -1258,15 -1216,15 +1301,18 @@@ writer.addRow(i, UUID.randomUUID().toString()); } writer.close(); -- loadSSTables(dataDir, keyspace); ++ loadSSTables(dataDir, keyspace, table); -- UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); -- assertEquals(rowCount, resultSet.size()); -- Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); -- for (int i = 0; i < rowCount; i++) ++ if (verifyDataAfterLoading) { -- UntypedResultSet.Row row = iter.next(); -- assertEquals(i, row.getInt("k")); ++ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); ++ assertEquals(rowCount, resultSet.size()); ++ Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); ++ for (int i = 0; i < rowCount; i++) ++ { ++ UntypedResultSet.Row row = iter.next(); ++ assertEquals(i, row.getInt("k")); ++ } } } @@@ -1301,37 -1259,203 +1347,226 @@@ assertTrue("The file size should be close to 1MiB (with at most 50KiB error rate for the test)", Math.abs(1024 * 1024 - closeTo1MiBFileSize) < 50 * 1024); -- loadSSTables(dataDir, keyspace); ++ loadSSTables(dataDir, keyspace, table); -- UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); -- assertEquals(rowCount, resultSet.size()); -- Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); -- for (int i = 0; i < rowCount; i++) ++ if (verifyDataAfterLoading) { -- UntypedResultSet.Row row = iter.next(); -- assertEquals(i, row.getInt("k")); ++ UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + qualifiedTable); ++ assertEquals(rowCount, resultSet.size()); ++ Iterator<UntypedResultSet.Row> iter = resultSet.iterator(); ++ for (int i = 0; i < rowCount; i++) ++ { ++ UntypedResultSet.Row row = iter.next(); ++ assertEquals(i, row.getInt("k")); ++ } } } - private static void loadSSTables(File dataDir, String ks) throws ExecutionException, InterruptedException + @Test + public void testMultipleWritersWithDistinctTables() throws IOException + { - testWriters("table1", "table2"); ++ testWriterInClientMode("table1", "table2"); + } + + @Test + public void testMultipleWritersWithSameTable() throws IOException + { - testWriters("table1", "table1"); ++ testWriterInClientMode("table1", "table1"); + } + - private void testWriters(String table1, String table2) throws IOException, InvalidRequestException ++ public void testWriterInClientMode(String table1, String table2) throws IOException, InvalidRequestException + { + String schema = "CREATE TABLE client_test.%s (" + + " k int PRIMARY KEY," + + " v1 text," + + " v2 int" + + ")"; - + String insert = "INSERT INTO client_test.%s (k, v1, v2) VALUES (?, ?, ?)"; + + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .forTable(String.format(schema, table1)) - .using(String.format(insert, table1)) - .build(); ++ .using(String.format(insert, table1)).build(); + + CQLSSTableWriter writer2 = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .forTable(String.format(schema, table2)) - .using(String.format(insert, table2)) - .build(); ++ .using(String.format(insert, table2)).build(); + + writer.addRow(0, "A", 0); + writer2.addRow(0, "A", 0); + writer.addRow(1, "B", 1); + writer2.addRow(1, "B", 1); + writer.close(); + writer2.close(); + + BiPredicate<File, String> filter = (dir, name) -> name.endsWith("-Data.db"); + + File[] dataFiles = dataDir.tryList(filter); + assertEquals(2, dataFiles.length); + } + + @Test + public void testWriteWithSAI() throws Exception + { + writeWithSaiInternal(); + writeWithSaiInternal(); + } + + private void writeWithSaiInternal() throws Exception + { + String schema = "CREATE TABLE " + qualifiedTable + " (" + + " k int PRIMARY KEY," + + " v1 text," + + " v2 int )"; + + String v1Index = "CREATE INDEX idx1 ON " + qualifiedTable + " (v1) USING 'sai'"; + String v2Index = "CREATE INDEX idx2 ON " + qualifiedTable + " (v2) USING 'sai'"; + + String insert = "INSERT INTO " + qualifiedTable + " (k, v1, v2) VALUES (?, ?, ?)"; + + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .forTable(schema) + .using(insert) + .withIndexes(v1Index, v2Index) + .withBuildIndexes(true) + .withPartitioner(Murmur3Partitioner.instance) + .build(); + + int rowCount = 30_000; + for (int i = 0; i < rowCount; i++) + writer.addRow(i, UUID.randomUUID().toString(), i); + + writer.close(); + + File[] dataFiles = dataDir.list(f -> f.name().endsWith('-' + BigFormat.Components.DATA.type.repr)); + assertNotNull(dataFiles); + + IndexDescriptor indexDescriptor = IndexDescriptor.create(Descriptor.fromFile(dataFiles[0]), + Murmur3Partitioner.instance, + Schema.instance.getTableMetadata(keyspace, table).comparator); + + assertTrue(indexDescriptor.isPerColumnIndexBuildComplete(new IndexIdentifier(keyspace, table, "idx1"))); + assertTrue(indexDescriptor.isPerColumnIndexBuildComplete(new IndexIdentifier(keyspace, table, "idx2"))); + + if (PathUtils.isDirectory(dataDir.toPath())) + PathUtils.forEach(dataDir.toPath(), PathUtils::deleteRecursive); + } + + @Test + public void testSkipBuildingIndexesWithSAI() throws Exception + { + String schema = "CREATE TABLE " + qualifiedTable + " (" + + " k int PRIMARY KEY," + + " v1 text," + + " v2 int )"; + + String v1Index = "CREATE INDEX idx1 ON " + qualifiedTable + " (v1) USING 'sai'"; + String v2Index = "CREATE INDEX idx2 ON " + qualifiedTable + " (v2) USING 'sai'"; + + String insert = "INSERT INTO " + qualifiedTable + " (k, v1, v2) VALUES (?, ?, ?)"; + + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .forTable(schema) + .using(insert) + .withIndexes(v1Index, v2Index) + // not building indexes here so no SAI components will be present + .withBuildIndexes(false) + .withPartitioner(Murmur3Partitioner.instance) + .build(); + + int rowCount = 30_000; + for (int i = 0; i < rowCount; i++) + writer.addRow(i, UUID.randomUUID().toString(), i); + + writer.close(); + + File[] dataFiles = dataDir.list(f -> f.name().endsWith('-' + BigFormat.Components.DATA.type.repr)); + assertNotNull(dataFiles); + + IndexDescriptor indexDescriptor = IndexDescriptor.create(Descriptor.fromFile(dataFiles[0]), + Murmur3Partitioner.instance, + Schema.instance.getTableMetadata(keyspace, table).comparator); + + // no indexes built due to withBuildIndexes set to false + assertFalse(indexDescriptor.isPerColumnIndexBuildComplete(new IndexIdentifier(keyspace, table, "idx1"))); + assertFalse(indexDescriptor.isPerColumnIndexBuildComplete(new IndexIdentifier(keyspace, table, "idx2"))); + } + - protected void loadSSTables(File dataDir, String ksName) ++ protected static void loadSSTables(File dataDir, final String ks, final String tb) throws ExecutionException, InterruptedException { - ColumnFamilyStore cfs = Keyspace.openWithoutSSTables(ksName).getColumnFamilyStore(table); - Set<String> dataFilePaths = Set.of(dataDir.absolutePath()); - cfs.importNewSSTables(dataFilePaths, false, false, false, - false, false, false, false, - true, false); + SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client() + { + private String keyspace; + ++ @Override + public void init(String keyspace) + { + this.keyspace = keyspace; - for (Range<Token> range : StorageService.instance.getLocalReplicas(ks).ranges()) ++ ++ KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(keyspace); ++ ++ RangesAtEndpoint addressReplicas = keyspaceMetadata.replicationStrategy.getAddressReplicas(ClusterMetadata.current(), FBUtilities.getBroadcastAddressAndPort()); ++ ++ for (Range<Token> range : addressReplicas.ranges()) + addRangeForEndpoint(range, FBUtilities.getBroadcastAddressAndPort()); + } + - public TableMetadataRef getTableMetadata(String cfName) ++ @Override ++ public TableMetadataRef getTableMetadata(String tableName) + { - return Schema.instance.getTableMetadataRef(keyspace, cfName); ++ KeyspaceMetadata keyspaceMetadata = ClusterMetadata.current().schema.getKeyspaceMetadata(keyspace); ++ TableMetadata tableMetadata = keyspaceMetadata.tables.getNullable(tableName); ++ assert tableMetadata != null; ++ return tableMetadata.ref; + } - }, new OutputHandler.SystemOutput(false, false)); ++ }, new OutputHandler.SystemOutput(false, false), 1, ks, tb); + + loader.stream().get(); } + + private class WriterThread extends Thread + { + private final File dataDir; + private final int id; + private final String qualifiedTable; + public volatile Exception exception; + + public WriterThread(File dataDir, int id, String qualifiedTable) + { + this.dataDir = dataDir; + this.id = id; + this.qualifiedTable = qualifiedTable; + } + + @Override + public void run() + { + String schema = "CREATE TABLE " + qualifiedTable + " (" + + " k int," + + " v int," + + " PRIMARY KEY (k, v)" + + ")"; + String insert = "INSERT INTO " + qualifiedTable + " (k, v) VALUES (?, ?)"; + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .forTable(schema) + .using(insert).build(); + + try + { + for (int i = 0; i < NUMBER_WRITES_IN_RUNNABLE; i++) + { + writer.addRow(id, i); + } + writer.close(); + } + catch (Exception e) + { + exception = e; + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
