HeartSaVioR commented on code in PR #49747:
URL: https://github.com/apache/spark/pull/49747#discussion_r1936722556


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -343,6 +343,63 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  Seq(true, false).foreach { colFamiliesEnabled =>
+    test(s"rocksdb range scan - variable size non-ordering columns with 
non-zero start ordinal " +
+      s"with colFamiliesEnabled=$colFamiliesEnabled") {
+
+      tryWithProviderResource(newStoreProvider(keySchema,
+        RangeKeyScanStateEncoderSpec(
+          keySchema, Seq(1)), colFamiliesEnabled)) { provider =>
+        val store = provider.getStore(0)
+
+        // use non-default col family if column families are enabled
+        val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+        if (colFamiliesEnabled) {
+          store.createColFamilyIfAbsent(cfName,
+            keySchema, valueSchema,
+            RangeKeyScanStateEncoderSpec(keySchema, Seq(1)))
+        }
+
+        val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, -1L, 90L, 1L, 
2L, 8L,

Review Comment:
   nit: Is it intentional to use Long constants here while we cast to Int again?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -343,6 +343,63 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  Seq(true, false).foreach { colFamiliesEnabled =>
+    test(s"rocksdb range scan - variable size non-ordering columns with 
non-zero start ordinal " +
+      s"with colFamiliesEnabled=$colFamiliesEnabled") {
+
+      tryWithProviderResource(newStoreProvider(keySchema,
+        RangeKeyScanStateEncoderSpec(
+          keySchema, Seq(1)), colFamiliesEnabled)) { provider =>
+        val store = provider.getStore(0)
+
+        // use non-default col family if column families are enabled
+        val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+        if (colFamiliesEnabled) {
+          store.createColFamilyIfAbsent(cfName,
+            keySchema, valueSchema,
+            RangeKeyScanStateEncoderSpec(keySchema, Seq(1)))
+        }
+
+        val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, -1L, 90L, 1L, 
2L, 8L,
+          -230L, -14569L, -92L, -7434253L, 35L, 6L, 9L, -323L, 5L)
+        timerTimestamps.foreach { ts =>
+          val keyRow = dataToKeyRow(Random.alphanumeric.filter(_.isLetter)

Review Comment:
   nit: Looks like this could easily be extracted into an inner method to 
avoid redundancy — either taking a single ts, or an Array/List of ts.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -453,6 +510,68 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  Seq(true, false).foreach { colFamiliesEnabled =>
+    Seq(Seq(1, 2), Seq(2, 1)).foreach { sortIndexes =>
+      test(s"rocksdb range scan multiple ordering columns - with non-zero 
start ordinal - " +
+        s"variable size non-ordering columns with 
colFamiliesEnabled=$colFamiliesEnabled " +
+        s"sortIndexes=${sortIndexes.mkString(",")}") {
+
+        val testSchema: StructType = StructType(
+          Seq(StructField("key1", StringType, false),
+            StructField("key2", LongType, false),
+            StructField("key3", IntegerType, false)))
+
+        val schemaProj = UnsafeProjection.create(Array[DataType](StringType, 
LongType, IntegerType))
+
+        tryWithProviderResource(newStoreProvider(testSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, sortIndexes), 
colFamiliesEnabled)) { provider =>
+          val store = provider.getStore(0)
+
+          val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+          if (colFamiliesEnabled) {
+            store.createColFamilyIfAbsent(cfName,
+              testSchema, valueSchema,
+              RangeKeyScanStateEncoderSpec(testSchema, sortIndexes))
+          }
+
+          val timerTimestamps = Seq((931L, 10), (8000L, 40), (452300L, 1), 
(4200L, 68), (90L, 2000),
+            (1L, 27), (1L, 394), (1L, 5), (3L, 980),
+            (-1L, 232), (-1L, 3455), (-6109L, 921455), (-9808344L, 1), 
(-1020L, 2),
+            (35L, 2112), (6L, 90118), (9L, 95118), (6L, 87210), (-4344L, 
2323), (-3122L, 323))
+          timerTimestamps.foreach { ts =>
+            // order by long col first and then by int col
+            val keyRow = schemaProj.apply(new 
GenericInternalRow(Array[Any](UTF8String
+              .fromString(Random.alphanumeric.take(Random.nextInt(20) + 
1).mkString), ts._1,
+              ts._2)))
+            val valueRow = dataToValueRow(1)
+            store.put(keyRow, valueRow, cfName)
+            assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+          }
+
+          val result = store.iterator(cfName).map { kv =>
+            val keyRow = kv.key
+            val key = (keyRow.getString(0), keyRow.getLong(1), 
keyRow.getInt(2))
+            (key._2, key._3)

Review Comment:
   nit: why not just inline?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -1065,97 +1184,114 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     assert(exception.getMessage.contains("Found long, expecting union"))
   }
 
-  testWithColumnFamiliesAndEncodingTypes(
-    "rocksdb range scan multiple non-contiguous ordering columns",
-    TestWithBothChangelogCheckpointingEnabledAndDisabled ) { 
colFamiliesEnabled =>
-    val testSchema: StructType = StructType(
-      Seq(
-        StructField("ordering1", LongType, false),
-        StructField("key2", StringType, false),
-        StructField("ordering2", IntegerType, false),
-        StructField("string2", StringType, false),
-        StructField("ordering3", DoubleType, false)
+  Seq(Seq(0, 1, 2), Seq(0, 2, 1), Seq(2, 1, 0), Seq(2, 0, 1)).foreach { 
sortIndexes =>
+    testWithColumnFamiliesAndEncodingTypes(
+      s"rocksdb range scan multiple non-contiguous ordering columns " +
+        s"and sortIndexes=${sortIndexes.mkString(",")}",
+      TestWithBothChangelogCheckpointingEnabledAndDisabled) { 
colFamiliesEnabled =>
+      val testSchema: StructType = StructType(
+        Seq(
+          StructField("ordering1", LongType, false),
+          StructField("key2", StringType, false),
+          StructField("ordering2", IntegerType, false),
+          StructField("string2", StringType, false),
+          StructField("ordering3", DoubleType, false)
+        )
       )
-    )
 
-    val testSchemaProj = UnsafeProjection.create(Array[DataType](
+      val testSchemaProj = UnsafeProjection.create(Array[DataType](
         immutable.ArraySeq.unsafeWrapArray(testSchema.fields.map(_.dataType)): 
_*))
-    val rangeScanOrdinals = Seq(0, 2, 4)
-
-    tryWithProviderResource(
-      newStoreProvider(
-        testSchema,
-        RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals),
-        colFamiliesEnabled
-      )
-    ) { provider =>
-      val store = provider.getStore(0)
+      // Multiply by 2 to get the actual ordinals in the row

Review Comment:
   nit: it does not seem to simplify the logic. (0, 1, 2) * 2 vs (0, 2, 4) is 
not a huge difference, given that someone needs to know about the schema to 
understand this either way.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -1065,97 +1184,114 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     assert(exception.getMessage.contains("Found long, expecting union"))
   }
 
-  testWithColumnFamiliesAndEncodingTypes(
-    "rocksdb range scan multiple non-contiguous ordering columns",
-    TestWithBothChangelogCheckpointingEnabledAndDisabled ) { 
colFamiliesEnabled =>
-    val testSchema: StructType = StructType(
-      Seq(
-        StructField("ordering1", LongType, false),
-        StructField("key2", StringType, false),
-        StructField("ordering2", IntegerType, false),
-        StructField("string2", StringType, false),
-        StructField("ordering3", DoubleType, false)
+  Seq(Seq(0, 1, 2), Seq(0, 2, 1), Seq(2, 1, 0), Seq(2, 0, 1)).foreach { 
sortIndexes =>
+    testWithColumnFamiliesAndEncodingTypes(
+      s"rocksdb range scan multiple non-contiguous ordering columns " +
+        s"and sortIndexes=${sortIndexes.mkString(",")}",
+      TestWithBothChangelogCheckpointingEnabledAndDisabled) { 
colFamiliesEnabled =>
+      val testSchema: StructType = StructType(
+        Seq(
+          StructField("ordering1", LongType, false),
+          StructField("key2", StringType, false),
+          StructField("ordering2", IntegerType, false),
+          StructField("string2", StringType, false),
+          StructField("ordering3", DoubleType, false)
+        )
       )
-    )
 
-    val testSchemaProj = UnsafeProjection.create(Array[DataType](
+      val testSchemaProj = UnsafeProjection.create(Array[DataType](
         immutable.ArraySeq.unsafeWrapArray(testSchema.fields.map(_.dataType)): 
_*))
-    val rangeScanOrdinals = Seq(0, 2, 4)
-
-    tryWithProviderResource(
-      newStoreProvider(
-        testSchema,
-        RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals),
-        colFamiliesEnabled
-      )
-    ) { provider =>
-      val store = provider.getStore(0)
+      // Multiply by 2 to get the actual ordinals in the row
+      val rangeScanOrdinals = sortIndexes.map(_ * 2)
 
-      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
-      if (colFamiliesEnabled) {
-        store.createColFamilyIfAbsent(
-          cfName,
+      tryWithProviderResource(
+        newStoreProvider(
           testSchema,
-          valueSchema,
-          RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals)
+          RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals),
+          colFamiliesEnabled
         )
-      }
+      ) { provider =>
+        val store = provider.getStore(0)
 
-      val orderedInput = Seq(
-        // Make sure that the first column takes precedence, even if the
-        // later columns are greater
-        (-2L, 0, 99.0),
-        (-1L, 0, 98.0),
-        (0L, 0, 97.0),
-        (2L, 0, 96.0),
-        // Make sure that the second column takes precedence, when the first
-        // column is all the same
-        (3L, -2, -1.0),
-        (3L, -1, -2.0),
-        (3L, 0, -3.0),
-        (3L, 2, -4.0),
-        // Finally, make sure that the third column takes precedence, when the
-        // first two ordering columns are the same.
-        (4L, -1, -127.0),
-        (4L, -1, 0.0),
-        (4L, -1, 64.0),
-        (4L, -1, 127.0)
-      )
-      val scrambledInput = Random.shuffle(orderedInput)
-
-      scrambledInput.foreach { record =>
-        val keyRow = testSchemaProj.apply(
-          new GenericInternalRow(
-            Array[Any](
-              record._1,
-              
UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 
1).mkString),
-              record._2,
-              
UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 
1).mkString),
-              record._3
-            )
+        val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+        if (colFamiliesEnabled) {
+          store.createColFamilyIfAbsent(
+            cfName,
+            testSchema,
+            valueSchema,
+            RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals)
           )
+        }
+
+        val orderedInput = Seq(
+          // Make sure that the first column takes precedence, even if the
+          // later columns are greater
+          (-2L, 0, 99.0),
+          (-1L, 0, 98.0),
+          (0L, 0, 97.0),
+          (2L, 0, 96.0),
+          // Make sure that the second column takes precedence, when the first
+          // column is all the same
+          (3L, -2, -1.0),
+          (3L, -1, -2.0),
+          (3L, 0, -3.0),
+          (3L, 2, -4.0),
+          // Finally, make sure that the third column takes precedence, when 
the
+          // first two ordering columns are the same.
+          (4L, -1, -127.0),
+          (4L, -1, 0.0),
+          (4L, -1, 64.0),
+          (4L, -1, 127.0)
         )
+        val scrambledInput = Random.shuffle(orderedInput)
+
+        scrambledInput.foreach { record =>
+          val keyRow = testSchemaProj.apply(
+            new GenericInternalRow(
+              Array[Any](
+                record._1,
+                
UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 
1).mkString),
+                record._2,
+                
UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 
1).mkString),
+                record._3
+              )
+            )
+          )
 
-        // The value is just a "dummy" value of 1
-        val valueRow = dataToValueRow(1)
-        store.put(keyRow, valueRow, cfName)
-        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
-      }
+          // The value is just a "dummy" value of 1
+          val valueRow = dataToValueRow(1)
+          store.put(keyRow, valueRow, cfName)
+          assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+        }
 
-      val result = store
-        .iterator(cfName)
-        .map { kv =>
-          val keyRow = kv.key
-          val key = (keyRow.getLong(0), keyRow.getInt(2), keyRow.getDouble(4))
-          (key._1, key._2, key._3)
+        val result = store
+          .iterator(cfName)
+          .map { kv =>
+            val keyRow = kv.key
+            val key = (keyRow.getLong(0), keyRow.getInt(2), 
keyRow.getDouble(4))
+            (key._1, key._2, key._3)

Review Comment:
   ditto; not sure whether readability would regress if we inlined this.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -1065,97 +1184,114 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     assert(exception.getMessage.contains("Found long, expecting union"))
   }
 
-  testWithColumnFamiliesAndEncodingTypes(
-    "rocksdb range scan multiple non-contiguous ordering columns",
-    TestWithBothChangelogCheckpointingEnabledAndDisabled ) { 
colFamiliesEnabled =>
-    val testSchema: StructType = StructType(
-      Seq(
-        StructField("ordering1", LongType, false),
-        StructField("key2", StringType, false),
-        StructField("ordering2", IntegerType, false),
-        StructField("string2", StringType, false),
-        StructField("ordering3", DoubleType, false)
+  Seq(Seq(0, 1, 2), Seq(0, 2, 1), Seq(2, 1, 0), Seq(2, 0, 1)).foreach { 
sortIndexes =>
+    testWithColumnFamiliesAndEncodingTypes(
+      s"rocksdb range scan multiple non-contiguous ordering columns " +
+        s"and sortIndexes=${sortIndexes.mkString(",")}",
+      TestWithBothChangelogCheckpointingEnabledAndDisabled) { 
colFamiliesEnabled =>
+      val testSchema: StructType = StructType(
+        Seq(
+          StructField("ordering1", LongType, false),
+          StructField("key2", StringType, false),
+          StructField("ordering2", IntegerType, false),
+          StructField("string2", StringType, false),
+          StructField("ordering3", DoubleType, false)
+        )
       )
-    )
 
-    val testSchemaProj = UnsafeProjection.create(Array[DataType](
+      val testSchemaProj = UnsafeProjection.create(Array[DataType](
         immutable.ArraySeq.unsafeWrapArray(testSchema.fields.map(_.dataType)): 
_*))
-    val rangeScanOrdinals = Seq(0, 2, 4)
-
-    tryWithProviderResource(
-      newStoreProvider(
-        testSchema,
-        RangeKeyScanStateEncoderSpec(testSchema, rangeScanOrdinals),
-        colFamiliesEnabled
-      )
-    ) { provider =>
-      val store = provider.getStore(0)
+      // Multiply by 2 to get the actual ordinals in the row

Review Comment:
   I see how you use this to simplify — effectively doing a projection. Looks 
OK to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to