HeartSaVioR commented on code in PR #50572: URL: https://github.com/apache/spark/pull/50572#discussion_r2052936433
########## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala: ########## @@ -1097,11 +1098,8 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite { // Number of internal column family keys should be nonzero for this join implementation assert(numInternalKeys.longValue() > 0) }, - StopStream - ) - - // Restart the query - testStream(joined)( + StopStream, + // Retart the query from the same checkpoint Review Comment: nit: Re`s`tart ########## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala: ########## @@ -1210,20 +1211,25 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite { import org.apache.spark.sql.functions._ test("left outer early state exclusion on left") { - val (leftInput, rightInput, joined) = setupWindowedJoinWithLeftCondition("left_outer") + withTempDir { checkpointDir => + val (leftInput, rightInput, joined) = setupWindowedJoinWithLeftCondition("left_outer") - testStream(joined)( - MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5), - // The left rows with leftValue <= 4 should generate their outer join row now and - // not get added to the state. - CheckNewAnswer(Row(3, 10, 6, "9"), Row(1, 10, 2, null), Row(2, 10, 4, null)), - assertNumStateRows(total = 4, updated = 4), - // We shouldn't get more outer join rows when the watermark advances. - MultiAddData(leftInput, 20)(rightInput, 21), - CheckNewAnswer(), - AddData(rightInput, 20), - CheckNewAnswer((20, 30, 40, "60")) - ) + testStream(joined)( + StartStream(checkpointLocation = checkpointDir.getCanonicalPath), + MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5), + // The left rows with leftValue <= 4 should generate their outer join row now and + // not get added to the state. + CheckNewAnswer(Row(3, 10, 6, "9"), Row(1, 10, 2, null), Row(2, 10, 4, null)), + assertNumStateRows(total = 4, updated = 4), + // We shouldn't get more outer join rows when the watermark advances. 
+ MultiAddData(leftInput, 20)(rightInput, 21), + CheckNewAnswer(), + StopStream, + StartStream(checkpointLocation = checkpointDir.getCanonicalPath), Review Comment: nit: let's add a one-line comment to emphasize that this is restarting the query. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org