This is an automated email from the ASF dual-hosted git repository. frankgh pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
commit d1d0dd70951c9997ca7f9eeb184da64a0eb8fed7 Author: Francisco Guerrero <fran...@apache.org> AuthorDate: Tue Apr 2 12:01:49 2024 -0700 Ninja fix for CASSANDRA-19340 Revert "Make sure bridge exists" This reverts commit 98baab1b8f0d5d7eb93f8d13db3b0a7a985fb03a. We revert this commit because the commit message was lost during merge. We immediately add the same commit with the correct commit message, to avoid rewriting git history. --- .circleci/config.yml | 32 ++--- CHANGES.txt | 1 - .../spark/bulkwriter/BulkWriterContext.java | 3 - .../bulkwriter/CassandraBulkWriterContext.java | 26 +--- .../spark/bulkwriter/CassandraSchemaInfo.java | 14 +- .../spark/bulkwriter/CqlTableInfoProvider.java | 17 +-- .../cassandra/spark/bulkwriter/RecordWriter.java | 77 ++--------- .../cassandra/spark/bulkwriter/SSTableWriter.java | 8 +- .../spark/bulkwriter/SSTableWriterFactory.java | 4 - .../cassandra/spark/bulkwriter/SchemaInfo.java | 6 - .../spark/bulkwriter/SqlToCqlTypeConverter.java | 127 ++++++------------ .../spark/bulkwriter/token/TokenUtils.java | 3 +- .../cassandra/spark/data/LocalDataLayer.java | 17 --- .../spark/bulkwriter/MockBulkWriterContext.java | 14 -- .../bulkwriter/SqlToCqlTypeConverterTest.java | 2 - .../spark/bulkwriter/TableSchemaNormalizeTest.java | 28 +--- .../spark/bulkwriter/TableSchemaTest.java | 3 +- .../spark/bulkwriter/TableSchemaTestCommon.java | 31 ----- .../testing/SharedClusterIntegrationTestBase.java | 53 +------- cassandra-analytics-integration-tests/build.gradle | 1 - .../cassandra/analytics/BulkWriteUdtTest.java | 145 --------------------- .../cassandra/analytics/DataGenerationUtils.java | 77 +---------- .../analytics/QuoteIdentifiersWriteTest.java | 71 +--------- .../SharedClusterSparkIntegrationTestBase.java | 88 ------------- .../apache/cassandra/bridge/CassandraBridge.java | 1 - .../cassandra/spark/data/BridgeUdtValue.java | 69 ---------- .../bridge/CassandraBridgeImplementation.java | 4 +- .../bridge/SSTableWriterImplementation.java | 30 ++--- .../cassandra/spark/data/complex/CqlUdt.java | 4 - .../bridge/SSTableWriterImplementationTest.java | 2 - 30 files changed, 104 insertions(+), 854 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index b7603fc..9d5d7ef 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -147,7 +147,7 @@ jobs: - "*.jar" - "org/**/*" - spark2-2_11-jdk8: + cassandra-analytics-core-spark2-2_11-jdk8: docker: - image: cimg/openjdk:8.0 resource_class: large @@ -172,7 +172,7 @@ jobs: - store_test_results: path: build/test-reports - int-spark2-2_11-jdk8: + cassandra-analytics-core-int-spark2-2_11-jdk8: parallelism: 8 docker: - image: cimg/openjdk:8.0 @@ -198,7 +198,7 @@ jobs: - store_test_results: path: build/test-reports - spark2-2_12-jdk8: + cassandra-analytics-core-spark2-2_12-jdk8: docker: - image: cimg/openjdk:8.0 resource_class: large @@ -223,7 +223,7 @@ jobs: - store_test_results: path: build/test-reports - int-spark2-2_12-jdk8: + cassandra-analytics-core-int-spark2-2_12-jdk8: parallelism: 8 docker: - image: cimg/openjdk:8.0 @@ -249,7 +249,7 @@ jobs: - store_test_results: path: build/test-reports - spark3-2_12-jdk11: + cassandra-analytics-core-spark3-2_12-jdk11: docker: - image: cimg/openjdk:11.0 resource_class: large @@ -275,7 +275,7 @@ jobs: - store_test_results: path: build/test-reports - int-spark3-2_12-jdk11: + cassandra-analytics-core-int-spark3-2_12-jdk11: parallelism: 8 docker: - image: cimg/openjdk:11.0 @@ -302,7 +302,7 @@ jobs: - store_test_results: path: build/test-reports - spark3-2_13-jdk11: + cassandra-analytics-core-spark3-2_13-jdk11: docker: - image: cimg/openjdk:11.0 resource_class: large @@ -328,7 +328,7 @@ jobs: - store_test_results: path: build/test-reports - int-spark3-2_13-jdk11: + cassandra-analytics-core-int-spark3-2_13-jdk11: parallelism: 8 docker: - image: cimg/openjdk:11.0 @@ -361,27 +361,27 @@ workflows: jobs: - build-dependencies-jdk8 - build-dependencies-jdk11 - - spark2-2_11-jdk8: + - cassandra-analytics-core-spark2-2_11-jdk8: requires: - build-dependencies-jdk8 - - spark2-2_12-jdk8: + - cassandra-analytics-core-spark2-2_12-jdk8: requires: - build-dependencies-jdk8 - - spark3-2_12-jdk11: + - cassandra-analytics-core-spark3-2_12-jdk11: requires: - build-dependencies-jdk11 - - spark3-2_13-jdk11: + - cassandra-analytics-core-spark3-2_13-jdk11: requires: - build-dependencies-jdk11 - - int-spark2-2_11-jdk8: + - cassandra-analytics-core-int-spark2-2_11-jdk8: requires: - build-dependencies-jdk8 - - int-spark2-2_12-jdk8: + - cassandra-analytics-core-int-spark2-2_12-jdk8: requires: - build-dependencies-jdk8 - - int-spark3-2_12-jdk11: + - cassandra-analytics-core-int-spark3-2_12-jdk11: requires: - build-dependencies-jdk11 - - int-spark3-2_13-jdk11: + - cassandra-analytics-core-int-spark3-2_13-jdk11: requires: - build-dependencies-jdk11 diff --git a/CHANGES.txt b/CHANGES.txt index 741584c..914d933 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,4 @@ 1.0.0 - * Support UDTs in the Bulk Writer (CASSANDRA-19340) * Fix bulk reads of multiple tables that potentially have the same data file name (CASSANDRA-19507) * Fix XXHash32Digest calculated digest value (CASSANDRA-19500) * Report additional bulk analytics job stats for instrumentation (CASSANDRA-19418) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java index 945f8a2..10928d4 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContext.java @@ -22,7 +22,6 @@ package org.apache.cassandra.spark.bulkwriter; import java.io.Serializable; import org.apache.cassandra.spark.common.stats.JobStatsPublisher; -import org.apache.cassandra.bridge.CassandraBridge; public interface BulkWriterContext extends Serializable { @@ -36,8 +35,6 @@ public interface BulkWriterContext extends Serializable DataTransferApi transfer(); - CassandraBridge bridge(); - // NOTE: This interface intentionally does *not* implement AutoClosable as Spark can close Broadcast variables // that implement AutoClosable while they are still in use, causing the underlying object to become unusable void shutdown(); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java index 0999604..84d100c 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java @@ -54,8 +54,6 @@ public class CassandraBulkWriterContext implements BulkWriterContext, KryoSerial @NotNull private final BulkSparkConf conf; private final JobInfo jobInfo; - private final String lowestCassandraVersion; - private transient CassandraBridge bridge; private transient DataTransferApi dataTransferApi; private final CassandraClusterInfo clusterInfo; private final SchemaInfo schemaInfo; @@ -70,8 +68,9 @@ public class CassandraBulkWriterContext implements BulkWriterContext, KryoSerial this.conf = conf; this.clusterInfo = clusterInfo; this.jobStatsPublisher = new LogStatsPublisher(); - lowestCassandraVersion = clusterInfo.getLowestCassandraVersion(); - this.bridge = CassandraBridgeFactory.get(lowestCassandraVersion); + String lowestCassandraVersion = clusterInfo.getLowestCassandraVersion(); + CassandraBridge bridge = CassandraBridgeFactory.get(lowestCassandraVersion); + TokenRangeMapping<RingInstance> tokenRangeMapping = clusterInfo.getTokenRangeMapping(true); jobInfo = new CassandraJobInfo(conf, new TokenPartitioner(tokenRangeMapping, @@ -93,23 +92,11 @@ public class CassandraBulkWriterContext implements BulkWriterContext, KryoSerial Set<String> udts = CqlUtils.extractUdts(keyspaceSchema, keyspace); ReplicationFactor replicationFactor = CqlUtils.extractReplicationFactor(keyspaceSchema, keyspace); int indexCount = CqlUtils.extractIndexCount(keyspaceSchema, keyspace, table); - CqlTable cqlTable = bridge().buildSchema(createTableSchema, keyspace, replicationFactor, partitioner, udts, null, indexCount); + CqlTable cqlTable = bridge.buildSchema(createTableSchema, keyspace, replicationFactor, partitioner, udts, null, indexCount); TableInfoProvider tableInfoProvider = new CqlTableInfoProvider(createTableSchema, cqlTable); TableSchema tableSchema = initializeTableSchema(conf, dfSchema, tableInfoProvider, lowestCassandraVersion); - schemaInfo = new CassandraSchemaInfo(tableSchema, udts, cqlTable); - } - - @Override - public CassandraBridge bridge() - { - CassandraBridge currentBridge = this.bridge; - if (currentBridge != null) - { - return currentBridge; - } - this.bridge = CassandraBridgeFactory.get(lowestCassandraVersion); - return bridge; + schemaInfo = new CassandraSchemaInfo(tableSchema); } public static BulkWriterContext fromOptions(@NotNull SparkContext sparkContext, @@ -217,8 +204,9 @@ public class CassandraBulkWriterContext implements BulkWriterContext, KryoSerial { if (dataTransferApi == null) { + CassandraBridge bridge = CassandraBridgeFactory.get(clusterInfo.getLowestCassandraVersion()); dataTransferApi = new SidecarDataTransferApi(clusterInfo.getCassandraContext(), - bridge(), + bridge, jobInfo, conf); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraSchemaInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraSchemaInfo.java index 7c63320..d55b49b 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraSchemaInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraSchemaInfo.java @@ -19,20 +19,14 @@ package org.apache.cassandra.spark.bulkwriter; -import java.util.Set; - -import org.apache.cassandra.spark.data.CqlTable; - public class CassandraSchemaInfo implements SchemaInfo { private static final long serialVersionUID = -2327383232935001862L; private final TableSchema tableSchema; - private final Set<String> userDefinedTypeStatements; - public CassandraSchemaInfo(TableSchema tableSchema, Set<String> userDefinedTypeStatements, CqlTable cqlTable) + public CassandraSchemaInfo(TableSchema tableSchema) { this.tableSchema = tableSchema; - this.userDefinedTypeStatements = userDefinedTypeStatements; } @Override @@ -40,10 +34,4 @@ public class CassandraSchemaInfo implements SchemaInfo { return tableSchema; } - - @Override - public Set<String> getUserDefinedTypeStatements() - { - return userDefinedTypeStatements; - } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CqlTableInfoProvider.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CqlTableInfoProvider.java index 29977cf..e512d27 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CqlTableInfoProvider.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CqlTableInfoProvider.java @@ -98,20 +98,9 @@ public class CqlTableInfoProvider implements TableInfoProvider @Override public List<ColumnType<?>> getPartitionKeyTypes() { - List<ColumnType<?>> types = cqlTable.partitionKeys().stream() - .map(cqlField -> { - String typeName = cqlField.type().cqlName().toLowerCase(); - ColumnType<?> type = DATA_TYPES.get(typeName); - if (type == null) - { - throw new RuntimeException( - "Could not find ColumnType for type name" + typeName); - } - return type; - }) - .collect(Collectors.toList()); - return types; - + return cqlTable.partitionKeys().stream() + .map(cqlField -> DATA_TYPES.get(cqlField.type().cqlName().toLowerCase())) + .collect(Collectors.toList()); } @Override diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java index b6fea5a..232c461 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java @@ -37,7 +37,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -45,7 +44,6 @@ import java.util.stream.Stream; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Range; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,10 +51,6 @@ import org.slf4j.LoggerFactory; import o.a.c.sidecar.client.shaded.common.data.TimeSkewResponse; import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; -import org.apache.cassandra.spark.data.BridgeUdtValue; -import org.apache.cassandra.spark.data.CqlField; -import org.apache.cassandra.spark.data.CqlTable; -import org.apache.cassandra.spark.data.ReplicationFactor; import org.apache.cassandra.spark.utils.DigestAlgorithm; import org.apache.spark.InterruptibleIterator; import org.apache.spark.TaskContext; @@ -67,8 +61,6 @@ import static org.apache.cassandra.spark.utils.ScalaConversionUtils.asScalaItera @SuppressWarnings({ "ConstantConditions" }) public class RecordWriter implements Serializable { - public static final ReplicationFactor IGNORED_REPLICATION_FACTOR = new ReplicationFactor(ReplicationFactor.ReplicationStrategy.SimpleStrategy, - ImmutableMap.of("replication_factor", 1)); private static final Logger LOGGER = LoggerFactory.getLogger(RecordWriter.class); private static final long serialVersionUID = 3746578054834640428L; private final BulkWriterContext writerContext; @@ -80,10 +72,8 @@ public class RecordWriter implements Serializable private final ReplicaAwareFailureHandler<RingInstance> failureHandler; private final Supplier<TaskContext> taskContextSupplier; - private final ConcurrentHashMap<String, CqlField.CqlUdt> udtCache = new ConcurrentHashMap<>(); private SSTableWriter sstableWriter = null; private int outputSequence = 0; // sub-folder for possible subrange splits - private transient volatile CqlTable cqlTable; public RecordWriter(BulkWriterContext writerContext, String[] columnNames) { @@ -117,21 +107,6 @@ public class RecordWriter implements Serializable return String.format("%d-%s", taskContext.partitionId(), UUID.randomUUID()); } - private CqlTable cqlTable() - { - if (cqlTable == null) - { - cqlTable = writerContext.bridge() - .buildSchema(writerContext.schema().getTableSchema().createStatement, - writerContext.job().keyspace(), - IGNORED_REPLICATION_FACTOR, - writerContext.cluster().getPartitioner(), - writerContext.schema().getUserDefinedTypeStatements()); - } - - return cqlTable; - } - public WriteResult write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterator) { TaskContext taskContext = taskContextSupplier.get(); @@ -234,14 +209,6 @@ public class RecordWriter implements Serializable } } - public static <T> Set<T> symmetricDifference(Set<T> set1, Set<T> set2) - { - return Stream.concat( - set1.stream().filter(element -> !set2.contains(element)), - set2.stream().filter(element -> !set1.contains(element))) - .collect(Collectors.toSet()); - } - private Map<Range<BigInteger>, List<RingInstance>> taskTokenRangeMapping(TokenRangeMapping<RingInstance> tokenRange, Range<BigInteger> taskTokenRange) { @@ -341,6 +308,14 @@ public class RecordWriter implements Serializable } } + public static <T> Set<T> symmetricDifference(Set<T> set1, Set<T> set2) + { + return Stream.concat( + set1.stream().filter(element -> !set2.contains(element)), + set2.stream().filter(element -> !set1.contains(element))) + .collect(Collectors.toSet()); + } + private void validateAcceptableTimeSkewOrThrow(List<RingInstance> replicas) { if (replicas.isEmpty()) @@ -395,48 +370,16 @@ public class RecordWriter implements Serializable } } - private Map<String, Object> getBindValuesForColumns(Map<String, Object> map, String[] columnNames, Object[] values) + private static Map<String, Object> getBindValuesForColumns(Map<String, Object> map, String[] columnNames, Object[] values) { assert values.length == columnNames.length : "Number of values does not match the number of columns " + values.length + ", " + columnNames.length; for (int i = 0; i < columnNames.length; i++) { - map.put(columnNames[i], maybeConvertUdt(values[i])); + map.put(columnNames[i], values[i]); } return map; } - private Object maybeConvertUdt(Object value) - { - if (value instanceof BridgeUdtValue) - { - BridgeUdtValue udtValue = (BridgeUdtValue) value; - // Depth-first replacement of BridgeUdtValue instances to their appropriate Cql types - for (Map.Entry<String, Object> entry : udtValue.udtMap.entrySet()) - { - if (entry.getValue() instanceof BridgeUdtValue) - { - udtValue.udtMap.put(entry.getKey(), maybeConvertUdt(entry.getValue())); - } - } - return getUdt(udtValue.name).convertForCqlWriter(udtValue.udtMap, writerContext.bridge().getVersion()); - } - return value; - } - - private synchronized CqlField.CqlType getUdt(String udtName) - { - return udtCache.computeIfAbsent(udtName, name -> { - for (CqlField.CqlUdt udt1 : cqlTable().udts()) - { - if (udt1.cqlName().equals(name)) - { - return udt1; - } - } - throw new IllegalArgumentException("Could not find udt with name " + name); - }); - } - /** * Close the {@link RecordWriter#sstableWriter} if present. Schedule a stream session with the produced sstables. * And finally, nullify {@link RecordWriter#sstableWriter} diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java index addbc11..8c1a35f 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java @@ -27,7 +27,6 @@ import java.nio.file.Path; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.Set; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Range; @@ -79,15 +78,13 @@ public class SSTableWriter String packageVersion = getPackageVersion(lowestCassandraVersion); LOGGER.info("Running with version " + packageVersion); - SchemaInfo schema = writerContext.schema(); - TableSchema tableSchema = schema.getTableSchema(); + TableSchema tableSchema = writerContext.schema().getTableSchema(); this.cqlSSTableWriter = SSTableWriterFactory.getSSTableWriter( CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(packageVersion), this.outDir.toString(), writerContext.cluster().getPartitioner().toString(), tableSchema.createStatement, tableSchema.modificationStatement, - schema.getUserDefinedTypeStatements(), writerContext.job().sstableDataSizeInMiB()); } @@ -140,9 +137,8 @@ public class SSTableWriter CassandraVersion version = CassandraBridgeFactory.getCassandraVersion(writerContext.cluster().getLowestCassandraVersion()); String keyspace = writerContext.job().keyspace(); String schema = writerContext.schema().getTableSchema().createStatement; - Set<String> udtStatements = writerContext.schema().getUserDefinedTypeStatements(); String directory = getOutDir().toString(); - DataLayer layer = new LocalDataLayer(version, keyspace, schema, udtStatements, directory); + DataLayer layer = new LocalDataLayer(version, keyspace, schema, directory); try (StreamScanner<Rid> scanner = layer.openCompactionScanner(partitionId, Collections.emptyList(), null)) { while (scanner.hasNext()) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java index 77b8f5f..55cace3 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java @@ -19,8 +19,6 @@ package org.apache.cassandra.spark.bulkwriter; -import java.util.Set; - import org.apache.cassandra.bridge.CassandraBridge; import org.apache.cassandra.bridge.CassandraBridgeFactory; import org.apache.cassandra.bridge.CassandraVersionFeatures; @@ -38,7 +36,6 @@ public final class SSTableWriterFactory String partitioner, String createStatement, String insertStatement, - Set<String> userDefinedTypeStatements, int bufferSizeMB) { CassandraBridge cassandraBridge = CassandraBridgeFactory.get(serverVersion); @@ -46,7 +43,6 @@ public final class SSTableWriterFactory partitioner, createStatement, insertStatement, - userDefinedTypeStatements, bufferSizeMB); } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SchemaInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SchemaInfo.java index 0257d29..ca95618 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SchemaInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SchemaInfo.java @@ -20,14 +20,8 @@ package org.apache.cassandra.spark.bulkwriter; import java.io.Serializable; -import java.util.Set; - -import org.jetbrains.annotations.NotNull; public interface SchemaInfo extends Serializable { TableSchema getTableSchema(); - - @NotNull - Set<String> getUserDefinedTypeStatements(); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java index a80c8c6..a19ca21 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java @@ -41,10 +41,8 @@ import com.google.common.net.InetAddresses; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.spark.data.BridgeUdtValue; import org.apache.cassandra.spark.data.CqlField; import org.apache.cassandra.spark.utils.UUIDs; -import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; import scala.Tuple2; import static org.apache.cassandra.spark.utils.ScalaConversionUtils.asJavaIterable; @@ -52,6 +50,8 @@ import static org.apache.cassandra.spark.utils.ScalaConversionUtils.asJavaIterab @SuppressWarnings("unchecked") public final class SqlToCqlTypeConverter implements Serializable { + private static final Logger LOGGER = LoggerFactory.getLogger(SqlToCqlTypeConverter.class); + public static final String ASCII = "ascii"; public static final String BIGINT = "bigint"; public static final String BLOB = "blob"; @@ -79,7 +79,7 @@ public final class SqlToCqlTypeConverter implements Serializable public static final String UDT = "udt"; public static final String VARCHAR = "varchar"; public static final String VARINT = "varint"; - private static final Logger LOGGER = LoggerFactory.getLogger(SqlToCqlTypeConverter.class); + private static final NoOp<Object> NO_OP_CONVERTER = new NoOp<>(); private static final LongConverter LONG_CONVERTER = new LongConverter(); private static final BytesConverter BYTES_CONVERTER = new BytesConverter(); @@ -164,14 +164,11 @@ public final class SqlToCqlTypeConverter implements Serializable return new MapConverter<>((CqlField.CqlMap) cqlType); case SET: return new SetConverter<>((CqlField.CqlCollection) cqlType); + case UDT: + return NO_OP_CONVERTER; case TUPLE: return NO_OP_CONVERTER; default: - if (cqlType.internalType() == CqlField.CqlType.InternalType.Udt) - { - assert cqlType instanceof CqlField.CqlUdt; - return new UdtConverter((CqlField.CqlUdt) cqlType); - } LOGGER.warn("Unable to match type={}. Defaulting to NoOp Converter", cqlName); return NO_OP_CONVERTER; } @@ -215,12 +212,12 @@ public final class SqlToCqlTypeConverter implements Serializable abstract static class Converter<T> implements Serializable { + public abstract T convertInternal(Object object) throws RuntimeException; + public T convert(Object object) { return convertInternal(object); } - - abstract T convertInternal(Object object); } private abstract static class NullableConverter<T> extends Converter<T> @@ -444,7 +441,7 @@ public final class SqlToCqlTypeConverter implements Serializable * @throws RuntimeException when the object cannot be converted to timestamp */ @Override - public Long convertInternal(Object object) + public Long convertInternal(Object object) throws RuntimeException { if (object instanceof Date) { @@ -482,7 +479,7 @@ public final class SqlToCqlTypeConverter implements Serializable * @throws RuntimeException when the object cannot be converted to timestamp */ @Override - public Date convertInternal(Object object) + public Date convertInternal(Object object) throws RuntimeException { if (object instanceof Date) { @@ -513,6 +510,22 @@ public final class SqlToCqlTypeConverter implements Serializable return "Date"; } + protected int fromDate(Date value) + { + long millisSinceEpoch = value.getTime(); + return fromMillisSinceEpoch(millisSinceEpoch); + } + + protected int fromMillisSinceEpoch(long millisSinceEpoch) + { + // NOTE: This code is lifted from org.apache.cassandra.serializers.SimpleDateSerializer#timeInMillisToDay. + // Reproduced here due to the difficulties of referencing classes from specific versions of Cassandra + // in the SBW. + int result = (int) TimeUnit.MILLISECONDS.toDays(millisSinceEpoch); + result -= Integer.MIN_VALUE; + return result; + } + @Override public Integer convertInternal(Object object) { @@ -529,22 +542,6 @@ public final class SqlToCqlTypeConverter implements Serializable throw new RuntimeException("Unsupported conversion for DATE from " + object.getClass().getTypeName()); } } - - protected int fromDate(Date value) - { - long millisSinceEpoch = value.getTime(); - return fromMillisSinceEpoch(millisSinceEpoch); - } - - protected int fromMillisSinceEpoch(long millisSinceEpoch) - { - // NOTE: This code is lifted from org.apache.cassandra.serializers.SimpleDateSerializer#timeInMillisToDay. - // Reproduced here due to the difficulties of referencing classes from specific versions of Cassandra - // in the SBW. - int result = (int) TimeUnit.MILLISECONDS.toDays(millisSinceEpoch); - result -= Integer.MIN_VALUE; - return result; - } } static class TimeConverter extends NullableConverter<Long> @@ -677,12 +674,6 @@ public final class SqlToCqlTypeConverter implements Serializable } } - @Override - public String toString() - { - return "List"; - } - private List<E> makeList(Iterable<?> iterable) { List<E> list = new ArrayList<>(); @@ -692,6 +683,12 @@ public final class SqlToCqlTypeConverter implements Serializable } return list; } + + @Override + public String toString() + { + return "List"; + } } static class SetConverter<E> extends NullableConverter<Set<E>> @@ -720,12 +717,6 @@ public final class SqlToCqlTypeConverter implements Serializable } } - @Override - public String toString() - { - return "Set<" + innerConverter.toString() + ">"; - } - private Set<E> makeSet(Iterable<?> iterable) { Set<E> set = new HashSet<>(); @@ -735,6 +726,12 @@ public final class SqlToCqlTypeConverter implements Serializable } return set; } + + @Override + public String toString() + { + return "Set<" + innerConverter.toString() + ">"; + } } static class MapConverter<K, V> extends NullableConverter<Map<K, V>> @@ -766,12 +763,6 @@ public final class SqlToCqlTypeConverter implements Serializable throw new RuntimeException("Unsupported conversion for MAP from " + object.getClass().getTypeName()); } - @Override - public String toString() - { - return "Map<" + keyConverter.toString() + ", " + valConverter.toString() + '>'; - } - private Map<K, V> makeMap(Iterable<?> iterable) { Object key; @@ -797,53 +788,11 @@ public final class SqlToCqlTypeConverter implements Serializable } return map; } - } - - public static class UdtConverter extends NullableConverter<BridgeUdtValue> - { - private final String name; - private final HashMap<String, Converter<?>> converters; - - UdtConverter(CqlField.CqlUdt udt) - { - this.name = udt.cqlName(); - this.converters = new HashMap<>(); - for (CqlField f : udt.fields()) - { - converters.put(f.name(), getConverter(f.type())); - } - } - - @Override - public BridgeUdtValue convertInternal(Object object) - { - if (object instanceof GenericRowWithSchema) - { - Map<String, Object> udtMap = makeUdtMap((GenericRowWithSchema) object); - return new BridgeUdtValue(name, udtMap); - } - throw new RuntimeException("Unsupported conversion for UDT from " + object.getClass().getTypeName()); - } @Override public String toString() { - return String.format("UDT[%s]", name); - } - - // Unfortunately, we don't have easy access to the bridge here. - // Rather than trying to create an actual UDTValue here, we will push - // that responsibility down to the SSTableWriter Implementation - private Map<String, Object> makeUdtMap(GenericRowWithSchema row) - { - Map<String, Object> result = new HashMap<>(); - for (String fieldName : row.schema().fieldNames()) - { - Converter<?> converter = converters.get(fieldName); - Object val = row.get(row.fieldIndex(fieldName)); - result.put(fieldName, converter.convert(val)); - } - return result; + return "Map"; } } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenUtils.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenUtils.java index d9b0e67..b0169ae 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenUtils.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenUtils.java @@ -38,7 +38,7 @@ import org.apache.cassandra.spark.data.partitioner.Partitioner; * This reduces number of SSTables that get created in Cassandra by the bulk writing job. * Fewer SSTables will result in lower read latencies and lower compaction overhead. */ -@SuppressWarnings({"WeakerAccess", "rawtypes", "unchecked"}) +@SuppressWarnings("WeakerAccess") public class TokenUtils implements Serializable { private final String[] partitionKeyColumns; @@ -54,6 +54,7 @@ public class TokenUtils implements Serializable this.isMurmur3Partitioner = isMurmur3Partitioner; } + // noinspection unchecked private ByteBuffer getByteBuffer(Object columnValue, int partitionKeyColumnIdx) { ColumnType columnType = partitionKeyColumnTypes[partitionKeyColumnIdx]; diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java index 1f79d7e..214d5cc 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java @@ -195,23 +195,6 @@ public class LocalDataLayer extends DataLayer implements Serializable paths); } - public LocalDataLayer(@NotNull CassandraVersion version, - @NotNull String keyspace, - @NotNull String createStatement, - @NotNull Set<String> udtStatements, - String... paths) - { - this(version, - Partitioner.Murmur3Partitioner, - keyspace, - createStatement, - udtStatements, - Collections.emptyList(), - false, - null, - paths); - } - // CHECKSTYLE IGNORE: Constructor with many parameters public LocalDataLayer(@NotNull CassandraVersion version, @NotNull Partitioner partitioner, diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java index cf6a6f5..91e7174 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java @@ -40,9 +40,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.tuple.Pair; import o.a.c.sidecar.client.shaded.common.data.TimeSkewResponse; -import org.apache.cassandra.bridge.CassandraBridge; import org.apache.cassandra.bridge.CassandraBridgeFactory; -import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; import org.apache.cassandra.spark.common.Digest; @@ -79,7 +77,6 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo private ConsistencyLevel.CL consistencyLevel; private int sstableDataSizeInMB = 128; private int sstableWriteBatchSize = 2; - private CassandraBridge bridge = CassandraBridgeFactory.get(CassandraVersion.FOURZERO); @Override public void publish(Map<String, String> stats) @@ -298,12 +295,6 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo return schema; } - @Override - public Set<String> getUserDefinedTypeStatements() - { - return Collections.emptySet(); - } - @Override public Partitioner getPartitioner() { @@ -455,11 +446,6 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo return this; } - public CassandraBridge bridge() - { - return bridge; - } - @Override public boolean quoteIdentifiers() { diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverterTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverterTest.java index c72bddd..ccfd7e6 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverterTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverterTest.java @@ -52,7 +52,6 @@ import static org.apache.cassandra.spark.bulkwriter.TableSchemaTestCommon.mockCq import static org.apache.cassandra.spark.bulkwriter.TableSchemaTestCommon.mockListCqlType; import static org.apache.cassandra.spark.bulkwriter.TableSchemaTestCommon.mockMapCqlType; import static org.apache.cassandra.spark.bulkwriter.TableSchemaTestCommon.mockSetCqlType; -import static org.apache.cassandra.spark.bulkwriter.TableSchemaTestCommon.mockUdtCqlType; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; @@ -74,7 +73,6 @@ public final class SqlToCqlTypeConverterTest na(mockCqlType(DATE), SqlToCqlTypeConverter.DateConverter.class), na(mockMapCqlType(INT, INT), SqlToCqlTypeConverter.MapConverter.class), na(mockSetCqlType(INET), SqlToCqlTypeConverter.SetConverter.class), - na(mockUdtCqlType("udtType", "f1", TEXT, "f2", INT, "f3", TIMEUUID), SqlToCqlTypeConverter.UdtConverter.class), // Special Cassandra 1.2 Timestamp type should use TimestampConverter na(mockCqlCustom("org.apache.cassandra.db.marshal.DateType"), SqlToCqlTypeConverter.TimestampConverter.class)); } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaNormalizeTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaNormalizeTest.java index fe106bb..0cc628e 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaNormalizeTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaNormalizeTest.java @@ -34,7 +34,6 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -import com.google.common.collect.ImmutableMap; import com.google.common.net.InetAddresses; import org.junit.jupiter.api.Test; @@ -43,13 +42,8 @@ import org.apache.cassandra.spark.common.schema.ColumnTypes; import org.apache.cassandra.spark.common.schema.ListType; import org.apache.cassandra.spark.common.schema.MapType; import org.apache.cassandra.spark.common.schema.SetType; -import org.apache.cassandra.spark.data.BridgeUdtValue; import org.apache.cassandra.spark.data.CqlField; -import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.Metadata; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; import static java.util.AbstractMap.SimpleEntry; import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.ASCII; @@ -79,7 +73,6 @@ import static org.apache.cassandra.spark.bulkwriter.TableSchemaTestCommon.mockCq import static org.apache.cassandra.spark.bulkwriter.TableSchemaTestCommon.mockListCqlType; import static org.apache.cassandra.spark.bulkwriter.TableSchemaTestCommon.mockMapCqlType; import static org.apache.cassandra.spark.bulkwriter.TableSchemaTestCommon.mockSetCqlType; -import static org.apache.cassandra.spark.bulkwriter.TableSchemaTestCommon.mockUdtCqlType; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; import static org.hamcrest.core.IsEqual.equalTo; @@ -303,24 +296,6 @@ public class TableSchemaNormalizeTest DataTypes.createArrayType(DataTypes.createArrayType(DataTypes.BinaryType)))); } - @Test - public void testUdtNormalization() - { - StructType structType = new StructType() - .add(new StructField("f1", DataTypes.IntegerType, false, Metadata.empty())) - .add(new StructField("f2", DataTypes.StringType, false, Metadata.empty())); - - GenericRowWithSchema source = new GenericRowWithSchema(new Object[]{1, "course"}, structType); - // NOTE: UDT Types carry their type name around, so the use of `udt_field` consistently here is a bit - // "wrong" for the real-world, but is tested by integration tests elsewhere and is correct for the way - // the asserts in this test work. - BridgeUdtValue udtValue = new BridgeUdtValue("udt_field", ImmutableMap.of("f1", 1, "f2", "course")); - - CqlField.CqlUdt cqlType = mockUdtCqlType("udt_field", "f1", INT, "f2", TEXT); - assertNormalized("udt_field", cqlType, new MapType<>(ColumnTypes.STRING, new ListType<>(ColumnTypes.BYTES)), - source, udtValue, structType); - } - private void assertNormalized(String field, CqlField.CqlType cqlType, ColumnType<?> columnType, @@ -334,7 +309,6 @@ public class TableSchemaNormalizeTest TableSchema schema = buildSchema(fieldNames, sparkTypes, new CqlField.CqlType[]{cqlType}, fieldNames, cqlTypes, fieldNames); Object[] source = new Object[]{sourceVal}; Object[] expected = new Object[]{expectedVal}; - Object[] normalized = schema.normalize(source); - assertThat(normalized, is(equalTo(expected))); + assertThat(schema.normalize(source), is(equalTo(expected))); } } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java index 83564f5..5cb184f 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java @@ -159,8 +159,7 @@ public class TableSchemaTest TableSchema schema = getValidSchemaBuilder() .build(); - assertThat(schema.normalize(new Object[]{1, 1L, "foo", 2}), - is(equalTo(new Object[]{1, -2147483648, "foo", 2}))); + assertThat(schema.normalize(new Object[]{1, 1L, "foo", 2}), is(equalTo(new Object[]{1, -2147483648, "foo", 2}))); } @Test diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java index 8dbc5fc..ef937ce 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java @@ -19,9 +19,7 @@ package org.apache.cassandra.spark.bulkwriter; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -45,7 +43,6 @@ import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.CUSTOM import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.LIST; import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.MAP; import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.SET; -import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.UDT; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -127,34 +124,6 @@ public final class TableSchemaTestCommon return mock; } - @NotNull - public static CqlField.CqlUdt mockUdtCqlType(String name, String... namesAndTypes) - { - assert namesAndTypes.length > 0 && (namesAndTypes.length % 2) == 0; - HashMap<String, CqlField> udtDef = new HashMap<>(); - CqlField.CqlUdt udtMock = mock(CqlField.CqlUdt.class); - when(udtMock.cqlName()).thenReturn(name); - when(udtMock.internalType()).thenReturn(CqlField.CqlType.InternalType.Udt); - when(udtMock.name()).thenReturn(UDT); - List<CqlField> fields = new ArrayList<>(); - for (int i = 0; i < namesAndTypes.length; i += 2) - { - String field = namesAndTypes[i]; - String type = namesAndTypes[i + 1]; - CqlField mock = mock(CqlField.class); - when(mock.name()).thenReturn(field); - when(mock.cqlTypeName()).thenReturn(type); - CqlField.CqlType fieldType = mockCqlType(type); - when(mock.type()).thenReturn(fieldType); - udtDef.put(field, mock); - when(udtMock.field(i / 2)).thenReturn(mock); - when(udtMock.field(field)).thenReturn(mock); - fields.add(mock); - } - when(udtMock.fields()).thenReturn(fields); - return udtMock; - } - public static TableSchema buildSchema(String[] fieldNames, org.apache.spark.sql.types.DataType[] sparkTypes, CqlField.CqlType[] driverTypes, diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java index 267c175..028b436 100644 --- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java +++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/SharedClusterIntegrationTestBase.java @@ -54,8 +54,6 @@ import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; import org.apache.cassandra.distributed.UpgradeableCluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; -import org.apache.cassandra.distributed.api.Feature; -import org.apache.cassandra.distributed.api.ICluster; import org.apache.cassandra.distributed.api.IInstance; import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.distributed.impl.AbstractCluster; @@ -83,10 +81,6 @@ import org.apache.cassandra.testing.TestUtils; import org.apache.cassandra.testing.TestVersion; import org.apache.cassandra.testing.TestVersionSupplier; import org.apache.cassandra.utils.Throwables; -import shaded.com.datastax.driver.core.Cluster; -import shaded.com.datastax.driver.core.ResultSet; -import shaded.com.datastax.driver.core.Session; -import shaded.com.datastax.driver.core.SimpleStatement; import static org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext.tryGetIntConfig; import static org.assertj.core.api.Assertions.assertThat; @@ -136,7 +130,6 @@ public abstract class SharedClusterIntegrationTestBase protected AbstractCluster<? extends IInstance> cluster; protected Server server; protected Injector injector; - protected TestVersion testVersion; static { @@ -147,11 +140,10 @@ public abstract class SharedClusterIntegrationTestBase @BeforeAll protected void setup() throws InterruptedException, IOException { - Optional<TestVersion> maybeTestVersion = TestVersionSupplier.testVersions().findFirst(); - assertThat(maybeTestVersion).isPresent(); - this.testVersion = maybeTestVersion.get(); + Optional<TestVersion> testVersion = TestVersionSupplier.testVersions().findFirst(); + assertThat(testVersion).isPresent(); logger.info("Testing with version={}", testVersion); - cluster = provisionClusterWithRetries(this.testVersion); + cluster = provisionClusterWithRetries(testVersion.get()); assertThat(cluster).isNotNull(); initializeSchemaForTest(); startSidecar(cluster); @@ -354,45 +346,6 @@ public abstract class SharedClusterIntegrationTestBase return cluster.coordinator(1).execute(String.format("SELECT * FROM %s;", table), consistencyLevel); } - /** - * Convenience method to query all data from the provided {@code table} at the specified consistency level. - * - * @param table the qualified Cassandra table name - * @param consistency - * @return all the data queried from the table - */ - protected ResultSet queryAllDataWithDriver(ICluster cluster, QualifiedName table, shaded.com.datastax.driver.core.ConsistencyLevel consistency) - { - Cluster driverCluster = createDriverCluster(cluster); - Session session = driverCluster.connect(); - SimpleStatement statement = new SimpleStatement(String.format("SELECT * FROM %s;", table)); - statement.setConsistencyLevel(consistency); - return session.execute(statement); - } - - public static Cluster createDriverCluster(ICluster<? extends IInstance> dtest) - { - if (dtest.size() == 0) - { - throw new IllegalArgumentException("Attempted to open java driver for empty cluster"); - } - else - { - dtest.stream().forEach((i) -> { - if (!i.config().has(Feature.NATIVE_PROTOCOL) || !i.config().has(Feature.GOSSIP)) - { - throw new IllegalStateException("java driver requires Feature.NATIVE_PROTOCOL and Feature.GOSSIP; but one or more is missing"); - } - }); - Cluster.Builder builder = Cluster.builder(); - dtest.stream().forEach((i) -> { - builder.addContactPointsWithPorts(new InetSocketAddress(i.broadcastAddress().getAddress(), i.config().getInt("native_transport_port"))); - }); - - return builder.build(); - } - } - static class IntegrationTestModule extends AbstractModule { private final AbstractCluster<? extends IInstance> cluster; diff --git a/cassandra-analytics-integration-tests/build.gradle b/cassandra-analytics-integration-tests/build.gradle index 38cad7e..711e09d 100644 --- a/cassandra-analytics-integration-tests/build.gradle +++ b/cassandra-analytics-integration-tests/build.gradle @@ -34,7 +34,6 @@ project(':cassandra-analytics-integration-tests') { } dependencies { - testImplementation(project(':cassandra-bridge')) testImplementation(project(':cassandra-analytics-core')) testImplementation(group: 'net.java.dev.jna', name: 'jna', version: '5.9.0') diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteUdtTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteUdtTest.java deleted file mode 100644 index f8e7816..0000000 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteUdtTest.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cassandra.analytics; - -import java.io.IOException; - -import org.junit.jupiter.api.Test; - -import com.vdurmont.semver4j.Semver; -import org.apache.cassandra.distributed.UpgradeableCluster; -import org.apache.cassandra.distributed.api.ConsistencyLevel; -import org.apache.cassandra.distributed.api.Feature; -import org.apache.cassandra.distributed.api.SimpleQueryResult; -import org.apache.cassandra.distributed.api.TokenSupplier; -import org.apache.cassandra.distributed.shared.Versions; -import org.apache.cassandra.sidecar.testing.JvmDTestSharedClassesPredicate; -import org.apache.cassandra.sidecar.testing.QualifiedName; -import org.apache.cassandra.testing.TestVersion; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.jetbrains.annotations.NotNull; - -import static org.apache.cassandra.testing.CassandraTestTemplate.fixDistributedSchemas; -import static org.apache.cassandra.testing.CassandraTestTemplate.waitForHealthyRing; -import static org.apache.cassandra.testing.TestUtils.DC1_RF3; -import static org.apache.cassandra.testing.TestUtils.ROW_COUNT; -import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; -import static org.assertj.core.api.Assertions.assertThat; - -class BulkWriteUdtTest extends SharedClusterSparkIntegrationTestBase -{ - static final QualifiedName UDT_TABLE_NAME = new QualifiedName(TEST_KEYSPACE, "test_udt"); - static final QualifiedName NESTED_TABLE_NAME = new QualifiedName(TEST_KEYSPACE, "test_nested_udt"); - public static final String TWO_FIELD_UDT_NAME = "two_field_udt"; - public static final String NESTED_FIELD_UDT_NAME = "nested_udt"; - public static final String UDT_TABLE_CREATE = "CREATE TABLE " + UDT_TABLE_NAME + " (\n" - + " id BIGINT PRIMARY KEY,\n" - + " udtfield " + TWO_FIELD_UDT_NAME + ");"; - public static final String TWO_FIELD_UDT_DEF = "CREATE TYPE " + UDT_TABLE_NAME.keyspace() + "." - + TWO_FIELD_UDT_NAME + " (\n" - + " f1 text,\n" - + " f2 int);"; - public static final String NESTED_UDT_DEF = "CREATE TYPE " + NESTED_TABLE_NAME.keyspace() + "." - + NESTED_FIELD_UDT_NAME + " (\n" - + " n1 BIGINT,\n" - + " n2 frozen<" + TWO_FIELD_UDT_NAME + ">" - + ");"; - public static final String NESTED_TABLE_CREATE = "CREATE TABLE " + NESTED_TABLE_NAME + "(\n" - + " id BIGINT PRIMARY KEY,\n" - + " nested " + NESTED_FIELD_UDT_NAME + ");"; - @Test - void testWriteWithUdt() - { - SparkSession spark = getOrCreateSparkSession(); - Dataset<Row> df = DataGenerationUtils.generateUdtData(spark, ROW_COUNT); - - bulkWriterDataFrameWriter(df, UDT_TABLE_NAME).save(); - - SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT * FROM " + UDT_TABLE_NAME, ConsistencyLevel.ALL); - assertThat(result.hasNext()).isTrue(); - validateWritesWithDriverResultSet(df.collectAsList(), - queryAllDataWithDriver(cluster, UDT_TABLE_NAME, - shaded.com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM), - BulkWriteUdtTest::defaultRowFormatter); - } - - @Test - void testWriteWithNestedUdt() - { - SparkSession spark = getOrCreateSparkSession(); - Dataset<Row> df = DataGenerationUtils.generateNestedUdtData(spark, ROW_COUNT); - - bulkWriterDataFrameWriter(df, NESTED_TABLE_NAME).save(); - - SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT * FROM " + NESTED_TABLE_NAME, ConsistencyLevel.ALL); - assertThat(result.hasNext()).isTrue(); - validateWritesWithDriverResultSet(df.collectAsList(), - queryAllDataWithDriver(cluster, NESTED_TABLE_NAME, shaded.com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM), - BulkWriteUdtTest::defaultRowFormatter); - } - - @NotNull - public static String defaultRowFormatter(shaded.com.datastax.driver.core.Row row) - { - return row.getLong(0) + - ":" + - row.getUDTValue(1); // Formats as field:value with no whitespaces, and strings quoted - } - - @Override - protected UpgradeableCluster provisionCluster(TestVersion testVersion) throws IOException - { - // spin up a C* cluster using the in-jvm dtest - Versions versions = Versions.find(); - Versions.Version requestedVersion = versions.getLatest(new Semver(testVersion.version(), Semver.SemverType.LOOSE)); - - UpgradeableCluster.Builder clusterBuilder = - UpgradeableCluster.build(3) - .withDynamicPortAllocation(true) - .withVersion(requestedVersion) - .withDCs(1) - .withDataDirCount(1) - .withSharedClasses(JvmDTestSharedClassesPredicate.INSTANCE) - .withConfig(config -> config.with(Feature.NATIVE_PROTOCOL) - .with(Feature.GOSSIP) - .with(Feature.JMX)); - TokenSupplier tokenSupplier = TokenSupplier.evenlyDistributedTokens(3, clusterBuilder.getTokenCount()); - clusterBuilder.withTokenSupplier(tokenSupplier); - UpgradeableCluster cluster = clusterBuilder.createWithoutStarting(); - cluster.startup(); - - waitForHealthyRing(cluster); - fixDistributedSchemas(cluster); - return cluster; - } - - @Override - protected void initializeSchemaForTest() - { - createTestKeyspace(UDT_TABLE_NAME, DC1_RF3); - - cluster.schemaChangeIgnoringStoppedInstances(TWO_FIELD_UDT_DEF); - cluster.schemaChangeIgnoringStoppedInstances(NESTED_UDT_DEF); - cluster.schemaChangeIgnoringStoppedInstances(UDT_TABLE_CREATE); - cluster.schemaChangeIgnoringStoppedInstances(NESTED_TABLE_CREATE); - } -} diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/DataGenerationUtils.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/DataGenerationUtils.java index 3c1897d..a0882c5 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/DataGenerationUtils.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/DataGenerationUtils.java @@ -31,14 +31,11 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.types.Metadata; -import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import static org.apache.spark.sql.types.DataTypes.IntegerType; import static org.apache.spark.sql.types.DataTypes.LongType; import static org.apache.spark.sql.types.DataTypes.StringType; -import static org.apache.spark.sql.types.DataTypes.createStructType; /** * Utilities for data generation used for tests @@ -52,7 +49,6 @@ public final class DataGenerationUtils /** * Generates course data with schema - * Does not generate a User Defined Field * * <pre> * id integer, @@ -65,48 +61,18 @@ public final class DataGenerationUtils * @return a {@link Dataset} with generated data */ public static Dataset<Row> generateCourseData(SparkSession spark, int rowCount) - { - return generateCourseData(spark, rowCount, false); - } - - /** - * Generates course data with schema - * - * <pre> - * id integer, - * course string, - * marks integer - * </pre> - * - * @param spark the spark session to use - * @param rowCount the number of records to generate - * @param udfData if a field representing a User Defined Type should be added - * @return a {@link Dataset} with generated data - */ - public static Dataset<Row> generateCourseData(SparkSession spark, int rowCount, boolean udfData) { SQLContext sql = spark.sqlContext(); StructType schema = new StructType() .add("id", IntegerType, false) .add("course", StringType, false) .add("marks", IntegerType, false); - if (udfData) - { - StructType udfType = new StructType() - .add("TimE", IntegerType, false) - .add("limit", IntegerType, false); - schema = schema.add("User_Defined_Type", udfType); - } List<Row> rows = IntStream.range(0, rowCount) .mapToObj(recordNum -> { String course = "course" + recordNum; - if (!udfData) - { - return RowFactory.create(recordNum, course, recordNum); - } - return RowFactory.create(recordNum, course, recordNum, - RowFactory.create(recordNum, recordNum)); + Object[] values = {recordNum, course, recordNum}; + return RowFactory.create(values); }).collect(Collectors.toList()); return sql.createDataFrame(rows, schema); } @@ -149,45 +115,6 @@ public final class DataGenerationUtils return sql.createDataFrame(rows, schema); } - public static Dataset<Row> generateUdtData(SparkSession spark, int rowCount) - { - SQLContext sql = spark.sqlContext(); - StructType udtType = createStructType(new StructField[]{new StructField("f1", StringType, false, Metadata.empty()), - new StructField("f2", IntegerType, false, Metadata.empty())}); - StructType schema = new StructType() - .add("id", IntegerType, false) - .add("udtfield", udtType, false); - - List<Row> rows = IntStream.range(0, rowCount) - .mapToObj(id -> { - String course = "course" + id; - Object[] values = {id, RowFactory.create(course, id)}; - return RowFactory.create(values); - }).collect(Collectors.toList()); - return sql.createDataFrame(rows, schema); - } - - public static Dataset<Row> generateNestedUdtData(SparkSession spark, int rowCount) - { - SQLContext sql = spark.sqlContext(); - StructType udtType = createStructType(new StructField[]{new StructField("f1", StringType, false, Metadata.empty()), - new StructField("f2", IntegerType, false, Metadata.empty())}); - StructType nestedType = createStructType(new StructField[] {new StructField("n1", IntegerType, false, Metadata.empty()), - new StructField("n2", udtType, false, Metadata.empty())}); - StructType schema = new StructType() - .add("id", IntegerType, false) - .add("nested", nestedType, false); - - List<Row> rows = IntStream.range(0, rowCount) - .mapToObj(id -> { - String course = "course" + id; - Row innerUdt = RowFactory.create(id, RowFactory.create(course, id)); - Object[] values = {id, innerUdt}; - return RowFactory.create(values); - }).collect(Collectors.toList()); - return sql.createDataFrame(rows, schema); - } - private static String dupString(String string, Integer times) { byte[] stringBytes = string.getBytes(); diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersWriteTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersWriteTest.java index 7cd47dd..2650cd7 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersWriteTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersWriteTest.java @@ -40,10 +40,9 @@ import org.apache.cassandra.testing.TestVersion; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; -import org.jetbrains.annotations.NotNull; -import shaded.com.datastax.driver.core.ConsistencyLevel; import static org.apache.cassandra.analytics.DataGenerationUtils.generateCourseData; +import static org.apache.cassandra.analytics.SparkTestUtils.validateWrites; import static org.apache.cassandra.testing.TestUtils.DC1_RF1; import static org.apache.cassandra.testing.TestUtils.ROW_COUNT; import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; @@ -57,13 +56,11 @@ import static org.apache.cassandra.testing.TestUtils.uniqueTestTableFullName; */ class QuoteIdentifiersWriteTest extends SharedClusterSparkIntegrationTestBase { - static final QualifiedName TABLE_NAME_FOR_UDT_TEST = uniqueTestTableFullName("QuOtEd_KeYsPaCe", "QuOtEd_TaBlE"); static final List<QualifiedName> TABLE_NAMES = Arrays.asList(uniqueTestTableFullName("QuOtEd_KeYsPaCe"), uniqueTestTableFullName("keyspace"), // keyspace is a reserved word uniqueTestTableFullName(TEST_KEYSPACE, "QuOtEd_TaBlE"), - new QualifiedName(TEST_KEYSPACE, "table"), // table is a reserved word - TABLE_NAME_FOR_UDT_TEST); + new QualifiedName(TEST_KEYSPACE, "table")); // table is a reserved word @ParameterizedTest(name = "{index} => table={0}") @MethodSource("testInputs") @@ -72,51 +69,12 @@ class QuoteIdentifiersWriteTest extends SharedClusterSparkIntegrationTestBase SparkSession spark = getOrCreateSparkSession(); // Generates course data from and renames the dataframe columns to use case-sensitive and reserved // words in the dataframe - boolean udfData = tableName.equals(TABLE_NAME_FOR_UDT_TEST); - Dataset<Row> df; - Dataset<Row> generatedDf = generateCourseData(spark, ROW_COUNT, udfData); - if (!udfData) - { - df = generatedDf.toDF("IdEnTiFiEr", // case-sensitive struct - "course", - "limit"); // limit is a reserved word in Cassandra - } - else - { - df = generatedDf.toDF("IdEnTiFiEr", // case-sensitive struct - "course", - "limit", // limit is a reserved word in Cassandra - "User_Defined_Type"); - } + Dataset<Row> df = generateCourseData(spark, ROW_COUNT).toDF("IdEnTiFiEr", // case-sensitive struct + "course", + "limit"); // limit is a reserved word in Cassandra bulkWriterDataFrameWriter(df, tableName).option(WriterOptions.QUOTE_IDENTIFIERS.name(), "true") .save(); - validateWritesWithDriverResultSet(df.collectAsList(), - queryAllDataWithDriver(cluster, tableName, - ConsistencyLevel.LOCAL_QUORUM), - udfData ? - QuoteIdentifiersWriteTest::rowWithUdtFormatter : - QuoteIdentifiersWriteTest::defaultRowFormatter); - } - - public static String defaultRowFormatter(shaded.com.datastax.driver.core.Row row) - { - return row.getInt("IdEnTiFiEr") + - ":'" + - row.getString("course") + - "':" + - row.getInt("limit"); - } - - @NotNull - private static String rowWithUdtFormatter(shaded.com.datastax.driver.core.Row row) - { - return row.getInt("IdEnTiFiEr") + - ":'" + - row.getString("course") + - "':" + - row.getInt("limit") + - ":" + - row.getUDTValue("User_Defined_Type"); + validateWrites(df.collectAsList(), queryAllData(tableName)); } static Stream<Arguments> testInputs() @@ -157,22 +115,7 @@ class QuoteIdentifiersWriteTest extends SharedClusterSparkIntegrationTestBase TABLE_NAMES.forEach(name -> { createTestKeyspace(name, DC1_RF1); - if (!name.equals(TABLE_NAME_FOR_UDT_TEST)) - { - createTestTable(name, createTableStatement); - } + createTestTable(name, createTableStatement); }); - - // Create UDT - String createUdtQuery = "CREATE TYPE " + TABLE_NAME_FOR_UDT_TEST.maybeQuotedKeyspace() - + ".\"UdT1\" (\"TimE\" bigint, \"limit\" int);"; - cluster.schemaChangeIgnoringStoppedInstances(createUdtQuery); - - createTestTable(TABLE_NAME_FOR_UDT_TEST, "CREATE TABLE IF NOT EXISTS %s (" + - "\"IdEnTiFiEr\" int, " + - "course text, " + - "\"limit\" int," + - "\"User_Defined_Type\" frozen<\"UdT1\">, " + - "PRIMARY KEY(\"IdEnTiFiEr\"));"); } } diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java index b0807d8..699f705 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java @@ -20,10 +20,6 @@ package org.apache.cassandra.analytics; import java.net.UnknownHostException; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -31,10 +27,7 @@ import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestInstance.Lifecycle; import org.junit.jupiter.api.extension.ExtendWith; -import com.vdurmont.semver4j.Semver; import io.vertx.junit5.VertxExtension; -import org.apache.cassandra.bridge.CassandraBridge; -import org.apache.cassandra.bridge.CassandraBridgeFactory; import org.apache.cassandra.distributed.shared.JMXUtil; import org.apache.cassandra.sidecar.testing.QualifiedName; import org.apache.cassandra.sidecar.testing.SharedClusterIntegrationTestBase; @@ -44,13 +37,10 @@ import org.apache.spark.sql.DataFrameWriter; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.types.StructField; -import shaded.com.datastax.driver.core.ResultSet; import static org.apache.cassandra.analytics.SparkTestUtils.defaultBulkReaderDataFrame; import static org.apache.cassandra.analytics.SparkTestUtils.defaultBulkWriterDataFrameWriter; import static org.apache.cassandra.analytics.SparkTestUtils.defaultSparkConf; -import static org.assertj.core.api.Assertions.assertThat; /** * Extends functionality from {@link SharedClusterIntegrationTestBase} and provides additional functionality for running @@ -62,23 +52,6 @@ public abstract class SharedClusterSparkIntegrationTestBase extends SharedCluste { protected SparkConf sparkConf; protected SparkSession sparkSession; - protected CassandraBridge bridge; - - public void validateWritesWithDriverResultSet(List<Row> sourceData, ResultSet queriedData, - Function<shaded.com.datastax.driver.core.Row, String> rowFormatter) - { - Set<String> actualEntries = new HashSet<>(); - queriedData.forEach(row -> actualEntries.add(rowFormatter.apply(row))); - - // Number of entries in Cassandra must match the original datasource - assertThat(actualEntries.size()).isEqualTo(sourceData.size()); - - // remove from actual entries to make sure that the data read is the same as the data written - Set<String> sourceEntries = sourceData.stream().map(this::getFormattedSourceEntry) - .collect(Collectors.toSet()); - assertThat(actualEntries).as("All entries are expected to be read from database") - .containsExactlyInAnyOrderElementsOf(sourceEntries); - } /** * A preconfigured {@link DataFrameReader} with pre-populated required options that can be overridden @@ -149,65 +122,4 @@ public abstract class SharedClusterSparkIntegrationTestBase extends SharedCluste } return sparkSession; } - - protected CassandraBridge getOrCreateBridge() - { - if (bridge == null) - { - Semver semVer = new Semver(testVersion.version(), - Semver.SemverType.LOOSE); - bridge = CassandraBridgeFactory.get(semVer.toStrict().toString()); - } - return bridge; - } - - private String getFormattedSourceEntry(Row row) - { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < row.size(); i++) - { - maybeFormatUdt(sb, row.get(i)); - if (i != (row.size() - 1)) - { - sb.append(":"); - } - } - return sb.toString(); - } - - // Format a Spark row to look like what the toString on a UDT looks like - // Unfortunately not _quite_ json, so we need to do this manually. - protected void maybeFormatUdt(StringBuilder sb, Object o) - { - if (o instanceof Row) - { - Row r = (Row) o; - sb.append("{"); - StructField[] fields = r.schema().fields(); - for (int i = 0; i < r.size(); i++) - { - sb.append(maybeQuoteFieldName(fields[i])); - sb.append(":"); - maybeFormatUdt(sb, r.get(i)); - if (i != r.size() - 1) - { - sb.append(','); - } - } - sb.append("}"); - } - else if (o instanceof String) - { - sb.append(String.format("'%s'", o)); - } - else - { - sb.append(String.format("%s", o)); - } - } - - protected String maybeQuoteFieldName(StructField fields) - { - return getOrCreateBridge().maybeQuoteIdentifier(fields.name()); - } } diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java index b234065..c89f2cd 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java @@ -388,7 +388,6 @@ public abstract class CassandraBridge String partitioner, String createStatement, String insertStatement, - Set<String> userDefinedTypeStatements, int bufferSizeMB); public interface IRow diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/BridgeUdtValue.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/BridgeUdtValue.java deleted file mode 100644 index a6d1215..0000000 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/BridgeUdtValue.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cassandra.spark.data; - -import java.io.Serializable; -import java.util.Map; -import java.util.Objects; - -/** - * The BridgeUdtValue class exists because the Cassandra values produced (UDTValue) are not serializable - * because they come from the classloader inside the bridge, and therefore can't be passed around - * from one Spark phase to another. Therefore, we build a map of these instances (potentially nested) - * and return them from the conversion stage for later use when the writer actually writes them. - */ -public class BridgeUdtValue implements Serializable -{ - public final String name; - public final Map<String, Object> udtMap; - - public BridgeUdtValue(String name, Map<String, Object> valueMap) - { - this.name = name; - this.udtMap = valueMap; - } - - public boolean equals(Object o) - { - if (this == o) - { - return true; - } - if (o == null || getClass() != o.getClass()) - { - return false; - } - BridgeUdtValue udtValue = (BridgeUdtValue) o; - return Objects.equals(name, udtValue.name) && Objects.equals(udtMap, udtValue.udtMap); - } - - public int hashCode() - { - return Objects.hash(name, udtMap); - } - - public String toString() - { - return "BridgeUdtValue{" + - "name='" + name + '\'' + - ", udtMap=" + udtMap + - '}'; - } -} diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java index 0aa576e..56e6d46 100644 --- a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java +++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java @@ -592,11 +592,9 @@ public class CassandraBridgeImplementation extends CassandraBridge String partitioner, String createStatement, String insertStatement, - @NotNull Set<String> userDefinedTypeStatements, int bufferSizeMB) { - return new SSTableWriterImplementation(inDirectory, partitioner, createStatement, insertStatement, - userDefinedTypeStatements, bufferSizeMB); + return new SSTableWriterImplementation(inDirectory, partitioner, createStatement, insertStatement, bufferSizeMB); } // Version-Specific Test Utility Methods diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java index fdc9cab..0a7ecde 100644 --- a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java +++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java @@ -21,7 +21,6 @@ package org.apache.cassandra.bridge; import java.io.IOException; import java.util.Map; -import java.util.Set; import com.google.common.annotations.VisibleForTesting; @@ -31,7 +30,6 @@ import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.RandomPartitioner; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.io.sstable.CQLSSTableWriter; -import org.jetbrains.annotations.NotNull; public class SSTableWriterImplementation implements SSTableWriter { @@ -46,7 +44,6 @@ public class SSTableWriterImplementation implements SSTableWriter String partitioner, String createStatement, String insertStatement, - @NotNull Set<String> userDefinedTypeStatements, int bufferSizeMB) { IPartitioner cassPartitioner = partitioner.toLowerCase().contains("random") ? new RandomPartitioner() @@ -56,7 +53,6 @@ public class SSTableWriterImplementation implements SSTableWriter createStatement, insertStatement, bufferSizeMB, - userDefinedTypeStatements, cassPartitioner); writer = builder.build(); } @@ -85,23 +81,17 @@ public class SSTableWriterImplementation implements SSTableWriter String createStatement, String insertStatement, int bufferSizeMB, - Set<String> udts, IPartitioner cassPartitioner) { - CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder(); - - for (String udt : udts) - { - builder.withType(udt); - } - - return builder.inDirectory(inDirectory) - .forTable(createStatement) - .withPartitioner(cassPartitioner) - .using(insertStatement) - // The data frame to write is always sorted, - // see org.apache.cassandra.spark.bulkwriter.CassandraBulkSourceRelation.insert - .sorted() - .withMaxSSTableSizeInMiB(bufferSizeMB); + return CQLSSTableWriter + .builder() + .inDirectory(inDirectory) + .forTable(createStatement) + .withPartitioner(cassPartitioner) + .using(insertStatement) + // The data frame to write is always sorted, + // see org.apache.cassandra.spark.bulkwriter.CassandraBulkSourceRelation.insert + .sorted() + .withMaxSSTableSizeInMiB(bufferSizeMB); } } diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/data/complex/CqlUdt.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/data/complex/CqlUdt.java index 6000af0..e97ce6c 100644 --- a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/data/complex/CqlUdt.java +++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/data/complex/CqlUdt.java @@ -162,10 +162,6 @@ public class CqlUdt extends CqlType implements CqlField.CqlUdt @Override public Object convertForCqlWriter(Object value, CassandraVersion version) { - if (value instanceof UDTValue) - { - return value; - } return toUserTypeValue(version, this, value); } diff --git a/cassandra-four-zero/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java b/cassandra-four-zero/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java index 589ee4e..e5e5846 100644 --- a/cassandra-four-zero/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java +++ b/cassandra-four-zero/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java @@ -22,7 +22,6 @@ package org.apache.cassandra.bridge; import java.io.File; import java.lang.reflect.Field; import java.util.Arrays; -import java.util.HashSet; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -52,7 +51,6 @@ class SSTableWriterImplementationTest CREATE_STATEMENT, INSERT_STATEMENT, 250, - new HashSet<>(), new Murmur3Partitioner()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org