jingz-db commented on code in PR #49488:
URL: https://github.com/apache/spark/pull/49488#discussion_r1947035847


##########
sql/connect/common/src/main/scala/org/apache/spark/sql/connect/KeyValueGroupedDataset.scala:
##########
@@ -526,6 +553,71 @@ private class KeyValueGroupedDatasetImpl[K, V, IK, IV](
     }
   }
 
+  override protected[sql] def transformWithStateHelper[U: Encoder, S: Encoder](
+      statefulProcessor: StatefulProcessor[K, V, U],
+      timeMode: TimeMode,
+      outputMode: OutputMode,
+      initialState: Option[sql.KeyValueGroupedDataset[K, S]] = None,
+      eventTimeColumnName: String = ""): Dataset[U] = {
+    val outputEncoder = agnosticEncoderFor[U]
+    val stateEncoder = agnosticEncoderFor[S]
+
+    val inputEncoders: Seq[AgnosticEncoder[_]] = Seq(kEncoder, stateEncoder, ivEncoder)
+    val dummyGroupingFunc = SparkUserDefinedFunction(

Review Comment:
   This dummy UDF exists to carry the `AgnosticEncoder`s for `K`, `U`, and `S` over to the 
`SparkConnectPlanner` inside a no-op UDF. I got the idea from here: 
https://github.com/apache/spark/blob/master/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/KeyValueGroupedDataset.scala#L696.
   In Spark Connect, the `KeyValueGroupedDataset` is sent to the Connect server and 
resolved as an `UntypedKeyValueGroupedDataset`, so on the server side we need to 
recover the Scala types from the `AgnosticEncoder`s. In the `GroupMap` proto message 
defined 
[here](https://github.com/apache/spark/blob/master/sql/connect/common/src/main/protobuf/spark/connect/relations.proto#L1011),
 the `AgnosticEncoder`s for the input key type `K` and the output type `U` are wrapped in a 
`CommonInlineUserDefinedFunction`. Since TWS (transformWithState) is also one of the 
operators expressed through `GroupMap`, I chose to transmit the encoders in this no-op 
UDF. On the Spark Connect server side we only read the encoders out of `func`; the 
function inside the dummy UDF is never invoked.
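
   For illustration only, here is a minimal, self-contained Scala sketch of that pattern. The names below (`DummyUdf`, `EncoderTransport`, the stand-in `AgnosticEncoder` trait) are hypothetical and do not reflect the real `SparkUserDefinedFunction` / `CommonInlineUserDefinedFunction` constructors; the point is just that the client packs encoders into a no-op UDF payload and the server only ever reads the encoders back out:

```scala
// Hypothetical stand-in for Spark's AgnosticEncoder, only to keep the sketch self-contained.
trait AgnosticEncoder[T]

// Stand-in for the UDF payload: a function reference plus the encoders describing it.
case class DummyUdf(
    f: AnyRef,                               // never invoked on the server
    inputEncoders: Seq[AgnosticEncoder[_]],  // e.g. key, state, value encoders
    outputEncoder: AgnosticEncoder[_])       // e.g. the TWS output encoder

object EncoderTransport {
  // "Client side": pack the K / S / V / U encoders into a no-op UDF.
  def pack[K, S, V, U](
      kEnc: AgnosticEncoder[K],
      sEnc: AgnosticEncoder[S],
      vEnc: AgnosticEncoder[V],
      outEnc: AgnosticEncoder[U]): DummyUdf = {
    val noOp: (K, Iterator[V]) => Iterator[U] = (_, _) => Iterator.empty
    DummyUdf(noOp, Seq(kEnc, sEnc, vEnc), outEnc)
  }

  // "Server side": only the encoders are extracted; `f` is ignored.
  def unpack(udf: DummyUdf): (Seq[AgnosticEncoder[_]], AgnosticEncoder[_]) =
    (udf.inputEncoders, udf.outputEncoder)
}
```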



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

