Ah, most interesting, thanks.
So it seems sc.textFile(longFileList) has to read all the metadata before
starting the read, for partitioning purposes, so your approach is simply not to
use it?
You create a task per file, so each task reads one file (in parallel) without
scanning for _all_ the metadata first. Can’t argue with the logic, but perhaps
Spark should incorporate something like this in sc.textFile? My case can’t be
that unusual, especially since I am periodically processing micro-batches from
Spark Streaming. In fact, I have to scan HDFS to create the longFileList to
begin with, so I get the file status and therefore probably all the metadata
needed by sc.textFile. Your method would save one scan, which is good.
Might a better sc.textFile take a beginning URI, a file-pattern regex, and a
recursive flag? Then one scan could create all the metadata automatically for a
large subset of people using the function, something like:
sc.textFile(beginDir: String, filePattern: String = "^part.*", recursive:
Boolean = false)
In fact it should be easy to create a BetterSC that overrides the textFile
method with a re-implementation that only requires one scan to get the metadata.
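A minimal sketch of what that override might look like, using the Hadoop FileSystem API for the single scan. BetterTextFile and the walk helper are hypothetical names for illustration, not anything in Spark:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical sketch: one FileSystem scan builds the file list,
// then a single textFile call is made on the comma-joined result,
// so HDFS is only walked once.
object BetterTextFile {
  def textFile(sc: SparkContext,
               beginDir: String,
               filePattern: String = "^part.*",
               recursive: Boolean = false): RDD[String] = {
    val fs = FileSystem.get(new Configuration())
    val regex = filePattern.r

    // Walk the tree once, collecting paths whose names match the pattern.
    def walk(p: Path): Seq[String] =
      fs.listStatus(p).toSeq.flatMap { status =>
        if (status.isDirectory)
          if (recursive) walk(status.getPath) else Seq.empty
        else if (regex.findFirstIn(status.getPath.getName).isDefined)
          Seq(status.getPath.toString)
        else
          Seq.empty
      }

    sc.textFile(walk(new Path(beginDir)).mkString(","))
  }
}
```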
Just thinking on email…
On Mar 14, 2015, at 11:11 AM, Michael Armbrust <[email protected]> wrote:
Here is how I have dealt with many small text files (on s3 though this should
generalize) in the past:
http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201411.mbox/%3ccaaswr-58p66-es2haxh4i+bu__0rvxd2okewkly0mee8rue...@mail.gmail.com%3E
From Michael Armbrust <[email protected]>
Subject Re: S3NativeFileSystem inefficient implementation when calling
sc.textFile
Date Thu, 27 Nov 2014 03:20:14 GMT
In the past I have worked around this problem by avoiding sc.textFile().
Instead I read the data directly inside of a Spark job. Basically, you
start with an RDD where each entry is a file in S3 and then flatMap that
with something that reads the files and returns the lines.
Here's an example: https://gist.github.com/marmbrus/fff0b058f134fa7752fe
Using this class you can do something like:
sc.parallelize("s3n://mybucket/file1" :: "s3n://mybucket/file2" ... ::
Nil).flatMap(new ReadLinesSafe(_))
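The shape of that approach can be sketched as follows. readLines here is an assumed stand-in for the actual ReadLinesSafe class in the gist, and for simplicity it only handles local paths (for s3n:// or hdfs:// you would open the file through a Hadoop FileSystem instead):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.io.Source

// Hypothetical sketch of the flatMap pattern: each task opens one file
// and emits its lines, so no up-front metadata scan of all files is needed.
def readLines(path: String): Iterator[String] =
  Source.fromFile(path).getLines()

def readAll(sc: SparkContext, files: Seq[String]): RDD[String] =
  sc.parallelize(files, files.size) // one partition (hence one task) per file
    .flatMap(readLines)
```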
You can also build up the list of files by running a Spark job:
https://gist.github.com/marmbrus/15e72f7bc22337cf6653
Michael
On Sat, Mar 14, 2015 at 10:38 AM, Pat Ferrel <[email protected]> wrote:
It’s a long story but there are many dirs with smallish part-xxxx files in them
so we create a list of the individual files as input to
sparkContext.textFile(fileList). I suppose we could move them and rename them
to be contiguous part-xxxx files in one dir. Would that be better than passing
in a long list of individual filenames? We could also make the part files much
larger by collecting the smaller ones. But would any of this make a difference
in IO speed?
I ask because using the long file list seems to read what amounts to a
not-very-large data set rather slowly. If it were all in large part files in
one dir I’d expect it to go much faster, but this is just intuition.
On Mar 14, 2015, at 9:58 AM, Koert Kuipers <[email protected]> wrote:
Why can you not put them in a directory and read them as one input? You will
get a task per file, but Spark is very fast at executing many tasks (it’s not a
JVM per task).
On Sat, Mar 14, 2015 at 12:51 PM, Pat Ferrel <[email protected]> wrote:
Any advice on dealing with a large number of separate input files?
On Mar 13, 2015, at 4:06 PM, Pat Ferrel <[email protected]> wrote:
We have many text files that we need to read in parallel. We can create a
comma-delimited list of files to pass in to sparkContext.textFile(fileList).
The list can get very large (maybe 10000) and is all on HDFS.
The question is: what is the most performant way to read them? Should they be
broken up and read in groups, appending the resulting RDDs, or should we just
pass in the entire list at once? In effect I’m asking whether Spark does some
optimization here or whether we should do it explicitly. If the latter, what
rule might we use depending on our cluster setup?
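The two alternatives in question might be sketched like this (the helper names and group size are illustrative, not a recommendation):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Alternative 1: pass the entire comma-delimited list in one call.
def readAllAtOnce(sc: SparkContext, files: Seq[String]): RDD[String] =
  sc.textFile(files.mkString(","))

// Alternative 2: break the list into groups, read each group
// separately, and append (union) the resulting RDDs.
def readInGroups(sc: SparkContext,
                 files: Seq[String],
                 groupSize: Int = 100): RDD[String] =
  files.grouped(groupSize)
    .map(group => sc.textFile(group.mkString(",")))
    .reduce(_ union _)
```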
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]