HeartSaVioR commented on code in PR #50572: URL: https://github.com/apache/spark/pull/50572#discussion_r2051420015
########## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ##########

@@ -2569,10 +2569,11 @@ object SQLConf { .internal() .doc("State format version used by streaming join operations in a streaming query. " + "State between versions are tend to be incompatible, so state format version shouldn't " + - "be modified after running.") + "be modified after running. Version 3 uses a single state store with virtual column " + + "families instead of four stores.")

Review Comment:
Do we support this only in the RocksDB state store provider? If so, shall we describe this as well?

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala: ##########

@@ -464,3 +465,11 @@ class OperatorStateMetadataV2FileManager( 0 } } + +case class StreamingJoinOperatorProperties(useVirtualColumnFamilies: Boolean) {

Review Comment:
It does not seem to be strictly required, since we have state format version 3 and the state reader should understand it anyway. I'm still OK with having this; the overhead seems to be negligible.

########## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala: ##########

@@ -1640,6 +1904,34 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite { } } } + + test("SPARK-51779 - " +

Review Comment:
Shall we modify the existing test `left semi early state exclusion on left` to perform a restart instead of adding a new test?

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala: ##########

@@ -403,55 +409,26 @@ class SymmetricHashJoinStateManager( def getInternalRowOfKeyWithIndex(currentKey: UnsafeRow): InternalRow = keyProjection(currentKey) /** Commit all the changes to all the state stores */ - def commit(): Unit = { - keyToNumValues.commit() - keyWithIndexToValue.commit() - } + def commit(): Unit /** Abort any changes to the state stores if needed */ - def abortIfNeeded(): Unit = { - keyToNumValues.abortIfNeeded() - keyWithIndexToValue.abortIfNeeded() - } + def abortIfNeeded(): Unit /** * Get state store checkpoint information of the two state stores for this joiner, after * they finished data processing. + * + * For [[SymmetricHashJoinStateManagerV1]], this returns the information of the two stores

Review Comment:
nit: Let's document this in each derived class. As long as the behavior fits the base class's contract, it should be good.

########## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala: ##########

@@ -2171,4 +2527,65 @@ class StreamingLeftSemiJoinSuite extends StreamingJoinSuite { */ } } + + test("SPARK-51779 - Restart streaming semi join query (windowed left semi join)") {

Review Comment:
Same with test `windowed left semi join`.

########## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala: ##########

@@ -1581,7 +1844,8 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite { } } - test("SPARK-49829 left-outer join, input being unmatched is between WM for late event and " + + testWithoutVirtualColumnFamilyJoins(

Review Comment:
Does this mean the bug has reoccurred with the virtual column family join? Could you please leave a code comment about why we don't test with virtual column families? (Probably good to have a brief code comment for every single test which verifies with only one side, either with vcf or without vcf.)
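For context on the vcf / non-vcf split discussed above, here is a minimal sketch of what the two test helpers referenced throughout this suite might look like, assuming they live in a StreamTest-based suite with SQLTestUtils mixed in. The helper names come from the diff; the bodies, the exact config values, and the test-name suffixes are assumptions rather than the PR's actual code.

```scala
// Hypothetical sketch: each helper pins spark.sql.streaming.join.stateFormatVersion so the
// test body runs against exactly one join state layout (version 3 = a single store with
// virtual column families; versions 1/2 = the legacy four-store layout).
private def testWithVirtualColumnFamilyJoins(testName: String)(body: => Unit): Unit = {
  test(s"$testName (join state format v3)") {
    withSQLConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> "3") {
      body
    }
  }
}

private def testWithoutVirtualColumnFamilyJoins(testName: String)(body: => Unit): Unit = {
  Seq("1", "2").foreach { version =>
    test(s"$testName (join state format v$version)") {
      withSQLConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> version) {
        body
      }
    }
  }
}
```

If the helpers look roughly like this, the request above amounts to adding a one-line comment at each `testWithoutVirtualColumnFamilyJoins` call site explaining why format version 3 is excluded.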
########## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala: ##########

@@ -1844,6 +2136,69 @@ class StreamingFullOuterJoinSuite extends StreamingJoinSuite { assertNumStateRows(total = 15, updated = 7) ) } + + test("SPARK-51779 - Restart streaming outer join query (windowed full outer join)") {

Review Comment:
Same with test `windowed full outer join`.

########## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala: ##########

@@ -2048,7 +2403,8 @@ class StreamingLeftSemiJoinSuite extends StreamingJoinSuite { ) } - test("SPARK-49829 two chained stream-stream left outer joins among three input streams") { + testWithoutVirtualColumnFamilyJoins(

Review Comment:
Same, please leave a brief code comment about why this can't run with vcf.

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala: ##########

@@ -307,19 +349,23 @@ case class StreamingSymmetricHashJoinExec( assert(stateInfo.isDefined, "State info not defined") val checkpointIds = SymmetricHashJoinStateManager.getStateStoreCheckpointIds( - partitionId, stateInfo.get) + partitionId, stateInfo.get, useVirtualColumnFamilies) val inputSchema = left.output ++ right.output val postJoinFilter = Predicate.create(condition.bothSides.getOrElse(Literal(true)), inputSchema).eval _ - val leftSideJoiner = new OneSideHashJoiner( - LeftSide, left.output, leftKeys, leftInputIter, - condition.leftSideOnly, postJoinFilter, stateWatermarkPredicates.left, partitionId, - checkpointIds.left.keyToNumValues, checkpointIds.left.valueToNumKeys, skippedNullValueCount) - val rightSideJoiner = new OneSideHashJoiner( - RightSide, right.output, rightKeys, rightInputIter, - condition.rightSideOnly, postJoinFilter, stateWatermarkPredicates.right, partitionId, - checkpointIds.right.keyToNumValues, checkpointIds.right.valueToNumKeys, skippedNullValueCount) + // Create left and right side hash joiners and store in the joiner manager

Review Comment:
Is this a required change, or just a refactor?
########## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala: ########## @@ -931,6 +988,211 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite { ) } + testWithVirtualColumnFamilyJoins( + "SPARK-51779 Verify StateSchemaV3 writes correct key and value schemas for join operator") { + withTempDir { checkpointDir => + val input1 = MemoryStream[Int] + val input2 = MemoryStream[Int] + + val df1 = input1.toDF().select($"value" as "key", ($"value" * 2) as "leftValue") + val df2 = input2.toDF().select($"value" as "key", ($"value" * 3) as "rightValue") + val joined = df1.join(df2, "key") + + val metadataPathPostfix = "state/0/_stateSchema/default" + val stateSchemaPath = new Path(checkpointDir.toString, s"$metadataPathPostfix") + val hadoopConf = spark.sessionState.newHadoopConf() + val fm = CheckpointFileManager.create(stateSchemaPath, hadoopConf) + + val keySchemaForNums = new StructType().add("field0", IntegerType, nullable = false) + val keySchemaForIndex = keySchemaForNums.add("index", LongType) + val numSchema: StructType = new StructType().add("value", LongType) + val leftIndexSchema: StructType = new StructType() + .add("key", IntegerType, nullable = false) + .add("leftValue", IntegerType, nullable = false) + .add("matched", BooleanType) + val rightIndexSchema: StructType = new StructType() + .add("key", IntegerType, nullable = false) + .add("rightValue", IntegerType, nullable = false) + .add("matched", BooleanType) + + val schemaLeftIndex = StateStoreColFamilySchema( + "left-keyWithIndexToValue", 0, + keySchemaForIndex, 0, + leftIndexSchema, + Some(NoPrefixKeyStateEncoderSpec(keySchemaForIndex)), + None + ) + val schemaLeftNum = StateStoreColFamilySchema( + "left-keyToNumValues", 0, + keySchemaForNums, 0, + numSchema, + Some(NoPrefixKeyStateEncoderSpec(keySchemaForNums)), + None + ) + val schemaRightIndex = StateStoreColFamilySchema( + "right-keyWithIndexToValue", 0, + keySchemaForIndex, 0, + rightIndexSchema, + Some(NoPrefixKeyStateEncoderSpec(keySchemaForIndex)), + None + ) + val schemaRightNum = StateStoreColFamilySchema( + "right-keyToNumValues", 0, + keySchemaForNums, 0, + numSchema, + Some(NoPrefixKeyStateEncoderSpec(keySchemaForNums)), + None + ) + + testStream(joined)( + StartStream(checkpointLocation = checkpointDir.getCanonicalPath), + AddData(input1, 1), + CheckAnswer(), + AddData(input2, 1, 10), + CheckNewAnswer((1, 2, 3)), + Execute { q => + val schemaFilePath = fm.list(stateSchemaPath).toSeq.head.getPath + val providerId = StateStoreProviderId( + StateStoreId(checkpointDir.getCanonicalPath, 0, 0), q.lastProgress.runId + ) + val checker = new StateSchemaCompatibilityChecker( + providerId, + hadoopConf, + List(schemaFilePath) + ) + val colFamilySeq = checker.readSchemaFile() + // Verify schema count and contents + assert(colFamilySeq.length == 4) + assert(colFamilySeq.map(_.toString).toSet == Set( + schemaLeftIndex, schemaLeftNum, schemaRightIndex, schemaRightNum + ).map(_.toString)) + }, + StopStream + ) + } + } + + testWithVirtualColumnFamilyJoins( + "SPARK-51779 Restart streaming join query with virtual column families") { + withTempDir { checkpointDir => + val input1 = MemoryStream[Int] + val input2 = MemoryStream[Int] + + val df1 = input1.toDF().select($"value" as "key", ($"value" * 2) as "leftValue") + val df2 = input2.toDF().select($"value" as "key", ($"value" * 3) as "rightValue") + val joined = df1.join(df2, "key") + + testStream(joined)( + StartStream(checkpointLocation = checkpointDir.getCanonicalPath), + AddData(input1, 1, 2), + 
CheckAnswer(), + AddData(input2, 1, 10), + CheckNewAnswer((1, 2, 3)), + Execute { query => + val numInternalKeys = + query.lastProgress + .stateOperators(0) + .customMetrics + .get("rocksdbNumInternalColFamiliesKeys") + // Number of internal column family keys should be nonzero for this join implementation + assert(numInternalKeys.longValue() > 0) + }, + StopStream + ) + + // Restart the query + testStream(joined)( + StartStream(checkpointLocation = checkpointDir.getCanonicalPath), + AddData(input1, 2, 10), // Should join with previous run's data + CheckNewAnswer((10, 20, 30)), + AddData(input2, 2), // Should also join with previous run's data + CheckNewAnswer((2, 4, 6), (2, 4, 6)), + Execute { query => + // The join implementation should not have changed between runs + val numInternalKeys = + query.lastProgress + .stateOperators(0) + .customMetrics + .get("rocksdbNumInternalColFamiliesKeys") + // Number of internal column family keys should still be nonzero for this join + assert(numInternalKeys.longValue() > 0) + }, + StopStream + ) + } + } + + testWithVirtualColumnFamilyJoins( + "SPARK-51779 Join query using virtual column families with HDFS should fail") { + withTempDir { checkpointDir => + withSQLConf( + SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[HDFSBackedStateStoreProvider].getName + ) { + val input1 = MemoryStream[Int] + val input2 = MemoryStream[Int] + + val df1 = input1.toDF().select($"value" as "key", ($"value" * 2) as "leftValue") + val df2 = input2.toDF().select($"value" as "key", ($"value" * 3) as "rightValue") + val joined = df1.join(df2, "key") + + testStream(joined)( + StartStream(checkpointLocation = checkpointDir.getCanonicalPath), + AddData(input1, 1, 2), + ExpectFailure[StateStoreMultipleColumnFamiliesNotSupportedException] { + (t: Throwable) => { + checkError( + exception = t.asInstanceOf[StateStoreMultipleColumnFamiliesNotSupportedException], + condition = "UNSUPPORTED_FEATURE.STATE_STORE_MULTIPLE_COLUMN_FAMILIES", + sqlState = Some("0A000"), + parameters = Map("stateStoreProvider" -> "HDFSBackedStateStoreProvider") + ) + } + } + ) + } + } + } + + testWithVirtualColumnFamilyJoins("SPARK-51779 Restart the join query and changing " + + "state format versions should reuse the version from offset log") { + withTempDir { checkpointDir => + val input1 = MemoryStream[Int] + val input2 = MemoryStream[Int] + + val df1 = input1.toDF().select($"value" as "key", ($"value" * 2) as "leftValue") + val df2 = input2.toDF().select($"value" as "key", ($"value" * 3) as "rightValue") + val joined = df1.join(df2, "key") + + testStream(joined)( + StartStream(checkpointLocation = checkpointDir.getCanonicalPath), + AddData(input1, 1, 2), + CheckAnswer(), + AddData(input2, 1, 10), + CheckNewAnswer((1, 2, 3)), + Execute { query => + val numInternalKeys = + query.lastProgress + .stateOperators(0) + .customMetrics + .get("rocksdbNumInternalColFamiliesKeys") + // Number of internal column family keys should be nonzero for this join implementation + assert(numInternalKeys.longValue() > 0) + }, + StopStream + ) + // Disable the virtual column family join implementation and switch versions

Review Comment:
nit: let's use withSQLConf when changing the config - this helps avoid having to reason about what the config value ends up being in the "shared" SparkSession.
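A minimal sketch of the pattern the nit asks for, assuming the restarted run is meant to flip spark.sql.streaming.join.stateFormatVersion back to a pre-vcf value and reuses `joined`, `input1`, and `checkpointDir` from the quoted test; the data values and the expected (empty) answer are illustrative, not the PR's actual assertions.

```scala
// Sketch only: scope the override with withSQLConf instead of mutating the shared
// SparkSession's conf, so the changed value cannot leak into tests that run afterwards.
withSQLConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> "2") {
  testStream(joined)(
    StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
    // The restarted query should keep the version recorded in the offset log
    // (virtual column families), regardless of the session conf set above.
    AddData(input1, 3),
    CheckNewAnswer(),
    StopStream
  )
}
```

Since the offset log pins the state format version for the lifetime of the query, the override only affects what a brand-new query with a fresh checkpoint would pick up, which is exactly what this test is asserting.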
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org