This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9523a38 CASSANDRA-18605 Adding support for TTL & Timestamps for bulk writes
9523a38 is described below
commit 9523a38b3f1b5bc4313e2949896ddc1fff58afbe
Author: jkonisa <[email protected]>
AuthorDate: Thu Jun 15 13:31:01 2023 -0700
CASSANDRA-18605 Adding support for TTL & Timestamps for bulk writes
This commit introduces a new feature in the Spark Bulk Writer to support writes
with constant or per-row TTL & timestamps.
Patch by Jyothsna Konisa; Reviewed by Dinesh Joshi, Francisco Guerrero, Yifan Cai for CASSANDRA-18605
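
Usage, in brief (a minimal sketch, assuming an existing Dataset<Row> df whose schema carries an extra integer "ttl" column in seconds and a long "timestamp" column in microseconds; the helper method name and the omitted keyspace/table options are illustrative only, see SampleCassandraJob in the diff for the complete example):

    import org.apache.cassandra.spark.bulkwriter.TTLOption;
    import org.apache.cassandra.spark.bulkwriter.TimestampOption;
    import org.apache.cassandra.spark.bulkwriter.WriterOptions;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Sketch: bulk write df with TTL and write-time options (other writer options omitted).
    static void bulkWriteWithTtlAndTimestamp(Dataset<Row> df)
    {
        df.write()
          .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
          // Constant variant: one TTL/timestamp applied to every row.
          // .option(WriterOptions.TTL.name(), TTLOption.constant(20))
          // .option(WriterOptions.TIMESTAMP.name(), TimestampOption.constant(System.currentTimeMillis() * 1000))
          // Per-row variant: values are taken from the named data frame columns.
          .option(WriterOptions.TTL.name(), TTLOption.perRow("ttl"))
          .option(WriterOptions.TIMESTAMP.name(), TimestampOption.perRow("timestamp"))
          .mode("append")
          .save();
    }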
---
CHANGES.txt | 3 +-
.../spark/example/SampleCassandraJob.java | 54 +++++++--
.../cassandra/spark/bulkwriter/BulkSparkConf.java | 14 +++
.../bulkwriter/CassandraBulkSourceRelation.java | 26 ++---
.../bulkwriter/CassandraBulkWriterContext.java | 2 +-
.../cassandra/spark/bulkwriter/RecordWriter.java | 27 ++++-
.../cassandra/spark/bulkwriter/SSTableWriter.java | 5 +-
.../spark/bulkwriter/SqlToCqlTypeConverter.java | 10 ++
.../cassandra/spark/bulkwriter/TTLOption.java | 127 +++++++++++++++++++++
.../cassandra/spark/bulkwriter/TableSchema.java | 83 +++++++++++---
.../spark/bulkwriter/TimestampOption.java | 125 ++++++++++++++++++++
.../cassandra/spark/bulkwriter/WriterOptions.java | 4 +-
.../spark/bulkwriter/MockBulkWriterContext.java | 2 +
.../spark/bulkwriter/MockTableWriter.java | 5 +-
.../spark/bulkwriter/RecordWriterTest.java | 103 ++++++++++++++---
.../spark/bulkwriter/SSTableWriterTest.java | 3 +-
.../bulkwriter/StreamSessionConsistencyTest.java | 8 +-
.../spark/bulkwriter/StreamSessionTest.java | 24 ++--
.../spark/bulkwriter/TableSchemaTest.java | 39 ++++++-
.../spark/bulkwriter/TableSchemaTestCommon.java | 25 +++-
.../org/apache/cassandra/bridge/SSTableWriter.java | 3 +-
.../bridge/SSTableWriterImplementation.java | 3 +-
22 files changed, 600 insertions(+), 95 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 3632517..eeaee1c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.0.0
+ * Add support for TTL & Timestamps for bulk writes (CASSANDRA-18605)
* Add circleci configuration yaml for Cassandra Analytics (CASSANDRA-18578)
* Provide a SecretsProvider interface to abstract the secret provisioning
(CASSANDRA-18545)
- * Add the .asf.yaml file (CASSANDRA-18548)
\ No newline at end of file
+ * Add the .asf.yaml file (CASSANDRA-18548)
diff --git
a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java
b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java
index 4e21749..58c9827 100644
---
a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java
+++
b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java
@@ -28,6 +28,9 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
+import org.apache.cassandra.spark.bulkwriter.TTLOption;
+import org.apache.cassandra.spark.bulkwriter.TimestampOption;
+import org.apache.cassandra.spark.bulkwriter.WriterOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +49,7 @@ import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.LongType;
/**
@@ -122,15 +126,12 @@ public final class SampleCassandraJob
private static Dataset<Row> write(long rowCount, SparkConf sparkConf,
SQLContext sql, SparkContext sc)
{
- StructType schema = new StructType()
- .add("id", LongType, false)
- .add("course", BinaryType, false)
- .add("marks", LongType, false);
-
JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sc);
int parallelism = sc.defaultParallelism();
- JavaRDD<Row> rows = genDataset(javaSparkContext, rowCount, parallelism);
- Dataset<Row> df = sql.createDataFrame(rows, schema);
+ boolean addTTLColumn = true;
+ boolean addTimestampColumn = true;
+ JavaRDD<Row> rows = genDataset(javaSparkContext, rowCount, parallelism, addTTLColumn, addTimestampColumn);
+ Dataset<Row> df = sql.createDataFrame(rows, getWriteSchema(addTTLColumn, addTimestampColumn));
df.write()
.format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
@@ -140,6 +141,11 @@ public final class SampleCassandraJob
.option("local_dc", "datacenter1")
.option("bulk_writer_cl", "LOCAL_QUORUM")
.option("number_splits", "-1")
+ // A constant timestamp and TTL can be used by setting the following options.
+ // .option(WriterOptions.TIMESTAMP.name(), TimestampOption.constant(System.currentTimeMillis() * 1000))
+ // .option(WriterOptions.TTL.name(), TTLOption.constant(20))
+ .option(WriterOptions.TTL.name(), TTLOption.perRow("ttl"))
+ .option(WriterOptions.TIMESTAMP.name(), TimestampOption.perRow("timestamp"))
.mode("append")
.save();
return df;
@@ -174,6 +180,23 @@ public final class SampleCassandraJob
return df;
}
+ private static StructType getWriteSchema(boolean addTTLColumn, boolean
addTimestampColumn)
+ {
+ StructType schema = new StructType()
+ .add("id", LongType, false)
+ .add("course", BinaryType, false)
+ .add("marks", LongType, false);
+ if (addTTLColumn)
+ {
+ schema = schema.add("ttl", IntegerType, false);
+ }
+ if (addTimestampColumn)
+ {
+ schema = schema.add("timestamp", LongType, false);
+ }
+ return schema;
+ }
+
private static void checkSmallDataFrameEquality(Dataset<Row> expected,
Dataset<Row> actual)
{
if (actual == null)
@@ -186,11 +209,14 @@ public final class SampleCassandraJob
}
}
- private static JavaRDD<Row> genDataset(JavaSparkContext sc, long records,
Integer parallelism)
+ private static JavaRDD<Row> genDataset(JavaSparkContext sc, long records,
Integer parallelism,
+ boolean addTTLColumn, boolean
addTimestampColumn)
{
long recordsPerPartition = records / parallelism;
long remainder = records - (recordsPerPartition * parallelism);
List<Integer> seq = IntStream.range(0,
parallelism).boxed().collect(Collectors.toList());
+ int ttl = 10;
+ long timeStamp = System.currentTimeMillis() * 1000;
JavaRDD<Row> dataset = sc.parallelize(seq,
parallelism).mapPartitionsWithIndex(
(Function2<Integer, Iterator<Integer>, Iterator<Row>>) (index,
integerIterator) -> {
long firstRecordNumber = index * recordsPerPartition;
@@ -201,6 +227,18 @@ public final class SampleCassandraJob
Integer courseNameStringLen =
courseNameString.length();
Integer courseNameMultiplier = 1000 /
courseNameStringLen;
byte[] courseName = dupStringAsBytes(courseNameString,
courseNameMultiplier);
+ if (addTTLColumn && addTimestampColumn)
+ {
+ return RowFactory.create(recordNumber, courseName,
recordNumber, ttl, timeStamp);
+ }
+ if (addTTLColumn)
+ {
+ return RowFactory.create(recordNumber, courseName,
recordNumber, ttl);
+ }
+ if (addTimestampColumn)
+ {
+ return RowFactory.create(recordNumber, courseName,
recordNumber, timeStamp);
+ }
return RowFactory.create(recordNumber, courseName,
recordNumber);
}).iterator();
return rows;
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index 292282f..ee63df9 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -119,6 +119,8 @@ public class BulkSparkConf implements Serializable
protected final String truststorePath;
protected final String truststoreBase64Encoded;
protected final String truststoreType;
+ protected final String ttl;
+ protected final String timestamp;
protected final SparkConf conf;
public final boolean validateSSTables;
public final int commitThreadsPerInstance;
@@ -157,6 +159,8 @@ public class BulkSparkConf implements Serializable
// else fall back to props, and then default if neither specified
this.useOpenSsl = getBoolean(USE_OPENSSL, true);
this.ringRetryCount = getInt(RING_RETRY_COUNT,
DEFAULT_RING_RETRY_COUNT);
+ this.ttl = MapUtils.getOrDefault(options, WriterOptions.TTL.name(),
null);
+ this.timestamp = MapUtils.getOrDefault(options,
WriterOptions.TIMESTAMP.name(), null);
validateEnvironment();
}
@@ -241,6 +245,16 @@ public class BulkSparkConf implements Serializable
return truststorePath;
}
+ protected TTLOption getTTLOptions()
+ {
+ return TTLOption.from(ttl);
+ }
+
+ protected TimestampOption getTimestampOptions()
+ {
+ return TimestampOption.from(timestamp);
+ }
+
protected String getTruststoreBase64Encoded()
{
return truststoreBase64Encoded;
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
index 15d4371..8f2c0b5 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
@@ -19,8 +19,9 @@
package org.apache.cassandra.spark.bulkwriter;
-import java.io.Serializable;
+import java.util.Iterator;
+import org.apache.spark.api.java.function.VoidFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,7 +103,7 @@ public class CassandraBulkSourceRelation extends
BaseRelation implements Inserta
.map(tableSchema::normalize)
.keyBy(tokenizer::getDecoratedKey)
.repartitionAndSortWithinPartitions(broadcastContext.getValue().job().getTokenPartitioner());
- persist(sortedRDD);
+ persist(sortedRDD, data.columns());
}
private void cancelJob(@NotNull CancelJobEvent cancelJobEvent)
@@ -121,7 +122,7 @@ public class CassandraBulkSourceRelation extends
BaseRelation implements Inserta
}
@SuppressWarnings("RedundantCast")
- private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]>
sortedRDD)
+ private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]>
sortedRDD, String[] columnNames)
{
writeValidator.setPhase("Environment Validation");
writeValidator.validateInitialEnvironment();
@@ -129,7 +130,7 @@ public class CassandraBulkSourceRelation extends
BaseRelation implements Inserta
try
{
- sortedRDD.foreachPartition(new
WriteIterator(broadcastContext)::call);
+ sortedRDD.foreachPartition(writeRowsInPartition(broadcastContext,
columnNames));
writeValidator.failIfRingChanged();
}
catch (Throwable throwable)
@@ -168,18 +169,11 @@ public class CassandraBulkSourceRelation extends
BaseRelation implements Inserta
}
}
- private static class WriteIterator implements Serializable
+ // Made this function static to avoid capturing reference to CassandraBulkSourceRelation object which cannot be
+ // serialized.
+ private static VoidFunction<Iterator<Tuple2<DecoratedKey, Object[]>>> writeRowsInPartition(Broadcast<BulkWriterContext> broadcastContext,
+ String[] columnNames)
{
- private final Broadcast<BulkWriterContext> broadcastContext;
-
- WriteIterator(Broadcast<BulkWriterContext> broadcastContext)
- {
- this.broadcastContext = broadcastContext;
- }
-
- public void call(java.util.Iterator<Tuple2<DecoratedKey, Object[]>>
iterator)
- {
- new RecordWriter(broadcastContext.getValue()).write(iterator);
- }
+ return itr -> new RecordWriter(broadcastContext.getValue(),
columnNames).write(itr);
}
}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
index 4813b1c..2709b51 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
@@ -91,7 +91,7 @@ public class CassandraBulkWriterContext implements
BulkWriterContext, KryoSerial
CqlTable cqlTable = bridge.buildSchema(tableSchema, keyspace,
replicationFactor, partitioner, udts, null, indexCount);
TableInfoProvider tableInfoProvider = new
CqlTableInfoProvider(tableSchema, cqlTable);
- schemaInfo = new CassandraSchemaInfo(new TableSchema(dfSchema,
tableInfoProvider, conf.writeMode));
+ schemaInfo = new CassandraSchemaInfo(new TableSchema(dfSchema,
tableInfoProvider, conf.writeMode, conf.getTTLOptions(),
conf.getTimestampOptions()));
}
public static BulkWriterContext fromOptions(@NotNull SparkContext
sparkContext,
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
index a5aaf8d..32b8632 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
@@ -27,8 +27,10 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Supplier;
@@ -52,23 +54,26 @@ public class RecordWriter implements Serializable
private static final Logger LOGGER =
LoggerFactory.getLogger(RecordWriter.class);
private final BulkWriterContext writerContext;
+ private final String[] columnNames;
private Supplier<TaskContext> taskContextSupplier;
private final BiFunction<BulkWriterContext, Path, SSTableWriter>
tableWriterSupplier;
private SSTableWriter sstableWriter = null;
private int batchNumber = 0;
private int batchSize = 0;
- public RecordWriter(BulkWriterContext writerContext)
+ public RecordWriter(BulkWriterContext writerContext, String[] columnNames)
{
- this(writerContext, TaskContext::get, SSTableWriter::new);
+ this(writerContext, columnNames, TaskContext::get, SSTableWriter::new);
}
@VisibleForTesting
RecordWriter(BulkWriterContext writerContext,
+ String[] columnNames,
Supplier<TaskContext> taskContextSupplier,
BiFunction<BulkWriterContext, Path, SSTableWriter>
tableWriterSupplier)
{
this.writerContext = writerContext;
+ this.columnNames = columnNames;
this.taskContextSupplier = taskContextSupplier;
this.tableWriterSupplier = tableWriterSupplier;
}
@@ -99,12 +104,13 @@ public class RecordWriter implements Serializable
Integer.toString(taskContext.stageAttemptNumber()),
Integer.toString(taskContext.attemptNumber()),
Integer.toString(partitionId));
+ Map<String, Object> valueMap = new HashMap<>();
try
{
while (dataIterator.hasNext())
{
maybeCreateTableWriter(partitionId, baseDir);
- writeRow(dataIterator, partitionId, range);
+ writeRow(valueMap, dataIterator, partitionId, range);
checkBatchSize(streamSession, partitionId, job);
}
@@ -138,7 +144,8 @@ public class RecordWriter implements Serializable
}
}
- public void writeRow(scala.collection.Iterator<Tuple2<DecoratedKey,
Object[]>> dataIterator,
+ public void writeRow(Map<String, Object> valueMap,
+ scala.collection.Iterator<Tuple2<DecoratedKey,
Object[]>> dataIterator,
int partitionId,
Range<BigInteger> range) throws IOException
{
@@ -149,7 +156,7 @@ public class RecordWriter implements Serializable
String.format("Received Token %s outside of
expected range %s", token, range));
try
{
- sstableWriter.addRow(token, tuple._2());
+ sstableWriter.addRow(token, getBindValuesForColumns(valueMap,
columnNames, tuple._2()));
}
catch (RuntimeException exception)
{
@@ -186,6 +193,16 @@ public class RecordWriter implements Serializable
}
}
+ private static Map<String, Object> getBindValuesForColumns(Map<String,
Object> map, String[] columnNames, Object[] values)
+ {
+ assert values.length == columnNames.length : "Number of values does
not match the number of columns " + values.length + ", " + columnNames.length;
+ for (int i = 0; i < columnNames.length; i++)
+ {
+ map.put(columnNames[i], values[i]);
+ }
+ return map;
+ }
+
private void finalizeSSTable(StreamSession streamSession,
int partitionId,
SSTableWriter sstableWriter,
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
index bf982ef..df6f155 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
@@ -91,15 +91,14 @@ public class SSTableWriter
return CASSANDRA_VERSION_PREFIX + lowestCassandraVersion;
}
- public void addRow(BigInteger token, Object[] values) throws IOException
+ public void addRow(BigInteger token, Map<String, Object> boundValues)
throws IOException
{
if (minToken == null)
{
minToken = token;
}
maxToken = token;
-
- cqlSSTableWriter.addRow(values);
+ cqlSSTableWriter.addRow(boundValues);
}
public void close(BulkWriterContext writerContext, int partitionId) throws
IOException
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
index 03772bd..5749a07 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
@@ -171,6 +171,16 @@ public final class SqlToCqlTypeConverter implements
Serializable
}
}
+ public static Converter<?> getIntegerConverter()
+ {
+ return INTEGER_CONVERTER;
+ }
+
+ public static Converter<?> getLongConverter()
+ {
+ return LONG_CONVERTER;
+ }
+
private static Converter<?> determineCustomConvert(CqlField.CqlCustom
customType)
{
Preconditions.checkArgument(customType.name().equalsIgnoreCase(CUSTOM),
"Non-custom types are not supported");
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TTLOption.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TTLOption.java
new file mode 100644
index 0000000..21a07be
--- /dev/null
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TTLOption.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+
+import java.io.Serializable;
+import java.time.Duration;
+
+public final class TTLOption implements Serializable
+{
+ private static final TTLOption FOREVER = new TTLOption(0);
+ private final String ttlColumnName;
+ private final Integer ttlInSeconds;
+
+ private TTLOption(String ttlColumnName)
+ {
+ this.ttlColumnName = ttlColumnName;
+ this.ttlInSeconds = null;
+ }
+
+ private TTLOption(int ttlInSeconds)
+ {
+ this.ttlInSeconds = ttlInSeconds;
+ this.ttlColumnName = null;
+ }
+
+ public static TTLOption from(String ttl)
+ {
+ if (ttl == null)
+ {
+ return FOREVER;
+ }
+ try
+ {
+ return new TTLOption(Integer.parseInt(ttl));
+ }
+ catch (Exception e)
+ {
+
+ return new TTLOption(ttl);
+ }
+ }
+
+ /**
+ * TTL option for write with a constant TTL. When same values for TTL should be used for all rows in a bulk write
+ * call use this option.
+ *
+ * @param ttlInSeconds ttl value in seconds
+ * @return TTLOption
+ */
+ public static String constant(int ttlInSeconds)
+ {
+ return String.valueOf(ttlInSeconds);
+ }
+
+ /**
+ * TTL option for write with a constant TTL. When same values for TTL should be used for all rows in a bulk write
+ * call use this option.
+ *
+ * @param duration ttl value in Duration
+ * @return TTLOption
+ */
+ public static String constant(Duration duration)
+ {
+ return String.valueOf(duration.getSeconds());
+ }
+
+ /**
+ * TTL option for writes with TTL per Row. When different TTL has to be used for different rows in a bulk write
+ * call use this option. It expects the input RDD to supply the TTL values as an additional column at each row of
+ * the RDD. The TTL value provider column is selected by {@code ttlColumnName}
+ *
+ * @param ttlColumnName column name which has TTL values for each row
+ * @return TTLOption
+ */
+ public static String perRow(String ttlColumnName)
+ {
+ return ttlColumnName;
+ }
+
+ public static TTLOption forever()
+ {
+ return FOREVER;
+ }
+
+ public String columnName()
+ {
+ return ttlColumnName;
+ }
+
+ public boolean withTTl()
+ {
+ return !this.equals(FOREVER)
+ && (ttlColumnName != null || ttlInSeconds != null);
+ }
+
+ @Override
+ public String toString()
+ {
+ if (ttlColumnName != null && !ttlColumnName.isEmpty())
+ {
+ return ":" + ttlColumnName;
+ }
+ if (ttlInSeconds != null)
+ {
+ return Integer.toString(ttlInSeconds);
+ }
+ return null;
+ }
+}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
index 6e5b0ae..fa3371f 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
@@ -29,6 +29,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.base.Preconditions;
+import org.apache.cassandra.spark.data.CqlField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,10 +47,15 @@ public class TableSchema implements Serializable
final List<SqlToCqlTypeConverter.Converter<?>> converters;
private final List<Integer> keyFieldPositions;
private final WriteMode writeMode;
+ private final TTLOption ttlOption;
+ private final TimestampOption timestampOption;
- public TableSchema(StructType dfSchema, TableInfoProvider tableInfo,
WriteMode writeMode)
+ public TableSchema(StructType dfSchema, TableInfoProvider tableInfo,
WriteMode writeMode,
+ TTLOption ttlOption, TimestampOption timestampOption)
{
this.writeMode = writeMode;
+ this.ttlOption = ttlOption;
+ this.timestampOption = timestampOption;
validateDataFrameCompatibility(dfSchema, tableInfo);
validateNoSecondaryIndexes(tableInfo);
@@ -58,7 +64,7 @@ public class TableSchema implements Serializable
this.modificationStatement = getModificationStatement(dfSchema,
tableInfo);
this.partitionKeyColumns = getPartitionKeyColumnNames(tableInfo);
this.partitionKeyColumnTypes = getPartitionKeyColumnTypes(tableInfo);
- this.converters = getConverters(dfSchema, tableInfo);
+ this.converters = getConverters(dfSchema, tableInfo, ttlOption,
timestampOption);
LOGGER.info("Converters: {}", converters);
this.keyFieldPositions = getKeyFieldPositions(dfSchema,
tableInfo.getColumnNames(), getRequiredKeyColumns(tableInfo));
}
@@ -100,12 +106,24 @@ public class TableSchema implements Serializable
}
private static List<SqlToCqlTypeConverter.Converter<?>>
getConverters(StructType dfSchema,
-
TableInfoProvider tableInfo)
+
TableInfoProvider tableInfo,
+
TTLOption ttlOption,
+
TimestampOption timestampOption)
{
return Arrays.stream(dfSchema.fieldNames())
- .map(tableInfo::getColumnType)
- .map(SqlToCqlTypeConverter::getConverter)
- .collect(Collectors.toList());
+ .map(fieldName -> {
+ if (fieldName.equals(ttlOption.columnName()))
+ {
+ return
SqlToCqlTypeConverter.getIntegerConverter();
+ }
+ if (fieldName.equals(timestampOption.columnName()))
+ {
+ return SqlToCqlTypeConverter.getLongConverter();
+ }
+ CqlField.CqlType cqlType =
tableInfo.getColumnType(fieldName);
+ return SqlToCqlTypeConverter.getConverter(cqlType);
+ })
+ .collect(Collectors.toList());
}
private static List<ColumnType<?>>
getPartitionKeyColumnTypes(TableInfoProvider tableInfo)
@@ -130,7 +148,7 @@ public class TableSchema implements Serializable
switch (writeMode)
{
case INSERT:
- return getInsertStatement(dfSchema, tableInfo);
+ return getInsertStatement(dfSchema, tableInfo, ttlOption,
timestampOption);
case DELETE_PARTITION:
return getDeleteStatement(dfSchema, tableInfo);
default:
@@ -138,15 +156,38 @@ public class TableSchema implements Serializable
}
}
- private static String getInsertStatement(StructType dfSchema,
TableInfoProvider tableInfo)
+ private static String getInsertStatement(StructType dfSchema,
TableInfoProvider tableInfo,
+ TTLOption ttlOption,
TimestampOption timestampOption)
{
- String insertStatement = String.format("INSERT INTO %s.%s (%s) VALUES
(%s);",
- tableInfo.getKeyspaceName(),
- tableInfo.getName(),
- String.join(",",
dfSchema.fieldNames()),
-
Arrays.stream(dfSchema.fieldNames())
- .map(field -> "?")
-
.collect(Collectors.joining(",")));
+ List<String> columnNames = Arrays.stream(dfSchema.fieldNames())
+ .filter(fieldName ->
!fieldName.equals(ttlOption.columnName()))
+ .filter(fieldName ->
!fieldName.equals(timestampOption.columnName()))
+ .collect(Collectors.toList());
+ StringBuilder stringBuilder = new StringBuilder("INSERT INTO ")
+ .append(tableInfo.getKeyspaceName())
+ .append(".").append(tableInfo.getName())
+ .append(columnNames.stream().collect(Collectors.joining(",", "
(", ") ")))
+ .append("VALUES")
+ .append(columnNames.stream().map(columnName -> ":" +
columnName).collect(Collectors.joining(",", " (", ")")));
+ if (ttlOption.withTTl() && timestampOption.withTimestamp())
+ {
+ stringBuilder.append(" USING TIMESTAMP ")
+ .append(timestampOption)
+ .append(" AND TTL ")
+ .append(ttlOption);
+ }
+ else if (timestampOption.withTimestamp())
+ {
+ stringBuilder.append(" USING TIMESTAMP ")
+ .append(timestampOption);
+ }
+ else if (ttlOption.withTTl())
+ {
+ stringBuilder.append(" USING TTL ")
+ .append(ttlOption);
+ }
+ stringBuilder.append(";");
+ String insertStatement = stringBuilder.toString();
LOGGER.info("CQL insert statement for the RDD {}", insertStatement);
return insertStatement;
@@ -174,7 +215,7 @@ public class TableSchema implements Serializable
switch (writeMode)
{
case INSERT:
- validateDataframeFieldsInTable(tableInfo, dfFields);
+ validateDataframeFieldsInTable(tableInfo, dfFields, ttlOption,
timestampOption);
return;
case DELETE_PARTITION:
validateOnlyPartitionKeyColumnsInDataframe(tableInfo,
dfFields);
@@ -204,12 +245,16 @@ public class TableSchema implements Serializable
.collect(Collectors.joining(",")));
}
- private static void validateDataframeFieldsInTable(TableInfoProvider
tableInfo, Set<String> dfFields)
+ private static void validateDataframeFieldsInTable(TableInfoProvider
tableInfo, Set<String> dfFields,
+ TTLOption ttlOption,
TimestampOption timestampOption)
{
// Make sure all fields in DF schema are part of table
- String unknownFields = dfFields.stream()
+ List<String> unknownFields = dfFields.stream()
.filter(columnName ->
!tableInfo.columnExists(columnName))
- .collect(Collectors.joining(","));
+ .filter(columnName ->
!columnName.equals(ttlOption.columnName()))
+ .filter(columnName ->
!columnName.equals(timestampOption.columnName()))
+ .collect(Collectors.toList());
+
Preconditions.checkArgument(unknownFields.isEmpty(), "Unknown fields
in data frame => " + unknownFields);
}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TimestampOption.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TimestampOption.java
new file mode 100644
index 0000000..06307b9
--- /dev/null
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TimestampOption.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
+public final class TimestampOption implements Serializable
+{
+ private static final TimestampOption NOW = new
TimestampOption(System.currentTimeMillis() * 1000);
+ private String timestampColumnName;
+ private Long timeStampInMicroSeconds;
+
+ private TimestampOption(String timestampColumnName)
+ {
+ this.timestampColumnName = timestampColumnName;
+ }
+
+ private TimestampOption(Long timeStampInMicroSeconds)
+ {
+ this.timeStampInMicroSeconds = timeStampInMicroSeconds;
+ }
+
+ public static TimestampOption from(String timestamp)
+ {
+ if (timestamp == null)
+ {
+ return NOW;
+ }
+ try
+ {
+ return new TimestampOption(Long.parseLong(timestamp));
+ }
+ catch (Exception e)
+ {
+
+ return new TimestampOption(timestamp);
+ }
+ }
+
+ /**
+ * Timestamp option for write with a constant timestamp. When same values for timestamp should be used for all rows in
+ * a bulk write call use this option.
+ *
+ * @param timestampInMicroSeconds timestamp value in microseconds
+ * @return timestamp option
+ */
+ public static String constant(long timestampInMicroSeconds)
+ {
+ return String.valueOf(timestampInMicroSeconds);
+ }
+
+ /**
+ * Timestamp option for write with a constant timestamp. When same values for timestamp should be used for all rows in
+ * a bulk write call use this option.
+ *
+ * @param duration timestamp value in Duration
+ * @return timestamp option
+ */
+ public static String constant(Duration duration)
+ {
+ return String.valueOf(duration.get(ChronoUnit.MICROS));
+ }
+
+ /**
+ * Timestamp option for writes with timestamp per Row. When different timestamp has to be used for different rows in
+ * a bulk write call use this option. It expects the input RDD to supply the timestamp values as an additional
+ * column at each row of the RDD. The timestamp value provider column is selected by {@code timeStampColumnName}
+ *
+ * @param timeStampColumnName column name which has timestamp values for each row
+ * @return timestamp option
+ */
+ public static String perRow(String timeStampColumnName)
+ {
+ return timeStampColumnName;
+ }
+
+ public static TimestampOption now()
+ {
+ return NOW;
+ }
+
+ public String columnName()
+ {
+ return timestampColumnName;
+ }
+
+ public boolean withTimestamp()
+ {
+ return !this.equals(NOW)
+ && (timestampColumnName != null || timeStampInMicroSeconds !=
null);
+ }
+
+ @Override
+ public String toString()
+ {
+ if (timestampColumnName != null && !timestampColumnName.isEmpty())
+ {
+ return ":" + timestampColumnName;
+ }
+ if (timeStampInMicroSeconds != null)
+ {
+ return Long.toString(timeStampInMicroSeconds);
+ }
+ return null;
+ }
+}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
index 7b48df8..d824f05 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
@@ -43,5 +43,7 @@ public enum WriterOptions implements WriterOption
TRUSTSTORE_BASE64_ENCODED,
SIDECAR_PORT,
ROW_BUFFER_MODE,
- SSTABLE_DATA_SIZE_IN_MB
+ SSTABLE_DATA_SIZE_IN_MB,
+ TTL,
+ TIMESTAMP
}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index 262b7df..4cf29ca 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -85,6 +85,7 @@ public class MockBulkWriterContext implements
BulkWriterContext, ClusterInfo, Jo
private CommitResultSupplier crSupplier = (uuids, dc) -> new
RemoteCommitResult(true, Collections.emptyList(), uuids, null);
private Predicate<CassandraInstance> uploadRequestConsumer = instance ->
true;
+ private TTLOption ttlOption = TTLOption.forever();
public MockBulkWriterContext(CassandraRing<RingInstance> ring,
String cassandraVersion,
@@ -111,6 +112,7 @@ public class MockBulkWriterContext implements
BulkWriterContext, ClusterInfo, Jo
.withPartitionKeyColumnTypes(partitionKeyColumnTypes)
.withWriteMode(WriteMode.INSERT)
.withDataFrameSchema(validDataFrameSchema)
+ .withTTLSetting(ttlOption)
.build();
this.jobId = java.util.UUID.randomUUID();
}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockTableWriter.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockTableWriter.java
index 4003f54..716b7ee 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockTableWriter.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockTableWriter.java
@@ -24,6 +24,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.FileUtils;
@@ -64,13 +65,13 @@ public class MockTableWriter implements SSTableWriter
}
@Override
- public void addRow(Object... values)
+ public void addRow(Map<String, Object> values) throws IOException
{
if (addRowThrows)
{
throw new RuntimeException("Failed to write because addRow
throws");
}
- rows.add(values);
+ rows.add(values.values().toArray());
}
@Override
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
index 4e60ec6..1ccdc72 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
@@ -48,6 +48,7 @@ public class RecordWriterTest
public static final int REPLICA_COUNT = 3;
public static final int FILES_PER_SSTABLE = 8;
public static final int UPLOADED_TABLES = 3;
+ private static final String[] COLUMN_NAMES = {"id", "date", "course",
"marks"};
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@@ -75,17 +76,51 @@ public class RecordWriterTest
@Test
public void testSuccessfulWrite()
{
- rw = new RecordWriter(writerContext, () -> tc, SSTableWriter::new);
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
- rw.write(data);
- Map<CassandraInstance, List<UploadRequest>> uploads =
writerContext.getUploads();
- assertThat(uploads.keySet().size(), is(REPLICA_COUNT)); // Should
upload to 3 replicas
- assertThat(uploads.values().stream().mapToInt(List::size).sum(),
is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES));
- List<UploadRequest> requests =
uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
- for (UploadRequest ur: requests)
- {
- assertNotNull(ur.fileHash);
- }
+ validateSuccessfulWrite(writerContext, data, COLUMN_NAMES);
+ }
+
+ @Test
+ public void testWriteWithConstantTTL()
+ {
+ MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(ring);
+ Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true,
false, false);
+ validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
+ }
+
+ @Test
+ public void testWriteWithTTLColumn()
+ {
+ MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(ring);
+ Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true,
true, false);
+ String[] columnNamesWithTtl = {"id", "date", "course", "marks", "ttl"};
+ validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTtl);
+ }
+
+ @Test
+ public void testWriteWithConstantTimestamp()
+ {
+ MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(ring);
+ Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true,
false, false);
+ validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
+ }
+
+ @Test
+ public void testWriteWithTimestampColumn()
+ {
+ MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(ring);
+ Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true,
false, true);
+ String[] columnNamesWithTimestamp = {"id", "date", "course", "marks",
"timestamp"};
+ validateSuccessfulWrite(bulkWriterContext, data,
columnNamesWithTimestamp);
+ }
+
+ @Test
+ public void testWriteWithTimestampAndTTLColumn()
+ {
+ MockBulkWriterContext bulkWriterContext = new
MockBulkWriterContext(ring);
+ Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true,
true, true);
+ String[] columnNames = {"id", "date", "course", "marks", "ttl",
"timestamp"};
+ validateSuccessfulWrite(bulkWriterContext, data, columnNames);
}
@Test
@@ -93,7 +128,7 @@ public class RecordWriterTest
{
// TODO: Add better error handling with human-readable exception
messages in SSTableReader::new
exception.expect(RuntimeException.class);
- rw = new RecordWriter(writerContext, () -> tc, (wc, path) -> new
SSTableWriter(tw.setOutDir(path), path));
+ rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc,
path) -> new SSTableWriter(tw.setOutDir(path), path));
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
rw.write(data);
Map<CassandraInstance, List<UploadRequest>> uploads =
writerContext.getUploads();
@@ -104,7 +139,7 @@ public class RecordWriterTest
@Test
public void testWriteWithOutOfRangeTokenFails()
{
- rw = new RecordWriter(writerContext, () -> tc, (wc, path) -> new
SSTableWriter(tw, folder.getRoot().toPath()));
+ rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc,
path) -> new SSTableWriter(tw, folder.getRoot().toPath()));
exception.expectMessage("Received Token 5765203080415074583 outside of
expected range [-9223372036854775808‥0]");
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, false);
rw.write(data);
@@ -113,7 +148,7 @@ public class RecordWriterTest
@Test
public void testAddRowThrowingFails()
{
- rw = new RecordWriter(writerContext, () -> tc, (wc, path) -> new
SSTableWriter(tw, folder.getRoot().toPath()));
+ rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc,
path) -> new SSTableWriter(tw, folder.getRoot().toPath()));
tw.setAddRowThrows(true);
exception.expectMessage("java.lang.RuntimeException: Failed to write
because addRow throws");
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
@@ -125,7 +160,7 @@ public class RecordWriterTest
{
// Mock context returns a 60-minute allowable time skew, so we use
something just outside the limits
long sixtyOneMinutesInMillis = TimeUnit.MINUTES.toMillis(61);
- rw = new RecordWriter(writerContext, () -> tc, (wc, path) -> new
SSTableWriter(tw, folder.getRoot().toPath()));
+ rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc,
path) -> new SSTableWriter(tw, folder.getRoot().toPath()));
writerContext.setTimeProvider(() -> System.currentTimeMillis() -
sixtyOneMinutesInMillis);
exception.expectMessage("Time skew between Spark and Cassandra is too
large. Allowable skew is 60 minutes. Spark executor time is ");
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
@@ -138,16 +173,52 @@ public class RecordWriterTest
// Mock context returns a 60-minute allowable time skew, so we use
something just inside the limits
long fiftyNineMinutesInMillis = TimeUnit.MINUTES.toMillis(59);
long remoteTime = System.currentTimeMillis() -
fiftyNineMinutesInMillis;
- rw = new RecordWriter(writerContext, () -> tc, SSTableWriter::new);
+ rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc,
SSTableWriter::new);
writerContext.setTimeProvider(() -> remoteTime); // Return a very low
"current time" to make sure we fail if skew is too bad
Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
rw.write(data);
}
+ private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
+ Iterator<Tuple2<DecoratedKey,
Object[]>> data,
+ String[] columnNames)
+ {
+ RecordWriter rw = new RecordWriter(writerContext, columnNames, () ->
tc, SSTableWriter::new);
+ rw.write(data);
+ Map<CassandraInstance, List<UploadRequest>> uploads =
writerContext.getUploads();
+ assertThat(uploads.keySet().size(), is(REPLICA_COUNT)); // Should
upload to 3 replicas
+ assertThat(uploads.values().stream().mapToInt(List::size).sum(),
is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES));
+ List<UploadRequest> requests =
uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
+ for (UploadRequest ur: requests)
+ {
+ assertNotNull(ur.fileHash);
+ }
+ }
+
private Iterator<Tuple2<DecoratedKey, Object[]>> generateData(int
numValues, boolean onlyInRange)
+ {
+ return generateData(numValues, onlyInRange, false, false);
+ }
+ private Iterator<Tuple2<DecoratedKey, Object[]>> generateData(int
numValues, boolean onlyInRange, boolean withTTL, boolean withTimestamp)
{
Stream<Tuple2<DecoratedKey, Object[]>> source = IntStream.iterate(0,
integer -> integer + 1).mapToObj(index -> {
- Object[] columns = {index, index, "foo" + index, index};
+ Object[] columns;
+ if (withTTL && withTimestamp)
+ {
+ columns = new Object[]{index, index, "foo" + index, index,
index * 100, System.currentTimeMillis() * 1000};
+ }
+ else if (withTimestamp)
+ {
+ columns = new Object[]{index, index, "foo" + index, index,
System.currentTimeMillis() * 1000};
+ }
+ else if (withTTL)
+ {
+ columns = new Object[]{index, index, "foo" + index, index,
index * 100};
+ }
+ else
+ {
+ columns = new Object[]{index, index, "foo" + index, index};
+ }
return Tuple2.apply(tokenizer.getDecoratedKey(columns), columns);
});
if (onlyInRange)
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
index 8bee484..3b3511c 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
@@ -27,6 +27,7 @@ import java.nio.file.Path;
import java.util.Arrays;
import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableMap;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -90,7 +91,7 @@ public class SSTableWriterTest
{
MockBulkWriterContext writerContext = new MockBulkWriterContext(ring,
version, ConsistencyLevel.CL.LOCAL_QUORUM);
SSTableWriter tw = new SSTableWriter(writerContext,
tmpDir.getRoot().toPath());
- tw.addRow(BigInteger.ONE, new Object[]{1, 1, "foo", 1});
+ tw.addRow(BigInteger.ONE, ImmutableMap.of("id", 1, "date", 1,
"course", "foo", "marks", 1));
tw.close(writerContext, 1);
try (DirectoryStream<Path> dataFileStream =
Files.newDirectoryStream(tw.getOutDir(), "*Data.db"))
{
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
index 0c6d5db..eb69631 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
@@ -24,6 +24,7 @@ import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -67,6 +68,7 @@ public class StreamSessionConsistencyTest
ImmutableMap.of("DC1", 3, "DC2", 3),
"test",
6);
+ private static final Map<String, Object> COLUMN_BIND_VALUES =
ImmutableMap.of("id", 0, "date", 1, "course", "course", "marks", 2);
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@@ -119,8 +121,7 @@ public class StreamSessionConsistencyTest
}
});
SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter,
folder.getRoot().toPath());
- Object[] row = {0, 1, "course", 2};
- tr.addRow(BigInteger.valueOf(102L), row);
+ tr.addRow(BigInteger.valueOf(102L), COLUMN_BIND_VALUES);
tr.close(writerContext, 1);
streamSession.scheduleStream(tr);
if (shouldFail)
@@ -157,8 +158,7 @@ public class StreamSessionConsistencyTest
boolean shouldFail = calculateFailure(dc1Failures.get(),
dc2Failures.get());
writerContext.setUploadSupplier(instance ->
dcFailures.get(instance.getDataCenter()).getAndDecrement() <= 0);
SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter,
folder.getRoot().toPath());
- Object[] row = {0, 1, "course", 2};
- tr.addRow(BigInteger.valueOf(102L), row);
+ tr.addRow(BigInteger.valueOf(102L), COLUMN_BIND_VALUES);
tr.close(writerContext, 1);
streamSession.scheduleStream(tr);
if (shouldFail)
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
index bf79c31..e5e5ab1 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
@@ -23,11 +23,13 @@ import java.io.IOException;
import java.math.BigInteger;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import com.google.common.collect.BoundType;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import org.junit.Before;
@@ -52,6 +54,7 @@ import static org.junit.Assert.assertTrue;
public class StreamSessionTest
{
public static final String LOAD_RANGE_ERROR_PREFIX = "Failed to load 1
ranges with LOCAL_QUORUM";
+ private static final Map<String, Object> COLUMN_BOUND_VALUES =
ImmutableMap.of("id", 0, "date", 1, "course", "course", "marks", 2);
@Rule
public TemporaryFolder folder = new TemporaryFolder();
private static final int FILES_PER_SSTABLE = 8;
@@ -90,8 +93,7 @@ public class StreamSessionTest
) throws IOException, ExecutionException, InterruptedException
{
SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter,
folder.getRoot().toPath());
- Object[] row = {0, 1, "course", 2};
- tr.addRow(BigInteger.valueOf(102L), row);
+ tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
tr.close(writerContext, 1);
ss.scheduleStream(tr);
ss.close(); // Force "execution" of futures
@@ -121,8 +123,7 @@ public class StreamSessionTest
public void testMismatchedTokenRangeFails() throws IOException
{
SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter,
folder.getRoot().toPath());
- Object[] row = {0, 1, "course", 2};
- tr.addRow(BigInteger.valueOf(9999L), row);
+ tr.addRow(BigInteger.valueOf(9999L), COLUMN_BOUND_VALUES);
tr.close(writerContext, 1);
IllegalStateException illegalStateException =
assertThrows(IllegalStateException.class,
() ->
ss.scheduleStream(tr));
@@ -170,8 +171,7 @@ public class StreamSessionTest
{
assertThrows(RuntimeException.class, () -> {
SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter,
tableWriter.getOutDir());
- Object[] row = {0, 1, "course", 2};
- tr.addRow(BigInteger.valueOf(102L), row);
+ tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
tr.close(writerContext, 1);
tableWriter.removeOutDir();
ss.scheduleStream(tr);
@@ -190,8 +190,7 @@ public class StreamSessionTest
writerContext.setInstancesAreAvailable(false);
ss = new StreamSession(writerContext, "sessionId", range, executor);
SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter,
folder.getRoot().toPath());
- Object[] row = {0, 1, "course", 2};
- tr.addRow(BigInteger.valueOf(102L), row);
+ tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
tr.close(writerContext, 1);
ss.scheduleStream(tr);
assertThrows(LOAD_RANGE_ERROR_PREFIX, RuntimeException.class, () ->
ss.close());
@@ -234,8 +233,7 @@ public class StreamSessionTest
}
});
SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter,
folder.getRoot().toPath());
- Object[] row = {0, 1, "course", 2};
- tr.addRow(BigInteger.valueOf(102L), row);
+ tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
tr.close(writerContext, 1);
ss.scheduleStream(tr);
ss.close(); // Force "execution" of futures
@@ -267,8 +265,7 @@ public class StreamSessionTest
}
});
SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter,
folder.getRoot().toPath());
- Object[] row = {0, 1, "course", 2};
- tr.addRow(BigInteger.valueOf(102L), row);
+ tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
tr.close(writerContext, 1);
ss.scheduleStream(tr);
RuntimeException exception = assertThrows(RuntimeException.class, ()
-> ss.close()); // Force "execution" of futures
@@ -289,8 +286,7 @@ public class StreamSessionTest
{
writerContext.setUploadSupplier(instance -> false);
SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter,
folder.getRoot().toPath());
- Object[] row = {0, 1, "course", 2};
- tr.addRow(BigInteger.valueOf(102L), row);
+ tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
tr.close(writerContext, 1);
ss.scheduleStream(tr);
assertThrows(LOAD_RANGE_ERROR_PREFIX, RuntimeException.class, () ->
ss.close());
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
index c045492..895ca06 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
@@ -72,7 +72,44 @@ public class TableSchemaTest
TableSchema schema = getValidSchemaBuilder()
.build();
assertThat(schema.modificationStatement,
- is(equalTo("INSERT INTO test.test (id,date,course,marks) VALUES (?,?,?,?);")));
+ is(equalTo("INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks);")));
+ }
+
+ @Test
+ public void testInsertStatementWithConstantTTL()
+ {
+ TableSchema schema = getValidSchemaBuilder().withTTLSetting(TTLOption.from("1000")).build();
+ assertThat(schema.modificationStatement, is(equalTo("INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TTL 1000;")));
+ }
+
+ @Test
+ public void testInsertStatementWithTTLColumn()
+ {
+ TableSchema schema = getValidSchemaBuilder().withTTLSetting(TTLOption.from("ttl")).build();
+ assertThat(schema.modificationStatement, is(equalTo("INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TTL :ttl;")));
+ }
+
+ @Test
+ public void testInsertStatementWithConstantTimestamp()
+ {
+ TableSchema schema = getValidSchemaBuilder().withTimeStampSetting(TimestampOption.from("1000")).build();
+ String expectedQuery = "INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TIMESTAMP 1000;";
+ assertThat(schema.modificationStatement, is(equalTo(expectedQuery)));
+ }
+
+ @Test
+ public void testInsertStatementWithTimestampColumn()
+ {
+ TableSchema schema = getValidSchemaBuilder().withTimeStampSetting(TimestampOption.from("timestamp")).build();
+ String expectedQuery = "INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TIMESTAMP :timestamp;";
+ assertThat(schema.modificationStatement, is(equalTo(expectedQuery)));
+ }
+ @Test
+ public void testInsertStatementWithTTLAndTimestampColumn()
+ {
+ TableSchema schema = getValidSchemaBuilder().withTTLSetting(TTLOption.from("ttl")).withTimeStampSetting(TimestampOption.from("timestamp")).build();
+ String expectedQuery = "INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TIMESTAMP :timestamp AND TTL :ttl;";
+ assertThat(schema.modificationStatement, is(equalTo(expectedQuery)));
}
@Test
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
index f393f35..18ce2a6 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
@@ -32,6 +32,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.cassandra.spark.common.schema.ColumnType;
import org.apache.cassandra.spark.data.CqlField;
+import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
@@ -151,6 +152,8 @@ public final class TableSchemaTestCommon
private ColumnType[] partitionKeyColumnTypes;
private StructType dataFrameSchema;
private WriteMode writeMode = null;
+ private TTLOption ttlOption = TTLOption.forever();
+ private TimestampOption timestampOption = TimestampOption.now();
public MockTableSchemaBuilder withCqlColumns(@NotNull Map<String,
CqlField.CqlType> cqlColumns)
{
@@ -207,6 +210,18 @@ public final class TableSchemaTestCommon
return this;
}
+ public MockTableSchemaBuilder withTTLSetting(TTLOption ttlOption)
+ {
+ this.ttlOption = ttlOption;
+ return this;
+ }
+
+ public MockTableSchemaBuilder withTimeStampSetting(TimestampOption
timestampOption)
+ {
+ this.timestampOption = timestampOption;
+ return this;
+ }
+
public TableSchema build()
{
Objects.requireNonNull(cqlColumns,
@@ -228,7 +243,15 @@ public final class TableSchemaTestCommon
partitionKeyColumnTypes,
primaryKeyColumnNames,
cassandraVersion);
- return new TableSchema(dataFrameSchema, tableInfoProvider,
writeMode);
+ if (ttlOption.withTTl() && ttlOption.columnName() != null)
+ {
+ dataFrameSchema = dataFrameSchema.add("ttl",
DataTypes.IntegerType);
+ }
+ if (timestampOption.withTimestamp() &&
timestampOption.columnName() != null)
+ {
+ dataFrameSchema = dataFrameSchema.add("timestamp",
DataTypes.IntegerType);
+ }
+ return new TableSchema(dataFrameSchema, tableInfoProvider,
writeMode, ttlOption, timestampOption);
}
}
diff --git
a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriter.java
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriter.java
index 9ae1d52..9ec752d 100644
---
a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriter.java
+++
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriter.java
@@ -21,8 +21,9 @@ package org.apache.cassandra.bridge;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Map;
public interface SSTableWriter extends Closeable
{
- void addRow(Object... values) throws IOException;
+ void addRow(Map<String, Object> values) throws IOException;
}
diff --git
a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
index c53fc25..05fb8bd 100644
---
a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
+++
b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
@@ -20,6 +20,7 @@
package org.apache.cassandra.bridge;
import java.io.IOException;
+import java.util.Map;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.dht.IPartitioner;
@@ -62,7 +63,7 @@ public class SSTableWriterImplementation implements
SSTableWriter
}
@Override
- public void addRow(Object... values) throws IOException
+ public void addRow(Map<String, Object> values) throws IOException
{
try
{
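
For reference, the modification statements TableSchema now generates for the various option combinations, exactly as asserted in TableSchemaTest above (test.test and the ttl/timestamp bind names are the values used in those tests):

    INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks);
    INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TTL 1000;
    INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TTL :ttl;
    INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TIMESTAMP 1000;
    INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TIMESTAMP :timestamp;
    INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TIMESTAMP :timestamp AND TTL :ttl;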
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]