This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c00c454  CASSANDRA-19507 Fix bulk reads of multiple tables that potentially have the same data file name (#47)
c00c454 is described below

commit c00c454d698e5a29caf58e61ed52ab48d08fd7fe
Author: Francisco Guerrero <fran...@apache.org>
AuthorDate: Mon Apr 1 12:11:52 2024 -0700

    CASSANDRA-19507 Fix bulk reads of multiple tables that potentially have the same data file name (#47)
    
    When reading multiple data frames using the bulk reader from different tables, it is possible to encounter the same
    data file name being retrieved from the same Sidecar instance. Because the `SSTable`s are cached in the `SSTableCache`,
    it is possible that the `org.apache.cassandra.spark.reader.SSTableReader` uses an incorrect `SSTable` if one was
    cached with the same `#hashCode`.
    
    In this patch, the `SSTable` equality check additionally takes into account the keyspace, table, and snapshot name.
    
    Additionally, we implement the `hashCode` and `equals` methods in `org.apache.cassandra.clients.SidecarInstanceImpl`
    so that the `SSTableCache` is utilized correctly. Once these methods are implemented, the issue originally described
    in the JIRA ticket surfaces.
    
    Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19507
---
 CHANGES.txt                                        |   1 +
 .../cassandra/clients/SidecarInstanceImpl.java     |  21 +++
 .../spark/data/SidecarProvisionedSSTable.java      |  29 ++--
 .../spark/data/SidecarProvisionedSSTableTest.java  | 170 +++++++++++++++++++++
 .../analytics/QuoteIdentifiersReadTest.java        |  26 +++-
 .../analytics/ReadDifferentTablesTest.java         | 123 +++++++++++++++
 6 files changed, 354 insertions(+), 16 deletions(-)
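
[Editorial note, not part of the patch] A minimal, self-contained Java sketch of the collision described in the
commit message, with a plain HashMap standing in for the SSTableCache and an illustrative Key class standing in
for the pre-patch identity of SidecarProvisionedSSTable (Sidecar instance + data file name only). All class,
host, and value names here are hypothetical; SSTableCache itself is not a plain HashMap.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;

    public class CacheCollisionSketch
    {
        // Pre-patch identity: only the Sidecar instance and the data file name participate
        static final class Key
        {
            final String sidecarInstance;
            final String dataFileName;

            Key(String sidecarInstance, String dataFileName)
            {
                this.sidecarInstance = sidecarInstance;
                this.dataFileName = dataFileName;
            }

            @Override
            public boolean equals(Object object)
            {
                if (this == object)
                {
                    return true;
                }
                if (!(object instanceof Key))
                {
                    return false;
                }
                Key that = (Key) object;
                return sidecarInstance.equals(that.sidecarInstance)
                       && dataFileName.equals(that.dataFileName);
            }

            @Override
            public int hashCode()
            {
                return Objects.hash(sidecarInstance, dataFileName);
            }
        }

        public static void main(String[] args)
        {
            Map<Key, String> cache = new HashMap<>();
            // Cached while reading keyspace1.table1
            cache.put(new Key("sidecar-1:9043", "na-1-big-Data.db"), "reader state for keyspace1.table1");
            // A later lookup while reading keyspace2.table1 hits the same entry, because the key
            // ignores the keyspace, table, and snapshot name
            System.out.println(cache.get(new Key("sidecar-1:9043", "na-1-big-Data.db")));
        }
    }

Including the keyspace, table, and snapshot name in equals/hashCode (as the SidecarProvisionedSSTable change
below does) makes each cached entry unique per table snapshot, so the reader no longer picks up another table's
SSTable.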

diff --git a/CHANGES.txt b/CHANGES.txt
index 718e1d4..914d933 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Fix bulk reads of multiple tables that potentially have the same data file name (CASSANDRA-19507)
  * Fix XXHash32Digest calculated digest value (CASSANDRA-19500)
  * Report additional bulk analytics job stats for instrumentation (CASSANDRA-19418)
  * Add certificate expiry check to start up validations done in Cassandra Analytics library (CASSANDRA-19424)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/SidecarInstanceImpl.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/SidecarInstanceImpl.java
index bb2020a..d73dc2e 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/SidecarInstanceImpl.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/SidecarInstanceImpl.java
@@ -86,6 +86,27 @@ public class SidecarInstanceImpl implements Serializable, SidecarInstance
         return String.format("SidecarInstanceImpl{hostname='%s', port=%d}", hostname, port);
     }
 
+    @Override
+    public boolean equals(Object object)
+    {
+        if (this == object)
+        {
+            return true;
+        }
+        if (object == null || getClass() != object.getClass())
+        {
+            return false;
+        }
+        SidecarInstanceImpl that = (SidecarInstanceImpl) object;
+        return port == that.port && Objects.equals(hostname, that.hostname);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(port, hostname);
+    }
+
     // JDK Serialization
 
     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
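
[Editorial note, not part of the patch] A quick illustrative check that the new methods satisfy the contract the
cache relies on: equal hostname/port pairs compare equal and hash identically. This sketch assumes the existing
(hostname, port) constructor of SidecarInstanceImpl; note that the new methods also rely on java.util.Objects.

    import org.apache.cassandra.clients.SidecarInstanceImpl;

    public class SidecarInstanceEqualitySketch
    {
        public static void main(String[] args)
        {
            SidecarInstanceImpl a = new SidecarInstanceImpl("localhost", 9043);
            SidecarInstanceImpl b = new SidecarInstanceImpl("localhost", 9043);
            // Equal instances must produce equal hash codes for SSTableCache lookups to work
            System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // expected: true
        }
    }
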
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
index 648c74f..6e4ff0f 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
@@ -243,30 +243,39 @@ public class SidecarProvisionedSSTable extends SSTable
     @Override
     public String toString()
     {
-        return String.format("{\"hostname\"=\"%s\", \"port\"=\"%d\", \"dataFileName\"=\"%s\", \"partitionId\"=\"%d\"}",
-                             instance.hostname(), instance.port(), dataFileName, partitionId);
+        return "SidecarProvisionedSSTable{" +
+               "hostname='" + instance.hostname() + '\'' +
+               ", port=" + instance.port() +
+               ", keyspace='" + keyspace + '\'' +
+               ", table='" + table + '\'' +
+               ", snapshotName='" + snapshotName + '\'' +
+               ", dataFileName='" + dataFileName + '\'' +
+               ", partitionId=" + partitionId +
+               '}';
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hash(instance, dataFileName);
+        return Objects.hash(instance, keyspace, table, snapshotName, dataFileName);
     }
 
     @Override
-    public boolean equals(Object other)
+    public boolean equals(Object object)
     {
-        if (this == other)
+        if (this == object)
         {
             return true;
         }
-        if (other == null || this.getClass() != other.getClass())
+        if (object == null || getClass() != object.getClass())
         {
             return false;
         }
-
-        SidecarProvisionedSSTable that = (SidecarProvisionedSSTable) other;
-        return this.instance.equals(that.instance)
-            && this.dataFileName.equals(that.dataFileName);
+        SidecarProvisionedSSTable that = (SidecarProvisionedSSTable) object;
+        return Objects.equals(instance, that.instance)
+               && Objects.equals(keyspace, that.keyspace)
+               && Objects.equals(table, that.table)
+               && Objects.equals(snapshotName, that.snapshotName)
+               && Objects.equals(dataFileName, that.dataFileName);
     }
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
new file mode 100644
index 0000000..0ce0e4f
--- /dev/null
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTableTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.data;
+
+import java.util.Collections;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import o.a.c.sidecar.client.shaded.common.data.ListSnapshotFilesResponse;
+import org.apache.cassandra.clients.Sidecar;
+import org.apache.cassandra.sidecar.client.SidecarClient;
+import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
+import org.apache.cassandra.spark.stats.Stats;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for {@link SidecarProvisionedSSTable}
+ */
+class SidecarProvisionedSSTableTest
+{
+    SidecarClient mockSidecarClient;
+    private Sidecar.ClientConfig sidecarClientConfig;
+
+    @BeforeEach
+    void setup()
+    {
+        mockSidecarClient = mock(SidecarClient.class);
+        sidecarClientConfig = Sidecar.ClientConfig.create();
+    }
+
+    // SidecarProvisionedSSTables are cached in SSTableCache, so we need to correctly implement
+    // equality and hash code
+    @Test
+    void testEqualityAndHashCode()
+    {
+        SSTable ssTable10 = prepareTable("localhost1", 9043, "keyspace1", "table1", "snapshot1", "na-1-big-Data.db");
+        SSTable ssTable20 = prepareTable("localhost1", 9043, "keyspace2", "table1", "snapshot1", "na-1-big-Data.db");
+        SSTable ssTable30 = prepareTable("localhost1", 9043, "keyspace1", "table2", "snapshot1", "na-1-big-Data.db");
+        SSTable ssTable40 = prepareTable("localhost2", 9043, "keyspace1", "table2", "snapshot1", "na-1-big-Data.db");
+        SSTable ssTable50 = prepareTable("localhost1", 9044, "keyspace1", "table2", "snapshot1", "na-1-big-Data.db");
+        SSTable ssTable60 = prepareTable("localhost1", 9043, "keyspace1", "table1", "snapshot1", "na-2-big-Data.db");
+
+        // These are the same as the previous SSTables
+        SSTable ssTable11 = prepareTable("localhost1", 9043, "keyspace1", "table1", "snapshot1", "na-1-big-Data.db");
+        SSTable ssTable21 = prepareTable("localhost1", 9043, "keyspace2", "table1", "snapshot1", "na-1-big-Data.db");
+        SSTable ssTable31 = prepareTable("localhost1", 9043, "keyspace1", "table2", "snapshot1", "na-1-big-Data.db");
+        SSTable ssTable41 = prepareTable("localhost2", 9043, "keyspace1", "table2", "snapshot1", "na-1-big-Data.db");
+        SSTable ssTable51 = prepareTable("localhost1", 9044, "keyspace1", "table2", "snapshot1", "na-1-big-Data.db");
+        SSTable ssTable61 = prepareTable("localhost1", 9043, "keyspace1", "table1", "snapshot1", "na-2-big-Data.db");
+
+        assertThat(ssTable10).isNotEqualTo(ssTable20)
+                             .isNotEqualTo(ssTable30)
+                             .isNotEqualTo(ssTable40)
+                             .isNotEqualTo(ssTable50)
+                             .isNotEqualTo(ssTable60);
+
+        assertThat(ssTable20).isNotEqualTo(ssTable30)
+                             .isNotEqualTo(ssTable40)
+                             .isNotEqualTo(ssTable50)
+                             .isNotEqualTo(ssTable60);
+
+        assertThat(ssTable30).isNotEqualTo(ssTable40)
+                             .isNotEqualTo(ssTable50)
+                             .isNotEqualTo(ssTable60);
+
+        assertThat(ssTable40).isNotEqualTo(ssTable50)
+                             .isNotEqualTo(ssTable60);
+
+        assertThat(ssTable50).isNotEqualTo(ssTable60);
+
+        assertThat(ssTable10).isEqualTo(ssTable11);
+        assertThat(ssTable20).isEqualTo(ssTable21);
+        assertThat(ssTable30).isEqualTo(ssTable31);
+        assertThat(ssTable40).isEqualTo(ssTable41);
+        assertThat(ssTable50).isEqualTo(ssTable51);
+        assertThat(ssTable60).isEqualTo(ssTable61);
+    }
+
+    @Test
+    void testToString()
+    {
+        SSTable ssTable10 = prepareTable("localhost1", 9043, "keyspace1", "table1", "snapshot1", "na-1-big-Data.db");
+        SSTable ssTable20 = prepareTable("localhost1", 9043, "keyspace2", "table1", "snapshot1", "na-1-big-Data.db");
+        SSTable ssTable30 = prepareTable("localhost1", 9043, "keyspace1", "table2", "snapshot1", "na-1-big-Data.db");
+        SSTable ssTable40 = prepareTable("localhost2", 9043, "keyspace1", "table2", "snapshot1", "na-1-big-Data.db");
+        SSTable ssTable50 = prepareTable("localhost1", 9044, "keyspace1", "table2", "snapshot1", "na-1-big-Data.db");
+        SSTable ssTable60 = prepareTable("localhost1", 9043, "keyspace1", "table1", "snapshot1", "na-2-big-Data.db");
+
+        assertThat(ssTable10.toString()).isEqualTo("SidecarProvisionedSSTable{hostname='localhost1', port=9043, " +
+                                                   "keyspace='keyspace1', table='table1', snapshotName='snapshot1', " +
+                                                   "dataFileName='na-1-big-Data.db', partitionId=1}");
+
+        assertThat(ssTable20.toString()).isEqualTo("SidecarProvisionedSSTable{hostname='localhost1', port=9043, " +
+                                                   "keyspace='keyspace2', table='table1', snapshotName='snapshot1', " +
+                                                   "dataFileName='na-1-big-Data.db', partitionId=1}");
+
+        assertThat(ssTable30.toString()).isEqualTo("SidecarProvisionedSSTable{hostname='localhost1', port=9043, " +
+                                                   "keyspace='keyspace1', table='table2', snapshotName='snapshot1', " +
+                                                   "dataFileName='na-1-big-Data.db', partitionId=1}");
+
+        assertThat(ssTable40.toString()).isEqualTo("SidecarProvisionedSSTable{hostname='localhost2', port=9043, " +
+                                                   "keyspace='keyspace1', table='table2', snapshotName='snapshot1', " +
+                                                   "dataFileName='na-1-big-Data.db', partitionId=1}");
+
+        assertThat(ssTable50.toString()).isEqualTo("SidecarProvisionedSSTable{hostname='localhost1', port=9044, " +
+                                                   "keyspace='keyspace1', table='table2', snapshotName='snapshot1', " +
+                                                   "dataFileName='na-1-big-Data.db', partitionId=1}");
+
+        assertThat(ssTable60.toString()).isEqualTo("SidecarProvisionedSSTable{hostname='localhost1', port=9043, " +
+                                                   "keyspace='keyspace1', table='table1', snapshotName='snapshot1', " +
+                                                   "dataFileName='na-2-big-Data.db', partitionId=1}");
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = { "bad1bigData.db", "na-1-big.db" })
+    void failsOnBadDataFileName(String dataFileName)
+    {
+        assertThatExceptionOfType(ArrayIndexOutOfBoundsException.class)
+        .isThrownBy(() -> prepareTable("localhost", 9043, "ks", "tbl", "snap", dataFileName));
+    }
+
+    SSTable prepareTable(String sidecarHostName,
+                         int sidecarPort,
+                         String keyspace,
+                         String table,
+                         String snapshot,
+                         String dataFileName)
+    {
+        ListSnapshotFilesResponse.FileInfo fileInfo = new ListSnapshotFilesResponse.FileInfo(5,
+                                                                                             sidecarHostName,
+                                                                                             sidecarPort,
+                                                                                             1,
+                                                                                             snapshot,
+                                                                                             keyspace,
+                                                                                             table,
+                                                                                             dataFileName);
+        return new SidecarProvisionedSSTable(mockSidecarClient,
+                                             sidecarClientConfig,
+                                             new SidecarInstanceImpl(sidecarHostName, sidecarPort),
+                                             keyspace,
+                                             table,
+                                             snapshot,
+                                             Collections.singletonMap(FileType.DATA, fileInfo),
+                                             1,
+                                             Stats.DoNothingStats.INSTANCE);
+    }
+}
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersReadTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersReadTest.java
index 65721a5..89513e3 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersReadTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersReadTest.java
@@ -22,7 +22,10 @@ package org.apache.cassandra.analytics;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -76,10 +79,11 @@ class QuoteIdentifiersReadTest extends SharedClusterSparkIntegrationTestBase
         List<Row> rowList = data.collectAsList().stream()
                                 .sorted(Comparator.comparing(row -> row.getString(0)))
                                 .collect(Collectors.toList());
+        int uniqueNumberForTest = nameToUniqueNumber.get(tableName);
         for (int i = 0; i < DATASET.size(); i++)
         {
-            assertThat(rowList.get(i).getString(0)).isEqualTo(DATASET.get(i));
-            assertThat(rowList.get(i).getInt(1)).isEqualTo(i);
+            assertThat(rowList.get(i).getString(0)).isEqualTo(DATASET.get(i) + "_" + uniqueNumberForTest);
+            assertThat(rowList.get(i).getInt(1)).isEqualTo((uniqueNumberForTest * 100) + i);
         }
     }
 
@@ -92,11 +96,12 @@ class QuoteIdentifiersReadTest extends SharedClusterSparkIntegrationTestBase
         List<Row> rowList = data.collectAsList().stream()
                                 .sorted(Comparator.comparing(row -> row.getString(0)))
                                 .collect(Collectors.toList());
+        int uniqueNumberForTest = nameToUniqueNumber.get(TABLE_NAME_FOR_UDT_TEST);
         for (int i = 0; i < DATASET.size(); i++)
         {
             Row row = rowList.get(i);
-            assertThat(row.getString(0)).isEqualTo(DATASET.get(i));
-            assertThat(row.getInt(1)).isEqualTo(i);
+            assertThat(rowList.get(i).getString(0)).isEqualTo(DATASET.get(i) + "_" + uniqueNumberForTest);
+            assertThat(rowList.get(i).getInt(1)).isEqualTo((uniqueNumberForTest * 100) + i);
             assertThat(row.getStruct(2).getLong(0)).isEqualTo(i); // from UdT1 TimE column
             assertThat(row.getStruct(2).getInt(1)).isEqualTo(i); // from UdT1 limit column (limit is a reserved word)
         }
@@ -160,13 +165,19 @@ class QuoteIdentifiersReadTest extends SharedClusterSparkIntegrationTestBase
         populateTableWithUdt(TABLE_NAME_FOR_UDT_TEST, DATASET);
     }
 
+    static final AtomicInteger uniqueNumber = new AtomicInteger();
+    static final Map<QualifiedName, Integer> nameToUniqueNumber = new HashMap<>();
+
     void populateTable(QualifiedName tableName, List<String> values)
     {
+        int uniqueNumberForTest = uniqueNumber.incrementAndGet();
+        nameToUniqueNumber.put(tableName, uniqueNumberForTest);
         for (int i = 0; i < values.size(); i++)
         {
             String value = values.get(i);
             String query = String.format("INSERT INTO %s (\"IdEnTiFiEr\", IdEnTiFiEr) " +
-                                         "VALUES ('%s', %d);", tableName, value, i);
+                                         "VALUES ('%s', %d);", tableName, value + "_" + uniqueNumberForTest,
+                                         ((uniqueNumberForTest * 100) + i));
             cluster.getFirstRunningInstance()
                    .coordinator()
                    .execute(query, ConsistencyLevel.ALL);
@@ -175,12 +186,15 @@ class QuoteIdentifiersReadTest extends SharedClusterSparkIntegrationTestBase
 
     void populateTableWithUdt(QualifiedName tableName, List<String> dataset)
     {
+        int uniqueNumberForTest = uniqueNumber.incrementAndGet();
+        nameToUniqueNumber.put(tableName, uniqueNumberForTest);
         for (int i = 0; i < dataset.size(); i++)
         {
             String value = dataset.get(i);
             String query = String.format("INSERT INTO %s (\"IdEnTiFiEr\", IdEnTiFiEr, \"User_Defined_Type\") " +
                                          "VALUES ('%s', %d, { \"TimE\" : %d, \"limit\" : %d });",
-                                         tableName, value, i, i, i);
+                                         tableName, value + "_" + uniqueNumberForTest,
+                                         ((uniqueNumberForTest * 100) + i), i, i);
             cluster.getFirstRunningInstance()
                    .coordinator()
                    .execute(query, ConsistencyLevel.ALL);
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ReadDifferentTablesTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ReadDifferentTablesTest.java
new file mode 100644
index 0000000..f046dc2
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ReadDifferentTablesTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Test;
+
+import com.vdurmont.semver4j.Semver;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.sidecar.testing.JvmDTestSharedClassesPredicate;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.testing.TestVersion;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test that reads different tables with different schemas within the same test
+ */
+class ReadDifferentTablesTest extends SharedClusterSparkIntegrationTestBase
+{
+    static final List<String> DATASET = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
+    QualifiedName table1 = uniqueTestTableFullName(TEST_KEYSPACE);
+    QualifiedName table2 = uniqueTestTableFullName(TEST_KEYSPACE);
+
+    @Test
+    void testReadingFromTwoDifferentTables()
+    {
+        Dataset<Row> dataForTable1 = bulkReaderDataFrame(table1).load();
+        Dataset<Row> dataForTable2 = bulkReaderDataFrame(table2).load();
+
+        assertThat(dataForTable1.count()).isEqualTo(DATASET.size());
+        assertThat(dataForTable2.count()).isEqualTo(DATASET.size());
+
+        List<Row> rowList1 = dataForTable1.collectAsList().stream()
+                                          .sorted(Comparator.comparing(row -> row.getInt(0)))
+                                          .collect(Collectors.toList());
+
+        List<Row> rowList2 = dataForTable2.collectAsList().stream()
+                                          .sorted(Comparator.comparing(row -> row.getLong(1)))
+                                          .collect(Collectors.toList());
+
+        for (int i = 0; i < DATASET.size(); i++)
+        {
+            assertThat(rowList1.get(i).getInt(0)).isEqualTo(i);
+            assertThat(rowList1.get(i).getString(1)).isEqualTo(DATASET.get(i));
+            assertThat(rowList2.get(i).getString(0)).isEqualTo(DATASET.get(i));
+            assertThat(rowList2.get(i).getLong(1)).isEqualTo(i);
+        }
+    }
+
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
+        createTestTable(table1, "CREATE TABLE IF NOT EXISTS %s (id int PRIMARY KEY, name text);");
+        createTestTable(table2, "CREATE TABLE IF NOT EXISTS %s (name text PRIMARY KEY, value bigint);");
+
+        IInstance firstRunningInstance = cluster.getFirstRunningInstance();
+        for (int i = 0; i < DATASET.size(); i++)
+        {
+            String value = DATASET.get(i);
+            String query1 = String.format("INSERT INTO %s (id, name) VALUES (%d, '%s');", table1, i, value);
+            String query2 = String.format("INSERT INTO %s (name, value) VALUES ('%s', %d);", table2, value, i);
+
+            firstRunningInstance.coordinator().execute(query1, ConsistencyLevel.ALL);
+            firstRunningInstance.coordinator().execute(query2, ConsistencyLevel.ALL);
+        }
+    }
+
+    @Override
+    protected UpgradeableCluster provisionCluster(TestVersion testVersion) throws IOException
+    {
+        // spin up a C* cluster using the in-jvm dtest
+        Versions versions = Versions.find();
+        Versions.Version requestedVersion = versions.getLatest(new Semver(testVersion.version(), Semver.SemverType.LOOSE));
+
+        UpgradeableCluster.Builder clusterBuilder =
+        UpgradeableCluster.build(1)
+                          .withDynamicPortAllocation(true)
+                          .withVersion(requestedVersion)
+                          .withDCs(1)
+                          .withDataDirCount(1)
+                          .withSharedClasses(JvmDTestSharedClassesPredicate.INSTANCE)
+                          .withConfig(config -> config.with(Feature.NATIVE_PROTOCOL)
+                                                      .with(Feature.GOSSIP)
+                                                      .with(Feature.JMX));
+        TokenSupplier tokenSupplier = TokenSupplier.evenlyDistributedTokens(1, clusterBuilder.getTokenCount());
+        clusterBuilder.withTokenSupplier(tokenSupplier);
+        return clusterBuilder.start();
+    }
+}

