Hi TD,

Thanks for taking the time to review my question. Answers to your questions:

- How many tasks are being launched in the reduce stage (that is, the stage after the shuffle, computing mapGroupsWithState)?
In the dashboard I count 200 tasks in the stage containing Exchange -> WholeStageCodegen -> FlatMapGroupsWithState -> WholeStageCodegen. I understand that flatMapGroupsWithState uses 200 partitions for storing the state by default. Can I control the partitioning within flatMapGroupsWithState? (See the sketch below for what I was planning to try.) Besides that, I suspect 200 partitions is far too many for the small dataset in this test, although I cannot imagine a larger dataset performing better with this setup either.

- How long is each task taking?
The 75th percentile is 54 ms, with a maximum of 2 s for a single task. Most of the time is 'Computing Time' according to the Event Timeline.

- How many cores does the cluster have?
The cluster is small and has 2 workers, each using 1 core. I'm also wondering how Spark determines the number of cores when using Docker (a single host with multiple Spark containers).
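What I was planning to try is lowering spark.sql.shuffle.partitions before the query first starts. This is only a minimal sketch (the app name and the value of 8 are placeholders, not my actual job), and as far as I understand the setting is only picked up when the query starts with a fresh checkpoint directory:

-----
import org.apache.spark.sql.SparkSession

// Minimal sketch: lower the shuffle (and hence state) partitioning before the
// query is started for the first time. The default is 200; 8 is a placeholder.
val spark = SparkSession.builder()
  .appName("stateful-streaming-test")           // placeholder app name
  .config("spark.sql.shuffle.partitions", "8")
  .getOrCreate()
-----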
Regards,
Chris

From: Tathagata Das <tathagata.das1...@gmail.com>
Date: Tuesday, 23 January 2018 at 00:04
To: Christiaan Ras <christiaan....@semmelwise.nl>
Cc: user <user@spark.apache.org>
Subject: Re: [Spark structured streaming] Use of (flat)mapgroupswithstate takes long time

For computing mapGroupsWithState, can you check the following:
- How many tasks are being launched in the reduce stage (that is, the stage after the shuffle, computing mapGroupsWithState)?
- How long is each task taking?
- How many cores does the cluster have?

On Thu, Jan 18, 2018 at 11:28 PM, chris-sw <christiaan....@semmelwise.nl> wrote:

Hi,

I recently did some experiments with stateful structured streaming using flatMapGroupsWithState. The streaming application is quite simple: it receives data from Kafka, feeds it to the stateful operator (flatMapGroupsWithState) and sinks the output to console.

During a test with small datasets (3-5 records per batch) I experienced long batch runs. Taking a look at the log, I see an explosion of tasks with entries like these:

-----
2018-01-18 13:33:46,668 [Executor task launch worker for task 287] INFO org.apache.spark.executor.Executor - Running task 85.0 in stage 3.0 (TID 287)
2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider - Retrieved version 1 of HDFSStateStoreProvider[id = (op=0, part=85), dir = /tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85] for update
2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO org.apache.spark.storage.ShuffleBlockFetcherIterator - Getting 0 non-empty blocks out of 1 blocks
2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO org.apache.spark.storage.ShuffleBlockFetcherIterator - Started 0 remote fetches in 0 ms
2018-01-18 13:33:46,691 [Executor task launch worker for task 287] INFO org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider - Committed version 2 for HDFSStateStore[id=(op=0,part=85),dir=/tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85] to file /tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85/2.delta
2018-01-18 13:33:46,691 [Executor task launch worker for task 287] INFO org.apache.spark.executor.Executor - Finished task 85.0 in stage 3.0 (TID 287). 2212 bytes result sent to driver
-----

A batch run takes approx. 5 seconds, and it seems most of that time is spent on tasks like the ones above. I have created several apps using the non-Spark SQL approach with mapWithState but never experienced such long batch runs.

Has anyone experienced this as well, or can anyone help me find a solution for these long runs?

Regards,
Chris
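For completeness, the test query is roughly of this shape. This is a simplified sketch with placeholder names, types and Kafka settings, not the exact code: the state logic here is just a running count per key, standing in for the real state function, but the operator chain (Kafka source -> groupByKey -> flatMapGroupsWithState -> console sink) matches the description above.

-----
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Placeholder input and state types.
case class Event(deviceId: String, value: Double)
case class RunningCount(count: Long)

val spark = SparkSession.builder().appName("flatmap-groups-test").getOrCreate()
import spark.implicits._

// Kafka source; broker address and topic are placeholders.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(value AS STRING) AS raw")
  .as[String]
  .map { raw =>
    // Trivial CSV parse, standing in for the real deserialization.
    val parts = raw.split(",")
    Event(parts(0), parts(1).toDouble)
  }

// Stateful step: keep a running count per key in the state store.
val counted = events
  .groupByKey(_.deviceId)
  .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.NoTimeout()) {
    (key: String, rows: Iterator[Event], state: GroupState[RunningCount]) =>
      val previous = state.getOption.map(_.count).getOrElse(0L)
      val updated = RunningCount(previous + rows.size)
      state.update(updated)
      Iterator((key, updated.count))
  }

// Console sink, as in the test.
val query = counted.writeStream
  .outputMode("update")
  .format("console")
  .start()

query.awaitTermination()
-----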