This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f123406  CASSANDRA-19909: Add writer options COORDINATED_WRITE_CONFIG 
to define coordinated write to multiple Cassandra clusters (#79)
f123406 is described below

commit f123406e458c0112145f37dcd3f8c20ba47c949d
Author: Yifan Cai <y...@apache.org>
AuthorDate: Wed Sep 11 21:03:47 2024 -0700

    CASSANDRA-19909: Add writer options COORDINATED_WRITE_CONFIG to define 
coordinated write to multiple Cassandra clusters (#79)
    
    The option specifies the configuration (in JSON) for coordinated write.
    See 
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf.
    When the option is present, SIDECAR_CONTACT_POINTS, SIDECAR_INSTANCES and 
LOCAL_DC are ignored if they are present.
    
    Patch by Yifan Cai; Reviewed by Doug Rohrer, Francisco Guerrero for 
CASSANDRA-19909
---
 CHANGES.txt                                        |   1 +
 .../spark/common/model/CassandraInstance.java      |  14 +-
 .../java/org/apache/cassandra/clients/Sidecar.java |   2 +-
 .../cassandra/spark/bulkwriter/BulkSparkConf.java  |  56 +++++-
 .../spark/bulkwriter/CassandraClusterInfo.java     |  44 ++---
 .../spark/bulkwriter/CassandraContext.java         |   6 +-
 .../spark/bulkwriter/CassandraJobInfo.java         |   9 +
 .../cassandra/spark/bulkwriter/ClusterInfo.java    |  20 +++
 .../apache/cassandra/spark/bulkwriter/JobInfo.java |   7 +
 .../cassandra/spark/bulkwriter/RingInstance.java   |  58 ++++--
 .../cassandra/spark/bulkwriter/WriterOptions.java  |   6 +
 .../coordinatedwrite/CoordinatedWriteConf.java     | 199 +++++++++++++++++++++
 .../spark/common/SidecarInstanceFactory.java       |  12 ++
 .../cassandra/spark/data/CassandraDataLayer.java   |  12 +-
 .../spark/bulkwriter/BulkSparkConfTest.java        |  66 +++++++
 .../spark/bulkwriter/BulkWriteValidatorTest.java   |   4 +-
 .../ImportCompletionCoordinatorTest.java           |   1 +
 .../spark/bulkwriter/MockBulkWriterContext.java    |  15 ++
 .../bulkwriter/RingInstanceSerializationTest.java  |   2 +-
 .../spark/bulkwriter/RingInstanceTest.java         | 129 ++++++++-----
 .../spark/bulkwriter/TokenRangeMappingUtils.java   |  10 +-
 .../coordinatedwrite/CoordinatedWriteConfTest.java | 118 ++++++++++++
 22 files changed, 691 insertions(+), 100 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 33bcdd0..8e70567 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Add writer option COORDINATED_WRITE_CONFIG to define coordinated write to 
multiple Cassandra clusters (CASSANDRA-19909)
  * Decouple Cassandra types from Spark types so Cassandra types can be used 
independently from Spark (CASSANDRA-19815)
  * Make the compression cache configurable to reduce heap pressure for large 
SSTables (CASSANDRA-19900)
  * Refactor TokenRangeMapping to use proper types instead of Strings 
(CASSANDRA-19901)
diff --git 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java
 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java
index 3820efd..c69b6eb 100644
--- 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java
+++ 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java
@@ -20,16 +20,26 @@
 package org.apache.cassandra.spark.common.model;
 
 import org.apache.cassandra.spark.data.model.TokenOwner;
+import org.jetbrains.annotations.Nullable;
 
 public interface CassandraInstance extends TokenOwner
 {
+    /**
+     * @return ID string that can uniquely identify a cluster; the return 
value is nullable
+     */
+    @Nullable String clusterId();
+
+    default boolean hasClusterId()
+    {
+        return clusterId() != null;
+    }
+
     String nodeName();
 
     String datacenter();
 
     /**
-     * IP address string of a Cassandra instance.
-     * Mainly used in blocked instance list to identify instances.
+     * IP address string (w/o port) of a Cassandra instance.
      * Prefer to use {@link #ipAddressWithPort} as instance identifier,
      * unless knowing the compared is IP address without port for sure.
      */
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
index 6d558dd..2f17040 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
@@ -184,7 +184,7 @@ public final class Sidecar
     }
 
     public static List<CompletableFuture<NodeSettings>> 
allNodeSettings(SidecarClient client,
-                                                                        Set<? 
extends SidecarInstance> instances)
+                                                                        
Set<SidecarInstance> instances)
     {
         return instances.stream()
                         .map(instance -> client
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index f03e743..3c45546 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -42,6 +42,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.sidecar.client.SidecarInstance;
 import org.apache.cassandra.spark.bulkwriter.blobupload.StorageClientConfig;
+import 
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf;
+import 
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf.SimpleClusterConf;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
 import org.apache.cassandra.spark.bulkwriter.util.SbwKryoRegistrator;
 import org.apache.cassandra.spark.common.SidecarInstanceFactory;
@@ -148,7 +150,9 @@ public class BulkSparkConf implements Serializable
     protected int ringRetryCount;
     // create sidecarInstances from sidecarContactPointsValue and 
effectiveSidecarPort
     private final String sidecarContactPointsValue; // It takes comma 
separated values
-    private transient Set<? extends SidecarInstance> sidecarContactPoints; // 
not serialized
+    private transient Set<SidecarInstance> sidecarContactPoints; // not 
serialized
+    private final String coordinatedWriteConfJson;
+    private transient CoordinatedWriteConf coordinatedWriteConf; // it is 
transient; deserialized from coordinatedWriteConfJson in executors
 
     public BulkSparkConf(SparkConf conf, Map<String, String> options)
     {
@@ -223,7 +227,8 @@ public class BulkSparkConf implements Serializable
         }
         this.jobTimeoutSeconds = MapUtils.getLong(options, 
WriterOptions.JOB_TIMEOUT_SECONDS.name(), -1L);
         this.configuredJobId = MapUtils.getOrDefault(options, 
WriterOptions.JOB_ID.name(), null);
-
+        this.coordinatedWriteConfJson = MapUtils.getOrDefault(options, 
WriterOptions.COORDINATED_WRITE_CONFIG.name(), null);
+        this.coordinatedWriteConf = 
buildCoordinatedWriteConf(dataTransportInfo.getTransport());
         validateEnvironment();
     }
 
@@ -263,7 +268,7 @@ public class BulkSparkConf implements Serializable
         });
     }
 
-    protected Set<? extends SidecarInstance> buildSidecarContactPoints()
+    protected Set<SidecarInstance> buildSidecarContactPoints()
     {
         String[] split = Objects.requireNonNull(sidecarContactPointsValue, 
"Unable to build sidecar instances from null value")
                                 .split(",");
@@ -273,7 +278,7 @@ public class BulkSparkConf implements Serializable
                      .collect(Collectors.toSet());
     }
 
