Thanks to everyone here for the suggestions. I ended up settling on what seems
to be a simple and scalable approach. I am no longer using
sparkContext.textFile with wildcards (it is too slow when working with a
large number of files). Instead, I have implemented the directory traversal
as a Spark job, which lets it run in parallel across the cluster.
First, two helper functions: one traverses directories recursively, and the
other returns the lines of a file:
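import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import scala.io.Source

// These functions run inside Spark tasks, where the SparkContext (and its
// hadoopConfiguration) is not available, so a fresh Configuration is built
// here. It is read from the executor classpath (core-site.xml etc.), so S3
// credentials must be available there.
def list_file_names(path: String): Seq[String] = {
  val fs = FileSystem.get(new java.net.URI(path), new Configuration())
  // Recursively descend into directories, returning only file paths.
  def f(path: Path): Seq[String] =
    Option(fs.listStatus(path)).getOrElse(Array[FileStatus]()).toSeq.flatMap {
      case fileStatus if fileStatus.isDirectory => f(fileStatus.getPath)
      case fileStatus => Seq(fileStatus.getPath.toString)
    }
  f(new Path(path))
}

def read_log_file(path: String): Seq[String] = {
  val fs = FileSystem.get(new java.net.URI(path), new Configuration())
  val source = Source.fromInputStream(fs.open(new Path(path)))
  // Materialize the lines before closing the underlying stream.
  try source.getLines().toList finally source.close()
}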
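To try the traversal logic without a cluster, Hadoop, or S3, here is a minimal stdlib-only sketch that mirrors list_file_names and read_log_file on the local filesystem. It assumes Scala 2.13+ (for scala.util.Using and scala.jdk.CollectionConverters); the names LocalListing, listLocalFiles and readLocalFile are hypothetical stand-ins, not part of the code above.

```scala
import java.nio.file.{Files, Path}
import scala.io.Source
import scala.jdk.CollectionConverters._
import scala.util.Using

// Hypothetical local-filesystem stand-ins for list_file_names and
// read_log_file, useful for testing the traversal without Hadoop or S3.
object LocalListing {
  // Recursively collect every regular file under `dir`: directories are
  // descended into, and only file paths are returned.
  def listLocalFiles(dir: Path): Seq[String] =
    if (!Files.isDirectory(dir)) Seq.empty
    else {
      val stream = Files.list(dir)
      val children =
        try stream.iterator().asScala.toList
        finally stream.close()
      children.sortBy(_.toString).flatMap { child =>
        if (Files.isDirectory(child)) listLocalFiles(child)
        else Seq(child.toString)
      }
    }

  // Read all lines of one file, closing the source afterwards.
  def readLocalFile(path: String): Seq[String] =
    Using.resource(Source.fromFile(path))(_.getLines().toList)
}
```

Chaining the two, as in `LocalListing.listLocalFiles(root).flatMap(LocalListing.readLocalFile)`, gives the same shape of result as the two flatMap calls in the Spark job, just on one machine.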
Next, I generate a list of "root" paths to scan:
val paths =
  for {
    record_type <- record_types
    year <- years
    month <- months
    day <- days
    hour <- hours
  } yield s"s3n://s3-bucket-name/$record_type/$year/$month/$day/$hour/"
(In this case, I generate one path per hour per record type.)
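For readers wondering what the for-comprehension produces: it is the Cartesian product of all the dimensions. The sketch below fills in hypothetical values (the real record_types, years, months, days and hours are not given here) and assumes the bucket uses zero-padded month/day/hour directories, which may not match the actual layout.

```scala
// Hypothetical dimensions; the real lists and bucket layout are not shown
// in the post. Zero-padding (e.g. .../2014/10/06/03/) is an assumption.
object RootPaths {
  val recordTypes: Seq[String] = Seq("clicks", "logins")
  val years: Seq[Int] = Seq(2014)
  val months: Seq[Int] = Seq(10)
  val days: Seq[Int] = Seq(6, 7)
  val hours: Seq[Int] = 0 until 24

  // One root path per hour per record type: the comprehension desugars to
  // nested flatMap/map calls, so the innermost generator varies fastest.
  val paths: Seq[String] =
    for {
      recordType <- recordTypes
      year <- years
      month <- months
      day <- days
      hour <- hours
    } yield f"s3n://s3-bucket-name/$recordType/$year/$month%02d/$day%02d/$hour%02d/"
}
```

With these values there are 2 × 1 × 1 × 2 × 24 = 96 root paths, starting at `s3n://s3-bucket-name/clicks/2014/10/06/00/`.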
Finally, using Spark, I build an RDD containing the lines of every file under
each path in the list:
// One partition per root path, so each root is listed and read by its own task.
val rdd: RDD[String] =
  sparkContext
    .parallelize(paths, paths.size)
    .flatMap(list_file_names)
    .flatMap(read_log_file)
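The shape of that pipeline (fan out over root paths, list each root, then read each file) can be reproduced locally to sanity-check it. The sketch below uses scala.concurrent Futures as a stand-in for Spark tasks, one per root path, and assumes Scala 2.13+; it is an analogy only, not how Spark actually schedules work, and LocalPipeline/allLines are hypothetical names.

```scala
import java.nio.file.{Files, Path, Paths}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.io.Source
import scala.jdk.CollectionConverters._
import scala.util.Using

// Local analogue of parallelize(paths, paths.size).flatMap(list).flatMap(read):
// each root path becomes an independent Future that lists and reads its files.
object LocalPipeline {
  private def listFiles(dir: Path): List[Path] =
    if (!Files.isDirectory(dir)) Nil
    else {
      val s = Files.list(dir)
      val children = try s.iterator().asScala.toList finally s.close()
      children.flatMap(c => if (Files.isDirectory(c)) listFiles(c) else List(c))
    }

  private def readLines(file: Path): List[String] =
    Using.resource(Source.fromFile(file.toFile))(_.getLines().toList)

  // One task per root; per-root results are concatenated, like flatMap.
  def allLines(roots: Seq[String], timeout: FiniteDuration = 1.minute): Seq[String] = {
    val tasks = roots.map(r => Future(listFiles(Paths.get(r)).flatMap(readLines)))
    Await.result(Future.sequence(tasks), timeout).flatten
  }
}
```

As with the Spark version, the unit of parallelism is the root path, so a single very large root still ends up in one task.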
I am posting this info here with the hope that it will be useful to
somebody in the future.
L
On Tue, Oct 7, 2014 at 12:58 AM, deenar.toraskar <[email protected]>
wrote:
> Hi Landon
>
> I had a problem very similar to yours, where we had to process around 5
> million relatively small files on NFS. After trying various options, we did
> something similar to what Matei suggested.
>
> 1) Take the original path, find the subdirectories under that path, and
> then parallelize the resulting list. You can configure how deep to go
> before sending the paths across the cluster.
>
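> import java.io.File
> import scala.collection.mutable.ListBuffer
>
> // depth > 0: descend that many more levels; depth < 0: no limit;
> // depth == 0: return subdirectories as-is, to be expanded on the cluster.
> def getFileList(srcDir: File, depth: Int): List[File] = {
>   val list = new ListBuffer[File]()
>   if (srcDir.isDirectory()) {
>     // listFiles() returns null on I/O errors, so guard against it.
>     Option(srcDir.listFiles()).getOrElse(Array.empty[File]).foreach { (file: File) =>
>       if (file.isFile()) {
>         list += file
>       } else if (depth > 0) {
>         list ++= getFileList(file, depth - 1)
>       } else if (depth < 0) {
>         list ++= getFileList(file, depth)
>       } else {
>         list += file
>       }
>     }
>   } else {
>     list += srcDir
>   }
>   list.toList
> }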
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Strategies-for-reading-large-numbers-of-files-tp15644p15835.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
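The two-phase idea in the quoted reply (list only a few levels on the driver, then let each worker expand whatever directories it receives) can be sketched without Spark. This is a minimal illustration under that assumption: listToDepth is a compact equivalent of getFileList, and expandAll is a hypothetical name for the worker-side step.

```scala
import java.io.File

// Two-phase listing: phase 1 (driver) stops at `depth`, returning any
// directories at the cutoff as-is; phase 2 (workers) expands each of them.
object TwoPhaseListing {
  // Same semantics as getFileList: depth < 0 means unlimited recursion.
  def listToDepth(dir: File, depth: Int): List[File] =
    if (!dir.isDirectory) List(dir)
    else Option(dir.listFiles()).map(_.toList).getOrElse(Nil).flatMap { f =>
      if (f.isFile) List(f)
      else if (depth > 0) listToDepth(f, depth - 1)
      else if (depth < 0) listToDepth(f, depth)
      else List(f)
    }

  // Worker side: fully expand one phase-1 entry into regular files.
  def expandAll(f: File): List[File] =
    if (f.isFile) List(f) else listToDepth(f, -1)
}
```

In a Spark job, the phase-1 list would be what gets parallelized, with expandAll applied in a flatMap; the end result is the same set of files as a full listing, but the deep traversal happens on the cluster.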
--
*Landon Kuhn*, *Software Architect*, Janrain, Inc. <http://bit.ly/cKKudR>