Ok thanks. Actually we ran something very similar this weekend. It
works but is very slow.
The Spark method I included in my original post is about 5-6 times
faster. Just wondering if there is something even faster than that.
I see this as being a recurring problem over the next few months.
*From:*Cheng Lian [mailto:lian.cs....@gmail.com]
*Sent:* Monday, September 28, 2015 6:46 PM
*To:* Thomas, Jordan <jordan.tho...@accenture.com>; mich...@databricks.com
*Cc:* user@spark.apache.org
*Subject:* Re: Performance when iterating over many parquet files
Probably parquet-tools together with the following shell script will help:
root="/path/to/your/data"
for f in `find $root -type f -name "*.parquet"`; do
  parquet-schema $f > /dev/null 2>&1
  if [ $? -ne 0 ]; then echo $f; fi
done
This should print out all non-Parquet files under $root. Please refer
to this link to see how to build and install parquet-tools
https://github.com/Parquet/parquet-mr/issues/321
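If shelling out once per file turns out to be too slow, a possible variation
(only a sketch, not something tested in this thread) is to run the same footer
check from inside a Spark job, calling parquet-mr's
org.apache.parquet.hadoop.ParquetFileReader directly on the executors instead
of going through sqlContext; the file-list name and the per-partition
Configuration below are assumptions:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import scala.util.Try

// One Parquet path per line; "file_list.txt" is just a placeholder name.
val files = sc.textFile("file_list.txt")
val badFiles = files.mapPartitions { paths =>
  // Build the Hadoop Configuration on the executor; it is not serializable.
  val conf = new Configuration()
  paths.filter(p => Try(ParquetFileReader.readFooter(conf, new Path(p))).isFailure)
}
badFiles.collect().foreach(println) // paths whose footer could not be read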
Cheng
On 9/28/15 4:29 PM, jordan.tho...@accenture.com wrote:
Sure. I would just like to remove the ones that fail the basic checks
done by the Parquet readFooters function, i.e. their length is wrong or
the magic number is incorrect, which makes the read method throw
exceptions.
Errors like:
java.io.IOException: Could not read footer:
java.lang.RuntimeException: data.parquet is not a Parquet file
(too small)
and
java.io.IOException: Could not read footer:
java.lang.RuntimeException: data.parquet is not a Parquet file.
expected magic number at tail [80, 65, 82, 49] but found [54, -4,
-10, -102]
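A cheap pre-filter along those lines (only a sketch; the helper below is
hypothetical, not part of parquet-mr) would be to check just the file length
and the trailing "PAR1" magic with the Hadoop FileSystem API before attempting
a full footer read:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: true if the file is long enough to be a Parquet file
// and ends with the magic bytes "PAR1" (80, 65, 82, 49).
def looksLikeParquet(pathStr: String, conf: Configuration): Boolean = {
  val path = new Path(pathStr)
  val fs = path.getFileSystem(conf)
  val len = fs.getFileStatus(path).getLen
  if (len < 12) return false // header magic + footer length + footer magic
  val in = fs.open(path)
  try {
    val tail = new Array[Byte](4)
    in.readFully(len - 4, tail)
    tail.sameElements("PAR1".getBytes("US-ASCII"))
  } finally in.close()
}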
Backstory: We had a migration from one cluster to another and
thousands of incomplete files were transferred. In addition, they
are still trying to handle the kickouts from their write methods
(they are converting from a proprietary binary format). A lot of
that is captured in the Splunk logs and will improve in the coming
weeks as they continue tuning, but on the reading end I want to
make sure we’re in sync about what needs to be re-converted and
re-transferred.
Thanks,
Jordan
*From:*Cheng Lian [mailto:lian.cs....@gmail.com]
*Sent:* Monday, September 28, 2015 6:15 PM
*To:* Thomas, Jordan <jordan.tho...@accenture.com>; mich...@databricks.com
*Cc:* user@spark.apache.org
*Subject:* Re: Performance when iterating over many parquet files
Could you please elaborate on what kind of errors those bad Parquet
files are causing? In what ways are they miswritten?
Cheng
On 9/28/15 4:03 PM, jordan.tho...@accenture.com wrote:
Ah, yes, I see that it has been turned off now, that’s why it
wasn’t working. Thank you, this is helpful! The problem now
is to filter out bad (miswritten) Parquet files, as they are
causing this operation to fail.
Any suggestions on detecting them quickly and easily?
*From:*Cheng Lian [mailto:lian.cs....@gmail.com]
*Sent:* Monday, September 28, 2015 5:56 PM
*To:* Thomas, Jordan <jordan.tho...@accenture.com>; mich...@databricks.com
*Cc:* user@spark.apache.org
*Subject:* Re: Performance when iterating over many parquet files
Also, you may find more details in the programming guide:
- http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging
- http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration
Cheng
On 9/28/15 3:54 PM, Cheng Lian wrote:
I guess you're probably using Spark 1.5? Spark SQL does
support schema merging, but we disabled it by default
since 1.5 because it introduces extra performance costs
(it's turned on by default in 1.4 and 1.3).
You may enable schema merging via either the Parquet data
source specific option "mergeSchema":
sqlContext.read.option("mergeSchema", "true").parquet(path)
or the global SQL option "spark.sql.parquet.mergeSchema":
sqlContext.sql("SET spark.sql.parquet.mergeSchema=true")
sqlContext.read.parquet(path)
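To see what merging does, here is a small sketch adapted from the
schema-merging example in the Spark SQL programming guide (the path
"data/test_table" and the column names are just for illustration):

import sqlContext.implicits._

// Write two Parquet directories with different but compatible schemas.
sc.parallelize(1 to 3).map(i => (i, i * 2)).toDF("value", "double")
  .write.parquet("data/test_table/key=1")
sc.parallelize(4 to 6).map(i => (i, i * 3)).toDF("value", "triple")
  .write.parquet("data/test_table/key=2")

// With mergeSchema enabled, the result holds the union of the columns:
// value, double, triple, plus the partition column key.
val merged = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
merged.printSchema()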
Cheng
On 9/28/15 3:45 PM, jordan.tho...@accenture.com wrote:
Dear Michael,
Thank you very much for your help.
I should have mentioned in my original email that I did try the
sequence notation, but it doesn't seem to have the desired effect. I
should add that each of these files has a different schema. When I
use that call, I don't end up with a data frame containing the
columns from all of the files taken together, just the columns from
one of them. I'm tracing through the code to understand exactly what
happens with the Seq[String] call. Maybe you know? Is it trying to do
some kind of schema merging?
Also, it seems that even if I could get it to work, it
would require some parsing of the resulting schemas to
find the invalid files. I would like to capture these
errors on read.
The Parquet files currently average about 60 MB in size, with a
minimum around 40 MB and a maximum around 500 MB. I could coalesce
them, but they correspond to logical entities and there are a number
of use-case-specific reasons to keep them separate.
Thanks,
Jordan
*From:*Michael Armbrust [mailto:mich...@databricks.com]
*Sent:* Monday, September 28, 2015 4:02 PM
*To:* Thomas, Jordan <jordan.tho...@accenture.com>
*Cc:* user <user@spark.apache.org>
*Subject:* Re: Performance when iterating over many parquet files
Another note: for best performance you are going to want your Parquet
files to be pretty big (hundreds of MB). You could coalesce them and
write them out for more efficient repeat querying.
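A rough sketch of that coalesce-and-rewrite step, reusing the fileList built
in the snippet quoted just below (the output path and the partition count of
200 are placeholders to tune, not recommendations from this thread):

// Read the many small files together, then rewrite them as fewer, larger files.
// Pick the coalesce() argument so each output file lands in the hundreds of MB.
val bigDf = sqlContext.read.parquet(fileList: _*)
bigDf.coalesce(200).write.parquet("/path/to/coalesced_table")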
On Mon, Sep 28, 2015 at 2:00 PM, Michael Armbrust <mich...@databricks.com> wrote:
sqlContext.read.parquet
<https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L258>
takes lists of files.
val fileList = sc.textFile("file_list.txt").collect() // this works, but using Spark is possibly overkill
val dataFrame = sqlContext.read.parquet(fileList: _*)
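As an aside, if file_list.txt is small and sits on the driver's local disk
(an assumption on my part), it can be read without Spark at all:

import scala.io.Source

// Read the list of Parquet paths locally instead of launching a Spark job.
val fileList = Source.fromFile("file_list.txt").getLines().toSeq
val dataFrame = sqlContext.read.parquet(fileList: _*)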
On Mon, Sep 28, 2015 at 1:35 PM, jwthomas <jordan.tho...@accenture.com> wrote:
We are working with use cases where we need to do batch processing on
a large number (hundreds of thousands) of Parquet files. The
processing is quite similar per file. There are many aggregates that
are very SQL-friendly (computing averages, maxima, minima, and
aggregations on single columns with some selection criteria). There
is also some more advanced time-series processing (continuous wavelet
transforms and the like). This all seems like a good use case for Spark.
But I'm having performance problems. Let's take a look at something
very simple, which just checks whether the Parquet files are readable.
Code that seems natural but doesn't work:
import scala.util.{Try, Success, Failure}
val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles.map(x => (x, Try(sqlContext.read.parquet(x)))).filter(_._2.isSuccess).map(x => x._1)
My understanding is that this doesn't work because sqlContext can't
be used inside a transformation like "map" (or inside an action); it
only makes sense in the driver. Thus it becomes a null reference in
the above code, and all of the reads fail.
Code that works:
import scala.util.{Try, Success, Failure}
val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles.collect().map(x => (x, Try(sqlContext.read.parquet(x)))).filter(_._2.isSuccess).map(x => x._1)
This works because the collect() means that everything happens back
on the driver, so the sqlContext object makes sense and everything
works fine. But it is slow. I'm using yarn-client mode on a 6-node
cluster with 17 executors, 40 GB RAM on the driver and 19 GB on the
executors, and it takes about 1 minute to execute for 100 Parquet
files. That is too long; recall that we need to do this across
hundreds of thousands of files.
I realize it is possible to parallelize after
the read:
import scala.util.{Try, Success, Failure}
val parquetFiles = sc.textFile("file_list.txt")
val intermediate_successes = parquetFiles.collect().map(x => (x, Try(sqlContext.read.parquet(x))))
val dist_successes = sc.parallelize(intermediate_successes)
val successes = dist_successes.filter(_._2.isSuccess).map(x => x._1)
But this does not improve performance
substantially. It seems the
bottleneck is that the reads are happening
sequentially.
Is there a better way to do this?
Thanks,
Jordan