parquet-tools and the following shell script will probably help:

root="/path/to/your/data"

find "$root" -type f -name "*.parquet" | while IFS= read -r f; do
  # parquet-schema exits non-zero when it cannot read the file
  if ! parquet-schema "$f" < /dev/null > /dev/null 2>&1; then echo "$f"; fi
done

This should print out all non-Parquet files under $root. Please refer to this link to see how to build and install parquet-tools https://github.com/Parquet/parquet-mr/issues/321
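
If only the two failures quoted below matter (file too small, or wrong magic number at the tail), a cheaper check may be enough. The following is a minimal sketch, not what readFooters does internally. The function names looks_like_parquet and list_suspect_parquet are made up for illustration, and the 12-byte minimum assumes the smallest possible file is 4 magic bytes + 4 footer-length bytes + 4 magic bytes. It does not validate the footer contents, so a file that passes here can still fail to read.

```shell
# Hypothetical helper: succeeds if the file is at least 12 bytes long and
# ends with the magic bytes "PAR1" (hex 50 41 52 31, i.e. [80, 65, 82, 49]).
looks_like_parquet() {
  [ -f "$1" ] || return 1
  size=$(wc -c < "$1" | tr -d ' ')
  # Assumed minimum: magic (4) + footer length (4) + magic (4)
  [ "$size" -ge 12 ] || return 1
  # Dump the last 4 bytes as hex so binary data is compared safely
  tail_hex=$(tail -c 4 "$1" | od -An -tx1 | tr -d ' \n')
  [ "$tail_hex" = "50415231" ]
}

# Hypothetical helper: print every *.parquet file under directory $1
# that fails the check above.
list_suspect_parquet() {
  find "$1" -type f -name "*.parquet" | while IFS= read -r f; do
    looks_like_parquet "$f" || printf '%s\n' "$f"
  done
}
```

Calling list_suspect_parquet "$root" would then list the candidates for re-conversion and re-transfer; anything it reports can be confirmed with parquet-schema as above.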

Cheng

On 9/28/15 4:29 PM, [email protected] wrote:

Sure. I would just like to remove the ones that fail the basic checks done by the Parquet readFooters function, i.e. files whose length is wrong or whose magic number is incorrect, which causes the read method to throw exceptions.

Errors like:

java.io.IOException: Could not read footer: java.lang.RuntimeException: data.parquet is not a Parquet file (too small)

and

java.io.IOException: Could not read footer: java.lang.RuntimeException: data.parquet is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [54, -4, -10, -102]

Backstory: We had a migration from one cluster to another and thousands of incomplete files were transferred. In addition, they are still trying to handle the kickouts from their write methods (they are converting from a proprietary binary format). A lot of that is captured in the Splunk logs and will improve in the coming weeks as they continue tuning, but on the reading end I want to make sure we’re in sync about what needs to be re-converted and re-transferred.

Thanks,

Jordan

*From:*Cheng Lian [mailto:[email protected]]
*Sent:* Monday, September 28, 2015 6:15 PM
*To:* Thomas, Jordan <[email protected]>; [email protected]
*Cc:* [email protected]
*Subject:* Re: Performance when iterating over many parquet files

Could you please elaborate on what kind of errors those bad Parquet files are causing? In what ways are they miswritten?

Cheng

On 9/28/15 4:03 PM, [email protected] <mailto:[email protected]> wrote:

    Ah, yes, I see that it has been turned off now, that’s why it
    wasn’t working.  Thank you, this is helpful!  The problem now is
    to filter out bad (miswritten) Parquet files, as they are causing
    this operation to fail.

    Any suggestions on detecting them quickly and easily?

    *From:*Cheng Lian [mailto:[email protected]]
    *Sent:* Monday, September 28, 2015 5:56 PM
    *To:* Thomas, Jordan <[email protected]>
    <mailto:[email protected]>; [email protected]
    <mailto:[email protected]>
    *Cc:* [email protected] <mailto:[email protected]>
    *Subject:* Re: Performance when iterating over many parquet files

    Also, you may find more details in the programming guide:

    - http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging
    - http://spark.apache.org/docs/latest/sql-programming-guide.html#configuration

    Cheng

    On 9/28/15 3:54 PM, Cheng Lian wrote:

        I guess you're probably using Spark 1.5? Spark SQL does
        support schema merging, but we disabled it by default since
        1.5 because it introduces extra performance costs (it's turned
        on by default in 1.4 and 1.3).

        You may enable schema merging via either the Parquet data
        source specific option "mergeSchema":

          sqlContext.read.option("mergeSchema", "true").parquet(path)

        or the global SQL option "spark.sql.parquet.mergeSchema":

          sqlContext.sql("SET spark.sql.parquet.mergeSchema=true")
          sqlContext.read.parquet(path)

        Cheng

        On 9/28/15 3:45 PM, [email protected]
        <mailto:[email protected]> wrote:

            Dear Michael,

            Thank you very much for your help.

            I should have mentioned in my original email, I did try
            the sequence notation.  It doesn’t seem to have the
            desired effect.  Maybe I should say that each one of these
            files has a different schema.  When I use that call, I’m
            not ending up with a data frame with columns from all of
            the files taken together, but just one of them.  I’m
            tracing through the code trying to understand exactly what
            is happening with the Seq[String] call.  Maybe you know?  Is
            it trying to do some kind of schema merging?

            Also, it seems that even if I could get it to work, it
            would require some parsing of the resulting schemas to
            find the invalid files.  I would like to capture these
            errors on read.

            The parquet files  currently average about 60 MB in size,
            with min about 40 MB and max about 500 or so.  I could
            coalesce, but they do correspond to logical entities and
            there are a number of use-case specific reasons to keep
            them separate.

            Thanks,

            Jordan

            *From:*Michael Armbrust [mailto:[email protected]]
            *Sent:* Monday, September 28, 2015 4:02 PM
            *To:* Thomas, Jordan <[email protected]>
            <mailto:[email protected]>
            *Cc:* user <[email protected]>
            <mailto:[email protected]>
            *Subject:* Re: Performance when iterating over many
            parquet files

            Another note: for best performance you are going to want
            your parquet files to be pretty big (100s of mb).  You
            could coalesce them and write them out for more efficient
            repeat querying.

            On Mon, Sep 28, 2015 at 2:00 PM, Michael Armbrust
            <[email protected] <mailto:[email protected]>>
            wrote:

                sqlContext.read.parquet
                <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L258>
                takes lists of files.

                val fileList = sc.textFile("file_list.txt").collect()
                // this works but using spark is possibly overkill
                val dataFrame = sqlContext.read.parquet(fileList: _*)

                On Mon, Sep 28, 2015 at 1:35 PM, jwthomas
                <[email protected]
                <mailto:[email protected]>> wrote:

                    We are working with use cases where we need to do
                    batch processing on a large number (hundreds of
                    thousands) of Parquet files.  The processing is quite
                    similar per file.  There are many aggregates that
                    are very SQL-friendly (computing averages, maxima,
                    minima, aggregations on single columns with some
                    selection criteria).  There is also some more
                    advanced time-series processing (continuous
                    wavelet transforms and the like).  This all seems
                    like a good use case for Spark.

                    But I'm having performance problems. Let's take a
                    look at something very
                    simple, which simply checks whether the parquet
                    files are readable.

                    Code that seems natural but doesn't work:

                    import scala.util.{Try, Success, Failure}
                    val parquetFiles = sc.textFile("file_list.txt")
                    val successes = parquetFiles
                      .map(x => (x, Try(sqlContext.read.parquet(x))))
                      .filter(_._2.isSuccess)
                      .map(x => x._1)

                    My understanding is that this doesn't work because
                    sqlContext can't be used
                    inside of a transformation like "map" (or inside
                    an action).  That it only
                    makes sense in the driver.  Thus, it becomes a
                    null reference in the above
                    code, so all reads fail.

                    Code that works:

                    import scala.util.{Try, Success, Failure}
                    val parquetFiles = sc.textFile("file_list.txt")
                    val successes = parquetFiles.collect()
                      .map(x => (x, Try(sqlContext.read.parquet(x))))
                      .filter(_._2.isSuccess)
                      .map(x => x._1)


                    This works because the collect() means that
                    everything happens back on the
                    driver.  So the sqlContext object makes sense and
                    everything works fine.

                    But it is slow.  I'm using yarn-client mode on a
                    6-node cluster with 17
                    executors, 40 GB ram on driver, 19GB on
                    executors.  And it takes about 1
                    minute to execute for 100 parquet files.  Which is
                    too long.  Recall we need
                    to do this across hundreds of thousands of files.

                    I realize it is possible to parallelize after the
                    read:

                    import scala.util.{Try, Success, Failure}
                    val parquetFiles = sc.textFile("file_list.txt")
                    // the reads still happen one by one on the driver
                    val intermediate_successes = parquetFiles.collect()
                      .map(x => (x, Try(sqlContext.read.parquet(x))))
                    // only the filtering step below is distributed
                    val dist_successes = sc.parallelize(intermediate_successes)
                    val successes = dist_successes.filter(_._2.isSuccess).map(x => x._1)


                    But this does not improve performance
                    substantially.  It seems the
                    bottleneck is that the reads are happening
                    sequentially.

                    Is there a better way to do this?

                    Thanks,
                    Jordan




                    --
                    View this message in context:
                    http://apache-spark-user-list.1001560.n3.nabble.com/Performance-when-iterating-over-many-parquet-files-tp24850.html
                    Sent from the Apache Spark User List mailing list
                    archive at Nabble.com.

                    

            

