HeartSaVioR commented on code in PR #47528:
URL: https://github.com/apache/spark/pull/47528#discussion_r1703618747


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceErrors.scala:
##########
@@ -63,6 +63,12 @@ object StateDataSourceErrors {
     new StateDataSourceReadStateSchemaFailure(sourceOptions, cause)
   }
 
+  def failedToReadOperatorMetadata(
+    checkpointLocation: String,

Review Comment:
   nit: 2 more spaces
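   
   i.e., the continuation parameters should be indented four spaces, as elsewhere in this file. A sketch of the intended shape (the second parameter is taken from the call sites later in this review; the body here is a stand-in, not the real error-class construction):
   
   ```
   def failedToReadOperatorMetadata(
       checkpointLocation: String,
       batchId: Long): Throwable = {
     // Stand-in body; the real helper builds an error-class exception.
     new IllegalStateException(
       s"Failed to read the operator metadata at $checkpointLocation for batchId=$batchId.")
   }
   ```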



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala:
##########
@@ -104,7 +104,15 @@ class StateMetadataTable extends Table with SupportsRead with SupportsMetadataCo
       if (!options.containsKey("path")) {
         throw StateDataSourceErrors.requiredOptionUnspecified(PATH)
       }
-      new StateMetadataScan(options.get("path"))
+
+      val checkpointLocation = options.get("path")
+      var batchId = Option(options.get("batchId")).map(_.toLong)
+      // if a batchId is provided, use it. Otherwise, use the last committed batch. If there is no
+      // committed batch, use batchId 0.
+      batchId = Some(batchId.getOrElse(OperatorStateMetadataUtils

Review Comment:
   nit: Why not 
   
   ```
   batchId = batchId.getOrElse(OperatorStateMetadataUtils
     .getLastCommittedBatch(SparkSession.active, checkpointLocation).getOrElse(0L))
   ```
   
   which won't require us to do `.get` at all?
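   
   Note the snippet above reassigns a `var` of type `Option[Long]` with a `Long`, so in practice it would collapse into a single `val`, e.g. (same names as in the diff):
   
   ```
   val checkpointLocation = options.get("path")
   val batchId: Long = Option(options.get("batchId")).map(_.toLong)
     .getOrElse(OperatorStateMetadataUtils
       .getLastCommittedBatch(SparkSession.active, checkpointLocation)
       .getOrElse(0L))
   ```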



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -291,30 +312,54 @@ class OperatorStateMetadataV2Writer(
 
 class OperatorStateMetadataV2Reader(
     stateCheckpointPath: Path,
-    hadoopConf: Configuration) extends OperatorStateMetadataReader {
+    hadoopConf: Configuration,
+    batchId: Long) extends OperatorStateMetadataReader {
+
+  // Check that the requested batchId is available in the checkpoint directory
+  val baseCheckpointDir = stateCheckpointPath.getParent.getParent
+  val lastAvailOffset = listOffsets(baseCheckpointDir).lastOption.getOrElse(-1L)
+  if (batchId > lastAvailOffset) {
+    throw StateDataSourceErrors.failedToReadOperatorMetadata(baseCheckpointDir.toString, batchId)
+  }
 
   private val metadataDirPath = OperatorStateMetadataV2.metadataDirPath(stateCheckpointPath)
   private lazy val fm = CheckpointFileManager.create(metadataDirPath, hadoopConf)
 
   fm.mkdirs(metadataDirPath.getParent)
+
   override def version: Int = 2
 
-  private def listBatches(): Array[Long] = {
+  // List the available offsets in the offset directory
+  private def listOffsets(baseCheckpointDir: Path): Array[Long] = {
+    val offsetLog = new Path(baseCheckpointDir, "offsets")

Review Comment:
   nit: DIR_NAME_OFFSETS rather than "offsets" while we are here
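   
   i.e., with the same constant this PR already uses in `OperatorStateMetadataUtils`:
   
   ```
   val offsetLog = new Path(baseCheckpointDir, DIR_NAME_OFFSETS)
   ```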



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala:
##########
@@ -207,12 +217,9 @@ class StateMetadataPartitionReader(
       } else {
         1
       }
-      OperatorStateMetadataReader.createReader(
-        operatorIdPath, hadoopConf, operatorStateMetadataVersion).read() match {
-        case Some(metadata) => metadata
-        case None => OperatorStateMetadataV1(OperatorInfoV1(opId, null),
-          Array(StateStoreMetadataV1(null, -1, -1)))
-      }
+      val reader = OperatorStateMetadataReader.createReader(
+        operatorIdPath, hadoopConf, operatorStateMetadataVersion, batchId)
+      reader.read().get

Review Comment:
   What's the change? Do we ensure it will never be `None`?
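   
   Even if `None` is unreachable here, a bare `.get` would fail with an uninformative `NoSuchElementException`. A sketch of a more descriptive failure, reusing the error helper added in this PR (passing `operatorIdPath` as the location is illustrative only):
   
   ```
   val reader = OperatorStateMetadataReader.createReader(
     operatorIdPath, hadoopConf, operatorStateMetadataVersion, batchId)
   reader.read().getOrElse {
     throw StateDataSourceErrors.failedToReadOperatorMetadata(
       operatorIdPath.toString, batchId)
   }
   ```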



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -166,18 +169,36 @@ object OperatorStateMetadataUtils extends Logging {
           s"version=${operatorStateMetadata.version}")
     }
   }
+
+  def getLastOffsetBatch(session: SparkSession, checkpointLocation: String): Long = {
+    val offsetLog = new OffsetSeqLog(session,
+      new Path(checkpointLocation, DIR_NAME_OFFSETS).toString)
+    offsetLog.getLatest() match {

Review Comment:
   nit: probably could be shortened to `offsetLog.getLatest().map(_._1).getOrElse(throw StateDataSourceErrors.offsetLogUnavailable(0, checkpointLocation))`
   
   But the current form is OK as well if we find it easier to read.
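   
   For reference, the whole method in the shortened form (same names as the diff above):
   
   ```
   def getLastOffsetBatch(session: SparkSession, checkpointLocation: String): Long = {
     val offsetLog = new OffsetSeqLog(session,
       new Path(checkpointLocation, DIR_NAME_OFFSETS).toString)
     offsetLog.getLatest().map(_._1).getOrElse(
       throw StateDataSourceErrors.offsetLogUnavailable(0, checkpointLocation))
   }
   ```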



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -133,6 +154,21 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
         Array(StateStoreMetadataV2("default", 0, numShufflePartitions, checkpointDir.toString)),
         "")
       checkOperatorStateMetadata(checkpointDir.toString, 0, expectedMetadata, 2)
+
+      // Verify that the state store metadata is not available for invalid batches.
+      val ex = intercept[Exception] {
+        val invalidBatchId = OperatorStateMetadataUtils.getLastOffsetBatch(spark,
+          checkpointDir.toString) + 1
+        checkOperatorStateMetadata(checkpointDir.toString, 0, expectedMetadata, 2,
+          Some(invalidBatchId))
+      }
+      assert(ex.getMessage.contains("Failed to read the operator metadata"))

Review Comment:
   nit: please use `checkError` for exceptions defined with the error class framework
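   
   A sketch of what that could look like; the error class name and parameter map below are placeholders, to be replaced with whatever `failedToReadOperatorMetadata` actually registers:
   
   ```
   checkError(
     exception = ex.asInstanceOf[SparkThrowable],
     errorClass = "STDS_FAILED_TO_READ_OPERATOR_METADATA",
     parameters = Map(
       "checkpointLocation" -> checkpointDir.toString,
       "batchId" -> invalidBatchId.toString))
   ```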



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -439,8 +447,9 @@ case class TransformWithStateExec(
       new Path(getStateInfo.checkpointLocation,
         s"${getStateInfo.operatorId.toString}")
 
-    val storeNamePath = new Path(stateCheckpointPath, storeName)
-    new Path(new Path(storeNamePath, "_metadata"), "schema")
+    val stateSchemaPath = new Path(stateCheckpointPath, "_stateSchema")

Review Comment:
   This makes the directory structure for the previous schema vs. the new schema very different. Is this change something you want to propose going forward, for other operators to follow up on?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -166,18 +169,36 @@ object OperatorStateMetadataUtils extends Logging {
           s"version=${operatorStateMetadata.version}")
     }
   }
+
+  def getLastOffsetBatch(session: SparkSession, checkpointLocation: String): Long = {
+    val offsetLog = new OffsetSeqLog(session,
+      new Path(checkpointLocation, DIR_NAME_OFFSETS).toString)
+    offsetLog.getLatest() match {
+      case Some((lastId, _)) => lastId
+      case None => throw StateDataSourceErrors.offsetLogUnavailable(0, checkpointLocation)
+    }
+  }
+
+  def getLastCommittedBatch(session: SparkSession, checkpointLocation: String): Option[Long] = {
+    val commitLog = new CommitLog(session, new Path(checkpointLocation, DIR_NAME_COMMITS).toString)
+    commitLog.getLatest() match {

Review Comment:
   nit: `commitLog.getLatest().map(_._1)`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -291,30 +312,54 @@ class OperatorStateMetadataV2Writer(
 
 class OperatorStateMetadataV2Reader(
     stateCheckpointPath: Path,
-    hadoopConf: Configuration) extends OperatorStateMetadataReader {
+    hadoopConf: Configuration,
+    batchId: Long) extends OperatorStateMetadataReader {
+
+  // Check that the requested batchId is available in the checkpoint directory
+  val baseCheckpointDir = stateCheckpointPath.getParent.getParent
+  val lastAvailOffset = listOffsets(baseCheckpointDir).lastOption.getOrElse(-1L)
+  if (batchId > lastAvailOffset) {
+    throw StateDataSourceErrors.failedToReadOperatorMetadata(baseCheckpointDir.toString, batchId)
+  }
 
   private val metadataDirPath = OperatorStateMetadataV2.metadataDirPath(stateCheckpointPath)
   private lazy val fm = CheckpointFileManager.create(metadataDirPath, hadoopConf)
 
   fm.mkdirs(metadataDirPath.getParent)
+
   override def version: Int = 2
 
-  private def listBatches(): Array[Long] = {
+  // List the available offsets in the offset directory
+  private def listOffsets(baseCheckpointDir: Path): Array[Long] = {
+    val offsetLog = new Path(baseCheckpointDir, "offsets")

Review Comment:
   Why not just use `OffsetLog`?
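   
   For instance, something along these lines; `getLatestBatchId()` comes from `HDFSMetadataLog`, and using `SparkSession.active` here is an assumption since the reader has no session at hand:
   
   ```
   // The only consumer needs the last available offset, so the manual file
   // listing could collapse to:
   private def lastAvailOffset(baseCheckpointDir: Path): Long = {
     val offsetLog = new OffsetSeqLog(SparkSession.active,
       new Path(baseCheckpointDir, DIR_NAME_OFFSETS).toString)
     offsetLog.getLatestBatchId().getOrElse(-1L)
   }
   ```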



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -133,6 +154,21 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
         Array(StateStoreMetadataV2("default", 0, numShufflePartitions, checkpointDir.toString)),
         "")
       checkOperatorStateMetadata(checkpointDir.toString, 0, expectedMetadata, 2)
+
+      // Verify that the state store metadata is not available for invalid batches.
+      val ex = intercept[Exception] {
+        val invalidBatchId = OperatorStateMetadataUtils.getLastOffsetBatch(spark,
+          checkpointDir.toString) + 1
+        checkOperatorStateMetadata(checkpointDir.toString, 0, expectedMetadata, 2,
+          Some(invalidBatchId))
+      }
+      assert(ex.getMessage.contains("Failed to read the operator metadata"))
+
+      val ex1 = intercept[Exception] {
+        checkOperatorStateMetadata(checkpointDir.toString, 0, expectedMetadata, 2,
+          Some(-1))
+      }
+      assert(ex1.getMessage.contains("Failed to read the operator metadata"))

Review Comment:
   nit: please use `checkError` for exceptions defined with the error class framework (see the sketch on the earlier comment above)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

