Hi all,

I have a question about the stateful operations [map/flatMap]GroupsWithState in Structured Streaming. The issue is as follows:
Take the StructuredSessionization example. First I input two words, apache and spark, in batch 0. Then I wait until the timeout has elapsed and input another word, Hadoop, in batch 1 (the timeout type here is ProcessingTimeTimeout). I can see that both apache and spark are output, since each group's state has timed out. But if the word I input in batch 1 is apache, which already existed in batch 0, the result shows that only spark has expired.

I dug into this and found that the code at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala#L131 handles the group state update. It first processes the new group data and sets the flag hasTimedOut to false in the update function, so a key that has already timed out is simply updated rather than expired.

I know the timeout function will not be called until new data triggers a batch, but I am wondering why we don't process the timed-out keys first, so that the expired data from batch 0 can be retrieved in the user-given function:

    def statefunc(key: K, values: Iterator[V], state: GroupState[S]): U = {
      if (state.hasTimedOut) {
        // If called when timing out, remove the state
        // TODO
        state.remove()
      } else if (state.exists) {
      }
    }

Thanks,
Lubo
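
P.S. To make the scenario concrete, here is a small plain-Scala model of the two orderings. It is only a simplified sketch and not Spark's code: the names TimeoutOrderingSketch, State, Call, and runBatch, the 10-second timeout, and the batch times are all made up for illustration. With timeoutsFirst = false it mimics the order I described above (new data first, then timed-out keys); with timeoutsFirst = true it mimics the order I am asking about.

```scala
// Simplified model of the ordering question. NOT Spark's implementation;
// every name and number here is hypothetical.
object TimeoutOrderingSketch {
  // Per-key state: number of events seen and the processing-time deadline.
  final case class State(count: Int, deadlineMs: Long)

  // What the user function would observe in one invocation.
  final case class Call(key: String, hasTimedOut: Boolean, count: Int)

  val timeoutMs = 10000L

  // Runs one batch; returns the new state store and the observed calls.
  def runBatch(
      store: Map[String, State],
      words: Seq[String],
      nowMs: Long,
      timeoutsFirst: Boolean): (Map[String, State], Seq[Call]) = {

    // Remove every key whose deadline has passed, reporting hasTimedOut = true.
    def expire(s: Map[String, State]): (Map[String, State], Seq[Call]) = {
      val expired = s.filter { case (_, st) => st.deadlineMs <= nowMs }
      val calls = expired.toSeq.sortBy(_._1).map { case (k, st) =>
        Call(k, hasTimedOut = true, st.count)
      }
      (s -- expired.keys, calls)
    }

    // Fold new words into the state, reporting hasTimedOut = false
    // and resetting the deadline of every key that received data.
    def update(s: Map[String, State]): (Map[String, State], Seq[Call]) = {
      val grouped = words.groupBy(identity).toSeq.sortBy(_._1)
      grouped.foldLeft((s, Vector.empty[Call])) { case ((acc, calls), (k, vs)) =>
        val count = acc.get(k).map(_.count).getOrElse(0) + vs.size
        val call = Call(k, hasTimedOut = false, count)
        (acc.updated(k, State(count, nowMs + timeoutMs)), calls :+ call)
      }
    }

    if (timeoutsFirst) {
      val (afterExpire, timedOut) = expire(store)
      val (afterUpdate, updated) = update(afterExpire)
      (afterUpdate, timedOut ++ updated)
    } else {
      val (afterUpdate, updated) = update(store)
      val (afterExpire, timedOut) = expire(afterUpdate)
      (afterExpire, updated ++ timedOut)
    }
  }

  def main(args: Array[String]): Unit = {
    for (timeoutsFirst <- Seq(false, true)) {
      // Batch 0 at t = 0 ms: apache and spark arrive.
      val (s0, _) = runBatch(Map.empty, Seq("apache", "spark"), 0L, timeoutsFirst)
      // Batch 1 at t = 20000 ms, after the timeout: apache arrives again.
      val (_, calls) = runBatch(s0, Seq("apache"), 20000L, timeoutsFirst)
      println(s"timeoutsFirst=$timeoutsFirst: " + calls.mkString(", "))
    }
    // prints:
    // timeoutsFirst=false: Call(apache,false,2), Call(spark,true,1)
    // timeoutsFirst=true: Call(apache,true,1), Call(spark,true,1), Call(apache,false,1)
  }
}
```

In this model, the data-first order never reports apache as timed out, because its deadline is reset before the expiry pass runs. That matches what I observe in the example.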