HeartSaVioR commented on code in PR #50572:
URL: https://github.com/apache/spark/pull/50572#discussion_r2052936433


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala:
##########
@@ -1097,11 +1098,8 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite 
{
           // Number of internal column family keys should be nonzero for this 
join implementation
           assert(numInternalKeys.longValue() > 0)
         },
-        StopStream
-      )
-
-      // Restart the query
-      testStream(joined)(
+        StopStream,
+        // Retart the query from the same checkpoint

Review Comment:
   nit: Re`s`tart



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala:
##########
@@ -1210,20 +1211,25 @@ class StreamingOuterJoinSuite extends 
StreamingJoinSuite {
   import org.apache.spark.sql.functions._
 
   test("left outer early state exclusion on left") {
-    val (leftInput, rightInput, joined) = 
setupWindowedJoinWithLeftCondition("left_outer")
+    withTempDir { checkpointDir =>
+      val (leftInput, rightInput, joined) = 
setupWindowedJoinWithLeftCondition("left_outer")
 
-    testStream(joined)(
-      MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),
-      // The left rows with leftValue <= 4 should generate their outer join 
row now and
-      // not get added to the state.
-      CheckNewAnswer(Row(3, 10, 6, "9"), Row(1, 10, 2, null), Row(2, 10, 4, 
null)),
-      assertNumStateRows(total = 4, updated = 4),
-      // We shouldn't get more outer join rows when the watermark advances.
-      MultiAddData(leftInput, 20)(rightInput, 21),
-      CheckNewAnswer(),
-      AddData(rightInput, 20),
-      CheckNewAnswer((20, 30, 40, "60"))
-    )
+      testStream(joined)(
+        StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+        MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),
+        // The left rows with leftValue <= 4 should generate their outer join 
row now and
+        // not get added to the state.
+        CheckNewAnswer(Row(3, 10, 6, "9"), Row(1, 10, 2, null), Row(2, 10, 4, 
null)),
+        assertNumStateRows(total = 4, updated = 4),
+        // We shouldn't get more outer join rows when the watermark advances.
+        MultiAddData(leftInput, 20)(rightInput, 21),
+        CheckNewAnswer(),
+        StopStream,
+        StartStream(checkpointLocation = checkpointDir.getCanonicalPath),

Review Comment:
   nit: let's add a one-line comment to emphasize that this is restarting the query.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to