anishshri-db commented on code in PR #50667: URL: https://github.com/apache/spark/pull/50667#discussion_r2055296936
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ########## @@ -259,48 +260,62 @@ case class TransformWithStateExec( private def handleInputRows(keyRow: UnsafeRow, valueRowIter: Iterator[InternalRow]): Iterator[InternalRow] = { - val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjectType) - - val keyObj = getKeyObj(keyRow) // convert key to objects - val valueObjIter = valueRowIter.map(getValueObj.apply) - - // The statefulProcessor's handleInputRows method may create an eager iterator, - // and in that case, the implicit key needs to be set now. However, it could return - // a lazy iterator, in which case the implicit key should be set when the actual - // methods on the iterator are invoked. This is done with the wrapper class - // at the end of this method. - ImplicitGroupingKeyTracker.setImplicitKey(keyObj) - val mappedIterator = statefulProcessor.handleInputRows( - keyObj, - valueObjIter, - new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForEviction)).map { obj => - getOutputRow(obj) + try { + val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjectType) + + val keyObj = getKeyObj(keyRow) // convert key to objects + val valueObjIter = valueRowIter.map(getValueObj.apply) + + // The statefulProcessor's handleInputRows method may create an eager iterator, + // and in that case, the implicit key needs to be set now. However, it could return + // a lazy iterator, in which case the implicit key should be set when the actual + // methods on the iterator are invoked. This is done with the wrapper class + // at the end of this method. + ImplicitGroupingKeyTracker.setImplicitKey(keyObj) + val mappedIterator = statefulProcessor.handleInputRows( Review Comment: Do we need to cover `init` too? Also, will this cover all the paths on the driver as well? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org