Thanks!
Let me update the status.
I have copied the DirectOutputCommitter to my local project and set:
conf.set("spark.hadoop.mapred.output.committer.class",
"org.****.DirectOutputCommitter")
It works perfectly.
Thanks, everyone!
Regards,
Shuai
From: Aaron Davidson [mailto:[email protected]]
Sent: Tuesday, March 17, 2015 3:06 PM
To: Imran Rashid
Cc: Shuai Zheng; [email protected]
Subject: Re: Spark will process _temporary folder on S3 is very slow and always
cause failure
Actually, this is the more relevant JIRA (which is resolved):
https://issues.apache.org/jira/browse/SPARK-3595
6352 is about saveAsParquetFile, which is not in use here.
Here is a DirectOutputCommitter implementation:
https://gist.github.com/aarondav/c513916e72101bbe14ec
and it can be configured in Spark with:
sparkConf.set("spark.hadoop.mapred.output.committer.class",
classOf[DirectOutputCommitter].getName)
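For reference, a committer in the spirit of the linked gist might look like the sketch below (this is not the gist's exact code, and it assumes hadoop-client on the classpath; it will not compile without it). The idea is that tasks write straight to the final location, so every commit step becomes a no-op and no _temporary renames ever happen:

```scala
import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}

// Sketch of a "direct" committer: output goes straight to the final
// location, so there is nothing to set up, promote, or clean up.
// A zero-arg constructor is required so Spark can instantiate it by
// class name from the config key above.
class DirectOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = ()
  override def setupTask(taskContext: TaskAttemptContext): Unit = ()
  // No per-task output to promote, so no task ever needs a commit.
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = ()
  override def abortTask(taskContext: TaskAttemptContext): Unit = ()
}
```

Note the trade-off: this gives up the atomicity of the default committer, so with speculative execution or task retries, partial output from failed attempts can be left visible in the destination.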
On Tue, Mar 17, 2015 at 8:05 AM, Imran Rashid <[email protected]> wrote:
I'm not super familiar w/ S3, but I think the issue is that you want to use a
different output committer with "object" stores, which don't have a simple move
operation. There have been a few other threads on S3 & output committers. I
think the most relevant one for you is probably this open JIRA:
https://issues.apache.org/jira/browse/SPARK-6352
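The slow step described above can be re-enacted locally with plain NIO (a toy sketch; nothing here is Spark or Hadoop code, and all names are made up for illustration). Each task writes its part file under _temporary, and job commit promotes every file with one move. On a real filesystem each move is a cheap metadata rename; on s3n each one becomes a full byte copy plus a delete, which is why ~200 partitions take minutes and one flaky request fails the whole job:

```scala
import java.nio.file.Files
import scala.collection.JavaConverters._

// Toy model of the FileOutputCommitter job-commit step: write part
// files under _temporary, then move each one to the final directory.
object CommitByRename {
  // Run the simulation in a scratch directory and return the final
  // output file names, sorted.
  def demo(partitions: Int): Seq[String] = {
    val out = Files.createTempDirectory("output")
    val tmp = Files.createDirectories(out.resolve("_temporary/0/task_0"))

    // Pretend each task wrote one partition file under _temporary.
    for (i <- 0 until partitions)
      Files.write(tmp.resolve(f"part-$i%05d"), s"row $i\n".getBytes)

    // Job commit: one move per part file. Cheap rename locally;
    // copy + delete per object on an object store.
    Files.list(tmp).iterator.asScala.toList.foreach { p =>
      Files.move(p, out.resolve(p.getFileName))
    }

    Files.list(out).iterator.asScala
      .filter(p => Files.isRegularFile(p))
      .map(_.getFileName.toString).toList.sorted
  }
}
```

With 200 partitions this loop runs 200 moves; a direct committer skips the loop entirely by writing to `out` in the first place.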
On Fri, Mar 13, 2015 at 5:51 PM, Shuai Zheng <[email protected]> wrote:
Hi All,
I am trying to run a sort on an r3.2xlarge instance on AWS, just as a
single-node cluster for testing. The data I am sorting is around 4 GB and sits
on S3, and the output will also go to S3.
I just connect spark-shell to the local cluster and run the code as a script
(because I only want a benchmark for now).
My job is as simple as:
val parquetFile =
sqlContext.parquetFile("s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,")
parquetFile.registerTempTable("Test")
val sortedResult = sqlContext.sql("SELECT * FROM Test order by time").map { row
=> { row.mkString("\t") } }
sortedResult.saveAsTextFile("s3n://myplace,");
The job takes around 6 minutes to finish the sort while I am monitoring the
process. Then I notice the process stops at:
15/03/13 22:38:27 INFO DAGScheduler: Job 2 finished: saveAsTextFile at <console>:31, took 581.304992 s
At that point Spark has actually just written all the data to the _temporary
folder; after all sub-tasks finish, it tries to move the completed results from
the _temporary folder to the final location. This step might be quick locally
(it is just a rename), but it looks very slow on my S3: it takes a few seconds
to move one file (there are usually 200 partitions), and then it raises
exceptions after moving maybe 40-50 files.
org.apache.http.NoHttpResponseException: The target server failed to respond
    at org.apache.http.impl.conn.DefaultResponseParser.parseHead(DefaultResponseParser.java:101)
    at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:252)
    at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:281)
    at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:247)
    at org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219)
I have tried several times but never gotten the full job to finish. I am not
sure what is wrong here; I am doing something very basic, and I can see the job
has finished with all results on S3 under the _temporary folder, but then it
raises the exception and fails.
Is there any special setting I should use when dealing with S3?
I don't know what the issue is. I have never seen MapReduce hit a similar
problem, so it should not be S3's problem.
Regards,
Shuai