-    Set<? extends SidecarInstance> sidecarContactPoints()
+    Set<SidecarInstance> sidecarContactPoints()
     {
         if (sidecarContactPoints == null)
         {
@@ -282,6 +287,45 @@ public class BulkSparkConf implements Serializable
         return sidecarContactPoints;
     }
 
+    public boolean isCoordinatedWriteConfigured()
+    {
+        return coordinatedWriteConf != null;
+    }
+
+    public CoordinatedWriteConf coordinatedWriteConf()
+    {
+        if (coordinatedWriteConf == null)
+        {
+            coordinatedWriteConf = 
buildCoordinatedWriteConf(dataTransportInfo.getTransport());
+        }
+
+        return coordinatedWriteConf;
+    }
+
+    @Nullable
+    protected CoordinatedWriteConf buildCoordinatedWriteConf(DataTransport 
dataTransport)
+    {
+        if (coordinatedWriteConfJson == null)
+        {
+            return null;
+        }
+
+        Preconditions.checkArgument(dataTransport == DataTransport.S3_COMPAT,
+                                    "Coordinated write only supports " + 
DataTransport.S3_COMPAT);
+
+        if (sidecarContactPointsValue != null)
+        {
+            LOGGER.warn("SIDECAR_CONTACT_POINTS or SIDECAR_INSTANCES are 
ignored on the presence of COORDINATED_WRITE_CONF");
+        }
+
+        if (localDC != null)
+        {
+            LOGGER.warn("LOCAL_DC is ignored on the presence of 
COORDINATED_WRITE_CONF");
+        }
+
+        return CoordinatedWriteConf.create(coordinatedWriteConfJson, 
consistencyLevel, SimpleClusterConf.class);
+    }
+
     protected void validateEnvironment() throws RuntimeException
     {
         Preconditions.checkNotNull(keyspace);
@@ -351,12 +395,12 @@ public class BulkSparkConf implements Serializable
         return truststorePath;
     }
 
-    protected TTLOption getTTLOptions()
+    public TTLOption getTTLOptions()
     {
         return TTLOption.from(ttl);
     }
 
-    protected TimestampOption getTimestampOptions()
+    public TimestampOption getTimestampOptions()
     {
         return TimestampOption.from(timestamp);
     }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
index 359dc11..4165c7c 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
@@ -36,7 +36,6 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import o.a.c.sidecar.client.shaded.common.response.GossipInfoResponse;
 import o.a.c.sidecar.client.shaded.common.response.NodeSettings;
 import o.a.c.sidecar.client.shaded.common.response.SchemaResponse;
 import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse;
@@ -63,19 +62,26 @@ public class CassandraClusterInfo implements ClusterInfo, 
Closeable
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraClusterInfo.class);
 
     protected final BulkSparkConf conf;
+    protected final String clusterId;
     protected String cassandraVersion;
     protected Partitioner partitioner;
 
-    protected transient TokenRangeMapping<RingInstance> tokenRangeReplicas;
-    protected transient String keyspaceSchema;
-    protected transient GossipInfoResponse gossipInfo;
-    protected transient CassandraContext cassandraContext;
+    protected transient volatile TokenRangeMapping<RingInstance> 
tokenRangeReplicas;
+    protected transient volatile String keyspaceSchema;
+    protected transient volatile CassandraContext cassandraContext;
     protected final transient AtomicReference<NodeSettings> nodeSettings;
     protected final transient List<CompletableFuture<NodeSettings>> 
allNodeSettingFutures;
 
     public CassandraClusterInfo(BulkSparkConf conf)
+    {
+        this(conf, null);
+    }
+
+    // Used by CassandraClusterInfoGroup
+    public CassandraClusterInfo(BulkSparkConf conf, String clusterId)
     {
         this.conf = conf;
+        this.clusterId = clusterId;
         this.cassandraContext = buildCassandraContext();
         LOGGER.info("Getting Cassandra versions from all nodes");
         this.nodeSettings = new AtomicReference<>(null);
@@ -113,6 +119,12 @@ public class CassandraClusterInfo implements ClusterInfo, 
Closeable
         }
     }
 
+    @Override
+    public String clusterId()
+    {
+        return clusterId;
+    }
+
     /**
      * Gets a Cassandra Context
      * <p>
@@ -191,15 +203,11 @@ public class CassandraClusterInfo implements ClusterInfo, 
Closeable
     }
 
     @Override
-    public void refreshClusterInfo()
+    public synchronized void refreshClusterInfo()
     {
-        synchronized (this)
-        {
-            // Set backing stores to null and let them lazy-load on the next 
call
-            gossipInfo = null;
-            keyspaceSchema = null;
-            getCassandraContext().refreshClusterConfig();
-        }
+        // Set backing stores to null and let them lazy-load on the next call
+        keyspaceSchema = null;
+        getCassandraContext().refreshClusterConfig();
     }
 
     protected String getCurrentKeyspaceSchema() throws Exception
@@ -232,13 +240,6 @@ public class CassandraClusterInfo implements ClusterInfo, 
Closeable
         }
     }
 
-    private Set<String> 
readReplicasFromTokenRangeResponse(TokenRangeReplicasResponse response)
-    {
-        return response.readReplicas().stream()
-                       .flatMap(rr -> 
rr.replicasByDatacenter().values().stream())
-                       .flatMap(List::stream).collect(Collectors.toSet());
-    }
-
     @NotNull
     protected ReplicationFactor getReplicationFactor()
     {
@@ -287,7 +288,8 @@ public class CassandraClusterInfo implements ClusterInfo, 
Closeable
             return topology;
         }
 
-        // Block for the call-sites requesting the latest view of the ring; 
but it is OK to serve the other call-sites that request for the cached view
+        // Block only for the call-sites requesting the latest view of the ring
+        // The other call-sites get the cached/stale view
         // We can avoid synchronization here
         if (topology != null)
         {
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java
index 1da6083..41a7be1 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java
@@ -40,7 +40,7 @@ public class CassandraContext implements StartupValidatable, 
Closeable
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraContext.class);
     @NotNull
-    protected Set<? extends SidecarInstance> clusterConfig;
+    protected Set<SidecarInstance> clusterConfig;
     private final BulkSparkConf conf;
     private final transient SidecarClient sidecarClient;
 
@@ -57,7 +57,7 @@ public class CassandraContext implements StartupValidatable, 
Closeable
         return new CassandraContext(conf);
     }
 
-    public Set<? extends SidecarInstance> getCluster()
+    public Set<SidecarInstance> getCluster()
     {
         return clusterConfig;
     }
@@ -86,7 +86,7 @@ public class CassandraContext implements StartupValidatable, 
Closeable
         return Sidecar.from(new SimpleSidecarInstancesProvider(new 
ArrayList<>(clusterConfig)), conf);
     }
 
-    protected Set<? extends SidecarInstance> createClusterConfig()
+    protected Set<SidecarInstance> createClusterConfig()
     {
         return conf.sidecarContactPoints();
     }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
index 39ba908..8e12f07 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
@@ -21,9 +21,11 @@ package org.apache.cassandra.spark.bulkwriter;
 
 import java.util.UUID;
 
+import 
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
 import org.apache.cassandra.spark.data.QualifiedTableName;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 public class CassandraJobInfo implements JobInfo
 {
@@ -105,6 +107,13 @@ public class CassandraJobInfo implements JobInfo
         return conf.importCoordinatorTimeoutMultiplier;
     }
 
+    @Nullable
+    @Override
+    public CoordinatedWriteConf coordinatedWriteConf()
+    {
+        return conf.coordinatedWriteConf();
+    }
+
     @Override
     public int getCommitThreadsPerInstance()
     {
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java
index b6ed979..c6fe930 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java
@@ -27,6 +27,7 @@ import 
o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.cassandra.spark.validation.StartupValidatable;
+import org.jetbrains.annotations.Nullable;
 
 public interface ClusterInfo extends StartupValidatable, Serializable
 {
@@ -47,10 +48,29 @@ public interface ClusterInfo extends StartupValidatable, 
Serializable
 
     TimeSkewResponse getTimeSkew(List<RingInstance> replicas);
 
+    /**
+     * Return the keyspace schema string of the enclosing keyspace for bulk 
write in the cluster
+     * @param cached whether using the cached schema information
+     * @return keyspace schema string
+     */
     String getKeyspaceSchema(boolean cached);
 
     CassandraContext getCassandraContext();
 
+    /**
+     * ID string that can uniquely identify a cluster
+     * <p>
+     * Implementor note: the method is optional. When writing to a single 
cluster, there is no requirement of assigning an ID for bulk write to proceed.
+     * When in the coordinated write mode, i.e. writing to multiple clusters, 
the method must be implemented and return unique string for clusters.
+     *
+     * @return cluster id string, null if absent
+     */
+    @Nullable
+    default String clusterId()
+    {
+        return null;
+    }
+
     default void close()
     {
     }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
index a454827..496580e 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
@@ -22,6 +22,7 @@ package org.apache.cassandra.spark.bulkwriter;
 import java.io.Serializable;
 import java.util.UUID;
 
+import 
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
 import org.apache.cassandra.spark.data.QualifiedTableName;
 import org.jetbrains.annotations.NotNull;
@@ -100,4 +101,10 @@ public interface JobInfo extends Serializable
      * @return multiplier to calculate the final timeout for import coordinator
      */
     double importCoordinatorTimeoutMultiplier();
+
+    /**
+     * @return CoordinatedWriteConf if configured, null otherwise
+     */
+    @Nullable
+    CoordinatedWriteConf coordinatedWriteConf();
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
index ae04447..f0edb71 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
@@ -25,6 +25,8 @@ import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.Objects;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import 
o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaMetadata;
 import o.a.c.sidecar.client.shaded.common.response.data.RingEntry;
 import org.apache.cassandra.spark.common.model.CassandraInstance;
@@ -36,14 +38,11 @@ public class RingInstance implements CassandraInstance, 
Serializable
 {
     private static final long serialVersionUID = 4399143234683369652L;
     private RingEntry ringEntry;
+    private @Nullable String clusterId;
 
-    public RingInstance(RingEntry ringEntry)
-    {
-        this.ringEntry = ringEntry;
-    }
-
-    public RingInstance(ReplicaMetadata replica)
+    public RingInstance(ReplicaMetadata replica, @Nullable String clusterId)
     {
+        this.clusterId = clusterId;
         this.ringEntry = new RingEntry.Builder()
                          .fqdn(replica.fqdn())
                          .address(replica.address())
@@ -54,6 +53,25 @@ public class RingInstance implements CassandraInstance, 
Serializable
                          .build();
     }
 
+    @VisibleForTesting
+    public RingInstance(RingEntry ringEntry)
+    {
+        this(ringEntry, null);
+    }
+
+    @VisibleForTesting
+    public RingInstance(RingEntry ringEntry, @Nullable String clusterId)
+    {
+        this.clusterId = clusterId;
+        this.ringEntry = ringEntry;
+    }
+
+    @VisibleForTesting
+    public RingInstance(ReplicaMetadata replica)
+    {
+        this(replica, null);
+    }
+
     // Used only in tests
     @Override
     public String token()
@@ -61,6 +79,12 @@ public class RingInstance implements CassandraInstance, 
Serializable
         return ringEntry.token();
     }
 
+    @Override
+    public String clusterId()
+    {
+        return clusterId;
+    }
+
     @Override
     public String nodeName()
     {
@@ -98,9 +122,9 @@ public class RingInstance implements CassandraInstance, 
Serializable
     }
 
     /**
-     * Custom equality that compares the token, fully qualified domain name, 
the port, and the datacenter
+     * Custom equality that compares the token, fully qualified domain name, 
the port, the datacenter and the clusterId
      *
-     * Note that node state and status are not part of the calculation.
+     * Note that node state and status are not part of the calculation.
      *
      * @param other the other instance
      * @return true if both instances are equal, false otherwise
@@ -108,13 +132,20 @@ public class RingInstance implements CassandraInstance, 
Serializable
     @Override
     public boolean equals(@Nullable Object other)
     {
-        if (other == null || !(other instanceof RingInstance))
+        if (this == other)
+        {
+            return true;
+        }
+
+        if (other == null || getClass() != other.getClass())
         {
             return false;
         }
         final RingInstance that = (RingInstance) other;
-        return Objects.equals(ringEntry.token(), that.ringEntry.token())
+        return Objects.equals(clusterId, that.clusterId)
+               && Objects.equals(ringEntry.token(), that.ringEntry.token())
                && Objects.equals(ringEntry.fqdn(), that.ringEntry.fqdn())
+               && Objects.equals(ringEntry.rack(), that.ringEntry.rack())
                && Objects.equals(ringEntry.address(), that.ringEntry.address())
                && ringEntry.port() == that.ringEntry.port()
                && Objects.equals(ringEntry.datacenter(), 
that.ringEntry.datacenter());
@@ -130,7 +161,7 @@ public class RingInstance implements CassandraInstance, 
Serializable
     @Override
     public int hashCode()
     {
-        return Objects.hash(ringEntry.token(), ringEntry.fqdn(), 
ringEntry.port(), ringEntry.datacenter(), ringEntry.address());
+        return Objects.hash(clusterId, ringEntry.token(), ringEntry.fqdn(), 
ringEntry.rack(), ringEntry.port(), ringEntry.datacenter(), 
ringEntry.address());
     }
 
     @Override
@@ -139,7 +170,7 @@ public class RingInstance implements CassandraInstance, 
Serializable
         return ringEntry.toString();
     }
 
-    public RingEntry ringInstance()
+    public RingEntry ringEntry()
     {
         return ringEntry;
     }
@@ -158,6 +189,7 @@ public class RingInstance implements CassandraInstance, 
Serializable
         out.writeObject(ringEntry.hostId());
         out.writeObject(ringEntry.load());
         out.writeObject(ringEntry.owns());
+        out.writeObject(clusterId);
     }
 
     private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException
@@ -174,6 +206,7 @@ public class RingInstance implements CassandraInstance, 
Serializable
         String hostId = (String) in.readObject();
         String load = (String) in.readObject();
         String owns = (String) in.readObject();
+        String clusterId = (String) in.readObject();
         ringEntry = new RingEntry.Builder().datacenter(datacenter)
                                            .address(address)
                                            .port(port)
@@ -186,5 +219,6 @@ public class RingInstance implements CassandraInstance, 
Serializable
                                            .load(load)
                                            .owns(owns)
                                            .build();
+        this.clusterId = clusterId;
     }
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
index 6c7eba7..1b36bd0 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
@@ -31,6 +31,12 @@ public enum WriterOptions implements WriterOption
     // The option specifies the initial contact points of sidecar servers to 
discover the cluster topology
     // Note that the addresses can include port; when port is present, it 
takes precedence over SIDECAR_PORT
     SIDECAR_CONTACT_POINTS,
+    /**
+     * The option specifies the configuration (in JSON) for coordinated write.
+     * See 
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf.
+     * When the option is present, SIDECAR_CONTACT_POINTS, SIDECAR_INSTANCES 
and LOCAL_DC are ignored if they are present.
+     */
+    COORDINATED_WRITE_CONFIG,
     KEYSPACE,
     TABLE,
     BULK_WRITER_CL,
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConf.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConf.java
new file mode 100644
index 0000000..63fc017
--- /dev/null
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConf.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter.coordinatedwrite;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+import org.apache.cassandra.sidecar.client.SidecarInstance;
+import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
+import org.apache.cassandra.spark.common.SidecarInstanceFactory;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toSet;
+
+/**
+ * Data class containing the configurations required for coordinated write.
+ * The serialization format is JSON string. The class takes care of 
serialization and deserialization.
+ */
+public class CoordinatedWriteConf
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CoordinatedWriteConf.class);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    // The runtime type of ClusterConfProvider is erased; use clustersOf 
method to read the desired type back
+    private final Map<String, ClusterConf> clusters;
+
+    /**
+     * Parse JSON string and create a CoordinatedWriteConf object with the 
specified ClusterConfProvider format
+     *
+     * @param json JSON string
+     * @param clusterConfType concrete type of ClusterConfProvider that can be 
used for JSON serialization and deserialization
+     * @return CoordinatedWriteConf object
+     * @param <T> subtype of ClusterConfProvider
+     */
+    public static <T extends ClusterConf>
+    CoordinatedWriteConf create(String json, ConsistencyLevel.CL 
consistencyLevel, Class<T> clusterConfType)
+    {
+        JavaType javaType = 
TypeFactory.defaultInstance().constructMapType(Map.class, String.class, 
clusterConfType);
+        CoordinatedWriteConf result;
+        try
+        {
+            result = new CoordinatedWriteConf(OBJECT_MAPPER.readValue(json, 
javaType));
+        }
+        catch (Exception e)
+        {
+            throw new IllegalArgumentException("Unable to parse json string 
into CoordinatedWriteConf of " + clusterConfType.getSimpleName() +
+                                               " due to " + e.getMessage(), e);
+        }
+        result.clusters().forEach((clusterId, cluster) -> {
+            if (consistencyLevel.isLocal())
+            {
+                
Preconditions.checkState(!StringUtils.isEmpty(cluster.localDc()),
+                                         "localDc is not configured for 
cluster: " + clusterId + " for consistency level: " + consistencyLevel);
+            }
+            else
+            {
+                if (cluster.localDc() != null)
+                {
+                    LOGGER.warn("Ignoring the localDc configured for cluster, 
when consistency level is non-local. cluster={} consistencyLevel={}",
+                                clusterId, consistencyLevel);
+                }
+            }
+        });
+        return result;
+    }
+
+    public CoordinatedWriteConf(Map<String, ? extends ClusterConf> clusters)
+    {
+        this.clusters = Collections.unmodifiableMap(clusters);
+    }
+
+    public Map<String, ClusterConf> clusters()
+    {
+        return clusters;
+    }
+
+    @Nullable
+    public ClusterConf cluster(String clusterId)
+    {
+        return clusters.get(clusterId);
+    }
+
+    public <T extends ClusterConf> Map<String, T> clustersOf(Class<T> 
clusterConfType)
+    {
+        // verify that map type can cast; there are only limited number of 
values and check is cheap
+        clusters.values().forEach(v -> 
Preconditions.checkState(clusterConfType.isInstance(v),
+                                                                
"ClusterConfProvider value is not instance of " + clusterConfType));
+        return (Map<String, T>) clusters;
+    }
+
+    public String toJson() throws JsonProcessingException
+    {
+        return OBJECT_MAPPER.writeValueAsString(clusters);
+    }
+
+    public interface ClusterConf
+    {
+        Set<SidecarInstance> sidecarContactPoints();
+
+        @Nullable
+        String localDc();
+
+        @Nullable
+        default String resolveLocalDc(ConsistencyLevel.CL cl)
+        {
+            String localDc = localDc();
+            boolean hasLocalDc = !StringUtils.isEmpty(localDc());
+            if (!cl.isLocal() && hasLocalDc)
+            {
+                return null;
+            }
+            if (cl.isLocal() && !hasLocalDc)
+            {
+                throw new IllegalStateException("No localDc is specified for 
local consistency level: " + cl);
+            }
+            return localDc;
+        }
+    }
+
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    public static class SimpleClusterConf implements ClusterConf
+    {
+        private final List<String> sidecarContactPointsValue;
+        private final Set<SidecarInstance> sidecarContactPoints;
+        private final @Nullable String localDc;
+
+        @JsonCreator
+        public SimpleClusterConf(@JsonProperty("sidecarContactPoints") 
List<String> sidecarContactPointsValue,
+                                 @JsonProperty("localDc") String localDc)
+        {
+            this.sidecarContactPointsValue = sidecarContactPointsValue;
+            this.sidecarContactPoints = 
buildSidecarContactPoints(sidecarContactPointsValue);
+            this.localDc = localDc;
+        }
+
+        @JsonProperty("sidecarContactPoints")
+        public List<String> sidecarContactPointsValue()
+        {
+            return sidecarContactPointsValue;
+        }
+
+        @Nullable
+        @Override
+        @JsonProperty("localDc")
+        public String localDc()
+        {
+            return localDc;
+        }
+
+        @Override
+        public Set<SidecarInstance> sidecarContactPoints()
+        {
+            return sidecarContactPoints;
+        }
+
+        private Set<SidecarInstance> buildSidecarContactPoints(List<String> 
sidecarContactPoints)
+        {
+            return sidecarContactPoints.stream()
+                                       .filter(StringUtils::isNotEmpty)
+                                       
.map(SidecarInstanceFactory::createFromString)
+                                       .collect(collectingAndThen(toSet(), 
Collections::unmodifiableSet));
+        }
+    }
+}
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/SidecarInstanceFactory.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/SidecarInstanceFactory.java
index cc6fb09..ce69703 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/SidecarInstanceFactory.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/SidecarInstanceFactory.java
@@ -58,7 +58,19 @@ public class SidecarInstanceFactory
             port = Integer.parseInt(portStr);
         }
 
