anishshri-db commented on code in PR #47528:
URL: https://github.com/apache/spark/pull/47528#discussion_r1704549390


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -291,30 +312,54 @@ class OperatorStateMetadataV2Writer(
 
 class OperatorStateMetadataV2Reader(
     stateCheckpointPath: Path,
-    hadoopConf: Configuration) extends OperatorStateMetadataReader {
+    hadoopConf: Configuration,
+    batchId: Long) extends OperatorStateMetadataReader {
+
+  // Check that the requested batchId is available in the checkpoint directory
+  val baseCheckpointDir = stateCheckpointPath.getParent.getParent
+  val lastAvailOffset = 
listOffsets(baseCheckpointDir).lastOption.getOrElse(-1L)
+  if (batchId > lastAvailOffset) {
+    throw 
StateDataSourceErrors.failedToReadOperatorMetadata(baseCheckpointDir.toString, 
batchId)
+  }
 
   private val metadataDirPath = 
OperatorStateMetadataV2.metadataDirPath(stateCheckpointPath)
   private lazy val fm = CheckpointFileManager.create(metadataDirPath, 
hadoopConf)
 
   fm.mkdirs(metadataDirPath.getParent)
+
   override def version: Int = 2
 
-  private def listBatches(): Array[Long] = {
+  // List the available offsets in the offset directory
+  private def listOffsets(baseCheckpointDir: Path): Array[Long] = {
+    val offsetLog = new Path(baseCheckpointDir, "offsets")

Review Comment:
   I did try this Jungtaek but not able to get the spark session in this 
context on the reader. Even if I pass the active session, while trying to 
access this I was getting that the spark session is not defined/available



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to