I would do the foreachPartition on the larger DataFrame, assuming each element of the DataFrame is meant to be output as a file.
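In Java that could look roughly like the sketch below (untested; the bucket name, key layout, and AWS client setup are just placeholders for whatever you actually use):

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectMetadata;

df.toJSON().toJavaRDD().foreachPartition(records -> {
    // One S3 client per partition, created on the executor, closed when the partition is done
    AmazonS3Client s3 = new AmazonS3Client();
    try {
        while (records.hasNext()) {
            byte[] bytes = records.next().getBytes(StandardCharsets.UTF_8);
            ObjectMetadata meta = new ObjectMetadata();
            meta.setContentLength(bytes.length);
            // Placeholder bucket and key scheme: one object per JSON record
            s3.putObject("myBucket", "myPrefix/" + UUID.randomUUID() + ".json",
                         new ByteArrayInputStream(bytes), meta);
        }
    } finally {
        s3.shutdown();
    }
});

The main point is just creating and closing the client inside the partition, so each executor uploads its own records directly with the SDK instead of going through the Hadoop output path.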
The problem is largely the startup cost of each .write: the S3 code is pretty slow and by default includes writing to a temporary file first, etc.

On 8 Jul 2016 18:29, Andy Davidson <a...@santacruzintegration.com> wrote:

Hi Ewan

Currently I split my dataframe into n smaller dataframes and call write().json("s3://"). Each data frame becomes a single S3 object.

I assume for your solution to work I would need to repartition(1) each of the smaller sets so that they are written as a single S3 object.

I am also considering using a Java ExecutorService and thread pool; it's easy to do. Each thread would call df.write().json("s3://"). One advantage of this is that I do not need to make any assumptions about how Spark is implemented. I assume the thread pool is running on the driver, so the slaves do not incur any extra overhead.

Thanks

Andy

From: Ewan Leith <ewan.le...@realitymine.com>
Date: Friday, July 8, 2016 at 8:52 AM
To: Cody Koeninger <c...@koeninger.org>, Andrew Davidson <a...@santacruzintegration.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: RE: is dataframe.write() async? Streaming performance problem

Writing (or reading) small files from Spark to S3 can be seriously slow. You'll get much higher throughput by doing a df.foreachPartition(partition => ...) and, inside each partition, creating an AWS S3 client, then doing a partition.foreach and uploading the files using that S3 client with its own threadpool.

As long as you create the S3 client inside the foreachPartition, and close it after the partition.foreach(...) is done, you shouldn't have any issues.

Something roughly like this from the DStream docs:

df.foreachPartition { partitionOfRecords =>
  val connection = createNewConnection()
  partitionOfRecords.foreach(record => connection.send(record))
  connection.close()
}

Hope this helps,
Ewan

-----Original Message-----
From: Cody Koeninger [mailto:c...@koeninger.org]
Sent: 08 July 2016 15:31
To: Andy Davidson <a...@santacruzintegration.com>
Cc: user @spark <user@spark.apache.org>
Subject: Re: is dataframe.write() async? Streaming performance problem

Maybe obvious, but what happens when you change the S3 write to a println of all the data? That should identify whether it's the issue.

count() and read.json() will involve additional tasks (run through the items in the RDD to count them, likewise to infer the schema), but for 300 records that shouldn't be much of an issue.
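For example (untested, and only as a temporary diagnostic), in place of the saveDF.write().json(dirPath) call below you could do something like:

for (String line : saveDF.toJSON().toJavaRDD().collect()) {
    System.out.println(line);   // dump the batch on the driver instead of writing to S3
}

If the batch time drops dramatically, the S3 write is the bottleneck; if not, the problem is upstream.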
On Thu, Jul 7, 2016 at 3:59 PM, Andy Davidson <a...@santacruzintegration.com> wrote:

I am running Spark 1.6.1 built for Hadoop 2.0.0-mr1-cdh4.2.0 and using the Kafka direct stream approach. I am running into performance problems: my processing time is greater than my window size, and changing window sizes, adding cores, and adding executor memory does not change performance. I am having a lot of trouble identifying the problem from the metrics provided for streaming apps in the Spark application web UI.

I think my performance problem has to do with writing the data to S3. My app receives very complicated JSON. My program is simple: it sorts the data into a small number of sets and writes each set as a separate S3 object. The mini-batch data has at most 300 events, so I do not think shuffle is an issue.

DataFrame rawDF = sqlContext.read().json(jsonRDD).cache();

// ... explode tagCol ...

DataFrame rulesDF = activityDF.select(tagCol).distinct();
Row[] rows = rulesDF.select(tagCol).collect();
List<String> tags = new ArrayList<String>(100);
for (Row row : rows) {
    Object tag = row.get(0);
    tags.add(tag.toString());
}

I think the for loop below is where the bottleneck is. Is write() async? If not, is there an easy way to vectorize/parallelize this for loop, or do I have to create the threads myself? Is creating threads in Spark a bad idea?

for (String tag : tags) {
    DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));
    if (saveDF.count() >= 1) { // I do not think count() is an issue; it takes about 34 ms
        String dirPath = "s3n://myBucket" + File.separator + date + File.separator
                       + tag + File.separator + milliSeconds;
        saveDF.write().json(dirPath);
    }
}

Any suggestions would be greatly appreciated,

Andy
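A rough, untested sketch of the driver-side ExecutorService idea Andy mentions upthread, in Java 8. The pool size is arbitrary, the paths mirror his loop, and each submitted task just runs one tag's write as its own Spark job:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

ExecutorService pool = Executors.newFixedThreadPool(8);   // pool size is arbitrary
List<Future<?>> pending = new ArrayList<>();
for (String tag : tags) {
    pending.add(pool.submit(() -> {
        DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));
        if (saveDF.count() >= 1) {
            String dirPath = "s3n://myBucket/" + date + "/" + tag + "/" + milliSeconds;
            saveDF.write().json(dirPath);   // still a blocking Spark job, but the jobs now overlap
        }
    }));
}
for (Future<?> f : pending) {
    f.get();   // wait for all writes; throws if any task failed (handle the checked exceptions)
}
pool.shutdown();

The writes still run as ordinary Spark jobs scheduled from the driver, so, as Andy notes, the executors incur no extra overhead; the thread pool only lets several small jobs be in flight at once.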