From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Thursday, July 27, 2017 3:08 AM
To: Zhang, Lubo <lubo.zh...@intel.com>
Cc: dev@spark.apache.org
Subject: Re: Questions about Stateful Operations in SS

Hello Lubo,

The idea of timeouts is to make a best-effort, last-resort attempt to process a 
key when it has not received data for a while. With a processing-time timeout of 
1 minute, the system guarantees only that it will not time out a key until at 
least 1 minute has passed since it last received data. Defining precisely when 
the timeout is triggered is really hard for many reasons (distributed system, 
lack of precise clock synchronization, need for deterministic re-executions for 
fault tolerance, etc.). We made a design decision to process timed-out keys 
after processing the new data in a batch, but that choice should not affect your 
business logic if it is constructed the right way. Your business logic should 
set loosely defined timeout durations, and not depend on the exact timing of 
when the timeouts are hit.
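
For illustration, here is a minimal sketch of a state function written with 
that principle in mind. The names here (SessionInfo, updateSession, the 
one-minute duration) are hypothetical, not taken from the 
StructuredSessionization example:

    import org.apache.spark.sql.streaming.GroupState

    // Hypothetical state type for this sketch.
    case class SessionInfo(numEvents: Int)

    // Treats the timeout as "at least this long without data",
    // never as an exact deadline.
    def updateSession(
        key: String,
        events: Iterator[String],
        state: GroupState[SessionInfo]): Iterator[(String, Int)] = {
      if (state.hasTimedOut) {
        // No new data for this key for at least the timeout duration:
        // emit the accumulated state and drop it.
        val expired = Iterator((key, state.get.numEvents))
        state.remove()
        expired
      } else {
        val updated =
          SessionInfo(state.getOption.map(_.numEvents).getOrElse(0) + events.size)
        state.update(updated)
        // Loosely defined: the timeout fires some time after 1 minute of
        // inactivity, at the earliest in a later batch, never at exactly
        // the 1-minute mark.
        state.setTimeoutDuration("1 minute")
        Iterator.empty
      }
    }

Such a function would be plugged in via 
flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.ProcessingTimeTimeout).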

TD

On Wed, Jul 26, 2017 at 1:54 AM, Zhang, Lubo <lubo.zh...@intel.com> wrote:
Hi all

I have a question about the stateful operations [map/flatMap]GroupsWithState 
in Structured Streaming. The issue is as follows:

Take the StructuredSessionization example. First I input two words, apache and 
spark, in batch 0, then input another word, hadoop, in batch 1 after the 
timeout has elapsed (here the timeout type is ProcessingTimeTimeout). Both 
apache and spark are output, since each group's state has timed out. But if in 
batch 1 I instead input the word apache, which already existed in batch 0, the 
result shows that only spark has expired. I dug into this and found that the 
code at 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala#L131
 which handles the group state update processes the new group data first, 
calling the update function with the hasTimedOut flag set to false, with the 
result that a key that had already timed out is simply updated. I know the 
timeout call will not occur until new data arrives to trigger a batch, but I am 
wondering why we don't process the timed-out keys first, so that the expired 
data from batch 0 could be retrieved in the user-given function:

def statefunc(key: K, values: Iterator[V], state: GroupState[S]): U = {
  if (state.hasTimedOut) {
    // If called when timing out, remove the state
    // TODO
    state.remove()
  } else if (state.exists) {
    // ...
  }
}
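
For clarity, the per-batch ordering I am describing could be sketched like 
this (hypothetical pseudo-Scala, not the actual FlatMapGroupsWithStateExec 
code; callUserFunc is a stand-in for invoking the user-given function):

    // Hypothetical sketch of the per-batch ordering I observe.
    def processBatch[K, V](
        newData: Map[K, Iterator[V]],
        timedOutKeys: Set[K],
        callUserFunc: (K, Iterator[V], Boolean) => Unit): Unit = {
      // 1. Keys with new data are processed first, with hasTimedOut = false,
      //    so a key that had already timed out is just updated.
      newData.foreach { case (key, values) =>
        callUserFunc(key, values, false) // hasTimedOut = false
      }
      // 2. Only timed-out keys that received no new data in this batch are
      //    then processed with hasTimedOut = true.
      (timedOutKeys -- newData.keySet).foreach { key =>
        callUserFunc(key, Iterator.empty, true) // hasTimedOut = true
      }
    }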


Thanks
Lubo

