HeartSaVioR commented on code in PR #49632:
URL: https://github.com/apache/spark/pull/49632#discussion_r1936697121


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala:
##########
@@ -312,10 +315,10 @@ trait FlatMapGroupsWithStateExecBase
       val groupedInitialStateIter =
         GroupedIterator(initStateIter, initialStateGroupAttrs, initialState.output)
 
-      // Create a CoGroupedIterator that will group the two iterators together for every key group.
-      new CoGroupedIterator(
-          groupedChildDataIter, groupedInitialStateIter, groupingAttributes).flatMap {
-        case (keyRow, valueRowIter, initialStateRowIter) =>
+      if (skipEmittingInitialStateKeys) {

Review Comment:
   Just to make sure we are on the same page:
   
   As I understand it, there is no behavioral difference when both the first microbatch and the initial state contain a key K. The behavior differs only when the initial state has a key K but the first microbatch does not, in which case the user function would be called with an empty iterator.
   
   If that's the case, why not keep a single code path and decide whether to call the user function based on the emptiness of the value iterator and the flag?
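   
   A minimal sketch of this suggestion, assuming a simplified stand-in for the co-grouped iterator. The object `SkipInitialStateKeysSketch`, the `CoGroup` alias, and the `userFunc` signature are hypothetical and are not the actual Spark code; only the flag name `skipEmittingInitialStateKeys` comes from the diff. State-store handling is omitted.

```scala
// Hypothetical, simplified model of the single-code-path idea; not Spark's implementation.
object SkipInitialStateKeysSketch {
  // One co-grouped entry: key, data rows from the first microbatch, initial-state rows.
  type CoGroup[K, V, S] = (K, Iterator[V], Iterator[S])

  def process[K, V, S, O](
      coGrouped: Iterator[CoGroup[K, V, S]],
      skipEmittingInitialStateKeys: Boolean)(
      userFunc: (K, Iterator[V], Option[S]) => Iterator[O]): Iterator[O] = {
    coGrouped.flatMap { case (key, values, initialStates) =>
      // Assumption: at most one initial-state row per key.
      val initialState = if (initialStates.hasNext) Some(initialStates.next()) else None
      if (skipEmittingInitialStateKeys && !values.hasNext) {
        // Key exists only in the initial state and the flag is set: do not call the user function.
        Iterator.empty
      } else {
        userFunc(key, values, initialState)
      }
    }
  }

  // Sample input: key "a" has data and state, key "b" has state only.
  def sampleGroups(): Iterator[CoGroup[String, Int, Int]] = Iterator(
    ("a", Iterator(1, 2), Iterator(10)),
    ("b", Iterator.empty[Int], Iterator(20)))

  // Example user function: sum of the values plus the initial state (0 if absent).
  val sumWithState: (String, Iterator[Int], Option[Int]) => Iterator[String] =
    (key, values, state) => Iterator(s"$key:${values.sum + state.getOrElse(0)}")

  def main(args: Array[String]): Unit = {
    // prints List(a:13)
    println(process(sampleGroups(), skipEmittingInitialStateKeys = true)(sumWithState).toList)
    // prints List(a:13, b:20)
    println(process(sampleGroups(), skipEmittingInitialStateKeys = false)(sumWithState).toList)
  }
}
```

   With the flag off, key "b" still reaches the user function with an empty iterator, so both behaviors share the same loop and differ only in the one condition.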



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala:
##########
@@ -533,9 +564,16 @@ object FlatMapGroupsWithStateExec {
       outputObjAttr: Attribute,
       timeoutConf: GroupStateTimeout,
       hasInitialState: Boolean,
+      skipEmittingInitialStateKeys: Boolean,
       initialState: SparkPlan,
       child: SparkPlan): SparkPlan = {
     if (hasInitialState) {
+      // we wont support skipping emitting initial state keys for batch queries

Review Comment:
   I'm not sure I follow. Batch is even simpler: if the value iterator is empty for key K, we don't even need to handle the state iterator for key K at all. What is the technical challenge here?
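   
   A rough sketch of the batch case described above, assuming the initial state can be looked up by key. The object `BatchInitialStateSketch`, the `Map`-based lookup, and the `userFunc` signature are hypothetical simplifications and are not how Spark represents initial state. The point is only that iteration is driven by the keys present in the data.

```scala
// Hypothetical model of the batch case; not Spark's implementation.
object BatchInitialStateSketch {
  def process[K, V, S, O](
      groupedData: Iterator[(K, Iterator[V])],
      initialStateByKey: Map[K, S])(
      userFunc: (K, Iterator[V], Option[S]) => Iterator[O]): Iterator[O] = {
    // Only keys that appear in the data are visited; keys that exist solely
    // in the initial state are never touched, so nothing is emitted for them.
    groupedData.flatMap { case (key, values) =>
      userFunc(key, values, initialStateByKey.get(key))
    }
  }

  // Example user function: sum of the values plus the initial state (0 if absent).
  val sumWithState: (String, Iterator[Int], Option[Int]) => Iterator[String] =
    (key, values, state) => Iterator(s"$key:${values.sum + state.getOrElse(0)}")

  def main(args: Array[String]): Unit = {
    val data: Iterator[(String, Iterator[Int])] = Iterator(("a", Iterator(1, 2)))
    val initialState = Map("a" -> 10, "b" -> 20)
    // prints List(a:13); key "b" is never processed
    println(process(data, initialState)(sumWithState).toList)
  }
}
```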



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala:
##########
@@ -828,7 +830,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         val execPlan = python.FlatMapGroupsInPandasWithStateExec(
           func, groupAttr, outputAttr, stateType, None, stateVersion, outputMode, timeout,
           batchTimestampMs = None, eventTimeWatermarkForLateEvents = None,
-          eventTimeWatermarkForEviction = None, planLater(child)
+          eventTimeWatermarkForEviction = None,
+          skipEmittingInitialStateKeys = false,

Review Comment:
   This reminds me that we do not support initial state in applyInPandasWithState, but that's OK since we are proposing a new feature that covers this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

