I would do the foreachPartition on the larger dataframe, assuming each element
of the dataframe is meant to be output as a file.

The problem is largely the startup cost of each .write(): the S3 output code is
pretty slow and by default writes to a temporary file first, etc.

On 8 Jul 2016 18:29, Andy Davidson <a...@santacruzintegration.com> wrote:
Hi Ewan

Currently I split my dataframe into n smaller dataframes and call
write().json("s3://")

Each data frame becomes a single S3 object.

I assume for your solution to work I would need to repartition(1) each of the
smaller sets so that they are written as a single S3 object.
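
i.e. something like this (just a sketch of what I mean, using the saveDF and
dirPath from my code below):

  // Force each smaller dataframe down to one partition so it is written as a single part file.
  saveDF.repartition(1).write().json(dirPath);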


I am also considering using a Java ExecutorService and thread pool. It's easy to
do. Each thread would call df.write().json("s3://"). One advantage of this is
that I do not need to make any assumptions about how Spark is implemented.

I assume the thread pool runs on the driver, so the slaves do not incur any
extra overhead.
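
Roughly what I have in mind, just as a sketch (it reuses activityDF, tagCol, date
and milliSeconds from my code below, and the pool size of 8 is a guess):

  import java.io.File;
  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.Future;

  import org.apache.spark.sql.DataFrame;

  // Driver-side thread pool: each task runs one blocking write().json() job, so
  // the small per-tag S3 writes overlap instead of running one after another.
  ExecutorService pool = Executors.newFixedThreadPool(8);
  List<Future<?>> pending = new ArrayList<>();

  for (String tag : tags) {
      final DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));
      final String dirPath = "s3n://myBucket" + File.separator + date
              + File.separator + tag + File.separator + milliSeconds;
      pending.add(pool.submit(() -> saveDF.write().json(dirPath)));
  }

  // Wait for every write to finish before ending the batch.
  for (Future<?> f : pending) {
      try {
          f.get();
      } catch (Exception e) {
          throw new RuntimeException("S3 write failed", e);
      }
  }
  pool.shutdown();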

Thanks

Andy

From: Ewan Leith <ewan.le...@realitymine.com>
Date: Friday, July 8, 2016 at 8:52 AM
To: Cody Koeninger <c...@koeninger.org>, Andrew Davidson <a...@santacruzintegration.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: RE: is dataframe.write() async? Streaming performance problem

Writing (or reading) small files from spark to s3 can be seriously slow.

You'll get much higher throughput by doing a df.foreachPartition(partition =>
...) and, inside each partition, creating an AWS S3 client, then doing a
partition.foreach and uploading the files using that S3 client with its own
thread pool.

As long as you create the s3 client inside the foreachPartition, and close it 
after the partition.foreach(...) is done, you shouldn't have any issues.

Something roughly like this from the DStream docs:

  df.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
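
Since your code below is Java, a rough Java sketch of the same idea (an untested
outline: the bucket name and key layout are just examples, and it assumes the
AWS SDK for Java is on the classpath):

  import java.io.ByteArrayInputStream;
  import java.nio.charset.StandardCharsets;
  import java.util.Iterator;

  import org.apache.spark.TaskContext;

  import com.amazonaws.services.s3.AmazonS3Client;
  import com.amazonaws.services.s3.model.ObjectMetadata;

  // One S3 client per partition, created and closed on the executor itself.
  df.toJSON().toJavaRDD().foreachPartition((Iterator<String> records) -> {
      AmazonS3Client s3 = new AmazonS3Client(); // credentials from the default provider chain
      int partId = TaskContext.get().partitionId();
      int i = 0;
      try {
          while (records.hasNext()) {
              byte[] bytes = records.next().getBytes(StandardCharsets.UTF_8);
              ObjectMetadata meta = new ObjectMetadata();
              meta.setContentLength(bytes.length);
              // Example key layout only; use whatever naming scheme you need.
              s3.putObject("myBucket", "example/part-" + partId + "-" + (i++) + ".json",
                      new ByteArrayInputStream(bytes), meta);
          }
      } finally {
          s3.shutdown();
      }
  });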

Hope this helps,
Ewan

-----Original Message-----
From: Cody Koeninger [mailto:c...@koeninger.org]
Sent: 08 July 2016 15:31
To: Andy Davidson <a...@santacruzintegration.com>
Cc: user @spark <user@spark.apache.org>
Subject: Re: is dataframe.write() async? Streaming performance problem

Maybe obvious, but what happens when you change the s3 write to a println of
all the data? That should identify whether the write is the issue.

count() and read.json() will involve additional tasks (run through the items in 
the rdd to count them, likewise to infer the schema) but for
300 records that shouldn't be much of an issue.
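
For example, in place of the saveDF.write().json(dirPath) call below, something
quick and dirty like:

  // Diagnostic only: print the rows instead of writing to S3, to see whether the
  // S3 write or the upstream work is what is eating the batch time.
  for (Row row : saveDF.collect()) {
      System.out.println(row);
  }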

On Thu, Jul 7, 2016 at 3:59 PM, Andy Davidson <a...@santacruzintegration.com> wrote:
I am running Spark 1.6.1 built for Hadoop 2.0.0-mr1-cdh4.2.0 and using the
Kafka direct stream approach. I am running into performance problems.
My processing time is greater than my window size. Changing window sizes,
adding cores and executor memory does not change performance. I am
having a lot of trouble identifying the problem using the metrics
provided for streaming apps in the Spark application web UI.

I think my performance problem has to do with writing the data to S3.

My app receives very complicated JSON. My program is simple: it sorts
the data into a small set of sets and writes each set as a separate S3 object.
The mini-batch data has at most 300 events, so I do not think shuffle
is an issue.

DataFrame rawDF = sqlContext.read().json(jsonRDD).cache();

// … Explode tagCol …

DataFrame rulesDF = activityDF.select(tagCol).distinct();
Row[] rows = rulesDF.select(tagCol).collect();

List<String> tags = new ArrayList<String>(100);
for (Row row : rows) {
    Object tag = row.get(0);
    tags.add(tag.toString());
}


I think the for loop below is where the bottleneck is. Is write() async?

If not, is there an easy way to vectorize/parallelize this for loop, or do I
have to create the threads myself?

Is creating threads in Spark a bad idea?



for (String tag : tags) {
    DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));

    if (saveDF.count() >= 1) { // I do not think count() is an issue, performance is about 34 ms
        String dirPath = "s3n://myBucket" + File.separator + date
                + File.separator + tag + File.separator + milliSeconds;
        saveDF.write().json(dirPath);
    }
}


Any suggestions would be greatly appreciated


Andy


