jingz-db commented on code in PR #49488: URL: https://github.com/apache/spark/pull/49488#discussion_r1949943757
##########
sql/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -1031,6 +1033,30 @@ message GroupMap {
   // (Optional) The schema for the grouped state.
   optional DataType state_schema = 10;
+
+  // Below fields are only used by TransformWithState
+  // (Optional) TransformWithState related parameters.
+  optional TransformWithStateInfo transformWithStateInfo = 11;
+}
+
+// Event time rule needed for chaining of operator of TransformWithState
+message UpdateEventTimeWatermarkColumn {
+  // (Required) Input relation.
+  Relation input = 1;
+
+  // (Required) Event time column name.
+  string event_time_col_name = 3;
+}
+
+message TransformWithStateInfo {
+  // (Required) Bytes for java serialized user-defined stateful processor.

Review Comment:

> Please try to keep the protocol as language agnostic as possible. Is it possible to get a more generic operator here?

I am following the style of an existing streaming operator in Connect: [flatMapGroupsWithState](https://github.com/apache/spark/blob/master/sql/connect/common/src/main/protobuf/spark/connect/relations.proto#L1023) and [ApplyInPandasWithState](https://github.com/apache/spark/blob/master/sql/connect/common/src/main/protobuf/spark/connect/relations.proto#L1059) are defined in separate protocol messages. The Scala operator is one of the `GroupMap` operators, while the Python operator is not, so I think it is better to keep the Scala and Python versions of TransformWithState in separate messages as well. Additionally, not all fields in `TransformWithStateInfo` are used by Python. FYI, here is the Python PR I have in review and its proposed protocol change: https://github.com/apache/spark/pull/49560/files#diff-77948931f0ba54d1562b74e2f949e1c339a3b5dcd88fd7f7a4411eceb6f23341R1083

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org