HeartSaVioR commented on code in PR #49983: URL: https://github.com/apache/spark/pull/49983#discussion_r1957769009
########## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala: ########## @@ -1471,6 +1471,75 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi ) } + test("SPARK-51187 validate that the incorrect config introduced in SPARK-49699 still takes " + + "effect when restarting from Spark 3.5.4") { + // Spark 3.5.4 is the only release we accidentally introduced the incorrect config. + // We just need to confirm that current Spark version will apply the fix of SPARK-49699 when + // the streaming query started from Spark 3.5.4. We should consistently apply the fix, instead + // of "on and off", because that may expose more possibility to break. + + val problematicConfName = "spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan" + + withTempDir { dir => + val input = getClass.getResource("/structured-streaming/checkpoint-version-3.5.4") + assert(input != null, "cannot find test resource") + val inputDir = new File(input.toURI) + + // Copy test files to tempDir so that we won't modify the original data. + FileUtils.copyDirectory(inputDir, dir) + + // Below is the code we extract checkpoint from Spark 3.5.4. We need to make sure the offset + // advancement continues from the last run. 
+ val inputData = MemoryStream[Int] + val df = inputData.toDF() + + inputData.addData(1, 2, 3, 4) + inputData.addData(5, 6, 7, 8) + + testStream(df)( + StartStream(checkpointLocation = dir.getCanonicalPath), + AddData(inputData, 9, 10, 11, 12), + ProcessAllAvailable(), + AssertOnQuery { q => + val confValue = q.lastExecution.sparkSession.conf.get( + SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN) + assert(confValue === false, + "The value for the incorrect config in offset metadata should be respected as the " + + "value of the fixed config") + + val offsetLog = new OffsetSeqLog(spark, new File(dir, "offsets").getCanonicalPath) + def checkConfigFromMetadata(batchId: Long, expectCorrectConfig: Boolean): Unit = { Review Comment: I know this is a bit hard to understand for folks who are not streaming experts. Please let me know if you need any further explanation to justify the logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org