ericm-db commented on code in PR #50667: URL: https://github.com/apache/spark/pull/50667#discussion_r2055297035
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ########## @@ -259,48 +260,62 @@ case class TransformWithStateExec( private def handleInputRows(keyRow: UnsafeRow, valueRowIter: Iterator[InternalRow]): Iterator[InternalRow] = { - val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjectType) - - val keyObj = getKeyObj(keyRow) // convert key to objects - val valueObjIter = valueRowIter.map(getValueObj.apply) - - // The statefulProcessor's handleInputRows method may create an eager iterator, - // and in that case, the implicit key needs to be set now. However, it could return - // a lazy iterator, in which case the implicit key should be set when the actual - // methods on the iterator are invoked. This is done with the wrapper class - // at the end of this method. - ImplicitGroupingKeyTracker.setImplicitKey(keyObj) - val mappedIterator = statefulProcessor.handleInputRows( - keyObj, - valueObjIter, - new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForEviction)).map { obj => - getOutputRow(obj) + try { + val getOutputRow = ObjectOperator.wrapObjectToRow(outputObjectType) + + val keyObj = getKeyObj(keyRow) // convert key to objects + val valueObjIter = valueRowIter.map(getValueObj.apply) + + // The statefulProcessor's handleInputRows method may create an eager iterator, + // and in that case, the implicit key needs to be set now. However, it could return + // a lazy iterator, in which case the implicit key should be set when the actual + // methods on the iterator are invoked. This is done with the wrapper class + // at the end of this method. 
+ ImplicitGroupingKeyTracker.setImplicitKey(keyObj) + val mappedIterator = statefulProcessor.handleInputRows( + keyObj, + valueObjIter, + new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForEviction)).map { obj => + getOutputRow(obj) + } + ImplicitGroupingKeyTracker.removeImplicitKey() + + iteratorWithImplicitKeySet(keyObj, mappedIterator) + } catch { + case sparkThrowable: SparkThrowable => + throw sparkThrowable + case e: Exception => + throw TransformWithStateUserFunctionException(e, "handleInputRows") } - ImplicitGroupingKeyTracker.removeImplicitKey() - - iteratorWithImplicitKeySet(keyObj, mappedIterator) } private def processInitialStateRows( keyRow: UnsafeRow, initStateIter: Iterator[InternalRow]): Unit = { - val getInitStateValueObj = - ObjectOperator.deserializeRowToObject(initialStateDeserializer, initialStateDataAttrs) - - val keyObj = getKeyObj(keyRow) // convert key to objects - ImplicitGroupingKeyTracker.setImplicitKey(keyObj) - val initStateObjIter = initStateIter.map(getInitStateValueObj.apply) - - initStateObjIter.foreach { initState => - // allow multiple initial state rows on the same grouping key for integration - // with state data source reader with initial state - statefulProcessor - .asInstanceOf[StatefulProcessorWithInitialState[Any, Any, Any, Any]] - .handleInitialState(keyObj, initState, - new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForEviction)) + try { + val getInitStateValueObj = + ObjectOperator.deserializeRowToObject(initialStateDeserializer, initialStateDataAttrs) + + val keyObj = getKeyObj(keyRow) // convert key to objects + ImplicitGroupingKeyTracker.setImplicitKey(keyObj) + val initStateObjIter = initStateIter.map(getInitStateValueObj.apply) + + initStateObjIter.foreach { initState => + // allow multiple initial state rows on the same grouping key for integration + // with state data source reader with initial state + statefulProcessor + .asInstanceOf[StatefulProcessorWithInitialState[Any, Any, Any, Any]] + 
.handleInitialState(keyObj, initState, + new TimerValuesImpl(batchTimestampMs, eventTimeWatermarkForEviction)) + } + ImplicitGroupingKeyTracker.removeImplicitKey() + } catch { + case sparkThrowable: SparkThrowable => Review Comment: Is the existing code not sufficient? This is how ForeachBatch does it: ``` try { batchWriter(ds, batchId) } catch { // The user code can throw any type of exception. case NonFatal(e) if !e.isInstanceOf[SparkThrowable] => throw ForeachBatchUserFuncException(e) } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org