micheal-o commented on code in PR #50045:
URL: https://github.com/apache/spark/pull/50045#discussion_r2069927112


##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -324,6 +324,17 @@
           "The change log writer version cannot be <version>."
         ]
       },
+      "KEY_ROW_FORMAT_VALIDATION_FAILURE" : {
+        "message" : [

Review Comment:
   Let's avoid duplicating this message here. We already have another error 
class with this same message, so instead of duplicating, we can pass the 
message from that exception instance into this exception instance. I have added 
another comment in the code where you do the exception creation to give more 
context



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -426,7 +423,7 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
   private lazy val loadedMaps = new util.TreeMap[Long, 
HDFSBackedStateStoreMap](
     Ordering[Long].reverse)
   private lazy val baseDir = stateStoreId.storeCheckpointLocation()
-  private lazy val fm = CheckpointFileManager.create(baseDir, hadoopConf)
+  private[spark] lazy val fm = CheckpointFileManager.create(baseDir, 
hadoopConf)

Review Comment:
   nit: why at `spark` pkg level?



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -354,6 +365,17 @@
           "<loggingId>: RocksDB instance could not be acquired by 
<newAcquiredThreadInfo> for operationType=<operationType> as it was not 
released by <acquiredThreadInfo> after <timeWaitedMs> ms.",
           "Thread holding the lock has trace: <stackTraceOutput>"
         ]
+      },
+      "VALUE_ROW_FORMAT_VALIDATION_FAILURE" : {
+        "message" : [
+          "The streaming query failed to validate written state for value row 
for stateStore=<stateStoreID>.",

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -919,13 +916,13 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
     storeFiles
   }
 
-  private def compressStream(outputStream: DataOutputStream): DataOutputStream 
= {
+  private[spark] def compressStream(outputStream: DataOutputStream): 
DataOutputStream = {

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -919,13 +916,13 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
     storeFiles
   }
 
-  private def compressStream(outputStream: DataOutputStream): DataOutputStream 
= {
+  private[spark] def compressStream(outputStream: DataOutputStream): 
DataOutputStream = {
     val compressed = CompressionCodec.createCodec(sparkConf, 
storeConf.compressionCodec)
       .compressedOutputStream(outputStream)
     new DataOutputStream(compressed)
   }
 
-  private def decompressStream(inputStream: DataInputStream): DataInputStream 
= {
+  private[spark] def decompressStream(inputStream: DataInputStream): 
DataInputStream = {

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala:
##########
@@ -435,15 +452,31 @@ class StateStoreFailedToGetChangelogWriter(version: Long, 
e: Throwable)
     messageParameters = Map("version" -> version.toString),
     cause = e)
 
-class StateStoreKeyRowFormatValidationFailure(errorMsg: String)
+class StateStoreKeyRowFormatValidationFailure(errorMsg: String, stateStoreID: 
String)
   extends SparkRuntimeException(
     errorClass = "STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE",
-    messageParameters = Map("errorMsg" -> errorMsg))
+    messageParameters = Map("errorMsg" -> errorMsg, "stateStoreID" -> 
stateStoreID))
+  with ConvertableToCannotLoadStoreError {
+    override def convertToCannotLoadStoreError(): SparkException = {
+      new SparkException(
+        errorClass = 
"CANNOT_LOAD_STATE_STORE.KEY_ROW_FORMAT_VALIDATION_FAILURE",
+        messageParameters = Map("errorMsg" -> errorMsg, "stateStoreID" -> 
stateStoreID),
+        cause = null)
+    }
+  }
 
-class StateStoreValueRowFormatValidationFailure(errorMsg: String)
+class StateStoreValueRowFormatValidationFailure(errorMsg: String, 
stateStoreID: String)
   extends SparkRuntimeException(
     errorClass = "STATE_STORE_VALUE_ROW_FORMAT_VALIDATION_FAILURE",
-    messageParameters = Map("errorMsg" -> errorMsg))
+    messageParameters = Map("errorMsg" -> errorMsg, "stateStoreID" -> 
stateStoreID))
+  with ConvertableToCannotLoadStoreError {
+    override def convertToCannotLoadStoreError(): SparkException = {

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala:
##########
@@ -435,15 +452,31 @@ class StateStoreFailedToGetChangelogWriter(version: Long, 
e: Throwable)
     messageParameters = Map("version" -> version.toString),
     cause = e)
 
-class StateStoreKeyRowFormatValidationFailure(errorMsg: String)
+class StateStoreKeyRowFormatValidationFailure(errorMsg: String, stateStoreID: 
String)
   extends SparkRuntimeException(
     errorClass = "STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE",
-    messageParameters = Map("errorMsg" -> errorMsg))
+    messageParameters = Map("errorMsg" -> errorMsg, "stateStoreID" -> 
stateStoreID))
+  with ConvertableToCannotLoadStoreError {
+    override def convertToCannotLoadStoreError(): SparkException = {
+      new SparkException(
+        errorClass = 
"CANNOT_LOAD_STATE_STORE.KEY_ROW_FORMAT_VALIDATION_FAILURE",
+        messageParameters = Map("errorMsg" -> errorMsg, "stateStoreID" -> 
stateStoreID),

Review Comment:
   instead of duplicating params and error message here. You can just call 
`this.message` which contains the full message from the previous error class, 
and then you can pass it into the new exception object here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to