This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push: new f123406 CASSANDRA-19909: Add writer options COORDINATED_WRITE_CONFIG to define coordinated write to multiple Cassandra clusters (#79) f123406 is described below commit f123406e458c0112145f37dcd3f8c20ba47c949d Author: Yifan Cai <y...@apache.org> AuthorDate: Wed Sep 11 21:03:47 2024 -0700 CASSANDRA-19909: Add writer options COORDINATED_WRITE_CONFIG to define coordinated write to multiple Cassandra clusters (#79) The option specifies the configuration (in JSON) for coordinated write. See org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf. When the option is present, SIDECAR_CONTACT_POINTS, SIDECAR_INSTANCES and LOCAL_DC are ignored if they are present. Patch by Yifan Cai; Reviewed by Doug Rohrer, Francisco Guerrero for CASSANDRA-19909 --- CHANGES.txt | 1 + .../spark/common/model/CassandraInstance.java | 14 +- .../java/org/apache/cassandra/clients/Sidecar.java | 2 +- .../cassandra/spark/bulkwriter/BulkSparkConf.java | 56 +++++- .../spark/bulkwriter/CassandraClusterInfo.java | 44 ++--- .../spark/bulkwriter/CassandraContext.java | 6 +- .../spark/bulkwriter/CassandraJobInfo.java | 9 + .../cassandra/spark/bulkwriter/ClusterInfo.java | 20 +++ .../apache/cassandra/spark/bulkwriter/JobInfo.java | 7 + .../cassandra/spark/bulkwriter/RingInstance.java | 58 ++++-- .../cassandra/spark/bulkwriter/WriterOptions.java | 6 + .../coordinatedwrite/CoordinatedWriteConf.java | 199 +++++++++++++++++++++ .../spark/common/SidecarInstanceFactory.java | 12 ++ .../cassandra/spark/data/CassandraDataLayer.java | 12 +- .../spark/bulkwriter/BulkSparkConfTest.java | 66 +++++++ .../spark/bulkwriter/BulkWriteValidatorTest.java | 4 +- .../ImportCompletionCoordinatorTest.java | 1 + .../spark/bulkwriter/MockBulkWriterContext.java | 15 ++ .../bulkwriter/RingInstanceSerializationTest.java | 2 +- .../spark/bulkwriter/RingInstanceTest.java | 129 ++++++++----- .../spark/bulkwriter/TokenRangeMappingUtils.java | 10 
+- .../coordinatedwrite/CoordinatedWriteConfTest.java | 118 ++++++++++++ 22 files changed, 691 insertions(+), 100 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 33bcdd0..8e70567 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.0.0 + * Add writer option COORDINATED_WRITE_CONFIG to define coordinated write to multiple Cassandra clusters (CASSANDRA-19909) * Decouple Cassandra types from Spark types so Cassandra types can be used independently from Spark (CASSANDRA-19815) * Make the compression cache configurable to reduce heap pressure for large SSTables (CASSANDRA-19900) * Refactor TokenRangeMapping to use proper types instead of Strings (CASSANDRA-19901) diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java index 3820efd..c69b6eb 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java @@ -20,16 +20,26 @@ package org.apache.cassandra.spark.common.model; import org.apache.cassandra.spark.data.model.TokenOwner; +import org.jetbrains.annotations.Nullable; public interface CassandraInstance extends TokenOwner { + /** + * @return ID string that can uniquely identify a cluster; the return value is nullable + */ + @Nullable String clusterId(); + + default boolean hasClusterId() + { + return clusterId() != null; + } + String nodeName(); String datacenter(); /** - * IP address string of a Cassandra instance. - * Mainly used in blocked instance list to identify instances. + * IP address string (w/o port) of a Cassandra instance. * Prefer to use {@link #ipAddressWithPort} as instance identifier, * unless knowing the compared is IP address without port for sure. 
*/ diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java index 6d558dd..2f17040 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java @@ -184,7 +184,7 @@ public final class Sidecar } public static List<CompletableFuture<NodeSettings>> allNodeSettings(SidecarClient client, - Set<? extends SidecarInstance> instances) + Set<SidecarInstance> instances) { return instances.stream() .map(instance -> client diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java index f03e743..3c45546 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java @@ -42,6 +42,8 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.sidecar.client.SidecarInstance; import org.apache.cassandra.spark.bulkwriter.blobupload.StorageClientConfig; +import org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf; +import org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf.SimpleClusterConf; import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; import org.apache.cassandra.spark.bulkwriter.util.SbwKryoRegistrator; import org.apache.cassandra.spark.common.SidecarInstanceFactory; @@ -148,7 +150,9 @@ public class BulkSparkConf implements Serializable protected int ringRetryCount; // create sidecarInstances from sidecarContactPointsValue and effectiveSidecarPort private final String sidecarContactPointsValue; // It takes comma separated values - private transient Set<? 
extends SidecarInstance> sidecarContactPoints; // not serialized + private transient Set<SidecarInstance> sidecarContactPoints; // not serialized + private final String coordinatedWriteConfJson; + private transient CoordinatedWriteConf coordinatedWriteConf; // it is transient; deserialized from coordinatedWriteConfJson in executors public BulkSparkConf(SparkConf conf, Map<String, String> options) { @@ -223,7 +227,8 @@ public class BulkSparkConf implements Serializable } this.jobTimeoutSeconds = MapUtils.getLong(options, WriterOptions.JOB_TIMEOUT_SECONDS.name(), -1L); this.configuredJobId = MapUtils.getOrDefault(options, WriterOptions.JOB_ID.name(), null); - + this.coordinatedWriteConfJson = MapUtils.getOrDefault(options, WriterOptions.COORDINATED_WRITE_CONFIG.name(), null); + this.coordinatedWriteConf = buildCoordinatedWriteConf(dataTransportInfo.getTransport()); validateEnvironment(); } @@ -263,7 +268,7 @@ public class BulkSparkConf implements Serializable }); } - protected Set<? extends SidecarInstance> buildSidecarContactPoints() + protected Set<SidecarInstance> buildSidecarContactPoints() { String[] split = Objects.requireNonNull(sidecarContactPointsValue, "Unable to build sidecar instances from null value") .split(","); @@ -273,7 +278,7 @@ public class BulkSparkConf implements Serializable .collect(Collectors.toSet()); } - Set<? 
extends SidecarInstance> sidecarContactPoints() + Set<SidecarInstance> sidecarContactPoints() { if (sidecarContactPoints == null) { @@ -282,6 +287,45 @@ public class BulkSparkConf implements Serializable return sidecarContactPoints; } + public boolean isCoordinatedWriteConfigured() + { + return coordinatedWriteConf != null; + } + + public CoordinatedWriteConf coordinatedWriteConf() + { + if (coordinatedWriteConf == null) + { + coordinatedWriteConf = buildCoordinatedWriteConf(dataTransportInfo.getTransport()); + } + + return coordinatedWriteConf; + } + + @Nullable + protected CoordinatedWriteConf buildCoordinatedWriteConf(DataTransport dataTransport) + { + if (coordinatedWriteConfJson == null) + { + return null; + } + + Preconditions.checkArgument(dataTransport == DataTransport.S3_COMPAT, + "Coordinated write only supports " + DataTransport.S3_COMPAT); + + if (sidecarContactPointsValue != null) + { + LOGGER.warn("SIDECAR_CONTACT_POINTS or SIDECAR_INSTANCES are ignored on the presence of COORDINATED_WRITE_CONF"); + } + + if (localDC != null) + { + LOGGER.warn("LOCAL_DC is ignored on the presence of COORDINATED_WRITE_CONF"); + } + + return CoordinatedWriteConf.create(coordinatedWriteConfJson, consistencyLevel, SimpleClusterConf.class); + } + protected void validateEnvironment() throws RuntimeException { Preconditions.checkNotNull(keyspace); @@ -351,12 +395,12 @@ public class BulkSparkConf implements Serializable return truststorePath; } - protected TTLOption getTTLOptions() + public TTLOption getTTLOptions() { return TTLOption.from(ttl); } - protected TimestampOption getTimestampOptions() + public TimestampOption getTimestampOptions() { return TimestampOption.from(timestamp); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java index 359dc11..4165c7c 100644 --- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java @@ -36,7 +36,6 @@ import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import o.a.c.sidecar.client.shaded.common.response.GossipInfoResponse; import o.a.c.sidecar.client.shaded.common.response.NodeSettings; import o.a.c.sidecar.client.shaded.common.response.SchemaResponse; import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse; @@ -63,19 +62,26 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterInfo.class); protected final BulkSparkConf conf; + protected final String clusterId; protected String cassandraVersion; protected Partitioner partitioner; - protected transient TokenRangeMapping<RingInstance> tokenRangeReplicas; - protected transient String keyspaceSchema; - protected transient GossipInfoResponse gossipInfo; - protected transient CassandraContext cassandraContext; + protected transient volatile TokenRangeMapping<RingInstance> tokenRangeReplicas; + protected transient volatile String keyspaceSchema; + protected transient volatile CassandraContext cassandraContext; protected final transient AtomicReference<NodeSettings> nodeSettings; protected final transient List<CompletableFuture<NodeSettings>> allNodeSettingFutures; public CassandraClusterInfo(BulkSparkConf conf) + { + this(conf, null); + } + + // Used by CassandraClusterInfoGroup + public CassandraClusterInfo(BulkSparkConf conf, String clusterId) { this.conf = conf; + this.clusterId = clusterId; this.cassandraContext = buildCassandraContext(); LOGGER.info("Getting Cassandra versions from all nodes"); this.nodeSettings = new AtomicReference<>(null); @@ -113,6 +119,12 @@ public class CassandraClusterInfo implements ClusterInfo, 
Closeable } } + @Override + public String clusterId() + { + return clusterId; + } + /** * Gets a Cassandra Context * <p> @@ -191,15 +203,11 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable } @Override - public void refreshClusterInfo() + public synchronized void refreshClusterInfo() { - synchronized (this) - { - // Set backing stores to null and let them lazy-load on the next call - gossipInfo = null; - keyspaceSchema = null; - getCassandraContext().refreshClusterConfig(); - } + // Set backing stores to null and let them lazy-load on the next call + keyspaceSchema = null; + getCassandraContext().refreshClusterConfig(); } protected String getCurrentKeyspaceSchema() throws Exception @@ -232,13 +240,6 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable } } - private Set<String> readReplicasFromTokenRangeResponse(TokenRangeReplicasResponse response) - { - return response.readReplicas().stream() - .flatMap(rr -> rr.replicasByDatacenter().values().stream()) - .flatMap(List::stream).collect(Collectors.toSet()); - } - @NotNull protected ReplicationFactor getReplicationFactor() { @@ -287,7 +288,8 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable return topology; } - // Block for the call-sites requesting the latest view of the ring; but it is OK to serve the other call-sites that request for the cached view + // Block only for the call-sites requesting the latest view of the ring + // The other call-sites get the cached/stale view // We can avoid synchronization here if (topology != null) { diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java index 1da6083..41a7be1 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java +++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraContext.java @@ -40,7 +40,7 @@ public class CassandraContext implements StartupValidatable, Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(CassandraContext.class); @NotNull - protected Set<? extends SidecarInstance> clusterConfig; + protected Set<SidecarInstance> clusterConfig; private final BulkSparkConf conf; private final transient SidecarClient sidecarClient; @@ -57,7 +57,7 @@ public class CassandraContext implements StartupValidatable, Closeable return new CassandraContext(conf); } - public Set<? extends SidecarInstance> getCluster() + public Set<SidecarInstance> getCluster() { return clusterConfig; } @@ -86,7 +86,7 @@ public class CassandraContext implements StartupValidatable, Closeable return Sidecar.from(new SimpleSidecarInstancesProvider(new ArrayList<>(clusterConfig)), conf); } - protected Set<? extends SidecarInstance> createClusterConfig() + protected Set<SidecarInstance> createClusterConfig() { return conf.sidecarContactPoints(); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java index 39ba908..8e12f07 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java @@ -21,9 +21,11 @@ package org.apache.cassandra.spark.bulkwriter; import java.util.UUID; +import org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf; import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; import org.apache.cassandra.spark.data.QualifiedTableName; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; public class CassandraJobInfo implements JobInfo { @@ -105,6 +107,13 @@ public class 
CassandraJobInfo implements JobInfo return conf.importCoordinatorTimeoutMultiplier; } + @Nullable + @Override + public CoordinatedWriteConf coordinatedWriteConf() + { + return conf.coordinatedWriteConf(); + } + @Override public int getCommitThreadsPerInstance() { diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java index b6ed979..c6fe930 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ClusterInfo.java @@ -27,6 +27,7 @@ import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; import org.apache.cassandra.spark.data.partitioner.Partitioner; import org.apache.cassandra.spark.validation.StartupValidatable; +import org.jetbrains.annotations.Nullable; public interface ClusterInfo extends StartupValidatable, Serializable { @@ -47,10 +48,29 @@ public interface ClusterInfo extends StartupValidatable, Serializable TimeSkewResponse getTimeSkew(List<RingInstance> replicas); + /** + * Return the keyspace schema string of the enclosing keyspace for bulk write in the cluster + * @param cached whether using the cached schema information + * @return keyspace schema string + */ String getKeyspaceSchema(boolean cached); CassandraContext getCassandraContext(); + /** + * ID string that can uniquely identify a cluster + * <p> + * Implementor note: the method is optional. When writing to a single cluster, there is no requirement of assigning an ID for bulk write to proceed. + * When in the coordinated write mode, i.e. writing to multiple clusters, the method must be implemented and return unique string for clusters. 
+ * + * @return cluster id string, null if absent + */ + @Nullable + default String clusterId() + { + return null; + } + default void close() { } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java index a454827..496580e 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java @@ -22,6 +22,7 @@ package org.apache.cassandra.spark.bulkwriter; import java.io.Serializable; import java.util.UUID; +import org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf; import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; import org.apache.cassandra.spark.data.QualifiedTableName; import org.jetbrains.annotations.NotNull; @@ -100,4 +101,10 @@ public interface JobInfo extends Serializable * @return multiplier to calculate the final timeout for import coordinator */ double importCoordinatorTimeoutMultiplier(); + + /** + * @return CoordinatedWriteConf if configured, null otherwise + */ + @Nullable + CoordinatedWriteConf coordinatedWriteConf(); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java index ae04447..f0edb71 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java @@ -25,6 +25,8 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.Objects; +import com.google.common.annotations.VisibleForTesting; + import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaMetadata; import 
o.a.c.sidecar.client.shaded.common.response.data.RingEntry; import org.apache.cassandra.spark.common.model.CassandraInstance; @@ -36,14 +38,11 @@ public class RingInstance implements CassandraInstance, Serializable { private static final long serialVersionUID = 4399143234683369652L; private RingEntry ringEntry; + private @Nullable String clusterId; - public RingInstance(RingEntry ringEntry) - { - this.ringEntry = ringEntry; - } - - public RingInstance(ReplicaMetadata replica) + public RingInstance(ReplicaMetadata replica, @Nullable String clusterId) { + this.clusterId = clusterId; this.ringEntry = new RingEntry.Builder() .fqdn(replica.fqdn()) .address(replica.address()) @@ -54,6 +53,25 @@ public class RingInstance implements CassandraInstance, Serializable .build(); } + @VisibleForTesting + public RingInstance(RingEntry ringEntry) + { + this(ringEntry, null); + } + + @VisibleForTesting + public RingInstance(RingEntry ringEntry, @Nullable String clusterId) + { + this.clusterId = clusterId; + this.ringEntry = ringEntry; + } + + @VisibleForTesting + public RingInstance(ReplicaMetadata replica) + { + this(replica, null); + } + // Used only in tests @Override public String token() @@ -61,6 +79,12 @@ public class RingInstance implements CassandraInstance, Serializable return ringEntry.token(); } + @Override + public String clusterId() + { + return clusterId; + } + @Override public String nodeName() { @@ -98,9 +122,9 @@ public class RingInstance implements CassandraInstance, Serializable } /** - * Custom equality that compares the token, fully qualified domain name, the port, and the datacenter + * Custom equality that compares the token, fully qualified domain name, the port, the datacenter and the clusterId * - * Note that node state and status are not part of the calculation. + * Note that node state, status, are not part of the calculation. 
* * @param other the other instance * @return true if both instances are equal, false otherwise @@ -108,13 +132,20 @@ public class RingInstance implements CassandraInstance, Serializable @Override public boolean equals(@Nullable Object other) { - if (other == null || !(other instanceof RingInstance)) + if (this == other) + { + return true; + } + + if (other == null || getClass() != other.getClass()) { return false; } final RingInstance that = (RingInstance) other; - return Objects.equals(ringEntry.token(), that.ringEntry.token()) + return Objects.equals(clusterId, that.clusterId) + && Objects.equals(ringEntry.token(), that.ringEntry.token()) && Objects.equals(ringEntry.fqdn(), that.ringEntry.fqdn()) + && Objects.equals(ringEntry.rack(), that.ringEntry.rack()) && Objects.equals(ringEntry.address(), that.ringEntry.address()) && ringEntry.port() == that.ringEntry.port() && Objects.equals(ringEntry.datacenter(), that.ringEntry.datacenter()); @@ -130,7 +161,7 @@ public class RingInstance implements CassandraInstance, Serializable @Override public int hashCode() { - return Objects.hash(ringEntry.token(), ringEntry.fqdn(), ringEntry.port(), ringEntry.datacenter(), ringEntry.address()); + return Objects.hash(clusterId, ringEntry.token(), ringEntry.fqdn(), ringEntry.rack(), ringEntry.port(), ringEntry.datacenter(), ringEntry.address()); } @Override @@ -139,7 +170,7 @@ public class RingInstance implements CassandraInstance, Serializable return ringEntry.toString(); } - public RingEntry ringInstance() + public RingEntry ringEntry() { return ringEntry; } @@ -158,6 +189,7 @@ public class RingInstance implements CassandraInstance, Serializable out.writeObject(ringEntry.hostId()); out.writeObject(ringEntry.load()); out.writeObject(ringEntry.owns()); + out.writeObject(clusterId); } private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException @@ -174,6 +206,7 @@ public class RingInstance implements CassandraInstance, Serializable String hostId = (String) 
in.readObject(); String load = (String) in.readObject(); String owns = (String) in.readObject(); + String clusterId = (String) in.readObject(); ringEntry = new RingEntry.Builder().datacenter(datacenter) .address(address) .port(port) @@ -186,5 +219,6 @@ public class RingInstance implements CassandraInstance, Serializable .load(load) .owns(owns) .build(); + this.clusterId = clusterId; } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java index 6c7eba7..1b36bd0 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java @@ -31,6 +31,12 @@ public enum WriterOptions implements WriterOption // The option specifies the initial contact points of sidecar servers to discover the cluster topology // Note that the addresses can include port; when port is present, it takes precedence over SIDECAR_PORT SIDECAR_CONTACT_POINTS, + /** + * The option specifies the configuration (in JSON) for coordinated write. + * See org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf. + * When the option is present, SIDECAR_CONTACT_POINTS, SIDECAR_INSTANCES and LOCAL_DC are ignored if they are present. 
+ */ + COORDINATED_WRITE_CONFIG, KEYSPACE, TABLE, BULK_WRITER_CL, diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConf.java new file mode 100644 index 0000000..63fc017 --- /dev/null +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConf.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark.bulkwriter.coordinatedwrite; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.TypeFactory; +import org.apache.cassandra.sidecar.client.SidecarInstance; +import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; +import org.apache.cassandra.spark.common.SidecarInstanceFactory; +import org.jetbrains.annotations.Nullable; + +import static java.util.stream.Collectors.collectingAndThen; +import static java.util.stream.Collectors.toSet; + +/** + * Data class containing the configurations required for coordinated write. + * The serialization format is JSON string. The class takes care of serialization and deserialization. 
+ */ +public class CoordinatedWriteConf +{ + private static final Logger LOGGER = LoggerFactory.getLogger(CoordinatedWriteConf.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + // The runtime type of ClusterConfProvider is erased; use clustersOf method to read the desired type back + private final Map<String, ClusterConf> clusters; + + /** + * Parse JSON string and create a CoordinatedWriteConf object with the specified ClusterConfProvider format + * + * @param json JSON string + * @param clusterConfType concrete type of ClusterConfProvider that can be used for JSON serialization and deserialization + * @return CoordinatedWriteConf object + * @param <T> subtype of ClusterConfProvider + */ + public static <T extends ClusterConf> + CoordinatedWriteConf create(String json, ConsistencyLevel.CL consistencyLevel, Class<T> clusterConfType) + { + JavaType javaType = TypeFactory.defaultInstance().constructMapType(Map.class, String.class, clusterConfType); + CoordinatedWriteConf result; + try + { + result = new CoordinatedWriteConf(OBJECT_MAPPER.readValue(json, javaType)); + } + catch (Exception e) + { + throw new IllegalArgumentException("Unable to parse json string into CoordinatedWriteConf of " + clusterConfType.getSimpleName() + + " due to " + e.getMessage(), e); + } + result.clusters().forEach((clusterId, cluster) -> { + if (consistencyLevel.isLocal()) + { + Preconditions.checkState(!StringUtils.isEmpty(cluster.localDc()), + "localDc is not configured for cluster: " + clusterId + " for consistency level: " + consistencyLevel); + } + else + { + if (cluster.localDc() != null) + { + LOGGER.warn("Ignoring the localDc configured for cluster, when consistency level is non-local. cluster={} consistencyLevel={}", + clusterId, consistencyLevel); + } + } + }); + return result; + } + + public CoordinatedWriteConf(Map<String, ? 
extends ClusterConf> clusters) + { + this.clusters = Collections.unmodifiableMap(clusters); + } + + public Map<String, ClusterConf> clusters() + { + return clusters; + } + + @Nullable + public ClusterConf cluster(String clusterId) + { + return clusters.get(clusterId); + } + + public <T extends ClusterConf> Map<String, T> clustersOf(Class<T> clusterConfType) + { + // verify that map type can cast; there are only limited number of values and check is cheap + clusters.values().forEach(v -> Preconditions.checkState(clusterConfType.isInstance(v), + "ClusterConfProvider value is not instance of " + clusterConfType)); + return (Map<String, T>) clusters; + } + + public String toJson() throws JsonProcessingException + { + return OBJECT_MAPPER.writeValueAsString(clusters); + } + + public interface ClusterConf + { + Set<SidecarInstance> sidecarContactPoints(); + + @Nullable + String localDc(); + + @Nullable + default String resolveLocalDc(ConsistencyLevel.CL cl) + { + String localDc = localDc(); + boolean hasLocalDc = !StringUtils.isEmpty(localDc()); + if (!cl.isLocal() && hasLocalDc) + { + return null; + } + if (cl.isLocal() && !hasLocalDc) + { + throw new IllegalStateException("No localDc is specified for local consistency level: " + cl); + } + return localDc; + } + } + + @JsonIgnoreProperties(ignoreUnknown = true) + @JsonInclude(JsonInclude.Include.NON_NULL) + public static class SimpleClusterConf implements ClusterConf + { + private final List<String> sidecarContactPointsValue; + private final Set<SidecarInstance> sidecarContactPoints; + private final @Nullable String localDc; + + @JsonCreator + public SimpleClusterConf(@JsonProperty("sidecarContactPoints") List<String> sidecarContactPointsValue, + @JsonProperty("localDc") String localDc) + { + this.sidecarContactPointsValue = sidecarContactPointsValue; + this.sidecarContactPoints = buildSidecarContactPoints(sidecarContactPointsValue); + this.localDc = localDc; + } + + @JsonProperty("sidecarContactPoints") + public 
List<String> sidecarContactPointsValue() + { + return sidecarContactPointsValue; + } + + @Nullable + @Override + @JsonProperty("localDc") + public String localDc() + { + return localDc; + } + + @Override + public Set<SidecarInstance> sidecarContactPoints() + { + return sidecarContactPoints; + } + + private Set<SidecarInstance> buildSidecarContactPoints(List<String> sidecarContactPoints) + { + return sidecarContactPoints.stream() + .filter(StringUtils::isNotEmpty) + .map(SidecarInstanceFactory::createFromString) + .collect(collectingAndThen(toSet(), Collections::unmodifiableSet)); + } + } +} diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/SidecarInstanceFactory.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/SidecarInstanceFactory.java index cc6fb09..ce69703 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/SidecarInstanceFactory.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/SidecarInstanceFactory.java @@ -58,7 +58,19 @@ public class SidecarInstanceFactory port = Integer.parseInt(portStr); } + Preconditions.checkState(port != -1, "Unable to resolve port from %s", input); + LOGGER.info("Create sidecar instance. 
hostname={} port={}", hostname, port); return new SidecarInstanceImpl(hostname, port); } + + /** + * Similar to {@link SidecarInstanceFactory#createFromString(String, int)}, but it requires that the input string must include port + * @param hostnameWithPort hostname with port + * @return SidecarInstanceImpl + */ + public static SidecarInstanceImpl createFromString(String hostnameWithPort) + { + return createFromString(hostnameWithPort, -1); + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index 5e26deb..369e58e 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -124,7 +124,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV // create clusterConfig from sidecarInstances and sidecarPort, see initializeClusterConfig protected String sidecarInstances; protected int sidecarPort; - protected transient Set<? extends SidecarInstance> clusterConfig; + protected transient Set<SidecarInstance> clusterConfig; protected TokenPartitioner tokenPartitioner; protected Map<String, AvailabilityHint> availabilityHints; protected Sidecar.ClientConfig sidecarClientConfig; @@ -907,13 +907,13 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV } } - protected Set<? extends SidecarInstance> initializeClusterConfig(ClientConfig options) + protected Set<SidecarInstance> initializeClusterConfig(ClientConfig options) { return initializeClusterConfig(options.sidecarContactPoints, options.sidecarPort()); } // not intended to be overridden - private Set<? 
extends SidecarInstance> initializeClusterConfig(String sidecarInstances, int sidecarPort) + private Set<SidecarInstance> initializeClusterConfig(String sidecarInstances, int sidecarPort) { return Arrays.stream(sidecarInstances.split(",")) .filter(StringUtils::isNotEmpty) @@ -921,7 +921,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV .collect(Collectors.toSet()); } - protected String getEffectiveCassandraVersionForRead(Set<? extends SidecarInstance> clusterConfig, + protected String getEffectiveCassandraVersionForRead(Set<SidecarInstance> clusterConfig, NodeSettings nodeSettings) { return nodeSettings.releaseVersion(); @@ -932,7 +932,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV LOGGER.info("Dial home. clientConfig={}", options); } - protected void clearSnapshot(Set<? extends SidecarInstance> clusterConfig, @NotNull ClientConfig options) + protected void clearSnapshot(Set<SidecarInstance> clusterConfig, @NotNull ClientConfig options) { if (maybeQuotedKeyspace == null || maybeQuotedTable == null) { @@ -982,7 +982,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV * @param options the {@link ClientConfig} options * @return the {@link Sizing} object based on the {@code sizing} option provided by the user */ - protected Sizing getSizing(Set<? 
extends SidecarInstance> clusterConfig, + protected Sizing getSizing(Set<SidecarInstance> clusterConfig, ReplicationFactor replicationFactor, ClientConfig options) { diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java index d48ace8..3130fc1 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java @@ -28,6 +28,8 @@ import com.google.common.collect.Maps; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.apache.cassandra.sidecar.client.SidecarInstanceImpl; +import org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf; import org.apache.cassandra.spark.bulkwriter.util.SbwKryoRegistrator; import org.apache.cassandra.spark.utils.BuildInfo; import org.apache.spark.SparkConf; @@ -275,6 +277,70 @@ class BulkSparkConfTest .isNull(); } + @Test + void testReadCoordinatedWriteConfFails() + { + Map<String, String> options = copyDefaultOptions(); + String coordinatedWriteConfJsonNoLocalDc = "{\"cluster1\":" + + "{\"sidecarContactPoints\":[\"instance-1:9999\",\"instance-2:9999\",\"instance-3:9999\"]}}"; + + options.put(WriterOptions.COORDINATED_WRITE_CONFIG.name(), coordinatedWriteConfJsonNoLocalDc); + assertThatThrownBy(() -> new BulkSparkConf(sparkConf, options)) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessage("Coordinated write only supports S3_COMPAT"); + + options.put(WriterOptions.DATA_TRANSPORT.name(), DataTransport.S3_COMPAT.name()); + options.put(WriterOptions.BULK_WRITER_CL.name(), "LOCAL_QUORUM"); + assertThatThrownBy(() -> new BulkSparkConf(sparkConf, options)) + .isExactlyInstanceOf(IllegalStateException.class) + .hasMessage("localDc is not configured for cluster: 
cluster1 for consistency level: LOCAL_QUORUM"); + + options.put(WriterOptions.COORDINATED_WRITE_CONFIG.name(), "invalid json"); + assertThatThrownBy(() -> new BulkSparkConf(sparkConf, options)) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unable to parse json string into CoordinatedWriteConf of SimpleClusterConf due to Unrecognized token 'invalid'"); + } + + @Test + void testCoordinatedWriteConf() + { + Map<String, String> options = copyDefaultOptions(); + options.remove(WriterOptions.COORDINATED_WRITE_CONFIG.name()); + BulkSparkConf conf = new BulkSparkConf(sparkConf, options); + assertThat(conf.isCoordinatedWriteConfigured()) + .describedAs("When COORDINATED_WRITE_CONF is absent, isCoordinatedWriteConfigured should return false") + .isFalse(); + + String coordinatedWriteConfJson = "{\"cluster1\":" + + "{\"sidecarContactPoints\":[\"instance-1:9999\",\"instance-2:9999\",\"instance-3:9999\"]," + + "\"localDc\":\"dc1\"}," + + "\"cluster2\":" + + "{\"sidecarContactPoints\":[\"instance-4:8888\",\"instance-5:8888\",\"instance-6:8888\"]," + + "\"localDc\":\"dc1\"}}"; + options.put(WriterOptions.DATA_TRANSPORT.name(), DataTransport.S3_COMPAT.name()); + options.put(WriterOptions.BULK_WRITER_CL.name(), "LOCAL_QUORUM"); + options.put(WriterOptions.COORDINATED_WRITE_CONFIG.name(), coordinatedWriteConfJson); + conf = new BulkSparkConf(sparkConf, options); + assertThat(conf.isCoordinatedWriteConfigured()) + .describedAs("When COORDINATED_WRITE_CONF is present, it should return true") + .isTrue(); + assertThat(conf.coordinatedWriteConf().clusters()).containsOnlyKeys("cluster1", "cluster2"); + CoordinatedWriteConf.ClusterConf cluster1 = conf.coordinatedWriteConf().cluster("cluster1"); + assertThat(cluster1).isNotNull(); + assertThat(cluster1.sidecarContactPoints()) + .containsExactlyInAnyOrder(new SidecarInstanceImpl("instance-1", 9999), + new SidecarInstanceImpl("instance-2", 9999), + new SidecarInstanceImpl("instance-3", 9999)); + 
assertThat(cluster1.localDc()).isEqualTo("dc1"); + CoordinatedWriteConf.ClusterConf cluster2 = conf.coordinatedWriteConf().cluster("cluster2"); + assertThat(cluster2).isNotNull(); + assertThat(cluster2.sidecarContactPoints()) + .containsExactlyInAnyOrder(new SidecarInstanceImpl("instance-4", 8888), + new SidecarInstanceImpl("instance-5", 8888), + new SidecarInstanceImpl("instance-6", 8888)); + assertThat(cluster2.localDc()).isEqualTo("dc1"); + } + private Map<String, String> copyDefaultOptions() { TreeMap<String, String> map = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java index 5428363..59d7705 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java @@ -62,9 +62,7 @@ class BulkWriteValidatorTest for (RingInstance instance : topology.getTokenRanges().keySet()) { // Mark nodes 0, 1, 2 as DOWN - int nodeId = Integer.parseInt(instance.ipAddress() - .replace("localhost", "") - .replace(":9042", "")); + int nodeId = Integer.parseInt(instance.ipAddress().replace("localhost", "")); instanceAvailabilityMap.put(instance, (nodeId <= 2) ? 
WriteAvailability.UNAVAILABLE_DOWN : WriteAvailability.AVAILABLE); } when(mockClusterInfo.clusterWriteAvailability()).thenReturn(instanceAvailabilityMap); diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java index f84280f..578781b 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java @@ -494,6 +494,7 @@ class ImportCompletionCoordinatorTest int instanceInRing = i % totalInstances + 1; return new RingInstance(new RingEntry.Builder() .datacenter("DC1") + .rack("Rack") .address("127.0.0." + instanceInRing) .token(String.valueOf(i * 100_000)) .fqdn("DC1-i" + instanceInRing) diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java index d93a217..33b09a2 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java @@ -46,6 +46,7 @@ import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse; import org.apache.cassandra.bridge.CassandraBridge; import org.apache.cassandra.bridge.CassandraBridgeFactory; import org.apache.cassandra.bridge.CassandraVersion; +import org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf; import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; @@ -63,6 
+64,7 @@ import org.apache.cassandra.spark.validation.StartupValidator; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DATE; import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.INT; @@ -210,6 +212,12 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo return null; } + @Override + public String clusterId() + { + return "test-cluster"; + } + @Override public void refreshClusterInfo() { @@ -303,6 +311,13 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo return 2.0; } + @Nullable + @Override + public CoordinatedWriteConf coordinatedWriteConf() + { + return null; + } + public void setSkipCleanOnFailures(boolean skipClean) { this.skipClean = skipClean; diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceSerializationTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceSerializationTest.java index d15db89..333eabb 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceSerializationTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceSerializationTest.java @@ -69,7 +69,7 @@ public class RingInstanceSerializationTest 7000, dataCenter); - RingInstance ring = new RingInstance(metadata); + RingInstance ring = new RingInstance(metadata, "test-cluster"); byte[] bytes = serialize(ring); RingInstance deserialized = deserialize(bytes, RingInstance.class); diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java index da2f1af..0846e8f 100644 --- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java @@ -31,21 +31,23 @@ import java.util.List; import java.util.Map; import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Range; import org.junit.jupiter.api.Test; import o.a.c.sidecar.client.shaded.common.response.data.RingEntry; -import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; +import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel.CL; import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; import org.apache.cassandra.spark.data.ReplicationFactor; import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.jetbrains.annotations.NotNull; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class RingInstanceTest { @@ -123,58 +125,75 @@ public class RingInstanceTest @Test public void testEquals() { - RingInstance instance1 = TokenRangeMappingUtils.getInstances(0, ImmutableMap.of("DATACENTER1", 1), 1).get(0); - RingInstance instance2 = TokenRangeMappingUtils.getInstances(0, ImmutableMap.of("DATACENTER1", 1), 1).get(0); + RingInstance instance1 = mockRingInstance(); + RingInstance instance2 = mockRingInstance(); assertEquals(instance1, instance2); } @Test public void testHashCode() { - RingInstance instance1 = TokenRangeMappingUtils.getInstances(0, ImmutableMap.of("DATACENTER1", 1), 1).get(0); - RingInstance instance2 = TokenRangeMappingUtils.getInstances(0, 
ImmutableMap.of("DATACENTER1", 1), 1).get(0); + RingInstance instance1 = mockRingInstance(); + RingInstance instance2 = mockRingInstance(); assertEquals(instance1.hashCode(), instance2.hashCode()); } @Test - public void testEqualsAndHashcodeIgnoreHost() - { - RingInstance realInstance = new RingInstance(new RingEntry.Builder() - .datacenter("DATACENTER1") - .address("127.0.0.1") - .port(7000) - .rack("Rack") - .status("UP") - .state("NORMAL") - .load("0") - .token("0") - .fqdn("fqdn") - .hostId("") - .owns("") - .build()); - - RingInstance questionInstance = new RingInstance(new RingEntry.Builder() - .datacenter("DATACENTER1") - .address("127.0.0.1") - .port(7000) - .rack("Rack") - .status("UP") - .state("NORMAL") - .load("0") - .token("0") - .fqdn("fqdn") - .hostId("") - .owns("") - .build()); - assertEquals(realInstance, questionInstance); - assertEquals(realInstance.hashCode(), questionInstance.hashCode()); + public void testEqualsAndHashcodeIgnoreNonCriticalFields() + { + RingEntry.Builder builder = mockRingEntryBuilder(); + // the fields chained in the builder below are not considered for equality check + RingInstance instance1 = new RingInstance(builder + .status("1") + .state("1") + .load("1") + .hostId("1") + .owns("1") + .build()); + RingInstance instance2 = new RingInstance(builder + .status("2") + .state("2") + .load("2") + .hostId("2") + .owns("2") + .build()); + assertEquals(instance1, instance2); + assertEquals(instance1.hashCode(), instance2.hashCode()); + } + + @Test + public void testEqualsAndHashcodeConsidersClusterId() + { + RingEntry ringEntry = mockRingEntry(); + RingInstance c1i1 = new RingInstance(ringEntry, "cluster1"); + RingInstance c1i2 = new RingInstance(ringEntry, "cluster1"); + RingInstance c2i1 = new RingInstance(ringEntry, "cluster2"); + + assertEquals(c1i1, c1i2); + assertEquals(c1i1.hashCode(), c1i2.hashCode()); + + assertNotEquals(c1i1, c2i1); + assertNotEquals(c1i1.hashCode(), c2i1.hashCode()); + } + + @Test + public void 
testHasClusterId() + { + RingEntry ringEntry = mockRingEntry(); + RingInstance instance = new RingInstance(ringEntry); + assertFalse(instance.hasClusterId()); + + RingInstance instanceWithClusterId = new RingInstance(ringEntry, "cluster1"); + assertTrue(instanceWithClusterId.hasClusterId()); + assertEquals("cluster1", instanceWithClusterId.clusterId()); } @Test public void multiMapWorksWithRingInstances() { - RingInstance instance1 = TokenRangeMappingUtils.getInstances(0, ImmutableMap.of("DATACENTER1", 1), 1).get(0); - RingInstance instance2 = TokenRangeMappingUtils.getInstances(0, ImmutableMap.of("DATACENTER1", 1), 1).get(0); + RingEntry ringEntry = mockRingEntry(); + RingInstance instance1 = new RingInstance(ringEntry); + RingInstance instance2 = new RingInstance(ringEntry); byte[] buffer; try @@ -224,7 +243,35 @@ public class RingInstanceTest replicationFactor3.addFailure(Range.openClosed(tokens[0].add(BigInteger.ONE), tokens[0].add(BigInteger.valueOf(2L))), instance2, "Failure 2"); - replicationFactor3.getFailedRanges(tokenRange, ConsistencyLevel.CL.LOCAL_QUORUM, DATACENTER_1); - assertFalse(replicationFactor3.hasFailed(tokenRange, ConsistencyLevel.CL.LOCAL_QUORUM, DATACENTER_1)); + assertTrue(replicationFactor3.getFailedRanges(tokenRange, CL.LOCAL_QUORUM, DATACENTER_1).isEmpty()); + } + + @NotNull + private static RingEntry mockRingEntry() + { + return mockRingEntryBuilder().build(); + } + + @NotNull + private static RingEntry.Builder mockRingEntryBuilder() + { + return new RingEntry.Builder() + .datacenter("DATACENTER1") + .address("127.0.0.1") + .port(0) + .rack("Rack") + .status("UP") + .state("NORMAL") + .load("0") + .token("0") + .fqdn("DATACENTER1-i1") + .hostId("") + .owns(""); + } + + @NotNull + private static RingInstance mockRingInstance() + { + return new RingInstance(mockRingEntry()); } } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java index 9c53279..202d3a3 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java @@ -61,7 +61,7 @@ public final class TokenRangeMappingUtils { List<RingInstance> instances = getInstances(initialToken, rfByDC, instancesPerDC); RingInstance instance = instances.remove(0); - RingEntry entry = instance.ringInstance(); + RingEntry entry = instance.ringEntry(); RingEntry newEntry = new RingEntry.Builder() .datacenter(entry.datacenter()) .port(entry.port()) @@ -97,7 +97,7 @@ public final class TokenRangeMappingUtils if (shouldUpdateToken) { RingInstance instance = instances.remove(0); - RingEntry entry = instance.ringInstance(); + RingEntry entry = instance.ringEntry(); RingEntry newEntry = new RingEntry.Builder() .datacenter(entry.datacenter()) .port(entry.port()) @@ -218,9 +218,11 @@ public final class TokenRangeMappingUtils replicasPerDc.put("ignored", replicas); ReplicaInfo ri = new ReplicaInfo(String.valueOf(startToken), String.valueOf(endToken), replicasPerDc); replicaInfoList.add(ri); - String address = "localhost" + i + ":9042"; + String address = "localhost" + i; + int port = 9042; + String addressWithPort = address + ":" + port; ReplicaMetadata rm = new ReplicaMetadata("NORMAL", "UP", address, address, 9042, "ignored"); - replicaMetadata.put(address, rm); + replicaMetadata.put(addressWithPort, rm); startToken = endToken; } return new TokenRangeReplicasResponse(replicaInfoList, replicaInfoList, replicaMetadata); diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConfTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConfTest.java new file mode 100644 
index 0000000..be15a6a --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConfTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.bulkwriter.coordinatedwrite; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; + +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.cassandra.sidecar.client.SidecarInstance; +import org.apache.cassandra.sidecar.client.SidecarInstanceImpl; +import org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf.ClusterConf; +import org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf.SimpleClusterConf; +import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel.CL; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class CoordinatedWriteConfTest +{ + @Test + void testSerDeser() throws JsonProcessingException + { + Map<String, SimpleClusterConf> clusters = new LinkedHashMap<>(); // insertion order keeps the exact-JSON assertion below deterministic +
clusters.put("cluster1", new SimpleClusterConf(Arrays.asList("instance-1:9999", "instance-2:9999", "instance-3:9999"), "dc1")); + clusters.put("cluster2", new SimpleClusterConf(Arrays.asList("instance-4:8888", "instance-5:8888", "instance-6:8888"), "dc1")); + CoordinatedWriteConf conf = new CoordinatedWriteConf(clusters); + String json = conf.toJson(); + assertThat(json) + .isEqualTo("{\"cluster1\":" + + "{\"sidecarContactPoints\":[\"instance-1:9999\",\"instance-2:9999\",\"instance-3:9999\"]," + + "\"localDc\":\"dc1\"}," + + "\"cluster2\":" + + "{\"sidecarContactPoints\":[\"instance-4:8888\",\"instance-5:8888\",\"instance-6:8888\"]," + + "\"localDc\":\"dc1\"}}"); + CoordinatedWriteConf deser = CoordinatedWriteConf.create(json, CL.LOCAL_QUORUM, SimpleClusterConf.class); + assertThat(deser.clusters()).containsKeys("cluster1", "cluster2"); + assertThat(deser.clustersOf(SimpleClusterConf.class).get("cluster1").sidecarContactPointsValue()) + .isEqualTo(Arrays.asList("instance-1:9999", "instance-2:9999", "instance-3:9999")); + assertThat(deser.clustersOf(SimpleClusterConf.class).get("cluster2").sidecarContactPointsValue()) + .isEqualTo(Arrays.asList("instance-4:8888", "instance-5:8888", "instance-6:8888")); + assertThat(deser.clusters().get("cluster1").localDc()).isEqualTo("dc1"); + assertThat(deser.clusters().get("cluster2").localDc()).isEqualTo("dc1"); + Set<SidecarInstance> contactPoints = deser.clusters().get("cluster1").sidecarContactPoints(); + assertThat(contactPoints) + .hasSize(3); + assertThat(contactPoints) + .containsExactlyInAnyOrder(new SidecarInstanceImpl("instance-1", 9999), + new SidecarInstanceImpl("instance-2", 9999), + new SidecarInstanceImpl("instance-3", 9999)); + contactPoints = deser.clusters().get("cluster2").sidecarContactPoints(); + assertThat(contactPoints) + .hasSize(3); +
assertThat(contactPoints).containsExactlyInAnyOrder(new SidecarInstanceImpl("instance-4", 8888), + new SidecarInstanceImpl("instance-5", 8888), + new SidecarInstanceImpl("instance-6", 8888)); + } + + @Test + void testDeserFailsWhenInstanceHasNoPort() + { + String json = "{\"cluster1\":" + + "{\"sidecarContactPoints\":[\"instance-1\",\"instance-2\",\"instance-3\"]," + + "\"localDc\":\"dc1\"}}"; + assertThatThrownBy(() -> CoordinatedWriteConf.create(json, CL.LOCAL_QUORUM, SimpleClusterConf.class)) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unable to parse json string into CoordinatedWriteConf of SimpleClusterConf") + .hasRootCauseExactlyInstanceOf(IllegalStateException.class) + .hasRootCauseMessage("Unable to resolve port from instance-1"); + } + + @Test + void testDeserFailsDueToMissingLocalDcWithLocalCL() + { + String json = "{\"cluster1\":{\"sidecarContactPoints\":[\"instance-1:8888\"]}}"; + assertThatThrownBy(() -> CoordinatedWriteConf.create(json, CL.LOCAL_QUORUM, SimpleClusterConf.class)) + .isExactlyInstanceOf(IllegalStateException.class) + .hasMessage("localDc is not configured for cluster: cluster1 for consistency level: LOCAL_QUORUM"); + } + + @Test + void testResolveLocalDc() + { + ClusterConf clusterWithLocalDc = new SimpleClusterConf(Collections.singletonList("instance-1:9999"), "dc1"); + assertThat(clusterWithLocalDc.resolveLocalDc(CL.EACH_QUORUM)) + .describedAs("Resolving localDc with Non-local CL should return null") + .isNull(); + assertThat(clusterWithLocalDc.resolveLocalDc(CL.LOCAL_QUORUM)) + .describedAs("Resolving localDc with local CL should return the actual localDc") + .isEqualTo("dc1"); + ClusterConf clusterWithoutLocalDc = new SimpleClusterConf(Collections.singletonList("instance-1:9999"), null); + assertThat(clusterWithoutLocalDc.resolveLocalDc(CL.EACH_QUORUM)).isNull(); + assertThatThrownBy(() -> 
clusterWithoutLocalDc.resolveLocalDc(CL.LOCAL_QUORUM)) + .isExactlyInstanceOf(IllegalStateException.class) + .hasMessage("No localDc is specified for local consistency level: " + CL.LOCAL_QUORUM); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org