HeartSaVioR commented on code in PR #50572:
URL: https://github.com/apache/spark/pull/50572#discussion_r2051420015


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2569,10 +2569,11 @@ object SQLConf {
       .internal()
       .doc("State format version used by streaming join operations in a 
streaming query. " +
         "State between versions are tend to be incompatible, so state format 
version shouldn't " +
-        "be modified after running.")
+        "be modified after running. Version 3 uses a single state store with 
virtual column " +
+        "families instead of four stores.")

Review Comment:
   Do we support this only in the RocksDB state store provider? If so, shall we 
describe that as well?
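   For example, assuming version 3 is indeed RocksDB-only (the HDFS failure test in this PR suggests so), the doc could end with something along these lines (a sketch of possible wording, not the final text):

   ```scala
   // Hypothetical continuation of the doc string; suggestion only.
   "be modified after running. Version 3 uses a single state store with virtual column " +
   "families instead of four stores, and is supported only with the RocksDB state store " +
   "provider.")
   ```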



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -464,3 +465,11 @@ class OperatorStateMetadataV2FileManager(
     0
   }
 }
+
+case class StreamingJoinOperatorProperties(useVirtualColumnFamilies: Boolean) {

Review Comment:
   It does not seem strictly required, since we have state format version 3 and 
the state reader should understand it anyway. I'm still OK to have this; 
the overhead seems negligible.
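   For reference, the redundancy I mean: a reader could in principle derive the flag from the state format version alone, e.g. (the helper below is illustrative, not an actual API):

   ```scala
   // Hedged sketch: derive the vcf flag from the state format version, with no
   // extra persisted property.
   def useVirtualColumnFamilies(stateFormatVersion: Int): Boolean =
     stateFormatVersion >= 3
   ```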



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala:
##########
@@ -1640,6 +1904,34 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite {
       }
     }
   }
+
+  test("SPARK-51779 - " +

Review Comment:
   Shall we modify the existing test `left semi early state exclusion on left` 
to perform a restart, instead of adding a new test?
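   Something in this shape, for instance (a sketch only; stream names and data values are illustrative, and it assumes the test pins an explicit checkpoint location):

   ```scala
   testStream(semiJoinDf)(
     StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
     AddData(leftInput, 1, 2, 3),
     AddData(rightInput, 1, 2),
     CheckNewAnswer(1, 2),
     StopStream,
     // Restart from the same checkpoint; previously matched left rows must stay excluded.
     StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
     AddData(rightInput, 1, 3),
     CheckNewAnswer(3),
     StopStream
   )
   ```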



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -403,55 +409,26 @@ class SymmetricHashJoinStateManager(
   def getInternalRowOfKeyWithIndex(currentKey: UnsafeRow): InternalRow = keyProjection(currentKey)
 
   /** Commit all the changes to all the state stores */
-  def commit(): Unit = {
-    keyToNumValues.commit()
-    keyWithIndexToValue.commit()
-  }
+  def commit(): Unit
 
   /** Abort any changes to the state stores if needed */
-  def abortIfNeeded(): Unit = {
-    keyToNumValues.abortIfNeeded()
-    keyWithIndexToValue.abortIfNeeded()
-  }
+  def abortIfNeeded(): Unit
 
   /**
    * Get state store checkpoint information of the two state stores for this joiner, after
    * they finished data processing.
+   *
+   * For [[SymmetricHashJoinStateManagerV1]], this returns the information of the two stores

Review Comment:
   nit: Let's document this in each derived class. As long as the behavior fits 
the base class's contract, it should be good.
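   e.g. a minimal sketch of the placement, with hypothetical simplified types standing in for the real managers:

   ```scala
   trait JoinStateManager {
     /** Get state store checkpoint information for this joiner after data processing. */
     def getCheckpointInfo(): Seq[String]
   }

   class JoinStateManagerV1 extends JoinStateManager {
     /** Returns the info of the two stores: keyToNumValues and keyWithIndexToValue. */
     override def getCheckpointInfo(): Seq[String] =
       Seq("keyToNumValues", "keyWithIndexToValue")
   }

   class JoinStateManagerV3 extends JoinStateManager {
     /** Returns the info of the single store hosting all virtual column families. */
     override def getCheckpointInfo(): Seq[String] = Seq("join")
   }
   ```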



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala:
##########
@@ -2171,4 +2527,65 @@ class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
        */
     }
   }
+
+  test("SPARK-51779 - Restart streaming semi join query (windowed left semi 
join)") {

Review Comment:
   Same with test `windowed left semi join`.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala:
##########
@@ -1581,7 +1844,8 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite {
     }
   }
 
-  test("SPARK-49829 left-outer join, input being unmatched is between WM for 
late event and " +
+  testWithoutVirtualColumnFamilyJoins(

Review Comment:
   Does this mean the bug has reoccured with virtual column family join? Could 
you please leave code comment about why we don't test with virtual column 
family? (Probably good to have a brief code comment for every single test which 
verifies with only one side (either with vcf or without vcf).
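   Something like this sketch (the actual reason has to come from you; the placeholder is mine):

   ```scala
   // NOTE: runs only without virtual column families because <author: explain why the
   // vcf join path is excluded, e.g. the scenario is specific to the multi-store layout>.
   testWithoutVirtualColumnFamilyJoins(
     "SPARK-49829 left-outer join, ...") {
     // test body unchanged
   }
   ```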



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala:
##########
@@ -1844,6 +2136,69 @@ class StreamingFullOuterJoinSuite extends StreamingJoinSuite {
       assertNumStateRows(total = 15, updated = 7)
     )
   }
+
+  test("SPARK-51779 - Restart streaming outer join query (windowed full outer 
join)") {

Review Comment:
   Same with test `windowed full outer join`.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala:
##########
@@ -2048,7 +2403,8 @@ class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
     )
   }
 
-  test("SPARK-49829 two chained stream-stream left outer joins among three 
input streams") {
+  testWithoutVirtualColumnFamilyJoins(

Review Comment:
   Same, please leave a brief code comment about why this can't run with vcf.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -307,19 +349,23 @@ case class StreamingSymmetricHashJoinExec(
 
     assert(stateInfo.isDefined, "State info not defined")
     val checkpointIds = SymmetricHashJoinStateManager.getStateStoreCheckpointIds(
-      partitionId, stateInfo.get)
+      partitionId, stateInfo.get, useVirtualColumnFamilies)
 
     val inputSchema = left.output ++ right.output
     val postJoinFilter =
       Predicate.create(condition.bothSides.getOrElse(Literal(true)), inputSchema).eval _
-    val leftSideJoiner = new OneSideHashJoiner(
-      LeftSide, left.output, leftKeys, leftInputIter,
-      condition.leftSideOnly, postJoinFilter, stateWatermarkPredicates.left, partitionId,
-      checkpointIds.left.keyToNumValues, checkpointIds.left.valueToNumKeys, skippedNullValueCount)
-    val rightSideJoiner = new OneSideHashJoiner(
-      RightSide, right.output, rightKeys, rightInputIter,
-      condition.rightSideOnly, postJoinFilter, stateWatermarkPredicates.right, partitionId,
-      checkpointIds.right.keyToNumValues, checkpointIds.right.valueToNumKeys, skippedNullValueCount)
+    // Create left and right side hash joiners and store in the joiner manager

Review Comment:
   Is this a required change, or just a refactor?



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala:
##########
@@ -931,6 +988,211 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite {
     )
   }
 
+  testWithVirtualColumnFamilyJoins(
+    "SPARK-51779 Verify StateSchemaV3 writes correct key and value schemas for 
join operator") {
+    withTempDir { checkpointDir =>
+      val input1 = MemoryStream[Int]
+      val input2 = MemoryStream[Int]
+
+      val df1 = input1.toDF().select($"value" as "key", ($"value" * 2) as "leftValue")
+      val df2 = input2.toDF().select($"value" as "key", ($"value" * 3) as "rightValue")
+      val joined = df1.join(df2, "key")
+
+      val metadataPathPostfix = "state/0/_stateSchema/default"
+      val stateSchemaPath = new Path(checkpointDir.toString, s"$metadataPathPostfix")
+      val hadoopConf = spark.sessionState.newHadoopConf()
+      val fm = CheckpointFileManager.create(stateSchemaPath, hadoopConf)
+
+      val keySchemaForNums = new StructType().add("field0", IntegerType, nullable = false)
+      val keySchemaForIndex = keySchemaForNums.add("index", LongType)
+      val numSchema: StructType = new StructType().add("value", LongType)
+      val leftIndexSchema: StructType = new StructType()
+        .add("key", IntegerType, nullable = false)
+        .add("leftValue", IntegerType, nullable = false)
+        .add("matched", BooleanType)
+      val rightIndexSchema: StructType = new StructType()
+        .add("key", IntegerType, nullable = false)
+        .add("rightValue", IntegerType, nullable = false)
+        .add("matched", BooleanType)
+
+      val schemaLeftIndex = StateStoreColFamilySchema(
+        "left-keyWithIndexToValue", 0,
+        keySchemaForIndex, 0,
+        leftIndexSchema,
+        Some(NoPrefixKeyStateEncoderSpec(keySchemaForIndex)),
+        None
+      )
+      val schemaLeftNum = StateStoreColFamilySchema(
+        "left-keyToNumValues", 0,
+        keySchemaForNums, 0,
+        numSchema,
+        Some(NoPrefixKeyStateEncoderSpec(keySchemaForNums)),
+        None
+      )
+      val schemaRightIndex = StateStoreColFamilySchema(
+        "right-keyWithIndexToValue", 0,
+        keySchemaForIndex, 0,
+        rightIndexSchema,
+        Some(NoPrefixKeyStateEncoderSpec(keySchemaForIndex)),
+        None
+      )
+      val schemaRightNum = StateStoreColFamilySchema(
+        "right-keyToNumValues", 0,
+        keySchemaForNums, 0,
+        numSchema,
+        Some(NoPrefixKeyStateEncoderSpec(keySchemaForNums)),
+        None
+      )
+
+      testStream(joined)(
+        StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+        AddData(input1, 1),
+        CheckAnswer(),
+        AddData(input2, 1, 10),
+        CheckNewAnswer((1, 2, 3)),
+        Execute { q =>
+          val schemaFilePath = fm.list(stateSchemaPath).toSeq.head.getPath
+          val providerId = StateStoreProviderId(
+            StateStoreId(checkpointDir.getCanonicalPath, 0, 0), q.lastProgress.runId
+          )
+          val checker = new StateSchemaCompatibilityChecker(
+            providerId,
+            hadoopConf,
+            List(schemaFilePath)
+          )
+          val colFamilySeq = checker.readSchemaFile()
+          // Verify schema count and contents
+          assert(colFamilySeq.length == 4)
+          assert(colFamilySeq.map(_.toString).toSet == Set(
+            schemaLeftIndex, schemaLeftNum, schemaRightIndex, schemaRightNum
+          ).map(_.toString))
+        },
+        StopStream
+      )
+    }
+  }
+
+  testWithVirtualColumnFamilyJoins(
+    "SPARK-51779 Restart streaming join query with virtual column families") {
+    withTempDir { checkpointDir =>
+      val input1 = MemoryStream[Int]
+      val input2 = MemoryStream[Int]
+
+      val df1 = input1.toDF().select($"value" as "key", ($"value" * 2) as "leftValue")
+      val df2 = input2.toDF().select($"value" as "key", ($"value" * 3) as "rightValue")
+      val joined = df1.join(df2, "key")
+
+      testStream(joined)(
+        StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+        AddData(input1, 1, 2),
+        CheckAnswer(),
+        AddData(input2, 1, 10),
+        CheckNewAnswer((1, 2, 3)),
+        Execute { query =>
+          val numInternalKeys =
+            query.lastProgress
+              .stateOperators(0)
+              .customMetrics
+              .get("rocksdbNumInternalColFamiliesKeys")
+          // Number of internal column family keys should be nonzero for this join implementation
+          assert(numInternalKeys.longValue() > 0)
+        },
+        StopStream
+      )
+
+      // Restart the query
+      testStream(joined)(
+        StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+        AddData(input1, 2, 10), // Should join with previous run's data
+        CheckNewAnswer((10, 20, 30)),
+        AddData(input2, 2), // Should also join with previous run's data
+        CheckNewAnswer((2, 4, 6), (2, 4, 6)),
+        Execute { query =>
+          // The join implementation should not have changed between runs
+          val numInternalKeys =
+            query.lastProgress
+              .stateOperators(0)
+              .customMetrics
+              .get("rocksdbNumInternalColFamiliesKeys")
+          // Number of internal column family keys should still be nonzero for this join
+          assert(numInternalKeys.longValue() > 0)
+        },
+        StopStream
+      )
+    }
+  }
+
+  testWithVirtualColumnFamilyJoins(
+    "SPARK-51779 Join query using virtual column families with HDFS should 
fail") {
+    withTempDir { checkpointDir =>
+      withSQLConf(
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[HDFSBackedStateStoreProvider].getName
+      ) {
+        val input1 = MemoryStream[Int]
+        val input2 = MemoryStream[Int]
+
+        val df1 = input1.toDF().select($"value" as "key", ($"value" * 2) as "leftValue")
+        val df2 = input2.toDF().select($"value" as "key", ($"value" * 3) as "rightValue")
+        val joined = df1.join(df2, "key")
+
+        testStream(joined)(
+          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+          AddData(input1, 1, 2),
+          ExpectFailure[StateStoreMultipleColumnFamiliesNotSupportedException] {
+            (t: Throwable) => {
+              checkError(
+                exception = 
t.asInstanceOf[StateStoreMultipleColumnFamiliesNotSupportedException],
+                condition = 
"UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES",
+                sqlState = Some("0A000"),
+                parameters = Map("stateStoreProvider" -> 
"HDFSBackedStateStoreProvider")
+              )
+            }
+          }
+        )
+      }
+    }
+  }
+
+  testWithVirtualColumnFamilyJoins("SPARK-51779 Restart the join query and 
changing " +
+    "state format versions should reuse the version from offset log") {
+    withTempDir { checkpointDir =>
+      val input1 = MemoryStream[Int]
+      val input2 = MemoryStream[Int]
+
+      val df1 = input1.toDF().select($"value" as "key", ($"value" * 2) as "leftValue")
+      val df2 = input2.toDF().select($"value" as "key", ($"value" * 3) as "rightValue")
+      val joined = df1.join(df2, "key")
+
+      testStream(joined)(
+        StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+        AddData(input1, 1, 2),
+        CheckAnswer(),
+        AddData(input2, 1, 10),
+        CheckNewAnswer((1, 2, 3)),
+        Execute { query =>
+          val numInternalKeys =
+            query.lastProgress
+              .stateOperators(0)
+              .customMetrics
+              .get("rocksdbNumInternalColFamiliesKeys")
+          // Number of internal column family keys should be nonzero for this join implementation
+          assert(numInternalKeys.longValue() > 0)
+        },
+        StopStream
+      )
+      // Disable the virtual column family join implementation and switch versions

Review Comment:
   nit: let's use withSQLConf when changing configs - it saves us from having to 
reason about how the config value lingers in the "shared" SparkSession.
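   i.e. something like this sketch (assuming STREAMING_JOIN_STATE_FORMAT_VERSION is the config being flipped here):

   ```scala
   // Scope the config change so it cannot leak into other tests via the shared session.
   withSQLConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> "2") {
     testStream(joined)(
       // Restart from the same checkpoint; the version recorded in the offset log
       // should take precedence over the session config.
       StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
       AddData(input1, 3),
       CheckNewAnswer(),
       StopStream
     )
   }
   ```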



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
