HeartSaVioR commented on code in PR #49983:
URL: https://github.com/apache/spark/pull/49983#discussion_r1957769009


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala:
##########
@@ -1471,6 +1471,75 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
     )
   }
 
+  test("SPARK-51187 validate that the incorrect config introduced in 
SPARK-49699 still takes " +
+    "effect when restarting from Spark 3.5.4") {
+    // Spark 3.5.4 is the only release in which we accidentally introduced the 
incorrect config.
+    // We just need to confirm that current Spark version will apply the fix 
of SPARK-49699 when
the streaming query was started from Spark 3.5.4. We should apply the fix 
consistently, instead
+      // of toggling it "on and off", because toggling may create more opportunities for breakage.
+
+    val problematicConfName = 
"spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan"
+
+    withTempDir { dir =>
+      val input = 
getClass.getResource("/structured-streaming/checkpoint-version-3.5.4")
+      assert(input != null, "cannot find test resource")
+      val inputDir = new File(input.toURI)
+
+      // Copy test files to tempDir so that we won't modify the original data.
+      FileUtils.copyDirectory(inputDir, dir)
+
+      // Below is the code we extract checkpoint from Spark 3.5.4. We need to 
make sure the offset
+      // advancement continues from the last run.
+      val inputData = MemoryStream[Int]
+      val df = inputData.toDF()
+
+      inputData.addData(1, 2, 3, 4)
+      inputData.addData(5, 6, 7, 8)
+
+      testStream(df)(
+        StartStream(checkpointLocation = dir.getCanonicalPath),
+        AddData(inputData, 9, 10, 11, 12),
+        ProcessAllAvailable(),
+        AssertOnQuery { q =>
+          val confValue = q.lastExecution.sparkSession.conf.get(
+            SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN)
+          assert(confValue === false,
+            "The value for the incorrect config in offset metadata should be 
respected as the " +
+              "value of the fixed config")
+
+          val offsetLog = new OffsetSeqLog(spark, new File(dir, 
"offsets").getCanonicalPath)
+          def checkConfigFromMetadata(batchId: Long, expectCorrectConfig: 
Boolean): Unit = {

Review Comment:
   I know this is a bit hard to understand for folks who aren't streaming experts. 
Please let me know if you need further explanation to justify the logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to