+        Preconditions.checkState(port != -1, "Unable to resolve port from %s", 
input);
+
         LOGGER.info("Create sidecar instance. hostname={} port={}", hostname, 
port);
         return new SidecarInstanceImpl(hostname, port);
     }
+
+    /**
+     * Similar to {@link SidecarInstanceFactory#createFromString(String, 
int)}, but it requires that the input string must include port
+     * @param hostnameWithPort hostname with port
+     * @return SidecarInstanceImpl
+     */
+    public static SidecarInstanceImpl createFromString(String hostnameWithPort)
+    {
+        return createFromString(hostnameWithPort, -1);
+    }
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 5e26deb..369e58e 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -124,7 +124,7 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
     // create clusterConfig from sidecarInstances and sidecarPort, see 
initializeClusterConfig
     protected String sidecarInstances;
     protected int sidecarPort;
-    protected transient Set<? extends SidecarInstance> clusterConfig;
+    protected transient Set<SidecarInstance> clusterConfig;
     protected TokenPartitioner tokenPartitioner;
     protected Map<String, AvailabilityHint> availabilityHints;
     protected Sidecar.ClientConfig sidecarClientConfig;
@@ -907,13 +907,13 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
         }
     }
 
-    protected Set<? extends SidecarInstance> 
initializeClusterConfig(ClientConfig options)
+    protected Set<SidecarInstance> initializeClusterConfig(ClientConfig 
options)
     {
         return initializeClusterConfig(options.sidecarContactPoints, 
options.sidecarPort());
     }
 
     // not intended to be overridden
