HeartSaVioR commented on code in PR #49747:
URL: https://github.com/apache/spark/pull/49747#discussion_r1936722556
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -343,6 +343,63 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }

+  Seq(true, false).foreach { colFamiliesEnabled =>
+    test(s"rocksdb range scan - variable size non-ordering columns with non-zero start ordinal " +
+      s"with colFamiliesEnabled=$colFamiliesEnabled") {
+
+      tryWithProviderResource(newStoreProvider(keySchema,
+        RangeKeyScanStateEncoderSpec(
+          keySchema, Seq(1)), colFamiliesEnabled)) { provider =>
+        val store = provider.getStore(0)
+
+        // use non-default col family if column families are enabled
+        val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+        if (colFamiliesEnabled) {
+          store.createColFamilyIfAbsent(cfName,
+            keySchema, valueSchema,
+            RangeKeyScanStateEncoderSpec(keySchema, Seq(1)))
+        }
+
+        val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, -1L, 90L, 1L, 2L, 8L,

Review Comment:
   nit: Is it intentional to use Long constants here when we cast them to Int again?

##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -343,6 +343,63 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }

+  Seq(true, false).foreach { colFamiliesEnabled =>
+    test(s"rocksdb range scan - variable size non-ordering columns with non-zero start ordinal " +
+      s"with colFamiliesEnabled=$colFamiliesEnabled") {
+
+      tryWithProviderResource(newStoreProvider(keySchema,
+        RangeKeyScanStateEncoderSpec(
+          keySchema, Seq(1)), colFamiliesEnabled)) { provider =>
+        val store = provider.getStore(0)
+
+        // use non-default col family if column families are enabled
+        val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+        if (colFamiliesEnabled) {
+          store.createColFamilyIfAbsent(cfName,
+            keySchema, valueSchema,
+            RangeKeyScanStateEncoderSpec(keySchema, Seq(1)))
+        }
+
+        val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, -1L, 90L, 1L, 2L, 8L,
+          -230L, -14569L, -92L, -7434253L, 35L, 6L, 9L, -323L, 5L)
+        timerTimestamps.foreach { ts =>
+          val keyRow = dataToKeyRow(Random.alphanumeric.filter(_.isLetter)

Review Comment:
   nit: Looks like this could easily be extracted into an inner method to avoid the redundancy, either taking a single ts or an Array/List of ts.
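   For example, something like this (a sketch only; the helper name and exact signature are hypothetical, assuming the suite's `dataToKeyRow`/`dataToValueRow`/`valueRowToData` helpers and `scala.util.Random` are in scope):

   ```scala
   // Hypothetical helper (sketch): centralizes the put/get-verify loop so each
   // test variant only supplies its own timestamps and column family name.
   def putTimestampKeys(store: StateStore, cfName: String, timestamps: Seq[Long]): Unit = {
     timestamps.foreach { ts =>
       // Variable-size non-ordering column: a random letters-only string, as in
       // the original loop body; the Int cast mirrors the existing code.
       val keyRow = dataToKeyRow(
         Random.alphanumeric.filter(_.isLetter).take(Random.nextInt(20) + 1).mkString,
         ts.toInt)
       store.put(keyRow, dataToValueRow(1), cfName)
       assert(valueRowToData(store.get(keyRow, cfName)) === 1)
     }
   }
   ```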
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -453,6 +510,68 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }

+  Seq(true, false).foreach { colFamiliesEnabled =>
+    Seq(Seq(1, 2), Seq(2, 1)).foreach { sortIndexes =>
+      test(s"rocksdb range scan multiple ordering columns - with non-zero start ordinal - " +
+        s"variable size non-ordering columns with colFamiliesEnabled=$colFamiliesEnabled " +
+        s"sortIndexes=${sortIndexes.mkString(",")}") {
+
+        val testSchema: StructType = StructType(
+          Seq(StructField("key1", StringType, false),
+            StructField("key2", LongType, false),
+            StructField("key3", IntegerType, false)))
+
+        val schemaProj = UnsafeProjection.create(Array[DataType](StringType, LongType, IntegerType))
+
+        tryWithProviderResource(newStoreProvider(testSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, sortIndexes), colFamiliesEnabled)) { provider =>
+          val store = provider.getStore(0)
+
+          val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+          if (colFamiliesEnabled) {
+            store.createColFamilyIfAbsent(cfName,
+              testSchema, valueSchema,
+              RangeKeyScanStateEncoderSpec(testSchema, sortIndexes))
+          }
+
+          val timerTimestamps = Seq((931L, 10), (8000L, 40), (452300L, 1), (4200L, 68), (90L, 2000),
+            (1L, 27), (1L, 394), (1L, 5), (3L, 980),
+            (-1L, 232), (-1L, 3455), (-6109L, 921455), (-9808344L, 1), (-1020L, 2),
+            (35L, 2112), (6L, 90118), (9L, 95118), (6L, 87210), (-4344L, 2323), (-3122L, 323))
+          timerTimestamps.foreach { ts =>
+            // order by long col first and then by int col
+            val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](UTF8String
+              .fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString), ts._1,
+              ts._2)))
+            val valueRow = dataToValueRow(1)
+            store.put(keyRow, valueRow, cfName)
+            assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+          }
+
+          val result = store.iterator(cfName).map { kv =>
+            val keyRow = kv.key
+            val key = (keyRow.getString(0), keyRow.getLong(1), keyRow.getInt(2))
+            (key._2, key._3)

Review Comment:
   nit: why not just inline?
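   Something like this, for instance (a sketch, assuming the same iterator API as the surrounding test):

   ```scala
   // Inlined form: build the (Long, Int) pair directly from the key row instead
   // of going through an intermediate 3-tuple carrying the unused string column.
   val result = store.iterator(cfName).map { kv =>
     (kv.key.getLong(1), kv.key.getInt(2))
   }
   ```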
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -1065,97 +1184,114 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     assert(exception.getMessage.contains("Found long, expecting union"))
   }

-  testWithColumnFamiliesAndEncodingTypes(
-    "rocksdb range scan multiple non-contiguous ordering columns",
-    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
-    val testSchema: StructType = StructType(
-      Seq(
-        StructField("ordering1", LongType, false),
-        StructField("key2", StringType, false),
-        StructField("ordering2", IntegerType, false),
-        StructField("string2", StringType, false),
-        StructField("ordering3", DoubleType, false)
+  Seq(Seq(0, 1, 2), Seq(0, 2, 1), Seq(2, 1, 0), Seq(2, 0, 1)).foreach { sortIndexes =>
+    testWithColumnFamiliesAndEncodingTypes(
+      s"rocksdb range scan multiple non-contiguous ordering columns " +
+        s"and sortIndexes=${sortIndexes.mkString(",")}",
+      TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+      val testSchema: StructType = StructType(
+        Seq(
+          StructField("ordering1", LongType, false),
+          StructField("key2", StringType, false),
+          StructField("ordering2", IntegerType, false),
+          StructField("string2", StringType, false),
+          StructField("ordering3", DoubleType, false)
+        )
       )
-    )
-    val testSchemaProj = UnsafeProjection.create(Array[DataType](
+      val testSchemaProj = UnsafeProjection.create(Array[DataType](
         immutable.ArraySeq.unsafeWrapArray(testSchema.fields.map(_.dataType)): _*))
-    val rangeScanOrdinals = Seq(0, 2, 4)
-
-    tryWithProviderResource(
-      newStoreProvider(
-        testSchema,
-        RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals),
-        colFamiliesEnabled
-      )
-    ) { provider =>
-      val store = provider.getStore(0)
+      // Multiply by 2 to get the actual ordinals in the row

Review Comment:
   nit: it does not seem to simplify the logic; (0, 1, 2) * 2 vs (0, 2, 4) is not a huge difference, given that someone needs to know the schema to understand this either way.
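   For comparison, the two spellings side by side (a sketch; variable names are illustrative):

   ```scala
   // Physical ordinals in testSchema: ordering1 -> 0, key2 -> 1, ordering2 -> 2,
   // string2 -> 3, ordering3 -> 4. Doubling the logical sort indexes over
   // (ordering1, ordering2, ordering3) skips the interleaved string columns.
   val sortIndexes = Seq(0, 2, 1)                  // logical order of the ordering columns
   val rangeScanOrdinals = sortIndexes.map(_ * 2)  // Seq(0, 4, 2): the row ordinals

   // versus spelling the row ordinals out directly, as the pre-PR code did:
   val rangeScanOrdinalsExplicit = Seq(0, 4, 2)
   ```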
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -1065,97 +1184,114 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     assert(exception.getMessage.contains("Found long, expecting union"))
   }

-  testWithColumnFamiliesAndEncodingTypes(
-    "rocksdb range scan multiple non-contiguous ordering columns",
-    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
-    val testSchema: StructType = StructType(
-      Seq(
-        StructField("ordering1", LongType, false),
-        StructField("key2", StringType, false),
-        StructField("ordering2", IntegerType, false),
-        StructField("string2", StringType, false),
-        StructField("ordering3", DoubleType, false)
+  Seq(Seq(0, 1, 2), Seq(0, 2, 1), Seq(2, 1, 0), Seq(2, 0, 1)).foreach { sortIndexes =>
+    testWithColumnFamiliesAndEncodingTypes(
+      s"rocksdb range scan multiple non-contiguous ordering columns " +
+        s"and sortIndexes=${sortIndexes.mkString(",")}",
+      TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+      val testSchema: StructType = StructType(
+        Seq(
+          StructField("ordering1", LongType, false),
+          StructField("key2", StringType, false),
+          StructField("ordering2", IntegerType, false),
+          StructField("string2", StringType, false),
+          StructField("ordering3", DoubleType, false)
+        )
      )
-    )
-    val testSchemaProj = UnsafeProjection.create(Array[DataType](
+      val testSchemaProj = UnsafeProjection.create(Array[DataType](
         immutable.ArraySeq.unsafeWrapArray(testSchema.fields.map(_.dataType)): _*))
-    val rangeScanOrdinals = Seq(0, 2, 4)
-
-    tryWithProviderResource(
-      newStoreProvider(
-        testSchema,
-        RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals),
-        colFamiliesEnabled
-      )
-    ) { provider =>
-      val store = provider.getStore(0)
+      // Multiply by 2 to get the actual ordinals in the row
+      val rangeScanOrdinals = sortIndexes.map(_ * 2)

-      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
-      if (colFamiliesEnabled) {
-        store.createColFamilyIfAbsent(
-          cfName,
+      tryWithProviderResource(
+        newStoreProvider(
          testSchema,
-          valueSchema,
-          RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals)
+          RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals),
+          colFamiliesEnabled
        )
-      }
+      ) { provider =>
+        val store = provider.getStore(0)

-      val orderedInput = Seq(
-        // Make sure that the first column takes precedence, even if the
-        // later columns are greater
-        (-2L, 0, 99.0),
-        (-1L, 0, 98.0),
-        (0L, 0, 97.0),
-        (2L, 0, 96.0),
-        // Make sure that the second column takes precedence, when the first
-        // column is all the same
-        (3L, -2, -1.0),
-        (3L, -1, -2.0),
-        (3L, 0, -3.0),
-        (3L, 2, -4.0),
-        // Finally, make sure that the third column takes precedence, when the
-        // first two ordering columns are the same.
-        (4L, -1, -127.0),
-        (4L, -1, 0.0),
-        (4L, -1, 64.0),
-        (4L, -1, 127.0)
-      )
-      val scrambledInput = Random.shuffle(orderedInput)
-
-      scrambledInput.foreach { record =>
-        val keyRow = testSchemaProj.apply(
-          new GenericInternalRow(
-            Array[Any](
-              record._1,
-              UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString),
-              record._2,
-              UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString),
-              record._3
-            )
+        val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+        if (colFamiliesEnabled) {
+          store.createColFamilyIfAbsent(
+            cfName,
+            testSchema,
+            valueSchema,
+            RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals)
          )
+        }
+
+        val orderedInput = Seq(
+          // Make sure that the first column takes precedence, even if the
+          // later columns are greater
+          (-2L, 0, 99.0),
+          (-1L, 0, 98.0),
+          (0L, 0, 97.0),
+          (2L, 0, 96.0),
+          // Make sure that the second column takes precedence, when the first
+          // column is all the same
+          (3L, -2, -1.0),
+          (3L, -1, -2.0),
+          (3L, 0, -3.0),
+          (3L, 2, -4.0),
+          // Finally, make sure that the third column takes precedence, when the
+          // first two ordering columns are the same.
+          (4L, -1, -127.0),
+          (4L, -1, 0.0),
+          (4L, -1, 64.0),
+          (4L, -1, 127.0)
        )
+        val scrambledInput = Random.shuffle(orderedInput)
+
+        scrambledInput.foreach { record =>
+          val keyRow = testSchemaProj.apply(
+            new GenericInternalRow(
+              Array[Any](
+                record._1,
+                UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString),
+                record._2,
+                UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString),
+                record._3
+              )
+            )
+          )

-        // The value is just a "dummy" value of 1
-        val valueRow = dataToValueRow(1)
-        store.put(keyRow, valueRow, cfName)
-        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
-      }
+          // The value is just a "dummy" value of 1
+          val valueRow = dataToValueRow(1)
+          store.put(keyRow, valueRow, cfName)
+          assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+        }

-      val result = store
-        .iterator(cfName)
-        .map { kv =>
-          val keyRow = kv.key
-          val key = (keyRow.getLong(0), keyRow.getInt(2), keyRow.getDouble(4))
-          (key._1, key._2, key._3)
+        val result = store
+          .iterator(cfName)
+          .map { kv =>
+            val keyRow = kv.key
+            val key = (keyRow.getLong(0), keyRow.getInt(2), keyRow.getDouble(4))
+            (key._1, key._2, key._3)

Review Comment:
   ditto; not sure whether readability would regress if we inlined this.
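   That is, the same kind of inlining as suggested in the earlier comment (sketch):

   ```scala
   // Inlined variant reading the three range-scan columns straight into the tuple.
   val result = store
     .iterator(cfName)
     .map { kv => (kv.key.getLong(0), kv.key.getInt(2), kv.key.getDouble(4)) }
   ```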
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -1065,97 +1184,114 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     assert(exception.getMessage.contains("Found long, expecting union"))
   }

-  testWithColumnFamiliesAndEncodingTypes(
-    "rocksdb range scan multiple non-contiguous ordering columns",
-    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
-    val testSchema: StructType = StructType(
-      Seq(
-        StructField("ordering1", LongType, false),
-        StructField("key2", StringType, false),
-        StructField("ordering2", IntegerType, false),
-        StructField("string2", StringType, false),
-        StructField("ordering3", DoubleType, false)
+  Seq(Seq(0, 1, 2), Seq(0, 2, 1), Seq(2, 1, 0), Seq(2, 0, 1)).foreach { sortIndexes =>
+    testWithColumnFamiliesAndEncodingTypes(
+      s"rocksdb range scan multiple non-contiguous ordering columns " +
+        s"and sortIndexes=${sortIndexes.mkString(",")}",
+      TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+      val testSchema: StructType = StructType(
+        Seq(
+          StructField("ordering1", LongType, false),
+          StructField("key2", StringType, false),
+          StructField("ordering2", IntegerType, false),
+          StructField("string2", StringType, false),
+          StructField("ordering3", DoubleType, false)
+        )
      )
-    )
-    val testSchemaProj = UnsafeProjection.create(Array[DataType](
+      val testSchemaProj = UnsafeProjection.create(Array[DataType](
         immutable.ArraySeq.unsafeWrapArray(testSchema.fields.map(_.dataType)): _*))
-    val rangeScanOrdinals = Seq(0, 2, 4)
-
-    tryWithProviderResource(
-      newStoreProvider(
-        testSchema,
-        RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals),
-        colFamiliesEnabled
-      )
-    ) { provider =>
-      val store = provider.getStore(0)
+      // Multiply by 2 to get the actual ordinals in the row

Review Comment:
   I see how you use this to simplify; it's effectively doing a projection. Looks OK to me.