jingz-db commented on code in PR #49488:
URL: https://github.com/apache/spark/pull/49488#discussion_r1947035847
##########
sql/connect/common/src/main/scala/org/apache/spark/sql/connect/KeyValueGroupedDataset.scala:
##########
@@ -526,6 +553,71 @@ private class KeyValueGroupedDatasetImpl[K, V, IK, IV](
     }
   }
+  override protected[sql] def transformWithStateHelper[U: Encoder, S: Encoder](
+      statefulProcessor: StatefulProcessor[K, V, U],
+      timeMode: TimeMode,
+      outputMode: OutputMode,
+      initialState: Option[sql.KeyValueGroupedDataset[K, S]] = None,
+      eventTimeColumnName: String = ""): Dataset[U] = {
+    val outputEncoder = agnosticEncoderFor[U]
+    val stateEncoder = agnosticEncoderFor[S]
+
+    val inputEncoders: Seq[AgnosticEncoder[_]] = Seq(kEncoder, stateEncoder, ivEncoder)
+    val dummyGroupingFunc = SparkUserDefinedFunction(

Review Comment:
This dummy UDF exists only to carry the `AgnosticEncoder` type info for `K`, `U`, and `S` to `SparkConnectPlanner`. The UDF itself is a no-op. I got the idea from here: https://github.com/apache/spark/blob/master/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/KeyValueGroupedDataset.scala#L696.

In Spark Connect, a `KeyValueGroupedDataset` is sent to the Connect server as a proto message. `SparkConnectPlanner` resolves it as an `UntypedKeyValueGroupedDataset` because the proto is "untyped". The Spark Connect server therefore has to recover the Scala type info from the `AgnosticEncoder`s.

The `GroupMap` proto message is defined [here](https://github.com/apache/spark/blob/master/sql/connect/common/src/main/protobuf/spark/connect/relations.proto#L1011). In it, the `AgnosticEncoder`s for the input key type `K` and the output type `U` are wrapped in a `CommonInlineUserDefinedFunction` object. TWS (`transformWithState`) is also one of the operators carried by this `GroupMap` message, so I chose to transmit the encoders in this no-op UDF. On the Spark Connect server side, we only read the encoders out of `func`. The function inside the dummy UDF is never used.
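Here is a minimal, self-contained Scala sketch of the idea. The client attaches type information to a UDF-shaped message whose function is a no-op, and the server reads back only the encoders. Every type here is a hypothetical stand-in, not Spark's real `AgnosticEncoder`, `CommonInlineUserDefinedFunction`, or `GroupMap` API. That includes `SimpleEncoder`, `InlineUdfMsg`, `GroupMapMsg`, `ResolvedTypes`, `buildTwsMessage`, and `resolveTypes`. The key/state/value input order simply mirrors the `Seq(kEncoder, stateEncoder, ivEncoder)` line in the diff above.

```scala
// Sketch of "encoders ride along in a no-op UDF".
// All names below are HYPOTHETICAL stand-ins, not Spark's actual classes.

// Hypothetical stand-in for an AgnosticEncoder: it only records a type name.
sealed trait SimpleEncoder { def typeName: String }
case object StringEnc extends SimpleEncoder { val typeName = "String" }
case object IntEnc extends SimpleEncoder { val typeName = "Int" }
case object LongEnc extends SimpleEncoder { val typeName = "Long" }

// Hypothetical stand-in for an inline-UDF message: a function plus its encoders.
final case class InlineUdfMsg(
    func: AnyRef,
    inputEncoders: Seq[SimpleEncoder],
    outputEncoder: SimpleEncoder)

// Hypothetical stand-in for the untyped grouped-map relation sent to the server.
final case class GroupMapMsg(groupingUdf: InlineUdfMsg, operator: String)

// Client side: build a dummy UDF whose only job is to carry the K, S, V, U encoders.
def buildTwsMessage(
    kEnc: SimpleEncoder,
    sEnc: SimpleEncoder,
    vEnc: SimpleEncoder,
    uEnc: SimpleEncoder): GroupMapMsg = {
  // No-op function: it exists only so the message has a "UDF" to hang encoders on.
  val noOp: Any => Any = identity
  val dummy = InlineUdfMsg(noOp, Seq(kEnc, sEnc, vEnc), uEnc)
  GroupMapMsg(dummy, operator = "transformWithState")
}

// Server side: recover the type info. The carried function is never invoked.
final case class ResolvedTypes(
    key: SimpleEncoder,
    state: SimpleEncoder,
    value: SimpleEncoder,
    output: SimpleEncoder)

def resolveTypes(msg: GroupMapMsg): ResolvedTypes = {
  val udf = msg.groupingUdf
  // Client and server must agree on the encoder order: key, state, value.
  require(udf.inputEncoders.size == 3, "expected key, state and value encoders")
  ResolvedTypes(
    key = udf.inputEncoders(0),
    state = udf.inputEncoders(1),
    value = udf.inputEncoders(2),
    output = udf.outputEncoder)
}

val msg = buildTwsMessage(StringEnc, LongEnc, IntEnc, StringEnc)
val resolved = resolveTypes(msg)
println(resolved)
```

In this sketch the encoders are the real payload, not the function. The server never needs to call the function, as long as both sides agree on the encoder order.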