For the mapGroupsWithState computation, can you check the following?
- How many tasks are launched in the reduce stage (that is, the stage after the shuffle, which computes mapGroupsWithState)?
- How long does each task take?
- How many cores does the cluster have?
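If it helps, the first two numbers can be read off the executor log itself. Below is a rough Python sketch, assuming INFO-level executor logs in the same format as the excerpt quoted below; the helper names `task_durations` and `summarize` are made up for illustration and are not part of Spark. The Stages tab of the Spark UI reports the same figures.

```python
import re
from datetime import datetime, timedelta

# Matches the "Running task" / "Finished task" lines written by
# org.apache.spark.executor.Executor, in the format of the quoted log.
EVENT = re.compile(
    r"(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) .*?"
    r"(?P<kind>Running|Finished) task (?P<task>[\d.]+) "
    r"in stage (?P<stage>[\d.]+) \(TID (?P<tid>\d+)\)"
)


def task_durations(lines):
    """Return {(stage, tid): duration in ms} for tasks that started and finished."""
    started, durations = {}, {}
    for line in lines:
        m = EVENT.search(line)
        if m is None:
            continue  # not a task start/finish line
        ts = datetime.strptime(m["ts"], "%Y-%m-%d %H:%M:%S,%f")
        key = (m["stage"], int(m["tid"]))
        if m["kind"] == "Running":
            started[key] = ts
        elif key in started:
            # Whole milliseconds between the start and finish entries.
            durations[key] = (ts - started.pop(key)) // timedelta(milliseconds=1)
    return durations


def summarize(durations):
    """Return {stage: (task count, total ms, longest task in ms)}."""
    summary = {}
    for (stage, _tid), ms in durations.items():
        count, total, longest = summary.get(stage, (0, 0, 0))
        summary[stage] = (count + 1, total + ms, max(longest, ms))
    return summary


# The two Executor lines for TID 287 from the quoted log.
sample = [
    "2018-01-18 13:33:46,668 [Executor task launch worker for task 287] INFO "
    "org.apache.spark.executor.Executor - Running task 85.0 in stage 3.0 (TID 287)",
    "2018-01-18 13:33:46,691 [Executor task launch worker for task 287] INFO "
    "org.apache.spark.executor.Executor - Finished task 85.0 in stage 3.0 (TID 287). "
    "2212 bytes result sent to driver",
]

durations = task_durations(sample)
print(durations)             # {('3.0', 287): 23}
print(summarize(durations))  # {'3.0': (1, 23, 23)}
```

Running this over the full log of one batch gives the task count and total task time per stage, which can then be compared against the number of cores.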
On Thu, Jan 18, 2018 at 11:28 PM, chris-sw <christiaan....@semmelwise.nl> wrote:
> Hi,
>
> I recently did some experiments with stateful structured streaming using
> flatMapGroupsWithState. The streaming application is quite simple: it
> receives data from Kafka, feeds it to the stateful operator
> (flatMapGroupsWithState), and sinks the output to the console.
> During a test with small datasets (3-5 records per batch) I experienced
> long batch runs.
>
> Looking at the log, I see an explosion of tasks with these log entries:
> -----
> 2018-01-18 13:33:46,668 [Executor task launch worker for task 287] INFO org.apache.spark.executor.Executor - Running task 85.0 in stage 3.0 (TID 287)
> 2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider - Retrieved version 1 of HDFSStateStoreProvider[id = (op=0, part=85), dir = /tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85] for update
> 2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO org.apache.spark.storage.ShuffleBlockFetcherIterator - Getting 0 non-empty blocks out of 1 blocks
> 2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO org.apache.spark.storage.ShuffleBlockFetcherIterator - Started 0 remote fetches in 0 ms
> 2018-01-18 13:33:46,691 [Executor task launch worker for task 287] INFO org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider - Committed version 2 for HDFSStateStore[id=(op=0,part=85),dir=/tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85] to file /tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85/2.delta
> 2018-01-18 13:33:46,691 [Executor task launch worker for task 287] INFO org.apache.spark.executor.Executor - Finished task 85.0 in stage 3.0 (TID 287). 2212 bytes result sent to driver
> -----
>
> A batch run takes approx. 5 seconds, and it seems most of that time is
> spent on tasks like the one above.
> I created several apps using the non-Spark SQL approach with mapWithState
> but never experienced these long batch runs.
>
> Has anyone else experienced this, or can anyone help me find a solution
> for these long runs?
>
> Regards,
>
> Chris