anishshri-db commented on code in PR #50667:
URL: https://github.com/apache/spark/pull/50667#discussion_r2055295233


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -259,48 +260,62 @@ case class TransformWithStateExec(
   private def handleInputRows(keyRow: UnsafeRow, valueRowIter: 
Iterator[InternalRow]):
     Iterator[InternalRow] = {
 
-    val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjectType)
-
-    val keyObj = getKeyObj(keyRow)  // convert key to objects
-    val valueObjIter = valueRowIter.map(getValueObj.apply)
-
-    // The statefulProcessor's handleInputRows method may create an eager 
iterator,
-    // and in that case, the implicit key needs to be set now. However, it 
could return
-    // a lazy iterator, in which case the implicit key should be set when the 
actual
-    // methods on the iterator are invoked. This is done with the wrapper class
-    // at the end of this method.
-    ImplicitGroupingKeyTracker.setImplicitKey(keyObj)
-    val mappedIterator = statefulProcessor.handleInputRows(
-      keyObj,
-      valueObjIter,
-      new TimerValuesImpl(batchTimestampMs, 
eventTimeWatermarkForEviction)).map { obj =>
-      getOutputRow(obj)
+    try {
+      val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjectType)
+
+      val keyObj = getKeyObj(keyRow) // convert key to objects
+      val valueObjIter = valueRowIter.map(getValueObj.apply)
+
+      // The statefulProcessor's handleInputRows method may create an eager 
iterator,
+      // and in that case, the implicit key needs to be set now. However, it 
could return
+      // a lazy iterator, in which case the implicit key should be set when 
the actual
+      // methods on the iterator are invoked. This is done with the wrapper 
class
+      // at the end of this method.
+      ImplicitGroupingKeyTracker.setImplicitKey(keyObj)
+      val mappedIterator = statefulProcessor.handleInputRows(
+        keyObj,
+        valueObjIter,
+        new TimerValuesImpl(batchTimestampMs, 
eventTimeWatermarkForEviction)).map { obj =>
+        getOutputRow(obj)
+      }
+      ImplicitGroupingKeyTracker.removeImplicitKey()
+
+      iteratorWithImplicitKeySet(keyObj, mappedIterator)
+    } catch {
+      case sparkThrowable: SparkThrowable =>
+        throw sparkThrowable
+      case e: Exception =>
+        throw TransformWithStateUserFunctionException(e, "handleInputRows")
     }
-    ImplicitGroupingKeyTracker.removeImplicitKey()
-
-    iteratorWithImplicitKeySet(keyObj, mappedIterator)
   }
 
   private def processInitialStateRows(
       keyRow: UnsafeRow,
       initStateIter: Iterator[InternalRow]): Unit = {
 
-    val getInitStateValueObj =
-      ObjectOperator.deserializeRowToObject(initialStateDeserializer, 
initialStateDataAttrs)
-
-    val keyObj = getKeyObj(keyRow) // convert key to objects
-    ImplicitGroupingKeyTracker.setImplicitKey(keyObj)
-    val initStateObjIter = initStateIter.map(getInitStateValueObj.apply)
-
-    initStateObjIter.foreach { initState =>
-      // allow multiple initial state rows on the same grouping key for 
integration
-      // with state data source reader with initial state
-      statefulProcessor
-        .asInstanceOf[StatefulProcessorWithInitialState[Any, Any, Any, Any]]
-        .handleInitialState(keyObj, initState,
-          new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForEviction))
+    try {
+      val getInitStateValueObj =
+        ObjectOperator.deserializeRowToObject(initialStateDeserializer, 
initialStateDataAttrs)
+
+      val keyObj = getKeyObj(keyRow) // convert key to objects
+      ImplicitGroupingKeyTracker.setImplicitKey(keyObj)
+      val initStateObjIter = initStateIter.map(getInitStateValueObj.apply)
+
+      initStateObjIter.foreach { initState =>
+        // allow multiple initial state rows on the same grouping key for 
integration
+        // with state data source reader with initial state
+        statefulProcessor
+          .asInstanceOf[StatefulProcessorWithInitialState[Any, Any, Any, Any]]
+          .handleInitialState(keyObj, initState,
+            new TimerValuesImpl(batchTimestampMs, 
eventTimeWatermarkForEviction))
+      }
+      ImplicitGroupingKeyTracker.removeImplicitKey()
+    } catch {
+      case sparkThrowable: SparkThrowable =>

Review Comment:
   Do we need to check whether the underlying exception already has an error 
class defined?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to