Oh I see. Then probably this one, which is basically the parallel Spark version of my last script, using ParquetFileReader:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.format.converter.ParquetMetadataConverter

import scala.util.Try

val badFiles = sc.parallelize(paths).mapPartitions { iterator =>
  // Create the Hadoop Configuration on the executors (it is not Java-serializable)
  val conf = new Configuration()
  iterator.filter { path =>
    // Reading only the footer (skipping row groups) is enough to detect corrupt files
    Try(ParquetFileReader.readFooter(
      conf, new Path(path), ParquetMetadataConverter.SKIP_ROW_GROUPS)).isFailure
  }
}.collect()
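
Here I'm assuming paths is a plain collection of path strings; it could, for example, be built from the same file_list.txt as in your earlier snippets:

  // Assumption: one Parquet path per line in file_list.txt
  val paths = sc.textFile("file_list.txt").collect().toSeq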


Cheng

On 9/28/15 4:48 PM, jordan.tho...@accenture.com wrote:

Ok thanks. Actually we ran something very similar this weekend. It works but is very slow.

The Spark method I included in my original post is about 5-6 times faster. Just wondering if there is something even faster than that. I see this as being a recurring problem over the next few months.

*From:*Cheng Lian [mailto:lian.cs....@gmail.com]
*Sent:* Monday, September 28, 2015 6:46 PM
*To:* Thomas, Jordan <jordan.tho...@accenture.com>; mich...@databricks.com
*Cc:* user@spark.apache.org
*Subject:* Re: Performance when iterating over many parquet files

parquet-tools and the following shell script will probably help:

root="/path/to/your/data"

for f in `find $root -type f -name "*.parquet"`; do
  # parquet-schema exits non-zero when it cannot parse the file
  parquet-schema $f > /dev/null 2>&1
  if [ $? -ne 0 ]; then echo $f; fi
done

This should print out all non-Parquet files under $root. Please refer to this link to see how to build and install parquet-tools: https://github.com/Parquet/parquet-mr/issues/321

Cheng

On 9/28/15 4:29 PM, jordan.tho...@accenture.com <mailto:jordan.tho...@accenture.com> wrote:

    Sure. I would just like to remove the ones that fail the basic checks
    done by the Parquet readFooters function, i.e. their length is wrong
    or the magic number is incorrect, which causes the read method to
    throw exceptions.

    Errors like:

    java.io.IOException: Could not read footer:
    java.lang.RuntimeException: data.parquet is not a Parquet file
    (too small)

    and

    java.io.IOException: Could not read footer:
    java.lang.RuntimeException: data.parquet is not a Parquet file.
    expected magic number at tail [80, 65, 82, 49] but found [54, -4,
    -10, -102]
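
    For reference, the check that fails here is essentially that the file
    must be long enough to hold the header magic, the footer length and
    the tail magic, and must end with the "PAR1" magic bytes. A rough
    standalone sketch of the same check (the helper name is just for
    illustration, not part of Parquet) would be:

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Illustrative helper: true if the file passes the basic length/magic checks
    def looksLikeParquet(fs: FileSystem, path: Path): Boolean = {
      val magic = "PAR1".getBytes("US-ASCII")
      val len = fs.getFileStatus(path).getLen
      if (len < magic.length + 4 + magic.length) {
        false                                   // the "too small" case
      } else {
        val in = fs.open(path)
        try {
          in.seek(len - magic.length)
          val tail = new Array[Byte](magic.length)
          in.readFully(tail)
          java.util.Arrays.equals(tail, magic)  // the "expected magic number at tail" case
        } finally in.close()
      }
    }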

    Backstory: We had a migration from one cluster to another and
    thousands of incomplete files were transferred.  In addition, they
    are still trying to handle the kickouts from their write methods
    (they are converting from a proprietary binary format).  A lot of
    that is captured in the Splunk logs and will improve in the coming
    weeks as they continue tuning, but on the reading end I want to
    make sure we’re in sync about what needs to be re-converted and
    re-transferred.

    Thanks,

    Jordan

    *From:*Cheng Lian [mailto:lian.cs....@gmail.com]
    *Sent:* Monday, September 28, 2015 6:15 PM
    *To:* Thomas, Jordan <jordan.tho...@accenture.com>
    <mailto:jordan.tho...@accenture.com>; mich...@databricks.com
    <mailto:mich...@databricks.com>
    *Cc:* user@spark.apache.org <mailto:user@spark.apache.org>
    *Subject:* Re: Performance when iterating over many parquet files

    Could you please elaborate on what kind of errors those bad Parquet
    files are causing? In what ways are they miswritten?

    Cheng

    On 9/28/15 4:03 PM, jordan.tho...@accenture.com
    <mailto:jordan.tho...@accenture.com> wrote:

        Ah, yes, I see that it has been turned off now, that’s why it
        wasn’t working.  Thank you, this is helpful!  The problem now
        is to filter out bad (miswritten) Parquet files, as they are
        causing this operation to fail.

        Any suggestions on detecting them quickly and easily?

        *From:*Cheng Lian [mailto:lian.cs....@gmail.com]
        *Sent:* Monday, September 28, 2015 5:56 PM
        *To:* Thomas, Jordan <jordan.tho...@accenture.com>
        <mailto:jordan.tho...@accenture.com>; mich...@databricks.com
        <mailto:mich...@databricks.com>
        *Cc:* user@spark.apache.org <mailto:user@spark.apache.org>
        *Subject:* Re: Performance when iterating over many parquet files

        Also, you may find more details in the programming guide:

        - http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging
        - http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration

        Cheng

        On 9/28/15 3:54 PM, Cheng Lian wrote:

            I guess you're probably using Spark 1.5? Spark SQL does
            support schema merging, but we disabled it by default
            since 1.5 because it introduces extra performance costs
            (it's turned on by default in 1.4 and 1.3).

            You may enable schema merging via either the Parquet data
            source specific option "mergeSchema":

              sqlContext.read.option("mergeSchema", "true").parquet(path)

            or the global SQL option "spark.sql.parquet.mergeSchema":

              sqlContext.sql("SET spark.sql.parquet.mergeSchema=true")
              sqlContext.read.parquet(path)

            Cheng

            On 9/28/15 3:45 PM, jordan.tho...@accenture.com
            <mailto:jordan.tho...@accenture.com> wrote:

                Dear Michael,

                Thank you very much for your help.

                I should have mentioned in my original email, I did
                try the sequence notation.  It doesn’t seem to have
                the desired effect.  Maybe I should say that each one
                of these files has a different schema.  When I use
                that call, I’m not ending up with a data frame with
                columns from all of the files taken together, but just
                one of them.  I’m tracing through the code trying to
                understand exactly what is happening with the
                Seq[String] call.  Maybe you know?  Is it trying to do
                some kind of schema merging?

                Also, it seems that even if I could get it to work, it
                would require some parsing of the resulting schemas to
                find the invalid files.  I would like to capture these
                errors on read.

                The Parquet files currently average about 60 MB in
                size, with a minimum around 40 MB and a maximum around
                500 MB. I could coalesce them, but they do correspond
                to logical entities and there are a number of use-case
                specific reasons to keep them separate.

                Thanks,

                Jordan

                *From:*Michael Armbrust [mailto:mich...@databricks.com]
                *Sent:* Monday, September 28, 2015 4:02 PM
                *To:* Thomas, Jordan <jordan.tho...@accenture.com>
                <mailto:jordan.tho...@accenture.com>
                *Cc:* user <user@spark.apache.org>
                <mailto:user@spark.apache.org>
                *Subject:* Re: Performance when iterating over many
                parquet files

                Another note: for best performance you are going to
                want your Parquet files to be pretty big (100s of
                MB).  You could coalesce them and write them out for
                more efficient repeat querying.
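
                A minimal sketch of that coalesce-and-rewrite step
                (the paths and the partition count here are only
                placeholders):

                  val df = sqlContext.read.parquet("/path/to/small/files")
                  // fewer, larger files; pick the count so each output file lands in the 100s of MB
                  df.coalesce(64).write.parquet("/path/to/coalesced/output")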

                On Mon, Sep 28, 2015 at 2:00 PM, Michael Armbrust
                <mich...@databricks.com
                <mailto:mich...@databricks.com>> wrote:

                    sqlContext.read.parquet
                    <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L258>
                    takes lists of files.

                    val fileList = sc.textFile("file_list.txt").collect() // this works but using spark is possibly overkill
                    val dataFrame = sqlContext.read.parquet(fileList: _*)

                    On Mon, Sep 28, 2015 at 1:35 PM, jwthomas
                    <jordan.tho...@accenture.com
                    <mailto:jordan.tho...@accenture.com>> wrote:

                        We are working with use cases where we need to
                        do batch processing on a large number (hundreds
                        of thousands) of Parquet files.  The processing
                        is quite similar per file.  There are many
                        aggregates that are very SQL-friendly
                        (computing averages, maxima, minima,
                        aggregations on single columns with some
                        selection criteria).  There is also some more
                        advanced time-series processing (continuous
                        wavelet transforms and the like).  This all
                        seems like a good use case for Spark.
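
                        For illustration, the SQL-friendly part looks
                        roughly like this per file (the column names
                        here are made up):

                        import org.apache.spark.sql.functions._
                        // path: one of the Parquet files from file_list.txt
                        val df = sqlContext.read.parquet(path)
                        df.filter("value > 0")
                          .groupBy("sensor")
                          .agg(avg("value"), min("value"), max("value"))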

                        But I'm having performance problems. Let's
                        take a look at something very
                        simple, which simply checks whether the
                        parquet files are readable.

                        Code that seems natural but doesn't work:

                        import scala.util.{Try, Success, Failure}
                        val parquetFiles = sc.textFile("file_list.txt")
                        val successes = parquetFiles
                          .map(x => (x, Try(sqlContext.read.parquet(x))))
                          .filter(_._2.isSuccess)
                          .map(x => x._1)

                        My understanding is that this doesn't work
                        because sqlContext can't be used inside of a
                        transformation like "map" (or inside an
                        action); it only makes sense on the driver.
                        Thus, it becomes a null reference in the above
                        code, so all reads fail.

                        Code that works:

                        import scala.util.{Try, Success, Failure}
                        val parquetFiles = sc.textFile("file_list.txt")
                        val successes = parquetFiles.collect()
                          .map(x => (x, Try(sqlContext.read.parquet(x))))
                          .filter(_._2.isSuccess)
                          .map(x => x._1)


                        This works because the collect() means that
                        everything happens back on the
                        driver.  So the sqlContext object makes sense
                        and everything works fine.

                        But it is slow.  I'm using yarn-client mode on
                        a 6-node cluster with 17 executors, 40 GB RAM
                        on the driver, 19 GB on executors.  And it
                        takes about 1 minute to execute for 100 Parquet
                        files, which is too long.  Recall we need to do
                        this across hundreds of thousands of files.

                        I realize it is possible to parallelize after
                        the read:

                        import scala.util.{Try, Success, Failure}
                        val parquetFiles = sc.textFile("file_list.txt")
                        val intermediate_successes = parquetFiles.collect()
                          .map(x => (x, Try(sqlContext.read.parquet(x))))
                        val dist_successes = sc.parallelize(intermediate_successes)
                        val successes = dist_successes
                          .filter(_._2.isSuccess)
                          .map(x => x._1)


                        But this does not improve performance
                        substantially.  It seems the
                        bottleneck is that the reads are happening
                        sequentially.

                        Is there a better way to do this?

                        Thanks,
                        Jordan





                        
---------------------------------------------------------------------
                        To unsubscribe, e-mail:
                        user-unsubscr...@spark.apache.org
                        <mailto:user-unsubscr...@spark.apache.org>
                        For additional commands, e-mail:
                        user-h...@spark.apache.org
                        <mailto:user-h...@spark.apache.org>

                

