Hi all

I have a question about the stateful operations [map/flatMap]GroupsWithState
in Structured Streaming. The issue is as follows:

Take the StructuredSessionization example. First I input two words, apache
and spark, in batch 0, then a different word, Hadoop, in batch 1 once the
timeout has elapsed (the timeout type here is ProcessingTimeTimeout). Both
apache and spark are output as expired, since each group's state has timed
out. But if batch 1 instead contains apache, which already appeared in
batch 0, the result shows only spark as expired. I dug into this and found
that the code at
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala#L131
handles the group state update. It first processes the groups that have new
data, with the flag hasTimedOut set to false in the update function, so a key
that has already timed out is simply updated instead of expired. I know the
timeout function will not be called until new data triggers a batch, but I am
wondering why we don't process the timed-out keys first, so that the
user-given function can retrieve the expired data from batch 0:

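def statefunc(key: K, values: Iterator[V], state: GroupState[S]): U = {
  if (state.hasTimedOut) {
    // Called on timeout: build the output from the expired state, then remove it
    state.remove()
    ???  // TODO: return the output for the expired key
  } else if (state.exists) {
    ???  // TODO: update the existing state with the new values
  } else {
    ???  // TODO: initialize the state for a new key
  }
}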
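
To make the ordering concrete, here is a minimal plain-Scala sketch of the behaviour described above. It is a simplified model, not Spark's code: the names TimeoutOrderingSketch, Session, Call and runBatch are hypothetical, and it assumes the user function resets the timeout on every call that carries new data, as the sessionization example does.

```scala
object TimeoutOrderingSketch {
  // Hypothetical per-key state: event count plus a processing-time deadline.
  final case class Session(numEvents: Int, deadlineMs: Long)

  // What the user function would observe for one key in one batch.
  final case class Call(key: String, hasTimedOut: Boolean)

  /** Runs one batch: keys with new data first, then timed-out keys without new data. */
  def runBatch(
      store: Map[String, Session],
      words: Seq[String],
      nowMs: Long,
      timeoutMs: Long): (Map[String, Session], Seq[Call]) = {
    val counts = words.groupBy(identity).map { case (w, ws) => w -> ws.size }

    // Step 1: keys with new data are called with hasTimedOut = false, even if
    // their deadline has already passed; the deadline is simply pushed forward.
    val updated = counts.foldLeft(store) { case (s, (key, n)) =>
      val previous = s.get(key).map(_.numEvents).getOrElse(0)
      s.updated(key, Session(previous + n, nowMs + timeoutMs))
    }
    val dataCalls = counts.keys.toSeq.sorted.map(k => Call(k, hasTimedOut = false))

    // Step 2: the remaining keys whose deadline has passed are called with
    // hasTimedOut = true and removed from the store.
    val expired = updated.collect { case (k, s) if s.deadlineMs <= nowMs => k }.toSeq.sorted
    val timeoutCalls = expired.map(k => Call(k, hasTimedOut = true))

    (updated -- expired, dataCalls ++ timeoutCalls)
  }
}
```

Feeding Seq("apache", "spark") at time 0 and then Seq("apache") at a time past the deadline yields a timeout call only for spark, while apache keeps its accumulated count. Swapping the two steps would instead expire apache before its new data is processed.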


Thanks
Lubo
