HeartSaVioR commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r2038640702
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -371,7 +371,8 @@ private class StateStoreCoordinator(
     val minDeltasForSnapshot = sqlConf.stateStoreMinDeltasForSnapshot
     val maintenanceInterval = sqlConf.streamingMaintenanceInterval
-    // Use the configured multipliers to determine the proper alert thresholds
+    // Use the configured multipliers multiplierForMinVersionDiffToLog and

Review Comment:
   What I meant is to add such an explanation to the config doc to make it easier for users. But I just realized that you have explained it (although not with a direct config name). That seems fine.
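
For illustration only, a config doc along these lines would surface the related config names directly to users. This is a sketch, not the PR's actual change: the key string, version, default value, and doc wording below are assumptions; only the constant name appears in this PR's test configuration.

    // Hypothetical sketch of a SQLConf entry whose doc() names the config it multiplies.
    // Key string, version, and default are placeholders, not the actual Spark values.
    val STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG =
      buildConf("spark.sql.streaming.stateStore.coordinatorMultiplierForMinVersionDiffToLog")
        .doc("Multiplier applied to spark.sql.streaming.stateStore.minDeltasForSnapshot to " +
          "derive the minimum snapshot version lag at which the coordinator logs a state " +
          "store instance as lagging behind on snapshot uploads.")
        .version("4.1.0")
        .intConf
        .createWithDefault(5)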
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala:
##########
@@ -544,12 +544,17 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
           .queryName("query")
           .option("checkpointLocation", checkpointLocation.toString)
           .start()
-        // Go through several rounds of input to force snapshot uploads
-        (0 until 5).foreach { _ =>
-          inputData.addData(1, 2, 3)
-          query.processAllAvailable()
-          Thread.sleep(500)
-        }
+        // Go through two batches to force two snapshot uploads.
+        // This would be enough to pass the version check for lagging stores.
+        inputData.addData(1, 2, 3)
+        query.processAllAvailable()
+        inputData.addData(1, 2, 3)
+        query.processAllAvailable()
+
+        // Sleep for the duration of a maintenance interval - which should be enough
+        // to pass the time check for lagging stores.
+        Thread.sleep(100)

Review Comment:
   It's OK to leave some room here (say, 500). I was concerned before because we slept 5 times for 500 ms each.
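
A sketch of that suggestion applied, for illustration; only the sleep value changes, everything else is as in the diff above.

    // Two batches are still enough for the version check; a single sleep with headroom
    // over the 100 ms maintenance interval (500 ms, as suggested) covers the time check
    // without looping 5 times for 500 ms as the old code did.
    inputData.addData(1, 2, 3)
    query.processAllAvailable()
    inputData.addData(1, 2, 3)
    query.processAllAvailable()
    Thread.sleep(500)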
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala:
##########
@@ -158,281 +158,162 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
     }
   }
-  Seq(
-    ("RocksDBStateStoreProvider", classOf[RocksDBStateStoreProvider].getName),
-    ("HDFSStateStoreProvider", classOf[HDFSBackedStateStoreProvider].getName)
-  ).foreach {
-    case (providerName, providerClassName) =>
-      test(
-        s"SPARK-51358: Snapshot uploads in $providerName are properly reported to the coordinator"
-      ) {
-        withCoordinatorAndSQLConf(
-          sc,
-          SQLConf.SHUFFLE_PARTITIONS.key -> "5",
-          SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
-          SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
-          SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
-          SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
-          RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
-          SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key -> "true",
-          SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG.key -> "2",
-          SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key -> "0"
-        ) {
-          case (coordRef, spark) =>
-            import spark.implicits._
-            implicit val sqlContext = spark.sqlContext
-
-            // Start a query and run some data to force snapshot uploads
-            val inputData = MemoryStream[Int]
-            val aggregated = inputData.toDF().dropDuplicates()
-            val checkpointLocation = Utils.createTempDir().getAbsoluteFile
-            val query = aggregated.writeStream
-              .format("memory")
-              .outputMode("update")
-              .queryName("query")
-              .option("checkpointLocation", checkpointLocation.toString)
-              .start()
-            // Add, commit, and wait multiple times to force snapshot versions and time difference
-            (0 until 6).foreach { _ =>
-              inputData.addData(1, 2, 3)
-              query.processAllAvailable()
-              Thread.sleep(500)
-            }
-            val streamingQuery = query.asInstanceOf[StreamingQueryWrapper].streamingQuery
-            val stateCheckpointDir = streamingQuery.lastExecution.checkpointLocation
-            val latestVersion = streamingQuery.lastProgress.batchId + 1
-
-            // Verify all stores have uploaded a snapshot and it's logged by the coordinator
-            (0 until query.sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)).foreach {
-              partitionId =>
-                val storeId = StateStoreId(stateCheckpointDir, 0, partitionId)
-                val providerId = StateStoreProviderId(storeId, query.runId)
-                assert(coordRef.getLatestSnapshotVersionForTesting(providerId).get >= 0)
-            }
-            // Verify that we should not have any state stores lagging behind
-            assert(coordRef.getLaggingStoresForTesting(query.runId, latestVersion).isEmpty)
-            query.stop()
-        }
-      }
-  }
+  private val allJoinStateStoreNames: Seq[String] =
+    SymmetricHashJoinStateManager.allStateStoreNames(LeftSide, RightSide)
-  Seq(
+  /** Lists the state store providers used for a test, and the set of lagging partition IDs */
+  private val regularStateStoreProviders = Seq(
+    ("RocksDBStateStoreProvider", classOf[RocksDBStateStoreProvider].getName, Set.empty[Int]),
+    ("HDFSStateStoreProvider", classOf[HDFSBackedStateStoreProvider].getName, Set.empty[Int])
+  )
+
+  /** Lists the state store providers used for a test, and the set of lagging partition IDs */
+  private val faultyStateStoreProviders = Seq(
     (
       "RocksDBSkipMaintenanceOnCertainPartitionsProvider",
-      classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName
+      classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName,
+      Set(0, 1)
     ),
     (
      "HDFSBackedSkipMaintenanceOnCertainPartitionsProvider",
-      classOf[HDFSBackedSkipMaintenanceOnCertainPartitionsProvider].getName
+      classOf[HDFSBackedSkipMaintenanceOnCertainPartitionsProvider].getName,
+      Set(0, 1)
    )
-  ).foreach {
-    case (providerName, providerClassName) =>
-      test(
-        s"SPARK-51358: Snapshot uploads in $providerName are properly reported to the coordinator"
-      ) {
-        withCoordinatorAndSQLConf(
-          sc,
-          SQLConf.SHUFFLE_PARTITIONS.key -> "5",
-          SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
-          SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
-          SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
-          SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
-          RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
-          SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key -> "true",
-          SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG.key -> "2",
-          SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key -> "0"
-        ) {
-          case (coordRef, spark) =>
-            import spark.implicits._
-            implicit val sqlContext = spark.sqlContext
-
-            // Start a query and run some data to force snapshot uploads
-            val inputData = MemoryStream[Int]
-            val aggregated = inputData.toDF().dropDuplicates()
-            val checkpointLocation = Utils.createTempDir().getAbsoluteFile
-            val query = aggregated.writeStream
-              .format("memory")
-              .outputMode("update")
-              .queryName("query")
-              .option("checkpointLocation", checkpointLocation.toString)
-              .start()
-            // Add, commit, and wait multiple times to force snapshot versions and time difference
-            (0 until 6).foreach { _ =>
-              inputData.addData(1, 2, 3)
-              query.processAllAvailable()
-              Thread.sleep(500)
-            }
-            val streamingQuery = query.asInstanceOf[StreamingQueryWrapper].streamingQuery
-            val stateCheckpointDir = streamingQuery.lastExecution.checkpointLocation
-            val latestVersion = streamingQuery.lastProgress.batchId + 1
-
-            (0 until query.sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)).foreach {
-              partitionId =>
-                val storeId = StateStoreId(stateCheckpointDir, 0, partitionId)
-                val providerId = StateStoreProviderId(storeId, query.runId)
-                if (partitionId <= 1) {
-                  // Verify state stores in partition 0/1 are lagging and didn't upload anything
-                  assert(coordRef.getLatestSnapshotVersionForTesting(providerId).getOrElse(0) == 0)
-                } else {
-                  // Verify other stores uploaded a snapshot and it's logged by the coordinator
-                  assert(coordRef.getLatestSnapshotVersionForTesting(providerId).get >= 0)
-                }
-            }
-            // We should have two state stores (id 0 and 1) that are lagging behind at this point
-            val laggingStores = coordRef.getLaggingStoresForTesting(query.runId, latestVersion)
-            assert(laggingStores.size == 2)
-            assert(laggingStores.forall(_.storeId.partitionId <= 1))
-            query.stop()
+  )
+
+  private val allStateStoreProviders =
+    regularStateStoreProviders ++ faultyStateStoreProviders
+
+  /**
+   * Verifies snapshot upload RPC messages from state stores are registered and verifies
+   * the coordinator detected the correct lagging partitions.
+   */
+  private def verifySnapshotUploadEvents(
+      coordRef: StateStoreCoordinatorRef,
+      query: StreamingQuery,
+      badPartitions: Set[Int],
+      storeNames: Seq[String] = Seq(StateStoreId.DEFAULT_STORE_NAME)): Unit = {
+    val streamingQuery = query.asInstanceOf[StreamingQueryWrapper].streamingQuery
+    val stateCheckpointDir = streamingQuery.lastExecution.checkpointLocation
+    val latestVersion = streamingQuery.lastProgress.batchId + 1
+
+    // Verify all stores have uploaded a snapshot and it's logged by the coordinator
+    (0 until query.sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)).foreach {
+      partitionId =>
+        // Verify for every store name listed
+        storeNames.foreach { storeName =>
+          val storeId = StateStoreId(stateCheckpointDir, 0, partitionId, storeName)
+          val providerId = StateStoreProviderId(storeId, query.runId)
+          val latestSnapshotVersion = coordRef.getLatestSnapshotVersionForTesting(providerId)
+          if (badPartitions.contains(partitionId)) {
+            assert(latestSnapshotVersion.getOrElse(0) == 0)
+          } else {
+            assert(latestSnapshotVersion.get >= 0)
+          }
+          }
        }
-      }
+    }
+    // Verify that only the bad partitions are all marked as lagging.
+    // Join queries should have all their state stores marked as lagging,
+    // which would be 4 stores per partition instead of 1.
+    val laggingStores = coordRef.getLaggingStoresForTesting(query.runId, latestVersion)
+    assert(laggingStores.size == badPartitions.size * storeNames.size)
+    assert(laggingStores.map(_.storeId.partitionId).toSet == badPartitions)
   }
-  private val allJoinStateStoreNames: Seq[String] =
-    SymmetricHashJoinStateManager.allStateStoreNames(LeftSide, RightSide)
+  /** Sets up a stateful dropDuplicate query for testing */
+  private def setUpStatefulQuery(
+      inputData: MemoryStream[Int], queryName: String): StreamingQuery = {
+    // Set up a stateful drop duplicate query
+    val aggregated = inputData.toDF().dropDuplicates()

Review Comment:
   nit: while we are here and there are other comments, probably better to name this `deduplicated`?
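
Spelling the nit out as a sketch: the helper body could read as below. The lines after dropDuplicates() are assumed to mirror the writeStream setup shown earlier in this suite; they are not part of the visible hunk, and the use of the queryName parameter is likewise an assumption about the rest of the helper.

    /** Sets up a stateful dropDuplicates query for testing */
    private def setUpStatefulQuery(
        inputData: MemoryStream[Int], queryName: String): StreamingQuery = {
      // `deduplicated` describes the dropDuplicates result better than `aggregated`,
      // since no aggregation is involved.
      val deduplicated = inputData.toDF().dropDuplicates()
      val checkpointLocation = Utils.createTempDir().getAbsoluteFile
      deduplicated.writeStream
        .format("memory")
        .outputMode("update")
        .queryName(queryName)
        .option("checkpointLocation", checkpointLocation.toString)
        .start()
    }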
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org