This is an automated email from the ASF dual-hosted git repository. frankgh pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push: new c00c454 CASSANDRA-19507 Fix bulk reads of multiple tables that potentially have the same data file name (#47) c00c454 is described below commit c00c454d698e5a29caf58e61ed52ab48d08fd7fe Author: Francisco Guerrero <fran...@apache.org> AuthorDate: Mon Apr 1 12:11:52 2024 -0700 CASSANDRA-19507 Fix bulk reads of multiple tables that potentially have the same data file name (#47) When reading multiple data frames using bulk reader from different tables, it is possible to encounter a data file name being retrieved from the same Sidecar instance. Because the `SSTable`s are cached in the `SSTableCache`, it is possible that the `org.apache.cassandra.spark.reader.SSTableReader` uses the incorrect `SSTable` if it was cached with the same `#hashCode`. In this patch, the equality takes into account the keyspace, table, and snapshot name. Additionally, we implement the `hashCode` and `equals` method in `org.apache.cassandra.clients.SidecarInstanceImpl` to utilize the `SSTableCache` correctly. Once the methods are implemented, the issue originally described in JIRA is surfaced. Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19507 --- CHANGES.txt | 1 + .../cassandra/clients/SidecarInstanceImpl.java | 21 +++ .../spark/data/SidecarProvisionedSSTable.java | 29 ++-- .../spark/data/SidecarProvisionedSSTableTest.java | 170 +++++++++++++++++++++ .../analytics/QuoteIdentifiersReadTest.java | 26 +++- .../analytics/ReadDifferentTablesTest.java | 123 +++++++++++++++ 6 files changed, 354 insertions(+), 16 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 718e1d4..914d933 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.0.0 + * Fix bulk reads of multiple tables that potentially have the same data file name (CASSANDRA-19507) * Fix XXHash32Digest calculated digest value (CASSANDRA-19500) * Report additional bulk analytics job stats for instrumentation (CASSANDRA-19418) * Add certificate expiry check to start up validations done in Cassandra Analytics library (CASSANDRA-19424) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/SidecarInstanceImpl.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/SidecarInstanceImpl.java index bb2020a..d73dc2e 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/SidecarInstanceImpl.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/SidecarInstanceImpl.java @@ -86,6 +86,27 @@ public class SidecarInstanceImpl implements Serializable, SidecarInstance return String.format("SidecarInstanceImpl{hostname='%s', port=%d}", hostname, port); } + @Override + public boolean equals(Object object) + { + if (this == object) + { + return true; + } + if (object == null || getClass() != object.getClass()) + { + return false; + } + SidecarInstanceImpl that = (SidecarInstanceImpl) object; + return port == that.port && Objects.equals(hostname, that.hostname); + } + + @Override + public int hashCode() + { + return Objects.hash(port, hostname); + } + // JDK Serialization private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java index 648c74f..6e4ff0f 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java @@ -243,30 +243,39 @@ public class SidecarProvisionedSSTable extends SSTable @Override public String toString() { - return String.format("{\"hostname\"=\"%s\", \"port\"=\"%d\", \"dataFileName\"=\"%s\", \"partitionId\"=\"%d\"}", - instance.hostname(), instance.port(), dataFileName, partitionId); + return "SidecarProvisionedSSTable{" + + "hostname='" + instance.hostname() + '\'' + + ", port=" + instance.port() + + ", keyspace='" + keyspace + '\'' + + ", table='" + table + '\'' + + ", snapshotName='" + snapshotName + '\'' + + ", dataFileName='" + dataFileName + '\'' + + ", partitionId=" + partitionId + + '}'; } @Override public int hashCode() { - return Objects.hash(instance, dataFileName); + return Objects.hash(instance, keyspace, table, snapshotName, dataFileName); } @Override - public boolean equals(Object other) + public boolean equals(Object object) { - if (this == other) + if (this == object) { return true; } - if (other == null || this.getClass() != other.getClass()) + if (object == null || getClass() != object.getClass()) { return false; } - - SidecarProvisionedSSTable that = (SidecarProvisionedSSTable) other; - return this.instance.equals(that.instance) - && this.dataFileName.equals(that.dataFileName); + SidecarProvisionedSSTable that = (SidecarProvisionedSSTable) object; + return Objects.equals(instance, that.instance) + && Objects.equals(keyspace, that.keyspace) + && Objects.equals(table, that.table) + && Objects.equals(snapshotName, that.snapshotName) + && Objects.equals(dataFileName, that.dataFileName); } } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java new file mode 100644 index 0000000..0ce0e4f --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.data; + +import java.util.Collections; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import o.a.c.sidecar.client.shaded.common.data.ListSnapshotFilesResponse; +import org.apache.cassandra.clients.Sidecar; +import org.apache.cassandra.sidecar.client.SidecarClient; +import org.apache.cassandra.sidecar.client.SidecarInstanceImpl; +import org.apache.cassandra.spark.stats.Stats; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.Mockito.mock; + +/** + * Unit tests for {@link SidecarProvisionedSSTable} + */ +class SidecarProvisionedSSTableTest +{ + SidecarClient mockSidecarClient; + private Sidecar.ClientConfig sidecarClientConfig; + + @BeforeEach + void setup() + { + mockSidecarClient = mock(SidecarClient.class); + sidecarClientConfig = Sidecar.ClientConfig.create(); + } + + // SidecarProvisionedSSTable are cached in SSTableCache, so we need to correctly implement + // equality and hash code + @Test + void testEqualityAndHashCode() + { + SSTable ssTable10 = prepareTable("localhost1", 9043, "keyspace1", "table1", "snapshot1", "na-1-big-Data.db"); + SSTable ssTable20 = prepareTable("localhost1", 9043, "keyspace2", "table1", "snapshot1", "na-1-big-Data.db"); + SSTable ssTable30 = prepareTable("localhost1", 9043, "keyspace1", "table2", "snapshot1", "na-1-big-Data.db"); + SSTable ssTable40 = prepareTable("localhost2", 9043, "keyspace1", "table2", "snapshot1", "na-1-big-Data.db"); + SSTable ssTable50 = prepareTable("localhost1", 9044, "keyspace1", "table2", "snapshot1", "na-1-big-Data.db"); + SSTable ssTable60 = prepareTable("localhost1", 9043, "keyspace1", "table1", "snapshot1", "na-2-big-Data.db"); + + // These are the same as the previous SSTables + SSTable ssTable11 = prepareTable("localhost1", 9043, "keyspace1", "table1", "snapshot1", "na-1-big-Data.db"); + SSTable ssTable21 = prepareTable("localhost1", 9043, "keyspace2", "table1", "snapshot1", "na-1-big-Data.db"); + SSTable ssTable31 = prepareTable("localhost1", 9043, "keyspace1", "table2", "snapshot1", "na-1-big-Data.db"); + SSTable ssTable41 = prepareTable("localhost2", 9043, "keyspace1", "table2", "snapshot1", "na-1-big-Data.db"); + SSTable ssTable51 = prepareTable("localhost1", 9044, "keyspace1", "table2", "snapshot1", "na-1-big-Data.db"); + SSTable ssTable61 = prepareTable("localhost1", 9043, "keyspace1", "table1", "snapshot1", "na-2-big-Data.db"); + + assertThat(ssTable10).isNotEqualTo(ssTable20) + .isNotEqualTo(ssTable30) + .isNotEqualTo(ssTable40) + .isNotEqualTo(ssTable50) + .isNotEqualTo(ssTable60); + + assertThat(ssTable20).isNotEqualTo(ssTable30) + .isNotEqualTo(ssTable40) + .isNotEqualTo(ssTable50) + .isNotEqualTo(ssTable60); + + assertThat(ssTable30).isNotEqualTo(ssTable40) + .isNotEqualTo(ssTable50) + .isNotEqualTo(ssTable60); + + assertThat(ssTable40).isNotEqualTo(ssTable50) + .isNotEqualTo(ssTable60); + + assertThat(ssTable50).isNotEqualTo(ssTable60); + + assertThat(ssTable10).isEqualTo(ssTable11); + assertThat(ssTable20).isEqualTo(ssTable21); + assertThat(ssTable30).isEqualTo(ssTable31); + assertThat(ssTable40).isEqualTo(ssTable41); + assertThat(ssTable50).isEqualTo(ssTable51); + assertThat(ssTable60).isEqualTo(ssTable61); + } + + @Test + void testToString() + { + SSTable ssTable10 = prepareTable("localhost1", 9043, "keyspace1", "table1", "snapshot1", "na-1-big-Data.db"); + SSTable ssTable20 = prepareTable("localhost1", 9043, "keyspace2", "table1", "snapshot1", "na-1-big-Data.db"); + SSTable ssTable30 = prepareTable("localhost1", 9043, "keyspace1", "table2", "snapshot1", "na-1-big-Data.db"); + SSTable ssTable40 = prepareTable("localhost2", 9043, "keyspace1", "table2", "snapshot1", "na-1-big-Data.db"); + SSTable ssTable50 = prepareTable("localhost1", 9044, "keyspace1", "table2", "snapshot1", "na-1-big-Data.db"); + SSTable ssTable60 = prepareTable("localhost1", 9043, "keyspace1", "table1", "snapshot1", "na-2-big-Data.db"); + + assertThat(ssTable10.toString()).isEqualTo("SidecarProvisionedSSTable{hostname='localhost1', port=9043, " + + "keyspace='keyspace1', table='table1', snapshotName='snapshot1', " + + "dataFileName='na-1-big-Data.db', partitionId=1}"); + + assertThat(ssTable20.toString()).isEqualTo("SidecarProvisionedSSTable{hostname='localhost1', port=9043, " + + "keyspace='keyspace2', table='table1', snapshotName='snapshot1', " + + "dataFileName='na-1-big-Data.db', partitionId=1}"); + + assertThat(ssTable30.toString()).isEqualTo("SidecarProvisionedSSTable{hostname='localhost1', port=9043, " + + "keyspace='keyspace1', table='table2', snapshotName='snapshot1', " + + "dataFileName='na-1-big-Data.db', partitionId=1}"); + + assertThat(ssTable40.toString()).isEqualTo("SidecarProvisionedSSTable{hostname='localhost2', port=9043, " + + "keyspace='keyspace1', table='table2', snapshotName='snapshot1', " + + "dataFileName='na-1-big-Data.db', partitionId=1}"); + + assertThat(ssTable50.toString()).isEqualTo("SidecarProvisionedSSTable{hostname='localhost1', port=9044, " + + "keyspace='keyspace1', table='table2', snapshotName='snapshot1', " + + "dataFileName='na-1-big-Data.db', partitionId=1}"); + + assertThat(ssTable60.toString()).isEqualTo("SidecarProvisionedSSTable{hostname='localhost1', port=9043, " + + "keyspace='keyspace1', table='table1', snapshotName='snapshot1', " + + "dataFileName='na-2-big-Data.db', partitionId=1}"); + } + + @ParameterizedTest + @ValueSource(strings = { "bad1bigData.db", "na-1-big.db" }) + void failsOnBadDataFileName(String dataFileName) + { + assertThatExceptionOfType(ArrayIndexOutOfBoundsException.class) + .isThrownBy(() -> prepareTable("localhost", 9043, "ks", "tbl", "snap", dataFileName)); + } + + SSTable prepareTable(String sidecarHostName, + int sidecarPort, + String keyspace, + String table, + String snapshot, + String dataFileName) + { + ListSnapshotFilesResponse.FileInfo fileInfo = new ListSnapshotFilesResponse.FileInfo(5, + sidecarHostName, + sidecarPort, + 1, + snapshot, + keyspace, + table, + dataFileName); + return new SidecarProvisionedSSTable(mockSidecarClient, + sidecarClientConfig, + new SidecarInstanceImpl(sidecarHostName, sidecarPort), + keyspace, + table, + snapshot, + Collections.singletonMap(FileType.DATA, fileInfo), + 1, + Stats.DoNothingStats.INSTANCE); + } +} diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersReadTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersReadTest.java index 65721a5..89513e3 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersReadTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersReadTest.java @@ -22,7 +22,10 @@ package org.apache.cassandra.analytics; import java.io.IOException; import java.util.Arrays; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -76,10 +79,11 @@ class QuoteIdentifiersReadTest extends SharedClusterSparkIntegrationTestBase List<Row> rowList = data.collectAsList().stream() .sorted(Comparator.comparing(row -> row.getString(0))) .collect(Collectors.toList()); + int uniqueNumberForTest = nameToUniqueNumber.get(tableName); for (int i = 0; i < DATASET.size(); i++) { - assertThat(rowList.get(i).getString(0)).isEqualTo(DATASET.get(i)); - assertThat(rowList.get(i).getInt(1)).isEqualTo(i); + assertThat(rowList.get(i).getString(0)).isEqualTo(DATASET.get(i) + "_" + uniqueNumberForTest); + assertThat(rowList.get(i).getInt(1)).isEqualTo((uniqueNumberForTest * 100) + i); } } @@ -92,11 +96,12 @@ class QuoteIdentifiersReadTest extends SharedClusterSparkIntegrationTestBase List<Row> rowList = data.collectAsList().stream() .sorted(Comparator.comparing(row -> row.getString(0))) .collect(Collectors.toList()); + int uniqueNumberForTest = nameToUniqueNumber.get(TABLE_NAME_FOR_UDT_TEST); for (int i = 0; i < DATASET.size(); i++) { Row row = rowList.get(i); - assertThat(row.getString(0)).isEqualTo(DATASET.get(i)); - assertThat(row.getInt(1)).isEqualTo(i); + assertThat(rowList.get(i).getString(0)).isEqualTo(DATASET.get(i) + "_" + uniqueNumberForTest); + assertThat(rowList.get(i).getInt(1)).isEqualTo((uniqueNumberForTest * 100) + i); assertThat(row.getStruct(2).getLong(0)).isEqualTo(i); // from UdT1 TimE column assertThat(row.getStruct(2).getInt(1)).isEqualTo(i); // from UdT1 limit column (limit is a reserved word) } @@ -160,13 +165,19 @@ class QuoteIdentifiersReadTest extends SharedClusterSparkIntegrationTestBase populateTableWithUdt(TABLE_NAME_FOR_UDT_TEST, DATASET); } + static final AtomicInteger uniqueNumber = new AtomicInteger(); + static final Map<QualifiedName, Integer> nameToUniqueNumber = new HashMap<>(); + void populateTable(QualifiedName tableName, List<String> values) { + int uniqueNumberForTest = uniqueNumber.incrementAndGet(); + nameToUniqueNumber.put(tableName, uniqueNumberForTest); for (int i = 0; i < values.size(); i++) { String value = values.get(i); String query = String.format("INSERT INTO %s (\"IdEnTiFiEr\", IdEnTiFiEr) " + - "VALUES ('%s', %d);", tableName, value, i); + "VALUES ('%s', %d);", tableName, value + "_" + uniqueNumberForTest, + ((uniqueNumberForTest * 100) + i)); cluster.getFirstRunningInstance() .coordinator() .execute(query, ConsistencyLevel.ALL); @@ -175,12 +186,15 @@ class QuoteIdentifiersReadTest extends SharedClusterSparkIntegrationTestBase void populateTableWithUdt(QualifiedName tableName, List<String> dataset) { + int uniqueNumberForTest = uniqueNumber.incrementAndGet(); + nameToUniqueNumber.put(tableName, uniqueNumberForTest); for (int i = 0; i < dataset.size(); i++) { String value = dataset.get(i); String query = String.format("INSERT INTO %s (\"IdEnTiFiEr\", IdEnTiFiEr, \"User_Defined_Type\") " + "VALUES ('%s', %d, { \"TimE\" : %d, \"limit\" : %d });", - tableName, value, i, i, i); + tableName, value + "_" + uniqueNumberForTest, + ((uniqueNumberForTest * 100) + i), i, i); cluster.getFirstRunningInstance() .coordinator() .execute(query, ConsistencyLevel.ALL); diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ReadDifferentTablesTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ReadDifferentTablesTest.java new file mode 100644 index 0000000..f046dc2 --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ReadDifferentTablesTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.analytics; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.Test; + +import com.vdurmont.semver4j.Semver; +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.shared.Versions; +import org.apache.cassandra.sidecar.testing.JvmDTestSharedClassesPredicate; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.testing.TestVersion; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import static org.apache.cassandra.testing.TestUtils.DC1_RF1; +import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; +import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test that reads different tables with different schemas within the same test + */ +class ReadDifferentTablesTest extends SharedClusterSparkIntegrationTestBase +{ + static final List<String> DATASET = Arrays.asList("a", "b", "c", "d", "e", "f", "g"); + QualifiedName table1 = uniqueTestTableFullName(TEST_KEYSPACE); + QualifiedName table2 = uniqueTestTableFullName(TEST_KEYSPACE); + + @Test + void testReadingFromTwoDifferentTables() + { + Dataset<Row> dataForTable1 = bulkReaderDataFrame(table1).load(); + Dataset<Row> dataForTable2 = bulkReaderDataFrame(table2).load(); + + assertThat(dataForTable1.count()).isEqualTo(DATASET.size()); + assertThat(dataForTable2.count()).isEqualTo(DATASET.size()); + + List<Row> rowList1 = dataForTable1.collectAsList().stream() + .sorted(Comparator.comparing(row -> row.getInt(0))) + .collect(Collectors.toList()); + + List<Row> rowList2 = dataForTable2.collectAsList().stream() + .sorted(Comparator.comparing(row -> row.getLong(1))) + .collect(Collectors.toList()); + + for (int i = 0; i < DATASET.size(); i++) + { + assertThat(rowList1.get(i).getInt(0)).isEqualTo(i); + assertThat(rowList1.get(i).getString(1)).isEqualTo(DATASET.get(i)); + assertThat(rowList2.get(i).getString(0)).isEqualTo(DATASET.get(i)); + assertThat(rowList2.get(i).getLong(1)).isEqualTo(i); + } + } + + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace(TEST_KEYSPACE, DC1_RF1); + createTestTable(table1, "CREATE TABLE IF NOT EXISTS %s (id int PRIMARY KEY, name text);"); + createTestTable(table2, "CREATE TABLE IF NOT EXISTS %s (name text PRIMARY KEY, value bigint);"); + + IInstance firstRunningInstance = cluster.getFirstRunningInstance(); + for (int i = 0; i < DATASET.size(); i++) + { + String value = DATASET.get(i); + String query1 = String.format("INSERT INTO %s (id, name) VALUES (%d, '%s');", table1, i, value); + String query2 = String.format("INSERT INTO %s (name, value) VALUES ('%s', %d);", table2, value, i); + + firstRunningInstance.coordinator().execute(query1, ConsistencyLevel.ALL); + firstRunningInstance.coordinator().execute(query2, ConsistencyLevel.ALL); + } + } + + @Override + protected UpgradeableCluster provisionCluster(TestVersion testVersion) throws IOException + { + // spin up a C* cluster using the in-jvm dtest + Versions versions = Versions.find(); + Versions.Version requestedVersion = versions.getLatest(new Semver(testVersion.version(), Semver.SemverType.LOOSE)); + + UpgradeableCluster.Builder clusterBuilder = + UpgradeableCluster.build(1) + .withDynamicPortAllocation(true) + .withVersion(requestedVersion) + .withDCs(1) + .withDataDirCount(1) + .withSharedClasses(JvmDTestSharedClassesPredicate.INSTANCE) + .withConfig(config -> config.with(Feature.NATIVE_PROTOCOL) + .with(Feature.GOSSIP) + .with(Feature.JMX)); + TokenSupplier tokenSupplier = TokenSupplier.evenlyDistributedTokens(1, clusterBuilder.getTokenCount()); + clusterBuilder.withTokenSupplier(tokenSupplier); + return clusterBuilder.start(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org