yifan-c commented on code in PR #109:
URL: https://github.com/apache/cassandra-analytics/pull/109#discussion_r2074359925


########## cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/TestUtils.java: ##########
@@ -36,7 +36,7 @@ public final class TestUtils
     private static final AtomicInteger TEST_TABLE_ID = new AtomicInteger(0);

     // public static final int ROW_COUNT = 10_000;
-    public static final int ROW_COUNT = 1_000;
+    public static final int ROW_COUNT = 100;

Review Comment:
   Can you revert this change? I assume you lowered the row count for local testing.
########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ##########
@@ -389,19 +391,52 @@ private Map<String, Object> getBindValuesForColumns(Map<String, Object> map, Str
     private Object maybeConvertUdt(Object value)
     {
+        if (value instanceof List)
+        {
+            List<Object> resultList = new ArrayList<>();
+            for (Object entry : (List<?>) value)
+            {
+                resultList.add(maybeConvertUdt(entry));
+            }
+
+            return resultList;
+        }
+
+        if (value instanceof Set)
+        {
+            Set<Object> resultList = new HashSet<>();
+            for (Object entry : (Set<?>) value)
+            {
+                resultList.add(maybeConvertUdt(entry));
+            }
+
+            return resultList;
+        }
+
+        if (value instanceof Map)
+        {
+            Map<Object, Object> resultMap = new HashMap<>();
+            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet())
+            {
+                resultMap.put(maybeConvertUdt(entry.getKey()), maybeConvertUdt(entry.getValue()));
+            }
+
+            return resultMap;
+        }

Review Comment:
   Those 3 branches copy the collection unnecessarily when it does _not_ contain a UDT according to the table schema. That would produce a lot of short-lived objects, and note that there can be millions or more rows to write.

   One idea to skip the unnecessary conversion is to check the table schema:
   1. `RecordWriter` has access to the table schema, e.g. `CqlTable`.
   2. `CqlTable` can mark the columns that contain UDTs.
   3. Instead of calling `maybeConvertUdt` on every column in each row, first check whether `maybeConvertUdt` should be invoked for that column.

   Since you would then know which columns contain UDTs, you could even drop the "maybe" from the method name. It would clarify the code too.
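   To sketch the idea: compute the UDT-bearing columns once per task, then only walk values for those columns. This is a rough shape only, not the actual cassandra-analytics API; the `fields()`/`name()`/`type()` accessors, the recursive `containsUdt` predicate, and the renamed `convertUdt` are all illustrative:

   ```java
   // Computed once per task from the table schema, not per row.
   private Set<String> udtColumnNames;

   private void indexUdtColumns(CqlTable table)
   {
       // `fields()`, `name()` and `type()` are assumed accessors; `containsUdt`
       // would need to recurse into collection/tuple element types to find nested UDTs.
       udtColumnNames = table.fields().stream()
                             .filter(field -> containsUdt(field.type()))
                             .map(CqlField::name)
                             .collect(java.util.stream.Collectors.toSet());
   }

   private Object bindValue(String columnName, Object value)
   {
       // Only walk (and copy) the value when the schema says this column can hold a UDT;
       // every other column passes through without allocating short-lived collections.
       return udtColumnNames.contains(columnName) ? convertUdt(value) : value;
   }
   ```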
########## cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteUdtTest.java: ##########
@@ -59,6 +59,59 @@ class BulkWriteUdtTest extends SharedClusterSparkIntegrationTestBase
                                                          + " id BIGINT PRIMARY KEY,\n"
                                                          + " nested " + NESTED_FIELD_UDT_NAME + ");";
+
+    // UDT with list, set and map in it
+    public static final String UDT_WITH_COLLECTIONS_TYPE_NAME = "udt_with_collections";
+    public static final String UDT_WITH_COLLECTIONS_TYPE_CREATE = "CREATE TYPE " + TEST_KEYSPACE + "." + UDT_WITH_COLLECTIONS_TYPE_NAME
+                                                                  + " (f1 list<text>, f2 set<int>, f3 map<int, text>);";
+
+    // table with list of UDTs, and UDT itself has collections in it
+    public static final QualifiedName LIST_OF_UDT_SOURCE_TABLE = new QualifiedName(TEST_KEYSPACE, "list_of_udt_src");
+    public static final QualifiedName LIST_OF_UDT_DEST_TABLE = new QualifiedName(TEST_KEYSPACE, "list_of_udt_dest");
+    public static final String LIST_OF_UDT_TABLE_CREATE = "CREATE TABLE %s.%s (\n"
+                                                          + " id BIGINT PRIMARY KEY,\n"
+                                                          + " udtlist frozen<list<frozen<" + UDT_WITH_COLLECTIONS_TYPE_NAME + ">>>)";
+
+    // table with set of UDTs, and UDT itself has collections in it
+    public static final QualifiedName SET_OF_UDT_SOURCE_TABLE = new QualifiedName(TEST_KEYSPACE, "set_of_udt_src");
+    public static final QualifiedName SET_OF_UDT_DEST_TABLE = new QualifiedName(TEST_KEYSPACE, "set_of_udt_dest");
+    public static final String SET_OF_UDT_TABLE_CREATE = "CREATE TABLE %s.%s (\n"
+                                                         + " id BIGINT PRIMARY KEY,\n"
+                                                         + " udtset frozen<set<frozen<" + UDT_WITH_COLLECTIONS_TYPE_NAME + ">>>)";
+
+    // table with map of UDTs, and UDT itself has collections in it
+    public static final QualifiedName MAP_OF_UDT_SOURCE_TABLE = new QualifiedName(TEST_KEYSPACE, "map_of_udt_src");
+    public static final QualifiedName MAP_OF_UDT_DEST_TABLE = new QualifiedName(TEST_KEYSPACE, "map_of_udt_dest");
+    public static final String MAP_OF_UDT_TABLE_CREATE = "CREATE TABLE %s.%s (\n"
+                                                         + " id BIGINT PRIMARY KEY,\n"
+                                                         + " udtmap frozen<map<frozen<" + UDT_WITH_COLLECTIONS_TYPE_NAME + ">, frozen<" + UDT_WITH_COLLECTIONS_TYPE_NAME + ">>>)";
+
+    // udt with list of UDTs inside it
+    public static final String UDT_WITH_LIST_OF_UDT_TYPE_NAME = "udt_with_list_of_udt_type";
+    public static final String UDT_WITH_LIST_OF_UDT_TYPE_CREATE = "CREATE TYPE " + TEST_KEYSPACE + "." + UDT_WITH_LIST_OF_UDT_TYPE_NAME
+                                                                  + " (innerudt list<frozen<" + TWO_FIELD_UDT_NAME + ">>);";
+    public static final QualifiedName UDT_WITH_LIST_OF_UDT_SOURCE_TABLE = new QualifiedName(TEST_KEYSPACE, "udt_with_list_of_udt_src");
+    public static final QualifiedName UDT_WITH_LIST_OF_UDT_DEST_TABLE = new QualifiedName(TEST_KEYSPACE, "udt_with_list_of_udt_dest");
+
+    // udt with set of UDTs inside it
+    public static final String UDT_WITH_SET_OF_UDT_TYPE_NAME = "udt_with_set_of_udt_type";
+    public static final String UDT_WITH_SET_OF_UDT_TYPE_CREATE = "CREATE TYPE " + TEST_KEYSPACE + "." + UDT_WITH_SET_OF_UDT_TYPE_NAME
+                                                                 + " (innerudt set<frozen<" + TWO_FIELD_UDT_NAME + ">>);";
+    public static final QualifiedName UDT_WITH_SET_OF_UDT_SOURCE_TABLE = new QualifiedName(TEST_KEYSPACE, "udt_with_set_of_udt_src");
+    public static final QualifiedName UDT_WITH_SET_OF_UDT_DEST_TABLE = new QualifiedName(TEST_KEYSPACE, "udt_with_set_of_udt_dest");
+
+    // udt with map of UDTs inside it
+    public static final String UDT_WITH_MAP_OF_UDT_TYPE_NAME = "udt_with_map_of_udt_type";
+    public static final String UDT_WITH_MAP_OF_UDT_TYPE_CREATE = "CREATE TYPE " + TEST_KEYSPACE + "." + UDT_WITH_MAP_OF_UDT_TYPE_NAME
+                                                                 + " (innerudt map<frozen<" + TWO_FIELD_UDT_NAME + ">, frozen<" + TWO_FIELD_UDT_NAME + ">>);";
+    public static final QualifiedName UDT_WITH_MAP_OF_UDT_SOURCE_TABLE = new QualifiedName(TEST_KEYSPACE, "udt_with_map_of_udt_src");
+    public static final QualifiedName UDT_WITH_MAP_OF_UDT_DEST_TABLE = new QualifiedName(TEST_KEYSPACE, "udt_with_map_of_udt_dest");
+
+    // Table with UDT which contains either a list or set or map of UDTs inside it
+    public static final String UDT_WITH_COLLECTION_OF_UDT_TABLE_CREATE = "CREATE TABLE %s.%s (\n"
+                                                                         + " id BIGINT PRIMARY KEY,\n"
+                                                                         + " outerudt frozen<%s>)";

Review Comment:
   There is no test covering the scenario of UDTs in tuples. Are you going to update this patch or address it in a follow-up? (I am OK with either.)
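   For illustration, that coverage could mirror the constant style above. A minimal sketch, assuming the same `TEST_KEYSPACE`/`TWO_FIELD_UDT_NAME`/`QualifiedName` conventions; the `tuple_of_udt_*` names are made up:

   ```java
   // Hypothetical constants for the missing tuple coverage (names are illustrative).
   // Tuples are implicitly frozen in CQL, so only the nested UDT needs frozen<>.
   public static final QualifiedName TUPLE_OF_UDT_SOURCE_TABLE = new QualifiedName(TEST_KEYSPACE, "tuple_of_udt_src");
   public static final QualifiedName TUPLE_OF_UDT_DEST_TABLE = new QualifiedName(TEST_KEYSPACE, "tuple_of_udt_dest");
   public static final String TUPLE_OF_UDT_TABLE_CREATE = "CREATE TABLE %s.%s (\n"
                                                          + " id BIGINT PRIMARY KEY,\n"
                                                          + " udttuple tuple<frozen<" + TWO_FIELD_UDT_NAME + ">, int>)";
   ```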
########## cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteUdtTest.java: ##########
@@ -32,7 +32,7 @@
 import org.apache.spark.sql.SparkSession;
 import org.jetbrains.annotations.NotNull;

-import static org.apache.cassandra.testing.TestUtils.DC1_RF3;
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;

Review Comment:
   Curious about the reason for this change? If it is unnecessary, please revert.


########## cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteUdtTest.java: ##########
@@ -88,17 +141,298 @@ void testWriteWithNestedUdt()
         assertThat(result.hasNext()).isTrue();

         validateWritesWithDriverResultSet(df.collectAsList(),
                                           queryAllDataWithDriver(NESTED_TABLE_NAME),
-                                          BulkWriteUdtTest::defaultRowFormatter);
+                                          BulkWriteUdtTest::udtRowFormatter);
     }
+
+    @Test
+    void testListOfUdts()
+    {
+        int numRowsInserted = populateListOfUdts();
+
+        // Create a spark frame with the data inserted during the setup
+        Dataset<Row> sourceData = bulkReaderDataFrame(LIST_OF_UDT_SOURCE_TABLE).load();
+        assertThat(sourceData.count()).isEqualTo(numRowsInserted);
+
+        // Insert the dataset containing list of UDTs, and UDT itself has collections in it
+        bulkWriterDataFrameWriter(sourceData, LIST_OF_UDT_DEST_TABLE).save();
+        validateWritesWithDriverResultSet(sourceData.collectAsList(),
+                                          queryAllDataWithDriver(LIST_OF_UDT_DEST_TABLE),
+                                          BulkWriteUdtTest::listOfUdtRowFormatter);
+    }
+
+    private int populateListOfUdts()
+    {
+        // table(id, list<udt(list<>, set<>, map<>)>)
+        // insert list of UDTs, and each UDT has a list, set and map
+        String insertIntoListOfUdts = "INSERT INTO %s (id, udtlist) VALUES (%d, [{f1:['value %d'], f2:{%d}, f3:{%d : 'value %d'}}])";
+
+        int i = 0;
+        for (; i < ROW_COUNT; i++)
+        {
+            cluster.schemaChangeIgnoringStoppedInstances(String.format(insertIntoListOfUdts, LIST_OF_UDT_SOURCE_TABLE, i, i, i, i, i));
+        }
+
+        // test null cases
+        cluster.schemaChangeIgnoringStoppedInstances(String.format("insert into %s (id) values (%d)",
+                                                                   LIST_OF_UDT_SOURCE_TABLE, i++));
+        cluster.schemaChangeIgnoringStoppedInstances(String.format("insert into %s (id, udtlist) values (%d, null)",
+                                                                   LIST_OF_UDT_SOURCE_TABLE, i++));
+        cluster.schemaChangeIgnoringStoppedInstances(String.format("insert into %s (id, udtlist) values (%d, [{f1:null, f2:null, f3:null}])",
+                                                                   LIST_OF_UDT_SOURCE_TABLE, i++));
+
+        return i;
+    }
+
+    @Test
+    void testSetOfUdts()
+    {
+        int numRowsInserted = populateSetOfUdts();
+        // Create a spark frame with the data inserted during the setup
+        Dataset<Row> sourceData = bulkReaderDataFrame(SET_OF_UDT_SOURCE_TABLE).load();
+        assertThat(sourceData.count()).isEqualTo(numRowsInserted);
+
+        // Insert the dataset containing set of UDTs, and UDT itself has collections in it
+        bulkWriterDataFrameWriter(sourceData, SET_OF_UDT_DEST_TABLE).save();
+        validateWritesWithDriverResultSet(sourceData.collectAsList(),
+                                          queryAllDataWithDriver(SET_OF_UDT_DEST_TABLE),
+                                          BulkWriteUdtTest::setOfUdtRowFormatter);
+    }
+
+    private int populateSetOfUdts()
+    {
+        // table(id, set<udt(list<>, set<>, map<>)>)
+        // insert set of UDTs, and UDT has a list, set and map inside it
+        String insertIntoSetOfUdts = "INSERT INTO %s (id, udtset) VALUES (%d, "
+                                     + "{{f1:['value %d'], f2:{%d}, f3:{%d : 'value %d'}}})";
+
+        int i = 0;
+        for (; i < ROW_COUNT; i++)
+        {
+            cluster.schemaChangeIgnoringStoppedInstances(String.format(insertIntoSetOfUdts, SET_OF_UDT_SOURCE_TABLE,
+                                                                       i, i, i, i, i));
+        }
+
+        // test null cases
+        cluster.schemaChangeIgnoringStoppedInstances(String.format("insert into %s (id) values (%d)",
+                                                                   SET_OF_UDT_SOURCE_TABLE, i++));
+        cluster.schemaChangeIgnoringStoppedInstances(String.format("insert into %s (id, udtset) values (%d, null)",
+                                                                   SET_OF_UDT_SOURCE_TABLE, i++));
+        cluster.schemaChangeIgnoringStoppedInstances(String.format("insert into %s (id, udtset) values (%d, "
+                                                                   + "{{f1:null, f2:null, f3:null}})",
+                                                                   SET_OF_UDT_SOURCE_TABLE, i++));
+

Review Comment:
   `schemaChangeIgnoringStoppedInstances` is for changing the schema; using it to insert data is not a proper use of the API. I think you want to use `cluster.coordinator(1).executeWithResult(QUERY)` instead.
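   For example, a minimal sketch, assuming the in-jvm dtest `ICoordinator#executeWithResult(String, ConsistencyLevel, Object...)` overload; the consistency level is an illustrative choice:

   ```java
   // Route the insert through a coordinator instead of the schema-change API.
   // ConsistencyLevel.ALL is illustrative; pick whatever the test actually needs.
   cluster.coordinator(1)
          .executeWithResult(String.format(insertIntoSetOfUdts, SET_OF_UDT_SOURCE_TABLE, i, i, i, i, i),
                             ConsistencyLevel.ALL);
   ```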