anishshri-db commented on code in PR #50600: URL: https://github.com/apache/spark/pull/50600#discussion_r2047870718
########## sql/core/src/main/scala/org/apache/spark/sql/classic/RelationalGroupedDataset.scala: ########## @@ -479,6 +505,24 @@ class RelationalGroupedDataset protected[sql]( timeModeStr: String, initialState: RelationalGroupedDataset, eventTimeColumnName: String): DataFrame = { + _transformWithStateInPySpark( + func, + outputStructType, + outputModeStr, + timeModeStr, + initialState, + eventTimeColumnName, + TransformWithStateInPySpark.UserFacingDataType.PANDAS) + } + + private def _transformWithStateInPySpark( + func: Column, + outputStructType: StructType, + outputModeStr: String, + timeModeStr: String, + initialState: RelationalGroupedDataset, + eventTimeColumnName: String, + userFacingDataType: TransformWithStateInPySpark.UserFacingDataType.Value): DataFrame = { Review Comment: Should we maybe use a simpler name here instead of `userFacingDataType`? ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala: ########## @@ -178,16 +178,18 @@ case class FlatMapGroupsInPandasWithState( * @param outputAttrs used to define the output rows * @param outputMode defines the output mode for the statefulProcessor * @param timeMode the time mode semantics of the stateful processor for timers and TTL. + * // FIXME: document `userFacingDataType` Review Comment: Let's add it in this PR itself? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala: ########## @@ -55,7 +55,11 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging // Seq of operator names who uses state schema v3 and TWS related options. // This Seq was used in checks before reading state schema files. 
- private val twsShortNameSeq = Seq("transformWithStateExec", "transformWithStateInPandasExec") + private val twsShortNameSeq = Seq( + "transformWithStateExec", + "transformWithStateInPandasExec", + "transformWithStateInPySparkExec" Review Comment: Will this work as is, or do we need to do more work to integrate with the state data source reader? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkExec.scala: ########## @@ -494,7 +521,7 @@ object TransformWithStateInPandasExec { stateStoreCkptIds = None ) - new TransformWithStateInPandasExec( + new TransformWithStateInPySparkExec( Review Comment: Are we also changing the name of the physical operator for the current pandas TWS operator? ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ########## @@ -105,7 +105,7 @@ object UnsupportedOperationChecker extends Logging { case d: Deduplicate if d.isStreaming && d.keys.exists(hasEventTimeCol) => true case d: DeduplicateWithinWatermark if d.isStreaming => true case t: TransformWithState if t.isStreaming => true - case t: TransformWithStateInPandas if t.isStreaming => true Review Comment: Don't we need to retain this also? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org