-    private Set<? extends SidecarInstance> initializeClusterConfig(String 
sidecarInstances, int sidecarPort)
+    private Set<SidecarInstance> initializeClusterConfig(String 
sidecarInstances, int sidecarPort)
     {
         return Arrays.stream(sidecarInstances.split(","))
                      .filter(StringUtils::isNotEmpty)
@@ -921,7 +921,7 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
                      .collect(Collectors.toSet());
     }
 
-    protected String getEffectiveCassandraVersionForRead(Set<? extends 
SidecarInstance> clusterConfig,
+    protected String getEffectiveCassandraVersionForRead(Set<SidecarInstance> 
clusterConfig,
                                                          NodeSettings 
nodeSettings)
     {
         return nodeSettings.releaseVersion();
@@ -932,7 +932,7 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
         LOGGER.info("Dial home. clientConfig={}", options);
     }
 
-    protected void clearSnapshot(Set<? extends SidecarInstance> clusterConfig, 
@NotNull ClientConfig options)
+    protected void clearSnapshot(Set<SidecarInstance> clusterConfig, @NotNull 
ClientConfig options)
     {
         if (maybeQuotedKeyspace == null || maybeQuotedTable == null)
         {
@@ -982,7 +982,7 @@ public class CassandraDataLayer extends 
PartitionedDataLayer implements StartupV
      * @param options           the {@link ClientConfig} options
      * @return the {@link Sizing} object based on the {@code sizing} option 
provided by the user
      */
-    protected Sizing getSizing(Set<? extends SidecarInstance> clusterConfig,
+    protected Sizing getSizing(Set<SidecarInstance> clusterConfig,
                                ReplicationFactor replicationFactor,
                                ClientConfig options)
     {
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
index d48ace8..3130fc1 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
@@ -28,6 +28,8 @@ import com.google.common.collect.Maps;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
+import 
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf;
 import org.apache.cassandra.spark.bulkwriter.util.SbwKryoRegistrator;
 import org.apache.cassandra.spark.utils.BuildInfo;
 import org.apache.spark.SparkConf;
@@ -275,6 +277,70 @@ class BulkSparkConfTest
         .isNull();
     }
 
+    @Test
+    void testReadCoordinatedWriteConfFails()
+    {
+        Map<String, String> options = copyDefaultOptions();
+        String coordinatedWriteConfJsonNoLocalDc = "{\"cluster1\":" +
+                                                   
"{\"sidecarContactPoints\":[\"instance-1:9999\",\"instance-2:9999\",\"instance-3:9999\"]}}";
+
+        options.put(WriterOptions.COORDINATED_WRITE_CONFIG.name(), 
coordinatedWriteConfJsonNoLocalDc);
+        assertThatThrownBy(() -> new BulkSparkConf(sparkConf, options))
+        .isExactlyInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Coordinated write only supports S3_COMPAT");
+
+        options.put(WriterOptions.DATA_TRANSPORT.name(), 
DataTransport.S3_COMPAT.name());
+        options.put(WriterOptions.BULK_WRITER_CL.name(), "LOCAL_QUORUM");
+        assertThatThrownBy(() -> new BulkSparkConf(sparkConf, options))
+        .isExactlyInstanceOf(IllegalStateException.class)
+        .hasMessage("localDc is not configured for cluster: cluster1 for 
consistency level: LOCAL_QUORUM");
+
+        options.put(WriterOptions.COORDINATED_WRITE_CONFIG.name(), "invalid 
json");
+        assertThatThrownBy(() -> new BulkSparkConf(sparkConf, options))
+        .isExactlyInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Unable to parse json string into 
CoordinatedWriteConf of SimpleClusterConf due to Unrecognized token 'invalid'");
+    }
+
+    @Test
+    void testCoordinatedWriteConf()
+    {
+        Map<String, String> options = copyDefaultOptions();
+        options.remove(WriterOptions.COORDINATED_WRITE_CONFIG.name());
+        BulkSparkConf conf = new BulkSparkConf(sparkConf, options);
+        assertThat(conf.isCoordinatedWriteConfigured())
+        .describedAs("When COORDINATED_WRITE_CONF is absent, 
isCoordinatedWriteConfigured should return false")
+        .isFalse();
+
+        String coordinatedWriteConfJson = "{\"cluster1\":" +
+                                          
"{\"sidecarContactPoints\":[\"instance-1:9999\",\"instance-2:9999\",\"instance-3:9999\"],"
 +
+                                          "\"localDc\":\"dc1\"}," +
+                                          "\"cluster2\":" +
+                                          
"{\"sidecarContactPoints\":[\"instance-4:8888\",\"instance-5:8888\",\"instance-6:8888\"],"
 +
+                                          "\"localDc\":\"dc1\"}}";
+        options.put(WriterOptions.DATA_TRANSPORT.name(), 
DataTransport.S3_COMPAT.name());
+        options.put(WriterOptions.BULK_WRITER_CL.name(), "LOCAL_QUORUM");
+        options.put(WriterOptions.COORDINATED_WRITE_CONFIG.name(), 
coordinatedWriteConfJson);
+        conf = new BulkSparkConf(sparkConf, options);
+        assertThat(conf.isCoordinatedWriteConfigured())
+        .describedAs("When COORDINATED_WRITE_CONF is present, it should return 
true")
+        .isTrue();
+        
assertThat(conf.coordinatedWriteConf().clusters()).containsOnlyKeys("cluster1", 
"cluster2");
+        CoordinatedWriteConf.ClusterConf cluster1 = 
conf.coordinatedWriteConf().cluster("cluster1");
+        assertThat(cluster1).isNotNull();
+        assertThat(cluster1.sidecarContactPoints())
+        .containsExactlyInAnyOrder(new SidecarInstanceImpl("instance-1", 9999),
+                                   new SidecarInstanceImpl("instance-2", 9999),
+                                   new SidecarInstanceImpl("instance-3", 
9999));
+        assertThat(cluster1.localDc()).isEqualTo("dc1");
+        CoordinatedWriteConf.ClusterConf cluster2 = 
conf.coordinatedWriteConf().cluster("cluster2");
+        assertThat(cluster2).isNotNull();
+        assertThat(cluster2.sidecarContactPoints())
+        .containsExactlyInAnyOrder(new SidecarInstanceImpl("instance-4", 8888),
+                                   new SidecarInstanceImpl("instance-5", 8888),
+                                   new SidecarInstanceImpl("instance-6", 
8888));
+        assertThat(cluster2.localDc()).isEqualTo("dc1");
+    }
+
     private Map<String, String> copyDefaultOptions()
     {
         TreeMap<String, String> map = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java
index 5428363..59d7705 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java
@@ -62,9 +62,7 @@ class BulkWriteValidatorTest
         for (RingInstance instance : topology.getTokenRanges().keySet())
         {
             // Mark nodes 0, 1, 2 as DOWN
-            int nodeId = Integer.parseInt(instance.ipAddress()
-                                                  .replace("localhost", "")
-                                                  .replace(":9042", ""));
+            int nodeId = 
Integer.parseInt(instance.ipAddress().replace("localhost", ""));
             instanceAvailabilityMap.put(instance, (nodeId <= 2) ? 
WriteAvailability.UNAVAILABLE_DOWN : WriteAvailability.AVAILABLE);
         }
         
when(mockClusterInfo.clusterWriteAvailability()).thenReturn(instanceAvailabilityMap);
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java
index f84280f..578781b 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java
@@ -494,6 +494,7 @@ class ImportCompletionCoordinatorTest
         int instanceInRing = i % totalInstances + 1;
         return new RingInstance(new RingEntry.Builder()
                                 .datacenter("DC1")
+                                .rack("Rack")
                                 .address("127.0.0." + instanceInRing)
                                 .token(String.valueOf(i * 100_000))
                                 .fqdn("DC1-i" + instanceInRing)
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index d93a217..33b09a2 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -46,6 +46,7 @@ import 
o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse;
 import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.bridge.CassandraBridgeFactory;
 import org.apache.cassandra.bridge.CassandraVersion;
+import 
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
@@ -63,6 +64,7 @@ import org.apache.cassandra.spark.validation.StartupValidator;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DATE;
 import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.INT;
@@ -210,6 +212,12 @@ public class MockBulkWriterContext implements 
BulkWriterContext, ClusterInfo, Jo
         return null;
     }
 
+    @Override
+    public String clusterId()
+    {
+        return "test-cluster";
+    }
+
     @Override
     public void refreshClusterInfo()
     {
@@ -303,6 +311,13 @@ public class MockBulkWriterContext implements 
BulkWriterContext, ClusterInfo, Jo
         return 2.0;
     }
 
+    @Nullable
+    @Override
+    public CoordinatedWriteConf coordinatedWriteConf()
+    {
+        return null;
+    }
+
     public void setSkipCleanOnFailures(boolean skipClean)
     {
         this.skipClean = skipClean;
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceSerializationTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceSerializationTest.java
index d15db89..333eabb 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceSerializationTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceSerializationTest.java
@@ -69,7 +69,7 @@ public class RingInstanceSerializationTest
                                                        7000,
                                                        dataCenter);
 
-        RingInstance ring = new RingInstance(metadata);
+        RingInstance ring = new RingInstance(metadata, "test-cluster");
 
         byte[] bytes = serialize(ring);
         RingInstance deserialized = deserialize(bytes, RingInstance.class);
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
index da2f1af..0846e8f 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
@@ -31,21 +31,23 @@ import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Range;
 import org.junit.jupiter.api.Test;
 
 import o.a.c.sidecar.client.shaded.common.response.data.RingEntry;
-import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
+import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel.CL;
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.NotNull;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class RingInstanceTest
 {
@@ -123,58 +125,75 @@ public class RingInstanceTest
     @Test
     public void testEquals()
     {
-        RingInstance instance1 = TokenRangeMappingUtils.getInstances(0, 
ImmutableMap.of("DATACENTER1", 1), 1).get(0);
-        RingInstance instance2 = TokenRangeMappingUtils.getInstances(0, 
ImmutableMap.of("DATACENTER1", 1), 1).get(0);
+        RingInstance instance1 = mockRingInstance();
+        RingInstance instance2 = mockRingInstance();
         assertEquals(instance1, instance2);
     }
 
     @Test
     public void testHashCode()
     {
-        RingInstance instance1 = TokenRangeMappingUtils.getInstances(0, 
ImmutableMap.of("DATACENTER1", 1), 1).get(0);
-        RingInstance instance2 = TokenRangeMappingUtils.getInstances(0, 
ImmutableMap.of("DATACENTER1", 1), 1).get(0);
+        RingInstance instance1 = mockRingInstance();
+        RingInstance instance2 = mockRingInstance();
         assertEquals(instance1.hashCode(), instance2.hashCode());
     }
 
     @Test
-    public void testEqualsAndHashcodeIgnoreHost()
-    {
-        RingInstance realInstance = new RingInstance(new RingEntry.Builder()
-                                                     .datacenter("DATACENTER1")
-                                                     .address("127.0.0.1")
-                                                     .port(7000)
-                                                     .rack("Rack")
-                                                     .status("UP")
-                                                     .state("NORMAL")
-                                                     .load("0")
-                                                     .token("0")
-                                                     .fqdn("fqdn")
-                                                     .hostId("")
-                                                     .owns("")
-                                                     .build());
-
-        RingInstance questionInstance = new RingInstance(new 
RingEntry.Builder()
-                                                         
.datacenter("DATACENTER1")
-                                                         .address("127.0.0.1")
-                                                         .port(7000)
-                                                         .rack("Rack")
-                                                         .status("UP")
-                                                         .state("NORMAL")
-                                                         .load("0")
-                                                         .token("0")
-                                                         .fqdn("fqdn")
-                                                         .hostId("")
-                                                         .owns("")
-                                                         .build());
-        assertEquals(realInstance, questionInstance);
-        assertEquals(realInstance.hashCode(), questionInstance.hashCode());
+    public void testEqualsAndHashcodeIgnoreNonCriticalFields()
+    {
+        RingEntry.Builder builder = mockRingEntryBuilder();
+        // the fields chained in the builder below are not considered for 
equality check
+        RingInstance instance1 = new RingInstance(builder
+                                                  .status("1")
+                                                  .state("1")
+                                                  .load("1")
+                                                  .hostId("1")
+                                                  .owns("1")
+                                                  .build());
+        RingInstance instance2 = new RingInstance(builder
+                                                  .status("2")
+                                                  .state("2")
+                                                  .load("2")
+                                                  .hostId("2")
+                                                  .owns("2")
+                                                  .build());
+        assertEquals(instance1, instance2);
+        assertEquals(instance1.hashCode(), instance2.hashCode());
+    }
+
+    @Test
+    public void testEqualsAndHashcodeConsidersClusterId()
+    {
+        RingEntry ringEntry = mockRingEntry();
+        RingInstance c1i1 = new RingInstance(ringEntry, "cluster1");
+        RingInstance c1i2 = new RingInstance(ringEntry, "cluster1");
+        RingInstance c2i1 = new RingInstance(ringEntry, "cluster2");
+
+        assertEquals(c1i1, c1i2);
+        assertEquals(c1i1.hashCode(), c1i2.hashCode());
+
+        assertNotEquals(c1i1, c2i1);
+        assertNotEquals(c1i1.hashCode(), c2i1.hashCode());
+    }
+
+    @Test
+    public void testHasClusterId()
+    {
+        RingEntry ringEntry = mockRingEntry();
+        RingInstance instance = new RingInstance(ringEntry);
+        assertFalse(instance.hasClusterId());
+
+        RingInstance instanceWithClusterId = new RingInstance(ringEntry, 
"cluster1");
+        assertTrue(instanceWithClusterId.hasClusterId());
+        assertEquals("cluster1", instanceWithClusterId.clusterId());
     }
 
     @Test
     public void multiMapWorksWithRingInstances()
     {
-        RingInstance instance1 = TokenRangeMappingUtils.getInstances(0, 
ImmutableMap.of("DATACENTER1", 1), 1).get(0);
-        RingInstance instance2 = TokenRangeMappingUtils.getInstances(0, 
ImmutableMap.of("DATACENTER1", 1), 1).get(0);
+        RingEntry ringEntry = mockRingEntry();
+        RingInstance instance1 = new RingInstance(ringEntry);
+        RingInstance instance2 = new RingInstance(ringEntry);
         byte[] buffer;
 
         try
@@ -224,7 +243,35 @@ public class RingInstanceTest
         
replicationFactor3.addFailure(Range.openClosed(tokens[0].add(BigInteger.ONE),
                                                        
tokens[0].add(BigInteger.valueOf(2L))), instance2, "Failure 2");
 
-        replicationFactor3.getFailedRanges(tokenRange, 
ConsistencyLevel.CL.LOCAL_QUORUM, DATACENTER_1);
-        assertFalse(replicationFactor3.hasFailed(tokenRange, 
ConsistencyLevel.CL.LOCAL_QUORUM, DATACENTER_1));
+        assertTrue(replicationFactor3.getFailedRanges(tokenRange, 
CL.LOCAL_QUORUM, DATACENTER_1).isEmpty());
+    }
+
+    @NotNull
+    private static RingEntry mockRingEntry()
+    {
+        return mockRingEntryBuilder().build();
+    }
+
+    @NotNull
+    private static RingEntry.Builder mockRingEntryBuilder()
+    {
+        return new RingEntry.Builder()
+               .datacenter("DATACENTER1")
+               .address("127.0.0.1")
+               .port(0)
+               .rack("Rack")
+               .status("UP")
+               .state("NORMAL")
+               .load("0")
+               .token("0")
+               .fqdn("DATACENTER1-i1")
+               .hostId("")
+               .owns("");
+    }
+
+    @NotNull
+    private static RingInstance mockRingInstance()
+    {
+        return new RingInstance(mockRingEntry());
     }
 }
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
index 9c53279..202d3a3 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
@@ -61,7 +61,7 @@ public final class TokenRangeMappingUtils
     {
         List<RingInstance> instances = getInstances(initialToken, rfByDC, 
instancesPerDC);
         RingInstance instance = instances.remove(0);
-        RingEntry entry = instance.ringInstance();
+        RingEntry entry = instance.ringEntry();
         RingEntry newEntry = new RingEntry.Builder()
                              .datacenter(entry.datacenter())
                              .port(entry.port())
@@ -97,7 +97,7 @@ public final class TokenRangeMappingUtils
         if (shouldUpdateToken)
         {
             RingInstance instance = instances.remove(0);
-            RingEntry entry = instance.ringInstance();
+            RingEntry entry = instance.ringEntry();
             RingEntry newEntry = new RingEntry.Builder()
                                  .datacenter(entry.datacenter())
                                  .port(entry.port())
@@ -218,9 +218,11 @@ public final class TokenRangeMappingUtils
             replicasPerDc.put("ignored", replicas);
             ReplicaInfo ri = new ReplicaInfo(String.valueOf(startToken), 
String.valueOf(endToken), replicasPerDc);
             replicaInfoList.add(ri);
-            String address = "localhost" + i + ":9042";
+            String address = "localhost" + i;
+            int port = 9042;
+            String addressWithPort = address + ":" + port;
             ReplicaMetadata rm = new ReplicaMetadata("NORMAL", "UP", address, 
address, 9042, "ignored");
-            replicaMetadata.put(address, rm);
+            replicaMetadata.put(addressWithPort, rm);
             startToken = endToken;
         }
         return new TokenRangeReplicasResponse(replicaInfoList, 
replicaInfoList, replicaMetadata);
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConfTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConfTest.java
new file mode 100644
index 0000000..be15a6a
--- /dev/null
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConfTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter.coordinatedwrite;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.cassandra.sidecar.client.SidecarInstance;
+import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
+import 
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf.ClusterConf;
+import 
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf.SimpleClusterConf;
+import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel.CL;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
/**
 * Unit tests for {@link CoordinatedWriteConf}: JSON round-tripping of the
 * per-cluster configuration and validation of sidecar contact points and
 * localDc resolution against the requested consistency level.
 */
class CoordinatedWriteConfTest
{
    /**
     * Serializes a two-cluster configuration to JSON, asserts the exact wire
     * format, then deserializes it back and verifies every field survives the
     * round trip (contact point strings, parsed {@link SidecarInstance}s, localDc).
     */
    @Test
    void testSerDeser() throws JsonProcessingException
    {
        Map<String, SimpleClusterConf> clusters = new HashMap<>();
        clusters.put("cluster1", new SimpleClusterConf(Arrays.asList("instance-1:9999", "instance-2:9999", "instance-3:9999"), "dc1"));
        clusters.put("cluster2", new SimpleClusterConf(Arrays.asList("instance-4:8888", "instance-5:8888", "instance-6:8888"), "dc1"));
        CoordinatedWriteConf conf = new CoordinatedWriteConf(clusters);
        String json = conf.toJson();
        // Pin the exact serialized form: field names and ordering are part of the
        // option's documented wire format (WriterOptions.COORDINATED_WRITE_CONFIG).
        assertThat(json)
        .isEqualTo("{\"cluster1\":" +
                   "{\"sidecarContactPoints\":[\"instance-1:9999\",\"instance-2:9999\",\"instance-3:9999\"]," +
                   "\"localDc\":\"dc1\"}," +
                   "\"cluster2\":" +
                   "{\"sidecarContactPoints\":[\"instance-4:8888\",\"instance-5:8888\",\"instance-6:8888\"]," +
                   "\"localDc\":\"dc1\"}}");
        CoordinatedWriteConf deser = CoordinatedWriteConf.create(json, CL.LOCAL_QUORUM, SimpleClusterConf.class);
        assertThat(deser.clusters()).containsKeys("cluster1", "cluster2");
        assertThat(deser.clustersOf(SimpleClusterConf.class).get("cluster1").sidecarContactPointsValue())
        .isEqualTo(Arrays.asList("instance-1:9999", "instance-2:9999", "instance-3:9999"));
        assertThat(deser.clustersOf(SimpleClusterConf.class).get("cluster2").sidecarContactPointsValue())
        .isEqualTo(Arrays.asList("instance-4:8888", "instance-5:8888", "instance-6:8888"));
        assertThat(deser.clusters().get("cluster1").localDc()).isEqualTo("dc1");
        assertThat(deser.clusters().get("cluster2").localDc()).isEqualTo("dc1");
        Set<SidecarInstance> contactPoints = deser.clusters().get("cluster1").sidecarContactPoints();
        assertThat(contactPoints)
        .hasSize(3);
        // assertj contains method does not compile with SidecarInstanceImpl due to type erasure
        assertThat(contactPoints.contains(new SidecarInstanceImpl("instance-1", 9999))).isTrue();
        assertThat(contactPoints.contains(new SidecarInstanceImpl("instance-2", 9999))).isTrue();
        assertThat(contactPoints.contains(new SidecarInstanceImpl("instance-3", 9999))).isTrue();
        contactPoints = deser.clusters().get("cluster2").sidecarContactPoints();
        assertThat(contactPoints)
        .hasSize(3);
        assertThat(contactPoints.contains(new SidecarInstanceImpl("instance-4", 8888))).isTrue();
        assertThat(contactPoints.contains(new SidecarInstanceImpl("instance-5", 8888))).isTrue();
        assertThat(contactPoints.contains(new SidecarInstanceImpl("instance-6", 8888))).isTrue();
    }

    /**
     * A contact point without an explicit {@code host:port} must be rejected;
     * the parse failure surfaces as IllegalArgumentException with the original
     * IllegalStateException ("Unable to resolve port ...") as root cause.
     */
    @Test
    void testDeserFailsWhenInstanceHasNoPort()
    {
        String json = "{\"cluster1\":" +
                      "{\"sidecarContactPoints\":[\"instance-1\",\"instance-2\",\"instance-3\"]," +
                      "\"localDc\":\"dc1\"}}";
        assertThatThrownBy(() -> CoordinatedWriteConf.create(json, CL.LOCAL_QUORUM, SimpleClusterConf.class))
        .isExactlyInstanceOf(IllegalArgumentException.class)
        .hasMessageContaining("Unable to parse json string into CoordinatedWriteConf of SimpleClusterConf")
        .hasRootCauseExactlyInstanceOf(IllegalStateException.class)
        .hasRootCauseMessage("Unable to resolve port from instance-1");
    }

    /**
     * Creating the conf with a local consistency level (LOCAL_QUORUM) must fail
     * eagerly when a cluster omits {@code localDc}.
     * NOTE(review): the method name says "NonLocalCL" but the CL exercised here
     * is LOCAL_QUORUM, a local consistency level — consider renaming to match.
     */
    @Test
    void testDeserFailsDueToMissingLocalDcWithNonLocalCL()
    {
        String json = "{\"cluster1\":{\"sidecarContactPoints\":[\"instance-1:8888\"]}}";
        assertThatThrownBy(() -> CoordinatedWriteConf.create(json, CL.LOCAL_QUORUM, SimpleClusterConf.class))
        .isExactlyInstanceOf(IllegalStateException.class)
        .hasMessage("localDc is not configured for cluster: cluster1 for consistency level: LOCAL_QUORUM");
    }

    /**
     * resolveLocalDc contract: non-local CLs resolve to null regardless of the
     * configured localDc; local CLs return the configured value, or throw
     * IllegalStateException when none is configured.
     */
    @Test
    void testResolveLocalDc()
    {
        ClusterConf clusterWithLocalDc = new SimpleClusterConf(Collections.singletonList("instance-1:9999"), "dc1");
        assertThat(clusterWithLocalDc.resolveLocalDc(CL.EACH_QUORUM))
        .describedAs("Resolving localDc with Non-local CL should return null")
        .isNull();
        assertThat(clusterWithLocalDc.resolveLocalDc(CL.LOCAL_QUORUM))
        .describedAs("Resolving localDc with local CL should return the actual localDc")
        .isEqualTo("dc1");
        ClusterConf clusterWithoutLocalDc = new SimpleClusterConf(Collections.singletonList("instance-1:9999"), null);
        assertThat(clusterWithoutLocalDc.resolveLocalDc(CL.EACH_QUORUM)).isNull();
        assertThatThrownBy(() -> clusterWithoutLocalDc.resolveLocalDc(CL.LOCAL_QUORUM))
        .isExactlyInstanceOf(IllegalStateException.class)
        .hasMessage("No localDc is specified for local consistency level: " + CL.LOCAL_QUORUM);
    }
}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to