micheal-o commented on code in PR #50045: URL: https://github.com/apache/spark/pull/50045#discussion_r2069927112
########## common/utils/src/main/resources/error/error-conditions.json: ########## @@ -324,6 +324,17 @@ "The change log writer version cannot be <version>." ] }, + "KEY_ROW_FORMAT_VALIDATION_FAILURE" : { + "message" : [ Review Comment: Let's avoid duplicating this message here. We already have another error class with this same message, so instead of duplicating, we can pass the message from that exception instance into this exception instance. I have added another comment in the code where you do the exception creation to give more context ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala: ########## @@ -426,7 +423,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with private lazy val loadedMaps = new util.TreeMap[Long, HDFSBackedStateStoreMap]( Ordering[Long].reverse) private lazy val baseDir = stateStoreId.storeCheckpointLocation() - private lazy val fm = CheckpointFileManager.create(baseDir, hadoopConf) + private[spark] lazy val fm = CheckpointFileManager.create(baseDir, hadoopConf) Review Comment: nit: why at `spark` pkg level? 
########## common/utils/src/main/resources/error/error-conditions.json: ########## @@ -354,6 +365,17 @@ "<loggingId>: RocksDB instance could not be acquired by <newAcquiredThreadInfo> for operationType=<operationType> as it was not released by <acquiredThreadInfo> after <timeWaitedMs> ms.", "Thread holding the lock has trace: <stackTraceOutput>" ] + }, + "VALUE_ROW_FORMAT_VALIDATION_FAILURE" : { + "message" : [ + "The streaming query failed to validate written state for value row for stateStore=<stateStoreID>.", Review Comment: ditto ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala: ########## @@ -919,13 +916,13 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with storeFiles } - private def compressStream(outputStream: DataOutputStream): DataOutputStream = { + private[spark] def compressStream(outputStream: DataOutputStream): DataOutputStream = { Review Comment: ditto ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala: ########## @@ -919,13 +916,13 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with storeFiles } - private def compressStream(outputStream: DataOutputStream): DataOutputStream = { + private[spark] def compressStream(outputStream: DataOutputStream): DataOutputStream = { val compressed = CompressionCodec.createCodec(sparkConf, storeConf.compressionCodec) .compressedOutputStream(outputStream) new DataOutputStream(compressed) } - private def decompressStream(inputStream: DataInputStream): DataInputStream = { + private[spark] def decompressStream(inputStream: DataInputStream): DataInputStream = { Review Comment: ditto ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala: ########## @@ -435,15 +452,31 @@ class StateStoreFailedToGetChangelogWriter(version: Long, e: Throwable) messageParameters = Map("version" -> 
version.toString), cause = e) -class StateStoreKeyRowFormatValidationFailure(errorMsg: String) +class StateStoreKeyRowFormatValidationFailure(errorMsg: String, stateStoreID: String) extends SparkRuntimeException( errorClass = "STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE", - messageParameters = Map("errorMsg" -> errorMsg)) + messageParameters = Map("errorMsg" -> errorMsg, "stateStoreID" -> stateStoreID)) + with ConvertableToCannotLoadStoreError { + override def convertToCannotLoadStoreError(): SparkException = { + new SparkException( + errorClass = "CANNOT_LOAD_STATE_STORE.KEY_ROW_FORMAT_VALIDATION_FAILURE", + messageParameters = Map("errorMsg" -> errorMsg, "stateStoreID" -> stateStoreID), + cause = null) + } + } -class StateStoreValueRowFormatValidationFailure(errorMsg: String) +class StateStoreValueRowFormatValidationFailure(errorMsg: String, stateStoreID: String) extends SparkRuntimeException( errorClass = "STATE_STORE_VALUE_ROW_FORMAT_VALIDATION_FAILURE", - messageParameters = Map("errorMsg" -> errorMsg)) + messageParameters = Map("errorMsg" -> errorMsg, "stateStoreID" -> stateStoreID)) + with ConvertableToCannotLoadStoreError { + override def convertToCannotLoadStoreError(): SparkException = { Review Comment: ditto ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala: ########## @@ -435,15 +452,31 @@ class StateStoreFailedToGetChangelogWriter(version: Long, e: Throwable) messageParameters = Map("version" -> version.toString), cause = e) -class StateStoreKeyRowFormatValidationFailure(errorMsg: String) +class StateStoreKeyRowFormatValidationFailure(errorMsg: String, stateStoreID: String) extends SparkRuntimeException( errorClass = "STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE", - messageParameters = Map("errorMsg" -> errorMsg)) + messageParameters = Map("errorMsg" -> errorMsg, "stateStoreID" -> stateStoreID)) + with ConvertableToCannotLoadStoreError { + override def convertToCannotLoadStoreError(): 
SparkException = { + new SparkException( + errorClass = "CANNOT_LOAD_STATE_STORE.KEY_ROW_FORMAT_VALIDATION_FAILURE", + messageParameters = Map("errorMsg" -> errorMsg, "stateStoreID" -> stateStoreID), Review Comment: Instead of duplicating the params and the error message here, you can just call `this.message`, which contains the full message from the previous error class, and then pass it into the new exception object here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org