HeartSaVioR commented on code in PR #49632:
URL: https://github.com/apache/spark/pull/49632#discussion_r1936697121

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala: ##########
@@ -312,10 +315,10 @@ trait FlatMapGroupsWithStateExecBase
       val groupedInitialStateIter =
         GroupedIterator(initStateIter, initialStateGroupAttrs, initialState.output)
 
-      // Create a CoGroupedIterator that will group the two iterators together for every key group.
-      new CoGroupedIterator(
-        groupedChildDataIter, groupedInitialStateIter, groupingAttributes).flatMap {
-          case (keyRow, valueRowIter, initialStateRowIter) =>
+      if (skipEmittingInitialStateKeys) {

Review Comment:
Just to make sure we are on the same page: I guess there is no behavioral difference if the first microbatch has a key K and the initial state also has a key K. The behavior differs only when the initial state has a key K while the first microbatch does not, in which case the user function is called with an "empty iterator". If that's the case, why not keep the same codebase and decide whether to call the user function based on whether the value iterator is empty and on the flag?

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala: ##########
@@ -533,9 +564,16 @@ object FlatMapGroupsWithStateExec {
       outputObjAttr: Attribute,
       timeoutConf: GroupStateTimeout,
       hasInitialState: Boolean,
+      skipEmittingInitialStateKeys: Boolean,
       initialState: SparkPlan,
       child: SparkPlan): SparkPlan = {
     if (hasInitialState) {
+      // we wont support skipping emitting initial state keys for batch queries

Review Comment:
I'm not sure I get this. For batch it's even simpler: if the value iterator is empty for the key K, we don't even need to handle the state iterator for the key K at all. What's the technical challenge here?
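
The single-code-path idea from the two comments above can be sketched outside Spark. This is a minimal, hypothetical model, not the operator's actual code: key-sorted Scala iterators of `(key, values)` groups stand in for `GroupedIterator`, the helper `cogroupSorted` stands in for `CoGroupedIterator`, and `processGroups`, `InitialStateGatingSketch` and the user-function shape are illustrative names. Only `skipEmittingInitialStateKeys` comes from the diff.

```scala
// Hypothetical sketch, not Spark's implementation: one cogroup code path where the
// user function call is gated on "no new values for this key" plus the flag.
object InitialStateGatingSketch {

  // Merge two key-sorted iterators of (key, values) groups into
  // (key, dataValues, initialStateValues) triples. A side that lacks a key
  // contributes an empty Seq. Stand-in for CoGroupedIterator.
  def cogroupSorted[K, V, S](
      data: Iterator[(K, Seq[V])],
      init: Iterator[(K, Seq[S])])(implicit ord: Ordering[K]): Iterator[(K, Seq[V], Seq[S])] = {
    val d = data.buffered
    val i = init.buffered
    new Iterator[(K, Seq[V], Seq[S])] {
      def hasNext: Boolean = d.hasNext || i.hasNext
      def next(): (K, Seq[V], Seq[S]) = {
        if (!i.hasNext || (d.hasNext && ord.lt(d.head._1, i.head._1))) {
          // Key only present in the data side.
          val (k, vs) = d.next()
          (k, vs, Seq.empty[S])
        } else if (!d.hasNext || ord.lt(i.head._1, d.head._1)) {
          // Key only present in the initial state side.
          val (k, ss) = i.next()
          (k, Seq.empty[V], ss)
        } else {
          // Key present on both sides.
          val (k, vs) = d.next()
          val (_, ss) = i.next()
          (k, vs, ss)
        }
      }
    }
  }

  // Same path with or without the flag: the user function is skipped only when
  // the key has no new values AND skipEmittingInitialStateKeys is set.
  // Assumes at most one initial state per key (headOption).
  def processGroups[K, V, S, O](
      data: Iterator[(K, Seq[V])],
      init: Iterator[(K, Seq[S])],
      skipEmittingInitialStateKeys: Boolean)(
      userFunc: (K, Iterator[V], Option[S]) => Iterator[O])(
      implicit ord: Ordering[K]): Iterator[O] =
    cogroupSorted(data, init).flatMap { case (key, values, initialStates) =>
      if (values.isEmpty && skipEmittingInitialStateKeys) {
        // Nothing is emitted for this key. A real stateful operator would still
        // need to persist the initial state; this sketch models only the emit decision.
        Iterator.empty
      } else {
        userFunc(key, values.iterator, initialStates.headOption)
      }
    }
}
```

With key `a` in both the data and the initial state, `b` only in the initial state, and `c` only in the data, setting the flag drops only the call for `b`; `a` and `c` are handled identically either way, which matches the first comment's reading. The same gate covers the batch case: when the values for K are empty, the user function never sees the initial state for K.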

########## sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala: ##########
@@ -828,7 +830,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         val execPlan = python.FlatMapGroupsInPandasWithStateExec(
           func, groupAttr, outputAttr, stateType, None, stateVersion, outputMode, timeout,
           batchTimestampMs = None, eventTimeWatermarkForLateEvents = None,
-          eventTimeWatermarkForEviction = None, planLater(child)
+          eventTimeWatermarkForEviction = None,
+          skipEmittingInitialStateKeys = false,

Review Comment:
This reminds me that we do not support initial state in applyInPandasWithState, but that's OK since we are proposing a new feature which covers this.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.