Hi TD,

Thanks for taking the time to review my question.
Answers to your questions:
- How many tasks are being launched in the reduce stage (that is, the stage 
after the shuffle, the one computing mapGroupsWithState)?
In the dashboard I count 200 tasks in the stage containing: Exchange -> 
WholeStageCodegen -> FlatMapGroupsWithState -> WholeStageCodegen.
I understand that flatMapGroupsWithState uses 200 partitions for storing the 
state by default. Can I control the partitioning within flatMapGroupsWithState?
Apart from that, I suspect 200 partitions is far too many for the dataset in 
my small test, but I cannot imagine a larger dataset would perform better 
with my setup either.
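If I understand correctly, the 200 comes from spark.sql.shuffle.partitions. A 
minimal sketch of how I would try to lower it (the value 8 is arbitrary, and 
my assumption is that it must be set before the query first starts, since the 
state store partitioning appears to be fixed from the first run against a 
checkpoint):
-----
// Sketch: reduce the shuffle/state partition count before starting the
// stateful query. The value 8 is an illustrative choice, not a recommendation.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("stateful-test")                     // illustrative app name
  .config("spark.sql.shuffle.partitions", "8")  // default is 200
  .getOrCreate()
-----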

- How long is each task taking?
The 75th percentile is 54 ms (with a max of 2 s for a single task). Most of 
the time is ‘Computing Time’ according to the Event Timeline.

- How many cores does the cluster have?
The cluster is small and has 2 workers, both using 1 core. I’m wondering how 
Spark determines the number of cores when using Docker (a single host with 
multiple spark containers).
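As far as I know, a standalone worker auto-detects its core count via the 
JVM’s availableProcessors(), which inside a container can still report the 
host’s CPUs. A sketch of how I would pin it explicitly, assuming standalone 
mode and docker-compose (image name and values are illustrative):
-----
# Fragment of a docker-compose.yml worker service (illustrative values):
services:
  spark-worker:
    image: my-spark-image        # hypothetical image name
    environment:
      - SPARK_WORKER_CORES=1     # cores this worker advertises to the master
      - SPARK_WORKER_MEMORY=1g   # memory this worker advertises
-----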

Regards,
Chris


From: Tathagata Das <tathagata.das1...@gmail.com>
Date: Tuesday, 23 January 2018 at 00:04
To: Christiaan Ras <christiaan....@semmelwise.nl>
Cc: user <user@spark.apache.org>
Subject: Re: [Spark structured streaming] Use of (flat)mapgroupswithstate takes 
long time

For computing mapGroupsWithState, can you check the following:
- How many tasks are being launched in the reduce stage (that is, the stage 
after the shuffle, the one computing mapGroupsWithState)?
- How long is each task taking?
- How many cores does the cluster have?


On Thu, Jan 18, 2018 at 11:28 PM, chris-sw 
<christiaan....@semmelwise.nl> wrote:
Hi,

I recently did some experiments with stateful structured streaming using
flatMapGroupsWithState. The streaming application is quite simple: it
receives data from Kafka, feeds it to the stateful operator
(flatMapGroupsWithState), and sinks the output to the console.
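In rough strokes it looks like the sketch below (broker address, topic name,
and the per-key state logic are placeholders, not my actual code):
-----
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(key: String, value: String)

val spark = SparkSession.builder().appName("stateful-test").getOrCreate()
import spark.implicits._

// Read from Kafka and project key/value to strings (placeholder options).
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")  // placeholder broker
  .option("subscribe", "events")                    // placeholder topic
  .load()
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .as[Event]

// Placeholder state logic: keep a running count of events per key.
def updateState(key: String, values: Iterator[Event],
                state: GroupState[Long]): Iterator[(String, Long)] = {
  val count = state.getOption.getOrElse(0L) + values.size
  state.update(count)
  Iterator((key, count))
}

val query = events
  .groupByKey(_.key)
  .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.NoTimeout())(updateState)
  .writeStream
  .outputMode("update")
  .format("console")
  .start()

query.awaitTermination()
-----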
During a test with small datasets (3-5 records per batch) I experienced long
batch runs.

Taking a look at the log, I see an explosion of tasks with entries like these:
-----
2018-01-18 13:33:46,668 [Executor task launch worker for task 287] INFO
org.apache.spark.executor.Executor - Running task 85.0 in stage 3.0 (TID
287)
2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider
- Retrieved version 1 of HDFSStateStoreProvider[id = (op=0, part=85), dir =
/tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85] for update
2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO
org.apache.spark.storage.ShuffleBlockFetcherIterator - Getting 0 non-empty
blocks out of 1 blocks
2018-01-18 13:33:46,672 [Executor task launch worker for task 287] INFO
org.apache.spark.storage.ShuffleBlockFetcherIterator - Started 0 remote
fetches in 0 ms
2018-01-18 13:33:46,691 [Executor task launch worker for task 287] INFO
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider
- Committed version 2 for
HDFSStateStore[id=(op=0,part=85),dir=/tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85]
to file
/tmp/temporary-8b418cec-0378-4324-a916-6e3864500d56/state/0/85/2.delta
2018-01-18 13:33:46,691 [Executor task launch worker for task 287] INFO
org.apache.spark.executor.Executor - Finished task 85.0 in stage 3.0 (TID
287). 2212 bytes result sent to driver
-----

A batch run takes approx. 5 seconds, and it seems most of that time is
spent on tasks like the one above.
I created several apps using the non-Spark-SQL approach with mapWithState
but never experienced such long batch runs.

Does anyone have this experience as well, or can anyone help me find a
solution for these long runs?

Regards,

Chris



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
