liviazhu-db commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r1977990287


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -129,10 +158,17 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
  * Class for coordinating instances of [[StateStore]]s loaded in executors across the cluster,
  * and get their locations for job scheduling.
  */
-private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
-    extends ThreadSafeRpcEndpoint with Logging {
+private class StateStoreCoordinator(
+    override val rpcEnv: RpcEnv,
+    val sqlConf: SQLConf)
+    extends ThreadSafeRpcEndpoint

Review Comment:
   nit: one indent too many
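
   e.g. a sketch with the `extends` clause at the usual two-space indent (rest of the declaration kept as in the diff):

   ```scala
   private class StateStoreCoordinator(
       override val rpcEnv: RpcEnv,
       val sqlConf: SQLConf)
     extends ThreadSafeRpcEndpoint
   ```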



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1467,6 +1475,11 @@ class RocksDB(
         log"time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms. " +
         log"Current lineage: ${MDC(LogKeys.LINEAGE, lineageManager)}")
       lastUploadedSnapshotVersion.set(snapshot.version)
+      // Report to coordinator that the snapshot has been uploaded when
+      // changelog checkpointing is enabled, since that is when stores can lag behind.
+      if(enableChangelogCheckpointing) {

Review Comment:
   nit: space after if
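
   i.e. `if (enableChangelogCheckpointing) {`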



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala:
##########
@@ -155,16 +156,296 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
       StateStore.stop()
     }
   }
+
+  test(
+    "SPARK-51358: Snapshot uploads in RocksDB are not reported if changelog " +
+    "checkpointing is disabled"
+  ) {
+    withCoordinatorAndSQLConf(
+      sc,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
+      RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "false"
+    ) {
+      case (coordRef, spark) =>
+        import spark.implicits._
+        implicit val sqlContext = spark.sqlContext
+
+        // Start a query and run some data to force snapshot uploads
+        val inputData = MemoryStream[Int]
+        val aggregated = inputData.toDF().dropDuplicates()
+        val checkpointLocation = Utils.createTempDir().getAbsoluteFile
+        val query = aggregated.writeStream
+          .format("memory")
+          .outputMode("update")
+          .queryName("query")
+          .option("checkpointLocation", checkpointLocation.toString)
+          .start()
+        inputData.addData(1, 2, 3)
+        query.processAllAvailable()
+        inputData.addData(1, 2, 3)
+        query.processAllAvailable()
+        val stateCheckpointDir =
+          query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution.checkpointLocation
+
+        // Verify stores do not report snapshot upload events to the coordinator.
+        // As a result, all stores will return nothing as the latest version
+        (0 until query.sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)).foreach { partitionId =>
+          val providerId =
+            StateStoreProviderId(StateStoreId(stateCheckpointDir, 0, partitionId), query.runId)
+          assert(coordRef.getLatestSnapshotVersion(providerId).isEmpty)
+        }
+        query.stop()
+    }
+  }
+
+  test("SPARK-51358: Snapshot uploads in RocksDB are properly reported to the 
coordinator") {
+    withCoordinatorAndSQLConf(
+      sc,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
+      RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
+      SQLConf.STATE_STORE_COORDINATOR_MIN_SNAPSHOT_VERSION_DELTA_TO_LOG.key -> "2"
+    ) {
+      case (coordRef, spark) =>
+        import spark.implicits._
+        implicit val sqlContext = spark.sqlContext
+
+        // Start a query and run some data to force snapshot uploads
+        val inputData = MemoryStream[Int]
+        val aggregated = inputData.toDF().dropDuplicates()
+        val checkpointLocation = Utils.createTempDir().getAbsoluteFile
+        val query = aggregated.writeStream
+          .format("memory")
+          .outputMode("update")
+          .queryName("query")
+          .option("checkpointLocation", checkpointLocation.toString)
+          .start()
+        inputData.addData(1, 2, 3)
+        query.processAllAvailable()
+        inputData.addData(1, 2, 3)
+        query.processAllAvailable()
+        val stateCheckpointDir =
+          query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution.checkpointLocation
+
+        // Verify all stores have uploaded a snapshot and it's logged by the coordinator
+        (0 until query.sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)).foreach { partitionId =>
+          val providerId =
+            StateStoreProviderId(StateStoreId(stateCheckpointDir, 0, partitionId), query.runId)
+          assert(coordRef.getLatestSnapshotVersion(providerId).get >= 0)
+        }
+        // Verify that we should not have any state stores lagging behind
+        assert(coordRef.getLaggingStores().isEmpty)
+        query.stop()
+    }
+  }
+
+  test(
+    "SPARK-51358: Snapshot uploads in 
RocksDBSkipMaintenanceOnCertainPartitionsProvider " +
+    "are properly reported to the coordinator"
+  ) {
+    withCoordinatorAndSQLConf(
+      sc,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+        classOf[SkipMaintenanceOnCertainPartitionsProvider].getName,
+      RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
+      SQLConf.STATE_STORE_COORDINATOR_MIN_SNAPSHOT_VERSION_DELTA_TO_LOG.key -> "2"
+    ) {
+      case (coordRef, spark) =>
+        import spark.implicits._
+        implicit val sqlContext = spark.sqlContext
+
+        // Start a query and run some data to force snapshot uploads
+        val inputData = MemoryStream[Int]
+        val aggregated = inputData.toDF().dropDuplicates()
+        val checkpointLocation = Utils.createTempDir().getAbsoluteFile
+        val query = aggregated.writeStream
+          .format("memory")
+          .outputMode("update")
+          .queryName("query")
+          .option("checkpointLocation", checkpointLocation.toString)
+          .start()
+        inputData.addData(1, 2, 3)
+        query.processAllAvailable()
+        inputData.addData(1, 2, 3)

Review Comment:
   Why are you doing this twice rather than all together?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -38,9 +38,14 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.{NonFateSharingCache, Utils}
 
+/** Trait representing the different events reported from RocksDB instance */
+trait RocksDBEventListener {

Review Comment:
   Why RocksDBEventListener? Are we not adding this to HDFS later too?
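
   If HDFSBackedStateStoreProvider is expected to report these events later too, a provider-agnostic name may age better. Rough sketch of that shape (trait and method names here are hypothetical; only the snapshot-upload callback idea is from the PR):

   ```scala
   /** Hypothetical provider-agnostic spelling of the same listener idea */
   trait StateStoreProviderEventListener {
     /** Invoked by a store implementation after it uploads the snapshot for `version` */
     def reportSnapshotUploaded(version: Long): Unit
   }
   ```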



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala:
##########
@@ -155,16 +156,296 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
       StateStore.stop()
     }
   }
+
+  test(
+    "SPARK-51358: Snapshot uploads in RocksDB are not reported if changelog " +
+    "checkpointing is disabled"
+  ) {
+    withCoordinatorAndSQLConf(
+      sc,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
+      RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "false"
+    ) {
+      case (coordRef, spark) =>
+        import spark.implicits._
+        implicit val sqlContext = spark.sqlContext
+
+        // Start a query and run some data to force snapshot uploads
+        val inputData = MemoryStream[Int]
+        val aggregated = inputData.toDF().dropDuplicates()
+        val checkpointLocation = Utils.createTempDir().getAbsoluteFile
+        val query = aggregated.writeStream
+          .format("memory")
+          .outputMode("update")
+          .queryName("query")
+          .option("checkpointLocation", checkpointLocation.toString)
+          .start()
+        inputData.addData(1, 2, 3)
+        query.processAllAvailable()
+        inputData.addData(1, 2, 3)
+        query.processAllAvailable()
+        val stateCheckpointDir =
+          query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution.checkpointLocation
+
+        // Verify stores do not report snapshot upload events to the coordinator.
+        // As a result, all stores will return nothing as the latest version
+        (0 until query.sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)).foreach { partitionId =>
+          val providerId =
+            StateStoreProviderId(StateStoreId(stateCheckpointDir, 0, partitionId), query.runId)
+          assert(coordRef.getLatestSnapshotVersion(providerId).isEmpty)
+        }
+        query.stop()
+    }
+  }
+
+  test("SPARK-51358: Snapshot uploads in RocksDB are properly reported to the 
coordinator") {
+    withCoordinatorAndSQLConf(
+      sc,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
+      RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
+      SQLConf.STATE_STORE_COORDINATOR_MIN_SNAPSHOT_VERSION_DELTA_TO_LOG.key -> "2"
+    ) {
+      case (coordRef, spark) =>
+        import spark.implicits._
+        implicit val sqlContext = spark.sqlContext
+
+        // Start a query and run some data to force snapshot uploads
+        val inputData = MemoryStream[Int]
+        val aggregated = inputData.toDF().dropDuplicates()
+        val checkpointLocation = Utils.createTempDir().getAbsoluteFile
+        val query = aggregated.writeStream
+          .format("memory")
+          .outputMode("update")
+          .queryName("query")
+          .option("checkpointLocation", checkpointLocation.toString)
+          .start()
+        inputData.addData(1, 2, 3)
+        query.processAllAvailable()
+        inputData.addData(1, 2, 3)
+        query.processAllAvailable()
+        val stateCheckpointDir =
+          query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution.checkpointLocation
+
+        // Verify all stores have uploaded a snapshot and it's logged by the coordinator
+        (0 until query.sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)).foreach { partitionId =>
+          val providerId =
+            StateStoreProviderId(StateStoreId(stateCheckpointDir, 0, partitionId), query.runId)
+          assert(coordRef.getLatestSnapshotVersion(providerId).get >= 0)
+        }
+        // Verify that we should not have any state stores lagging behind
+        assert(coordRef.getLaggingStores().isEmpty)
+        query.stop()
+    }
+  }
+
+  test(
+    "SPARK-51358: Snapshot uploads in 
RocksDBSkipMaintenanceOnCertainPartitionsProvider " +
+    "are properly reported to the coordinator"
+  ) {
+    withCoordinatorAndSQLConf(
+      sc,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+        classOf[SkipMaintenanceOnCertainPartitionsProvider].getName,
+      RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
+      SQLConf.STATE_STORE_COORDINATOR_MIN_SNAPSHOT_VERSION_DELTA_TO_LOG.key -> "2"
+    ) {
+      case (coordRef, spark) =>
+        import spark.implicits._
+        implicit val sqlContext = spark.sqlContext
+
+        // Start a query and run some data to force snapshot uploads
+        val inputData = MemoryStream[Int]
+        val aggregated = inputData.toDF().dropDuplicates()
+        val checkpointLocation = Utils.createTempDir().getAbsoluteFile
+        val query = aggregated.writeStream
+          .format("memory")
+          .outputMode("update")
+          .queryName("query")
+          .option("checkpointLocation", checkpointLocation.toString)
+          .start()
+        inputData.addData(1, 2, 3)
+        query.processAllAvailable()
+        inputData.addData(1, 2, 3)
+        query.processAllAvailable()
+        val stateCheckpointDir =
+          query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution.checkpointLocation
+
+        (0 until query.sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)).foreach { partitionId =>
+          val providerId =
+            StateStoreProviderId(StateStoreId(stateCheckpointDir, 0, partitionId), query.runId)
+          if(partitionId <= 1) {

Review Comment:
   nit: space after if



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -134,6 +134,9 @@ class RocksDB(
   rocksDbOptions.setStatistics(new Statistics())
   private val nativeStats = rocksDbOptions.statistics()
 
+  // Stores a StateStoreProvider reference for event callbacks such as snapshot upload reports
+  private var providerListener: Option[RocksDBEventListener] = None

Review Comment:
   It seems like this should only be set once upon initialization and shouldn't change. Can you add it to the constructor arguments and change it to a val so it doesn't accidentally get modified?
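
   A minimal sketch of that shape (parameter placement hypothetical, other constructor arguments elided):

   ```scala
   class RocksDB(
       // ... existing constructor parameters elided ...
       // Set once at construction; a val cannot be reassigned afterwards
       private val providerListener: Option[RocksDBEventListener] = None) {
     // ...
   }
   ```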



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

