HeartSaVioR commented on code in PR #47528:
URL: https://github.com/apache/spark/pull/47528#discussion_r1703618747
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceErrors.scala:
##########
@@ -63,6 +63,12 @@ object StateDataSourceErrors {
     new StateDataSourceReadStateSchemaFailure(sourceOptions, cause)
   }
 
+  def failedToReadOperatorMetadata(
+    checkpointLocation: String,

Review Comment:
   nit: 2 more spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala:
##########
@@ -104,7 +104,15 @@ class StateMetadataTable extends Table with SupportsRead with SupportsMetadataCo
     if (!options.containsKey("path")) {
      throw StateDataSourceErrors.requiredOptionUnspecified(PATH)
     }
-    new StateMetadataScan(options.get("path"))
+
+    val checkpointLocation = options.get("path")
+    var batchId = Option(options.get("batchId")).map(_.toLong)
+    // if a batchId is provided, use it. Otherwise, use the last committed batch. If there is no
+    // committed batch, use batchId 0.
+    batchId = Some(batchId.getOrElse(OperatorStateMetadataUtils

Review Comment:
   nit: Why not
   ```
   batchId = batchId.getOrElse(OperatorStateMetadataUtils.getLastCommittedBatch(SparkSession.active, checkpointLocation).getOrElse(0L))
   ```
   which won't require us to do `.get` at all? (A sketch expanding this appears below.)



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -291,30 +312,54 @@ class OperatorStateMetadataV2Writer(
 
 class OperatorStateMetadataV2Reader(
     stateCheckpointPath: Path,
-    hadoopConf: Configuration) extends OperatorStateMetadataReader {
+    hadoopConf: Configuration,
+    batchId: Long) extends OperatorStateMetadataReader {
+
+  // Check that the requested batchId is available in the checkpoint directory
+  val baseCheckpointDir = stateCheckpointPath.getParent.getParent
+  val lastAvailOffset = listOffsets(baseCheckpointDir).lastOption.getOrElse(-1L)
+  if (batchId > lastAvailOffset) {
+    throw StateDataSourceErrors.failedToReadOperatorMetadata(baseCheckpointDir.toString, batchId)
+  }
 
   private val metadataDirPath = OperatorStateMetadataV2.metadataDirPath(stateCheckpointPath)
   private lazy val fm = CheckpointFileManager.create(metadataDirPath, hadoopConf)
 
   fm.mkdirs(metadataDirPath.getParent)
+
   override def version: Int = 2
 
-  private def listBatches(): Array[Long] = {
+  // List the available offsets in the offset directory
+  private def listOffsets(baseCheckpointDir: Path): Array[Long] = {
+    val offsetLog = new Path(baseCheckpointDir, "offsets")

Review Comment:
   nit: DIR_NAME_OFFSETS rather than "offsets" while we are here



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala:
##########
@@ -207,12 +217,9 @@ class StateMetadataPartitionReader(
     } else {
       1
     }
-    OperatorStateMetadataReader.createReader(
-      operatorIdPath, hadoopConf, operatorStateMetadataVersion).read() match {
-      case Some(metadata) => metadata
-      case None => OperatorStateMetadataV1(OperatorInfoV1(opId, null),
-        Array(StateStoreMetadataV1(null, -1, -1)))
-    }
+    val reader = OperatorStateMetadataReader.createReader(
+      operatorIdPath, hadoopConf, operatorStateMetadataVersion, batchId)
+    reader.read().get

Review Comment:
   What's the change? Do we ensure it will never be `None`?
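
Following up on the `batchId` nit above, a sketch of how the simplification could read if `batchId` is bound once as a plain `Long` (identifiers come from the diff; the fallback chain follows the code comment in the PR, and this is only an illustration, not the final shape of the change):
```
val checkpointLocation = options.get("path")
// Use the caller-provided batchId if present; otherwise fall back to the
// last committed batch, and to batch 0 if nothing has been committed yet.
val batchId = Option(options.get("batchId")).map(_.toLong).getOrElse(
  OperatorStateMetadataUtils.getLastCommittedBatch(SparkSession.active, checkpointLocation)
    .getOrElse(0L))
```
With `batchId` already a `Long`, the later `.get` on the option goes away entirely.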
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -166,18 +169,36 @@ object OperatorStateMetadataUtils extends Logging {
         s"version=${operatorStateMetadata.version}")
     }
   }
+
+  def getLastOffsetBatch(session: SparkSession, checkpointLocation: String): Long = {
+    val offsetLog = new OffsetSeqLog(session,
+      new Path(checkpointLocation, DIR_NAME_OFFSETS).toString)
+    offsetLog.getLatest() match {

Review Comment:
   nit: probably could be shortened like
   `offsetLog.getLatest().map(_._1).getOrElse(throw StateDataSourceErrors.offsetLogUnavailable(0, checkpointLocation))`
   but the current form is OK as well if we find it easier to read. (Expanded into a sketch further below.)



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -133,6 +154,21 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
         Array(StateStoreMetadataV2("default", 0, numShufflePartitions, checkpointDir.toString)),
         "")
       checkOperatorStateMetadata(checkpointDir.toString, 0, expectedMetadata, 2)
+
+      // Verify that the state store metadata is not available for invalid batches.
+      val ex = intercept[Exception] {
+        val invalidBatchId = OperatorStateMetadataUtils.getLastOffsetBatch(spark,
+          checkpointDir.toString) + 1
+        checkOperatorStateMetadata(checkpointDir.toString, 0, expectedMetadata, 2,
+          Some(invalidBatchId))
+      }
+      assert(ex.getMessage.contains("Failed to read the operator metadata"))

Review Comment:
   nit: please use `checkError` for exceptions defined with the error class framework (see the sketch near the end of this message).



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -439,8 +447,9 @@ case class TransformWithStateExec(
       new Path(getStateInfo.checkpointLocation,
         s"${getStateInfo.operatorId.toString}")
-    val storeNamePath = new Path(stateCheckpointPath, storeName)
-    new Path(new Path(storeNamePath, "_metadata"), "schema")
+    val stateSchemaPath = new Path(stateCheckpointPath, "_stateSchema")

Review Comment:
   This makes the directory structure for the previous schema very different from the new one. Is this a change you want to propose going forward, one that other operators would then follow?
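
Expanding the shortening nit on `getLastOffsetBatch` above into a full method, as a sketch (this is just the reviewer's one-liner reflowed across lines; whether it reads better than the `match` is a matter of taste):
```
def getLastOffsetBatch(session: SparkSession, checkpointLocation: String): Long = {
  val offsetLog = new OffsetSeqLog(session,
    new Path(checkpointLocation, DIR_NAME_OFFSETS).toString)
  // Take the batch id from the latest (batchId, offsetSeq) entry if one
  // exists; otherwise fail fast with the offset-log-unavailable error.
  offsetLog.getLatest().map(_._1)
    .getOrElse(throw StateDataSourceErrors.offsetLogUnavailable(0, checkpointLocation))
}
```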
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -166,18 +169,36 @@ object OperatorStateMetadataUtils extends Logging {
         s"version=${operatorStateMetadata.version}")
     }
   }
+
+  def getLastOffsetBatch(session: SparkSession, checkpointLocation: String): Long = {
+    val offsetLog = new OffsetSeqLog(session,
+      new Path(checkpointLocation, DIR_NAME_OFFSETS).toString)
+    offsetLog.getLatest() match {
+      case Some((lastId, _)) => lastId
+      case None => throw StateDataSourceErrors.offsetLogUnavailable(0, checkpointLocation)
+    }
+  }
+
+  def getLastCommittedBatch(session: SparkSession, checkpointLocation: String): Option[Long] = {
+    val commitLog = new CommitLog(session, new Path(checkpointLocation, DIR_NAME_COMMITS).toString)
+    commitLog.getLatest() match {

Review Comment:
   nit: `commitLog.getLatest().map(_._1)`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -291,30 +312,54 @@ class OperatorStateMetadataV2Writer(
 
 class OperatorStateMetadataV2Reader(
     stateCheckpointPath: Path,
-    hadoopConf: Configuration) extends OperatorStateMetadataReader {
+    hadoopConf: Configuration,
+    batchId: Long) extends OperatorStateMetadataReader {
+
+  // Check that the requested batchId is available in the checkpoint directory
+  val baseCheckpointDir = stateCheckpointPath.getParent.getParent
+  val lastAvailOffset = listOffsets(baseCheckpointDir).lastOption.getOrElse(-1L)
+  if (batchId > lastAvailOffset) {
+    throw StateDataSourceErrors.failedToReadOperatorMetadata(baseCheckpointDir.toString, batchId)
+  }
 
   private val metadataDirPath = OperatorStateMetadataV2.metadataDirPath(stateCheckpointPath)
   private lazy val fm = CheckpointFileManager.create(metadataDirPath, hadoopConf)
 
   fm.mkdirs(metadataDirPath.getParent)
+
   override def version: Int = 2
 
-  private def listBatches(): Array[Long] = {
+  // List the available offsets in the offset directory
+  private def listOffsets(baseCheckpointDir: Path): Array[Long] = {
+    val offsetLog = new Path(baseCheckpointDir, "offsets")

Review Comment:
   Why not just use `OffsetSeqLog`?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -133,6 +154,21 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
         Array(StateStoreMetadataV2("default", 0, numShufflePartitions, checkpointDir.toString)),
         "")
       checkOperatorStateMetadata(checkpointDir.toString, 0, expectedMetadata, 2)
+
+      // Verify that the state store metadata is not available for invalid batches.
+      val ex = intercept[Exception] {
+        val invalidBatchId = OperatorStateMetadataUtils.getLastOffsetBatch(spark,
+          checkpointDir.toString) + 1
+        checkOperatorStateMetadata(checkpointDir.toString, 0, expectedMetadata, 2,
+          Some(invalidBatchId))
+      }
+      assert(ex.getMessage.contains("Failed to read the operator metadata"))
+
+      val ex1 = intercept[Exception] {
+        checkOperatorStateMetadata(checkpointDir.toString, 0, expectedMetadata, 2,
+          Some(-1))
+      }
+      assert(ex1.getMessage.contains("Failed to read the operator metadata"))

Review Comment:
   nit: please use `checkError` for exceptions defined with the error class framework (see the sketch below).
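
A sketch of what the suggested `checkError`-based assertion could look like for the tests above, assuming the thrown exception implements `SparkThrowable`; the error class name and parameter keys are placeholders, not taken from the PR:
```
import org.apache.spark.SparkThrowable

val ex1 = intercept[Exception] {
  checkOperatorStateMetadata(checkpointDir.toString, 0, expectedMetadata, 2, Some(-1))
}
checkError(
  exception = ex1.asInstanceOf[SparkThrowable],
  // Hypothetical error class; substitute the one that
  // StateDataSourceErrors.failedToReadOperatorMetadata is registered under.
  errorClass = "STDS_FAILED_TO_READ_OPERATOR_METADATA",
  parameters = Map(
    "checkpointLocation" -> checkpointDir.toString,
    "batchId" -> "-1"))
```
This asserts on the error class and its parameters rather than substring-matching the rendered message.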
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org