anishshri-db commented on code in PR #50600:
URL: https://github.com/apache/spark/pull/50600#discussion_r2047870718


##########
sql/core/src/main/scala/org/apache/spark/sql/classic/RelationalGroupedDataset.scala:
##########
@@ -479,6 +505,24 @@ class RelationalGroupedDataset protected[sql](
       timeModeStr: String,
       initialState: RelationalGroupedDataset,
       eventTimeColumnName: String): DataFrame = {
+    _transformWithStateInPySpark(
+      func,
+      outputStructType,
+      outputModeStr,
+      timeModeStr,
+      initialState,
+      eventTimeColumnName,
+      TransformWithStateInPySpark.UserFacingDataType.PANDAS)
+  }
+
+  private def _transformWithStateInPySpark(
+      func: Column,
+      outputStructType: StructType,
+      outputModeStr: String,
+      timeModeStr: String,
+      initialState: RelationalGroupedDataset,
+      eventTimeColumnName: String,
+      userFacingDataType: 
TransformWithStateInPySpark.UserFacingDataType.Value): DataFrame = {

Review Comment:
   Should we use a simpler name here maybe ? instead of `userFacingDataType` ?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala:
##########
@@ -178,16 +178,18 @@ case class FlatMapGroupsInPandasWithState(
  * @param outputAttrs used to define the output rows
  * @param outputMode defines the output mode for the statefulProcessor
  * @param timeMode the time mode semantics of the stateful processor for 
timers and TTL.
+ * // FIXME: document `userFacingDataType`

Review Comment:
   Lets add it in this PR itself ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -55,7 +55,11 @@ class StateDataSource extends TableProvider with 
DataSourceRegister with Logging
 
   // Seq of operator names who uses state schema v3 and TWS related options.
   // This Seq was used in checks before reading state schema files.
-  private val twsShortNameSeq = Seq("transformWithStateExec", 
"transformWithStateInPandasExec")
+  private val twsShortNameSeq = Seq(
+    "transformWithStateExec",
+    "transformWithStateInPandasExec",
+    "transformWithStateInPySparkExec"

Review Comment:
   Will this work as it is or we need to do more work to integrate with state 
data source reader ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkExec.scala:
##########
@@ -494,7 +521,7 @@ object TransformWithStateInPandasExec {
       stateStoreCkptIds = None
     )
 
-    new TransformWithStateInPandasExec(
+    new TransformWithStateInPySparkExec(

Review Comment:
   Are we changing the name of the physical operator also for the current 
pandas tws operator ?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -105,7 +105,7 @@ object UnsupportedOperationChecker extends Logging {
     case d: Deduplicate if d.isStreaming && d.keys.exists(hasEventTimeCol) => 
true
     case d: DeduplicateWithinWatermark if d.isStreaming => true
     case t: TransformWithState if t.isStreaming => true
-    case t: TransformWithStateInPandas if t.isStreaming => true

Review Comment:
   Don't we need to retain this also ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to