anishshri-db commented on code in PR #50667:
URL: https://github.com/apache/spark/pull/50667#discussion_r2055003563


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -259,26 +260,33 @@ case class TransformWithStateExec(
   private def handleInputRows(keyRow: UnsafeRow, valueRowIter: 
Iterator[InternalRow]):
     Iterator[InternalRow] = {
 
-    val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjectType)
-
-    val keyObj = getKeyObj(keyRow)  // convert key to objects
-    val valueObjIter = valueRowIter.map(getValueObj.apply)
-
-    // The statefulProcessor's handleInputRows method may create an eager 
iterator,
-    // and in that case, the implicit key needs to be set now. However, it 
could return
-    // a lazy iterator, in which case the implicit key should be set when the 
actual
-    // methods on the iterator are invoked. This is done with the wrapper class
-    // at the end of this method.
-    ImplicitGroupingKeyTracker.setImplicitKey(keyObj)
-    val mappedIterator = statefulProcessor.handleInputRows(
-      keyObj,
-      valueObjIter,
-      new TimerValuesImpl(batchTimestampMs, 
eventTimeWatermarkForEviction)).map { obj =>
-      getOutputRow(obj)
+    try {
+      val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjectType)
+
+      val keyObj = getKeyObj(keyRow) // convert key to objects
+      val valueObjIter = valueRowIter.map(getValueObj.apply)
+
+      // The statefulProcessor's handleInputRows method may create an eager 
iterator,
+      // and in that case, the implicit key needs to be set now. However, it 
could return
+      // a lazy iterator, in which case the implicit key should be set when 
the actual
+      // methods on the iterator are invoked. This is done with the wrapper 
class
+      // at the end of this method.
+      ImplicitGroupingKeyTracker.setImplicitKey(keyObj)
+      val mappedIterator = statefulProcessor.handleInputRows(

Review Comment:
   We need to do this for all functions on stateful processor right ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to